From f47eddb8c299af36eeac2b7b2d70274c88784963 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 13 Apr 2026 14:00:59 +0100 Subject: [PATCH 1/5] fix: Remove tiled writer if submitted task doesn't exist Whenever a task is submitted, a TiledWriter is added to the run engine and is only removed when the task is complete. If the task never starts due to not being found, the writer would never be removed and subsequent tasks would have duplicate writers causing write errors to tiled. Catching the KeyError and removing the subscribers immediately is a workaround that works but should be removed when the internal pending tasks queue is removed. --- src/blueapi/service/interface.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index c8b1e07245..05e67276af 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -221,7 +221,12 @@ def remove_callback_when_task_finished( ) if task.task_id is not None: - active_worker.begin_task(task.task_id) + try: + active_worker.begin_task(task.task_id) + except KeyError: + active_worker.worker_events.unsubscribe(remove_callback) + active_context.run_engine.unsubscribe(tiled_writer_token) + raise return task From 9f10a32cbf3ddb7d37ae71451132c3b9660a9367 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 13 Apr 2026 14:20:32 +0100 Subject: [PATCH 2/5] Add regression test --- tests/system_tests/test_blueapi_system.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/system_tests/test_blueapi_system.py b/tests/system_tests/test_blueapi_system.py index 3e075a7767..4763cebfac 100644 --- a/tests/system_tests/test_blueapi_system.py +++ b/tests/system_tests/test_blueapi_system.py @@ -617,3 +617,24 @@ def on_event(event: AnyEvent) -> None: assert outcome.result.message.startswith( "403: Access policy rejects the provided access blob" ) + + +# Regression test for #1480 +def 
test_task_submission_after_invalid_task(client_with_stomp: BlueapiClient): + with pytest.raises(KeyError): + # This task hasn't been submitted so should return an error... + client_with_stomp._rest.update_worker_task(WorkerTask(task_id="missing")) + + # ...but should leave the server in a state where it can still run tasks + res = client_with_stomp.run_task( + TaskRequest( + name="count", + params={ + "detectors": [ + "det", + ], + }, + instrument_session=AUTHORIZED_INSTRUMENT_SESSION, + ) + ) + assert isinstance(res.result, TaskResult) From cfa6635f3217a756c1acf92418cf02f63e647a25 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 14 Apr 2026 16:27:24 +0100 Subject: [PATCH 3/5] Store all subscriptions in a list --- src/blueapi/service/interface.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 05e67276af..f55f86b0d0 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -187,6 +187,7 @@ def begin_task( if nt := active_context.numtracker: nt.set_headers(pass_through_headers or {}) + subscribers = [] if tiled_config := active_context.tiled_conf: # Tiled queries the root node, so must create an authorized client if isinstance(tiled_config.authentication, ServiceAccount): @@ -204,6 +205,7 @@ def begin_task( tiled_writer_token = active_context.run_engine.subscribe( TiledWriter(tiled_client, batch_size=1) ) + subscribers.append((active_context.run_engine, tiled_writer_token)) def remove_callback_when_task_finished( event: WorkerEvent, correlation_id: str | None @@ -213,19 +215,20 @@ def remove_callback_when_task_finished( and event.task_status.task_id == task.task_id and event.task_status.task_complete ): - active_context.run_engine.unsubscribe(tiled_writer_token) - active_worker.worker_events.unsubscribe(remove_callback) + for ch, token in subscribers: + ch.unsubscribe(token) remove_callback = 
active_worker.worker_events.subscribe( remove_callback_when_task_finished ) + subscribers.append((active_worker.worker_events, remove_callback)) if task.task_id is not None: try: active_worker.begin_task(task.task_id) except KeyError: - active_worker.worker_events.unsubscribe(remove_callback) - active_context.run_engine.unsubscribe(tiled_writer_token) + for ch, token in subscribers: + ch.unsubscribe(token) raise return task From 0590c8171f0eb06a12ab70db6e1f833bcd5b372c Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 14 Apr 2026 17:17:15 +0100 Subject: [PATCH 4/5] Add unit-test regression test system tests don't count for coverage --- tests/unit_tests/service/test_interface.py | 27 ++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/unit_tests/service/test_interface.py b/tests/unit_tests/service/test_interface.py index cc619259bd..ccef2f0971 100644 --- a/tests/unit_tests/service/test_interface.py +++ b/tests/unit_tests/service/test_interface.py @@ -242,6 +242,33 @@ def test_begin_task_no_task_id(worker_mock: MagicMock): worker_mock.assert_not_called() +@patch("blueapi.service.interface.from_uri") +@patch("blueapi.service.interface.config") +@patch("blueapi.service.interface.context") +@patch("blueapi.service.interface.worker") +def test_subscribers_removed_when_task_not_found( + worker_mock: MagicMock, + context_mock: MagicMock, + config_mock: MagicMock, + from_uri_mock: MagicMock, +): + # regression test for #1480 + worker = worker_mock() + ctx = context_mock() + worker.begin_task.side_effect = KeyError() + + with pytest.raises(KeyError): + interface.begin_task(WorkerTask(task_id="missing")) + + ctx.run_engine.subscribe.assert_called_once() + tiled_token = ctx.run_engine.subscribe() + ctx.run_engine.unsubscribe.assert_called_once_with(tiled_token) + + worker.worker_events.subscribe.assert_called_once() + remove_token = worker.worker_events.subscribe() + worker.worker_events.unsubscribe.assert_called_once_with(remove_token) + + 
@patch("blueapi.service.interface.TaskWorker.get_tasks_by_status") def test_get_tasks_by_status(get_tasks_by_status_mock: MagicMock): pending_task1 = TrackableTask(task_id="0", task=Task(name="pending_task1")) From a43b27c3ed904eca95b8fd1a288610f10fbbcbdf Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 17 Apr 2026 10:11:01 +0100 Subject: [PATCH 5/5] Rename ch -> channel --- src/blueapi/service/interface.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index f55f86b0d0..335d00477e 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -215,8 +215,8 @@ def remove_callback_when_task_finished( and event.task_status.task_id == task.task_id and event.task_status.task_complete ): - for ch, token in subscribers: - ch.unsubscribe(token) + for channel, token in subscribers: + channel.unsubscribe(token) remove_callback = active_worker.worker_events.subscribe( remove_callback_when_task_finished @@ -227,8 +227,8 @@ def remove_callback_when_task_finished( try: active_worker.begin_task(task.task_id) except KeyError: - for ch, token in subscribers: - ch.unsubscribe(token) + for channel, token in subscribers: + channel.unsubscribe(token) raise return task