Skip to content

Conversation

@parmesant
Copy link
Contributor

@parmesant parmesant commented Dec 15, 2025

Add backend for SSE

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Real-time Server-Sent Events (SSE) broadcasting with session-aware client registration.
    • Targeted or broadcast delivery of event payloads (Alert, ControlPlane, Consent) with severity levels (Info, Warn, Error).
    • Background ping loop that prunes stale connections and exposes active-session enumeration.
  • Chores

    • Added actix-web-lab dependency and updated tokio-stream to 0.1.17.

✏️ Tip: You can customize this high-level summary in your review settings.

Add backend for SSE
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 15, 2025

Walkthrough

Adds a new SSE broadcasting module with session-aware client registration, message types, broadcast and periodic ping-based pruning; exports it via pub mod sse; integrates SSE broadcasting into alerts for authorized active sessions; and updates Cargo dependencies (actix-web-lab added, tokio-stream bumped).

Changes

Cohort / File(s) Summary
Dependency updates
Cargo.toml
Added actix-web-lab = "0.24.3"; upgraded tokio-stream from "0.1" to "0.1.17" (retains ["fs"] feature).
Public API export
src/lib.rs
Exported new public module: pub mod sse.
SSE broadcasting system
src/sse/mod.rs
New SSE module: Broadcaster (Arc + async RwLock map from Ulid session → client sender lists), Broadcaster::create, new_client, fetch_sessions, broadcast, ping loop (10s) to ping and prune stale clients, register_sse_client Actix handler, SSE payload types (SSEEvent, Message, Criticality, SSEAlertInfo, ControlPlaneEvent, Consent), and static SSE_HANDLER (Lazy<Arc<Broadcaster>>).
Alert integration
src/alerts/mod.rs
Imports sse::{SSE_HANDLER, SSEAlertInfo, SSEEvent} and rbac::map::{SessionKey, sessions}; uses context.message.clone_from(&message); gathers active sessions, filters by authorization, and broadcasts SSE alert payloads to matching sessions when state is Triggered.
Session API extension
src/rbac/map.rs
Added pub fn get_active_sessions(&self) -> Vec<SessionKey>; imported itertools::Itertools to use collect_vec.

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant ActixHandler as Actix Handler
    participant Sessions as Sessions Map
    participant Broadcaster
    participant SSEStream as SSE Stream
    participant Alerts as Alerts module

    Client->>ActixHandler: HTTP /sse request (with session)
    ActixHandler->>Sessions: validate/extract session
    Sessions-->>ActixHandler: session (Ulid)
    ActixHandler->>Broadcaster: register_sse_client(session)
    Broadcaster->>Broadcaster: create channel, store sender under session
    Broadcaster-->>SSEStream: return ReceiverStream as Sse
    SSEStream-->>Client: SSE connection established

    Note over Broadcaster: periodic ping loop (every 10s) prunes stale senders

    Alerts->>Sessions: get_active_sessions()
    Sessions-->>Alerts: list of session keys
    Alerts->>Broadcaster: broadcast(alert_payload, target_sessions)
    Broadcaster->>SSEStream: send events to matching client senders
    SSEStream-->>Client: event arrives

    alt Unresponsive client
        Broadcaster->>Broadcaster: ping fails -> remove stale sender
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Inspect concurrency and ownership around Arc + async RwLock + channel senders (src/sse/mod.rs).
  • Verify ping/prune correctness, ping payload handling, and cleanup timing.
  • Review Actix handler register_sse_client for session extraction and error handling (uses unwraps).
  • Confirm serialization (Serialize/Deserialize) and camelCase renaming for SSE payload types.
  • Check rbac::map::get_active_sessions usage and itertools import.

Poem

🐰 I nibble code and spin the stream,

