Propagate SortedValue to Polars set_sorted() on index column#3049
Propagate SortedValue to Polars set_sorted() on index column#3049G-D-Petrov wants to merge 6 commits into
Conversation
| from arcticdb_ext.version_store import PandasOutputFrame | ||
| from arcticdb_ext.version_store import PandasOutputFrame, SortedValue | ||
| from arcticdb.version_store._normalization import FrameData | ||
|
|
There was a problem hiding this comment.
sorted shadows the Python built-in sorted(). This can cause subtle bugs in any method that needs to call the built-in while also accessing self.sorted. Suggest renaming to sort_order or sort_value throughout (including the ReadResult counterpart and the C++-side parameter names).
| def __init__(self, sym, frame_data, norm, sort_order=SortedValue.UNKNOWN): |
| assert isinstance(result, pl.DataFrame) | ||
| assert result["__index__"].flags["SORTED_ASC"] is True | ||
| assert result["sorted_val"].flags["SORTED_ASC"] is False | ||
| assert result["another"].flags["SORTED_ASC"] is False |
There was a problem hiding this comment.
The DESCENDING code path in _apply_polars_sorted_flag_to_index is exercised by production code but not covered by any test here. A simple case — write data with a descending datetime index (or use update() with sort_on_index=False to produce a reverse-sorted symbol) — would verify that result["__index__"].flags["SORTED_DESC"] is True and SORTED_ASC is False. Without this, a regression in the descending=True branch would go undetected.
| elif input_type in ("df", "series"): | ||
| common = norm.df.common if input_type == "df" else norm.series.common | ||
| index_type = common.WhichOneof("index_type") | ||
| if index_type == "index" and common.index.is_physically_stored: |
There was a problem hiding this comment.
WhichOneof("index_type") can also return "multi_index" for MultiIndex DataFrames. In that case index_type != "index" and the function returns None, so no sorted flag is set — which is probably the right conservative choice since multi-level index sorting is more complex. Worth adding a comment here (or in the PR description) acknowledging this as a known limitation so future maintainers don't think it was accidentally missed.
ArcticDB Code Review SummaryPR propagates ArcticDB's internal Changes since last review (post-rebase commits
|
| assert result[col].flags["SORTED_ASC"] is False, f"Column {col} should not have SORTED_ASC" | ||
|
|
||
|
|
||
| def test_sorted_flag_not_set_for_pandas_output(lmdb_library): |
There was a problem hiding this comment.
Do we need this test? We have a lot of other pandas coverage of pandas reads.
|
|
||
| result = NativeVersionStore._apply_polars_sorted_flag_to_index(data, read_result) | ||
|
|
||
| assert result["__index__"].flags["SORTED_ASC"] is False |
There was a problem hiding this comment.
A meta point worth discussing is whether we should treat UNKNOWN as SORTED_ASC.
For all other purposes we treat UNKOWN (written by old arctcdb versions before sortedness was introduced) as sorted. (e.g. for update we use this code).
I'm personally in favor of treating unknown constistently and set sorted even if unkown.
We should instead add a test for UNSORTED
|
|
||
| class ReadResult: | ||
| def __init__(self, version, frame_data, norm, udm, mmeta, node_read_results): | ||
| def __init__(self, version, frame_data, norm, udm, mmeta, node_read_results, sort_order=SortedValue.UNKNOWN): |
There was a problem hiding this comment.
I'm slightly worried if we're using a default value we might have missed some place where this is used incorrectly.
| Output format for the returned dataframe. | ||
| If `None`, uses the output format from the `Library` instance. | ||
| See `OutputFormat` documentation for details on available formats. | ||
| When using ``POLARS`` output format, the index column (if physically stored) will automatically have |
There was a problem hiding this comment.
We should probably move these docs to OutputFormat docs. The same description applies to methods like head and tail all of which reference the OutputFormat docs.
| return True | ||
|
|
||
|
|
||
| def _get_index_col_from_norm(norm): |
There was a problem hiding this comment.
This will get drastically simpler after #3037 as the index column will always be first.
|
|
||
| @staticmethod | ||
| def _apply_polars_sorted_flag_to_index(data, read_result): | ||
| if pl is None or not isinstance(data, pl.DataFrame): |
There was a problem hiding this comment.
Nit: I think it would be slightly cleaner if we call this only for polars DataFrames here, then this check would not be needed.
| elif input_type in ("df", "series"): | ||
| common = norm.df.common if input_type == "df" else norm.series.common | ||
| index_type = common.WhichOneof("index_type") | ||
| # multi_index intentionally excluded: multi-level sorting semantics are more complex |
There was a problem hiding this comment.
We probably still want to include multi_index as long as first index is sorted.
| pyuser_meta, | ||
| multi_key_meta, | ||
| std::move(node_results), | ||
| static_cast<int>(ret.sorted) |
There was a problem hiding this comment.
Why cast to int? Can't we expose SortedValue in python bindings?
| NodeReadResult( | ||
| const StreamId& symbol, OutputFrame&& frame_data, | ||
| arcticdb::proto::descriptors::NormalizationMetadata&& norm_meta | ||
| arcticdb::proto::descriptors::NormalizationMetadata&& norm_meta, SortedValue sorted = SortedValue::UNKNOWN |
There was a problem hiding this comment.
Nit: Again slightly worried about this having a default value, we might have missed some constructions which should specify a sorted value.
This only matters if we decide to tread UNKNOWN as sorted. Otherwise it's probably fine
Arrow writes previously hard-coded the index SortedValue to ASCENDING, which
could misreport sortedness (e.g. the polars read path would set_sorted() on an
unsorted column). Arrow input now stores UNKNOWN by default; when a sorted index
is required it walks the index column once (O(n)) to determine the actual sort
order, mirroring how pandas computes it during normalization.
- New SortednessScan{SKIP, SCAN_IF_UNKNOWN} threaded through py_ndf_to_frame /
record_batches_to_frame / InputFrame::set_segment with no default, so every
call site is explicit: SCAN_IF_UNKNOWN for write/append/update/stage and the
batch_write/batch_append/batch_update variants; SKIP for read-from-file,
partitioned, composite, merge and pandas-tuple paths.
- The walk sets the descriptor's SortedValue, so it both validates the index and
persists the sort order, which lets the polars set_sorted() flag work for Arrow.
- Fixes batch_update silently skipping the sortedness check.
- Docs: drop the stale "no checks are performed for Arrow input data" note.
- Tests: validate_index x sorted/unsorted across write/append/update/stage and
the batch variants, plus the polars flag being set only when validation ran.
Cost of the walk relative to the write (release build, single timestamp index):
In-memory (zero-copy write, so this is the worst case for relative overhead):
| rows | batches | no-verify | verify | overhead |
|------|---------|-----------|----------|----------|
| 1M | 1 | 3.36 ms | 4.49 ms | +34% |
| 1M | 100 | 4.12 ms | 5.30 ms | +29% |
| 10M | 1 | 5.53 ms | 15.87 ms | +187% |
| 10M | 100 | 6.07 ms | 16.16 ms | +166% |
Real pure-S3 (network-bound, parallel segment encode/write):
| library | rows | no-verify | verify | overhead |
|------------------|------|------------|------------|----------|
| default segment | 1M | 81.7 ms | 82.6 ms | +1.1% |
| default segment | 10M | 214.2 ms | 226.1 ms | +5.6% |
| 5000-row segment | 1M | 329.9 ms | 346.2 ms | +1.8% |
| 5000-row segment | 10M | 1699.9 ms | 1726.8 ms | +1.6% |
The walk is only paid when validate_index requests it; on real storage it is a
low single-digit percentage of the write, and more segment parallelism dilutes
it further rather than amplifying it.
| norm, | ||
| user_meta, | ||
| cfg().write_options().empty_types(), | ||
| sortedness_scan_for(validate_index) |
There was a problem hiding this comment.
Recording UNKNOWN for non-validated Arrow writes (the SortednessScan::SKIP default) interacts badly with append's existing-data check. sorted_data_check_append (version_core.cpp:144-148) requires the existing data's tsd().sorted() == SortedValue::ASCENDING strictly — unlike check_update_data_is_sorted (version_core.cpp:433-434), which accepts ASCENDING || UNKNOWN.
So this sequence now raises a spurious UnsortedDataException on genuinely-sorted data:
lib.write("sym", sorted_arrow_table, index_column=True) # validate_index=False -> descriptor UNKNOWN
lib.append("sym", more_sorted_arrow, index_column=True, validate_index=True) # "existing data must be sorted" -> raises
The equivalent pandas workflow succeeds (pandas records ASCENDING even without validate_index), and the equivalent update succeeds (UNKNOWN existing allowed). The existing append tests don't catch this because they seed the symbol with validate_index=True.
Please either (a) seed the append/update tests with a non-validated initial write to cover this case, and (b) align sorted_data_check_append to accept UNKNOWN existing data like the update path does — or confirm this stricter Arrow behaviour is intended and document the pandas/Arrow divergence.
Reference Issues/PRs
Monday ticket ref: 10659398340
What does this implement or fix?
When reading data from ArcticDB with
output_format="polars", the resulting Polars DataFrame had no sorted flag set on the index column, even though ArcticDB tracks sort order internally viaSortedValuein the C++StreamDescriptor. This means Polars couldn't take advantage of optimized operations (e.g., fast joins, binary search filtering) that rely on knowing a column is sorted.Changes
C++ layer — propagate
SortedValuethrough the read path:cpp/arcticdb/entity/read_result.hpp— Added aSortedValue sortedfield to bothNodeReadResultandReadResult, defaulting toUNKNOWN.cpp/arcticdb/pipeline/pipeline_utils.hpp— Increate_python_read_result, extracts the sorted value from the stream descriptor and passes it intoNodeReadResultand the top-levelReadResult.cpp/arcticdb/python/adapt_read_dataframe.hpp— Includessorted(cast toint) in the tuple returned to Python fromadapt_read_df.cpp/arcticdb/python/python_utils.hpp— Includessortedin the tuples produced bynode_results_to_python_listandadapt_read_dfs.Python layer — receive sorted value and apply Polars flag:
python/arcticdb/version_store/read_result.py—ReadResultandNodeReadResultnow accept and store asortedparameter (converted from int toSortedValueenum).python/arcticdb/version_store/_store.py:_get_index_col_from_norm()helper to extract the index column name from normalization metadata (handles pandas DataFrames, Series, and experimental Arrow formats)._apply_polars_sorted_flag_to_index()static method that callspolars.col(...).set_sorted()on the index column when the stored sort order isASCENDINGorDESCENDING._adapt_read_res()so it runs automatically on every read.Tests:
python/tests/unit/arcticdb/version_store/test_polars_set_sorted.py— New test file with 6 tests covering:DatetimeIndex→__index__column getsSORTED_ASCDatetimeIndex→ named column getsSORTED_ASCRangeIndex(not physically stored) → no sorted flag setBenchmark Results
Before vs After (stock ArcticDB 6.13.0 vs branch dev build)
The benchmark script reads 1M rows as Polars and measures various operations. On stock ArcticDB, the index has no sorted flag (
SORTED_ASC: False). With the branch's dev build, the sorted flag is automatically applied (SORTED_ASC: True).Comparing stock (6.13.0) vs branch (dev) on key index operations:
The branch delivers significant speedups on
sort_by_index(6.2x),unique_index(5.4x), andupsample(3x). These are operations where Polars can skip work entirely when it knows the column is sorted.Branch dev build: detailed benchmark (Polars 1.35.2, 1M rows, 10 iters)
Note: In the branch build, "Current (no flag)" already has
SORTED_ASC: Trueon the index (the branch's feature), so applyingset_sorted()again is a no-op. This confirms the feature is working automatically.Part 2: Value Columns — Future Work
These measure the potential benefit of also setting
set_sorted()on value columns, which would require populatingFieldStats.sorted_in C++. Not implemented in this branch.Overhead
The
set_sorted()call adds negligible overhead (~0.3 ms) relative to the read cost (~399 ms debug build):set_sorted(index only)set_sorted(index + 2 cols)