Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
fbab54a
test code for labels added
Feb 11, 2026
171bc98
tzinfo got lost when storing in mongodb
Feb 12, 2026
07923f8
Merge branch 'task_label_feature' of github.com:ottointhesky/ipyparal…
Feb 12, 2026
2f1293f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 12, 2026
7f0400a
make mongodb tzinfo aware
Feb 12, 2026
6a887c1
Merge remote-tracking branch 'origin/task_label_feature' into task_la…
Feb 12, 2026
a9e4266
make mongodb tzinfo aware
Feb 12, 2026
e473c81
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 12, 2026
5762cd6
switch to new mongodb api
Feb 12, 2026
6a36864
mongodb installation added to github actions
Feb 12, 2026
29fe5e1
Merge remote-tracking branch 'origin/task_label_feature' into task_la…
Feb 12, 2026
d6436d1
ruff format changes
Feb 12, 2026
1a2624e
exclude mongodb isntall from slurm
Feb 12, 2026
d4c6fab
Update test.yml
ottointhesky Feb 12, 2026
6a365f2
Update test.yml
ottointhesky Feb 12, 2026
3d20d63
Update test.yml
ottointhesky Feb 12, 2026
a45a644
work-a-round fix for image already exists
Feb 13, 2026
84009a6
pre-commit format changes
Feb 13, 2026
0d94418
enable mongodb for slurm test environment as well
Feb 13, 2026
8a3438d
enable mongodb only under linux with no cluster
Feb 13, 2026
9216581
pre-commit format correction
Feb 13, 2026
4e16aad
corrections within windows
Feb 23, 2026
1a57507
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 23, 2026
d2a0595
minor change: formating corrected
Feb 23, 2026
51367d2
label added to docu / execute and run now supports label
Feb 23, 2026
a2421f7
Merge branch 'task_label_feature' of github.com:ottointhesky/ipyparal…
Feb 23, 2026
fba459f
Merge branch 'ipython:main' into task_label_feature
ottointhesky Feb 23, 2026
f3f07fe
corrections for mongodb
Feb 23, 2026
b9f604d
Merge branch 'ipython:main' into task_label_feature
ottointhesky Feb 25, 2026
b69497f
glob and regex supoort added
Mar 5, 2026
e74c263
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 5, 2026
f990325
$glob supported added for monogdb
Mar 5, 2026
9b91593
$regex operator removed and $glob docu added
Mar 6, 2026
651fcc0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 6, 2026
7622f41
Merge branch 'ipython:main' into task_label_feature
ottointhesky Mar 6, 2026
80704f1
$glob tests improved using labels
Mar 6, 2026
f95b3c3
comments added
Mar 6, 2026
e7d721a
task label example improved
Mar 6, 2026
d90ca24
minor cleanup (necessary code removed)
Mar 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,14 @@ jobs:

- name: Set up slurm
if: ${{ matrix.cluster_type == 'slurm' }}
# docker build can lead to race condition -> image "docker.io/library/ipp-cluster:slurm": already exists
# see https://github.com/mlflow/mlflow/pull/20779
# work-a-round fix: docker compose again if first call failed
run: |
export DOCKER_BUILDKIT=1
export COMPOSE_DOCKER_CLI_BUILD=1
cd ci/slurm
docker compose up -d --build
docker compose up -d --build || docker compose up -d --build

- name: Install Python (conda) ${{ matrix.python }}
if: ${{ matrix.cluster_type == 'mpi' }}
Expand Down Expand Up @@ -128,6 +131,23 @@ jobs:
pip install distributed joblib
pip install --only-binary :all: matplotlib

- name: Start MongoDB
if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster
uses: supercharge/[email protected] # uses latest mongodb per default

- name: Install pymongo package
if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster
run: pip install pymongo

- name: Try to connect to mongodb
if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster
run: |
python3 <<EOF
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/',serverSelectionTimeoutMS=1)
print(client.server_info())
EOF

- name: Show environment
run: pip freeze

Expand Down
4 changes: 2 additions & 2 deletions docs/make.bat
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ pushd %~dp0

REM Command file for Sphinx documentation

if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=--color -W --keep-going
if "%SPHINXOPTS%" == "" (
set SPHINXOPTS=--color -W --keep-going
)
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
Expand Down
49 changes: 39 additions & 10 deletions docs/source/examples/basic_task_label.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
"""Basic task label example"""
"""Basic task label example

Labels can be used for identifying or semantically grouping tasks. Using the $glob operator
relevant task records can be queried from the Task Database.

Authors
-------
* ottointhesky
"""

import ipyparallel as ipp

Expand All @@ -18,13 +26,17 @@ def wait(t):
return time.time() - tic


count = 5 # number of tasks per method