I ping the night and prune the dream,
Sessions hop and messages beam,
Alerts flutter on a silver seam,
A rabbit hums — the SSE team.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is incomplete with placeholder text 'Fixes #XXXX', empty sections, and all checklist items unchecked; it lacks details about goals, solution rationale, and key changes. Complete the description by filling in the issue reference, explaining the SSE implementation goal, describing the Broadcaster and client registration architecture, and checking appropriate checklist items.
Title check ❓ Inconclusive The title 'feat: SSE' is vague and generic, using a non-specific abbreviation without context about what SSE implementation or feature this adds. Use a more descriptive title like 'feat: add SSE broadcasting system for session alerts' to clearly communicate the primary change.
✅ Passed checks (1 passed)
Check name Status Explanation
Docstring Coverage ✅ Passed Docstring coverage is 80.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (1)
src/sse/mod.rs (1)

56-76: Consider cleaning up empty sessions.

When all clients for a session disconnect, the session key remains in the map with an empty Vec. This can lead to accumulation of stale session entries over time.

Apply this diff to filter out empty sessions:

         for (session, clients) in sse_inner.iter() {
             let mut ok_clients = Vec::new();
             for client in clients {
                 if client
                     .send(sse::Event::Comment("ping".into()))
                     .await
                     .is_ok()
                 {
                     ok_clients.push(client.clone())
                 }
             }
-            ok_sessions.insert(*session, ok_clients);
+            if !ok_clients.is_empty() {
+                ok_sessions.insert(*session, ok_clients);
+            }
         }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c68c3f and 6ce3508.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • Cargo.toml (2 hunks)
  • src/lib.rs (1 hunks)
  • src/sse/mod.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/lib.rs
🧬 Code graph analysis (1)
src/sse/mod.rs (1)
src/utils/actix.rs (1)
  • extract_session_key_from_req (51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (4)
src/lib.rs (1)

45-45: LGTM!

The new sse module is correctly exposed and follows the existing alphabetical ordering convention.

Cargo.toml (2)

87-87: LGTM!

Pinning tokio-stream to 0.1.17 for reproducibility is reasonable.


33-33: The actix-web-lab version 0.24.3 is the latest stable release and is compatible with actix-web 4.9.0. No changes needed.

src/sse/mod.rs (1)

20-40: LGTM!

The Broadcaster design is appropriate: Arc for shared ownership across handlers, RwLock for async read-write access, and spawning a cleanup loop on creation.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
src/sse/mod.rs (3)

89-93: Fix race condition with double write lock acquisition.

This issue was flagged in a previous review and remains unaddressed. The code acquires a write lock, checks if the session exists, drops the lock, then acquires another write lock in the else branch. A concurrent call could insert between these acquisitions.

Use the Entry API:

-        if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-            clients.push(tx);
-        } else {
-            self.inner.write().await.clients.insert(*session, vec![tx]);
-        }
+        self.inner
+            .write()
+            .await
+            .clients
+            .entry(*session)
+            .or_default()
+            .push(tx);

141-151: Replace panics with proper error handling.

This issue was flagged in a previous review and remains unaddressed. The unwrap() on line 145 and unreachable!() on line 148 will panic on unauthenticated requests or BasicAuth usage. HTTP handlers should return appropriate error responses instead:

 pub async fn register_sse_client(
     broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
     req: HttpRequest,
-) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
-    let session = extract_session_key_from_req(&req).unwrap();
-    let sessionid = match session {
-        SessionKey::SessionId(ulid) => ulid,
-        _ => unreachable!("SSE requires a valid session. Unable to register client."),
-    };
-    broadcaster.new_client(&sessionid).await
+) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
+    let session = extract_session_key_from_req(&req)?;
+    let sessionid = match session {
+        SessionKey::SessionId(ulid) => ulid,
+        SessionKey::BasicAuth { .. } => {
+            return Err(actix_web::error::ErrorBadRequest(
+                "SSE requires session-based authentication, not BasicAuth",
+            ));
+        }
+    };
+    Ok(broadcaster.new_client(&sessionid).await)
 }

