Skip to content

Conversation

@dwskoog
Copy link
Contributor

@dwskoog dwskoog commented Jan 16, 2026

map stops the flow of items in the stream when the function raises but map_async is outside of the direct line of return so it fails weirdly during an exception. To address that, I added the idea about stopping the stream or not. This way, if the stream does not deliberately invoke stop during an exception, the stream continues to process inputs after an exception.

Since the map_async now conceives of stopping or not, I added a boolean in the node state to control the loop inside the worker task.

In the case of an exception during mapping, map_async will now release the references held on the metadata for the offending input.

I added an example that shows off the failure modes of map and map_async that plainly demonstrates that exceptions can leave the stream in a weird state.

streamz/core.py Outdated
Comment on lines 780 to 782
while self.running:
task, metadata = await self.work_queue.get()
self.work_queue.task_done()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martindurant I thought more about your feedback on the original PR last night and worked this up this morning. The new example file shows off the failure modes when map/map_async raise so I added this to give the users a better handle on failures.

`map` stops the flow of items in the stream when the function raises but
`map_async` is outside of the direct line of return so it fails weirdly
during an exception. To address that, I added the idea about stopping
the stream or not. This way, if the stream does not deliberately invoke
`stop` during an exception, the stream continues to process inputs after
an exception.

Since the `map_async` now conceives of stopping or not, I added a boolean
in the node state to control the loop inside the worker task.

In the case of an exception during mapping, `map_async` will now release
the references held on the metadata for the offending input.

I added an example that shows off the failure modes of `map` and
`map_async` that plainly demonstrates that exceptions can leave the
stream in a weird state.
@dwskoog dwskoog force-pushed the map_async_improvements branch from e6f1400 to 09969c5 Compare January 16, 2026 18:20
if results:
await asyncio.gather(*results)
self._release_refs(metadata)
self._release_refs(metadata)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map_async calls _retain_refs during the insert into the work queue so making sure that we call _release_refs even during an exception seems better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct; probably the assumption is that the exception simply stops the whole pipeline, but we can do better. Nodes that filter in/out on exceptions would be reasonable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually had this idea for the next improvement. It would be better for map/starmap/map_async to flow down Exceptions (probably paired with the offending input) so that the graph can fork the success one way and the failure to a logging/recovery flow.

streamz/core.py Outdated
stream_name = kwargs.pop('stream_name', None)
self.kwargs = kwargs
self.args = args
self.running = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't starting the stream optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rebuilt the stop/start mechanism to make it tolerate restarting from upstream or down.

@dwskoog dwskoog force-pushed the map_async_improvements branch from 4787deb to 9a7b3ed Compare January 16, 2026 23:39
Comment on lines +789 to +790
if self.stopped:
break
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By not checking self.stopped after coming back from the gather, the source over-consumes the underlying iterable and loses an element.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants