[SPARK-57547][SQL] Fix incorrect InMemoryRelation materialization under conrrent queries#56620
[SPARK-57547][SQL] Fix incorrect InMemoryRelation materialization under conrrent queries#56620liuzqt wants to merge 3 commits into
Conversation
…cached relation row loss
`CachedRDDBuilder` tracked materialization with a plain task-completion
`LongAccumulator` and summed per-batch row-count and size stats. When a cold
cache is first touched concurrently, the same partition can be computed more
than once across executors (duplicate cross-executor computes). Those
duplicates inflate the task-completion count and double-count the summed stats.
The inflated count can make `isCachedRDDLoaded` report the relation as fully
materialized before the distinct partitions have actually been recorded, and
`computeStats` can then expose a `rowCount` that does not reflect the cached
data. AQE's `PropagateEmptyRelation` can read that incorrect `rowCount` and
collapse the cached source to an empty relation, silently dropping rows.
This change adds `PartitionKeyedAccumulator`, a `ConcurrentHashMap`-backed
accumulator keyed by partition id with last-write-wins merge semantics. The
cached relation now:
- counts the DISTINCT materialized partition ids (the accumulator key set)
when deciding whether the cache is fully loaded, so duplicate computes
cannot inflate the count; and
- derives exact, de-duplicated row-count and size stats by folding the
per-partition values, counting each partition once.
The behavior is gated by a new internal conf
`spark.sql.inMemoryColumnarStorage.distinctPartitionTracking` (default true);
setting it to false restores the prior raw task-completion-count behavior.
`clearCache` resets the bookkeeping so a rebuilt cache starts clean.
### Tests
- `PartitionKeyedAccumulatorSuite` - accumulator semantics (last-write-wins
add/merge, distinct key count, snapshot/reset).
- `ConcurrentInMemoryRelationSuite` - local-cluster reproduction: rows are
preserved under concurrent first-touch with the fix on; stats are exact
under duplicate cross-executor computes; and a negative control showing the
row loss with the fix disabled.
- Extended `CachedTableSuite` (clearCache resets bookkeeping) and
`InMemoryColumnarQuerySuite` (size/row-count read through the new accessors).
Co-authored-by: Isaac
| // barrier (only a per-executor write lock). Keying by partition id (last-write-wins) means | ||
| // those duplicate completions cannot mark the cache loaded before every partition has been | ||
| // computed -- which otherwise let AQE read rowCount 0 on a non-empty cache and propagate an | ||
| // empty relation, silently dropping rows -- and also yields exact, de-duplicated row count / |
There was a problem hiding this comment.
nit: "exact, de-duplicated" holds for deterministic stages. For indeterminate stages (e.g., filter(rand() > 0.5) cached with speculation), last-write-wins means the reported per-partition row count reflects whichever task's completion listener fires last -- which may differ from the task whose blocks the BlockManager actually stored (the first to acquire the partition write lock). The loaded-check is still sound (distinct partition IDs covers it) and this won't cause false empty-propagation since the count remains non-zero. Consider: "yields de-duplicated row count / size (exact for deterministic stages)."
| // - Left (conf off): the raw per-batch accumulators (`LegacyAccumulators`) -- the buggy pre-fix | ||
| // behavior, kept only as a safety switch. | ||
| private val statsAccumulators | ||
| : Either[LegacyAccumulators, PartitionKeyedAccumulator[(Long, Long)]] = |
There was a problem hiding this comment.
Is the Either / legacy path needed? The left branch re-enables the exact bug this PR fixes, so flipping the conf in production would silently re-introduce data loss. If the intent is a safety switch for rollback, a conf that disables the loaded-latch entirely (always report static plan stats until the RDD action returns) would be safer than re-enabling the broken counting. Otherwise, always using the keyed accumulator and dropping the branching would simplify every read site (materializedRowCount, materializedSizeInBytes, recordMaterialized, isCachedRDDLoaded) significantly.
|
0 blocking, 1 non-blocking, 1 nit This is a well-motivated fix for a real AQE correctness issue; the partition-keyed accumulator design is sound. Two items below. VerificationTraced the race window end-to-end: With the fix: Design / architecture (1)
Suggestions (1)
|
…he conf and legacy path
Addresses review feedback on the original change: the legacy raw
task-completion-count behavior is simply incorrect (it is what loses rows), and
OSS has no backport-auditing or emergency-rollback process that would need a
kill switch for it. So this removes the
`spark.sql.inMemoryColumnarStorage.distinctPartitionTracking` conf and the
legacy code path entirely, and tracks the distinct set of materialized
partitions unconditionally.
`InMemoryRelation`'s `Either[LegacyAccumulators, PartitionKeyedAccumulator]`
(and the conf branch that chose between them) collapses to a single
`PartitionKeyedAccumulator`; `clearCache`, `isCachedRDDLoaded`, the
`materialized*` accessors, and `buildBuffers` lose their dual-path handling. The
now-unused `LongAccumulator` and `scala.util.{Left, Right}` imports are dropped.
Also fixes the Java unidoc `reference not found` errors by replacing the
`[[AccumulatorV2]]` / `[[ConcurrentHashMap]]` scaladoc links in
`PartitionKeyedAccumulator` (whose generated `{@link}`s javadoc could not
resolve) with backtick code spans.
### Tests
- `ConcurrentInMemoryRelationSuite`: removed the conf toggle and the
disabled-mode negative-control test (the disabled path no longer exists); the
deterministic `runDataLossRepro` stays and still detects a regression (its
`poisoned` branch would make `observed != expected`).
- `PartitionKeyedAccumulatorSuite`, `CachedTableSuite`, and
`InMemoryColumnarQuerySuite` are unchanged and still pass.
Co-authored-by: Isaac
… line length The prior follow-up's reworded scaladoc in ConcurrentInMemoryRelationSuite left four comment lines over 100 characters (the Scala linter checks test sources too, which a main-only scalastyle run missed). Re-wrap them; no behavior change. Co-authored-by: Isaac
|
@shrirangmhalgi thank you for the review! Make sense I think we don't need do preserve the legacy buggy code path regardless. I've removed the conf and |
shrirangmhalgi
left a comment
There was a problem hiding this comment.
Thanks for addressing the feedback LGTM.
cloud-fan
left a comment
There was a problem hiding this comment.
0 blocking, 1 non-blocking, 1 nit. A well-motivated, correct fix for a real AQE data-loss bug; the partition-keyed accumulator design and concurrency reasoning hold up.
Design / architecture (1)
- (non-blocking) PR description is stale: it still describes a
spark.sql.inMemoryColumnarStorage.distinctPartitionTrackingconf and a "negative control showing the row loss with the fix disabled", but per your 2026-06-20 comment the conf andEitherdual-path were removed — neither exists in the diff (nodistinctPartitionTrackinganywhere in source;ConcurrentInMemoryRelationSuitehas no fix-disabled path). Worth updating the description to match the shipped single-path code.
Nits: 1 minor item (see inline comments).
Verification
Traced the executor->driver accumulator path: each task adds exactly one (partitionId, (rows, bytes)) entry; Task.collectAccumulatorUpdates ships the accumulator back and DAGScheduler.updateAccumulators merges it via putAll (last-write-wins) on the single-threaded event loop. Under the concurrent first-touch race, N duplicate computes of one partition collapse to a single keyed entry, so accumulatedNumPartitions counts distinct partitions (the loaded latch cannot trip before every partition is in) and foldValues sums each partition once (stats not over-counted). Failed/interrupted tasks don't record (listener guard + countFailedValues=false). The shared synchronized monitor on the builder keeps the one-way latch safe against a concurrent clearCache reset.
| * `countFailedValues`), so only complete per-partition values are ever merged. | ||
| * | ||
| * Backed by a `ConcurrentHashMap`, whose per-entry atomicity is sufficient here: `add` and the | ||
| * `putAll` in `merge` are last-write-wins per key, and the reads (`value`, `numPartitions`, |
There was a problem hiding this comment.
Doc references a method that doesn't exist — the read accessor is accumulatedNumPartitions (defined at :81, used correctly in the inline comment at :75); there is no numPartitions.
| * `putAll` in `merge` are last-write-wins per key, and the reads (`value`, `numPartitions`, | |
| * `putAll` in `merge` are last-write-wins per key, and the reads (`value`, `accumulatedNumPartitions`, |
What changes were proposed in this pull request?
This change adds
PartitionKeyedAccumulator, aConcurrentHashMap-backed accumulator keyed by partition id with last-write-wins merge semantics and replaces the counter-based accumulators inCachedRDDBuilder(used inInMemoryRelation) with the new accumulator. The cached relation now:The behavior is gated by a new internal conf
spark.sql.inMemoryColumnarStorage.distinctPartitionTracking(default true); setting it to false restores the prior raw task-completion-count behavior.clearCacheresets the bookkeeping so a rebuilt cache starts clean.Why are the changes needed?
Fix the bug(introduced here #39624, seems like a day-1 bug) where InMemoryRelation will be marked materialized prematurely under conrrent queries:
TableCacheQueryStageExecfor every reference to the samedf.cache(never reused), and each one submits its own build job over the shared cache RDD.CachedRDDBuilder.isCachedRDDLoadeddecided the cache was materialized by comparing the partition count against a raw task-completion count. Duplicate completions of an empty-output partition could push that count to the partition total while a row-producing partition was still being built, so the cache latched as "loaded" withrowCount == 0.AQEPropagateEmptyRelationthen ("correctly", given the stats it was told) collapsed the cache branch to anEmptyRelationand silently dropping rows.clearCacheDoes this PR introduce any user-facing change?
NO
How was this patch tested?
PartitionKeyedAccumulatorSuite- accumulator semantics (last-write-wins add/merge, distinct key count, snapshot/reset).ConcurrentInMemoryRelationSuite- local-cluster reproduction: rows are preserved under concurrent first-touch with the fix on; stats are exact under duplicate cross-executor computes; and a negative control showing the row loss with the fix disabled.CachedTableSuite(clearCache resets bookkeeping) andInMemoryColumnarQuerySuite(size/row-count read through the new accessors).Was this patch authored or co-authored using generative AI tooling?
Yes