Skip to content

[SPARK-57547][SQL] Fix incorrect InMemoryRelation materialization under conrrent queries#56620

Open
liuzqt wants to merge 3 commits into
apache:masterfrom
liuzqt:SPARK-57547
Open

[SPARK-57547][SQL] Fix incorrect InMemoryRelation materialization under conrrent queries#56620
liuzqt wants to merge 3 commits into
apache:masterfrom
liuzqt:SPARK-57547

Conversation

@liuzqt

@liuzqt liuzqt commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This change adds PartitionKeyedAccumulator, a ConcurrentHashMap-backed accumulator keyed by partition id with last-write-wins merge semantics and replaces the counter-based accumulators in CachedRDDBuilder(used in InMemoryRelation) with the new accumulator. The cached relation now:

  • counts the DISTINCT materialized partition ids (the accumulator key set) when deciding whether the cache is fully loaded, so duplicate computes cannot inflate the count; and
  • derives exact, de-duplicated row-count and size stats by folding the per-partition values, counting each partition once.

The behavior is gated by a new internal conf
spark.sql.inMemoryColumnarStorage.distinctPartitionTracking (default true); setting it to false restores the prior raw task-completion-count behavior. clearCache resets the bookkeeping so a rebuilt cache starts clean.

Why are the changes needed?

Fix the bug(introduced here #39624, seems like a day-1 bug) where InMemoryRelation will be marked materialized prematurely under conrrent queries:

  • AQE creates a separate TableCacheQueryStageExec for every reference to the same df.cache (never reused), and each one submits its own build job over the shared cache RDD.
  • When concurrent queries reference the same cached relation, first-touches the cold cache from several jobs at once. Spark has no global, cross-executor "compute this partition once" barrier (only a per-executor write lock), so the same partition can be computed by multiple executors. CachedRDDBuilder.isCachedRDDLoaded decided the cache was materialized by comparing the partition count against a raw task-completion count. Duplicate completions of an empty-output partition could push that count to the partition total while a row-producing partition was still being built, so the cache latched as "loaded" with rowCount == 0.
  • One situation that can result in incorrect results: AQEPropagateEmptyRelation then ("correctly", given the stats it was told) collapsed the cache branch to an EmptyRelation and silently dropping rows.
  • Additional latent bugs:
    • size/rows accumulators could be over-counted
    • no accumulators reset upon clearCache

Does this PR introduce any user-facing change?

NO

How was this patch tested?

  • PartitionKeyedAccumulatorSuite - accumulator semantics (last-write-wins add/merge, distinct key count, snapshot/reset).
  • ConcurrentInMemoryRelationSuite - local-cluster reproduction: rows are preserved under concurrent first-touch with the fix on; stats are exact under duplicate cross-executor computes; and a negative control showing the row loss with the fix disabled.
  • Extended CachedTableSuite (clearCache resets bookkeeping) and InMemoryColumnarQuerySuite (size/row-count read through the new accessors).

Was this patch authored or co-authored using generative AI tooling?

Yes

…cached relation row loss

`CachedRDDBuilder` tracked materialization with a plain task-completion
`LongAccumulator` and summed per-batch row-count and size stats. When a cold
cache is first touched concurrently, the same partition can be computed more
than once across executors (duplicate cross-executor computes). Those
duplicates inflate the task-completion count and double-count the summed stats.
The inflated count can make `isCachedRDDLoaded` report the relation as fully
materialized before the distinct partitions have actually been recorded, and
`computeStats` can then expose a `rowCount` that does not reflect the cached
data. AQE's `PropagateEmptyRelation` can read that incorrect `rowCount` and
collapse the cached source to an empty relation, silently dropping rows.

This change adds `PartitionKeyedAccumulator`, a `ConcurrentHashMap`-backed
accumulator keyed by partition id with last-write-wins merge semantics. The
cached relation now:
  - counts the DISTINCT materialized partition ids (the accumulator key set)
    when deciding whether the cache is fully loaded, so duplicate computes
    cannot inflate the count; and
  - derives exact, de-duplicated row-count and size stats by folding the
    per-partition values, counting each partition once.

The behavior is gated by a new internal conf
`spark.sql.inMemoryColumnarStorage.distinctPartitionTracking` (default true);
setting it to false restores the prior raw task-completion-count behavior.
`clearCache` resets the bookkeeping so a rebuilt cache starts clean.

### Tests
- `PartitionKeyedAccumulatorSuite` - accumulator semantics (last-write-wins
  add/merge, distinct key count, snapshot/reset).
- `ConcurrentInMemoryRelationSuite` - local-cluster reproduction: rows are
  preserved under concurrent first-touch with the fix on; stats are exact
  under duplicate cross-executor computes; and a negative control showing the
  row loss with the fix disabled.
- Extended `CachedTableSuite` (clearCache resets bookkeeping) and
  `InMemoryColumnarQuerySuite` (size/row-count read through the new accessors).

Co-authored-by: Isaac
@liuzqt

liuzqt commented Jun 19, 2026

Copy link
Copy Markdown
Contributor Author

@cloud-fan @maryannxue

// barrier (only a per-executor write lock). Keying by partition id (last-write-wins) means
// those duplicate completions cannot mark the cache loaded before every partition has been
// computed -- which otherwise let AQE read rowCount 0 on a non-empty cache and propagate an
// empty relation, silently dropping rows -- and also yields exact, de-duplicated row count /

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "exact, de-duplicated" holds for deterministic stages. For indeterminate stages (e.g., filter(rand() > 0.5) cached with speculation), last-write-wins means the reported per-partition row count reflects whichever task's completion listener fires last -- which may differ from the task whose blocks the BlockManager actually stored (the first to acquire the partition write lock). The loaded-check is still sound (distinct partition IDs covers it) and this won't cause false empty-propagation since the count remains non-zero. Consider: "yields de-duplicated row count / size (exact for deterministic stages)."

// - Left (conf off): the raw per-batch accumulators (`LegacyAccumulators`) -- the buggy pre-fix
// behavior, kept only as a safety switch.
private val statsAccumulators
: Either[LegacyAccumulators, PartitionKeyedAccumulator[(Long, Long)]] =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the Either / legacy path needed? The left branch re-enables the exact bug this PR fixes, so flipping the conf in production would silently re-introduce data loss. If the intent is a safety switch for rollback, a conf that disables the loaded-latch entirely (always report static plan stats until the RDD action returns) would be safer than re-enabling the broken counting. Otherwise, always using the keyed accumulator and dropping the branching would simplify every read site (materializedRowCount, materializedSizeInBytes, recordMaterialized, isCachedRDDLoaded) significantly.

@shrirangmhalgi

Copy link
Copy Markdown
Contributor

0 blocking, 1 non-blocking, 1 nit

This is a well-motivated fix for a real AQE correctness issue; the partition-keyed accumulator design is sound. Two items below.

Verification

Traced the race window end-to-end: TableCacheQueryStageExec.doMaterialize() submits a submitJob per reference → duplicate partition computes land on distinct executors → the old materializedPartitions.add(1L) inflates past numPartitions while the row-producing partition is still building → isCachedRDDLoaded latches true → computeStats() enters the materialized branch with rowCountStats.value == 0AQEPropagateEmptyRelation.getEstimatedRowCount reads rowCount == Some(0) → empty relation.

With the fix: ConcurrentHashMap.put is last-write-wins per partition ID, size() is linearizable, so accumulatedNumPartitions cannot exceed the number of distinct completed partitions. The synchronized on isCachedRDDLoaded and clearCache share the same monitor — the one-way latch cannot be poisoned by a concurrent reset. DAGScheduler event loop serializes merge calls.

Design / architecture (1)

InMemoryRelation.scalaEither[LegacyAccumulators, PartitionKeyedAccumulator]: the left branch re-enables the exact bug this PR fixes. If the intent is a safety switch, a conf that holds off the loaded-latch entirely (always report static plan stats until the RDD action returns) would be safer than re-enabling broken counting. Otherwise, always using the keyed accumulator and dropping the dual-path branching simplifies every read site significantly. - see inline

Suggestions (1)

InMemoryRelation.scala – "exact, de-duplicated row count" comment: overstates precision for indeterminate stages - see inline

liuzqt added 2 commits June 20, 2026 01:55
…he conf and legacy path

Addresses review feedback on the original change: the legacy raw
task-completion-count behavior is simply incorrect (it is what loses rows), and
OSS has no backport-auditing or emergency-rollback process that would need a
kill switch for it. So this removes the
`spark.sql.inMemoryColumnarStorage.distinctPartitionTracking` conf and the
legacy code path entirely, and tracks the distinct set of materialized
partitions unconditionally.

`InMemoryRelation`'s `Either[LegacyAccumulators, PartitionKeyedAccumulator]`
(and the conf branch that chose between them) collapses to a single
`PartitionKeyedAccumulator`; `clearCache`, `isCachedRDDLoaded`, the
`materialized*` accessors, and `buildBuffers` lose their dual-path handling. The
now-unused `LongAccumulator` and `scala.util.{Left, Right}` imports are dropped.

Also fixes the Java unidoc `reference not found` errors by replacing the
`[[AccumulatorV2]]` / `[[ConcurrentHashMap]]` scaladoc links in
`PartitionKeyedAccumulator` (whose generated `{@link}`s javadoc could not
resolve) with backtick code spans.

### Tests
- `ConcurrentInMemoryRelationSuite`: removed the conf toggle and the
  disabled-mode negative-control test (the disabled path no longer exists); the
  deterministic `runDataLossRepro` stays and still detects a regression (its
  `poisoned` branch would make `observed != expected`).
- `PartitionKeyedAccumulatorSuite`, `CachedTableSuite`, and
  `InMemoryColumnarQuerySuite` are unchanged and still pass.

Co-authored-by: Isaac
… line length

The prior follow-up's reworded scaladoc in ConcurrentInMemoryRelationSuite left four comment lines over 100 characters (the Scala linter checks test sources too, which a main-only scalastyle run missed). Re-wrap them; no behavior change.

Co-authored-by: Isaac
@liuzqt

liuzqt commented Jun 20, 2026

Copy link
Copy Markdown
Contributor Author

@shrirangmhalgi thank you for the review! Make sense I think we don't need do preserve the legacy buggy code path regardless. I've removed the conf and Either, the code change should be much cleaner now.

@shrirangmhalgi shrirangmhalgi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the feedback LGTM.

@cloud-fan cloud-fan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 blocking, 1 non-blocking, 1 nit. A well-motivated, correct fix for a real AQE data-loss bug; the partition-keyed accumulator design and concurrency reasoning hold up.

Design / architecture (1)

  • (non-blocking) PR description is stale: it still describes a spark.sql.inMemoryColumnarStorage.distinctPartitionTracking conf and a "negative control showing the row loss with the fix disabled", but per your 2026-06-20 comment the conf and Either dual-path were removed — neither exists in the diff (no distinctPartitionTracking anywhere in source; ConcurrentInMemoryRelationSuite has no fix-disabled path). Worth updating the description to match the shipped single-path code.

Nits: 1 minor item (see inline comments).

Verification

Traced the executor->driver accumulator path: each task adds exactly one (partitionId, (rows, bytes)) entry; Task.collectAccumulatorUpdates ships the accumulator back and DAGScheduler.updateAccumulators merges it via putAll (last-write-wins) on the single-threaded event loop. Under the concurrent first-touch race, N duplicate computes of one partition collapse to a single keyed entry, so accumulatedNumPartitions counts distinct partitions (the loaded latch cannot trip before every partition is in) and foldValues sums each partition once (stats not over-counted). Failed/interrupted tasks don't record (listener guard + countFailedValues=false). The shared synchronized monitor on the builder keeps the one-way latch safe against a concurrent clearCache reset.

* `countFailedValues`), so only complete per-partition values are ever merged.
*
* Backed by a `ConcurrentHashMap`, whose per-entry atomicity is sufficient here: `add` and the
* `putAll` in `merge` are last-write-wins per key, and the reads (`value`, `numPartitions`,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc references a method that doesn't exist — the read accessor is accumulatedNumPartitions (defined at :81, used correctly in the inline comment at :75); there is no numPartitions.

Suggested change
* `putAll` in `merge` are last-write-wins per key, and the reads (`value`, `numPartitions`,
* `putAll` in `merge` are last-write-wins per key, and the reads (`value`, `accumulatedNumPartitions`,

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants