[VELOX] Support push filter for table cache #12073

Closed
jackylee-ch wants to merge 2 commits into apache:main from jackylee-ch:table_cache_support_push_filters

Conversation

@jackylee-ch
Contributor

What changes are proposed in this pull request?

This PR adds batch-level statistics collection and filter pushdown for Velox columnar cached batches,
enabling Spark to skip cached batches that cannot satisfy query predicates (similar to Parquet
row-group pruning but for in-memory cache).

Architecture:

  • C++ (BatchStatsCollector): During columnar batch serialization, computes per-column min/max bounds,
    null counts, row counts, and byte sizes. Stats are appended to the serialized payload in a compact
    binary wire format.
  • Scala (ColumnarCachedBatchSerializer): Decodes the wire-format stats and integrates with Spark's
    SimpleMetricsCachedBatch filter evaluation to skip batches whose bounds prove the predicate cannot
    match.

Supported bound types: Boolean, Byte, Short, Int, Long, Float, Double, Date, Timestamp, String, and
Decimal(precision<=18).

Key design decisions:

  • Wire format v1: per-column tag(1B) + hasBounds(1B) + bounds(variable) + nullCount(4B) +
    rowCount(4B) + sizeInBytes(8B)
  • Tautological bounds (type extremes) for unknown/absent bounds to avoid the 3VL null-skip
    correctness bug (where null bounds cause Spark's predicate to evaluate to null → coerced to false →
    batch incorrectly skipped)
  • Float/Double with NaN degrade to pass-through (no finite tautological pair exists due to NaN
    ordering)
  • String bounds capped at 64 KiB to prevent metadata bloat
  • Tag/dataType compatibility validation to reject corrupt payloads gracefully
  • Backward compatible: unknown tags fall through to pass-through filtering
  • Controlled by config spark.gluten.sql.columnar.tableCacheFilterEnabled (default: true)

How was this patch tested?

  1. Unit tests (ColumnarCachedBatchSerializerSuite): 38 tests covering wire format round-trip for all
    supported types, NaN poisoning, inverted bounds rejection, tag/schema mismatch detection,
    tautological bounds fallback, truncated payload handling, negative counter rejection, and Decimal
    bounds (with/without bounds, precision>18 rejection).
  2. E2E tests (VeloxColumnarCacheSuite): Integration tests that cache real data, run filter queries,
    and validate correctness via checkAnswer for Int, String, Timestamp, and Decimal predicates,
    including BETWEEN, =, and IS NULL.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.7)

Add batch-level statistics collection and filter pushdown for columnar
cached batches. The C++ layer (BatchStatsCollector) computes per-column
min/max bounds, null counts, row counts, and size during serialization.
The Scala decoder (ColumnarCachedBatchSerializer) reads the wire-format
stats and integrates with Spark's SimpleMetricsCachedBatch filter
evaluation to skip batches that cannot satisfy query predicates.

Supported bound types: Boolean, Byte, Short, Int, Long, Float, Double,
Date, Timestamp, String, and Decimal(precision<=18). Float/Double with
NaN degrade to pass-through. String bounds use length-prefixed UTF-8.
Decimal uses the unscaled int64 representation for short decimals.

Key design decisions:
- Wire format v1 with per-column tag + hasBounds + bounds + counters
- Tautological bounds (type extremes) for unknown/absent bounds to
  avoid the 3VL null-skip correctness bug
- Tag/dataType compatibility validation to reject corrupt payloads
- Backward compatible: unknown tags fall through to pass-through

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@jackylee-ch jackylee-ch marked this pull request as draft May 11, 2026 16:42
@github-actions github-actions Bot added CORE works for Gluten Core VELOX labels May 11, 2026
@github-actions

Run Gluten Clickhouse CI on x86


Integrate apache#12092's compact stats implementation alongside the existing
BatchStatsCollector path. Adds framedSerializeWithStats which produces a
self-describing framed blob [magic|statsLen|statsBlob|bytesLen|bytesBlob]
with support for HUGEINT (Decimal p>18), YearMonthIntervalType, and
DayTimeIntervalType.

C++ changes:
- Add computeStats() and framedSerializeWithStats() to
  VeloxColumnarBatchSerializer covering BIGINT, INTEGER, SMALLINT,
  TINYINT, REAL, DOUBLE, BOOLEAN, HUGEINT, TIMESTAMP, VARCHAR
- Add framedSerializeWithStats virtual method to ColumnarBatchSerializer

JNI changes:
- Add framedSerializeWithStats native method returning byte[]

Scala changes:
- Wire convertColumnarBatchToCachedBatch to use framedSerializeWithStats
- Add parseFramedBytes to decode the framed wire format
- Add decodeFramedStats with full type coverage including int128 Decimal
- Add tautological bounds for YearMonthIntervalType and DayTimeIntervalType

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@jackylee-ch jackylee-ch force-pushed the table_cache_support_push_filters branch from b2f0c7c to b66114c Compare May 15, 2026 17:10