184-188: ControlPlaneEvent.message field is still private.

A previous review noted that private fields prevent external instantiation. While SSEAlertInfo fields were made public, ControlPlaneEvent.message remains private:

 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }
🧹 Nitpick comments (1)
src/sse/mod.rs (1)

57-78: Consider avoiding full HashMap clone in remove_stale_clients.

Cloning the entire clients HashMap on line 59 could become expensive with many connected clients. Consider iterating with the write lock held and mutating in place, or using retain semantics:

     async fn remove_stale_clients(&self) {
-        let sse_inner = self.inner.read().await.clients.clone();
-
-        let mut ok_sessions = HashMap::new();
-
-        for (session, clients) in sse_inner.iter() {
-            let mut ok_clients = Vec::new();
-            for client in clients {
-                if client
-                    .send(sse::Event::Comment("ping".into()))
-                    .await
-                    .is_ok()
-                {
-                    ok_clients.push(client.clone())
+        let mut guard = self.inner.write().await;
+        for clients in guard.clients.values_mut() {
+            let mut ok_clients = Vec::with_capacity(clients.len());
+            for client in clients.drain(..) {
+                if client.send(sse::Event::Comment("ping".into())).await.is_ok() {
+                    ok_clients.push(client);
                 }
             }
-            ok_sessions.insert(*session, ok_clients);
+            *clients = ok_clients;
         }
-
-        self.inner.write().await.clients = ok_sessions;
+        guard.clients.retain(|_, v| !v.is_empty());
     }

This avoids cloning and removes empty session entries.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6ce3508 and 9e6f085.

📒 Files selected for processing (3)
  • src/alerts/mod.rs (2 hunks)
  • src/rbac/map.rs (2 hunks)
  • src/sse/mod.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-24T11:09:21.781Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1388
File: src/alerts/mod.rs:88-104
Timestamp: 2025-07-24T11:09:21.781Z
Learning: In the Parseable alert system (src/alerts/mod.rs), alert versions are server-generated and controlled via CURRENT_ALERTS_VERSION constant, not user input. The AlertVerison enum's From<&str> implementation correctly defaults unknown versions to V2 since the server only generates known versions (v1, v2). Unknown versions would only occur in exceptional cases like file corruption, making the current fallback approach appropriate.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/alerts/mod.rs
🧬 Code graph analysis (2)
src/alerts/mod.rs (2)
src/rbac/map.rs (1)
  • sessions (92-98)
src/utils/mod.rs (1)
  • user_auth_for_query (78-87)
src/sse/mod.rs (1)
src/utils/actix.rs (1)
  • extract_session_key_from_req (51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
🔇 Additional comments (2)
src/rbac/map.rs (1)

172-174: LGTM!

The get_active_sessions method cleanly exposes the session keys needed for SSE broadcasting. The implementation correctly clones keys to avoid holding the lock longer than necessary.

src/alerts/mod.rs (1)

617-636: SSE broadcast integration looks good.

The authorization check correctly filters sessions to only broadcast alerts to users who have access to the alert's query. The silent skip on serialization failure (line 629) is acceptable since SSEAlertInfo serialization is unlikely to fail.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/alerts/mod.rs (3)

610-610: Good optimization using clone_from.

Using clone_from instead of direct assignment with clone() can be more efficient as it may reuse existing allocations.


618-627: Consider parallelizing authorization checks for better performance.

The code sequentially iterates through all active sessions and performs async authorization checks. If there are many active sessions, this could introduce noticeable latency in the alert notification path.

Consider parallelizing the authorization checks similar to the pattern used in list_alerts_for_user (lines 1137-1155):

-        let active_sessions = sessions().get_active_sessions();
-        let mut broadcast_to = vec![];
-        for session in active_sessions {
-            if user_auth_for_query(&session, &self.query).await.is_ok()
-                && let SessionKey::SessionId(id) = &session
-            {
-                broadcast_to.push(*id);
-            }
-        }
+        let active_sessions = sessions().get_active_sessions();
+        let auth_futures: Vec<_> = active_sessions
+            .into_iter()
+            .map(|session| async move {
+                if user_auth_for_query(&session, &self.query).await.is_ok() {
+                    if let SessionKey::SessionId(id) = session {
+                        Some(id)
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                }
+            })
+            .collect();
+        let broadcast_to: Vec<_> = futures::future::join_all(auth_futures)
+            .await
+            .into_iter()
+            .flatten()
+            .collect();

629-635: Add error logging for SSE serialization and broadcast failures.

The code silently ignores serialization errors and doesn't handle potential broadcast failures. This could make debugging SSE issues difficult.

Apply this diff to add error logging:

-        if let Ok(msg) = &serde_json::to_string(&SSEAlertInfo {
+        match serde_json::to_string(&SSEAlertInfo {
             id: self.id,
             state: self.state,
             message,
-        }) {
+        }) {
+            Ok(msg) => {
-            SSE_HANDLER.broadcast(msg, Some(&broadcast_to)).await;
+                SSE_HANDLER.broadcast(&msg, Some(&broadcast_to)).await;
+            }
+            Err(e) => {
+                error!("Failed to serialize SSE alert info for alert {}: {}", self.id, e);
+            }
         }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9e6f085 and 8a35e0d.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • Cargo.toml (2 hunks)
  • src/alerts/mod.rs (2 hunks)
  • src/lib.rs (1 hunks)
  • src/rbac/map.rs (2 hunks)
  • src/sse/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • Cargo.toml
  • src/rbac/map.rs
  • src/sse/mod.rs
  • src/lib.rs
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-24T11:09:21.781Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1388
File: src/alerts/mod.rs:88-104
Timestamp: 2025-07-24T11:09:21.781Z
Learning: In the Parseable alert system (src/alerts/mod.rs), alert versions are server-generated and controlled via CURRENT_ALERTS_VERSION constant, not user input. The AlertVerison enum's From<&str> implementation correctly defaults unknown versions to V2 since the server only generates known versions (v1, v2). Unknown versions would only occur in exceptional cases like file corruption, making the current fallback approach appropriate.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/alerts/mod.rs
🧬 Code graph analysis (1)
src/alerts/mod.rs (2)
src/rbac/map.rs (1)
  • sessions (92-98)
src/utils/mod.rs (1)
  • user_auth_for_query (78-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
🔇 Additional comments (2)
src/alerts/mod.rs (2)

618-636: SSE types and handler method signatures are correct.

Verification confirms SSEAlertInfo struct has the expected fields (id: Ulid, state: AlertState, message: String) and SSE_HANDLER.broadcast(&self, msg: &str, sessions: Option<&[Ulid]>) accepts the correct parameter types. The code usage is valid.


65-66: Imports are correct. Both SSE_HANDLER and SSEAlertInfo are properly exported from src/sse/mod.rs.

send alert message only if state = triggered
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (5)
src/sse/mod.rs (5)

88-88: Remove unwrap() to prevent panic on channel closure.

The unwrap() will panic if the channel is already closed, though unlikely immediately after creation. Handle this gracefully instead.

🔎 Suggested fix:
-        tx.send(sse::Data::new("connected").into()).await.unwrap();
+        let _ = tx.send(sse::Data::new("connected").into()).await;

90-94: Fix race condition with double write lock acquisition.

The code acquires a write lock, checks if the session exists, drops it, then acquires another write lock in the else branch. Between these acquisitions, another concurrent call could insert the same session, leading to lost client registrations.

🔎 Apply this fix using the Entry API:
-        if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-            clients.push(tx);
-        } else {
-            self.inner.write().await.clients.insert(*session, vec![tx]);
-        }
+        self.inner
+            .write()
+            .await
+            .clients
+            .entry(*session)
+            .or_default()
+            .push(tx);

148-148: Replace unwrap() with proper error propagation.

The unwrap() will panic on unauthenticated requests. Since extract_session_key_from_req returns Result<SessionKey, Error>, use the ? operator to propagate the error properly.

🔎 Apply this fix:
-    let session = extract_session_key_from_req(&req).unwrap();
+    let session = extract_session_key_from_req(&req)?;

192-196: Make fields public for external instantiation.

ControlPlaneEvent has a private message field, preventing external modules from constructing instances. Make the field public or provide a constructor.

🔎 Apply this fix:
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }

198-202: Make fields public for external instantiation.

Consent has a private given field, preventing external modules from constructing instances. Make the field public or provide a constructor.

🔎 Apply this fix:
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Consent {
-    given: bool,
+    pub given: bool,
 }
🧹 Nitpick comments (1)
src/sse/mod.rs (1)

112-143: Consider simplifying the broadcast logic.

The if let Some(sessions) and else branches have duplicated future-building code. You could filter the clients iterator once and then build futures, reducing duplication.

🔎 Example refactor:
-        let send_futures = if let Some(sessions) = sessions {
-            let mut futures = vec![];
-            for (session, clients) in clients.iter() {
-                if sessions.contains(session) {
-                    clients
-                        .iter()
-                        .for_each(|client| futures.push(client.send(sse::Data::new(msg).into())));
-                }
-            }
-            futures
-        } else {
-            // broadcast
-            let mut futures = vec![];
-            for (_, clients) in clients.iter() {
-                clients
-                    .iter()
-                    .for_each(|client| futures.push(client.send(sse::Data::new(msg).into())));
-            }
-            futures
-        };
+        let send_futures: Vec<_> = clients
+            .iter()
+            .filter(|(session, _)| sessions.map_or(true, |s| s.contains(session)))
+            .flat_map(|(_, clients)| clients.iter())
+            .map(|client| client.send(sse::Data::new(msg).into()))
+            .collect();
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8a35e0d and fa1aff6.

📒 Files selected for processing (2)
  • src/alerts/mod.rs (2 hunks)
  • src/sse/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/alerts/mod.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/sse/mod.rs (2)
src/utils/actix.rs (1)
  • extract_session_key_from_req (51-71)
src/rbac/map.rs (1)
  • sessions (92-98)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (6)
src/sse/mod.rs (6)

1-31: LGTM! Well-structured initialization.

The imports, static initialization, and data structures are well-organized for a thread-safe SSE broadcaster implementation.


33-56: LGTM! Solid initialization and ping loop.

The broadcaster creation and periodic ping mechanism are implemented correctly. The 10-second interval is reasonable for SSE heartbeat.


58-79: LGTM! Efficient stale client removal.

The clone-before-ping pattern at line 60 is a good trade-off that releases the read lock before performing potentially blocking operations.


99-107: LGTM! Clean session fetching.

The implementation correctly retrieves active session IDs using itertools.


149-158: Good error handling for authentication type mismatch.

The check for SessionId vs. BasicAuth properly returns an error instead of panicking. This is a good improvement.


160-190: LGTM! Public data structures are well-defined.

SSEEvent, Message, Criticality, and SSEAlertInfo are properly defined with public fields and appropriate serde annotations.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (4)
src/sse/mod.rs (4)

90-94: Fix race condition with double write lock acquisition.

This issue was previously flagged but remains unresolved. Between the two write lock acquisitions (line 90 and line 93), concurrent calls to new_client for the same session can race, potentially losing client registrations.

Use the Entry API to handle this atomically:

-        if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-            clients.push(tx);
-        } else {
-            self.inner.write().await.clients.insert(*session, vec![tx]);
-        }
+        self.inner
+            .write()
+            .await
+            .clients
+            .entry(*session)
+            .or_default()
+            .push(tx);

192-196: Make message field public to enable external instantiation.

This issue was previously flagged but remains unresolved. The private message field prevents other modules from constructing ControlPlaneEvent instances.

🔎 Apply this diff to make the field public:
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }

198-202: Make given field public to enable external instantiation.

The private given field prevents other modules from constructing Consent instances, which is needed if this type is part of the public API for broadcasting events.

🔎 Apply this diff to make the field public:
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Consent {
-    given: bool,
+    pub given: bool,
 }

145-158: Replace remaining unwrap() with proper error propagation.

While the function signature now correctly returns Result and the unreachable!() has been fixed, line 148 still contains .unwrap() which will panic if extract_session_key_from_req returns an error (e.g., no authentication provided).

🔎 Apply this diff to propagate errors properly:
-    let session = extract_session_key_from_req(&req).unwrap();
+    let session = extract_session_key_from_req(&req)?;
🧹 Nitpick comments (1)
src/sse/mod.rs (1)

58-79: Consider reducing lock contention during stale client removal.

The method clones the entire clients HashMap on line 60, which could be expensive with many concurrent sessions. While necessary to avoid holding the read lock during async pings, this approach scales linearly with client count.

For typical usage this is acceptable, but if you anticipate high client counts, consider batching or using finer-grained locking.

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fa1aff6 and 62aea32.

📒 Files selected for processing (2)
  • src/alerts/mod.rs (2 hunks)
  • src/sse/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/alerts/mod.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/sse/mod.rs (1)
src/utils/actix.rs (1)
  • extract_session_key_from_req (51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (3)
src/sse/mod.rs (3)

109-143: LGTM!

The broadcast logic correctly handles both targeted and global broadcasts, properly releases the read lock before sending, and defers failure handling to the periodic ping loop.


160-183: LGTM!

The event type definitions are well-structured with proper serialization attributes and public visibility.


184-190: LGTM!

The fields are now properly marked as pub, enabling external modules to construct instances.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/sse/mod.rs (1)

147-160: Replace unwrap() with proper error propagation.

Line 150 still calls .unwrap() on extract_session_key_from_req, which will panic if authentication fails. This is a critical issue that was flagged in previous reviews but remains unresolved. Use the ? operator to propagate the error properly.

🔎 Apply this diff to fix the panic:
 pub async fn register_sse_client(
     req: HttpRequest,
 ) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
-    let session = extract_session_key_from_req(&req).unwrap();
+    let session = extract_session_key_from_req(&req)?;
     let sessionid = match session {
         SessionKey::SessionId(ulid) => ulid,
         _ => {
             return Err(actix_web::error::ErrorBadRequest(
                 "SSE requires session-based authentication, not BasicAuth",
             ));
         }
     };
     Ok(SSE_HANDLER.new_client(&sessionid).await)
 }
🧹 Nitpick comments (2)
src/sse/mod.rs (2)

58-79: Consider avoiding the full HashMap clone.

The current implementation clones the entire clients HashMap (line 60) before testing liveness. For better efficiency, you could acquire a write lock once and mutate in place:

🔎 View suggested refactor
 async fn remove_stale_clients(&self) {
-    let sse_inner = self.inner.read().await.clients.clone();
-
-    let mut ok_sessions = HashMap::new();
-
-    for (session, clients) in sse_inner.iter() {
-        let mut ok_clients = Vec::new();
-        for client in clients {
-            if client
-                .send(sse::Event::Comment("ping".into()))
-                .await
-                .is_ok()
-            {
-                ok_clients.push(client.clone())
-            }
-        }
-        ok_sessions.insert(*session, ok_clients);
-    }
-
-    self.inner.write().await.clients = ok_sessions;
+    let mut inner = self.inner.write().await;
+    for clients in inner.clients.values_mut() {
+        let mut i = 0;
+        while i < clients.len() {
+            if clients[i].send(sse::Event::Comment("ping".into())).await.is_err() {
+                clients.swap_remove(i);
+            } else {
+                i += 1;
+            }
+        }
+    }
+    inner.clients.retain(|_, clients| !clients.is_empty());
 }

This approach holds the write lock longer but avoids cloning the entire map. Given that ping messages are lightweight and SSE clients are typically low-volume, the performance difference may be negligible—use whichever approach fits your latency/throughput profile.


111-144: Consider reducing code duplication.

The filtered and broadcast-all branches share similar logic for building the send futures. You could refactor to eliminate the duplication:

🔎 View suggested refactor
 pub async fn broadcast(&self, msg: &str, sessions: Option<&[Ulid]>) {
     let clients = self.inner.read().await.clients.clone();
     if clients.is_empty() {
         return;
     }

-    let send_futures = if let Some(sessions) = sessions {
-        let mut futures = vec![];
-        for (session, clients) in clients.iter() {
-            if sessions.contains(session) {
-                clients
-                    .iter()
-                    .for_each(|client| futures.push(client.send(sse::Data::new(msg).into())));
-            }
-        }
-        futures
-    } else {
-        // broadcast
-        let mut futures = vec![];
-        for (_, clients) in clients.iter() {
-            clients
-                .iter()
-                .for_each(|client| futures.push(client.send(sse::Data::new(msg).into())));
-        }
-        futures
-    };
+    let send_futures: Vec<_> = clients
+        .iter()
+        .filter(|(session, _)| sessions.map_or(true, |s| s.contains(session)))
+        .flat_map(|(_, clients)| clients.iter())
+        .map(|client| client.send(sse::Data::new(msg).into()))
+        .collect();

     // try to send to all clients, ignoring failures
     // disconnected clients will get swept up by `remove_stale_clients`
     let _ = future::join_all(send_futures).await;
 }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 62aea32 and c9e14ec.

📒 Files selected for processing (2)
  • src/alerts/mod.rs (2 hunks)
  • src/sse/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/alerts/mod.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/sse/mod.rs (2)
src/utils/actix.rs (1)
  • extract_session_key_from_req (51-71)
src/rbac/map.rs (1)
  • sessions (92-98)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (7)
src/sse/mod.rs (7)

1-20: LGTM!

The imports are well-organized and appropriate for an SSE broadcasting implementation with session-based client management.


21-31: LGTM!

The broadcaster architecture using Lazy<Arc<Broadcaster>> with RwLock-protected state is a solid pattern for managing concurrent SSE client connections.


33-56: LGTM!

The initialization pattern correctly spawns a background ping task to maintain SSE connections and prune stale clients.


81-99: LGTM!

The client registration correctly uses the entry().or_default() pattern to avoid race conditions, and gracefully handles the initial "connected" message send.


101-109: LGTM!

Simple and correct implementation for retrieving active session IDs.


162-176: LGTM!

The SSE event structure with public fields and proper serialization attributes is well-designed for external use.


178-184: LGTM!

The Criticality enum is clean and appropriately defined.

Comment on lines +186 to +204
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SSEAlertInfo {
pub id: Ulid,
pub state: AlertState,
pub name: String,
}

#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ControlPlaneEvent {
message: String,
}

#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Consent {
given: bool,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make remaining struct fields public.

SSEAlertInfo fields are now correctly public (addressed from previous reviews), but ControlPlaneEvent (line 197) and Consent (line 203) still have private fields. These structs cannot be constructed from outside this module, preventing their use by other parts of the codebase that need to broadcast events.

🔎 Apply this diff to make the fields public:
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }

 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Consent {
-    given: bool,
+    pub given: bool,
 }
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 186 to 204, the structs ControlPlaneEvent (line
~197) and Consent (line ~203) have private fields which prevent external
construction; change their fields to be public (make message -> pub message and
given -> pub given) while keeping the existing derives and serde attributes so
other modules can construct and serialize/deserialize these event types.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant