feat: SSE #1487
Conversation
Walkthrough
Adds a new SSE broadcasting module with session-aware client registration, message types, broadcasting, and periodic ping-based pruning; exports it via `src/lib.rs`.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant ActixHandler as Actix Handler
    participant Sessions as Sessions Map
    participant Broadcaster
    participant SSEStream as SSE Stream
    participant Alerts as Alerts module
    Client->>ActixHandler: HTTP /sse request (with session)
    ActixHandler->>Sessions: validate/extract session
    Sessions-->>ActixHandler: session (Ulid)
    ActixHandler->>Broadcaster: register_sse_client(session)
    Broadcaster->>Broadcaster: create channel, store sender under session
    Broadcaster-->>SSEStream: return ReceiverStream as Sse
    SSEStream-->>Client: SSE connection established
    Note over Broadcaster: periodic ping loop (every 10s) prunes stale senders
    Alerts->>Sessions: get_active_sessions()
    Sessions-->>Alerts: list of session keys
    Alerts->>Broadcaster: broadcast(alert_payload, target_sessions)
    Broadcaster->>SSEStream: send events to matching client senders
    SSEStream-->>Client: event arrives
    alt Unresponsive client
        Broadcaster->>Broadcaster: ping fails -> remove stale sender
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 4
🧹 Nitpick comments (1)
src/sse/mod.rs (1)
56-76: Consider cleaning up empty sessions.
When all clients for a session disconnect, the session key remains in the map with an empty `Vec`, which can lead to an accumulation of stale session entries over time. Apply this diff to filter out empty sessions:

```diff
 for (session, clients) in sse_inner.iter() {
     let mut ok_clients = Vec::new();
     for client in clients {
         if client
             .send(sse::Event::Comment("ping".into()))
             .await
             .is_ok()
         {
             ok_clients.push(client.clone())
         }
     }
-    ok_sessions.insert(*session, ok_clients);
+    if !ok_clients.is_empty() {
+        ok_sessions.insert(*session, ok_clients);
+    }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (3)
- `Cargo.toml` (2 hunks)
- `src/lib.rs` (1 hunks)
- `src/sse/mod.rs` (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/lib.rs
🧬 Code graph analysis (1)
src/sse/mod.rs (1)
- src/utils/actix.rs (1): `extract_session_key_from_req` (51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (4)
src/lib.rs (1)
45-45: LGTM! The new `sse` module is correctly exposed and follows the existing alphabetical ordering convention.
Cargo.toml (2)
87-87: LGTM! Pinning tokio-stream to 0.1.17 for reproducibility is reasonable.
33-33: The `actix-web-lab` version 0.24.3 is the latest stable release and is compatible with actix-web 4.9.0. No changes needed.
src/sse/mod.rs (1)
20-40: LGTM! The `Broadcaster` design is appropriate: `Arc` for shared ownership across handlers, `RwLock` for async read-write access, and spawning a cleanup loop on creation.
Force-pushed from 9e6f085 to 8a35e0d
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/sse/mod.rs (3)
89-93: Fix race condition with double write lock acquisition.
This issue was flagged in a previous review and remains unaddressed. The code acquires a write lock, checks if the session exists, drops the lock, then acquires another write lock in the `else` branch. A concurrent call could insert between these acquisitions.
Use the `Entry` API:

```diff
-if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-    clients.push(tx);
-} else {
-    self.inner.write().await.clients.insert(*session, vec![tx]);
-}
+self.inner
+    .write()
+    .await
+    .clients
+    .entry(*session)
+    .or_default()
+    .push(tx);
```
141-151: Replace panics with proper error handling.
This issue was flagged in a previous review and remains unaddressed. The `unwrap()` on line 145 and `unreachable!()` on line 148 will panic on unauthenticated requests or BasicAuth usage. HTTP handlers should return appropriate error responses instead:

```diff
 pub async fn register_sse_client(
     broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
     req: HttpRequest,
-) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
-    let session = extract_session_key_from_req(&req).unwrap();
-    let sessionid = match session {
-        SessionKey::SessionId(ulid) => ulid,
-        _ => unreachable!("SSE requires a valid session. Unable to register client."),
-    };
-    broadcaster.new_client(&sessionid).await
+) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
+    let session = extract_session_key_from_req(&req)?;
+    let sessionid = match session {
+        SessionKey::SessionId(ulid) => ulid,
+        SessionKey::BasicAuth { .. } => {
+            return Err(actix_web::error::ErrorBadRequest(
+                "SSE requires session-based authentication, not BasicAuth",
+            ));
+        }
+    };
+    Ok(broadcaster.new_client(&sessionid).await)
 }
```
184-188: `ControlPlaneEvent.message` field is still private.
A previous review noted that private fields prevent external instantiation. While `SSEAlertInfo` fields were made public, `ControlPlaneEvent.message` remains private:

```diff
 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }
```
🧹 Nitpick comments (1)
src/sse/mod.rs (1)
57-78: Consider avoiding a full HashMap clone in `remove_stale_clients`.
Cloning the entire `clients` HashMap on line 59 could become expensive with many connected clients. Consider iterating with the write lock held and mutating in place, or using `retain` semantics:

```diff
 async fn remove_stale_clients(&self) {
-    let sse_inner = self.inner.read().await.clients.clone();
-
-    let mut ok_sessions = HashMap::new();
-
-    for (session, clients) in sse_inner.iter() {
-        let mut ok_clients = Vec::new();
-        for client in clients {
-            if client
-                .send(sse::Event::Comment("ping".into()))
-                .await
-                .is_ok()
-            {
-                ok_clients.push(client.clone())
+    let mut guard = self.inner.write().await;
+    for clients in guard.clients.values_mut() {
+        let mut ok_clients = Vec::with_capacity(clients.len());
+        for client in clients.drain(..) {
+            if client.send(sse::Event::Comment("ping".into())).await.is_ok() {
+                ok_clients.push(client);
             }
         }
-        ok_sessions.insert(*session, ok_clients);
+        *clients = ok_clients;
     }
-
-    self.inner.write().await.clients = ok_sessions;
+    guard.clients.retain(|_, v| !v.is_empty());
 }
```

This avoids cloning and removes empty session entries.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- `src/alerts/mod.rs` (2 hunks)
- `src/rbac/map.rs` (2 hunks)
- `src/sse/mod.rs` (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-24T11:09:21.781Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1388
File: src/alerts/mod.rs:88-104
Timestamp: 2025-07-24T11:09:21.781Z
Learning: In the Parseable alert system (src/alerts/mod.rs), alert versions are server-generated and controlled via CURRENT_ALERTS_VERSION constant, not user input. The AlertVerison enum's From<&str> implementation correctly defaults unknown versions to V2 since the server only generates known versions (v1, v2). Unknown versions would only occur in exceptional cases like file corruption, making the current fallback approach appropriate.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.
Applied to files:
src/alerts/mod.rs
🧬 Code graph analysis (2)
src/alerts/mod.rs (2)
- src/rbac/map.rs (1): `sessions` (92-98)
- src/utils/mod.rs (1): `user_auth_for_query` (78-87)
src/sse/mod.rs (1)
- src/utils/actix.rs (1): `extract_session_key_from_req` (51-71)
🔇 Additional comments (2)
src/rbac/map.rs (1)
172-174: LGTM! The `get_active_sessions` method cleanly exposes the session keys needed for SSE broadcasting. The implementation correctly clones keys to avoid holding the lock longer than necessary.
src/alerts/mod.rs (1)
617-636: SSE broadcast integration looks good. The authorization check correctly filters sessions to only broadcast alerts to users who have access to the alert's query. The silent skip on serialization failure (line 629) is acceptable since `SSEAlertInfo` serialization is unlikely to fail.
Actionable comments posted: 0
🧹 Nitpick comments (3)
src/alerts/mod.rs (3)
610-610: Good optimization using `clone_from`. Using `clone_from` instead of direct assignment with `clone()` can be more efficient as it may reuse existing allocations.
618-627: Consider parallelizing authorization checks for better performance.
The code sequentially iterates through all active sessions and performs async authorization checks. If there are many active sessions, this could introduce noticeable latency in the alert notification path.
Consider parallelizing the authorization checks similar to the pattern used in `list_alerts_for_user` (lines 1137-1155):

```diff
-let active_sessions = sessions().get_active_sessions();
-let mut broadcast_to = vec![];
-for session in active_sessions {
-    if user_auth_for_query(&session, &self.query).await.is_ok()
-        && let SessionKey::SessionId(id) = &session
-    {
-        broadcast_to.push(*id);
-    }
-}
+let active_sessions = sessions().get_active_sessions();
+let auth_futures: Vec<_> = active_sessions
+    .into_iter()
+    .map(|session| async move {
+        if user_auth_for_query(&session, &self.query).await.is_ok() {
+            if let SessionKey::SessionId(id) = session {
+                Some(id)
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    })
+    .collect();
+let broadcast_to: Vec<_> = futures::future::join_all(auth_futures)
+    .await
+    .into_iter()
+    .flatten()
+    .collect();
```
629-635: Add error logging for SSE serialization and broadcast failures.
The code silently ignores serialization errors and doesn't handle potential broadcast failures. This could make debugging SSE issues difficult.
Apply this diff to add error logging:

```diff
-if let Ok(msg) = &serde_json::to_string(&SSEAlertInfo {
+match serde_json::to_string(&SSEAlertInfo {
     id: self.id,
     state: self.state,
     message,
 }) {
-    SSE_HANDLER.broadcast(msg, Some(&broadcast_to)).await;
+    Ok(msg) => {
+        SSE_HANDLER.broadcast(&msg, Some(&broadcast_to)).await;
+    }
+    Err(e) => {
+        error!("Failed to serialize SSE alert info for alert {}: {}", self.id, e);
+    }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (5)
- `Cargo.toml` (2 hunks)
- `src/alerts/mod.rs` (2 hunks)
- `src/lib.rs` (1 hunks)
- `src/rbac/map.rs` (2 hunks)
- `src/sse/mod.rs` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- Cargo.toml
- src/rbac/map.rs
- src/sse/mod.rs
- src/lib.rs
🔇 Additional comments (2)
src/alerts/mod.rs (2)
618-636: SSE types and handler method signatures are correct. Verification confirms the `SSEAlertInfo` struct has the expected fields (`id: Ulid`, `state: AlertState`, `message: String`) and `SSE_HANDLER.broadcast(&self, msg: &str, sessions: Option<&[Ulid]>)` accepts the correct parameter types. The code usage is valid.
65-66: Imports are correct. Both `SSE_HANDLER` and `SSEAlertInfo` are properly exported from `src/sse/mod.rs`.
send alert message only if state = triggered
Force-pushed from fa1aff6 to 62aea32
Actionable comments posted: 0
♻️ Duplicate comments (5)
src/sse/mod.rs (5)
88-88: Remove `unwrap()` to prevent panic on channel closure.
The `unwrap()` will panic if the channel is already closed, though that is unlikely immediately after creation. Handle this gracefully instead.
🔎 Suggested fix:

```diff
-tx.send(sse::Data::new("connected").into()).await.unwrap();
+let _ = tx.send(sse::Data::new("connected").into()).await;
```
90-94: Fix race condition with double write lock acquisition.
The code acquires a write lock, checks if the session exists, drops it, then acquires another write lock in the `else` branch. Between these acquisitions, another concurrent call could insert the same session, leading to lost client registrations.
🔎 Apply this fix using the Entry API:

```diff
-if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-    clients.push(tx);
-} else {
-    self.inner.write().await.clients.insert(*session, vec![tx]);
-}
+self.inner
+    .write()
+    .await
+    .clients
+    .entry(*session)
+    .or_default()
+    .push(tx);
```
148-148: Replace `unwrap()` with proper error propagation.
The `unwrap()` will panic on unauthenticated requests. Since `extract_session_key_from_req` returns `Result<SessionKey, Error>`, use the `?` operator to propagate the error properly.
🔎 Apply this fix:

```diff
-let session = extract_session_key_from_req(&req).unwrap();
+let session = extract_session_key_from_req(&req)?;
```
192-196: Make fields public for external instantiation.
`ControlPlaneEvent` has a private `message` field, preventing external modules from constructing instances. Make the field public or provide a constructor.
🔎 Apply this fix:

```diff
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }
```
198-202: Make fields public for external instantiation.
`Consent` has a private `given` field, preventing external modules from constructing instances. Make the field public or provide a constructor.
🔎 Apply this fix:

```diff
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Consent {
-    given: bool,
+    pub given: bool,
 }
```
🧹 Nitpick comments (1)
src/sse/mod.rs (1)
112-143: Consider simplifying the broadcast logic.
The `if let Some(sessions)` and `else` branches duplicate the future-building code. You could filter the clients iterator once and then build futures, reducing duplication.
🔎 Example refactor:

```diff
-let send_futures = if let Some(sessions) = sessions {
-    let mut futures = vec![];
-    for (session, clients) in clients.iter() {
-        if sessions.contains(session) {
-            clients
-                .iter()
-                .for_each(|client| futures.push(client.send(sse::Data::new(msg).into())));
-        }
-    }
-    futures
-} else {
-    // broadcast
-    let mut futures = vec![];
-    for (_, clients) in clients.iter() {
-        clients
-            .iter()
-            .for_each(|client| futures.push(client.send(sse::Data::new(msg).into())));
-    }
-    futures
-};
+let send_futures: Vec<_> = clients
+    .iter()
+    .filter(|(session, _)| sessions.map_or(true, |s| s.contains(session)))
+    .flat_map(|(_, clients)| clients.iter())
+    .map(|client| client.send(sse::Data::new(msg).into()))
+    .collect();
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `src/alerts/mod.rs` (2 hunks)
- `src/sse/mod.rs` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/alerts/mod.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/sse/mod.rs (2)
- src/utils/actix.rs (1): `extract_session_key_from_req` (51-71)
- src/rbac/map.rs (1): `sessions` (92-98)
🔇 Additional comments (6)
src/sse/mod.rs (6)
1-31: LGTM! Well-structured initialization. The imports, static initialization, and data structures are well-organized for a thread-safe SSE broadcaster implementation.
33-56: LGTM! Solid initialization and ping loop. The broadcaster creation and periodic ping mechanism are implemented correctly. The 10-second interval is reasonable for SSE heartbeat.
58-79: LGTM! Efficient stale client removal. The clone-before-ping pattern at line 60 is a good trade-off that releases the read lock before performing potentially blocking operations.
99-107: LGTM! Clean session fetching. The implementation correctly retrieves active session IDs using itertools.
149-158: Good error handling for authentication type mismatch. The check for `SessionId` vs. `BasicAuth` properly returns an error instead of panicking. This is a good improvement.
160-190: LGTM! Public data structures are well-defined. `SSEEvent`, `Message`, `Criticality`, and `SSEAlertInfo` are properly defined with public fields and appropriate serde annotations.
Actionable comments posted: 0
♻️ Duplicate comments (4)
src/sse/mod.rs (4)
90-94: Fix race condition with double write lock acquisition.
This issue was previously flagged but remains unresolved. Between the two write lock acquisitions (line 90 and line 93), concurrent calls to `new_client` for the same session can race, potentially losing client registrations.
Use the `Entry` API to handle this atomically:

```diff
-if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-    clients.push(tx);
-} else {
-    self.inner.write().await.clients.insert(*session, vec![tx]);
-}
+self.inner
+    .write()
+    .await
+    .clients
+    .entry(*session)
+    .or_default()
+    .push(tx);
```
192-196: Make `message` field public to enable external instantiation.
This issue was previously flagged but remains unresolved. The private `message` field prevents other modules from constructing `ControlPlaneEvent` instances.
🔎 Apply this diff to make the field public:

```diff
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }
```
198-202: Make `given` field public to enable external instantiation.
The private `given` field prevents other modules from constructing `Consent` instances, which is needed if this type is part of the public API for broadcasting events.
🔎 Apply this diff to make the field public:

```diff
 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Consent {
-    given: bool,
+    pub given: bool,
 }
```
145-158: Replace remaining `unwrap()` with proper error propagation.
While the function signature now correctly returns `Result` and the `unreachable!()` has been fixed, line 148 still contains `.unwrap()`, which will panic if `extract_session_key_from_req` returns an error (e.g., no authentication provided).
🔎 Apply this diff to propagate errors properly:

```diff
-let session = extract_session_key_from_req(&req).unwrap();
+let session = extract_session_key_from_req(&req)?;
```
🧹 Nitpick comments (1)
src/sse/mod.rs (1)
57-78: Consider reducing lock contention during stale client removal.
The method clones the entire `clients` HashMap on line 60, which could be expensive with many concurrent sessions. While necessary to avoid holding the read lock during async pings, this approach scales linearly with client count.
For typical usage this is acceptable, but if you anticipate high client counts, consider batching or using finer-grained locking.
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `src/alerts/mod.rs` (2 hunks)
- `src/sse/mod.rs` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/alerts/mod.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/sse/mod.rs (1)
- src/utils/actix.rs (1): `extract_session_key_from_req` (51-71)
🔇 Additional comments (3)
src/sse/mod.rs (3)
109-143: LGTM! The broadcast logic correctly handles both targeted and global broadcasts, properly releases the read lock before sending, and defers failure handling to the periodic ping loop.
160-183: LGTM! The event type definitions are well-structured with proper serialization attributes and public visibility.
184-190: LGTM! The fields are now properly marked as `pub`, enabling external modules to construct instances.
Force-pushed from 62aea32 to c9e14ec
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/sse/mod.rs (1)
147-160: Replace `unwrap()` with proper error propagation.
Line 150 still calls `.unwrap()` on `extract_session_key_from_req`, which will panic if authentication fails. This is a critical issue that was flagged in previous reviews but remains unresolved. Use the `?` operator to propagate the error properly.
🔎 Apply this diff to fix the panic:

```diff
 pub async fn register_sse_client(
     req: HttpRequest,
 ) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
-    let session = extract_session_key_from_req(&req).unwrap();
+    let session = extract_session_key_from_req(&req)?;
     let sessionid = match session {
         SessionKey::SessionId(ulid) => ulid,
         _ => {
             return Err(actix_web::error::ErrorBadRequest(
                 "SSE requires session-based authentication, not BasicAuth",
             ));
         }
     };
     Ok(SSE_HANDLER.new_client(&sessionid).await)
 }
```
🧹 Nitpick comments (2)
src/sse/mod.rs (2)
58-79: Consider avoiding the full HashMap clone.The current implementation clones the entire
clientsHashMap (line 60) before testing liveness. For better efficiency, you could acquire a write lock once and mutate in place:🔎 View suggested refactor
```diff
 async fn remove_stale_clients(&self) {
-    let sse_inner = self.inner.read().await.clients.clone();
-
-    let mut ok_sessions = HashMap::new();
-
-    for (session, clients) in sse_inner.iter() {
-        let mut ok_clients = Vec::new();
-        for client in clients {
-            if client
-                .send(sse::Event::Comment("ping".into()))
-                .await
-                .is_ok()
-            {
-                ok_clients.push(client.clone())
-            }
-        }
-        ok_sessions.insert(*session, ok_clients);
-    }
-
-    self.inner.write().await.clients = ok_sessions;
+    let mut inner = self.inner.write().await;
+    for clients in inner.clients.values_mut() {
+        let mut i = 0;
+        while i < clients.len() {
+            if clients[i]
+                .send(sse::Event::Comment("ping".into()))
+                .await
+                .is_err()
+            {
+                clients.swap_remove(i);
+            } else {
+                i += 1;
+            }
+        }
+    }
+    inner.clients.retain(|_, clients| !clients.is_empty());
 }
```

This approach holds the write lock longer but avoids cloning the entire map. Given that ping messages are lightweight and SSE clients are typically low-volume, the performance difference may be negligible—use whichever approach fits your latency/throughput profile.
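The in-place pruning idea can be exercised with std-only types: a `std::sync::mpsc` sender whose receiver was dropped plays the role of a disconnected SSE client (the real code uses async senders and an awaited ping), and `swap_remove` drops it without cloning the map.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Prune senders whose receiving end has been dropped, mutating in place.
// A send to a dropped Receiver returns Err, which marks the client stale.
fn remove_stale_clients(clients: &mut HashMap<u64, Vec<mpsc::Sender<String>>>) {
    for senders in clients.values_mut() {
        let mut i = 0;
        while i < senders.len() {
            if senders[i].send("ping".to_string()).is_err() {
                senders.swap_remove(i); // O(1) removal, order not preserved
            } else {
                i += 1;
            }
        }
    }
    // Drop sessions that no longer have any live clients.
    clients.retain(|_, senders| !senders.is_empty());
}

fn main() {
    let mut clients: HashMap<u64, Vec<mpsc::Sender<String>>> = HashMap::new();
    let (live_tx, live_rx) = mpsc::channel();
    let (dead_tx, dead_rx) = mpsc::channel();
    drop(dead_rx); // simulate a disconnected client
    clients.insert(1, vec![live_tx, dead_tx]);

    remove_stale_clients(&mut clients);

    assert_eq!(clients[&1].len(), 1); // only the live client survives
    assert_eq!(live_rx.try_recv().unwrap(), "ping"); // and it got the ping
    println!("stale clients pruned");
}
```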
111-144: Consider reducing code duplication. The filtered and broadcast-all branches share similar logic for building the send futures. You could refactor to eliminate the duplication:
🔎 View suggested refactor
```diff
 pub async fn broadcast(&self, msg: &str, sessions: Option<&[Ulid]>) {
     let clients = self.inner.read().await.clients.clone();
     if clients.is_empty() {
         return;
     }

-    let send_futures = if let Some(sessions) = sessions {
-        let mut futures = vec![];
-        for (session, clients) in clients.iter() {
-            if sessions.contains(session) {
-                clients
-                    .iter()
-                    .for_each(|client| futures.push(client.send(sse::Data::new(msg).into())));
-            }
-        }
-        futures
-    } else {
-        // broadcast
-        let mut futures = vec![];
-        for (_, clients) in clients.iter() {
-            clients
-                .iter()
-                .for_each(|client| futures.push(client.send(sse::Data::new(msg).into())));
-        }
-        futures
-    };
+    let send_futures: Vec<_> = clients
+        .iter()
+        .filter(|(session, _)| sessions.map_or(true, |s| s.contains(*session)))
+        .flat_map(|(_, clients)| clients.iter())
+        .map(|client| client.send(sse::Data::new(msg).into()))
+        .collect();

     // try to send to all clients, ignoring failures
     // disconnected clients will get swept up by `remove_stale_clients`
     let _ = future::join_all(send_futures).await;
 }
```
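The filter/flat_map fan-out can be tested in isolation with a std-only stand-in: plain `String`s in place of the async senders, `u64` in place of `Ulid`, and a hypothetical `select_targets` helper that just selects recipients.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the broadcaster's fan-out: pick which clients a
// message goes to, honoring an optional session filter (None = broadcast all).
fn select_targets<'a>(
    clients: &'a HashMap<u64, Vec<String>>,
    sessions: Option<&[u64]>,
) -> Vec<&'a String> {
    clients
        .iter()
        .filter(|(session, _)| sessions.map_or(true, |s| s.contains(*session)))
        .flat_map(|(_, clients)| clients.iter())
        .collect()
}

fn main() {
    let mut clients = HashMap::new();
    clients.insert(1u64, vec!["a".to_string(), "b".to_string()]);
    clients.insert(2u64, vec!["c".to_string()]);

    // targeted send reaches only session 2's client
    assert_eq!(select_targets(&clients, Some(&[2])).len(), 1);
    // broadcast (None) reaches every connected client
    assert_eq!(select_targets(&clients, None).len(), 3);
    println!("fan-out ok");
}
```

Note the `*session` deref in the filter closure: `iter()` plus the closure's by-reference argument leaves `session` doubly borrowed, and `slice::contains` expects a single `&u64`.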
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- src/alerts/mod.rs (2 hunks)
- src/sse/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/alerts/mod.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/sse/mod.rs (2)
src/utils/actix.rs (1)
- extract_session_key_from_req (51-71)

src/rbac/map.rs (1)

- sessions (92-98)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (7)
src/sse/mod.rs (7)
1-20: LGTM! The imports are well-organized and appropriate for an SSE broadcasting implementation with session-based client management.
21-31: LGTM! The broadcaster architecture using `Lazy<Arc<Broadcaster>>` with `RwLock`-protected state is a solid pattern for managing concurrent SSE client connections.
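A std-only sketch of that pattern, using `OnceLock` in place of `once_cell::sync::Lazy` and std's blocking `RwLock` in place of tokio's async one (the `Broadcaster`/`sse_handler` names here are illustrative, not the module's actual items):

```rust
use std::collections::HashMap;
use std::sync::{Arc, OnceLock, RwLock};

// One process-wide broadcaster; the client map sits behind an RwLock so many
// readers (broadcasts) can proceed concurrently with occasional writers.
struct Broadcaster {
    clients: RwLock<HashMap<u64, Vec<String>>>,
}

// Lazily initialized on first access, then shared by every caller.
fn sse_handler() -> &'static Arc<Broadcaster> {
    static HANDLER: OnceLock<Arc<Broadcaster>> = OnceLock::new();
    HANDLER.get_or_init(|| {
        Arc::new(Broadcaster {
            clients: RwLock::new(HashMap::new()),
        })
    })
}

fn main() {
    // Every call yields the same shared instance.
    assert!(Arc::ptr_eq(sse_handler(), sse_handler()));

    sse_handler()
        .clients
        .write()
        .unwrap()
        .insert(42, vec!["client-1".to_string()]);
    assert_eq!(sse_handler().clients.read().unwrap()[&42].len(), 1);
    println!("shared broadcaster ready");
}
```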
33-56: LGTM! The initialization pattern correctly spawns a background ping task to maintain SSE connections and prune stale clients.
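The spawn-and-ping shape can be mimicked with a plain thread and a short interval; this is only a sketch of the structure, not the actual tokio task (which ticks every 10 seconds and prunes via `remove_stale_clients`), and `spawn_pinger` is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Background loop: periodically send a ping, stop when the receiver is gone
// (the stale-client case) or after a fixed number of ticks.
fn spawn_pinger(tx: mpsc::Sender<&'static str>, every: Duration, times: usize) {
    thread::spawn(move || {
        for _ in 0..times {
            thread::sleep(every);
            if tx.send("ping").is_err() {
                break; // receiver dropped: the client disconnected
            }
        }
        // tx drops here, which ends the receiver's iterator below
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();
    spawn_pinger(tx, Duration::from_millis(10), 3);
    // iter() blocks until the sender is dropped, then yields what was sent
    let pings: Vec<_> = rx.iter().collect();
    assert_eq!(pings.len(), 3);
    println!("received {} pings", pings.len());
}
```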
81-99: LGTM! The client registration correctly uses the `entry().or_default()` pattern to avoid race conditions, and gracefully handles the initial "connected" message send.
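The benefit of `entry().or_default()` can be shown with a tiny std-only helper (the `register_client` name is hypothetical): lookup and insert happen through a single map entry, so there is no check-then-insert gap between "does this session exist?" and "push the client".

```rust
use std::collections::HashMap;

// Register a client under its session: creates the Vec on first registration
// and appends to the existing one afterwards, in one map operation.
fn register_client(clients: &mut HashMap<u64, Vec<String>>, session: u64, client: String) {
    clients.entry(session).or_default().push(client);
}

fn main() {
    let mut clients = HashMap::new();
    register_client(&mut clients, 7, "first".into());
    register_client(&mut clients, 7, "second".into());
    assert_eq!(clients[&7], vec!["first".to_string(), "second".to_string()]);
    println!("{clients:?}");
}
```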
101-109: LGTM! Simple and correct implementation for retrieving active session IDs.
162-176: LGTM! The SSE event structure with public fields and proper serialization attributes is well-designed for external use.
178-184: LGTM! The `Criticality` enum is clean and appropriately defined.
```rust
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SSEAlertInfo {
    pub id: Ulid,
    pub state: AlertState,
    pub name: String,
}

#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ControlPlaneEvent {
    message: String,
}

#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Consent {
    given: bool,
}
```
Make remaining struct fields public.
`SSEAlertInfo` fields are now correctly public (addressed in previous reviews), but `ControlPlaneEvent` (line 197) and `Consent` (line 203) still have private fields. These structs cannot be constructed from outside this module, preventing their use by other parts of the codebase that need to broadcast events.
🔎 Apply this diff to make the fields public:
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ControlPlaneEvent {
- message: String,
+ pub message: String,
}
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Consent {
- given: bool,
+ pub given: bool,
}

🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 186 to 204, the structs ControlPlaneEvent (line
~197) and Consent (line ~203) have private fields which prevent external
construction; change their fields to be public (make message -> pub message and
given -> pub given) while keeping the existing derives and serde attributes so
other modules can construct and serialize/deserialize these event types.
Add backend for SSE
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
New Features
Chores