From fbab54a51b1af936983ecc9bb0c167e6bef56efd Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Wed, 11 Feb 2026 22:20:17 +0100 Subject: [PATCH 01/32] test code for labels added --- ipyparallel/tests/test_label.py | 108 ++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 ipyparallel/tests/test_label.py diff --git a/ipyparallel/tests/test_label.py b/ipyparallel/tests/test_label.py new file mode 100644 index 00000000..8820b863 --- /dev/null +++ b/ipyparallel/tests/test_label.py @@ -0,0 +1,108 @@ +"""Tests for task label functionality""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. +import logging +import os +from unittest import TestCase + +import pytest + +import ipyparallel as ipp +from ipyparallel.cluster.launcher import LocalControllerLauncher + + +def speudo_wait(t): + import time + + tic = time.time() + print(f"waiting for {t}s...") + # time.sleep(t) # do NOT wait for t seconds to speed up tests + print("done") + return time.time() - tic + + +class TaskLabelTest: + def setUp(self): + self.cluster = ipp.Cluster( + n=2, log_level=10, controller=self.get_controller_launcher() + ) + self.cluster.start_cluster_sync() + + self.rc = self.cluster.connect_client_sync() + self.rc.wait_for_engines(n=2) + + def get_controller_launcher(self): + raise NotImplementedError + + def tearDown(self): + self.cluster.stop_engines() + self.cluster.stop_controller() + # self.cluster.close() + + def run_tasks(self, view): + ar_list = [] + # use context to set label + with view.temp_flags(label="mylabel_map"): + ar_list.append(view.map_async(speudo_wait, [1.1, 1.2, 1.3, 1.4, 1.5])) + # use set_flags to set label + ar_list.extend( + [ + view.set_flags(label=f"mylabel_apply_{i:02}").apply_async( + speudo_wait, 2 + i / 10 + ) + for i in range(5) + ] + ) + view.wait(ar_list) + + # build list of used labels + map_labels = ["mylabel_map"] + apply_labels = [] + for i in range(5): + apply_labels.append(f"mylabel_apply_{i:02}") + return map_labels, apply_labels + + def check_labels(self, labels): + # query database + data = self.rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label']) + for d in data: + msg_id = d['msg_id'] + label = d['label'] + assert label in labels + labels.remove(label) + + assert len(labels) == 0 + + def clear_db(self): + self.rc.purge_everything() + + def test_balanced_view(self): + bview = self.rc.load_balanced_view() + map_labels, apply_labels = self.run_tasks(bview) + labels = map_labels * 5 + apply_labels + self.check_labels(labels) + self.clear_db() + + def test_direct_view(self): + dview = self.rc[:] + map_labels, apply_labels = self.run_tasks(dview) + labels = map_labels * 2 + apply_labels * 2 + self.check_labels(labels) + self.clear_db() + + +class TestLabelDictDB(TaskLabelTest, TestCase): + def get_controller_launcher(self): + class dictDB(LocalControllerLauncher): + controller_args = ["--dictdb"] + + return dictDB + + +class TestLabelSqliteDB(TaskLabelTest, TestCase): + def get_controller_launcher(self): + class sqliteDB(LocalControllerLauncher): + controller_args = ["--sqlitedb"] + + return sqliteDB From 171bc98c1bb3b6b54774add1d121d795aa5cdbac Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Thu, 12 Feb 2026 18:52:01 +0100 Subject: [PATCH 02/32] tzinfo got lost when storing in mongodb --- ipyparallel/controller/mongodb.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index 7f247ac3..ea1883f4 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -15,6 +15,7 @@ from traitlets import Dict, Instance, List, Unicode from .dictdb import BaseDB +from datetime import datetime, timezone # we need to determine the pymongo version because of API changes. see # https://pymongo.readthedocs.io/en/stable/migrate-to-pymongo4.html @@ -25,6 +26,16 @@ # MongoDB class # ----------------------------------------------------------------------------- +def _ensure_utc(obj): + if isinstance(obj, datetime): + obj = obj.replace(tzinfo=timezone.utc) + return obj + +def _ensure_utc_for_record(rec): + for key in ('submitted', 'started', 'completed', 'received'): + if key in rec: + rec[key] = _ensure_utc(rec[key]) + class MongoDB(BaseDB): """MongoDB TaskRecord backend.""" @@ -91,6 +102,7 @@ def get_record(self, msg_id): if not r: # r will be '' if nothing is found raise KeyError(msg_id) + _ensure_utc_for_record(r) return r def update_record(self, msg_id, rec): @@ -125,6 +137,7 @@ def find_records(self, check, keys=None): matches = list(self._records.find(check, keys)) for rec in matches: rec.pop('_id') + _ensure_utc_for_record(rec) return matches def get_history(self): From 2f1293f234157da846d393d06bf0f2950c4d64cf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 12 Feb 2026 17:53:34 +0000 Subject: [PATCH 03/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipyparallel/controller/mongodb.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index 6c102fd3..ad580007 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -11,10 +11,11 @@ except ImportError: from bson import Binary +from datetime import datetime, timezone + from traitlets import Dict, Instance, List, Unicode from .dictdb import BaseDB -from datetime import datetime, timezone # we need to determine the pymongo version because of API changes. see # https://pymongo.readthedocs.io/en/stable/migrate-to-pymongo4.html @@ -25,11 +26,13 @@ # MongoDB class # ----------------------------------------------------------------------------- + def _ensure_utc(obj): if isinstance(obj, datetime): obj = obj.replace(tzinfo=timezone.utc) return obj + def _ensure_utc_for_record(rec): for key in ('submitted', 'started', 'completed', 'received'): if key in rec: From 7f0400a1f4570d97335dec511060cab99c44193e Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Thu, 12 Feb 2026 19:07:48 +0100 Subject: [PATCH 04/32] make mongodb tzinfo aware --- ipyparallel/controller/mongodb.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index 6c102fd3..83cd9c4a 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -14,7 +14,7 @@ from traitlets import Dict, Instance, List, Unicode from .dictdb import BaseDB -from datetime import datetime, timezone +from bson.codec_options import CodecOptions # we need to determine the pymongo version because of API changes. see # https://pymongo.readthedocs.io/en/stable/migrate-to-pymongo4.html @@ -25,16 +25,6 @@ # MongoDB class # ----------------------------------------------------------------------------- -def _ensure_utc(obj): - if isinstance(obj, datetime): - obj = obj.replace(tzinfo=timezone.utc) - return obj - -def _ensure_utc_for_record(rec): - for key in ('submitted', 'started', 'completed', 'received'): - if key in rec: - rec[key] = _ensure_utc(rec[key]) - class MongoDB(BaseDB): """MongoDB TaskRecord backend.""" @@ -70,8 +60,11 @@ def __init__(self, **kwargs): ) if not self.database: self.database = self.session + options = CodecOptions(tz_aware=True) self._db = self._connection[self.database] - self._records = self._db['task_records'] + self._records = self._db.get_collection("task_records", options) + #self._records = self._db['task_records'] + if pymongo_version_major >= 4: # mimic the old API 3.x self._records.insert = self._records.insert_one @@ -101,7 +94,6 @@ def get_record(self, msg_id): if not r: # r will be '' if nothing is found raise KeyError(msg_id) - _ensure_utc_for_record(r) return r def update_record(self, msg_id, rec): @@ -136,7 +128,6 @@ def find_records(self, check, keys=None): matches = list(self._records.find(check, keys)) for rec in matches: rec.pop('_id') - _ensure_utc_for_record(rec) return matches def get_history(self): From a9e4266ba8d9941b31077c0b8aa26af961fd4a1c Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Thu, 12 Feb 2026 19:09:17 +0100 Subject: [PATCH 05/32] make mongodb tzinfo aware --- ipyparallel/controller/mongodb.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index 0cb55dca..83cd9c4a 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -94,7 +94,6 @@ def get_record(self, msg_id): if not r: # r will be '' if nothing is found raise KeyError(msg_id) - _ensure_utc_for_record(r) return r def update_record(self, msg_id, rec): From e473c818ede7db8e7d459db5cbf3e784add67b88 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 12 Feb 2026 18:09:42 +0000 Subject: [PATCH 06/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipyparallel/controller/mongodb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index 83cd9c4a..acf60005 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -11,10 +11,10 @@ except ImportError: from bson import Binary +from bson.codec_options import CodecOptions from traitlets import Dict, Instance, List, Unicode from .dictdb import BaseDB -from bson.codec_options import CodecOptions # we need to determine the pymongo version because of API changes. see # https://pymongo.readthedocs.io/en/stable/migrate-to-pymongo4.html @@ -63,7 +63,7 @@ def __init__(self, **kwargs): options = CodecOptions(tz_aware=True) self._db = self._connection[self.database] self._records = self._db.get_collection("task_records", options) - #self._records = self._db['task_records'] + # self._records = self._db['task_records'] if pymongo_version_major >= 4: # mimic the old API 3.x From 5762cd60057fe7b1887c3382c3016c7924fb7282 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Thu, 12 Feb 2026 20:13:21 +0100 Subject: [PATCH 07/32] switch to new mongodb api --- ipyparallel/controller/mongodb.py | 33 +++++++++++++------------------ 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index 83cd9c4a..d17378e1 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -16,11 +16,6 @@ from .dictdb import BaseDB from bson.codec_options import CodecOptions -# we need to determine the pymongo version because of API changes. see -# https://pymongo.readthedocs.io/en/stable/migrate-to-pymongo4.html -pymongo_version_major = int(version.split('.')[0]) -pymongo_version_minor = int(version.split('.')[1]) - # ----------------------------------------------------------------------------- # MongoDB class # ----------------------------------------------------------------------------- @@ -54,6 +49,14 @@ class MongoDB(BaseDB): def __init__(self, **kwargs): super().__init__(**kwargs) + + # make sure that pymongo version is at least 4.x because of API changes. see + # https://pymongo.readthedocs.io/en/stable/migrate-to-pymongo4.html + # for more details + pymongo_version_major = int(version.split('.')[0]) + if pymongo_version_major < 4: + raise Exception(f"pymongo package too old (current version={version}). Please update to version 4.0 or higher") + if self._connection is None: self._connection = MongoClient( *self.connection_args, **self.connection_kwargs @@ -64,16 +67,8 @@ def __init__(self, **kwargs): self._db = self._connection[self.database] self._records = self._db.get_collection("task_records", options) #self._records = self._db['task_records'] - - if pymongo_version_major >= 4: - # mimic the old API 3.x - self._records.insert = self._records.insert_one - self._records.update = self._records.update_one - self._records.ensure_index = self._records.create_index - self._records.remove = self._records.delete_many - - self._records.ensure_index('msg_id', unique=True) - self._records.ensure_index('submitted') # for sorting history + self._records.create_index('msg_id', unique=True) + self._records.create_index('submitted') # for sorting history # for rec in self._records.find def _binary_buffers(self, rec): @@ -86,7 +81,7 @@ def add_record(self, msg_id, rec): """Add a new Task Record, by msg_id.""" # print rec rec = self._binary_buffers(rec) - self._records.insert(rec) + self._records.insert_one(rec) def get_record(self, msg_id): """Get a specific Task Record, by msg_id.""" @@ -100,15 +95,15 @@ def update_record(self, msg_id, rec): """Update the data in an existing record.""" rec = self._binary_buffers(rec) - self._records.update({'msg_id': msg_id}, {'$set': rec}) + self._records.update_one({'msg_id': msg_id}, {'$set': rec}) def drop_matching_records(self, check): """Remove a record from the DB.""" - self._records.remove(check) + self._records.delete_many(check) def drop_record(self, msg_id): """Remove a record from the DB.""" - self._records.remove({'msg_id': msg_id}) + self._records.delete_many({'msg_id': msg_id}) def find_records(self, check, keys=None): """Find records matching a query dict, optionally extracting subset of keys. From 6a36864d3f27ea88b0b8c6417176671d70a53cef Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Thu, 12 Feb 2026 20:16:21 +0100 Subject: [PATCH 08/32] mongodb installation added to github actions --- .github/workflows/test.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f38d5d27..6613bde6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -128,6 +128,23 @@ jobs: pip install distributed joblib pip install --only-binary :all: matplotlib + - name: Start MongoDB + if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default + + - name: Install pymongo package + if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + run: pip install pymongo + + - name: Try to connect to mongodb + if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + run: | + python3 < Date: Thu, 12 Feb 2026 20:23:52 +0100 Subject: [PATCH 09/32] ruff format changes --- ipyparallel/controller/mongodb.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index aa2c1809..45a0f68c 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -11,10 +11,10 @@ except ImportError: from bson import Binary +from bson.codec_options import CodecOptions from traitlets import Dict, Instance, List, Unicode from .dictdb import BaseDB -from bson.codec_options import CodecOptions # ----------------------------------------------------------------------------- # MongoDB class @@ -55,7 +55,9 @@ def __init__(self, **kwargs): # for more details pymongo_version_major = int(version.split('.')[0]) if pymongo_version_major < 4: - raise Exception(f"pymongo package too old (current version={version}). Please update to version 4.0 or higher") + raise Exception( + f"pymongo package too old (current version={version}). Please update to version 4.0 or higher" + ) if self._connection is None: self._connection = MongoClient( @@ -66,7 +68,7 @@ def __init__(self, **kwargs): options = CodecOptions(tz_aware=True) self._db = self._connection[self.database] self._records = self._db.get_collection("task_records", options) - #self._records = self._db['task_records'] + # self._records = self._db['task_records'] self._records.create_index('msg_id', unique=True) self._records.create_index('submitted') # for sorting history # for rec in self._records.find From 1a2624e6c4a7df63c8a19a16250610a79753584a Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Thu, 12 Feb 2026 22:22:11 +0100 Subject: [PATCH 10/32] exclude mongodb isntall from slurm --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6613bde6..b56f9f29 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -129,15 +129,15 @@ jobs: pip install --only-binary :all: matplotlib - name: Start MongoDB - if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default - name: Install pymongo package - if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu run: pip install pymongo - name: Try to connect to mongodb - if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu run: | python3 < Date: Thu, 12 Feb 2026 23:33:32 +0100 Subject: [PATCH 11/32] Update test.yml --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b56f9f29..867be574 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -129,15 +129,15 @@ jobs: pip install --only-binary :all: matplotlib - name: Start MongoDB - if: ${{ ! matrix.runs_on && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on || matrix.cluster_type != 'slurm' }} # only under linux/ubuntu uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default - name: Install pymongo package - if: ${{ ! matrix.runs_on && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on || matrix.cluster_type != 'slurm' }} # only under linux/ubuntu run: pip install pymongo - name: Try to connect to mongodb - if: ${{ ! matrix.runs_on && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on || matrix.cluster_type != 'slurm' }} # only under linux/ubuntu run: | python3 < Date: Thu, 12 Feb 2026 23:40:28 +0100 Subject: [PATCH 12/32] Update test.yml --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 867be574..4cb71e9e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -129,15 +129,15 @@ jobs: pip install --only-binary :all: matplotlib - name: Start MongoDB - if: ${{ ! matrix.runs_on || matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default - name: Install pymongo package - if: ${{ ! matrix.runs_on || matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu run: pip install pymongo - name: Try to connect to mongodb - if: ${{ ! matrix.runs_on || matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu run: | python3 < Date: Thu, 12 Feb 2026 23:46:53 +0100 Subject: [PATCH 13/32] Update test.yml --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4cb71e9e..3b0ed761 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -129,15 +129,15 @@ jobs: pip install --only-binary :all: matplotlib - name: Start MongoDB - if: ${{ (! matrix.runs_on) && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && (matrix.cluster_type != 'slurm') }} # only under linux/ubuntu uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default - name: Install pymongo package - if: ${{ (! matrix.runs_on) && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && (matrix.cluster_type != 'slurm') }} # only under linux/ubuntu run: pip install pymongo - name: Try to connect to mongodb - if: ${{ (! matrix.runs_on) && matrix.cluster_type != 'slurm' }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && (matrix.cluster_type != 'slurm') }} # only under linux/ubuntu run: | python3 < Date: Fri, 13 Feb 2026 15:58:19 +0100 Subject: [PATCH 14/32] work-a-round fix for image already exists --- .github/workflows/test.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3b0ed761..94043b99 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -84,11 +84,14 @@ jobs: - name: Set up slurm if: ${{ matrix.cluster_type == 'slurm' }} + # docker build can lead to race condition -> image "docker.io/library/ipp-cluster:slurm": already exists + # see https://github.com/mlflow/mlflow/pull/20779 + # work-a-round fix: docker compose again if first call failed run: | export DOCKER_BUILDKIT=1 export COMPOSE_DOCKER_CLI_BUILD=1 cd ci/slurm - docker compose up -d --build + docker compose up -d --build || docker compose up -d --build - name: Install Python (conda) ${{ matrix.python }} if: ${{ matrix.cluster_type == 'mpi' }} From 84009a616e568abbdd379665a30337ed76f1ab95 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 13 Feb 2026 16:16:05 +0100 Subject: [PATCH 15/32] pre-commit format changes --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 94043b99..ac5d9c09 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -86,7 +86,7 @@ jobs: if: ${{ matrix.cluster_type == 'slurm' }} # docker build can lead to race condition -> image "docker.io/library/ipp-cluster:slurm": already exists # see https://github.com/mlflow/mlflow/pull/20779 - # work-a-round fix: docker compose again if first call failed + # work-a-round fix: docker compose again if first call failed run: | export DOCKER_BUILDKIT=1 export COMPOSE_DOCKER_CLI_BUILD=1 @@ -133,7 +133,7 @@ jobs: - name: Start MongoDB if: ${{ (! matrix.runs_on) && (matrix.cluster_type != 'slurm') }} # only under linux/ubuntu - uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default + uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default - name: Install pymongo package if: ${{ (! matrix.runs_on) && (matrix.cluster_type != 'slurm') }} # only under linux/ubuntu From 0d94418875b2538014908e39e281fec6a7fef0a6 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 13 Feb 2026 17:08:27 +0100 Subject: [PATCH 16/32] enable mongodb for slurm test environment as well --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ac5d9c09..c1308a78 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -132,15 +132,15 @@ jobs: pip install --only-binary :all: matplotlib - name: Start MongoDB - if: ${{ (! matrix.runs_on) && (matrix.cluster_type != 'slurm') }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on }} # only under linux/ubuntu uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default - name: Install pymongo package - if: ${{ (! matrix.runs_on) && (matrix.cluster_type != 'slurm') }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on }} # only under linux/ubuntu run: pip install pymongo - name: Try to connect to mongodb - if: ${{ (! matrix.runs_on) && (matrix.cluster_type != 'slurm') }} # only under linux/ubuntu + if: ${{ ! matrix.runs_on }} # only under linux/ubuntu run: | python3 < Date: Fri, 13 Feb 2026 17:46:28 +0100 Subject: [PATCH 17/32] enable mongodb only under linux with no cluster --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c1308a78..fdc88c30 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -132,15 +132,15 @@ jobs: pip install --only-binary :all: matplotlib - name: Start MongoDB - if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default - name: Install pymongo package - if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster run: pip install pymongo - name: Try to connect to mongodb - if: ${{ ! matrix.runs_on }} # only under linux/ubuntu + if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster run: | python3 < Date: Fri, 13 Feb 2026 17:48:24 +0100 Subject: [PATCH 18/32] pre-commit format correction --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fdc88c30..b9a249ed 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -132,15 +132,15 @@ jobs: pip install --only-binary :all: matplotlib - name: Start MongoDB - if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster + if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default - name: Install pymongo package - if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster + if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster run: pip install pymongo - name: Try to connect to mongodb - if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster + if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster run: | python3 < Date: Mon, 23 Feb 2026 16:16:48 +0100 Subject: [PATCH 19/32] corrections within windows --- docs/make.bat | 4 ++-- ipyparallel/tests/test_label.py | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/docs/make.bat b/docs/make.bat index f7d8779b..40738ae8 100644 --- a/docs/make.bat +++ b/docs/make.bat @@ -4,8 +4,8 @@ pushd %~dp0 REM Command file for Sphinx documentation -if "%SPHINXBUILD%" == "" ( - set SPHINXBUILD=--color -W --keep-going +if "%SPHINXOPTS%" == "" ( + set SPHINXOPTS=--color -W --keep-going ) if "%SPHINXBUILD%" == "" ( set SPHINXBUILD=sphinx-build diff --git a/ipyparallel/tests/test_label.py b/ipyparallel/tests/test_label.py index 8820b863..4707009f 100644 --- a/ipyparallel/tests/test_label.py +++ b/ipyparallel/tests/test_label.py @@ -106,3 +106,25 @@ class sqliteDB(LocalControllerLauncher): controller_args = ["--sqlitedb"] return sqliteDB + + +class TestLabelMongoDB(TaskLabelTest, TestCase): + + def get_controller_launcher(self): + class mongoDB(LocalControllerLauncher): + database = "mongodb-label-test" # use this database label for testing + controller_args = ["--mongodb", f"--MongoDB.database={database}" ] + + try: + from pymongo import MongoClient + c = MongoClient(serverSelectionTimeoutMS=2000) + servinfo = c.server_info() # checks if mongo server is reachable using the default connection parameter + + # make sure that test database is empty + db = c[mongoDB.database] + records = db.get_collection("task_records") + records.delete_many({}) + except (ImportError, Exception): + pytest.skip("Requires mongodb") + + return mongoDB From 1a5750732c40911c52595b8c7c6222e2a6569419 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 23 Feb 2026 15:50:52 +0000 Subject: [PATCH 20/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipyparallel/tests/test_label.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ipyparallel/tests/test_label.py b/ipyparallel/tests/test_label.py index 4707009f..de10e04b 100644 --- a/ipyparallel/tests/test_label.py +++ b/ipyparallel/tests/test_label.py @@ -109,14 +109,14 @@ class sqliteDB(LocalControllerLauncher): class TestLabelMongoDB(TaskLabelTest, TestCase): - def get_controller_launcher(self): class mongoDB(LocalControllerLauncher): - database = "mongodb-label-test" # use this database label for testing - controller_args = ["--mongodb", f"--MongoDB.database={database}" ] + database = "mongodb-label-test" # use this database label for testing + controller_args = ["--mongodb", f"--MongoDB.database={database}"] try: from pymongo import MongoClient + c = MongoClient(serverSelectionTimeoutMS=2000) servinfo = c.server_info() # checks if mongo server is reachable using the default connection parameter From d2a0595041af979bff0a38196b17a3b16c678bff Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Mon, 23 Feb 2026 16:54:35 +0100 Subject: [PATCH 21/32] minor change: formating corrected --- ipyparallel/tests/test_label.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ipyparallel/tests/test_label.py b/ipyparallel/tests/test_label.py index 4707009f..de10e04b 100644 --- a/ipyparallel/tests/test_label.py +++ b/ipyparallel/tests/test_label.py @@ -109,14 +109,14 @@ class sqliteDB(LocalControllerLauncher): class TestLabelMongoDB(TaskLabelTest, TestCase): - def get_controller_launcher(self): class mongoDB(LocalControllerLauncher): - database = "mongodb-label-test" # use this database label for testing - controller_args = ["--mongodb", f"--MongoDB.database={database}" ] + database = "mongodb-label-test" # use this database label for testing + controller_args = ["--mongodb", f"--MongoDB.database={database}"] try: from pymongo import MongoClient + c = MongoClient(serverSelectionTimeoutMS=2000) servinfo = c.server_info() # checks if mongo server is reachable using the default connection parameter From 51367d2f0bec8c00a0d2766707c570b9e152ea4c Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Mon, 23 Feb 2026 16:55:33 +0100 Subject: [PATCH 22/32] label added to docu / execute and run now supports label --- docs/source/reference/db.md | 1 + ipyparallel/client/client.py | 7 ++++++- ipyparallel/client/view.py | 32 ++++++++++++++++++++++++-------- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/docs/source/reference/db.md b/docs/source/reference/db.md index 46476469..e3df0896 100644 --- a/docs/source/reference/db.md +++ b/docs/source/reference/db.md @@ -75,6 +75,7 @@ TaskRecord keys: | error | dict | Python traceback (error message content) | | stdout | str | Stream of stdout data | | stderr | str | Stream of stderr data | +| label | str | optional user-defined task identifier | MongoDB operators we emulate on all backends: diff --git a/ipyparallel/client/client.py b/ipyparallel/client/client.py index 0b3d8846..0e7fd64c 100644 --- a/ipyparallel/client/client.py +++ b/ipyparallel/client/client.py @@ -1226,7 +1226,9 @@ def relay_comm(msg): for callback in msg_future.iopub_callbacks: callback(msg) - def create_message_futures(self, msg_id, header, async_result=False, track=False): + def create_message_futures( + self, msg_id, header, async_result=False, track=False, label=None + ): msg_future = MessageFuture(msg_id, header=header, track=track) futures = [msg_future] self._futures[msg_id] = msg_future @@ -1237,6 +1239,7 @@ def create_message_futures(self, msg_id, header, async_result=False, track=False # hook up metadata output.metadata = self.metadata[msg_id] output.metadata['submitted'] = util.utcnow() + output.metadata['label'] = label msg_future.output = output futures.append(output) return futures @@ -1266,6 +1269,7 @@ def _send( msg_id = msg['header']['msg_id'] expect_reply = msg_type not in {"comm_msg", "comm_close", "comm_open"} + label = metadata["label"] if metadata and "label" in metadata else None if expect_reply and track_outstanding: # add to outstanding, history @@ -1289,6 +1293,7 @@ def _send( msg['header'], async_result=msg_type in {'execute_request', 'apply_request'}, track=track, + label=label, ) if message_future_hook is not None: message_future_hook(futures[0]) diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index 15ad4b69..f5ca23b3 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -138,7 +138,7 @@ def __len__(self): def set_flags(self, **kwargs): """set my attribute flags by keyword. - Views determine behavior with a few attributes (`block`, `track`, etc.). + Views determine behavior with a few attributes (`block`, `track`, `label`, etc.). These attributes can be set all at once by name with this method. Parameters @@ -149,6 +149,8 @@ def set_flags(self, **kwargs): whether to create a MessageTracker to allow the user to safely edit after arrays and buffers during non-copying sends. + label : str + set an optional user-defined task identifier """ for name, value in kwargs.items(): if name not in self._flag_names: @@ -557,6 +559,8 @@ def _really_apply( whether to block track : bool [default: self.track] whether to ask zmq to track the message, for safe non-copying sends + label : str [default self.label] + set an optional user-defined task identifier Returns ------- @@ -642,6 +646,8 @@ def map( Only for zero-copy sends such as numpy arrays that are going to be modified in-place. return_exceptions : bool [default False] Return remote Exceptions in the result sequence instead of raising them. + label : str [default self.label] + set an optional user-defined task identifier Returns ------- @@ -672,7 +678,7 @@ def map( @sync_results @save_ids - def execute(self, code, silent=True, targets=None, block=None): + def execute(self, code, silent=True, targets=None, block=None, label=None): """Executes `code` on `targets` in blocking or nonblocking manner. ``execute`` is always `bound` (affects engine namespace) @@ -681,18 +687,21 @@ def execute(self, code, silent=True, targets=None, block=None): ---------- code : str the code string to be executed - block : bool + block : bool [default self.block] whether or not to wait until done to return - default: self.block + label : str [default self.label] + set an optional user-defined task identifier """ block = self.block if block is None else block targets = self.targets if targets is None else targets + label = self.label if label is None else label + metadata = dict(label=label) _idents, _targets = self.client._build_targets(targets) futures = [] for ident in _idents: future = self.client.send_execute_request( - self._socket, code, silent=silent, ident=ident + self._socket, code, silent=silent, ident=ident, metadata=metadata ) futures.append(future) if isinstance(targets, int): @@ -708,7 +717,7 @@ def execute(self, code, silent=True, targets=None, block=None): pass return ar - def run(self, filename, targets=None, block=None): + def run(self, filename, targets=None, block=None, label=None): """Execute contents of `filename` on my engine(s). This simply reads the contents of the file and calls `execute`. @@ -723,13 +732,15 @@ def run(self, filename, targets=None, block=None): block : bool whether or not to wait until done default: self.block + label : str + set an optional user-defined task identifier """ with open(filename) as f: # add newline in case of trailing indented whitespace # which will cause SyntaxError code = f.read() + '\n' - return self.execute(code, block=block, targets=targets) + return self.execute(code, block=block, targets=targets, label=label) def update(self, ns): """update remote namespace with dict `ns` @@ -1076,7 +1087,6 @@ def map( block=None, track=False, return_exceptions=False, - label=None, ): """Parallel version of builtin `map`, using this View's `targets`. @@ -1297,6 +1307,8 @@ def set_flags(self, **kwargs): DependencyTimeout. retries : int Number of times a task will be retried on failure. + label : str + set an optional user-defined task identifier """ super().set_flags(**kwargs) @@ -1348,6 +1360,8 @@ def _really_apply( whether to block track : bool [default: self.track] whether to ask zmq to track the message, for safe non-copying sends + label : str [default self.label] + set an optional user-defined task identifier !!!!!! TODO : THE REST HERE !!!! Returns @@ -1470,6 +1484,8 @@ def map( return_exceptions: bool [default False] Return Exceptions instead of raising on the first exception. + label : str [default self.label] + set an optional user-defined task identifier Returns ------- From f3f07feca154073681005df2554e9f81978f9bee Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Mon, 23 Feb 2026 17:09:55 +0100 Subject: [PATCH 23/32] corrections for mongodb --- ipyparallel/tests/test_label.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipyparallel/tests/test_label.py b/ipyparallel/tests/test_label.py index de10e04b..d9910c87 100644 --- a/ipyparallel/tests/test_label.py +++ b/ipyparallel/tests/test_label.py @@ -65,7 +65,7 @@ def run_tasks(self, view): def check_labels(self, labels): # query database - data = self.rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label']) + data = self.rc.db_query({'label': {"$nin": [""]}}, keys=['msg_id', 'label']) for d in data: msg_id = d['msg_id'] label = d['label'] From b69497f1dac378e34c24a081d88da6ba2744e34a Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Thu, 5 Mar 2026 09:28:21 +0100 Subject: [PATCH 24/32] glob and regex supoort added --- ipyparallel/controller/dictdb.py | 4 ++++ ipyparallel/controller/sqlitedb.py | 8 +++++++ ipyparallel/tests/test_db.py | 36 ++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py index 4091e43a..261ed887 100644 --- a/ipyparallel/controller/dictdb.py +++ b/ipyparallel/controller/dictdb.py @@ -38,6 +38,8 @@ import copy from copy import deepcopy from datetime import datetime +import fnmatch +import re from traitlets import Dict, Float, Integer, Unicode from traitlets.config.configurable import LoggingConfigurable @@ -59,6 +61,8 @@ '$all': lambda a, b: all([a in bb for bb in b]), '$mod': lambda a, b: a % b[0] == b[1], '$exists': lambda a, b: (b and a is not None) or (a is None and not b), + '$glob': lambda a, b: fnmatch.fnmatch(a, b), + '$regex': lambda a, b: re.match(b, a), } diff --git a/ipyparallel/controller/sqlitedb.py b/ipyparallel/controller/sqlitedb.py index 482baf80..59e98e94 100644 --- a/ipyparallel/controller/sqlitedb.py +++ b/ipyparallel/controller/sqlitedb.py @@ -4,6 +4,7 @@ # Distributed under the terms of the Modified BSD License. import json import os +import re try: import cPickle as pickle @@ -47,6 +48,8 @@ # '$all': None, # '$mod': None, # '$exists' : None + '$glob': "GLOB", + '$regex': "REGEXP", } null_operators = { '=': "IS NULL", @@ -97,6 +100,10 @@ def _convert_timestamp(s): """Adapt text timestamp to datetime""" return ensure_timezone(dateutil_parse(s)) +def _regexp(expr, item): + """sqlite callback function for performing a regex operation""" + reg = re.compile(expr) + return reg.match(item) is not None # ----------------------------------------------------------------------------- # SQLiteDB class @@ -271,6 +278,7 @@ def _init_db(self): # isolation_level = None)#, cached_statements=64, ) + self._db.create_function("REGEXP", 2, _regexp) # print dir(self._db) first_table = previous_table = self.table i = 0 diff --git a/ipyparallel/tests/test_db.py b/ipyparallel/tests/test_db.py index ccf59a4a..8afa887e 100644 --- a/ipyparallel/tests/test_db.py +++ b/ipyparallel/tests/test_db.py @@ -8,6 +8,8 @@ import time from datetime import datetime, timedelta from unittest import TestCase +import fnmatch +import re import pytest from jupyter_client.session import Session @@ -131,6 +133,40 @@ def test_find_records_in(self): found = [r['msg_id'] for r in recs] assert set(odd) == set(found) + # disabled for now. jo, 5.3.26 + # def test_find_records_glob(self): + # """test finding records with '$glob' operators""" + # hist = self.db.get_history() + # + # pattern = "*"+hist[0][5:-2]+"_?" + # ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] + # recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) + # found = [r['msg_id'] for r in recs] + # assert set(ref) == set(found) + # + # pattern = "*_1?" + # ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] + # recs = self.db.find_records({'msg_id': {'$glob': pattern}},["msg_id"]) + # found = [r['msg_id'] for r in recs] + # assert set(ref) == set(found) + + def test_find_records_regex(self): + """test finding records with '$regex' operators""" + hist = self.db.get_history() + + pattern = "^.+_.$" + ref = [msg_id for msg_id in hist if re.match(pattern,msg_id)] + recs = self.db.find_records({'msg_id': {'$regex': pattern}}, ["msg_id"]) + found = [r['msg_id'] for r in recs] + assert set(ref) == set(found) + + pattern = "^.+_1[0-9]$" + ref = [msg_id for msg_id in hist if re.match(pattern,msg_id)] + recs = self.db.find_records({'msg_id': {'$regex': pattern}}, ["msg_id"]) + found = [r['msg_id'] for r in recs] + assert set(ref) == set(found) + + def test_get_history(self): msg_ids = self.db.get_history() latest = datetime(1984, 1, 1).replace(tzinfo=utc) From e74c26353cffac2c54b533a1aad40be319cb1a10 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 5 Mar 2026 08:30:03 +0000 Subject: [PATCH 25/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipyparallel/controller/dictdb.py | 4 ++-- ipyparallel/controller/sqlitedb.py | 2 ++ ipyparallel/tests/test_db.py | 9 ++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py index 261ed887..3637715a 100644 --- a/ipyparallel/controller/dictdb.py +++ b/ipyparallel/controller/dictdb.py @@ -36,10 +36,10 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. import copy -from copy import deepcopy -from datetime import datetime import fnmatch import re +from copy import deepcopy +from datetime import datetime from traitlets import Dict, Float, Integer, Unicode from traitlets.config.configurable import LoggingConfigurable diff --git a/ipyparallel/controller/sqlitedb.py b/ipyparallel/controller/sqlitedb.py index 59e98e94..481ce78e 100644 --- a/ipyparallel/controller/sqlitedb.py +++ b/ipyparallel/controller/sqlitedb.py @@ -100,11 +100,13 @@ def _convert_timestamp(s): """Adapt text timestamp to datetime""" return ensure_timezone(dateutil_parse(s)) + def _regexp(expr, item): """sqlite callback function for performing a regex operation""" reg = re.compile(expr) return reg.match(item) is not None + # ----------------------------------------------------------------------------- # SQLiteDB class # ----------------------------------------------------------------------------- diff --git a/ipyparallel/tests/test_db.py b/ipyparallel/tests/test_db.py index 8afa887e..5c55b096 100644 --- a/ipyparallel/tests/test_db.py +++ b/ipyparallel/tests/test_db.py @@ -2,14 +2,14 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import fnmatch import logging import os +import re import tempfile import time from datetime import datetime, timedelta from unittest import TestCase -import fnmatch -import re import pytest from jupyter_client.session import Session @@ -155,18 +155,17 @@ def test_find_records_regex(self): hist = self.db.get_history() pattern = "^.+_.$" - ref = [msg_id for msg_id in hist if re.match(pattern,msg_id)] + ref = [msg_id for msg_id in hist if re.match(pattern, msg_id)] recs = self.db.find_records({'msg_id': {'$regex': pattern}}, ["msg_id"]) found = [r['msg_id'] for r in recs] assert set(ref) == set(found) pattern = "^.+_1[0-9]$" - ref = [msg_id for msg_id in hist if re.match(pattern,msg_id)] + ref = [msg_id for msg_id in hist if re.match(pattern, msg_id)] recs = self.db.find_records({'msg_id': {'$regex': pattern}}, ["msg_id"]) found = [r['msg_id'] for r in recs] assert set(ref) == set(found) - def test_get_history(self): msg_ids = self.db.get_history() latest = datetime(1984, 1, 1).replace(tzinfo=utc) From f990325eff98058103aef18f696d5abb17649321 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Thu, 5 Mar 2026 09:56:18 +0100 Subject: [PATCH 26/32] $glob supported added for monogdb $glob tests added --- ipyparallel/controller/mongodb.py | 33 ++++++++++++++++++++++++++++++- ipyparallel/tests/test_db.py | 31 ++++++++++++++--------------- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index 45a0f68c..ec96cc8c 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -11,6 +11,8 @@ except ImportError: from bson import Binary +import fnmatch + from bson.codec_options import CodecOptions from traitlets import Dict, Instance, List, Unicode @@ -107,6 +109,35 @@ def drop_record(self, msg_id): """Remove a record from the DB.""" self._records.delete_many({'msg_id': msg_id}) + def _translate(self, filter): + """translates $glob to $regex operators""" + if isinstance(filter, list): + ret = [] + for f in filter: + ret.append(self._translate(f)) + return ret + elif isinstance(filter, dict): + ret = dict() + if "$glob" in filter: + params = dict(filter) + glob = params.pop("$glob") + if len(params) != 0: + raise Exception( + f"unkown paramters {params.keys()} for %glob operator" + ) + ret["$regex"] = fnmatch.translate(glob) + else: + for key, value in filter.items(): + if isinstance(value, dict): + ret[key] = self._translate(value) + elif isinstance(value, list): + ret[key] = self._translate(value) + else: + ret[key] = value + return ret + else: + return filter + def find_records(self, check, keys=None): """Find records matching a query dict, optionally extracting subset of keys. @@ -122,7 +153,7 @@ def find_records(self, check, keys=None): """ if keys and 'msg_id' not in keys: keys.append('msg_id') - matches = list(self._records.find(check, keys)) + matches = list(self._records.find(self._translate(check), keys)) for rec in matches: rec.pop('_id') return matches diff --git a/ipyparallel/tests/test_db.py b/ipyparallel/tests/test_db.py index 5c55b096..ba615e17 100644 --- a/ipyparallel/tests/test_db.py +++ b/ipyparallel/tests/test_db.py @@ -133,22 +133,21 @@ def test_find_records_in(self): found = [r['msg_id'] for r in recs] assert set(odd) == set(found) - # disabled for now. jo, 5.3.26 - # def test_find_records_glob(self): - # """test finding records with '$glob' operators""" - # hist = self.db.get_history() - # - # pattern = "*"+hist[0][5:-2]+"_?" - # ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] - # recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) - # found = [r['msg_id'] for r in recs] - # assert set(ref) == set(found) - # - # pattern = "*_1?" - # ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] - # recs = self.db.find_records({'msg_id': {'$glob': pattern}},["msg_id"]) - # found = [r['msg_id'] for r in recs] - # assert set(ref) == set(found) + def test_find_records_glob(self): + """test finding records with '$glob' operators""" + hist = self.db.get_history() + + pattern = "*" + hist[0][5:-2] + "_?" + ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] + recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) + found = [r['msg_id'] for r in recs] + assert set(ref) == set(found) + + pattern = "*_1?" + ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] + recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) + found = [r['msg_id'] for r in recs] + assert set(ref) == set(found) def test_find_records_regex(self): """test finding records with '$regex' operators""" From 9b915937d742dfafd45f62250a8625bbe483ff67 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 6 Mar 2026 10:56:20 +0100 Subject: [PATCH 27/32] $regex operator removed and $glob docu added --- docs/source/reference/db.md | 26 ++++++++++++++++---------- ipyparallel/controller/dictdb.py | 1 - ipyparallel/controller/mongodb.py | 4 +++- ipyparallel/controller/sqlitedb.py | 1 - ipyparallel/tests/test_db.py | 16 ---------------- 5 files changed, 19 insertions(+), 29 deletions(-) diff --git a/docs/source/reference/db.md b/docs/source/reference/db.md index e3df0896..baa99c53 100644 --- a/docs/source/reference/db.md +++ b/docs/source/reference/db.md @@ -79,16 +79,22 @@ TaskRecord keys: MongoDB operators we emulate on all backends: -| Operator | Python equivalent | -| -------- | ----------------- | -| '\$in' | in | -| '\$nin' | not in | -| '\$eq' | == | -| '\$ne' | != | -| '\$gt' | > | -| '\$gte' | >= | -| '\$le' | \< | -| '\$lte' | \<= | +| Operator | Python equivalent | +| -------- |-------------------------------------------------------------------------------| +| '\$in' | in | +| '\$nin' | not in | +| '\$eq' | == | +| '\$ne' | != | +| '\$gt' | > | +| '\$gte' | >= | +| '\$le' | \< | +| '\$lte' | \<= | +| '\$glob' | [fnmatch](https://docs.python.org/3/library/fnmatch.html) (wildcard matching) | + +Remarks on _$glob_: The operator can be used to find substrings in DB columns based on +[unix style filename pattern matching](https://docs.python.org/3/library/fnmatch.html) +_$glob_ is **not** a regular MongoDB opertor, but is internally translated to a regular +expression (_$regex_) which is natively supported by MongoDB. The DB Query is useful for two primary cases: diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py index 3637715a..db9df500 100644 --- a/ipyparallel/controller/dictdb.py +++ b/ipyparallel/controller/dictdb.py @@ -62,7 +62,6 @@ '$mod': lambda a, b: a % b[0] == b[1], '$exists': lambda a, b: (b and a is not None) or (a is None and not b), '$glob': lambda a, b: fnmatch.fnmatch(a, b), - '$regex': lambda a, b: re.match(b, a), } diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index ec96cc8c..99974efb 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -125,7 +125,9 @@ def _translate(self, filter): raise Exception( f"unkown paramters {params.keys()} for %glob operator" ) - ret["$regex"] = fnmatch.translate(glob) + # we need to attach the start and end match character to achieve + # an equivalent matching behavior to fnmatch in mongoDB + ret["$regex"] = "^"+fnmatch.translate(glob)+"$" else: for key, value in filter.items(): if isinstance(value, dict): diff --git a/ipyparallel/controller/sqlitedb.py b/ipyparallel/controller/sqlitedb.py index 481ce78e..952bdf72 100644 --- a/ipyparallel/controller/sqlitedb.py +++ b/ipyparallel/controller/sqlitedb.py @@ -49,7 +49,6 @@ # '$mod': None, # '$exists' : None '$glob': "GLOB", - '$regex': "REGEXP", } null_operators = { '=': "IS NULL", diff --git a/ipyparallel/tests/test_db.py b/ipyparallel/tests/test_db.py index ba615e17..da34bce0 100644 --- a/ipyparallel/tests/test_db.py +++ b/ipyparallel/tests/test_db.py @@ -149,22 +149,6 @@ def test_find_records_glob(self): found = [r['msg_id'] for r in recs] assert set(ref) == set(found) - def test_find_records_regex(self): - """test finding records with '$regex' operators""" - hist = self.db.get_history() - - pattern = "^.+_.$" - ref = [msg_id for msg_id in hist if re.match(pattern, msg_id)] - recs = self.db.find_records({'msg_id': {'$regex': pattern}}, ["msg_id"]) - found = [r['msg_id'] for r in recs] - assert set(ref) == set(found) - - pattern = "^.+_1[0-9]$" - ref = [msg_id for msg_id in hist if re.match(pattern, msg_id)] - recs = self.db.find_records({'msg_id': {'$regex': pattern}}, ["msg_id"]) - found = [r['msg_id'] for r in recs] - assert set(ref) == set(found) - def test_get_history(self): msg_ids = self.db.get_history() latest = datetime(1984, 1, 1).replace(tzinfo=utc) From 651fcc03e630b0caa6f135142802552c2424fe15 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 6 Mar 2026 09:56:47 +0000 Subject: [PATCH 28/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- docs/source/reference/db.md | 12 ++++++------ ipyparallel/controller/dictdb.py | 1 - ipyparallel/controller/mongodb.py | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/docs/source/reference/db.md b/docs/source/reference/db.md index baa99c53..41640047 100644 --- a/docs/source/reference/db.md +++ b/docs/source/reference/db.md @@ -80,7 +80,7 @@ TaskRecord keys: MongoDB operators we emulate on all backends: | Operator | Python equivalent | -| -------- |-------------------------------------------------------------------------------| +| -------- | ----------------------------------------------------------------------------- | | '\$in' | in | | '\$nin' | not in | | '\$eq' | == | @@ -89,12 +89,12 @@ MongoDB operators we emulate on all backends: | '\$gte' | >= | | '\$le' | \< | | '\$lte' | \<= | -| '\$glob' | [fnmatch](https://docs.python.org/3/library/fnmatch.html) (wildcard matching) | +| '\$glob' | [fnmatch](https://docs.python.org/3/library/fnmatch.html) (wildcard matching) | -Remarks on _$glob_: The operator can be used to find substrings in DB columns based on -[unix style filename pattern matching](https://docs.python.org/3/library/fnmatch.html) -_$glob_ is **not** a regular MongoDB opertor, but is internally translated to a regular -expression (_$regex_) which is natively supported by MongoDB. +Remarks on _$glob_: The operator can be used to find substrings in DB columns based on +[unix style filename pattern matching](https://docs.python.org/3/library/fnmatch.html) +_$glob_ is **not** a regular MongoDB opertor, but is internally translated to a regular +expression (_$regex_) which is natively supported by MongoDB. The DB Query is useful for two primary cases: diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py index db9df500..df7f3769 100644 --- a/ipyparallel/controller/dictdb.py +++ b/ipyparallel/controller/dictdb.py @@ -37,7 +37,6 @@ # Distributed under the terms of the Modified BSD License. import copy import fnmatch -import re from copy import deepcopy from datetime import datetime diff --git a/ipyparallel/controller/mongodb.py b/ipyparallel/controller/mongodb.py index 99974efb..8bff3b96 100644 --- a/ipyparallel/controller/mongodb.py +++ b/ipyparallel/controller/mongodb.py @@ -127,7 +127,7 @@ def _translate(self, filter): ) # we need to attach the start and end match character to achieve # an equivalent matching behavior to fnmatch in mongoDB - ret["$regex"] = "^"+fnmatch.translate(glob)+"$" + ret["$regex"] = "^" + fnmatch.translate(glob) + "$" else: for key, value in filter.items(): if isinstance(value, dict): From 80704f1db836993b95bc90c56a7d144b52d32cd7 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 6 Mar 2026 12:47:11 +0100 Subject: [PATCH 29/32] $glob tests improved using labels --- ipyparallel/controller/dictdb.py | 2 +- ipyparallel/tests/test_db.py | 64 +++++++++++++++++++++++++------- 2 files changed, 52 insertions(+), 14 deletions(-) diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py index df7f3769..e9e67e77 100644 --- a/ipyparallel/controller/dictdb.py +++ b/ipyparallel/controller/dictdb.py @@ -60,7 +60,7 @@ '$all': lambda a, b: all([a in bb for bb in b]), '$mod': lambda a, b: a % b[0] == b[1], '$exists': lambda a, b: (b and a is not None) or (a is None and not b), - '$glob': lambda a, b: fnmatch.fnmatch(a, b), + '$glob': lambda a, b: fnmatch.fnmatch(a, b) if a is not None else False, } diff --git a/ipyparallel/tests/test_db.py b/ipyparallel/tests/test_db.py index da34bce0..a5abae6e 100644 --- a/ipyparallel/tests/test_db.py +++ b/ipyparallel/tests/test_db.py @@ -135,19 +135,57 @@ def test_find_records_in(self): def test_find_records_glob(self): """test finding records with '$glob' operators""" - hist = self.db.get_history() - - pattern = "*" + hist[0][5:-2] + "_?" - ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] - recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) - found = [r['msg_id'] for r in recs] - assert set(ref) == set(found) - - pattern = "*_1?" - ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] - recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) - found = [r['msg_id'] for r in recs] - assert set(ref) == set(found) + msg_ids = self.load_records(10) + labels = [ + "group_a", + "group_a", + "group_a/subgroup_a", + "group_a/subgroup_b", + "group_b", + "group_b/subgroup_a", + "group_b/subgroup_b", + "group_b/subgroup_c", + "label*task", + "label?task", + ] + assert len(msg_ids) == len(labels) + for msg_id, label in zip(msg_ids, labels): + self.db.update_record(msg_id, dict(label=label)) + + patterns = [ + "group_a*", + "group_a/subgroup_?", + "group_b", + "group_b/subgroup_[ab]", + "*/subgroup_a", + "*[*]*", + "*[?]*", + ] + for pattern in patterns: + ref = [ + msg_id + for msg_id, label in zip(msg_ids, labels) + if fnmatch.fnmatch(label, pattern) + ] + recs = self.db.find_records( + {'label': {'$glob': pattern}}, ["msg_id", 'label'] + ) + found = [r['msg_id'] for r in recs] + assert set(ref) == set(found) + + # hist = self.db.get_history() + # + # pattern = "*" + hist[0][5:-2] + "_?" + # ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] + # recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) + # found = [r['msg_id'] for r in recs] + # assert set(ref) == set(found) + # + # pattern = "*_1?" + # ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] + # recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) + # found = [r['msg_id'] for r in recs] + # assert set(ref) == set(found) def test_get_history(self): msg_ids = self.db.get_history() From f95b3c381df52f340e4cabcc5a04508bdb32aec3 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 6 Mar 2026 13:03:02 +0100 Subject: [PATCH 30/32] comments added --- ipyparallel/tests/test_db.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ipyparallel/tests/test_db.py b/ipyparallel/tests/test_db.py index a5abae6e..74d2e6c7 100644 --- a/ipyparallel/tests/test_db.py +++ b/ipyparallel/tests/test_db.py @@ -156,10 +156,10 @@ def test_find_records_glob(self): "group_a*", "group_a/subgroup_?", "group_b", - "group_b/subgroup_[ab]", + "group_b/subgroup_[ab]", # check sequence character matching "*/subgroup_a", - "*[*]*", - "*[?]*", + "*[*]*", # test wildcard escaping + "*[?]*", # test wildcard escaping ] for pattern in patterns: ref = [ From e7d721aeba4f9802a162a20c18386caf1fc4dd81 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 6 Mar 2026 15:14:39 +0100 Subject: [PATCH 31/32] task label example improved --- docs/source/examples/basic_task_label.py | 49 +++++++++++++++++++----- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py index afd78846..0eebc3a4 100644 --- a/docs/source/examples/basic_task_label.py +++ b/docs/source/examples/basic_task_label.py @@ -1,4 +1,12 @@ -"""Basic task label example""" +"""Basic task label example + +Labels can be used for identifying or semantically grouping tasks. Using the $glob operator +relevant task records can be queried from the Task Database. + +Authors +------- +* ottointhesky +""" import ipyparallel as ipp @@ -18,13 +26,17 @@ def wait(t): return time.time() - tic +count = 5 # number of tasks per method + # use load balanced view bview = rc.load_balanced_view() ar_list_b1 = [ - bview.set_flags(label=f"mylabel_map_{i:02}").map_async(wait, [2]) for i in range(10) + bview.set_flags(label=f"bview_map_{i:02}").map_async(wait, [2]) + for i in range(count) ] ar_list_b2 = [ - bview.set_flags(label=f"mylabel_map_{i:02}").apply_async(wait, 2) for i in range(10) + bview.set_flags(label=f"bview_apply_{i:02}").apply_async(wait, 2) + for i in range(count) ] bview.wait(ar_list_b1) bview.wait(ar_list_b2) @@ -33,19 +45,36 @@ def wait(t): # use direct view dview = rc[:] ar_list_d1 = [ - dview.set_flags(label=f"mylabel_map_{i + 10:02}").apply_async(wait, 2) - for i in range(10) + dview.set_flags(label=f"dview_map_{i + 10:02}").map_async(wait, [2]) + for i in range(count) ] ar_list_d2 = [ - dview.set_flags(label=f"mylabel_map_{i + 10:02}").map_async(wait, [2]) - for i in range(10) + dview.set_flags(label=f"dview_apply_{i + 10:02}").apply_async(wait, 2) + for i in range(count) ] dview.wait(ar_list_d1) dview.wait(ar_list_d2) + +def print_records(titel, data): + print(f"{titel} ({len(data)} records)") + for d in data: + print( + f"\tmsg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}" + ) + + +query_keys = ['msg_id', 'label', 'engine_uuid'] + # query database -data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid']) -for d in data: - print(f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}") +data = rc.db_query({'label': {"$nin": ""}}, keys=query_keys) +print_records("all entries with labels", data) + +data = rc.db_query({'label': {"$glob": "dview_*"}}, keys=query_keys) +print_records("all dview label entries", data) + +data = rc.db_query({'label': {"$glob": "*_map_*"}}, keys=query_keys) +print_records("all map label entries", data) +# stop cluster cluster.stop_cluster_sync() From d90ca2487c0c78d3e732a4c54ab0bbff920e675e Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 6 Mar 2026 15:33:09 +0100 Subject: [PATCH 32/32] minor cleanup (necessary code removed) --- ipyparallel/controller/sqlitedb.py | 8 -------- ipyparallel/tests/test_db.py | 14 -------------- 2 files changed, 22 deletions(-) diff --git a/ipyparallel/controller/sqlitedb.py b/ipyparallel/controller/sqlitedb.py index 952bdf72..8eb42ff8 100644 --- a/ipyparallel/controller/sqlitedb.py +++ b/ipyparallel/controller/sqlitedb.py @@ -4,7 +4,6 @@ # Distributed under the terms of the Modified BSD License. import json import os -import re try: import cPickle as pickle @@ -100,12 +99,6 @@ def _convert_timestamp(s): return ensure_timezone(dateutil_parse(s)) -def _regexp(expr, item): - """sqlite callback function for performing a regex operation""" - reg = re.compile(expr) - return reg.match(item) is not None - - # ----------------------------------------------------------------------------- # SQLiteDB class # ----------------------------------------------------------------------------- @@ -279,7 +272,6 @@ def _init_db(self): # isolation_level = None)#, cached_statements=64, ) - self._db.create_function("REGEXP", 2, _regexp) # print dir(self._db) first_table = previous_table = self.table i = 0 diff --git a/ipyparallel/tests/test_db.py b/ipyparallel/tests/test_db.py index 74d2e6c7..69617ef0 100644 --- a/ipyparallel/tests/test_db.py +++ b/ipyparallel/tests/test_db.py @@ -173,20 +173,6 @@ def test_find_records_glob(self): found = [r['msg_id'] for r in recs] assert set(ref) == set(found) - # hist = self.db.get_history() - # - # pattern = "*" + hist[0][5:-2] + "_?" - # ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] - # recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) - # found = [r['msg_id'] for r in recs] - # assert set(ref) == set(found) - # - # pattern = "*_1?" - # ref = [msg_id for msg_id in hist if fnmatch.fnmatch(msg_id, pattern)] - # recs = self.db.find_records({'msg_id': {'$glob': pattern}}, ["msg_id"]) - # found = [r['msg_id'] for r in recs] - # assert set(ref) == set(found) - def test_get_history(self): msg_ids = self.db.get_history() latest = datetime(1984, 1, 1).replace(tzinfo=utc)