[VELOX] Support push filter for table cache #12073
Conversation
Add batch-level statistics collection and filter pushdown for columnar cached batches. The C++ layer (BatchStatsCollector) computes per-column min/max bounds, null counts, row counts, and size during serialization. The Scala decoder (ColumnarCachedBatchSerializer) reads the wire-format stats and integrates with Spark's SimpleMetricsCachedBatch filter evaluation to skip batches that cannot satisfy query predicates.

Supported bound types: Boolean, Byte, Short, Int, Long, Float, Double, Date, Timestamp, String, and Decimal(precision<=18). Float/Double with NaN degrade to pass-through. String bounds use length-prefixed UTF-8. Decimal uses the unscaled int64 representation for short decimals.

Key design decisions:
- Wire format v1 with per-column tag + hasBounds + bounds + counters
- Tautological bounds (type extremes) for unknown/absent bounds to avoid the 3VL null-skip correctness bug
- Tag/dataType compatibility validation to reject corrupt payloads
- Backward compatible: unknown tags fall through to pass-through

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Run Gluten Clickhouse CI on x86
6c67d6e to b2f0c7c
Run Gluten Clickhouse CI on x86
Integrate apache#12092's compact stats implementation alongside the existing BatchStatsCollector path. Adds framedSerializeWithStats which produces a self-describing framed blob [magic|statsLen|statsBlob|bytesLen|bytesBlob] with support for HUGEINT (Decimal p>18), YearMonthIntervalType, and DayTimeIntervalType.

C++ changes:
- Add computeStats() and framedSerializeWithStats() to VeloxColumnarBatchSerializer covering BIGINT, INTEGER, SMALLINT, TINYINT, REAL, DOUBLE, BOOLEAN, HUGEINT, TIMESTAMP, VARCHAR
- Add framedSerializeWithStats virtual method to ColumnarBatchSerializer

JNI changes:
- Add framedSerializeWithStats native method returning byte[]

Scala changes:
- Wire convertColumnarBatchToCachedBatch to use framedSerializeWithStats
- Add parseFramedBytes to decode the framed wire format
- Add decodeFramedStats with full type coverage including int128 Decimal
- Add tautological bounds for YearMonthIntervalType and DayTimeIntervalType

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
b2f0c7c to b66114c
Run Gluten Clickhouse CI on x86
```scala
      throw new UnsupportedOperationException(
        s"CachedColumnarBatch stats must be GenericInternalRow, got ${other.getClass}")
    }
    // H8: Encode stats body into a sized buffer so we can write a length prefix.
```
Comments cover non-obvious rationale well, but density slows the diff:
- `CachedColumnarBatchKryoSerializer.write`: ~30 lines code, ~50 lines comments
- `tautologicalBoundsForFloatType` case: 17 lines of NaN rationale
- The NaN-3VL-skip explanation repeats 5+ times in ColumnarCachedBatchSerializer.scala
- Internal review-cycle tags (`R2-H4`, `R2-H11`, `R3-H1`, `H8`, etc.) leak into production code
```scala
    val (lo, hi) = readBounds(buf, typeTag, dataType)
    if (lo == null || hi == null) {
      val opt = tautologicalBoundsFor(dataType)
      if (opt.isEmpty) return null else opt.get
```
When any column's tautologicalBoundsFor returns None (NaN-degraded Float/Double, collated
StringType, unknown DataType), decodeStats demotes the entire batch's stats to null. This
nullifies valid min/max bounds on the OTHER columns in the same batch — e.g. a 12-column ML
feature table where only one column carries NaN: the remaining 11 columns lose pruning
entirely on that batch.
The unknown-DataType path (YearMonthIntervalType etc.) can switch to per-column demotion:
fill the offending column's lower/upper slots with type-extreme sentinels (long min/max)
and set nullCount=1 plus the real rowCount so IsNull/IsNotNull predicates remain
useful while bound-comparison predicates degrade to no-ops on that column only. The other
columns keep their valid bounds. Prerequisite: extend tautologicalBoundsFor to
recognise these types — the current case _ => None (:1250-1255) is a deliberate
conservative choice ("safer than fabricating bounds we can't prove correct"); per-column
demotion flips this trade-off from "conservative across the board" to "non-conservative
once a type is recognised".
Caveat: Float/Double NaN truly has no finite tautological pair (Spark orders NaN above +Inf),
and collated StringType has no byte-wise max either (the code comment at :1104-1107 admits
this — "0xFF*256 is not the max", and Spark 4.0+ collation-aware ordering breaks any byte-
wise sentinel). Both must keep the all-or-nothing demotion or the three-valued-logic skip
bug returns (a NULL bound coerces the predicate to NULL, which is then coerced to false,
wrongly skipping the batch). Only the unknown-DataType case can be safely narrowed.
```scala
      case StatsTypeTag.TIMESTAMP =>
        // Match the compat pattern used in Validators.containsNTZ so this
        // compiles across Spark 3.3 (no TimestampNTZType) through 3.5+.
        dataType == TimestampType || dataType.catalogString == "timestamp_ntz"
```
isTagCompatibleWithDataType accepts both TimestampType and TimestampNTZType for the
TIMESTAMP wire tag. Both are int64 underneath, but their semantics are NOT interchangeable:
- TimestampType: epoch micros (UTC)
- TimestampNTZType: local micros (no tz)
If a cache block is written under a TimestampType schema (C++ converts Velox Timestamp
to UTC micros) but later read against a TimestampNTZType schema, the predicate literal is
parsed differently on each side — both end up as long, but the values now disagree by
exactly the local-tz offset. Filter pushdown silently mis-prunes against tz-shifted bounds.
Pre-merge: Tighten to TimestampType-only; let TimestampNTZType fall into case _ => false
and pass through.
Follow-up issue: Split the C++ typeTagFor into kTimestamp / kTimestampNtz so
cross-version schema drift becomes an explicit incompat rather than silent wrong answers.
```scala
  // prefix. Prior release mismatched the reader at 1 MiB vs. the writer at
  // 64 KiB; tightening the reader side aligns the contract and closes the
  // over-allocation window.
  private val MAX_STATS_STRING_LEN: Int = 64 * 1024
```
The 64-KiB string cap is hand-replicated on both sides:
- `kStringBoundsCap = 64 * 1024` in BatchStatsCollector.cc:296
- `MAX_STATS_STRING_LEN = 64 * 1024` in ColumnarCachedBatchSerializer.scala:981
Unlike StatsTypeTag (which has cross-side static_assert plus a Scala unit-test pinning),
this constant has no automated drift guard. If a future change bumps the C++ cap to 1 MiB
without updating Scala, the reader rejects every string in the 64 KiB–1 MiB range
(len > MAX_STATS_STRING_LEN → null stats → pass-through) — a silent pruning regression
with green checkAnswer tests.
Two options, either acceptable:
- Add `static_assert(kStringBoundsCap == 64 * 1024, ...)` on the C++ side AND a unit test pinning `MAX_STATS_STRING_LEN == 64 * 1024` on the Scala side — makes the value a build-break contract.
- Plumb the cap through GlutenConfig so there is only one source of truth.
```scala
          "kill switch); in that case reads continue to decode both v0 and v1 transparently, " +
          "but writes emit v0 only and no stats are produced for new cache blocks.")
      .booleanConf
      .createWithDefault(false)
```
PR description states "default: true" but the code is createWithDefault(false), and the
config key itself differs (PR description: spark.gluten.sql.columnar.tableCacheFilterEnabled;
code: spark.gluten.sql.columnar.tableCache.filterPushdown.enabled).
Keep the code default (false is safer for a v1 feature: opt-in lets production clusters get
observability before the behavior change lands), and update the PR description and release
notes to match.
```scala
  val COLUMNAR_TABLE_CACHE_STATS_WIRE_V1_ENABLED =
    buildConf("spark.gluten.sql.columnar.tableCache.stats.wire.v1.enabled")
      .internal()
      .doc(
```
The current doc only mentions rolling upgrades but misses the reader-side effect:
- Writer: forces v0 wire format
- Reader Kryo decoding: always double-decodes v0/v1 (independent of this flag)
- Reader `buildFilter` (:911-916): `!filterPushdown.enabled || !wire.v1.enabled` returns pass-through, including for pre-cached v1 blocks
So flipping wire.v1.enabled=false alone is enough to stop reader-side pruning (OR, not
AND). No code change required, only the .doc(...) expansion below; collapsing the two
flags into a single kill switch can be a follow-up.
Suggest expanding the .doc(...):

"Rolling-upgrade kill switch for the stats-carrying v1 wire format. Affects both writer
AND reader. Writer side: when false, executors emit v0 (no stats) regardless of
`filterPushdown.enabled`. Reader side: when false, `buildFilter` returns pass-through
for ALL cached batches (including v1 blocks already on disk), identical to flipping
`filterPushdown.enabled` off. Either flag set to false fully disables pruning — they are
OR'ed in the reader-side kill switch (ColumnarCachedBatchSerializer.scala:911-916).
Use during a rolling upgrade with mixed Gluten binaries; flip back to true once the
cluster is uniform."
```cpp
// representation of `value`. Used by the string path to backfill a length
// header after the payload is written.
template <typename T>
void writeLEAt(std::vector<uint8_t>& out, size_t offset, T value) {
```
writeLEAt seems dead code. The doc-comment claims "used by the string path to backfill a
length header after the payload is written", but the actual string branch in toBytes()
writes length first, then payload — it never calls writeLEAt. git grep writeLEAt finds
only this definition.
The comment doesn't match the implementation; future readers chasing the doc will look for
a backfill path that doesn't exist. Delete the function and its static_assert. If a future
change wants the backfill shape (reserve a slot, write payload, then patch length),
reintroduce it together with a reserveLE helper at that time.
```scala
object CachedColumnarBatchKryoSerializer {
  // 0xC0DEC0DE as a signed int is -1059192130 (negative). `numRows` in Spark is non-negative,
  // so any v0 stream starts with a non-negative int and can never collide with the magic.
  private[execution] val MAGIC: Int = 0xc0dec0de
```
The decimal equivalent in the comment is wrong. 0xC0DEC0DE as signed int32 is
-1059143458, not -1059192130 (verify: python3 -c 'print(0xC0DEC0DE - 2**32)').
grep -rn 1059192130 backends-velox/ finds two copies that need the same fix:
ColumnarCachedBatchSerializer.scala:564(production comment)ColumnarCachedBatchSerializerSuite.scala:108(test comment)
Functionally harmless (the code uses the hex literal), but any docs or tools that grep on
this exact decimal value will mismatch. Either fix both occurrences to -1059143458 or drop
the specific decimal and keep just the qualitative "negative" claim.
```scala
        // collationName == "UTF8_BINARY"), and "string collate xxx" for
        // any other collation. This gives us a shim-safe predicate that
        // compiles unchanged across Spark 3.3..4.1.
        dataType.isInstanceOf[StringType] && dataType.catalogString == "string"
```
```scala
test("StringType.catalogString stability for UTF8_BINARY default") {
  assert(StringType.catalogString == "string")
}
```
Consider adding a simple test case like this to confirm the behavior remains consistent across Spark versions.
```cpp
  }
}

void BatchStatsCollector::update(const RowVectorPtr& vector) {
```
update has no try/catch. The caller (VeloxColumnarBatchSerializer::append) already
handles exceptions: for std::exception / unknown exceptions it calls
statsCollector_.reset(), destroying the unique_ptr (the next append sees nullptr and
skips stats — cache write itself never crashes); for std::bad_alloc it rethrows so OOM
propagates as a cluster-wide fatal error.
Note: toBytes() at BatchStatsCollector.cc:641 returns an empty vector when
schemaDriftPoisoned_ is true, identical to the wire-level outcome of caller-reset followed
by subsequent skip-on-nullptr. Along the std::exception path, both produce a 0-byte stats
payload that the Scala side treats as pass-through.
Adding an in-update try/catch is therefore a hygiene-only change: it localizes exception
handling and avoids the unique_ptr destroy/recreate cost. If you do add it, latch
schemaDriftPoisoned_=true plus a single LOG(WARNING) so the path matches the existing
schema-drift handler.
This will be closed and continued in #12092.
What changes are proposed in this pull request?
This PR adds batch-level statistics collection and filter pushdown for Velox columnar cached batches,
enabling Spark to skip cached batches that cannot satisfy query predicates (similar to Parquet
row-group pruning but for in-memory cache).
Architecture:
- C++ layer (BatchStatsCollector): computes per-column min/max bounds, null counts, row counts, and byte sizes. Stats are appended to the serialized payload in a compact binary wire format.
- Scala decoder (ColumnarCachedBatchSerializer): reads the wire-format stats and integrates with Spark's SimpleMetricsCachedBatch filter evaluation to skip batches whose bounds prove the predicate cannot match.

Supported bound types: Boolean, Byte, Short, Int, Long, Float, Double, Date, Timestamp, String, and Decimal(precision<=18).

Key design decisions:
- Wire format v1 with per-column tag + hasBounds + bounds + rowCount(4B) + sizeInBytes(8B)
- Tautological bounds (type extremes) for unknown/absent bounds to avoid the 3VL null-skip correctness bug (where null bounds cause Spark's predicate to evaluate to null → coerced to false → batch incorrectly skipped)
- Float/Double NaN degrades to pass-through (NaN breaks min/max ordering)
How was this patch tested?
- Unit tests covering supported types, NaN poisoning, inverted bounds rejection, tag/schema mismatch detection, tautological bounds fallback, truncated payload handling, negative counter rejection, and Decimal bounds (with/without bounds, precision>18 rejection).
- End-to-end tests that validate correctness via checkAnswer for Int, String, Timestamp, and Decimal predicates.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)