@feniljain
Contributor

Which issue does this PR close?

What changes are included in this PR?

I haven't yet removed AsyncFuncExec from the CoalesceBatches optimizer rule; I'll do that in a subsequent PR, as the other PRs seem to have done.

I mostly took code from the other PRs and stitched it together to work with AsyncFuncExec.

Are these changes tested?

In Progress

Are there any user-facing changes?

No

@github-actions github-actions bot added the optimizer (Optimizer rules) and physical-plan (Changes to the physical-plan crate) labels Dec 15, 2025
@feniljain feniljain force-pushed the feat-batch-coalesce-async-fn-exec branch from 76f6837 to f0e4be2 December 19, 2025 08:32
@github-actions github-actions bot removed the optimizer (Optimizer rules) label Dec 19, 2025
@feniljain feniljain marked this pull request as ready for review December 19, 2025 10:11
@feniljain
Contributor Author

Hey @jizezhang 👋🏻

I think we can review it now :)

@feniljain feniljain changed the title from "wip: integrate batch coalescer with async fn exec" to "feat: integrate batch coalescer with async fn exec" Dec 19, 2025
Contributor

@alamb alamb left a comment

Thanks @feniljain and @jizezhang -- I think this PR needs tests but otherwise could be merged in. I also have some ideas for how to simplify it a bit.

let schema_captured = Arc::clone(&schema_captured);
let config_options = Arc::clone(&config_options_ref);

async move {
Contributor

I think you could move the batch coalescer into this logic directly, which would:

  1. Make this PR smaller
  2. Be more efficient (we wouldn't end up with the overhead of another stream at runtime)

Contributor Author

@feniljain feniljain Dec 20, 2025

@alamb I was playing around with this and have a small doubt:

Let's consider the case where the input batches are smaller than the coalescer's target batch size. The code above is implemented as a then on input_stream, so every iteration must return a batch. That means we would have to poll input_stream again inside the then closure itself. But input_stream has already been consumed by then, so we cannot use it again inside the closure. This causes an error, and I don't see a good way around it other than wrapping it in another stream. Should we leave it as it is? Please let me know if I am missing something obvious here 🙈
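To make the mismatch concrete, here is a minimal synchronous sketch of the shape of the problem. It is an analogy only: plain iterators stand in for streams, a "batch" is just a `Vec<i32>`, and `ToyCoalescer`, `CoalescedIter`, and `coalesce` are hypothetical names invented for illustration, not DataFusion or arrow APIs. A one-to-one adapter (like `map` on an iterator or `then` on a stream) must produce one output per input, whereas a coalescer may need several inputs before it has anything to emit, which is why some kind of stateful wrapper that keeps pulling from the input seems hard to avoid.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a batch coalescer (hypothetical, for illustration only).
struct ToyCoalescer {
    target_rows: usize,
    buffered: Vec<i32>,
    completed: VecDeque<Vec<i32>>,
}

impl ToyCoalescer {
    fn new(target_rows: usize) -> Self {
        assert!(target_rows > 0, "target batch size must be positive");
        Self { target_rows, buffered: Vec::new(), completed: VecDeque::new() }
    }

    /// Buffer one input batch; zero or more output batches may become ready.
    fn push_batch(&mut self, batch: Vec<i32>) {
        self.buffered.extend(batch);
        while self.buffered.len() >= self.target_rows {
            let rest = self.buffered.split_off(self.target_rows);
            let full = std::mem::replace(&mut self.buffered, rest);
            self.completed.push_back(full);
        }
    }

    /// Flush any remaining rows once the input is exhausted.
    fn finish(&mut self) {
        if !self.buffered.is_empty() {
            self.completed.push_back(std::mem::take(&mut self.buffered));
        }
    }

    fn next_completed(&mut self) -> Option<Vec<i32>> {
        self.completed.pop_front()
    }
}

/// Stateful wrapper: keeps pulling from the input until a batch is ready.
struct CoalescedIter<I: Iterator<Item = Vec<i32>>> {
    input: I,
    coalescer: ToyCoalescer,
    input_done: bool,
}

impl<I: Iterator<Item = Vec<i32>>> Iterator for CoalescedIter<I> {
    type Item = Vec<i32>;

    fn next(&mut self) -> Option<Vec<i32>> {
        loop {
            if let Some(batch) = self.coalescer.next_completed() {
                return Some(batch);
            }
            if self.input_done {
                return None;
            }
            // A one-to-one adapter cannot do this step: pull more input
            // because the current input produced no output yet.
            match self.input.next() {
                Some(batch) => self.coalescer.push_batch(batch),
                None => {
                    self.input_done = true;
                    self.coalescer.finish();
                }
            }
        }
    }
}

fn coalesce<I: Iterator<Item = Vec<i32>>>(input: I, target_rows: usize) -> CoalescedIter<I> {
    CoalescedIter { input, coalescer: ToyCoalescer::new(target_rows), input_done: false }
}
```

In this sketch, calling `coalesce(batches.into_iter(), 4)` on inputs of 2, 1, 3, and 1 rows yields one batch of 4 rows followed by one of 3. Whether the async equivalent is better written as a separate stream or folded into the existing one is the design question raised in this thread; the sketch does not settle it.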

Contributor Author

Regarding tests, I don't see any existing tests where I have control over the stream, which is required to count how many times it gets called. And I am not sure how hard it would be to create a test with these conditions.

I tested this flow earlier by adding a DEBUG log to the stream flow of AsyncFuncExec and manipulating target_batch_size in this test, which helped me confirm everything was working fine.

Also, I checked the other two PRs and neither contained tests:

I suspect tests were not added to them for the same reason I mentioned above. Do correct me if I am missing a point, thanks! 🙇🏻

Contributor

#19002 contains a test, e.g.

async fn test_repartition_with_coalescing() -> Result<()> {

Have you by any chance tried leveraging TestMemoryExec to create an input plan/stream for this exec and check the output batch sizes? Or did it not work?

pub struct TestMemoryExec {
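As a rough illustration of what such a test could assert, the sketch below works on row counts only and deliberately avoids any DataFusion types. `expected_output_sizes` and `check_coalesced` are hypothetical helpers, and the sketch assumes the coalescer emits batches of exactly the target size, with only the final batch allowed to be smaller. A real test would collect the row counts of the batches produced by the exec and feed them to a check like this.

```rust
/// Hypothetical helper: row counts we would expect after coalescing,
/// assuming every output batch except the last has exactly `target` rows.
fn expected_output_sizes(input_sizes: &[usize], target: usize) -> Vec<usize> {
    assert!(target > 0, "target batch size must be positive");
    let total: usize = input_sizes.iter().sum();
    let mut sizes = vec![target; total / target];
    if total % target != 0 {
        sizes.push(total % target);
    }
    sizes
}

/// Hypothetical property check: no rows are lost, every batch except the
/// last is exactly `target` rows, and the last is non-empty and not larger.
fn check_coalesced(input_sizes: &[usize], output_sizes: &[usize], target: usize) -> bool {
    let rows_in: usize = input_sizes.iter().sum();
    let rows_out: usize = output_sizes.iter().sum();
    let (last, full) = match output_sizes.split_last() {
        Some(pair) => pair,
        // No output is only acceptable when there was no input.
        None => return rows_in == 0,
    };
    rows_in == rows_out
        && full.iter().all(|&n| n == target)
        && *last > 0
        && *last <= target
}
```

For example, five input batches of 10 rows with a target of 16 would be expected to come out as three batches of 16 rows and one of 2, while an output that still has five batches of 10 rows would fail the check.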

Labels

physical-plan (Changes to the physical-plan crate)

Development

Successfully merging this pull request may close these issues.

Integrate BatchCoalescer into AsyncFuncExec and remove from CoalesceBatches optimization rule

3 participants