Skip to content

Conversation

@parmesant
Copy link
Contributor

@parmesant parmesant commented Dec 14, 2025

Use ParquetExec's bytes_scanned metric instead of scanning manually

Implemented the same in case of streaming by wrapping our execution plan with a monitor

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • Refactor
    • Streaming and non‑streaming query paths now instrument per‑partition streams and emit post‑execution billing metrics (including bytes scanned); streaming results are returned as merged streaming output.
  • Chores
    • Billing simplified: per‑file counts continue; compressed byte‑size is no longer tracked or charged.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 14, 2025

Walkthrough

Replaces the previous non-streaming/streaming return with an Either of collected batches or a pinned RecordBatchStreamAdapter that wraps per-partition streams in a public PartitionedMetricMonitor; adds metric collection (get_total_bytes_scanned) on completion/error. Removes compressed-size billing in the stream schema provider.

Changes

Cohort / File(s) Summary
Streaming wrapper & metrics
src/query/mod.rs
Adds public PartitionedMetricMonitor and private MonitorState; changes execute signatures to return Either<Vec<RecordBatch>, Pin<Box<RecordBatchStreamAdapter<...>>>>; wraps partition streams with PartitionedMetricMonitor; implements Stream and RecordBatchStream; adds get_total_bytes_scanned and invokes billing on completion/error; updates imports (SelectAll, RecordBatchStreamAdapter, Pin, AtomicUsize, Poll).
Billing simplification (stream schema)
src/query/stream_schema_provider.rs
Removes compressed-size metric imports and accumulation; drops increment_bytes_scanned_in_query_by_date calls for compressed bytes, preserving file-count billing (increment_files_scanned_in_query_by_date).

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Client
    participant QueryExec as Query Executor
    participant ExecPlan as ExecutionPlan
    participant PartStream as Partition Stream
    participant Monitor as PartitionedMetricMonitor
    participant Merge as SelectAll / RecordBatchStreamAdapter
    participant Metrics

    Client->>QueryExec: execute(query, is_streaming)
    alt streaming
        QueryExec->>PartStream: execute_stream_partitioned(...)
        QueryExec->>Monitor: PartitionedMetricMonitor::new(part_stream, plan, state)
        QueryExec->>Merge: select_all(monitored_streams) -> RecordBatchStreamAdapter
        QueryExec-->>Client: return streaming RecordBatchStreamAdapter
        Client->>Merge: poll_next()
        Merge->>Monitor: poll_next() (per partition)
        Monitor->>PartStream: poll_next()
        PartStream-->>Monitor: RecordBatch / End / Error
        Monitor-->>Merge: forward batch / end / error
        alt final end or error
            Monitor->>ExecPlan: get_total_bytes_scanned(plan)
            ExecPlan-->>Monitor: total_bytes
            Monitor->>Metrics: increment_bytes_scanned_in_query_by_date(date, total)
        end
    else non-streaming
        QueryExec->>ExecPlan: collect_partitioned(...)
        QueryExec->>ExecPlan: get_total_bytes_scanned(plan)
        QueryExec->>Metrics: increment_bytes_scanned_in_query_by_date(date, total)
        QueryExec-->>Client: return Vec<RecordBatch>
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Inspect PartitionedMetricMonitor::poll_next for correct wake/poll semantics, error propagation, and finalization hooks.
  • Verify safe use of Arc<dyn ExecutionPlan> in get_total_bytes_scanned and across async/stream boundaries.
  • Confirm construction, pinning and type-erasure of the merged SelectAll -> RecordBatchStreamAdapter.
  • Ensure removed compressed-size billing left no unused imports/variables.

Possibly related PRs

Suggested reviewers

  • nikhilsinhaparseable
  • nitisht

Poem

🐇 I hop through partitions, one by one,
I watch the streams until the tally's done,
When final batches fade or errors chime,
I count the bytes and mark the time,
Files still counted — compressed rests for now. 🥕

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description check ⚠️ Warning The description identifies the core goal but lacks specifics: no issue reference is provided (uses placeholder #XXXX), and all three checklist items remain unchecked, indicating incomplete testing, documentation, and comments. Provide a valid issue number in 'Fixes #XXXX', complete testing and add evidence, explain key changes more thoroughly, and mark checklist items as done or document what remains.
Docstring Coverage ⚠️ Warning Docstring coverage is 70.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title 'fix query scan metrics' is concise and directly reflects the main change: replacing manual scanning with ParquetExec's built-in metrics.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/query/mod.rs (1)

806-835: Consider guarding against potential double metric increment.

If poll_next is called after the stream returns None (which shouldn't happen with well-behaved consumers, but streams aren't required to be fused), get_total_bytes_scanned would be called and metrics incremented again.

Consider adding a done flag to track completion:

 pub struct MetricMonitorStream {
     inner: SendableRecordBatchStream,
     plan: Arc<dyn ExecutionPlan>,
+    metrics_recorded: bool,
 }

 impl MetricMonitorStream {
     pub fn new(inner: SendableRecordBatchStream, plan: Arc<dyn ExecutionPlan>) -> Self {
-        Self { inner, plan }
+        Self { inner, plan, metrics_recorded: false }
     }
 }

 impl Stream for MetricMonitorStream {
     type Item = datafusion::error::Result<RecordBatch>;

     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let poll = self.inner.as_mut().poll_next(cx);

         match &poll {
-            Poll::Ready(None) => {
+            Poll::Ready(None) if !self.metrics_recorded => {
+                self.metrics_recorded = true;
                 let bytes = get_total_bytes_scanned(&self.plan);
                 // ...
             }
-            Poll::Ready(Some(Err(e))) => {
+            Poll::Ready(Some(Err(e))) if !self.metrics_recorded => {
+                self.metrics_recorded = true;
                 // ...
             }
             _ => {}
         }

         poll
     }
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 13b0568 and 923fc8d.

📒 Files selected for processing (2)
  • src/query/mod.rs (7 hunks)
  • src/query/stream_schema_provider.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/s3.rs:837-844
Timestamp: 2025-09-25T07:13:58.909Z
Learning: In the Parseable codebase, the existing pattern of placing metrics calls after the `?` operator in S3 list_hours method is correct and should not be changed. The metrics are properly recorded only on successful operations.
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/query/stream_schema_provider.rs
  • src/query/mod.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/query/stream_schema_provider.rs
  • src/query/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/query/stream_schema_provider.rs
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.

Applied to files:

  • src/query/stream_schema_provider.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/query/stream_schema_provider.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/query/mod.rs
🧬 Code graph analysis (2)
src/query/stream_schema_provider.rs (1)
src/metrics/mod.rs (1)
  • increment_files_scanned_in_query_by_date (557-561)
src/query/mod.rs (1)
src/metrics/mod.rs (1)
  • increment_bytes_scanned_in_query_by_date (563-567)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
🔇 Additional comments (3)
src/query/stream_schema_provider.rs (1)

59-59: LGTM! Clean separation of metrics responsibilities.

The removal of increment_bytes_scanned_in_query_by_date here aligns with the PR objective to use ParquetExec's bytes_scanned metric instead of manual calculation. File count tracking remains appropriately in partitioned_files().

src/query/mod.rs (2)

210-235: LGTM! Clean metric tracking for both streaming and non-streaming paths.

The implementation correctly:

  • Creates the physical plan upfront
  • For non-streaming: collects results, then reads bytes_scanned from the plan
  • For streaming: wraps in MetricMonitorStream to defer metric collection until stream completion

326-342: LGTM! Correct recursive metric aggregation.

The helper correctly traverses the entire execution plan tree and sums bytes_scanned from all nodes. The metric key "bytes_scanned" is the standard used by ParquetExec.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 923fc8d and 0495e8a.

📒 Files selected for processing (2)
  • src/query/mod.rs (7 hunks)
  • src/query/stream_schema_provider.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/s3.rs:837-844
Timestamp: 2025-09-25T07:13:58.909Z
Learning: In the Parseable codebase, the existing pattern of placing metrics calls after the `?` operator in S3 list_hours method is correct and should not be changed. The metrics are properly recorded only on successful operations.
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/query/stream_schema_provider.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/query/stream_schema_provider.rs
  • src/query/mod.rs
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.

Applied to files:

  • src/query/stream_schema_provider.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/query/stream_schema_provider.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/query/stream_schema_provider.rs
  • src/query/mod.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/query/mod.rs
🧬 Code graph analysis (2)
src/query/stream_schema_provider.rs (1)
src/metrics/mod.rs (1)
  • increment_files_scanned_in_query_by_date (563-567)
src/query/mod.rs (1)
src/metrics/mod.rs (1)
  • increment_bytes_scanned_in_query_by_date (569-573)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
🔇 Additional comments (2)
src/query/stream_schema_provider.rs (2)

59-60: No action needed. The current design is correct—TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE is intentionally centralized in src/query/mod.rs and emitted during execution (using ParquetExec's metrics), while stream_schema_provider.rs only emits files_scanned at the plan-building stage. This separation is appropriate because plan-building cannot predict actual bytes read (which depends on runtime filtering and execution context). The removal of bytes-scanned metrics from the imports reflects a deliberate architectural fix to measure actual I/O instead of file-size estimates.


341-427: Metric emitted at plan construction, not execution: move increment_files_scanned_in_query_by_date() to execution-time.

The metric is labeled "billing metrics for files scanned" but is currently emitted during query plan construction (line 424), not after files are actually read/scanned. Per existing patterns in the codebase, metrics should only be recorded when operations succeed. Consider moving this call to the execution layer (e.g., after create_parquet_physical_plan() completes or in the execution plan itself) to ensure it counts only files that are actually processed.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/query/mod.rs (1)

388-404: The unsafe truncation issue from the previous review was not addressed.

Line 395 still uses scanned.as_usize() as u64, which will silently truncate on 32-bit systems. The previous review comment (lines 388-404) explicitly requested direct pattern matching on the ScalarValue::UInt64 variant.

Apply the recommended fix from the previous review:

     if let Some(metrics) = plan.metrics() {
         // "bytes_scanned" is the standard key used by ParquetExec
-        if let Some(scanned) = metrics.sum_by_name("bytes_scanned") {
-            total_bytes += scanned.as_usize() as u64;
-        }
+        if let Some(datafusion::common::ScalarValue::UInt64(Some(v))) = 
+            metrics.sum_by_name("bytes_scanned") 
+        {
+            total_bytes += v;
+        }
     }
🧹 Nitpick comments (1)
src/query/mod.rs (1)

88-117: Simplify the public API surface by abstracting the return type.

The return type exposes deeply nested implementation details (Pin<Box<RecordBatchStreamAdapter<select_all::SelectAll<...>>>>), making the API brittle and difficult to evolve. Consider using a type alias or returning a trait object:

// Option 1: Type alias
type QueryResultStream = Pin<Box<dyn RecordBatchStream + Send>>;

pub async fn execute(
    query: Query,
    is_streaming: bool,
) -> Result<(Either<Vec<RecordBatch>, QueryResultStream>, Vec<String>), ExecuteError>

Or simplify further by boxing the complex stream:

// In the streaming path (line 297)
Either::Right(Box::pin(final_stream) as Pin<Box<dyn RecordBatchStream + Send>>)

This prevents implementation details from leaking into the public API.

Also applies to: 209-237

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0495e8a and 78a918c.

📒 Files selected for processing (2)
  • src/query/mod.rs (7 hunks)
  • src/query/stream_schema_provider.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/query/stream_schema_provider.rs
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/s3.rs:837-844
Timestamp: 2025-09-25T07:13:58.909Z
Learning: In the Parseable codebase, the existing pattern of placing metrics calls after the `?` operator in S3 list_hours method is correct and should not be changed. The metrics are properly recorded only on successful operations.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:625-633
Timestamp: 2025-09-25T07:12:40.189Z
Learning: In Parseable's object storage metrics system, metrics should only be captured when API calls succeed, not when they error out. The current pattern of calling increment_object_store_calls_by_date and related metrics functions after the `?` operator is the correct approach.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:674-681
Timestamp: 2025-09-25T07:13:04.112Z
Learning: In the Parseable codebase, object store metrics should only be captured for successful operations. Metric recording calls should be placed after `await?` operators to ensure they only execute when operations succeed, not when they fail.
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-09-18T09:59:20.177Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:700-756
Timestamp: 2025-09-18T09:59:20.177Z
Learning: In src/event/mod.rs, the parsed_timestamp used in increment_events_ingested_by_date() is correctly UTC-normalized: for dynamic streams it remains Utc::now(), and for streams with time partition enabled it uses the time partition value. Both cases result in proper UTC date strings for metrics labeling, preventing double-counting issues.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-09-25T07:12:27.407Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-09-25T07:13:04.112Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:674-681
Timestamp: 2025-09-25T07:13:04.112Z
Learning: In the Parseable codebase, object store metrics should only be captured for successful operations. Metric recording calls should be placed after `await?` operators to ensure they only execute when operations succeed, not when they fail.

Applied to files:

  • src/query/mod.rs
🧬 Code graph analysis (1)
src/query/mod.rs (1)
src/metrics/mod.rs (1)
  • increment_bytes_scanned_in_query_by_date (569-573)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (3)
src/query/mod.rs (3)

255-298: Excellent fix for multi-partition execution!

The code now correctly handles all partitions:

  • Non-streaming: collect_partitioned materializes all partitions and flattens them
  • Streaming: execute_stream_partitioned wraps each partition with metrics monitoring

This addresses the critical data loss issue flagged in the previous review where only partition 0 was being executed.


854-858: Clean coordination design for multi-partition metrics.

The MonitorState struct effectively coordinates metric emission across partitions using AtomicUsize. When the last partition completes, metrics are recorded exactly once.


860-928: Correct implementation addresses all previous review concerns.

The PartitionedMetricMonitor implementation properly addresses the issues from the previous review:

  1. Emit-once guard: The is_finished flag (line 867) prevents re-emission if polled after completion/error.
  2. Atomic coordination: fetch_sub with SeqCst ordering (line 920) ensures exactly one partition emits the aggregated metrics.
  3. Error handling: Errors trigger metric emission before propagation (lines 896-900).

The implementation correctly handles concurrent partition completion.

Use ParquetExec's `bytes_scanned` metric instead of scanning manually

Implemented the same in case of streaming by wrapping our execution plan with a monitor
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/query/mod.rs (2)

88-122: Consider a type alias for the complex return type.

The deeply nested Either type with Pin<Box<RecordBatchStreamAdapter<...>>> makes the signature difficult to read. A type alias would improve maintainability:

type QueryStream = Pin<Box<RecordBatchStreamAdapter<select_all::SelectAll<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, datafusion::error::DataFusionError>> + Send>>>>>>;

pub async fn execute(
    query: Query,
    is_streaming: bool,
) -> Result<(Either<Vec<RecordBatch>, QueryStream>, Vec<String>), ExecuteError>

907-909: Consider preserving the inner stream's size hint.

Returning a hardcoded (0, None) discards potentially useful size information from the inner stream. Delegating to the inner stream would provide better hints for downstream allocation:

     fn size_hint(&self) -> (usize, Option<usize>) {
-        (0, None)
+        self.inner.size_hint()
     }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 78a918c and 91d8088.

📒 Files selected for processing (2)
  • src/query/mod.rs (7 hunks)
  • src/query/stream_schema_provider.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/query/stream_schema_provider.rs
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/s3.rs:837-844
Timestamp: 2025-09-25T07:13:58.909Z
Learning: In the Parseable codebase, the existing pattern of placing metrics calls after the `?` operator in S3 list_hours method is correct and should not be changed. The metrics are properly recorded only on successful operations.
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-09-25T07:12:27.407Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-09-25T07:13:04.112Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:674-681
Timestamp: 2025-09-25T07:13:04.112Z
Learning: In the Parseable codebase, object store metrics should only be captured for successful operations. Metric recording calls should be placed after `await?` operators to ensure they only execute when operations succeed, not when they fail.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-09-18T09:59:20.177Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:700-756
Timestamp: 2025-09-18T09:59:20.177Z
Learning: In src/event/mod.rs, the parsed_timestamp used in increment_events_ingested_by_date() is correctly UTC-normalized: for dynamic streams it remains Utc::now(), and for streams with time partition enabled it uses the time partition value. Both cases result in proper UTC date strings for metrics labeling, preventing double-counting issues.

Applied to files:

  • src/query/mod.rs
🧬 Code graph analysis (1)
src/query/mod.rs (1)
src/metrics/mod.rs (1)
  • increment_bytes_scanned_in_query_by_date (569-573)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (5)
src/query/mod.rs (5)

23-56: LGTM!

The new imports properly support the streaming metric monitoring functionality added in this PR.


260-275: LGTM!

The non-streaming path correctly uses collect_partitioned to execute all partitions and properly records bytes scanned metrics after successful collection.


276-298: LGTM!

The streaming path correctly executes all partitions using execute_stream_partitioned and implements proper metric collection through the PartitionedMetricMonitor wrapper with atomic counting to ensure metrics are recorded exactly once.


388-404: The as_usize() as u64 cast at line 395 could silently truncate on 32-bit systems when bytes scanned exceeds 4GB. Consider whether DataFusion 51's metrics API provides an alternative extraction method that preserves the full u64 value directly, or handle the conversion more defensively to prevent data loss.


896-900: Recording metrics on error may violate established patterns.

Lines 896-900 record bytes scanned metrics even when the stream fails with an error. Based on learnings, object store metrics should only be recorded for successful operations, not failures. Recording metrics on error could lead to:

  1. Incorrect billing for failed queries that should be retried
  2. Inconsistency with the established metric recording pattern in the codebase

Consider removing the metric recording from the error path or clearly documenting why failed queries should be billed.

Based on learnings: "object store metrics should only be recorded for successful operations, not for failed attempts."

🔎 Proposed fix
             Poll::Ready(Some(Err(e))) => {
                 tracing::error!("Stream Failed with error: {}", e);
                 self.is_finished = true;
-                self.check_if_last_stream();
+                // Don't record metrics on error - only successful operations should be billed
             }
⛔ Skipped due to learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:674-681
Timestamp: 2025-09-25T07:13:04.112Z
Learning: In the Parseable codebase, object store metrics should only be captured for successful operations. Metric recording calls should be placed after `await?` operators to ensure they only execute when operations succeed, not when they fail.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:625-633
Timestamp: 2025-09-25T07:12:40.189Z
Learning: In Parseable's object storage metrics system, metrics should only be captured when API calls succeed, not when they error out. The current pattern of calling increment_object_store_calls_by_date and related metrics functions after the `?` operator is the correct approach.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/s3.rs:837-844
Timestamp: 2025-09-25T07:13:58.909Z
Learning: In the Parseable codebase, the existing pattern of placing metrics calls after the `?` operator in S3 list_hours method is correct and should not be changed. The metrics are properly recorded only on successful operations.

@nitisht nitisht merged commit 651a8dd into parseablehq:main Dec 19, 2025
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants