diff --git a/CHANGELOG.md b/CHANGELOG.md
index 557e984..3461a9b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.0.3
+
+- Add `PowerSyncDatabase::watch_statement` to get an auto-updating stream of query results.
+
 ## 0.0.2
 
 - Configure automated publishing to crates.io.
diff --git a/Cargo.lock b/Cargo.lock
index 21e845e..dc9ad6e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3674,7 +3674,6 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
 name = "powersync"
 version = "0.0.2"
 dependencies = [
- "async-broadcast",
  "async-channel 2.5.0",
  "async-executor",
  "async-io 2.6.0",
diff --git a/Cargo.toml b/Cargo.toml
index 02d4996..b950f2b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,10 @@
 [workspace]
 resolver = "3"
-members = [ "examples/egui_todolist","powersync", "powersync_test_utils"]
+members = [
+    "powersync",
+    "powersync_test_utils",
+    "examples/egui_todolist"
+]
 
 [workspace.package]
 repository = "https://github.com/powersync-ja/powersync-native"
diff --git a/powersync/Cargo.toml b/powersync/Cargo.toml
index 0356d7a..e550b31 100644
--- a/powersync/Cargo.toml
+++ b/powersync/Cargo.toml
@@ -20,7 +20,6 @@ smol = ["dep:async-io"]
 ffi = []
 
 [dependencies]
-async-broadcast = "0.7.2"
 async-channel = "2.5.0"
 async-lock = "3.4.1"
 async-io = { version = "2.6.0", optional = true }
diff --git a/powersync/README.md b/powersync/README.md
index d083f99..6ff50ff 100644
--- a/powersync/README.md
+++ b/powersync/README.md
@@ -116,8 +116,3 @@ This SDK is in development. Some items that are still being worked on are:
 
 1. Token prefetching and caching.
 2. Unit tests for CRUD uploads.
-
-Also, this crate's `build.rs` dynamically downloads a binary
-(the [PowerSync core extension](https://github.com/powersync-ja/powersync-sqlite-core/)) to link into the final
-executable. The reason is that, while this library works with stable Rust, the core extension requires a nightly build.
-We'll work towards making the core extension a regular Rust crate supporting stable compilers in the future.
diff --git a/powersync/src/db/mod.rs b/powersync/src/db/mod.rs
index 89a69e5..c3e1686 100644
--- a/powersync/src/db/mod.rs
+++ b/powersync/src/db/mod.rs
@@ -1,7 +1,8 @@
+use std::borrow::Cow;
+use std::collections::HashSet;
+use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
-use futures_lite::{FutureExt, Stream, StreamExt};
-
 use crate::db::async_support::AsyncDatabaseTasks;
 use crate::sync::coordinator::SyncCoordinator;
 use crate::{
@@ -15,6 +16,9 @@ use crate::{
     schema::Schema,
     sync::{download::DownloadActor, status::SyncStatusData, upload::UploadActor},
 };
+use futures_lite::stream::{once, once_future};
+use futures_lite::{FutureExt, Stream, StreamExt};
+use rusqlite::{Params, Statement, params};
 
 mod async_support;
 pub mod core_extension;
@@ -84,26 +88,140 @@ impl PowerSyncDatabase {
     /// The `emit_initially` option can be used to control whether the stream should emit as well
     /// when polled for the first time. This can be useful to build streams emitting a complete
     /// snapshot of results every time a source table is changed.
-    pub fn watch_tables<'a>(
+    pub fn watch_tables<'a, Tables: IntoIterator<Item: Into<Cow<'a, str>>>>(
         &self,
         emit_initially: bool,
-        tables: impl IntoIterator<Item = &'a str>,
-    ) -> impl Stream<Item = ()> {
+        tables: Tables,
+    ) -> impl Stream<Item = ()> + 'static {
         self.inner.env.pool.update_notifiers().listen(
             emit_initially,
             tables
                 .into_iter()
                 .flat_map(|s| {
+                    let s = s.into();
+
                     [
-                        s.to_string(),
-                        format!("ps_data__{s}"),
-                        format!("ps_data_local__{s}"),
+                        format!("{}{s}", Self::PS_DATA_PREFIX),
+                        format!("{}{s}", Self::PS_DATA_LOCAL_PREFIX),
+                        Cow::into_owned(s),
                     ]
                 })
                 .collect(),
         )
     }
 
+    /// Returns an asynchronous [Stream] emitting snapshots of a `SELECT` statement every time
+    /// one of its source tables is modified.
+    ///
+    /// `sql` is the `SELECT` statement to execute and `params` are the parameters to bind to it.
+    /// The `read` function receives the raw prepared statement together with a copy of those
+    /// parameters, and runs the statement to map its rows into the desired results.
+    ///
+    /// This method is a core building block for reactive applications with PowerSync: since the
+    /// stream updates automatically, all writes (regardless of whether they're local or synced
+    /// from your backend) are reflected in the emitted snapshots.
+    pub fn watch_statement<P, R, F>(
+        &self,
+        sql: String,
+        params: P,
+        read: F,
+    ) -> impl Stream<Item = Result<R, PowerSyncError>> + 'static
+    where
+        P: Params + Clone + 'static,
+        R: 'static,
+        for<'a, 'b> F: (Fn(&'a mut Statement<'b>, P) -> Result<R, PowerSyncError>) + 'static + Clone,
+    {
+        let update_notifications =
+            self.emit_on_statement_changes(true, sql.clone(), params.clone());
+
+        let db = self.clone();
+        update_notifications.then(move |notification| {
+            let db = db.clone();
+            let sql = sql.clone();
+            let params = params.clone();
+            let mapper = read.clone();
+
+            async move {
+                // Propagate errors from the notification stream before re-running the query.
+                notification?;
+
+                let reader = db.reader().await?;
+                let mut stmt = reader.prepare_cached(&sql)?;
+
+                mapper(&mut stmt, params)
+            }
+        })
+    }
+
+    fn emit_on_statement_changes(
+        &self,
+        emit_initially: bool,
+        sql: String,
+        params: impl Params + 'static,
+    ) -> impl Stream<Item = Result<(), PowerSyncError>> + 'static {
+        // Stream emitting referenced tables once.
+        let tables = once_future(self.clone().find_tables(sql, params));
+
+        // Stream emitting updates, or a single error if we couldn't resolve tables.
+        let db = self.clone();
+        tables.flat_map(move |referenced_tables| match referenced_tables {
+            Ok(referenced_tables) => db
+                .watch_tables(emit_initially, referenced_tables)
+                .map(Ok)
+                .boxed(),
+            Err(e) => once(Err(e)).boxed(),
+        })
+    }
+
+    /// Finds all tables that are used in a given `SELECT` statement.
+    ///
+    /// This can be used together with [Self::watch_tables] to build an auto-updating stream of
+    /// query results.
+    async fn find_tables<P: Params>(
+        self,
+        sql: impl Into<Cow<'static, str>>,
+        params: P,
+    ) -> Result<HashSet<String>, PowerSyncError> {
+        let reader = self.reader().await?;
+        let mut stmt = reader.prepare(&format!("EXPLAIN {}", sql.into()))?;
+        let mut rows = stmt.query(params)?;
+
+        let mut find_table_stmt =
+            reader.prepare_cached("SELECT tbl_name FROM sqlite_schema WHERE rootpage = ?")?;
+        let mut found_tables = HashSet::new();
+
+        while let Some(row) = rows.next()? {
+            let opcode = row.get_ref("opcode")?;
+            let p2 = row.get_ref("p2")?;
+            let p3 = row.get_ref("p3")?;
+
+            // An `OpenRead` instruction with `p3 == 0` opens a read cursor on a table in the
+            // main database, with `p2` being the root page of that table.
+            if matches!(opcode.as_str(), Ok("OpenRead"))
+                && matches!(p3.as_i64(), Ok(0))
+                && let Ok(page) = p2.as_i64()
+            {
+                let mut found_table = find_table_stmt.query(params![page])?;
+                if let Some(found_table) = found_table.next()? {
+                    let table_name: String = found_table.get(0)?;
+                    found_tables.insert(table_name);
+                }
+            }
+        }
+
+        Ok(found_tables
+            .into_iter()
+            .map(|mut table| {
+                if table.starts_with(Self::PS_DATA_PREFIX) {
+                    table.split_off(Self::PS_DATA_PREFIX.len())
+                } else if table.starts_with(Self::PS_DATA_LOCAL_PREFIX) {
+                    table.split_off(Self::PS_DATA_LOCAL_PREFIX.len())
+                } else {
+                    table
+                }
+            })
+            .collect())
+    }
+
+    const PS_DATA_PREFIX: &'static str = "ps_data__";
+    const PS_DATA_LOCAL_PREFIX: &'static str = "ps_data_local__";
+
     /// Returns a [Stream] traversing through transactions that have been completed on this
     /// database.
     ///
@@ -198,3 +316,9 @@ impl PowerSyncDatabase {
     }
     */
 }
+
+impl Debug for PowerSyncDatabase {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PowerSyncDatabase").finish_non_exhaustive()
+    }
+}
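Note on the table-discovery trick in `find_tables` above: SQLite's `EXPLAIN` returns one row per VDBE instruction of the compiled statement, and every table the statement reads from appears as an `OpenRead` instruction whose `p2` operand is the table's root page (`p3 == 0` selects the main database). The standalone sketch below demonstrates that idea against a plain rusqlite connection; the `users` table here is illustrative and not part of this change.

use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let db = Connection::open_in_memory()?;
    db.execute_batch("CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT);")?;

    // EXPLAIN yields one row per VDBE instruction of the compiled statement.
    let mut stmt = db.prepare("EXPLAIN SELECT name FROM users")?;
    let mut rows = stmt.query([])?;

    while let Some(row) = rows.next()? {
        let opcode: String = row.get("opcode")?;
        // `OpenRead` with p3 == 0 opens a read cursor on a main-database table;
        // p2 is that table's root page number.
        if opcode == "OpenRead" && row.get::<_, i64>("p3")? == 0 {
            let page: i64 = row.get("p2")?;
            let table: String = db.query_row(
                "SELECT tbl_name FROM sqlite_schema WHERE rootpage = ?",
                [page],
                |r| r.get(0),
            )?;
            println!("statement reads from table {table} (root page {page})");
        }
    }

    Ok(())
}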
diff --git a/powersync/src/db/streams.rs b/powersync/src/db/streams.rs
index a4538b5..b5d011d 100644
--- a/powersync/src/db/streams.rs
+++ b/powersync/src/db/streams.rs
@@ -1,12 +1,3 @@
-use std::{
-    cell::Cell,
-    collections::HashMap,
-    sync::{Arc, Mutex, Weak},
-    time::Duration,
-};
-
-use rusqlite::params;
-
 use crate::{
     PowerSyncDatabase, StreamPriority,
     db::internal::InnerPowerSyncState,
@@ -17,6 +8,13 @@ use crate::{
     },
     util::SerializedJsonObject,
 };
+use rusqlite::params;
+use std::{
+    cell::Cell,
+    collections::HashMap,
+    sync::{Arc, Mutex, Weak},
+    time::Duration,
+};
 
 /// Tracks all sync streams that currently have at least one active [StreamSubscription].
 #[derive(Default)]
diff --git a/powersync/src/lib.rs b/powersync/src/lib.rs
index b8fa586..bea3cb4 100644
--- a/powersync/src/lib.rs
+++ b/powersync/src/lib.rs
@@ -4,9 +4,9 @@ mod sync;
 mod util;
 
 pub use db::PowerSyncDatabase;
+pub use db::crud::{CrudEntry, CrudTransaction, UpdateType};
 #[cfg(feature = "ffi")]
 pub use db::internal::InnerPowerSyncState;
-pub use db::crud::{CrudEntry, CrudTransaction, UpdateType};
 pub use db::pool::{ConnectionPool, LeasedConnection};
 pub use db::streams::StreamSubscription;
 pub use db::streams::StreamSubscriptionOptions;
diff --git a/powersync/tests/database_test.rs b/powersync/tests/database_test.rs
index 0e0164c..1a559cd 100644
--- a/powersync/tests/database_test.rs
+++ b/powersync/tests/database_test.rs
@@ -5,7 +5,7 @@ use futures_lite::{StreamExt, future};
 use powersync::error::PowerSyncError;
 use powersync_test_utils::{DatabaseTest, UserRow, execute, query_all};
 use rusqlite::params;
-use serde_json::json;
+use serde_json::{Value, json};
 
 #[test]
 fn link_core_extension() {
@@ -152,3 +152,83 @@ fn test_table_updates() {
         );
     });
 }
+
+#[test]
+fn test_watch_statement() {
+    let test = DatabaseTest::new();
+    let db = Arc::new(test.test_dir_database());
+
+    future::block_on(async move {
+        let mut stream = db
+            .watch_statement(
+                "SELECT name FROM users".to_string(),
+                params![],
+                |stmt, params| {
+                    let mut rows = stmt.query(params)?;
+                    let mut names = vec![];
+                    while let Some(row) = rows.next()? {
+                        let name = row.get(0)?;
+                        names.push(Value::String(name));
+                    }
+
+                    Ok(Value::Array(names))
+                },
+            )
+            .boxed_local();
+
+        // Initial query.
+        assert_eq!(stream.next().await.unwrap().unwrap(), json!([]));
+
+        execute(
+            &db,
+            "INSERT INTO users (id, name) VALUES (uuid(), ?)",
+            params!["Test"],
+        )
+        .await;
+        assert_eq!(stream.next().await.unwrap().unwrap(), json!(["Test"]));
+
+        {
+            let mut writer = db.writer().await.unwrap();
+            let writer = writer.transaction().unwrap();
+
+            writer
+                .execute(
+                    "INSERT INTO users (id, name) VALUES (uuid(), ?)",
+                    params!["Test2"],
+                )
+                .unwrap();
+            writer
+                .execute(
+                    "INSERT INTO users (id, name) VALUES (uuid(), ?)",
+                    params!["Test3"],
+                )
+                .unwrap();
+
+            writer.commit().unwrap();
+        }
+
+        assert_eq!(
+            stream.next().await.unwrap().unwrap(),
+            json!(["Test", "Test2", "Test3"])
+        );
+
+        {
+            let mut writer = db.writer().await.unwrap();
+            let writer = writer.transaction().unwrap();
+
+            writer.execute("DELETE FROM users", params![]).unwrap();
+            // Dropping the transaction without committing rolls it back; it should not affect streams.
+        }
+
+        execute(
+            &db,
+            "INSERT INTO users (id, name) VALUES (uuid(), ?)",
+            params!["Test4"],
+        )
+        .await;
+        assert_eq!(
+            stream.next().await.unwrap().unwrap(),
+            json!(["Test", "Test2", "Test3", "Test4"])
+        );
+    });
+}
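Beyond the test above, here is a minimal sketch of how an application might consume the new API. It assumes an already-constructed `PowerSyncDatabase` handle named `db`; the `todos` table, its `title` column, and the `watch_todo_titles` helper are hypothetical and only for illustration.

use futures_lite::StreamExt;
use powersync::PowerSyncDatabase;
use rusqlite::params;

// Hypothetical consumer: re-renders whenever the query's source tables change.
async fn watch_todo_titles(db: &PowerSyncDatabase) {
    let mut titles = db
        .watch_statement(
            "SELECT title FROM todos ORDER BY title".to_string(),
            params![],
            |stmt, params| {
                let mut rows = stmt.query(params)?;
                let mut titles = Vec::<String>::new();
                while let Some(row) = rows.next()? {
                    titles.push(row.get(0)?);
                }
                Ok(titles)
            },
        )
        .boxed_local();

    // The first item is the initial snapshot; later items arrive whenever a
    // local or synced write touches `todos`.
    while let Some(snapshot) = titles.next().await {
        match snapshot {
            Ok(titles) => println!("todos: {titles:?}"),
            Err(e) => eprintln!("watch failed: {e}"),
        }
    }
}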