throw new UnsupportedOperationException(
s"CachedColumnarBatch stats must be GenericInternalRow, got ${other.getClass}")
}
// H8: Encode stats body into a sized buffer so we can write a length prefix.
Contributor

Comments cover non-obvious rationale well, but density slows the diff:

  • CachedColumnarBatchKryoSerializer.write: ~30 lines code, ~50 lines comments
  • tautologicalBoundsFor FloatType case: 17 lines of NaN rationale
  • The NaN-3VL-skip explanation repeats 5+ times in ColumnarCachedBatchSerializer.scala
  • Internal review-cycle tags (R2-H4, R2-H11, R3-H1, H8, etc.) leak into production
    code

val (lo, hi) = readBounds(buf, typeTag, dataType)
if (lo == null || hi == null) {
val opt = tautologicalBoundsFor(dataType)
if (opt.isEmpty) return null else opt.get
Contributor

When any column's tautologicalBoundsFor returns None (NaN-degraded Float/Double, collated
StringType, unknown DataType), decodeStats demotes the entire batch's stats to null. This
nullifies valid min/max bounds on the OTHER columns in the same batch — e.g. a 12-column ML
feature table where only one column carries NaN: the remaining 11 columns lose pruning
entirely on that batch.

The unknown-DataType path (YearMonthIntervalType etc.) can switch to per-column demotion:
fill the offending column's lower/upper slots with type-extreme sentinels (long min/max)
and set nullCount=1 plus the real rowCount so IsNull/IsNotNull predicates remain
useful while bound-comparison predicates degrade to no-ops on that column only. The other
columns keep their valid bounds. Prerequisite: extend tautologicalBoundsFor to
recognise these types — the current case _ => None (:1250-1255) is a deliberate
conservative choice ("safer than fabricating bounds we can't prove correct"); per-column
demotion flips this trade-off from "conservative across the board" to "non-conservative
once a type is recognised".

Caveat: Float/Double NaN truly has no finite tautological pair (Spark orders NaN above +Inf),
and collated StringType has no byte-wise max either (the code comment at :1104-1107 admits
this — "0xFF*256 is not the max", and Spark 4.0+ collation-aware ordering breaks any byte-
wise sentinel). Both must keep the all-or-nothing demotion or the three-valued-logic skip
bug returns (a NULL bound coerces the predicate to NULL, which is then coerced to false,
wrongly skipping the batch). Only the unknown-DataType case can be safely narrowed.

case StatsTypeTag.TIMESTAMP =>
// Match the compat pattern used in Validators.containsNTZ so this
// compiles across Spark 3.3 (no TimestampNTZType) through 3.5+.
dataType == TimestampType || dataType.catalogString == "timestamp_ntz"
Contributor

isTagCompatibleWithDataType accepts both TimestampType and TimestampNTZType for the
TIMESTAMP wire tag. Both are int64 underneath, but their semantics are NOT interchangeable:

  • TimestampType: epoch micros (UTC)
  • TimestampNTZType: local micros (no tz)

If a cache block is written under a TimestampType schema (C++ converts Velox Timestamp
to UTC micros) but later read against a TimestampNTZType schema, the predicate literal is
parsed differently on each side — both end up as long, but the values now disagree by
exactly the local-tz offset. Filter pushdown silently mis-prunes against tz-shifted bounds.

Pre-merge: Tighten to TimestampType-only; let TimestampNTZType fall into case _ => false
and pass through.

Follow-up issue: Split the C++ typeTagFor into kTimestamp / kTimestampNtz so
cross-version schema drift becomes an explicit incompat rather than silent wrong answers.

// prefix. Prior release mismatched the reader at 1 MiB vs. the writer at
// 64 KiB; tightening the reader side aligns the contract and closes the
// over-allocation window.
private val MAX_STATS_STRING_LEN: Int = 64 * 1024
Contributor

The 64-KiB string cap is hand-replicated on both sides:

  • kStringBoundsCap = 64 * 1024 in BatchStatsCollector.cc:296
  • MAX_STATS_STRING_LEN = 64 * 1024 in ColumnarCachedBatchSerializer.scala:981

Unlike StatsTypeTag (which has cross-side static_assert plus a Scala unit-test pinning),
this constant has no automated drift guard. If a future change bumps the C++ cap to 1 MiB
without updating Scala, the reader rejects every string in the 64 KiB–1 MiB range
(len > MAX_STATS_STRING_LEN → null stats → pass-through) — a silent pruning regression
with green checkAnswer tests.

Two options, either acceptable:

  1. Add static_assert(kStringBoundsCap == 64 * 1024, ...) on C++ AND a unit test pinning
    MAX_STATS_STRING_LEN == 64 * 1024 on Scala — makes the value a build-break contract.
  2. Plumb the cap through GlutenConfig so there is only one source of truth.

"kill switch); in that case reads continue to decode both v0 and v1 transparently, " +
"but writes emit v0 only and no stats are produced for new cache blocks.")
.booleanConf
.createWithDefault(false)
Contributor

PR description states "default: true" but the code is createWithDefault(false), and the
config key itself differs (PR description: spark.gluten.sql.columnar.tableCacheFilterEnabled;
code: spark.gluten.sql.columnar.tableCache.filterPushdown.enabled).

Keep the code default (false is safer for a v1 feature: opt-in lets production clusters get
observability before the behavior change lands), and update the PR description and release
notes to match.

val COLUMNAR_TABLE_CACHE_STATS_WIRE_V1_ENABLED =
buildConf("spark.gluten.sql.columnar.tableCache.stats.wire.v1.enabled")
.internal()
.doc(
Contributor

The current doc only mentions rolling upgrades but misses the reader-side effect:

  1. Writer: forces v0 wire format
  2. Reader Kryo decoding: always double-decodes v0/v1 (independent of this flag)
  3. Reader buildFilter (:911-916): !filterPushdown.enabled || !wire.v1.enabled returns
    pass-through, including for pre-cached v1 blocks

So flipping wire.v1.enabled=false alone is enough to stop reader-side pruning (OR, not
AND). No code change required, only the .doc(...) expansion below; collapsing the two
flags into a single kill switch can be a follow-up.

Suggest expanding the .doc(...):

"Rolling-upgrade kill switch for the stats-carrying v1 wire format. Affects both writer
AND reader. Writer side: when false, executors emit v0 (no stats) regardless of
filterPushdown.enabled. Reader side: when false, buildFilter returns pass-through
for ALL cached batches (including v1 blocks already on disk), identical to flipping
filterPushdown.enabled off. Either flag set to false fully disables pruning — they are
OR'ed in the reader-side kill-switch (ColumnarCachedBatchSerializer.scala:911-916).
Use during a rolling upgrade with mixed Gluten binaries; flip back to true once the
cluster is uniform."

// representation of `value`. Used by the string path to backfill a length
// header after the payload is written.
template <typename T>
void writeLEAt(std::vector<uint8_t>& out, size_t offset, T value) {
Contributor

writeLEAt seems dead code. The doc-comment claims "used by the string path to backfill a
length header after the payload is written", but the actual string branch in toBytes()
writes length first, then payload — it never calls writeLEAt. git grep writeLEAt finds
only this definition.

The comment doesn't match the implementation; future readers chasing the doc will look for
a backfill path that doesn't exist. Delete the function and its static_assert. If a future
change wants the backfill shape (reserve a slot, write payload, then patch length),
reintroduce it together with a reserveLE helper at that time.

object CachedColumnarBatchKryoSerializer {
// 0xC0DEC0DE as a signed int is -1059192130 (negative). `numRows` in Spark is non-negative,
// so any v0 stream starts with a non-negative int and can never collide with the magic.
private[execution] val MAGIC: Int = 0xc0dec0de
Contributor

The decimal equivalent in the comment is wrong. 0xC0DEC0DE as signed int32 is
-1059143458, not -1059192130 (verify: python3 -c 'print(0xC0DEC0DE - 2**32)').

grep -rn 1059192130 backends-velox/ finds two copies that need the same fix:

  • ColumnarCachedBatchSerializer.scala:564 (production comment)
  • ColumnarCachedBatchSerializerSuite.scala:108 (test comment)

Functionally harmless (the code uses the hex literal), but any docs or tools that grep on
this exact decimal value will mismatch. Either fix both occurrences to -1059143458 or drop
the specific decimal and keep just the qualitative "negative" claim.

// collationName == "UTF8_BINARY"), and "string collate xxx" for
// any other collation. This gives us a shim-safe predicate that
// compiles unchanged across Spark 3.3..4.1.
dataType.isInstanceOf[StringType] && dataType.catalogString == "string"
Contributor

test("StringType.catalogString stability for UTF8_BINARY default") {
  assert(StringType.catalogString == "string")
}

Recommend adding a simple test case like this to confirm the behavior stays consistent across Spark versions.

}
}

void BatchStatsCollector::update(const RowVectorPtr& vector) {
Contributor

update has no try/catch. The caller (VeloxColumnarBatchSerializer::append) already
handles exceptions: for std::exception / unknown exceptions it calls
statsCollector_.reset(), destroying the unique_ptr (the next append sees nullptr and
skips stats — cache write itself never crashes); for std::bad_alloc it rethrows so OOM
propagates as a cluster-wide fatal error.
Note: toBytes() at BatchStatsCollector.cc:641 returns an empty vector when
schemaDriftPoisoned_ is true, identical to the wire-level outcome of caller-reset followed
by subsequent skip-on-nullptr. Along the std::exception path, both produce a 0-byte stats
payload that the Scala side treats as pass-through.
Adding an in-update try/catch is therefore a hygiene-only change: it localizes exception
handling and avoids the unique_ptr destroy/recreate cost. If you do add it, latch
schemaDriftPoisoned_=true plus a single LOG(WARNING) so the path matches the existing
schema-drift handler.

@jackylee-ch
Contributor Author

This will be closed; work continues in #12092.
