-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix: Wrap join operators with make_cooperative() for cancellation support #19360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix: Wrap join operators with make_cooperative() for cancellation support #19360
Conversation
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @erratic-pattern -- this makes sense to me
cc @pepijnve (our cancellation expert) and @comphead (as our join expert)
I think we should adding tests for this code to avoid potential regressions.
Perhaps you could adapt the tests from https://github.com/erratic-pattern/datafusion-join-cancellation-repro/blob/main/src/main.rs into https://github.com/alamb/datafusion/blob/775277ae3cef7a97f0ca4868b4ccdfc03e9507ee/datafusion/core/tests/execution/coop.rs#L67
I did verify locally the tests in https://github.com/erratic-pattern/datafusion-join-cancellation-repro/blob/main/src/main.rs fail on main and pass with this PR:
Details
---- tests::test_nested_loop_join stdout ----
Testing NestedLoopJoinExec cancellation...
thread 'tests::test_nested_loop_join' (28125262) panicked at src/main.rs:72:17:
FAILED: NestedLoopJoinExec did not respond to cancellation within 5.014958s
The join operator is not yielding to the tokio runtime.
---- tests::test_sort_merge_join stdout ----
Testing SortMergeJoinExec cancellation...
thread 'tests::test_sort_merge_join' (28125263) panicked at src/main.rs:72:17:
FAILED: SortMergeJoinExec did not respond to cancellation within 5.019358375s
The join operator is not yielding to the tokio runtime.
---- tests::test_cross_join stdout ----
Testing CrossJoinExec cancellation...
thread 'tests::test_cross_join' (28125260) panicked at src/main.rs:72:17:
FAILED: CrossJoinExec did not respond to cancellation within 5.005322833s
The join operator is not yielding to the tokio runtime.
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
---- tests::test_hash_join stdout ----
Testing HashJoinExec cancellation...
thread 'tests::test_hash_join' (28125261) panicked at src/main.rs:72:17:
FAILED: HashJoinExec did not respond to cancellation within 5.009637375s
The join operator is not yielding to the tokio runtime.|
I had a quick look at the test case. It seems like it should be triggering yielding already. I’ll take a closer look as soon as I can. |
|
@alamb for me, two of the four test cases pass on |
|
For cross join specifically, as far as I can tell we're getting stuck looping in Edit: I was mistaken, the cross join is actually returning batches sufficiently frequently. It's actually the aggregation loop that's not yielding. I'm reminded of the original discussion we had on this topic in #16196. Whose responsibility is it to yield sufficiently frequently? Back then I argued that it's the looping code (i.e. the aggregation loop) that's responsible for this because that's where the root cause of the problem lies. We couldn't reach consensus on that and in the end only put budget consumption (and yielding) in the leaf nodes. |
|
I checked PR in Having started the query I can see CPU usage for |
|
I've experimented a bit with a variant of what's suggested in this PR in https://github.com/pepijnve/datafusion/tree/coop_joins. The idea there in the cross join implementation, is that you want to consume a unit of task budget in each loop iteration. For the fetch states the assumption is that the polled stream will take care of that. In the build batches state, there are no Tokio resources being accessed directly or indirectly, so we need to do so ourselves. I added a non-async variant of Tokio's |
|
|
||
| // create join stream | ||
| Ok(Box::pin(SortMergeJoinStream::try_new( | ||
| Ok(make_cooperative(Box::pin(SortMergeJoinStream::try_new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this kind of situation, you can use
Box::pin(cooperative(SortMergeJoinStream::try_new...
instead. This avoids one level of pointer indirection.
|
If you are seeing passing on the repro tests, you may need to adjust the reproducer depending on local environment due to timing issues. Some things I found that needed to be adjusted if the query is completing too quickly.
|
35f765d to
0a99e70
Compare
My memory was that the general consensus was that the operators themselves (ie. in this case the aggregation loop) should be yielding periodically to the scheduler, and that the cooperative() method was a workaround that gave us some more time to figure out how to do this (and tokio's budget wasn't yet public)
I really like this method from pepijnve@aeecad3 /// Consumes a unit of budget. If the task budget has been depleted `Poll::Pending` is returned.
/// Otherwise, `Poll::Ready(()))` is returned.
///
/// The task will only yield if its entire coop budget has been exhausted.
/// This function can be used in order to insert optional yield points into long
/// computations that do not use Tokio resources,
/// without redundantly yielding to the runtime each time.
pub fn consume_budget(cx: &mut Context<'_>) -> Poll<()> {
tokio::task::coop::poll_proceed(cx).map(|r| r.made_progress())
}@pepijnve would your proposal be to add the appropriate |
That's a tricky question because I don't think there's a clear, 'best' choice. A very quick recap first for the people who weren't involved in this work. Here's a quick and dirty adaptation of an example from the Tokio docs that resembles the type of code you typically see in DataFusion: In Tokio, a simple loop like that can be problematic. If every The solution to this is to ensure you periodically "yield to the runtime". The way to do that is to ensure So in this example, what's the appropriate fix to achieve that? There are really only two options:
If we map this onto one of the problematic queries from the reproduction case the loop here is introduced by the The downside of both options is that if we sprinkle task budget consumption around the codebase that task will yield more frequently which may introduce performance regressions. The question then is which approach leads to the least amount of redundant yielding. |
Which issue does this PR close?
Rationale for this change
Join operators don't yield to the tokio runtime, which prevents query cancellation from working and causes long-running joins to be uncancellable.
What changes are included in this PR?
Wrap join streams with
cooperative()in each operator'sexecute()method:HashJoinExecSortMergeJoinExecCrossJoinExecNestedLoopJoinExecSymmetricHashJoinExecAre these changes tested?
I made a standalone reproducer with tests, but they are too flaky to include in DataFusion test suite.
Unsure of how to test this reliably in CI without introducing test flakiness / timing issues.
Are there any user-facing changes?
No API changes.