Feature: Synchronous Low‑Level API for ezmsg (ROS2‑style ergonomics)#215
Feature: Synchronous Low‑Level API for ezmsg (ROS2‑style ergonomics)#215griffinmilsap wants to merge 3 commits intodevfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a synchronous “ROS2-style” low-level API layer on top of ezmsg’s async pub/sub, plus examples/tests, and introduces explicit control over GraphServer auto-start behavior via auto_start.
Changes:
- Introduces
ezmsg.core.sync(SyncContext,SyncPublisher,SyncSubscriber,init,spin,spin_once) backed by a background asyncio loop thread. - Extends GraphServer bring-up behavior with
GraphContext(auto_start=...)andGraphService.ensure(auto_start=...). - Adds split sync/async low-level examples, sync API tests, and a perf comparison script.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
src/ezmsg/core/sync.py |
New synchronous wrapper API around GraphContext/Publisher/Subscriber, including callback-based spinning. |
src/ezmsg/core/graphserver.py |
Adds auto_start override to GraphService.ensure and logs GraphServer startup. |
src/ezmsg/core/graphcontext.py |
Plumbs auto_start through GraphContext to GraphService.ensure. |
src/ezmsg/core/__init__.py |
Exposes the new sync API/module from ezmsg.core. |
examples/simple_publisher.py |
New sync publisher example using ez.sync.init. |
examples/simple_subscriber.py |
New sync subscriber example using spin(). |
examples/simple_async_publisher.py |
New async publisher example (replacement for prior combined example). |
examples/simple_async_subscriber.py |
New async subscriber example (replacement for prior combined example). |
examples/lowlevel_api.py |
Removes the previous combined low-level example in favor of split examples. |
tests/test_sync_api.py |
Adds tests for sync autostart/spin_once, backpressure behavior, and CacheMiss handling. |
tests/perf_sync_overhead.py |
Adds a standalone script to measure sync-wrapper overhead vs async usage. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._loop = asyncio.new_event_loop() | ||
| super().start() | ||
| self._server_up.wait() | ||
| logger.info(f'Started GraphServer at {address}') |
There was a problem hiding this comment.
The startup log message uses the address argument, which is None when binding to an ephemeral port, and may not reflect the actual bound address. Prefer logging self.address (after _server_up.wait()) so the message always reports the real listening endpoint.
| logger.info(f'Started GraphServer at {address}') | |
| logger.info(f"Started GraphServer at {self.address}") |
There was a problem hiding this comment.
This is a very good catch!
There was a problem hiding this comment.
Implementing this locally actually caused a few tests to fail. I'm investigating.
| self._pub.pause() | ||
|
|
||
| def resume(self) -> None: | ||
| self._pub.resume() | ||
|
|
||
| def close(self) -> None: | ||
| self._pub.close() |
There was a problem hiding this comment.
SyncPublisher.pause()/resume()/close() call methods on the underlying async Publisher directly from the calling thread. Those methods mutate asyncio primitives and cancel asyncio tasks owned by the background loop, which is not thread-safe and can raise runtime errors or cause missed wakeups. Marshal these operations onto the loop thread (e.g., loop.call_soon_threadsafe for sync methods, or provide async wrappers and invoke via run_coroutine_threadsafe).
| self._pub.pause() | |
| def resume(self) -> None: | |
| self._pub.resume() | |
| def close(self) -> None: | |
| self._pub.close() | |
| self._loop.call_soon_threadsafe(self._pub.pause) | |
| def resume(self) -> None: | |
| self._loop.call_soon_threadsafe(self._pub.resume) | |
| def close(self) -> None: | |
| self._loop.call_soon_threadsafe(self._pub.close) |
| def close(self) -> None: | ||
| self._sub.close() | ||
|
|
There was a problem hiding this comment.
SyncSubscriber.close() calls Subscriber.close() directly from the calling thread, but Subscriber.close() cancels an asyncio task created on the background loop. Task cancellation is not thread-safe across event loops/threads. Schedule close onto the loop thread (e.g., loop.call_soon_threadsafe) and consider offering a synchronous wait_closed() pattern to ensure teardown completes.
| while True: | ||
| remaining = None if deadline is None else max(0.0, deadline - loop.time()) | ||
| tasks = [asyncio.create_task(_recv_entry(entry)) for entry in entries] | ||
| try: | ||
| done, pending = await asyncio.wait( | ||
| tasks, timeout=remaining, return_when=asyncio.FIRST_COMPLETED | ||
| ) | ||
| if not done: | ||
| for task in pending: | ||
| try: | ||
| task.cancel() | ||
| except RuntimeError: | ||
| pass | ||
| await asyncio.gather(*pending, return_exceptions=True) | ||
| return None | ||
|
|
||
| for task in pending: | ||
| try: | ||
| task.cancel() | ||
| except RuntimeError: | ||
| pass | ||
| await asyncio.gather(*pending, return_exceptions=True) | ||
|
|
There was a problem hiding this comment.
_recv_any() cancels and gathers pending recv_zero_copy() tasks without handling the case where a “pending” task actually completed between asyncio.wait() returning and cancellation. In that race, the task result contains an acquired zero-copy context manager (cm) that will never have aexit called, leaking backpressure/message leases. When gathering pending tasks, inspect results and explicitly aexit any successfully-entered context managers, or restructure to guarantee only one task can ever successfully enter before the others are cleaned up.
| return self._graph_address | ||
| return self._graph_context.graph_address | ||
|
|
||
| def __enter__(self) -> "SyncContext": |
There was a problem hiding this comment.
SyncContext instances aren’t safely reusable after shutdown(): _shutdown_requested and _closed are never reset, so re-entering the same instance (or calling spin/spin_once after a shutdown) will immediately no-op or skip cleanup paths. Either document that SyncContext is single-use and raise if enter is called after shutdown, or reset shutdown/closed state (and clear callback subscriptions) on enter.
| def __enter__(self) -> "SyncContext": | |
| def __enter__(self) -> "SyncContext": | |
| # SyncContext instances are single-use: they cannot be re-entered after shutdown. | |
| if self._closed: | |
| raise RuntimeError( | |
| "SyncContext instances cannot be reused after shutdown; " | |
| "create a new SyncContext instead." | |
| ) |
There was a problem hiding this comment.
Also very good catch.
There was a problem hiding this comment.
Should we leave the SyncContext single-use or make it reusable by resetting _closed/_shutdown_requested? It's not clear to me what is better from a typical use case.
| t2 = time.monotonic() | ||
|
|
||
| assert t2 - t1 >= 0.15 | ||
| done.wait(2.0) |
There was a problem hiding this comment.
This test can pass even if no messages are ever received: done.wait(2.0) isn’t asserted, and the received contents/order aren’t validated. Consider asserting that done.wait(...) returns True and that received matches the expected messages so the test reliably detects subscription/buffering regressions.
| done.wait(2.0) | |
| assert done.wait(2.0), "Timed out waiting for messages to be received" | |
| assert received == ["one", "two"] |
| import time | ||
| import ezmsg.core as ez | ||
|
|
There was a problem hiding this comment.
time is imported but only used in commented-out code, which will trigger Ruff/pyflakes unused-import checks. Remove the import or use it unconditionally in the example (e.g., behind a flag).
| PORT = 12345 | ||
| TOPIC = "/TEST" | ||
|
|
There was a problem hiding this comment.
PORT = 12345 is unused (the function uses the port parameter instead). This will fail Ruff unused-variable checks; remove the constant or use it as the default for the port parameter.
Summary
SyncContext,SyncPublisher,SyncSubscriber,init,spin,spin_once) so users can publish/subscribe without asyncio.GraphContext(auto_start=...)andGraphService.ensure(auto_start=...).Motivation
Many users find
asynciointimidating; the goal is to let them use ezmsg with a simple, synchronous API similar to ROS2 (with ez.sync.init(...),spin(),spin_once()), while preserving ezmsg’s backpressure semantics and zero‑copy safety.This wrapper is explicitly for ergonomics, not peak throughput.
Implementation Details
New Sync API (
src/ezmsg/core/sync.py)GraphContextand runs an asyncio event loop in a background thread usingnew_threaded_event_loop.create_publisher/create_subscriptioncall the underlying async API viaasyncio.run_coroutine_threadsafe.spin()/spin_once()pull messages directly viarecv_zero_copy()and only release backpressure after the user callback returns.spin_once()returns a boolean for “did work”.GraphServer Auto‑Start Control
GraphService.ensure(auto_start: bool | None = None)Nonepreserves existing “auto‑start only when address is not specified + no env override.”True/Falseoverrides.GraphContext(..., auto_start=...)passes through toGraphService.ensure.ez.sync.init(..., auto_start=...)mirrorsGraphContextdefaulting toNone.Examples
examples/simple_publisher.pyexamples/simple_subscriber.pyexamples/simple_async_publisher.pyexamples/simple_async_subscriber.pyTests
tests/test_sync_api.pyPerf Script
tests/perf_sync_overhead.pyThreads and Concurrency Model
spin,spin_once, callbacks).publish()/recv()call crosses threads viarun_coroutine_threadsafe.Performance
Measured using
tests/perf_sync_overhead.py(local macOS example):Interpretation:
Files Changed / Added
src/ezmsg/core/sync.pysrc/ezmsg/core/__init__.pysrc/ezmsg/core/graphcontext.pysrc/ezmsg/core/graphserver.pyexamples/simple_publisher.pyexamples/simple_subscriber.pyexamples/simple_async_publisher.pyexamples/simple_async_subscriber.pytests/test_sync_api.pytests/perf_sync_overhead.pyexamples/lowlevel.py(replaced by split examples)