Rework map_async to handle failures better #495
base: master
streamz/core.py
Outdated
    while self.running:
        task, metadata = await self.work_queue.get()
        self.work_queue.task_done()
@martindurant I thought more about your feedback on the original PR last night and worked this up this morning. The new example file shows the failure modes when `map`/`map_async` raise, so I added this to give users a better handle on failures.

`map` stops the flow of items in the stream when the function raises, but `map_async` runs outside the direct line of return, so an exception there fails in a confusing way. To address that, I added the notion of a stream being stopped or not. If the stream does not deliberately invoke `stop` during an exception, it continues to process inputs after the exception.

Since `map_async` now tracks whether it is stopped, I added a boolean to the node state to control the loop inside the worker task.

In the case of an exception during mapping, `map_async` now releases the references held on the metadata for the offending input.

I added an example of the failure modes of `map` and `map_async` that plainly demonstrates that exceptions can leave the stream in a weird state.
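To make the behaviour described above concrete, here is a minimal sketch in plain `asyncio`. It is not the streamz implementation: `TinyAsyncMapper`, `submit`, `refs`, and `flaky_double` are hypothetical names invented for illustration. It only shows the two ideas from this comment: a boolean that controls the worker loop, and a reference that is released whether or not the mapped function raises.

```python
import asyncio


class TinyAsyncMapper:
    """Hypothetical stand-in for a map_async-style node; not the streamz class."""

    def __init__(self, func):
        self.func = func
        self.running = True        # boolean that controls the worker loop
        self.queue = asyncio.Queue()
        self.results = []
        self.failed_inputs = []
        self.refs = {}             # metadata key -> outstanding reference count

    def stop(self):
        self.running = False

    def submit(self, item, metadata):
        # Retain a reference when the item enters the work queue.
        self.refs[metadata] = self.refs.get(metadata, 0) + 1
        self.queue.put_nowait((item, metadata))

    async def worker(self):
        # Sketch simplification: drain the queue once, then return.
        while self.running and not self.queue.empty():
            item, metadata = await self.queue.get()
            self.queue.task_done()
            try:
                self.results.append(await self.func(item))
            except Exception:
                # Nothing called stop(), so the loop keeps processing inputs.
                self.failed_inputs.append(item)
            finally:
                # Release on success and on failure alike.
                self.refs[metadata] -= 1


async def flaky_double(x):
    if x == 2:
        raise ValueError("cannot handle 2")
    return x * 2


async def run_demo():
    node = TinyAsyncMapper(flaky_double)
    for n in (1, 2, 3):
        node.submit(n, metadata=f"ref-{n}")
    await node.worker()
    return node


node = asyncio.run(run_demo())
```

In this sketch the failing input is recorded, the inputs after it are still processed, and every reference count returns to zero. Calling `stop()` from the `except` branch would instead end the loop at the first failure.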
e6f1400 to 09969c5
    if results:
        await asyncio.gather(*results)
    self._release_refs(metadata)
    self._release_refs(metadata)
`map_async` calls `_retain_refs` when it inserts into the work queue, so it seems better to make sure `_release_refs` is called even when an exception is raised.
Correct; probably the assumption is that the exception simply stops the whole pipeline, but we can do better. Nodes that filter in/out on exceptions would be reasonable.
I actually had this idea for the next improvement. It would be better for `map`/`starmap`/`map_async` to pass exceptions downstream (probably paired with the offending input) so that the graph can fork successes one way and failures to a logging/recovery flow.
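A rough sketch of that idea, written with plain Python generators rather than streamz nodes. `Failure`, `safe_map`, `fork`, and `reciprocal` are hypothetical names used only for illustration. The sketch shows an exception being paired with the input that caused it and then routed to a separate branch.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Failure:
    """Hypothetical wrapper pairing an exception with the offending input."""
    item: Any
    error: Exception


def safe_map(func, items):
    # Emit either the mapped value or a Failure; never raise to the caller.
    for item in items:
        try:
            yield func(item)
        except Exception as exc:
            yield Failure(item, exc)


def fork(outputs):
    # Split one flow into a success branch and a failure branch.
    successes, failures = [], []
    for out in outputs:
        (failures if isinstance(out, Failure) else successes).append(out)
    return successes, failures


def reciprocal(x):
    return 1 / x


successes, failures = fork(safe_map(reciprocal, [1, 0, 4]))
```

Here `successes` holds the values that mapped cleanly, and `failures` holds one `Failure` carrying the input `0` together with its `ZeroDivisionError`, which a logging or recovery flow could consume.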
streamz/core.py
Outdated
    stream_name = kwargs.pop('stream_name', None)
    self.kwargs = kwargs
    self.args = args
    self.running = True
Isn't starting the stream optional?
I rebuilt the stop/start mechanism so that it tolerates restarting from upstream or downstream.
4787deb to 9a7b3ed
    if self.stopped:
        break
Without a check of `self.stopped` after coming back from the gather, the source over-consumes the underlying iterable and loses an element.
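The lost element can be reproduced with a small sketch in plain `asyncio`. This is an assumption about the shape of the loop, not the streamz source: `TinySource`, `check_after_emit`, and `demo` are hypothetical names. The only difference between the two runs is whether `stopped` is checked again after the awaited emit.

```python
import asyncio


class TinySource:
    """Hypothetical iterable-backed source; not the streamz implementation."""

    def __init__(self, iterable, emit, check_after_emit):
        self.iterator = iter(iterable)
        self.emit = emit
        self.check_after_emit = check_after_emit
        self.stopped = False

    async def run(self):
        for item in self.iterator:
            if self.stopped:
                break              # too late: `item` has already been pulled
            await self.emit(item)
            if self.check_after_emit and self.stopped:
                break              # stop before pulling the next element


async def demo(check_after_emit):
    seen = []
    data = iter([1, 2, 3, 4])

    async def emit(item):
        seen.append(item)
        if item == 2:
            source.stopped = True  # downstream asks the source to stop

    source = TinySource(data, emit, check_after_emit)
    await source.run()
    # What was emitted, and what is still unread in the underlying iterator.
    return seen, list(data)


lossy = asyncio.run(demo(check_after_emit=False))
safe = asyncio.run(demo(check_after_emit=True))
```

Both runs emit `1` and `2`. Without the second check, the loop pulls `3` before noticing the stop, so only `4` remains unread and `3` is lost. With the check, `3` and `4` both remain available for a later restart.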