# use load balanced view
bview = rc.load_balanced_view()
ar_list_b1 = [
bview.set_flags(label=f"mylabel_map_{i:02}").map_async(wait, [2]) for i in range(10)
bview.set_flags(label=f"bview_map_{i:02}").map_async(wait, [2])
for i in range(count)
]
ar_list_b2 = [
bview.set_flags(label=f"mylabel_map_{i:02}").apply_async(wait, 2) for i in range(10)
bview.set_flags(label=f"bview_apply_{i:02}").apply_async(wait, 2)
for i in range(count)
]
bview.wait(ar_list_b1)
bview.wait(ar_list_b2)
Expand All @@ -33,19 +45,36 @@ def wait(t):
# use direct view
dview = rc[:]
ar_list_d1 = [
dview.set_flags(label=f"mylabel_map_{i + 10:02}").apply_async(wait, 2)
for i in range(10)
dview.set_flags(label=f"dview_map_{i + 10:02}").map_async(wait, [2])
for i in range(count)
]
ar_list_d2 = [
dview.set_flags(label=f"mylabel_map_{i + 10:02}").map_async(wait, [2])
for i in range(10)
dview.set_flags(label=f"dview_apply_{i + 10:02}").apply_async(wait, 2)
for i in range(count)
]
dview.wait(ar_list_d1)
dview.wait(ar_list_d2)


def print_records(titel, data):
print(f"{titel} ({len(data)} records)")
for d in data:
print(
f"\tmsg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}"
)


query_keys = ['msg_id', 'label', 'engine_uuid']

# query database
data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid'])
for d in data:
print(f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}")
data = rc.db_query({'label': {"$nin": ""}}, keys=query_keys)
print_records("all entries with labels", data)

data = rc.db_query({'label': {"$glob": "dview_*"}}, keys=query_keys)
print_records("all dview label entries", data)

data = rc.db_query({'label': {"$glob": "*_map_*"}}, keys=query_keys)
print_records("all map label entries", data)

# stop cluster
cluster.stop_cluster_sync()
27 changes: 17 additions & 10 deletions docs/source/reference/db.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,26 @@ TaskRecord keys:
| error | dict | Python traceback (error message content) |
| stdout | str | Stream of stdout data |
| stderr | str | Stream of stderr data |
| label | str | optional user-defined task identifier |

MongoDB operators we emulate on all backends:

| Operator | Python equivalent |
| -------- | ----------------- |
| '\$in' | in |
| '\$nin' | not in |
| '\$eq' | == |
| '\$ne' | != |
| '\$gt' | > |
| '\$gte' | >= |
| '\$le' | \< |
| '\$lte' | \<= |
| Operator | Python equivalent |
| -------- | ----------------------------------------------------------------------------- |
| '\$in' | in |
| '\$nin' | not in |
| '\$eq' | == |
| '\$ne' | != |
| '\$gt' | > |
| '\$gte' | >= |
| '\$le' | \< |
| '\$lte' | \<= |
| '\$glob' | [fnmatch](https://docs.python.org/3/library/fnmatch.html) (wildcard matching) |

Remarks on _$glob_: The operator can be used to find substrings in DB columns based on
[unix style filename pattern matching](https://docs.python.org/3/library/fnmatch.html)
_$glob_ is **not** a regular MongoDB opertor, but is internally translated to a regular
expression (_$regex_) which is natively supported by MongoDB.

The DB Query is useful for two primary cases:

Expand Down
7 changes: 6 additions & 1 deletion ipyparallel/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,9 @@ def relay_comm(msg):
for callback in msg_future.iopub_callbacks:
callback(msg)

def create_message_futures(self, msg_id, header, async_result=False, track=False):
def create_message_futures(
self, msg_id, header, async_result=False, track=False, label=None
):
msg_future = MessageFuture(msg_id, header=header, track=track)
futures = [msg_future]
self._futures[msg_id] = msg_future
Expand All @@ -1237,6 +1239,7 @@ def create_message_futures(self, msg_id, header, async_result=False, track=False
# hook up metadata
output.metadata = self.metadata[msg_id]
output.metadata['submitted'] = util.utcnow()
output.metadata['label'] = label
msg_future.output = output
futures.append(output)
return futures
Expand Down Expand Up @@ -1266,6 +1269,7 @@ def _send(
msg_id = msg['header']['msg_id']

expect_reply = msg_type not in {"comm_msg", "comm_close", "comm_open"}
label = metadata["label"] if metadata and "label" in metadata else None

if expect_reply and track_outstanding:
# add to outstanding, history
Expand All @@ -1289,6 +1293,7 @@ def _send(
msg['header'],
async_result=msg_type in {'execute_request', 'apply_request'},
track=track,
label=label,
)
if message_future_hook is not None:
message_future_hook(futures[0])
Expand Down
32 changes: 24 additions & 8 deletions ipyparallel/client/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __len__(self):
def set_flags(self, **kwargs):
"""set my attribute flags by keyword.

Views determine behavior with a few attributes (`block`, `track`, etc.).
Views determine behavior with a few attributes (`block`, `track`, `label`, etc.).
These attributes can be set all at once by name with this method.

Parameters
Expand All @@ -149,6 +149,8 @@ def set_flags(self, **kwargs):
whether to create a MessageTracker to allow the user to
safely edit after arrays and buffers during non-copying
sends.
label : str
set an optional user-defined task identifier
"""
for name, value in kwargs.items():
if name not in self._flag_names:
Expand Down Expand Up @@ -557,6 +559,8 @@ def _really_apply(
whether to block
track : bool [default: self.track]
whether to ask zmq to track the message, for safe non-copying sends
label : str [default self.label]
set an optional user-defined task identifier

Returns
-------
Expand Down Expand Up @@ -642,6 +646,8 @@ def map(
Only for zero-copy sends such as numpy arrays that are going to be modified in-place.
return_exceptions : bool [default False]
Return remote Exceptions in the result sequence instead of raising them.
label : str [default self.label]
set an optional user-defined task identifier

Returns
-------
Expand Down Expand Up @@ -672,7 +678,7 @@ def map(

@sync_results
@save_ids
def execute(self, code, silent=True, targets=None, block=None):
def execute(self, code, silent=True, targets=None, block=None, label=None):
"""Executes `code` on `targets` in blocking or nonblocking manner.

``execute`` is always `bound` (affects engine namespace)
Expand All @@ -681,18 +687,21 @@ def execute(self, code, silent=True, targets=None, block=None):
----------
code : str
the code string to be executed
block : bool
block : bool [default self.block]
whether or not to wait until done to return
default: self.block
label : str [default self.label]
set an optional user-defined task identifier
"""
block = self.block if block is None else block
targets = self.targets if targets is None else targets
label = self.label if label is None else label
metadata = dict(label=label)

_idents, _targets = self.client._build_targets(targets)
futures = []
for ident in _idents:
future = self.client.send_execute_request(
self._socket, code, silent=silent, ident=ident
self._socket, code, silent=silent, ident=ident, metadata=metadata
)
futures.append(future)
if isinstance(targets, int):
Expand All @@ -708,7 +717,7 @@ def execute(self, code, silent=True, targets=None, block=None):
pass
return ar

def run(self, filename, targets=None, block=None):
def run(self, filename, targets=None, block=None, label=None):
"""Execute contents of `filename` on my engine(s).

This simply reads the contents of the file and calls `execute`.
Expand All @@ -723,13 +732,15 @@ def run(self, filename, targets=None, block=None):
block : bool
whether or not to wait until done
default: self.block
label : str
set an optional user-defined task identifier

"""
with open(filename) as f:
# add newline in case of trailing indented whitespace
# which will cause SyntaxError
code = f.read() + '\n'
return self.execute(code, block=block, targets=targets)
return self.execute(code, block=block, targets=targets, label=label)

def update(self, ns):
"""update remote namespace with dict `ns`
Expand Down Expand Up @@ -1076,7 +1087,6 @@ def map(
block=None,
track=False,
return_exceptions=False,
label=None,
):
"""Parallel version of builtin `map`, using this View's `targets`.

Expand Down Expand Up @@ -1297,6 +1307,8 @@ def set_flags(self, **kwargs):
DependencyTimeout.
retries : int
Number of times a task will be retried on failure.
label : str
set an optional user-defined task identifier
"""

super().set_flags(**kwargs)
Expand Down Expand Up @@ -1348,6 +1360,8 @@ def _really_apply(
whether to block
track : bool [default: self.track]
whether to ask zmq to track the message, for safe non-copying sends
label : str [default self.label]
set an optional user-defined task identifier
!!!!!! TODO : THE REST HERE !!!!

Returns
Expand Down Expand Up @@ -1470,6 +1484,8 @@ def map(

return_exceptions: bool [default False]
Return Exceptions instead of raising on the first exception.
label : str [default self.label]
set an optional user-defined task identifier

Returns
-------
Expand Down
2 changes: 2 additions & 0 deletions ipyparallel/controller/dictdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import copy
import fnmatch
from copy import deepcopy
from datetime import datetime

Expand All @@ -59,6 +60,7 @@
'$all': lambda a, b: all([a in bb for bb in b]),
'$mod': lambda a, b: a % b[0] == b[1],
'$exists': lambda a, b: (b and a is not None) or (a is None and not b),
'$glob': lambda a, b: fnmatch.fnmatch(a, b) if a is not None else False,
}


Expand Down
Loading