feat: integrate batch coalescer with async fn exec #19342
Hey @jizezhang 👋🏻 I think we can review it now :)
alamb left a comment:
Thanks @feniljain and @jizezhang -- I think this PR needs tests but otherwise could be merged in. I also have some ideas of how to simplify it a bit
let schema_captured = Arc::clone(&schema_captured);
let config_options = Arc::clone(&config_options_ref);

async move {
I think you could move the batch coalescer into this logic directly, which would:
- Make this PR smaller
- Be more efficient (we wouldn't end up with the overhead of another stream at runtime)
@alamb I was playing around with this and have a small doubt:
Let's consider the case where the input batches are smaller than the coalescer's target batch size. The code above is implemented as a then on input_stream, so every iteration needs to return at least one batch. That means we would have to poll input_stream again inside the then block itself. But input_stream is already in use, so we cannot use it again inside the then block; this causes an error, and I don't see a good way around it other than wrapping it in another stream. Should we leave it as it is? Please help me out if I am missing something obvious here 🙈
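The shape of the problem can be sketched without any async machinery. This is a simplified model, not the code in this PR: `Vec<i32>` stands in for a `RecordBatch`, a plain `Iterator` stands in for the input stream, and `CoalescingIter` is a hypothetical name rather than DataFusion's `BatchCoalescer` API. It shows why a one-input-to-one-output adapter such as `then` does not fit: producing one output batch may require pulling several input batches.

```rust
use std::collections::VecDeque;

/// Simplified model (an assumption for illustration): `Vec<i32>` stands in
/// for a `RecordBatch`, and an `Iterator` stands in for the input stream.
struct CoalescingIter<I: Iterator<Item = Vec<i32>>> {
    input: I,
    target: usize,
    buffer: VecDeque<i32>,
    input_done: bool,
}

impl<I: Iterator<Item = Vec<i32>>> CoalescingIter<I> {
    fn new(input: I, target: usize) -> Self {
        assert!(target > 0, "target batch size must be positive");
        Self { input, target, buffer: VecDeque::new(), input_done: false }
    }
}

impl<I: Iterator<Item = Vec<i32>>> Iterator for CoalescingIter<I> {
    type Item = Vec<i32>;

    fn next(&mut self) -> Option<Vec<i32>> {
        // Keep pulling input until a full batch is buffered or input ends.
        // A one-to-one adapter has no place to put this loop.
        while self.buffer.len() < self.target && !self.input_done {
            match self.input.next() {
                Some(batch) => self.buffer.extend(batch),
                None => self.input_done = true,
            }
        }
        if self.buffer.is_empty() {
            return None;
        }
        // Emit at most `target` rows; a short final batch flushes the rest.
        let n = self.target.min(self.buffer.len());
        Some(self.buffer.drain(..n).collect())
    }
}

/// Four small input batches coalesced with a target size of 4 rows.
fn example() -> Vec<Vec<i32>> {
    let small_batches = vec![vec![1, 2], vec![3], vec![4, 5], vec![6, 7]];
    CoalescingIter::new(small_batches.into_iter(), 4).collect()
}
```

Here `example()` yields two batches, `[1, 2, 3, 4]` and `[5, 6, 7]`, from four inputs. In the async case the same loop would presumably live in a hand-written stream or an unfold-style adapter that owns the input, which is the "wrapping it in another stream" option mentioned above.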
Regarding tests, I don't see any existing tests where I have control over the input stream, which is required to count how many times it gets called. I am also not sure how hard it would be to create a test with these conditions.
I tested this flow earlier by adding a DEBUG log to the stream flow of AsyncFuncExec and manipulating target_batch_size in this test, which helped me confirm everything was working fine.
I also checked the other two PRs, and neither contained tests:
- feat: integrate batch coalescer with repartition exec #19002
- Coalesce batches inside FilterExec #18604
I suspect tests were not added to them for the same reason I mentioned above. Do correct me if I am missing a point, thanks! 🙇🏻
#19002 contains a test, e.g.
async fn test_repartition_with_coalescing() -> Result<()> {
Have you by any chance tried leveraging TestMemoryExec?
pub struct TestMemoryExec {
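The testing concern above, needing control over the source to count how often it is pulled, can be illustrated with a small wrapper. This is a sketch under the same simplifying assumptions (`Vec<i32>` for a batch, `Iterator` for a stream); `CountingSource` and `coalesce` are hypothetical helpers and say nothing about how `TestMemoryExec` works.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical test helper: wraps any batch source and counts how many
/// times the consumer pulled from it.
struct CountingSource<I> {
    inner: I,
    pulls: Rc<Cell<usize>>,
}

impl<I: Iterator> Iterator for CountingSource<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        self.pulls.set(self.pulls.get() + 1);
        self.inner.next()
    }
}

/// Stand-in for the operator under test: concatenates input batches and
/// re-emits them in chunks of at most `target` rows.
fn coalesce(input: impl Iterator<Item = Vec<i32>>, target: usize) -> Vec<Vec<i32>> {
    let rows: Vec<i32> = input.flatten().collect();
    rows.chunks(target).map(|c| c.to_vec()).collect()
}

/// Returns (number of pulls on the source, number of output batches).
fn pulls_and_outputs() -> (usize, usize) {
    let pulls = Rc::new(Cell::new(0));
    let source = CountingSource {
        inner: vec![vec![1, 2], vec![3], vec![4, 5], vec![6, 7]].into_iter(),
        pulls: Rc::clone(&pulls),
    };
    let output = coalesce(source, 4);
    (pulls.get(), output.len())
}
```

A test built this way can assert on both numbers at once: the source is pulled once per input batch plus once for end of input, while the number of output batches depends only on the target size.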
Which issue does this PR close?
BatchCoalescer into AsyncFuncExec and remove from CoalesceBatches optimization rule #19331
What changes are included in this PR?
I haven't yet included the changes to remove it from the optimizer; I will do that in a subsequent PR, as I see everyone else has done the same.
I mostly took code from the other PRs and stitched it together to work with AsyncFuncExec.
Are these changes tested?
In Progress
Are there any user-facing changes?
No