
Conversation

@erratic-pattern
Contributor

@erratic-pattern commented Dec 16, 2025

Which issue does this PR close?

Rationale for this change

Join operators don't yield to the tokio runtime, which prevents query cancellation from taking effect and leaves long-running joins uncancellable.

What changes are included in this PR?

Wrap join streams with cooperative() in each operator's execute() method.
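A minimal sketch of the pattern, assuming the make_cooperative helper in datafusion::physical_plan::coop (the wrapper function below is illustrative, not the actual diff):

use datafusion::physical_plan::coop::make_cooperative;
use datafusion::physical_plan::SendableRecordBatchStream;

// Illustrative only: wrap the stream a join's execute() returns so that each
// poll consumes tokio task budget, and the stream yields back to the runtime
// once that budget is depleted.
fn wrap_join_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
    make_cooperative(stream)
}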

Are these changes tested?

I made a standalone reproducer with tests, but they are too flaky to include in the DataFusion test suite.

I'm unsure how to test this reliably in CI without introducing test flakiness / timing issues.

Are there any user-facing changes?

No API changes.

Contributor

@alamb left a comment


Thanks @erratic-pattern -- this makes sense to me

cc @pepijnve (our cancellation expert) and @comphead (as our join expert)

I think we should add tests for this code to avoid potential regressions.

Perhaps you could adapt the tests from https://github.com/erratic-pattern/datafusion-join-cancellation-repro/blob/main/src/main.rs into https://github.com/alamb/datafusion/blob/775277ae3cef7a97f0ca4868b4ccdfc03e9507ee/datafusion/core/tests/execution/coop.rs#L67

I did verify locally that the tests in https://github.com/erratic-pattern/datafusion-join-cancellation-repro/blob/main/src/main.rs fail on main and pass with this PR:

Details
---- tests::test_nested_loop_join stdout ----
Testing NestedLoopJoinExec cancellation...

thread 'tests::test_nested_loop_join' (28125262) panicked at src/main.rs:72:17:
FAILED: NestedLoopJoinExec did not respond to cancellation within 5.014958s
The join operator is not yielding to the tokio runtime.

---- tests::test_sort_merge_join stdout ----
Testing SortMergeJoinExec cancellation...

thread 'tests::test_sort_merge_join' (28125263) panicked at src/main.rs:72:17:
FAILED: SortMergeJoinExec did not respond to cancellation within 5.019358375s
The join operator is not yielding to the tokio runtime.

---- tests::test_cross_join stdout ----
Testing CrossJoinExec cancellation...

thread 'tests::test_cross_join' (28125260) panicked at src/main.rs:72:17:
FAILED: CrossJoinExec did not respond to cancellation within 5.005322833s
The join operator is not yielding to the tokio runtime.
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

---- tests::test_hash_join stdout ----
Testing HashJoinExec cancellation...

thread 'tests::test_hash_join' (28125261) panicked at src/main.rs:72:17:
FAILED: HashJoinExec did not respond to cancellation within 5.009637375s
The join operator is not yielding to the tokio runtime.

@pepijnve
Contributor

I had a quick look at the test case. It seems like it should be triggering yielding already. I’ll take a closer look as soon as I can.

@pepijnve
Contributor

@alamb for me, two of the four test cases pass on main. Hash and sort merge consistently work; cross and nested loop fail.

@pepijnve
Contributor

pepijnve commented Dec 16, 2025

For cross join specifically, as far as I can tell we're getting stuck looping in the BuildBatches state. I can't really explain why the proposed fix would make that loop exit, since the coop wrapper is added around the inner build batches loop: the task budget would only be decremented once and the stream would then still get stuck in the same loop. A more precise fix for this problem would be to explicitly decrement the tokio task budget on each loop iteration:

--- a/datafusion/physical-plan/src/joins/cross_join.rs	(revision 2bea7968977fd9e2f78c766ca553a45068c15048)
+++ b/datafusion/physical-plan/src/joins/cross_join.rs	(date 1765923164223)
@@ -581,6 +581,7 @@
                     handle_state!(ready!(self.fetch_probe_batch(cx)))
                 }
                 CrossJoinStreamState::BuildBatches(_) => {
+                    ready!(tokio::task::coop::poll_proceed(cx)).made_progress();
                     let poll = handle_state!(self.build_batches());
                     self.join_metrics.baseline.record_poll(poll)
                 }

Edit: I was mistaken, the cross join is actually returning batches sufficiently frequently. It's actually the aggregation loop that's not yielding. I'm reminded of the original discussion we had on this topic in #16196. Whose responsibility is it to yield sufficiently frequently? Back then I argued that it's the looping code (i.e. the aggregation loop) that's responsible for this because that's where the root cause of the problem lies. We couldn't reach consensus on that and in the end only put budget consumption (and yielding) in the leaf nodes.
What you're seeing here is that decision coming back to bite us. In this particular test setup the cross join stream isn't pulling data sufficiently fast from the budget consuming streams. As a consequence it can take a long time to deplete the task budget and yield.
As before, there are two ways to go about fixing this. Either we inject more budget consumption at the batch production side (as this PR is doing), or we add it at the looping batch consumption side (i.e. in the aggregation logic).

@comphead
Contributor

I checked the PR in datafusion-cli with TPC-DS q99, which takes some time to execute and has 3-4 joins.

After starting the query I can see CPU usage via top -pid $(pgrep datafusion-cli), but after Ctrl+C usage drops to 0, so the query was likely cancelled and released its resources. On main I see the same behavior, but the resources are released perhaps a couple of seconds later.

@pepijnve
Contributor

I've experimented a bit with a variant of what's suggested in this PR in https://github.com/pepijnve/datafusion/tree/coop_joins. The idea there, for the cross join implementation, is that you want to consume a unit of task budget in each loop iteration. For the fetch states the assumption is that the polled stream will take care of that. In the build batches state no Tokio resources are accessed, directly or indirectly, so we need to do so ourselves. I added a non-async variant of Tokio's async consume_budget() that's more convenient to use in manual stream implementations.


 // create join stream
-Ok(Box::pin(SortMergeJoinStream::try_new(
+Ok(make_cooperative(Box::pin(SortMergeJoinStream::try_new(
Contributor


In this kind of situation, you can use

Box::pin(cooperative(SortMergeJoinStream::try_new...

instead. This avoids one level of pointer indirection.

@erratic-pattern
Contributor Author

If you are seeing the repro tests pass, you may need to adjust the reproducer for your local environment due to timing issues.

Some things I found that need to be adjusted if the query completes too quickly (a rough sketch of the overall test pattern follows the list):

  • If the query finishes before the initial 250ms sleep, it won't hang on task cancellation. The initial sleep is needed to ensure the task has entered the JoinExec stream before handle.abort() is called; a smaller initial sleep fixes this.
  • If the query finishes before the 5 second timeout, the repro tests will pass despite not cancelling properly; the query completes normally within the 5 second window but ignores the cancellation. The repro may need a better way to surface this, e.g. by checking whether the query succeeded. To fix it, bump the number of rows OR reduce the timeout so that the query execution time exceeds the timeout.
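
For reference, a hedged sketch of the overall test pattern described above (timings, names, and the assertion are illustrative, not the actual reproducer code):

use std::future::Future;
use std::time::Duration;

// Spawn the query, let it enter the join stream, abort it, and then check
// that the task actually terminates. If the join never yields to the tokio
// runtime, the abort cannot take effect and the timeout below elapses.
async fn assert_cancellable(query: impl Future<Output = ()> + Send + 'static) {
    let handle = tokio::spawn(query);
    // Initial sleep so the task is inside the join stream before aborting.
    tokio::time::sleep(Duration::from_millis(250)).await;
    handle.abort();
    let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
    assert!(
        result.is_ok(),
        "join did not respond to cancellation within 5 seconds"
    );
}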

@erratic-pattern force-pushed the fix-join-cooperative-scheduling branch from 35f765d to 0a99e70 on December 17, 2025 16:18
@alamb
Contributor

alamb commented Dec 17, 2025

Edit: I was mistaken, the cross join is actually returning batches sufficiently frequently. It's actually the aggregation loop that's not yielding. I'm reminded of the original discussion we had on this topic in #16196. Whose responsibility is it to yield sufficiently frequently? Back then I argued that it's the looping code (i.e. the aggregation loop) that's responsible for this because that's where the root cause of the problem lies.

My memory is that the general consensus was that the operators themselves (i.e. in this case the aggregation loop) should yield periodically to the scheduler, and that the cooperative() method was a workaround that gave us some more time to figure out how to do this (and tokio's budget wasn't yet public).

I've experimented a bit with a variant of what's suggested in this PR in https://github.com/pepijnve/datafusion/tree/coop_joins.

I really like this method from pepijnve@aeecad3

/// Consumes a unit of budget. If the task budget has been depleted `Poll::Pending` is returned.
/// Otherwise, `Poll::Ready(())` is returned.
///
/// The task will only yield if its entire coop budget has been exhausted.
/// This function can be used in order to insert optional yield points into long
/// computations that do not use Tokio resources,
/// without redundantly yielding to the runtime each time.
pub fn consume_budget(cx: &mut Context<'_>) -> Poll<()> {
    tokio::task::coop::poll_proceed(cx).map(|r| r.made_progress())
}
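
For illustration, a minimal hypothetical stream using this helper might look like the following (all names are invented for the example, and it assumes the consume_budget function above is in scope):

use std::pin::Pin;
use std::task::{ready, Context, Poll};
use futures::Stream;

// Hypothetical stream that produces values in a tight loop but consumes one
// unit of tokio task budget per item, so a caller draining it still yields
// back to the runtime once the budget is depleted.
struct BudgetedCounter {
    remaining: u64,
}

impl Stream for BudgetedCounter {
    type Item = u64;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        // Returns Pending once the task budget is exhausted, forcing a yield.
        ready!(consume_budget(cx));
        self.remaining -= 1;
        Poll::Ready(Some(self.remaining))
    }
}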

@pepijnve would your proposal be to add the appropriate consume_budget calls in group by hash instead?

@pepijnve
Contributor

@pepijnve would your proposal be to add the appropriate consume_budget calls in group by hash instead?

That's a tricky question because I don't think there's a clear, 'best' choice.

A very quick recap first for the people who weren't involved in this work. Here's a quick and dirty adaptation of an example from the Tokio docs that resembles the type of code you typically see in DataFusion:

fn drop_all(mut input: Pin<&mut (dyn RecordBatchStream + Send)>, cx: &mut Context<'_>) -> Poll<()> {
    while let Some(_) = ready!(input.poll_next_unpin(cx)) {}
    Poll::Ready(())
}

In Tokio, a simple loop like that can be problematic. If every input.poll_next_unpin() invocation returns Poll::Ready, then the while loop will keep on going until the stream is completely consumed. The effect you see then is that the task that's calling drop_all is not cancelable.

The solution to this is to ensure you periodically "yield to the runtime". The way to do that is to ensure drop_all breaks out of the loop every now and then by returning Poll::Pending.

So in this example, what's the appropriate fix to achieve that? There are really only two options:

  1. ensure every possible input value returns Poll::Pending periodically
  2. adapt the drop_all function so that it returns Poll::Pending periodically itself (sketched below)
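
A hedged sketch of option 2, under the same assumptions as the drop_all snippet above and reusing the poll_proceed pattern from the cross join diff earlier in this thread (this is illustrative, not a proposed patch):

use std::pin::Pin;
use std::task::{ready, Context, Poll};
use futures::StreamExt;
use datafusion::physical_plan::RecordBatchStream;

// Variant of drop_all that consumes a unit of task budget per iteration, so
// it returns Poll::Pending (and the task yields) even when the input stream
// is always ready.
fn drop_all_coop(
    mut input: Pin<&mut (dyn RecordBatchStream + Send)>,
    cx: &mut Context<'_>,
) -> Poll<()> {
    loop {
        ready!(tokio::task::coop::poll_proceed(cx)).made_progress();
        if ready!(input.poll_next_unpin(cx)).is_none() {
            return Poll::Ready(());
        }
    }
}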

If we map this onto one of the problematic queries from the reproduction case

SELECT sum(t1.v + t2.v) FROM t1, t2

the loop here is introduced by the sum aggregation. If you remove it from the query, then the query cancels just fine. The fix we're discussing so far in this PR is option 1 described above. The alternative of changing the aggregation operator (and possibly other stream draining operators) would be option 2.

The downside of both options is that if we sprinkle task budget consumption around the codebase, the task will yield more frequently, which may introduce performance regressions. The question then is which approach leads to the least amount of redundant yielding.


Labels

physical-plan Changes to the physical-plan crate


Development

Successfully merging this pull request may close these issues.

Join operators don't yield to tokio runtime, preventing query cancellation
