diff --git a/crates/core/src/crud_vtab.rs b/crates/core/src/crud_vtab.rs index 4c0f4070..36936e3e 100644 --- a/crates/core/src/crud_vtab.rs +++ b/crates/core/src/crud_vtab.rs @@ -247,7 +247,16 @@ impl SimpleCrudTransactionMode { fn record_local_write(&mut self, db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { if !self.had_writes { - db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?; + // Also clear the seen/applied high-water marks: checkpoint request ids observed before + // this write can't acknowledge it, and they may come from an incompatible id namespace + // (legacy write checkpoints migrated by v14, or state from before a counter restart). + // Keeping them around could open the apply gate for a newly allocated target id that + // compares below a stale seen value. The legacy `$local` bookkeeping had the same + // behavior by resetting the entire row on local writes. + db.exec_safe(formatcp!( + "INSERT OR REPLACE INTO ps_kv(key, value) VALUES('local_target_op', {MAX_OP_ID}); +DELETE FROM ps_kv WHERE key IN ('last_seen_checkpoint_request_id', 'last_applied_checkpoint_request_id')" + ))?; self.had_writes = true; } diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 06f22146..afc1ae5a 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -16,7 +16,7 @@ use crate::fix_data::apply_v035_fix; use crate::schema::inspection::ExistingView; use crate::sync::BucketPriority; -pub const LATEST_VERSION: i32 = 13; +pub const LATEST_VERSION: i32 = 14; pub fn powersync_migrate( ctx: *mut sqlite::context, @@ -460,6 +460,145 @@ DROP TABLE ps_sync_state_old; track_migration.exec()?; } + if current_version < 14 && target_version >= 14 { + // Move the legacy `$local` checkpoint bookkeeping into ps_kv. + // + // In older databases, `$local.last_applied_op` represented the latest legacy write + // checkpoint that was actually applied, so it becomes the last applied checkpoint request. + // `$local.last_op` represented the latest legacy write checkpoint seen in the sync stream, + // so it becomes the last seen checkpoint request. + // + // `$local.target_op` can either be a concrete legacy write checkpoint id or a sentinel such + // as i64::MAX while local writes are pending. Store it separately as `local_target_op`. + // Seeding `last_requested_checkpoint_request_id` from a concrete target would be possible, + // but should be redundant because SDKs reconcile the request counter with service state on + // connect before advancing it through `next_checkpoint_request_id`. + // + // An absent local target can safely start client-created checkpoint requests from 1. The + // ambiguous case is an existing max-op local target without a concrete requested id: + // pending local writes may already be associated with legacy service-created write + // checkpoints, so SDKs should bridge once through the legacy endpoint before starting + // client-created checkpoint requests. + // + // This migration can also run on a database that was previously on version 14 and then + // downgraded: the down migration rebuilds the `$local` row from ps_kv but keeps the ps_kv + // keys around, and an older SDK may have advanced `$local` since. Clear the keys first so + // the `$local` row is the source of truth and the inserts below can't conflict. + // + // After copying, the `$local` row is deleted: version 14 tracks this state exclusively in + // ps_kv, so ps_buckets only contains real sync buckets. The down migration recreates the + // row from ps_kv when needed. + let up = "\ +DELETE FROM ps_kv + WHERE key IN ( + 'last_applied_checkpoint_request_id', + 'last_seen_checkpoint_request_id', + 'local_target_op' + ); + +INSERT INTO ps_kv(key, value) +SELECT 'last_applied_checkpoint_request_id', last_applied_op + FROM ps_buckets + WHERE name = '$local' + AND last_applied_op > 0; + +INSERT INTO ps_kv(key, value) +SELECT 'last_seen_checkpoint_request_id', last_op + FROM ps_buckets + WHERE name = '$local' + AND last_op > 0; + +INSERT INTO ps_kv(key, value) +SELECT 'local_target_op', target_op + FROM ps_buckets + WHERE name = '$local' + AND target_op > 0; + +DELETE FROM ps_buckets WHERE name = '$local'; + +ALTER TABLE ps_buckets DROP COLUMN target_op; +"; + local_db.exec_safe(up).into_db_result(local_db)?; + + // Downgrading needs to rebuild the old `$local` row from the new ps_kv state so older SDKs + // can keep using their target-op based blocking behavior. In that model, `$local.last_op` + // tracked the latest seen legacy write checkpoint and was compared with `$local.target_op` + // to decide whether downloaded changes could be applied. The `$local.last_applied_op` + // value represented the checkpoint that had actually been applied locally. + // `$local.pending_delete = 1` marked this as a synthetic local-only bucket instead of a + // normal service bucket. Restore each old progress column from its matching ps_kv key. + // The 0 defaults cover a local target that exists before any checkpoint has been seen or + // applied. If `local_target_op` is absent, don't create a `$local` row: the old + // implementation also didn't have a `$local` bucket unless there was local target state to + // track. + const DOWN_STATEMENTS: &[&str] = &[ + "ALTER TABLE ps_buckets RENAME TO ps_buckets_14", + "DROP INDEX ps_buckets_name", + "CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +) STRICT", + "CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)", + "ALTER TABLE ps_buckets ADD COLUMN count_at_last INTEGER NOT NULL DEFAULT 0", + "ALTER TABLE ps_buckets ADD COLUMN count_since_last INTEGER NOT NULL DEFAULT 0", + "ALTER TABLE ps_buckets ADD COLUMN downloaded_size INTEGER NOT NULL DEFAULT 0", + "INSERT INTO ps_buckets( + id, + name, + last_applied_op, + last_op, + add_checksum, + op_checksum, + pending_delete, + count_at_last, + count_since_last, + downloaded_size +) +SELECT + id, + name, + last_applied_op, + last_op, + add_checksum, + op_checksum, + pending_delete, + count_at_last, + count_since_last, + downloaded_size +FROM ps_buckets_14", + "DROP TABLE ps_buckets_14", + "INSERT INTO ps_buckets(name, pending_delete, last_op, last_applied_op, target_op) +SELECT '$local', 1, seen, applied, target + FROM ( + SELECT + IFNULL((SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = 'last_seen_checkpoint_request_id'), 0) AS seen, + IFNULL((SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = 'last_applied_checkpoint_request_id'), 0) AS applied, + (SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = 'local_target_op') AS target + ) + WHERE EXISTS ( + SELECT 1 FROM ps_kv WHERE key = 'local_target_op' + ) +ON CONFLICT(name) DO UPDATE SET + pending_delete = excluded.pending_delete, + last_op = excluded.last_op, + last_applied_op = excluded.last_applied_op, + target_op = excluded.target_op", + "DELETE FROM ps_migration WHERE id >= 14", + ]; + let down = serialize_down_statements(DOWN_STATEMENTS)?; + let track_migration = + local_db.prepare_v2("INSERT INTO ps_migration(id, down_migrations) VALUES (?, ?)")?; + track_migration.bind_int(1, 14)?; + track_migration.bind_text(2, &down, Destructor::STATIC)?; + track_migration.exec()?; + } + Ok(()) } diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index e09eba94..a673f353 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -12,6 +12,7 @@ use crate::sync::diagnostics::{DiagnosticOptions, DiagnosticsEvent}; use crate::sync::subscriptions::{StreamKey, apply_subscriptions}; use alloc::borrow::Cow; use alloc::boxed::Box; +use alloc::format; use alloc::rc::Rc; use alloc::{string::String, vec::Vec}; use powersync_sqlite_nostd::bindings::SQLITE_RESULT_SUBTYPE; @@ -76,6 +77,8 @@ pub enum SyncControlRequest<'a> { StartSyncStream(StartSyncStream), /// The client requests to stop the current sync iteration. StopSyncStream, + /// The client requests a new checkpoint request id. + NextCheckpointRequestId, /// The client is forwading a sync event to the core extension. SyncEvent(SyncEvent<'a>), } @@ -89,6 +92,10 @@ pub enum SyncEvent<'a> { /// /// In response, we'll stop the current iteration to begin another one with the new token. DidRefreshToken, + /// Seeds the checkpoint request counter from service state. + SeedCheckpointRequestId { + request_id: Option, + }, /// Notifies the sync client that the current CRUD upload (for which the client SDK is /// responsible) has finished. /// @@ -128,13 +135,26 @@ pub enum Instruction { }, /// Connect to the sync service using the [StreamingSyncRequest] created by the core extension, /// and then forward received lines via [SyncEvent::TextLine] and [SyncEvent::BinaryLine]. - EstablishSyncStream { request: StreamingSyncRequest }, + EstablishSyncStream { + request: StreamingSyncRequest, + /// The latest checkpoint request id known locally before opening this stream. + /// + /// SDKs can use a missing value as a cue to fetch checkpoint request state from the service + /// and report it back with `seed_checkpoint_request_id`. + last_checkpoint_request_id: Option, + }, FetchCredentials { /// Whether the credentials currently used have expired. /// /// If false, this is a pre-fetch. did_expire: bool, }, + /// Return a newly allocated checkpoint request id to the SDK. + CheckpointRequestId { request_id: i64 }, + /// Notify the SDK that a checkpoint request id has been applied locally. + CheckpointRequestApplied { request_id: i64 }, + /// Return the local target op value observed before an optional update. + LocalTargetOp { target_op: Option }, // These are defined like this because deserializers in Kotlin can't support either an // object or a literal value /// Close the websocket / HTTP stream to the sync service. @@ -232,6 +252,24 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() } }), "stop" => SyncControlRequest::StopSyncStream, + "next_checkpoint_request_id" => SyncControlRequest::NextCheckpointRequestId, + "local_target_op" => { + let target_op = parse_optional_i64_payload( + *payload, + "local target op", + "local target op must be an integer, integer string, or null", + )?; + let adapter = state.storage_adapter(db)?; + let target_op = adapter.probe_local_target_op(target_op)?; + let formatted = + serde_json::to_string(&alloc::vec![Instruction::LocalTargetOp { + target_op + },]) + .map_err(PowerSyncError::internal)?; + ctx.result_text_transient(&formatted); + ctx.result_subtype(SUBTYPE_JSON); + return Ok(()); + } "line_text" => SyncControlRequest::SyncEvent(SyncEvent::TextLine { data: if payload.value_type() == ColumnType::Text { payload.text() @@ -251,6 +289,15 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() }, }), "refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken), + "seed_checkpoint_request_id" => { + SyncControlRequest::SyncEvent(SyncEvent::SeedCheckpointRequestId { + request_id: parse_optional_i64_payload( + *payload, + "checkpoint request id", + "checkpoint request id must be an integer, integer string, or null", + )?, + }) + } "completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished), "update_subscriptions" => { SyncControlRequest::SyncEvent(SyncEvent::DidUpdateSubscriptions { @@ -345,3 +392,26 @@ create_sqlite_text_fn!( powersync_offline_sync_status_impl, "powersync_offline_sync_status" ); + +fn parse_optional_i64_payload( + payload: *mut sqlite::value, + name: &'static str, + type_error: &'static str, +) -> Result, PowerSyncError> { + let value = match payload.value_type() { + ColumnType::Null => return Ok(None), + ColumnType::Integer => payload.int64(), + ColumnType::Text => payload.text().parse::().map_err(|_| { + PowerSyncError::argument_error(format!("{name} must be an integer string")) + })?, + _ => return Err(PowerSyncError::argument_error(type_error)), + }; + + if value < 0 { + return Err(PowerSyncError::argument_error(format!( + "{name} must be a non-negative integer" + ))); + } + + Ok(Some(value)) +} diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 7b5298f3..d76f3efd 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -26,6 +26,15 @@ use super::{ bucket_priority::BucketPriority, interface::BucketRequest, streaming_sync::OwnedCheckpoint, }; +const LAST_REQUESTED_CHECKPOINT_REQUEST_ID_KEY: &str = "last_requested_checkpoint_request_id"; +const LAST_SEEN_CHECKPOINT_REQUEST_ID_KEY: &str = "last_seen_checkpoint_request_id"; +const LAST_APPLIED_CHECKPOINT_REQUEST_ID_KEY: &str = "last_applied_checkpoint_request_id"; + +// Tracks the local target used to block applying downloaded rows while local writes are +// outstanding. When present, this is normally either the max-op sentinel for pending local writes or +// a concrete checkpoint request id also stored in LAST_REQUESTED_CHECKPOINT_REQUEST_ID_KEY. +const LOCAL_TARGET_OP_KEY: &str = "local_target_op"; + /// An adapter for storing sync state. /// /// This is used to encapsulate some SQL queries used for the sync implementation, making the code @@ -260,10 +269,8 @@ WHERE bucket = ?1", } } - if let (None, Some(write_checkpoint)) = (&priority, &checkpoint.write_checkpoint) { - update_bucket.bind_int64(1, *write_checkpoint)?; - update_bucket.bind_text(2, "$local", sqlite::Destructor::STATIC)?; - update_bucket.exec()?; + if let (None, Some(checkpoint_request_id)) = (&priority, &checkpoint.write_checkpoint) { + self.persist_last_seen_checkpoint_request_id(*checkpoint_request_id)?; } #[derive(Serialize)] @@ -500,18 +507,156 @@ WHERE bucket = ?1", } pub fn local_state(&self) -> Result, PowerSyncError> { - let stmt = self + Ok(self + .read_i64_kv(LOCAL_TARGET_OP_KEY)? + .map(|target_op| LocalState { target_op })) + } + + /// Probes and optionally updates the local target op used to block applying downloaded rows + /// while local writes are outstanding. + /// + /// In the write-checkpoint flow, callers allocate a checkpoint request id, post it to the + /// service, and then update this from the max-op sentinel to the concrete checkpoint request id + /// once the request succeeds. This is also used for older services where the SDK cannot create + /// checkpoint requests explicitly. + /// + /// The target op can also be used internally as a sentinel value such as max op id while local + /// writes are pending, so it must not always be interpreted as a checkpoint request id. + /// + /// This only updates the apply gate. It does not allocate, seed or overwrite + /// `last_requested_checkpoint_request_id`, which is managed by `seed_checkpoint_request_id` and + /// `next_checkpoint_request_id`. + /// + /// Returns the target op value from before this call. When `target_op` is `None`, this only + /// reads the current value. + /// + /// Negative values are rejected when parsing the `powersync_control` payload, before this is + /// called. + pub fn probe_local_target_op( + &self, + target_op: Option, + ) -> Result, PowerSyncError> { + let previous_target_op = self.local_state()?.map(|state| state.target_op); + + let Some(target_op) = target_op else { + return Ok(previous_target_op); + }; + + if target_op == 0 { + self.delete_kv(LOCAL_TARGET_OP_KEY)?; + return Ok(previous_target_op); + } + + self.write_i64_kv(LOCAL_TARGET_OP_KEY, target_op)?; + + Ok(previous_target_op) + } + + /// Persists the checkpoint request id observed in a complete sync checkpoint. + /// + /// This replaces the legacy `$local.last_op` bookkeeping used to decide whether downloaded + /// data can be applied after local uploads complete. + pub fn persist_last_seen_checkpoint_request_id( + &self, + request_id: i64, + ) -> Result<(), PowerSyncError> { + self.write_i64_kv(LAST_SEEN_CHECKPOINT_REQUEST_ID_KEY, request_id) + } + + /// Persists the checkpoint request id that was applied locally. + pub fn persist_last_applied_checkpoint_request_id( + &self, + request_id: i64, + ) -> Result<(), PowerSyncError> { + self.write_i64_kv(LAST_APPLIED_CHECKPOINT_REQUEST_ID_KEY, request_id) + } + + /// Increments, persists and returns the next client-created checkpoint request id. + pub fn next_checkpoint_request_id(&self) -> Result { + let statement = self.db.prepare_v2( + "INSERT INTO ps_kv(key, value) +VALUES(?1, 1) +ON CONFLICT(key) DO UPDATE SET value = CAST(value AS INTEGER) + 1 +RETURNING value", + )?; + statement.bind_text( + 1, + LAST_REQUESTED_CHECKPOINT_REQUEST_ID_KEY, + sqlite::Destructor::STATIC, + )?; + + if statement.step()? == ResultCode::ROW { + Ok(statement.column_int64(0)) + } else { + Err(PowerSyncError::unknown_internal()) + } + } + + /// Returns whether the local checkpoint request counter has been initialized. + pub fn has_checkpoint_request_id(&self) -> Result { + Ok(self.last_checkpoint_request_id()?.is_some()) + } + + /// Returns the latest checkpoint request id known locally. + pub fn last_checkpoint_request_id(&self) -> Result, PowerSyncError> { + self.read_i64_kv(LAST_REQUESTED_CHECKPOINT_REQUEST_ID_KEY) + } + + /// Seeds the local checkpoint request counter from service state without moving it backwards. + /// + /// A null service value means the service has no record for this client yet. Store zero in + /// that case so the first local allocation returns one while still marking the state as seeded. + pub fn seed_checkpoint_request_id( + &self, + request_id: Option, + ) -> Result<(), PowerSyncError> { + let stmt = self.db.prepare_v2( + "INSERT INTO ps_kv(key, value) +VALUES(?1, ?2) +ON CONFLICT(key) DO UPDATE SET value = max(CAST(value AS INTEGER), excluded.value)", + )?; + stmt.bind_text( + 1, + LAST_REQUESTED_CHECKPOINT_REQUEST_ID_KEY, + sqlite::Destructor::STATIC, + )?; + stmt.bind_int64(2, request_id.unwrap_or(0))?; + stmt.exec()?; + Ok(()) + } + + fn read_i64_kv(&self, key: &'static str) -> Result, PowerSyncError> { + let statement = self .db - .prepare_v2("SELECT target_op FROM ps_buckets WHERE name = ?")?; - stmt.bind_text(1, "$local", sqlite::Destructor::STATIC)?; + .prepare_v2("SELECT value FROM ps_kv WHERE key = ?1") + .into_db_result(self.db)?; + statement.bind_text(1, key, sqlite::Destructor::STATIC)?; - Ok(if stmt.step()? == ResultCode::ROW { - let target_op = stmt.column_int64(0); - Some(LocalState { target_op }) + Ok(if statement.step()? == ResultCode::ROW { + Some(statement.column_int64(0)) } else { None }) } + + fn write_i64_kv(&self, key: &'static str, value: i64) -> Result<(), PowerSyncError> { + let stmt = self.db.prepare_v2( + "INSERT INTO ps_kv(key, value) +VALUES(?1, ?2) +ON CONFLICT(key) DO UPDATE SET value = excluded.value", + )?; + stmt.bind_text(1, key, sqlite::Destructor::STATIC)?; + stmt.bind_int64(2, value)?; + stmt.exec()?; + Ok(()) + } + + fn delete_kv(&self, key: &'static str) -> Result<(), PowerSyncError> { + let stmt = self.db.prepare_v2("DELETE FROM ps_kv WHERE key = ?1")?; + stmt.bind_text(1, key, sqlite::Destructor::STATIC)?; + stmt.exec()?; + Ok(()) + } } pub struct LocalState { diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 5c680ce3..a578dcac 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -95,21 +95,24 @@ impl SyncClient { SyncControlRequest::SyncEvent(sync_event) => { let mut active = ActiveEvent::new(sync_event); - let ClientState::IterationActive(handle) = &mut self.state else { - return Err(PowerSyncError::state_error("No iteration is active")); + let run_result = { + let ClientState::IterationActive(handle) = &mut self.state else { + return Err(PowerSyncError::state_error("No iteration is active")); + }; + + handle.run(&mut active) }; - match handle.run(&mut active) { + match run_result { Err(e) => { self.state = ClientState::Idle; return Err(e); } - Ok(done) => { - if done { - self.state = ClientState::Idle; - } + Ok(true) => { + self.state = ClientState::Idle; } - }; + Ok(false) => {} + } if let Some(recoverable) = active.recoverable_error.take() { Err(recoverable) @@ -117,6 +120,19 @@ impl SyncClient { Ok(active.instructions) } } + SyncControlRequest::NextCheckpointRequestId => { + if !self.has_sync_iteration() { + return Err(PowerSyncError::state_error("No iteration is active")); + } + if !self.adapter.has_checkpoint_request_id()? { + return Err(PowerSyncError::state_error( + "Checkpoint request state has not been seeded", + )); + } + + let request_id = self.adapter.next_checkpoint_request_id()?; + Ok(alloc::vec![Instruction::CheckpointRequestId { request_id }]) + } SyncControlRequest::StopSyncStream => self.state.tear_down(), } } @@ -200,18 +216,18 @@ impl SyncIterationHandle { }; let mut context = Context::from_waker(&waker); - Ok( - if let Poll::Ready(result) = self.future.poll(&mut context) { - let close = result?; + let done = if let Poll::Ready(result) = self.future.poll(&mut context) { + let close = result?; - active - .instructions - .push(Instruction::CloseSyncStream(close)); - true - } else { - false - }, - ) + active + .instructions + .push(Instruction::CloseSyncStream(close)); + true + } else { + false + }; + + Ok(done) } } @@ -363,7 +379,9 @@ impl StreamingSyncIteration { line: "Validated and applied checkpoint".into(), }); event.instructions.push(Instruction::FlushFileSystem {}); + SyncStateMachineTransition::SyncLocalChangesApplied { + applied_checkpoint_request_id: target.write_checkpoint, partial: None, timestamp, } @@ -400,6 +418,7 @@ impl StreamingSyncIteration { } SyncLocalResult::ChangesApplied { timestamp } => { SyncStateMachineTransition::SyncLocalChangesApplied { + applied_checkpoint_request_id: target.checkpoint.write_checkpoint, partial: Some(priority), timestamp, } @@ -457,7 +476,7 @@ impl StreamingSyncIteration { target: &mut SyncTarget, event: &mut ActiveEvent, transition: SyncStateMachineTransition, - ) -> Option { + ) -> Result, PowerSyncError> { match transition { SyncStateMachineTransition::StartTrackingCheckpoint { progress, @@ -491,13 +510,17 @@ impl StreamingSyncIteration { diagnostics.handle_data_line(line, &*status, &mut event.instructions); } } - SyncStateMachineTransition::CloseIteration(close) => return Some(close), + SyncStateMachineTransition::CloseIteration(close) => return Ok(Some(close)), SyncStateMachineTransition::SyncLocalFailedDueToPendingCrud { validated_but_not_applied, } => { self.validated_but_not_applied = Some(validated_but_not_applied); } - SyncStateMachineTransition::SyncLocalChangesApplied { partial, timestamp } => { + SyncStateMachineTransition::SyncLocalChangesApplied { + applied_checkpoint_request_id, + partial, + timestamp, + } => { if let Some(priority) = partial { self.status.update( |status| { @@ -506,13 +529,17 @@ impl StreamingSyncIteration { &mut event.instructions, ); } else { - self.handle_checkpoint_applied(event, timestamp); + self.handle_checkpoint_applied( + event, + timestamp, + applied_checkpoint_request_id, + )?; } } SyncStateMachineTransition::Empty => {} }; - None + Ok(None) } /// Handles a single sync line. @@ -526,7 +553,7 @@ impl StreamingSyncIteration { line: &SyncLineWithSource, ) -> Result, PowerSyncError> { let transition = self.prepare_handling_sync_line(target, event, line)?; - Ok(self.apply_transition(target, event, transition)) + self.apply_transition(target, event, transition) } /// Runs a full sync iteration, returning nothing when it completes regularly or an error when @@ -546,6 +573,10 @@ impl StreamingSyncIteration { .update(|s| s.disconnect(), &mut event.instructions); break false; } + SyncEvent::SeedCheckpointRequestId { request_id } => { + self.adapter.seed_checkpoint_request_id(request_id)?; + continue; + } SyncEvent::TextLine { data } => SyncLineWithSource::from_text(data)?, SyncEvent::BinaryLine { data } => SyncLineWithSource::from_binary(data)?, SyncEvent::UploadFinished => { @@ -647,7 +678,7 @@ impl StreamingSyncIteration { line: "Applied pending checkpoint after completed upload".into(), }); - self.handle_checkpoint_applied(event, timestamp); + self.handle_checkpoint_applied(event, timestamp, checkpoint.write_checkpoint)?; } _ => { event.instructions.push(Instruction::LogLine { @@ -886,11 +917,11 @@ impl StreamingSyncIteration { /// subscriptions, used to associate [BucketSubscriptionReason::DerivedFromExplicitSubscription]. async fn prepare_request(&mut self) -> Result { let event = Self::receive_event().await; - let SyncEvent::Initialize = event.event else { + if !matches!(&event.event, SyncEvent::Initialize) { return Err(PowerSyncError::argument_error( "first event must initialize", )); - }; + } let offline_state = self.adapter.offline_sync_state()?; self.status.update( @@ -922,22 +953,38 @@ impl StreamingSyncIteration { app_metadata: self.options.app_metadata.take(), }; - event - .instructions - .push(Instruction::EstablishSyncStream { request }); + event.instructions.push(Instruction::EstablishSyncStream { + request, + last_checkpoint_request_id: self.adapter.last_checkpoint_request_id()?, + }); Ok(BeforeCheckpoint { local_buckets: local_bucket_names, stream_subscriptions: stream_subscriptions, }) } - fn handle_checkpoint_applied(&mut self, event: &mut ActiveEvent, timestamp: TimestampMicros) { + fn handle_checkpoint_applied( + &mut self, + event: &mut ActiveEvent, + timestamp: TimestampMicros, + applied_checkpoint_request_id: Option, + ) -> Result<(), PowerSyncError> { + if let Some(request_id) = applied_checkpoint_request_id { + self.adapter + .persist_last_applied_checkpoint_request_id(request_id)?; + event + .instructions + .push(Instruction::CheckpointRequestApplied { request_id }); + } + event.instructions.push(Instruction::DidCompleteSync {}); self.status.update( |status| status.applied_checkpoint(timestamp), &mut event.instructions, ); + + Ok(()) } } @@ -1113,6 +1160,7 @@ enum SyncStateMachineTransition<'a> { validated_but_not_applied: OwnedCheckpoint, }, SyncLocalChangesApplied { + applied_checkpoint_request_id: Option, partial: Option, timestamp: TimestampMicros, }, diff --git a/crates/core/src/sync/sync_local.rs b/crates/core/src/sync/sync_local.rs index 1293ef33..aef8f8d4 100644 --- a/crates/core/src/sync/sync_local.rs +++ b/crates/core/src/sync/sync_local.rs @@ -67,7 +67,11 @@ impl<'a> SyncOperation<'a> { if needs_check { // language=SQLite let statement = self.db.prepare_v2( - "SELECT 1 FROM ps_buckets WHERE target_op > last_op AND name = '$local'", + "SELECT 1 +FROM ps_kv AS target +LEFT JOIN ps_kv AS seen ON seen.key = 'last_seen_checkpoint_request_id' +WHERE target.key = 'local_target_op' + AND CAST(target.value AS INTEGER) > COALESCE(CAST(seen.value AS INTEGER), 0)", )?; if statement.step()? == ResultCode::ROW { diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs index eb7daeea..6530c460 100644 --- a/crates/core/src/sync/sync_status.rs +++ b/crates/core/src/sync/sync_status.rs @@ -180,7 +180,7 @@ impl Serialize for DownloadSyncStatus { } } - let mut serializer = serializer.serialize_struct("DownloadSyncStatus", 4)?; + let mut serializer = serializer.serialize_struct("DownloadSyncStatus", 5)?; serializer.serialize_field("connected", &self.connected)?; serializer.serialize_field("connecting", &self.connecting)?; serializer.serialize_field("priority_status", &self.priority_status)?; diff --git a/dart/benchmark/apply_lines.dart b/dart/benchmark/apply_lines.dart index ee0654b5..c06d361a 100644 --- a/dart/benchmark/apply_lines.dart +++ b/dart/benchmark/apply_lines.dart @@ -17,8 +17,10 @@ void main(List args) { final db = openTestDatabase(); db + ..execute('BEGIN') ..execute('select powersync_init()') - ..execute('select powersync_control(?, null)', ['start']); + ..execute('select powersync_control(?, null)', ['start']) + ..execute('COMMIT'); final stopwatch = Stopwatch()..start(); diff --git a/dart/test/crud_test.dart b/dart/test/crud_test.dart index 5d6d379b..8d13c70a 100644 --- a/dart/test/crud_test.dart +++ b/dart/test/crud_test.dart @@ -249,7 +249,15 @@ void main() { }); }); - test('updates local bucket and updated rows', () { + test('updates local target op and updated rows', () { + // Stale high-water marks (e.g. migrated legacy write checkpoints) must be cleared by a + // local write, so they can't open the apply gate for a smaller new target id. + db.execute(''' +INSERT INTO ps_kv(key, value) VALUES + ('last_seen_checkpoint_request_id', 6), + ('last_applied_checkpoint_request_id', 5); +'''); + db.execute( 'INSERT INTO powersync_crud (op, id, type, data) VALUES (?, ?, ?, ?)', [ @@ -262,12 +270,17 @@ void main() { expect(db.select('SELECT * FROM ps_updated_rows'), [ {'row_type': 'users', 'row_id': 'foo'} ]); - expect(db.select('SELECT * FROM ps_buckets'), [ - allOf( - containsPair('name', r'$local'), - containsPair('target_op', 9223372036854775807), - ) - ]); + expect(db.select(r"SELECT * FROM ps_buckets WHERE name = '$local'"), + isEmpty); + expect( + db.select( + "SELECT key, value FROM ps_kv WHERE key LIKE '%checkpoint_request_id' OR key = 'local_target_op'"), + [ + { + 'key': 'local_target_op', + 'value': 9223372036854775807, + } + ]); }); test('does not require data', () { diff --git a/dart/test/error_test.dart b/dart/test/error_test.dart index 74da34a6..305a91f4 100644 --- a/dart/test/error_test.dart +++ b/dart/test/error_test.dart @@ -78,6 +78,7 @@ void main() { final control = db.prepare(stmt); control.execute(['start', null]); + control.execute(['seed_checkpoint_request_id', null]); expect( () => control.execute(['line_text', 'invalid sync line']), throwsA(isSqliteException( diff --git a/dart/test/goldens/simple_iteration.json b/dart/test/goldens/simple_iteration.json index 32c1db89..e8eb7ff6 100644 --- a/dart/test/goldens/simple_iteration.json +++ b/dart/test/goldens/simple_iteration.json @@ -27,11 +27,17 @@ "include_defaults": true, "subscriptions": [] } - } + }, + "last_checkpoint_request_id": null } } ] }, + { + "operation": "seed_checkpoint_request_id", + "data": null, + "output": [] + }, { "operation": "line_text", "data": { @@ -173,4 +179,4 @@ } ] } -] \ No newline at end of file +] diff --git a/dart/test/goldens/starting_stream.json b/dart/test/goldens/starting_stream.json index 2549905a..b63f2a03 100644 --- a/dart/test/goldens/starting_stream.json +++ b/dart/test/goldens/starting_stream.json @@ -33,9 +33,15 @@ "include_defaults": true, "subscriptions": [] } - } + }, + "last_checkpoint_request_id": null } } ] + }, + { + "operation": "seed_checkpoint_request_id", + "data": null, + "output": [] } -] \ No newline at end of file +] diff --git a/dart/test/migration_test.dart b/dart/test/migration_test.dart index 9203f74f..514cf4da 100644 --- a/dart/test/migration_test.dart +++ b/dart/test/migration_test.dart @@ -97,6 +97,144 @@ void main() { }); } + test('migrates local checkpoint state to ps_kv', () async { + db.execute(fixtures.expectedState[13]!); + db.execute(r''' +INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) +VALUES(1, '$local', 5, 6, 7, 0, 0, 1, 0, 0, 0); +'''); + + db.executeInTx('select powersync_init()'); + + expect(db.select('SELECT key, value FROM ps_kv ORDER BY key'), [ + {'key': 'last_applied_checkpoint_request_id', 'value': 5}, + {'key': 'last_seen_checkpoint_request_id', 'value': 6}, + {'key': 'local_target_op', 'value': 7}, + ]); + expect(db.select(r"SELECT * FROM ps_buckets WHERE name = '$local'"), + isEmpty); + expect( + db.select("PRAGMA table_info('ps_buckets')").map((row) => row['name']), + isNot(contains('target_op')), + ); + expect( + () => db.execute( + r"UPDATE ps_buckets SET target_op = 8 WHERE name = '$local'"), + throwsA(isA()), + ); + }); + + test('does not migrate last applied op as requested checkpoint id', + () async { + db.execute(fixtures.expectedState[13]!); + // Simulate pending local writes during migration. The legacy target op is the max-op + // sentinel, while last_applied_op is the last write checkpoint applied locally. + db.execute(r''' +INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) +VALUES(1, '$local', 5, 6, 9223372036854775807, 0, 0, 1, 0, 0, 0); +'''); + + db.executeInTx('select powersync_init()'); + + // last_applied_op becomes the applied checkpoint id, but it must not seed the requested + // checkpoint counter. The sentinel target is preserved for blocking, but target ops no longer + // seed last_requested_checkpoint_request_id. + expect(db.select('SELECT key, value FROM ps_kv ORDER BY key'), [ + {'key': 'last_applied_checkpoint_request_id', 'value': 5}, + {'key': 'last_seen_checkpoint_request_id', 'value': 6}, + {'key': 'local_target_op', 'value': 9223372036854775807}, + ]); + }); + + test('does not migrate sentinel target op as requested checkpoint id', + () async { + db.execute(fixtures.expectedState[13]!); + db.execute(r''' +INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) +VALUES(1, '$local', 0, 0, 9223372036854775807, 0, 0, 1, 0, 0, 0); +'''); + + db.executeInTx('select powersync_init()'); + + // The max-op sentinel is valid local target state, but target ops no longer seed + // last_requested_checkpoint_request_id. + expect(db.select('SELECT key, value FROM ps_kv ORDER BY key'), [ + {'key': 'local_target_op', 'value': 9223372036854775807}, + ]); + }); + + test('restores local checkpoint state on downgrade', () async { + db.execute(fixtures.finalState); + db.execute(r''' +INSERT INTO ps_kv(key, value) VALUES + ('last_requested_checkpoint_request_id', 7), + ('last_seen_checkpoint_request_id', 6), + ('last_applied_checkpoint_request_id', 5), + ('local_target_op', 7); +'''); + + db.executeInTx('select powersync_test_migration(13)'); + + expect( + db.select( + r"SELECT pending_delete, last_op, last_applied_op, target_op FROM ps_buckets WHERE name = '$local'"), + [ + { + 'pending_delete': 1, + 'last_op': 6, + 'last_applied_op': 5, + 'target_op': 7, + } + ], + ); + }); + + test('re-upgrades after downgrade with checkpoint state', () async { + db.execute(fixtures.finalState); + db.execute(r''' +INSERT INTO ps_kv(key, value) VALUES + ('last_requested_checkpoint_request_id', 7), + ('last_seen_checkpoint_request_id', 6), + ('last_applied_checkpoint_request_id', 5), + ('local_target_op', 7); +'''); + + db.executeInTx('select powersync_test_migration(13)'); + + // Simulate an older SDK advancing the restored $local row while downgraded. + db.execute(r''' +UPDATE ps_buckets + SET last_op = 8, last_applied_op = 8, target_op = 9 + WHERE name = '$local' +'''); + + db.executeInTx('select powersync_init()'); + + // The $local row is the source of truth on re-upgrade; the request counter is unrelated to + // $local and survives the downgrade unchanged. + expect(db.select('SELECT key, value FROM ps_kv ORDER BY key'), [ + {'key': 'last_applied_checkpoint_request_id', 'value': 8}, + {'key': 'last_requested_checkpoint_request_id', 'value': 7}, + {'key': 'last_seen_checkpoint_request_id', 'value': 8}, + {'key': 'local_target_op', 'value': 9}, + ]); + expect(db.select(r"SELECT * FROM ps_buckets WHERE name = '$local'"), + isEmpty); + + final schema = '${getSchema(db)}\n${getMigrations(db)}'; + expect(schema, equals(fixtures.finalState.trim())); + }); + + test('does not restore local bucket without local target on downgrade', + () async { + db.execute(fixtures.finalState); + + db.executeInTx('select powersync_test_migration(13)'); + + expect(db.select(r"SELECT * FROM ps_buckets WHERE name = '$local'"), + isEmpty); + }); + /// Here we apply a developer schema _after_ migrating test('schema after migration', () async { db.execute(fixtures.expectedState[2]!); @@ -209,6 +347,7 @@ void main() { db.execute('begin'); control('start'); + control('seed_checkpoint_request_id'); control( 'line_text', json.encode(checkpoint(lastOpId: 3, buckets: [ diff --git a/dart/test/sync_local_performance_test.dart b/dart/test/sync_local_performance_test.dart index 92ba1c2b..81546158 100644 --- a/dart/test/sync_local_performance_test.dart +++ b/dart/test/sync_local_performance_test.dart @@ -100,6 +100,7 @@ COMMIT; // Start a fake sync client to apply the changes we've already written to // ps_oplog control('start'); + control('seed_checkpoint_request_id'); final lastOpid = db.select('select max(op_id) from ps_oplog').single.columnAt(0) as int; final allBuckets = db diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index e5ba8ac1..21c02561 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -32,7 +32,7 @@ void main() { ['client_id', 'test-test-test-test']); }); - List control(String operation, Object? data) { + List controlRaw(String operation, Object? data) { db.execute('begin'); ResultSet result; @@ -60,6 +60,23 @@ void main() { } } + bool establishesSyncStream(List instructions) { + return instructions.any((instruction) => + instruction is Map && + instruction.containsKey('EstablishSyncStream')); + } + + List control(String operation, Object? data) { + final result = controlRaw(operation, data); + if (operation == 'start' && establishesSyncStream(result)) { + return [ + ...result, + ...controlRaw('seed_checkpoint_request_id', null), + ]; + } + return result; + } + group('default streams', () { syncTest('are created on-demand', (_) { control('start', null); diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 579ece80..93a8a4e1 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -4,12 +4,12 @@ import 'dart:typed_data'; import 'package:bson/bson.dart'; import 'package:file/local.dart'; +import 'package:path/path.dart'; import 'package:sqlite3/common.dart'; import 'package:sqlite3/sqlite3.dart'; import 'package:sqlite3_test/sqlite3_test.dart'; import 'package:test/test.dart'; import 'package:test_descriptor/test_descriptor.dart' as d; -import 'package:path/path.dart'; import 'utils/native_test_utils.dart'; import 'utils/test_utils.dart'; @@ -63,13 +63,28 @@ void _syncTests({ return jsonDecode(row.columnAt(0)); } + bool establishesSyncStream(List instructions) { + return instructions.any((instruction) => + instruction is Map && instruction.containsKey('EstablishSyncStream')); + } + List invokeControl(String operation, Object? data) { + List result; if (matcher.enabled) { // Trace through golden matcher - return matcher.invoke(operation, data); + result = matcher.invoke(operation, data); } else { - return invokeControlRaw(operation, data); + result = invokeControlRaw(operation, data); } + + if (operation == 'start' && establishesSyncStream(result)) { + final seedResult = matcher.enabled + ? matcher.invoke('seed_checkpoint_request_id', null) + : invokeControlRaw('seed_checkpoint_request_id', null); + return [...result, ...seedResult]; + } + + return result; } setUp(() async { @@ -135,6 +150,37 @@ void _syncTests({ return syncLine(checkpointComplete(priority: priority, lastOpId: lastOpId)); } + Object? lastRequestedCheckpointRequestId() { + final rows = db.select( + "SELECT value FROM ps_kv WHERE key = 'last_requested_checkpoint_request_id'"); + return rows.isEmpty ? null : rows.single.columnAt(0); + } + + Object? lastAppliedCheckpointRequestId() { + final rows = db.select( + "SELECT value FROM ps_kv WHERE key = 'last_applied_checkpoint_request_id'"); + return rows.isEmpty ? null : rows.single.columnAt(0); + } + + Object? streamLastCheckpointRequestId(List instructions) { + final instruction = instructions.whereType().firstWhere( + (instruction) => instruction.containsKey('EstablishSyncStream')); + final establish = instruction['EstablishSyncStream'] as Map; + return establish['last_checkpoint_request_id']; + } + + int nextCheckpointRequestId() { + final [instruction] = invokeControl('next_checkpoint_request_id', null); + final data = (instruction as Map)['CheckpointRequestId'] as Map; + return data['request_id'] as int; + } + + Object? probeLocalTargetOp([Object? opId]) { + final [instruction] = invokeControl('local_target_op', opId); + final data = (instruction as Map)['LocalTargetOp'] as Map; + return data['target_op']; + } + ResultSet fetchRows() { return db.select('select * from items'); } @@ -180,12 +226,15 @@ void _syncTests({ }); syncTest('app_metadata is passed to EstablishSyncStream request', (_) { - final startInstructions = invokeControlRaw( - 'start', - json.encode({ - 'app_metadata': {'key1': 'value1', 'key2': 'value2'} - }), - ); + final startInstructions = [ + ...invokeControlRaw( + 'start', + json.encode({ + 'app_metadata': {'key1': 'value1', 'key2': 'value2'} + }), + ), + ...invokeControlRaw('seed_checkpoint_request_id', null), + ]; expect( startInstructions, @@ -318,8 +367,16 @@ void _syncTests({ syncTest('remembers sync state', (controller) { invokeControl('start', null); - pushCheckpoint(buckets: priorityBuckets); - pushCheckpointComplete(); + pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1'); + expect( + pushCheckpointComplete(), + contains( + containsPair( + 'CheckpointRequestApplied', + {'request_id': 1}, + ), + ), + ); controller.elapse(Duration(minutes: 10)); pushCheckpoint(buckets: priorityBuckets); @@ -331,26 +388,25 @@ void _syncTests({ instructions, contains( containsPair( - 'UpdateSyncStatus', - containsPair( - 'status', + 'UpdateSyncStatus', containsPair( - 'priority_status', - [ - { - 'priority': 2, - 'last_synced_at': timestamp(), - 'has_synced': true - }, - { - 'priority': 2147483647, - 'last_synced_at': timestamp(plusMinutes: -10), - 'has_synced': true - }, - ], - ), - ), - ), + 'status', + containsPair( + 'priority_status', + [ + { + 'priority': 2, + 'last_synced_at': timestamp(), + 'has_synced': true + }, + { + 'priority': 2147483647, + 'last_synced_at': timestamp(plusMinutes: -10), + 'has_synced': true + }, + ], + ), + )), ), ); @@ -371,6 +427,190 @@ void _syncTests({ }); }); + syncTest('allocates requested checkpoint request ids', (_) { + invokeControl('start', null); + + expect(nextCheckpointRequestId(), 1); + expect(lastRequestedCheckpointRequestId(), 1); + + expect(nextCheckpointRequestId(), 2); + expect(lastRequestedCheckpointRequestId(), 2); + }); + + syncTest('seeds requested checkpoint request ids from service state', (_) { + final startInstructions = invokeControlRaw('start', null); + expect(streamLastCheckpointRequestId(startInstructions), isNull); + invokeControlRaw('seed_checkpoint_request_id', 41); + + expect(nextCheckpointRequestId(), 42); + expect(lastRequestedCheckpointRequestId(), 42); + + final restartInstructions = invokeControlRaw('start', null); + expect( + restartInstructions, + contains(containsPair('EstablishSyncStream', anything)), + ); + expect(streamLastCheckpointRequestId(restartInstructions), 42); + expect(invokeControlRaw('seed_checkpoint_request_id', 100), isEmpty); + + expect(nextCheckpointRequestId(), 101); + expect(lastRequestedCheckpointRequestId(), 101); + }); + + syncTest('requires checkpoint request state before allocating checkpoint ids', + (_) { + invokeControlRaw('start', null); + + expect( + () => invokeControlRaw('next_checkpoint_request_id', null), + throwsA(isSqliteException( + 21, + contains('Checkpoint request state has not been seeded'), + )), + ); + expect(probeLocalTargetOp(), isNull); + }); + + syncTest('probes and updates local target op without sync iteration', (_) { + expect(probeLocalTargetOp(), isNull); + expect(probeLocalTargetOp(1), isNull); + expect(lastRequestedCheckpointRequestId(), isNull); + expect(probeLocalTargetOp(), 1); + + expect(probeLocalTargetOp(2), 1); + expect(lastRequestedCheckpointRequestId(), isNull); + expect(probeLocalTargetOp(), 2); + }); + + syncTest('accepts text checkpoint request ids for local target op', (_) { + expect(probeLocalTargetOp('1'), isNull); + expect(lastRequestedCheckpointRequestId(), isNull); + expect(probeLocalTargetOp(), 1); + }); + + syncTest('rejects negative local target ops', (_) { + expect( + () => invokeControlRaw('local_target_op', -1), + throwsA(isSqliteException( + 3091, + contains('local target op must be a non-negative integer'), + )), + ); + }); + + syncTest('local target op does not update checkpoint request id', (_) { + invokeControlRaw('start', null); + invokeControlRaw('seed_checkpoint_request_id', 10); + + expect(lastRequestedCheckpointRequestId(), 10); + expect(probeLocalTargetOp(7), isNull); + expect(probeLocalTargetOp(), 7); + expect(lastRequestedCheckpointRequestId(), 10); + }); + + syncTest('does not store target ops as checkpoint request id', (_) { + expect(probeLocalTargetOp(0), isNull); + expect(lastRequestedCheckpointRequestId(), isNull); + expect(probeLocalTargetOp(), isNull); + + expect(probeLocalTargetOp(1), isNull); + expect(lastRequestedCheckpointRequestId(), isNull); + expect(probeLocalTargetOp(), 1); + + expect(probeLocalTargetOp(0), 1); + expect(probeLocalTargetOp(), isNull); + + expect(probeLocalTargetOp(9223372036854775807), isNull); + expect(lastRequestedCheckpointRequestId(), isNull); + expect(probeLocalTargetOp(), 9223372036854775807); + }); + + syncTest('does not persist placeholder checkpoint request id', (_) { + db.execute("insert into items (id, col) values ('local', 'data');"); + + invokeControlRaw('start', null); + + expect(lastRequestedCheckpointRequestId(), isNull); + }); + + syncTest( + 'does not emit applied checkpoint request id for partial checkpoint', + (_) { + invokeControl('start', null); + + pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1'); + final instructions = pushCheckpointComplete(priority: 2); + + expect( + instructions, + isNot(contains(containsPair('CheckpointRequestApplied', anything))), + ); + expect(lastAppliedCheckpointRequestId(), isNull); + + final [row] = db.select('select powersync_offline_sync_status();'); + expect( + json.decode(row[0]), + isNot(containsPair('last_applied_checkpoint_request_id', anything)), + ); + }, + ); + + syncTest('emits applied checkpoint request id for full checkpoint', (_) { + invokeControl('start', null); + + pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1'); + final appliedInstructions = pushCheckpointComplete(); + + expect( + appliedInstructions, + contains( + containsPair( + 'CheckpointRequestApplied', + {'request_id': 1}, + ), + ), + ); + expect(lastAppliedCheckpointRequestId(), 1); + + pushCheckpoint(buckets: priorityBuckets); + final instructions = pushCheckpointComplete(); + + expect( + instructions, + isNot(contains(containsPair('CheckpointRequestApplied', anything))), + ); + + final [row] = db.select('select powersync_offline_sync_status();'); + expect( + json.decode(row[0]), + isNot(containsPair('last_applied_checkpoint_request_id', anything)), + ); + + expect( + db.select(r"SELECT * FROM ps_buckets WHERE name = '$local'"), isEmpty); + }); + + syncTest('local writes clear checkpoint request high-water marks', (_) { + invokeControl('start', null); + + pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '5'); + pushCheckpointComplete(); + expect(lastAppliedCheckpointRequestId(), 5); + + // A local write can only be acknowledged by a checkpoint request id observed after it. Stale + // seen/applied values (which may come from another id namespace, like migrated legacy write + // checkpoints) must not remain to open the apply gate for a smaller new target id. + db.execute("insert into items (id, col) values ('local', 'data');"); + + expect(lastAppliedCheckpointRequestId(), isNull); + expect( + db.select( + "SELECT 1 FROM ps_kv WHERE key = 'last_seen_checkpoint_request_id'"), + isEmpty, + ); + expect(probeLocalTargetOp(), 9223372036854775807); + }); + test('clearing database clears sync status', () { invokeControl('start', null); pushCheckpoint(buckets: priorityBuckets); @@ -405,15 +645,18 @@ void _syncTests({ final request = invokeControl('start', null); expect( request, - contains(containsPair( - 'EstablishSyncStream', - { - // Should request state from before clear - 'request': containsPair('buckets', [ - {'name': 'a', 'after': '1'} - ]), - }, - )), + contains( + containsPair( + 'EstablishSyncStream', + containsPair( + // Should request state from before clear + 'request', + containsPair('buckets', [ + {'name': 'a', 'after': '1'} + ]), + ), + ), + ), ); pushCheckpoint(buckets: [bucketDescription('a', count: 1)]); @@ -816,7 +1059,7 @@ void _syncTests({ ]); // Now complete the upload process. - db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'"); + probeLocalTargetOp(1); invokeControl('completed_upload', null); // This should apply the pending write checkpoint. @@ -832,8 +1075,9 @@ void _syncTests({ // Complete upload process db.execute('DELETE FROM ps_crud'); - db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'"); + probeLocalTargetOp(1); expect(invokeControl('completed_upload', null), isEmpty); + expect(lastRequestedCheckpointRequestId(), 0); // Sync afterwards containing data and write checkpoint. pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1'); @@ -861,7 +1105,7 @@ void _syncTests({ ]); // Now the upload is complete and requests a write checkpoint - db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'"); + probeLocalTargetOp(1); expect(invokeControl('completed_upload', null), isEmpty); // Which triggers a new iteration @@ -898,7 +1142,7 @@ void _syncTests({ db.execute("insert into items (id, col) values ('local2', 'data2');"); // Now the upload is complete and requests a write checkpoint - db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'"); + probeLocalTargetOp(1); expect(invokeControl('completed_upload', null), [ containsPair('LogLine', { 'severity': 'WARNING', diff --git a/dart/test/utils/fix_035_fixtures.dart b/dart/test/utils/fix_035_fixtures.dart index fa912e89..0b600a90 100644 --- a/dart/test/utils/fix_035_fixtures.dart +++ b/dart/test/utils/fix_035_fixtures.dart @@ -18,9 +18,9 @@ const dataBroken = ''' /// Data after applying the migration fix, but before sync_local const dataMigrated = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES - (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), - (2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0, 0) +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES + (1, 'b1', 0, 0, 0, 120, 0, 0, 0, 0), + (2, 'b2', 0, 0, 0, 3, 0, 0, 0, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20), @@ -39,9 +39,9 @@ const dataMigrated = ''' /// Data after applying the migration fix and sync_local const dataFixed = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES - (1, 'b1', 3, 3, 0, 0, 120, 0, 1, 0, 0), - (2, 'b2', 3, 3, 0, 0, 3, 0, 1, 0, 0) +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES + (1, 'b1', 3, 3, 0, 120, 0, 1, 0, 0), + (2, 'b2', 3, 3, 0, 3, 0, 1, 0, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20), diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index 9566a182..7f6e6ba8 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,9 +1,12 @@ /// The current database version -const databaseVersion = 13; +const databaseVersion = 14; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. -const expectedState = { +final expectedState = _expectedState(); + +Map _expectedState() { + final state = { 2: r''' ;CREATE TABLE ps_buckets( name TEXT PRIMARY KEY, @@ -537,12 +540,19 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(12, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN downloaded_size"},{"sql":"DELETE FROM ps_migration WHERE id >= 12"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(13, '[{"sql":"UPDATE ps_stream_subscriptions SET expires_at = expires_at / 1000000, last_synced_at = last_synced_at / 1000000"},{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL PRIMARY KEY,\n last_synced_at TEXT NOT NULL\n) STRICT;"},{"sql":"INSERT INTO ps_sync_state (priority, last_synced_at) SELECT priority, datetime(last_synced_at / 1000000, ''unixepoch'') FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 13"}]')''', -}; + }; + state[14] = '''${state[13]!.trim().replaceFirst(' target_op INTEGER NOT NULL DEFAULT 0,\n', '')} +;INSERT INTO ps_migration(id, down_migrations) VALUES(14, '[{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_14"},{"sql":"DROP INDEX ps_buckets_name"},{"sql":"CREATE TABLE ps_buckets(\\n id INTEGER PRIMARY KEY,\\n name TEXT NOT NULL,\\n last_applied_op INTEGER NOT NULL DEFAULT 0,\\n last_op INTEGER NOT NULL DEFAULT 0,\\n target_op INTEGER NOT NULL DEFAULT 0,\\n add_checksum INTEGER NOT NULL DEFAULT 0,\\n op_checksum INTEGER NOT NULL DEFAULT 0,\\n pending_delete INTEGER NOT NULL DEFAULT 0\\n) STRICT"},{"sql":"CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)"},{"sql":"ALTER TABLE ps_buckets ADD COLUMN count_at_last INTEGER NOT NULL DEFAULT 0"},{"sql":"ALTER TABLE ps_buckets ADD COLUMN count_since_last INTEGER NOT NULL DEFAULT 0"},{"sql":"ALTER TABLE ps_buckets ADD COLUMN downloaded_size INTEGER NOT NULL DEFAULT 0"},{"sql":"INSERT INTO ps_buckets(\\n id,\\n name,\\n last_applied_op,\\n last_op,\\n add_checksum,\\n op_checksum,\\n pending_delete,\\n count_at_last,\\n count_since_last,\\n downloaded_size\\n)\\nSELECT\\n id,\\n name,\\n last_applied_op,\\n last_op,\\n add_checksum,\\n op_checksum,\\n pending_delete,\\n count_at_last,\\n count_since_last,\\n downloaded_size\\nFROM ps_buckets_14"},{"sql":"DROP TABLE ps_buckets_14"},{"sql":"INSERT INTO ps_buckets(name, pending_delete, last_op, last_applied_op, target_op)\\nSELECT ''\$local'', 1, seen, applied, target\\n FROM (\\n SELECT\\n IFNULL((SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = ''last_seen_checkpoint_request_id''), 0) AS seen,\\n IFNULL((SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = ''last_applied_checkpoint_request_id''), 0) AS applied,\\n (SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = ''local_target_op'') AS target\\n )\\n WHERE EXISTS (\\n SELECT 1 FROM ps_kv WHERE key = ''local_target_op''\\n )\\nON CONFLICT(name) DO UPDATE SET\\n pending_delete = excluded.pending_delete,\\n last_op = excluded.last_op,\\n last_applied_op = excluded.last_applied_op,\\n target_op = excluded.target_op"},{"sql":"DELETE FROM ps_migration WHERE id >= 14"}]')'''; + return state; +} final finalState = expectedState[databaseVersion]!; /// data to test "up" migrations -const data1 = { +final data1 = _data1(); + +Map _data1() { + final data = { 2: r''' ;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete) VALUES ('b1', 0, 0, 0, 0, 0), @@ -672,7 +682,20 @@ const data1 = { ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') ''' -}; + }; + data[14] = r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES + (1, 'b1', 0, 0, 0, 120, 0, 0, 0, 0), + (2, 'b2', 0, 0, 1005, 3, 0, 0, 0, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') +'''; + return data; +} /// data to test "down" migrations /// This is slightly different from the above, @@ -719,6 +742,7 @@ final dataDown1 = { 10: data1[9]!, 11: data1[10]!, 12: data1[12]!, + 13: data1[13]!, }; final finalData1 = data1[databaseVersion]!; diff --git a/dart/test/utils/test_utils.dart b/dart/test/utils/test_utils.dart index 943f632d..8c6bd1fc 100644 --- a/dart/test/utils/test_utils.dart +++ b/dart/test/utils/test_utils.dart @@ -23,7 +23,10 @@ Object stream(String name, bool isDefault, {List errors = const []}) { } /// Creates a `checkpoint_complete` or `partial_checkpoint_complete` line. -Object checkpointComplete({int? priority, String lastOpId = '1'}) { +Object checkpointComplete({ + int? priority, + String lastOpId = '1', +}) { return { priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete': { 'last_op_id': lastOpId, diff --git a/docs/historic-write-checkpoints.md b/docs/historic-write-checkpoints.md new file mode 100644 index 00000000..dd697108 --- /dev/null +++ b/docs/historic-write-checkpoints.md @@ -0,0 +1,208 @@ +# Write Checkpointing + +The general flow for mutations is as follows. + +A client makes a write to a table/view. Triggers are used to populate the `ps_crud` table with an entry for the operation. Every local write marks the `$local` bucket in `ps_buckets` as having a `target_op` of the maximum i64 value - this effectively blocks incoming synced checkpoints from being applied. + +```sql +INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID}) +``` + +A connected client SDK monitors the `ps_crud` table - or the sync state machine triggers CRUD uploads when ready. The user's `uploadData` gets CRUD transactions with `getNextCrudTransaction` or some equivalent method. Calling the `complete` method on a CRUD transaction-like object will: + +- Remove the entries from `ps_crud` +- Depending on the write checkpoint method used: + - Optionally apply a custom_write_checkpoint as the target_op - ONLY if the `ps_crud` queue is empty + - Ensure that the `target_op` is at the MAX_OP_ID + +```TypeScript + await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [lastClientId]); + if (writeCheckpoint) { + const check = await tx.execute(`SELECT 1 FROM ${PSInternalTable.CRUD} LIMIT 1`); + if (!check.rows?.length) { + await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='$local'`, [ + writeCheckpoint + ]); + } + } else { + await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='$local'`, [ + this.bucketStorageAdapter.getMaxOpId() + ]); + } +``` + +Once all the uploads have completed, the Sync implementation will attempt to update the local target. + +```typescript +// private async _uploadAllCrud(signal: AbortSignal): Promise { + +// AbstractStreamingSyncImplementation.ts line 418 +// Uploading is completed +const neededUpdate = await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint()); + +// ... +// } + +// SqliteBucketStorage.ts line 67 +// async updateLocalTarget(cb: () => Promise): Promise { +const rs1 = await this.db.getAll( + "SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = CAST(? as INTEGER)", + [MAX_OP_ID] +); + +// If the target op is not the MAX_OP_ID (it's a concrete checkpoint ID) +// Then: Don't fetch a new write checkpoint from the service, leave it as is. +// This essentially caters for the custom write checkpoint case where a concrete `writeCheckpoint` +// is set after all items have been uploaded. +// In the managed checkpoint flow, the target_op should be the MAX_OP_ID here. +if (!rs1.length) { + // Nothing to update + return false; +} + +// The logic below tries to ensure that no uploads happened in-between async operations, +// like fetching a write-checkpoint from the PowerSync service +const rs = await this.db.getAll<{ seq: number }>("SELECT seq FROM main.sqlite_sequence WHERE name = 'ps_crud'"); +if (!rs.length) { + // Nothing to update + return false; +} + +const seqBefore: number = rs[0]['seq']; + +// This callback usually connects to the PowerSync service write-checkpoint2.json endpoint +const opId = await cb(); + +// Now we apply the target_op, only if no other CRUD items have dirtied the local state meanwhile. +return this.writeTransaction(async (tx) => { + const anyData = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1'); + if (anyData.rows?.length) { + // if isNotEmpty + this.logger.debug(`New data uploaded since write checkpoint ${opId} - need new write checkpoint`); + return false; + } + + const rs = await tx.execute("SELECT seq FROM main.sqlite_sequence WHERE name = 'ps_crud'"); + if (!rs.rows?.length) { + // assert isNotEmpty + throw new Error('SQLite Sequence should not be empty'); + } + + const seqAfter: number = rs.rows?.item(0)['seq']; + if (seqAfter != seqBefore) { + this.logger.debug( + `New data uploaded since write checkpoint ${opId} - need new write checkpoint (sequence updated)` + ); + + // New crud data may have been uploaded since we got the checkpoint. Abort. + return false; + } + + this.logger.debug(`Updating target write checkpoint to ${opId}`); + await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [opId]); + return true; +}); +// } +``` + +Concurrently, the streaming sync implementation is reading checkpoints from the PowerSync service `/sync/stream/` endpoint. The PowerSync service reports which checkpoints are associated with a corresponding write checkpoint. + +The client does not publish guarded changes as soon as it sees that value. After a full checkpoint has completed and its checksums have been validated, `sync_local` stores the checkpoint's `write_checkpoint` as `$local.last_op`. Incoming changes can then be applied locally only when `$local.target_op <= $local.last_op` and the `ps_crud` queue is empty. Partial priority 0 syncs are the exception: they may publish while uploads are outstanding. + +```Rust +// sync_local.rs + +fn can_apply_sync_changes(&self) -> Result { + // Don't publish downloaded data until the upload queue is empty (except for downloaded data + // in priority 0, which is published earlier). + + let needs_check = match &self.partial { + Some(p) => !p.priority.may_publish_with_outstanding_uploads(), + None => true, + }; + + if needs_check { + // language=SQLite + let statement = self.db.prepare_v2( + "SELECT 1 FROM ps_buckets WHERE target_op > last_op AND name = '$local'", + )?; + + if statement.step()? == ResultCode::ROW { + return Ok(false); + } + + let statement = self.db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?; + if statement.step()? != ResultCode::DONE { + return Ok(false); + } + } + + Ok(true) + } + +// storage_adapter.rs + +pub fn sync_local( + // ... +) { + +// ... + if let (None, Some(write_checkpoint)) = (&priority, &checkpoint.write_checkpoint) { + update_bucket.bind_int64(1, *write_checkpoint)?; + update_bucket.bind_text(2, "$local", sqlite::Destructor::STATIC)?; + update_bucket.exec()?; + } + +// ... +} +``` + +## The $local bucket + +`$local` is a special row in `ps_buckets` used to track whether downloaded changes are safe to +publish while local writes are being uploaded. It is stored in the same table as real sync buckets, +but it is not sent to the sync service as a bucket request. + +- `target_op`: The write checkpoint that must be reached before guarded upstream changes may be + published locally. Local writes set this to `MAX_OP_ID`; custom write checkpoints or managed + write-checkpoint requests replace it with a concrete checkpoint id once the relevant upload has + completed. + + ```sql + -- Local writes block guarded publishes until a concrete write checkpoint is known. + INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) + VALUES('$local', 0, {MAX_OP_ID}); + + -- After upload completion, custom or managed checkpointing stores the target checkpoint. + UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name = '$local'; + ``` + +- `last_op`: The latest write checkpoint observed from the sync service. This is updated from + `checkpoint.write_checkpoint` when a full checkpoint is validated. + + ```rust + let update_bucket = self.db.prepare_v2("UPDATE ps_buckets SET last_op = ? WHERE name = ?")?; + + if let (None, Some(write_checkpoint)) = (&priority, &checkpoint.write_checkpoint) { + update_bucket.bind_int64(1, *write_checkpoint)?; + update_bucket.bind_text(2, "$local", sqlite::Destructor::STATIC)?; + update_bucket.exec()?; + } + ``` + +- `last_applied_op`: The latest write checkpoint whose guarded changes have actually been published + locally. This advances to `last_op` after a full `sync_local` apply succeeds. + + ```sql + UPDATE ps_buckets + SET last_applied_op = last_op + WHERE last_applied_op != last_op; + ``` + +The apply gate checks `$local.target_op > $local.last_op` before publishing full checkpoints and +non-priority-0 partial checkpoints. It also checks that `ps_crud` is empty. This means downloaded +changes remain buffered until the client has both uploaded local CRUD and seen the corresponding +write checkpoint in the sync stream. + +Clearing the database removes `$local`: a hard clear deletes all rows from `ps_buckets`, while a +soft clear deletes only the `$local` row and keeps reusable remote bucket state. diff --git a/docs/schema.md b/docs/schema.md index f7dfbde3..4a6462b9 100644 --- a/docs/schema.md +++ b/docs/schema.md @@ -15,8 +15,8 @@ A bucket is instantiated for every row returned by a parameter query in a [bucke Clients create entries in `ps_buckets` when receiving a checkpoint message from the sync service, they are also responsible for removing buckets that are no longer relevant to the client. -There is also a special `$local` bucket representing pending -uploads. +Older schema versions also used a special `$local` bucket to represent pending uploads. Current +schema versions keep that local write gate in `ps_kv` instead. We store the following information in `ps_buckets`: @@ -24,12 +24,17 @@ We store the following information in `ps_buckets`: 2. `name`: The name of the bucket as received from the sync service. 3. `last_applied_op`: The last operation id that has been verified and published to views (meaning that it was part of a checkpoint and that we have validated its checksum). -4. `target_op`: Only used for `$local`. TODO: Document further. -5. `add_checksum`: TODO: Document further. -6. `op_checksum`: TODO: Document further. -7. `pending_delete`: TODO: Appears to be unused, document further. -8. `count_at_last`: The amount of operations in the bucket at the last verified checkpoint. -9. `count_since_last`: The amount of operations downloaded since the last verified checkpoint. +4. `add_checksum`: TODO: Document further. +5. `op_checksum`: TODO: Document further. +6. `pending_delete`: TODO: Appears to be unused, document further. +7. `count_at_last`: The amount of operations in the bucket at the last verified checkpoint. +8. `count_since_last`: The amount of operations downloaded since the last verified checkpoint. + +Schema version 14 removes the legacy `target_op` column after migrating `$local.target_op` to +`ps_kv.local_target_op`, and deletes the `$local` row so `ps_buckets` only contains real sync +buckets. This makes older SDKs fail with a hard SQLite error if they try to keep using the migrated +database without downgrading. The down migration restores `target_op` and recreates the `$local` +row from `ps_kv` for older schema versions. ## `ps_crud` diff --git a/docs/sync.md b/docs/sync.md index 8737cee0..929e1a35 100644 --- a/docs/sync.md +++ b/docs/sync.md @@ -10,7 +10,7 @@ The function should always be called in a transaction. The following commands are supported: 1. `start`: Payload is a JSON-encoded object. This requests the client to start a sync iteration. - The payload can either be `null` or an JSON object with: + The payload can either be `null` or a JSON object with: - An optional `parameters: Record` entry, specifying parameters to include in the request to the sync service. - A `schema: { tables: Table[], raw_tables: RawTable[] }` entry specifying the schema of the database to @@ -26,16 +26,79 @@ The following commands are supported: - The client will emit an instruction to stop the current stream, clients should restart by sending another `start` command. 6. `completed_upload`: Notify the sync implementation that all local changes have been uploaded. -7. `update_subscriptions`: Notify the sync implementation that subscriptions which are currently active in the app - have changed. Depending on the TTL of caches, this may cause it to request a reconnect. +7. `update_subscriptions`: Payload is a JSON-encoded array of + `{name: string, params: Record}`. Notify the sync implementation that subscriptions + which are currently active in the app have changed. Depending on the TTL of caches, this may + cause it to request a reconnect. 8. `connection`: Notify the sync implementation about the connection being opened (second parameter should be `established`) or the HTTP stream closing (second parameter should be `end`). This is used to set `connected` to true in the sync status without waiting for the first sync line. -9. `subscriptions`: Store a new sync steam subscription in the database or remove it. +9. `subscriptions`: Store a new sync stream subscription in the database or remove it. This command can run outside of a sync iteration and does not affect it. -10. `update_subscriptions`: Second parameter is a JSON-encoded array of `{name: string, params: Record}`. - If a new subscription is created, or when a subscription without a TTL has been removed, the client will ask to - restart the connection. +10. `next_checkpoint_request_id`: No payload. During an active sync iteration after checkpoint + request state exists locally, allocates and returns the next checkpoint request id in a + `CheckpointRequestId` instruction. +11. `local_target_op`: Payload is `null`, an integer, or an integer string. Probes, updates or + clears the local target op and returns the previously-observed value in a `LocalTargetOp` + result. This command can run outside of a sync iteration and does not affect it. +12. `seed_checkpoint_request_id`: Payload is `null`, an integer, or an integer string. After + receiving `EstablishSyncStream`, SDKs should reconcile the provided local hint with the service + checkpoint-request state on every connection attempt. This can bump core when the service is + ahead, or restore the service-side value when the service has cleared stale state but core still + has a local hint. Then seed core with the reconciled value. `NULL` means neither side has a + record for the client yet; core stores `0` only when no local seed exists. Integer seeds use + `max(local, service)` semantics so the local counter never moves backwards while either side + still remembers the value. If both the client and service have lost the value, the counter may + restart. + +When uploads request a write checkpoint, SDKs should call +`powersync_control('next_checkpoint_request_id', NULL)` inside a transaction to allocate the id to +pass to the request-checkpoint API. In checkpoint-request mode, the SDK should first allocate the id, +then post that id to the service, and then call `powersync_control('local_target_op', id)` with the +same id once the service accepts the request. This sets the local target op to the request op, +replacing the pending-write sentinel with the concrete checkpoint request id that the sync stream +can satisfy. `next_checkpoint_request_id` only advances the request counter; it does not update the +local target op used to block applying downloaded rows. + +`powersync_control('local_target_op', op_id)` probes and optionally updates the internal local +target op. The same command is used for compatibility when a new SDK is used with an older +PowerSync service that does not yet support client-created checkpoint requests; after the +service-side write checkpoint request returns a concrete id, call +`powersync_control('local_target_op', id)` with that id. Passing `NULL` returns the current target +without changing it, and passing `0` clears the local target. This command only updates the apply +gate; it does not allocate, seed, or overwrite `last_requested_checkpoint_request_id`. + +Database migration v14 moves legacy `$local` checkpoint state into `ps_kv`: `$local.last_applied_op` +becomes `last_applied_checkpoint_request_id`, `$local.last_op` becomes the internal +`last_seen_checkpoint_request_id`, and any positive `$local.target_op` is stored as +`local_target_op`. A concrete `$local.target_op` could be used to seed +`last_requested_checkpoint_request_id`, but it should be redundant because SDKs reconcile the +request counter with the service on connect. The migration then deletes the `$local` row, leaving +only real sync buckets in `ps_buckets`, and drops `ps_buckets.target_op` so +older SDKs fail hard if they try to keep using the migrated database directly. Downgrading restores +the column, and restores a `$local` row only when `local_target_op` exists, so older SDKs can keep +using target-op based blocking without inventing a synthetic local bucket when there was no local +target state. Because the down migration keeps the `ps_kv` keys around, the up migration clears +them before copying, so re-upgrading a downgraded database takes the `$local` row (including any +progress an older SDK made) as the source of truth instead of failing on the existing keys. + +`last_requested_checkpoint_request_id` is internal allocation state used by +`next_checkpoint_request_id` to allocate increasing ids for client-created checkpoint requests. +`last_seen_checkpoint_request_id` and `last_applied_checkpoint_request_id` are high-water marks +that local writes clear, so only checkpoint request ids observed after a write count towards the +apply gate. SDKs should use `CheckpointRequestApplied` instructions for explicit checkpoint +request waits instead of presenting these values as meaningful sync progress. + +If `local_target_op` is absent after migration, there is no local write gate waiting for a +checkpoint. In that case, SDKs can start client-created checkpoint requests normally, even when +`last_requested_checkpoint_request_id` is undefined and the first allocated id is `1`. + +The ambiguous migration case is a migrated `local_target_op` of max op id: local writes are +pending, but there is no concrete request id to wait for yet. The max-op sentinel may also cover +earlier pending uploads that were already associated with legacy service-created write checkpoints. +In that state, create one old-style write checkpoint first, store the returned concrete id with +`powersync_control('local_target_op', id)`, let that gate resolve, and then switch to +client-created checkpoint requests after the request counter has been reconciled on connect. `powersync_control` returns a JSON-encoded array of instructions for the client: @@ -44,6 +107,9 @@ type Instruction = { LogLine: LogLine } | { UpdateSyncStatus: UpdateSyncStatus } | { EstablishSyncStream: EstablishSyncStream } | { FetchCredentials: FetchCredentials } + | { CheckpointRequestId: { request_id: number } } + | { CheckpointRequestApplied: { request_id: number } } + | { LocalTargetOp: { target_op: null | number } } // Close a connection previously started after EstablishSyncStream | { CloseSyncStream: { hide_disconnect: boolean } } // For the Dart web client, flush the (otherwise non-durable) file system. @@ -58,8 +124,14 @@ interface LogLine { } // Instructs client SDKs to open a connection to the sync service. +// last_checkpoint_request_id is core's local counter value before this stream request. On connect, +// SDKs can use it to re-request this client's last checkpoint request state from the service, then +// call powersync_control('seed_checkpoint_request_id', value) with the actual response for +// reconciliation. `value` may be null when the service has no checkpoint request state for this +// client. interface EstablishSyncStream { request: any // The JSON-encoded StreamingSyncRequest to send to the sync service + last_checkpoint_request_id: null | number } // Instructs SDKS to update the downloading state of their SyncStatus. @@ -68,6 +140,13 @@ interface UpdateSyncStatus { connecting: boolean, priority_status: [], downloading: null | DownloadProgress, + streams: [], +} + +// Emitted when a full checkpoint with a write_checkpoint has been applied locally. +// SDKs can use this to resolve pending CheckpointRequest waiters. +interface CheckpointRequestApplied { + request_id: number, } // Instructs SDKs to refresh credentials from the backend connector. diff --git a/docs/write-checkpoint-requests.md b/docs/write-checkpoint-requests.md new file mode 100644 index 00000000..bc3d34b2 --- /dev/null +++ b/docs/write-checkpoint-requests.md @@ -0,0 +1,339 @@ +# Write Checkpoint State in `ps_kv` + +The new write checkpoint logic moves the historic `$local` bucket bookkeeping into `ps_kv`. +`ps_buckets` now tracks real sync buckets, while local upload gating and checkpoint-request +progress are represented as key/value state. + +At a high level: + +- `local_target_op` replaces `$local.target_op` as the local write apply gate. +- `last_seen_checkpoint_request_id` replaces `$local.last_op`. +- `last_applied_checkpoint_request_id` replaces `$local.last_applied_op`. +- `last_requested_checkpoint_request_id` tracks the latest concrete checkpoint request id allocated + by the client, so `next_checkpoint_request_id` can allocate increasing ids for each checkpoint + request. + +These keys are internal SDK/core state, not user-facing sync progress. +`last_requested_checkpoint_request_id` is functional allocation state, while +`last_seen_checkpoint_request_id` and `last_applied_checkpoint_request_id` are mostly diagnostic +high-water marks. Explicit checkpoint waits should follow `CheckpointRequestApplied` instructions. + +SDKs should not write these keys directly. They update the local target through +`powersync_control('local_target_op', value)`, which is the shared helper for both legacy write +checkpoints and new client-created checkpoint requests. The +`powersync_control('next_checkpoint_request_id', NULL)` command only allocates a checkpoint request +id; after the service accepts that request, the SDK uses +`powersync_control('local_target_op', id)` to make the accepted id the local target for write +checkpoints. + +For the historic `$local` bucket flow, see `historic-write-checkpoints.md`. + +## Local writes + +A client write to a synced table/view records an entry in `ps_crud`. For simple CRUD triggers, the +same transaction also records the affected row in `ps_updated_rows` and sets `local_target_op` to +the maximum i64 value. This is the `ps_kv` equivalent of the old `$local.target_op` sentinel: it +means "there are local writes, but we do not yet know the concrete checkpoint id that will +acknowledge them". + +The sentinel is stored in `ps_kv`, not in `ps_buckets`. The same statement clears the +`last_seen_checkpoint_request_id` and `last_applied_checkpoint_request_id` high-water marks: + +```sql +INSERT OR REPLACE INTO ps_kv(key, value) +VALUES('local_target_op', MAX_OP_ID); + +DELETE FROM ps_kv +WHERE key IN ('last_seen_checkpoint_request_id', 'last_applied_checkpoint_request_id'); +``` + +Clearing the high-water marks mirrors how the legacy flow reset the whole `$local` row on local +writes. A checkpoint request id observed before the write cannot acknowledge it, and a stale seen +value may even come from an incompatible id namespace — a legacy write checkpoint migrated by v14, +or state from before a request counter restart. If such a value stayed around, a newly allocated +target id could compare below it and open the apply gate before the service acknowledged the write. +After a local write, only checkpoint request ids observed from that point on count towards the gate. + +## Completing uploaded CRUD + +SDK upload code removes uploaded items from `ps_crud`. If the connector supplies a legacy custom +write checkpoint and the queue is empty, that concrete checkpoint becomes the local target +immediately. +Otherwise the target is reset to `MAX_OP_ID`, allowing the sync client to create a standard +checkpoint request after the queue drains. + +```text +transaction { + deleteUploadedCrud(upTo: lastUploadedId) + + if let customCheckpoint, crudQueueIsEmpty { + powersync_control('local_target_op', customCheckpoint) + } else { + powersync_control('local_target_op', MAX_OP_ID) + } +} +``` + +## Updating the local target + +Once uploads are complete, the sync client updates the local target through +`powersync_control('local_target_op', value)`. It only does this when the current target is still +`MAX_OP_ID`, which avoids overwriting a custom checkpoint that was already stored by +`complete(writeCheckpoint:)`. + +The SDK implementation: + +1. Probes the current target with `powersync_control('local_target_op', NULL)`. +2. Reads `sqlite_sequence.seq` for `ps_crud`. +3. Gets a concrete checkpoint id from either the new or legacy service API. +4. Re-enters a write transaction. +5. Verifies that `ps_crud` is still empty and that its sequence did not change. +6. Stores the concrete target with `powersync_control('local_target_op', opId)`. + +```text +let previousTarget = transaction { + powersync_control('local_target_op', NULL).LocalTargetOp.target_op +} + +if previousTarget == MAX_OP_ID { + let seqBefore = psCrudSequence() + let checkpointId = await createOrFetchCheckpointId() + + transaction { + guard ps_crud.isEmpty && psCrudSequence() == seqBefore else { + return + } + + powersync_control('local_target_op', checkpointId) + } +} +``` + +In checkpoint-request mode, `getWriteCheckpoint()` calls `requestCheckpoint()`. That allocates an +id locally, sends it to `/sync/checkpoint-request`, and returns the same id once the service accepts +the request. Only then does the upload path store that id as `local_target_op` with +`powersync_control('local_target_op', id)`. + +```text +let requestId = transaction { + powersync_control('next_checkpoint_request_id', NULL).CheckpointRequestId.request_id +} + +POST /sync/checkpoint-request { + client_id, + checkpoint_request_id: requestId +} + +return requestId +``` + +The legacy fallback still calls `/write-checkpoint2.json`; the returned write checkpoint is stored +through the same `powersync_control('local_target_op', opId)` helper. This keeps SDK target updates +consistent across both protocols. + +## Sync control commands + +These `powersync_control` commands are the SDK-facing API for the new `ps_kv` checkpoint state. + +`powersync_control('start', payload)` begins a sync iteration and emits `EstablishSyncStream` with a +`last_checkpoint_request_id` hint. This is core's local `last_requested_checkpoint_request_id` value +before opening the stream, or `NULL` when no local seed exists. On every connection attempt, SDKs +should reconcile this hint with the service checkpoint-request state before creating new requests. +The reconciliation is bidirectional: if the service still has a higher value, the SDK uses that +response to bump core locally so following requests are accepted; if the service has cleared stale +state but core still has a local hint, the SDK can use the hint to restore the service-side value. +After reconciliation, call `powersync_control('seed_checkpoint_request_id', value)` with the +reconciled value. + +`last_requested_checkpoint_request_id` is a best-effort counter seed, not durable application state. +If either the client or the service still remembers a higher id, the reconciliation response plus +core's `max(local, service)` seeding keeps the local counter moving forward. If the service has +cleared stale state but the client still has a local seed, the local hint can restore the +service-side value and keep the counter from moving backwards. If the client lost local state but +the service still has a record, the service response restores the seed locally. If both sides have +lost the value, it is acceptable for the counter to restart; this can happen after local state is +cleared and stale service state expires, or when multiple user ids share the same client id. After +reconciliation, `value` may be `NULL` when neither side has a record for the client; core stores `0` +only when no local seed exists. SDKs may also refresh service state when their user/client context +changes. + +`powersync_control('next_checkpoint_request_id', NULL)` must be called inside a transaction during +an active sync iteration after `last_requested_checkpoint_request_id` exists locally. It increments +and returns `last_requested_checkpoint_request_id` in a `CheckpointRequestId` instruction. + +```sql +INSERT INTO ps_kv(key, value) +VALUES('last_requested_checkpoint_request_id', 1) +ON CONFLICT(key) DO UPDATE SET value = CAST(value AS INTEGER) + 1 +RETURNING value; +``` + +This command only allocates an id. It does not update `local_target_op`. + +Note on sequences: SQLite does not have standalone sequences. The sequence-like alternatives are +either an `AUTOINCREMENT` table backed by SQLite's internal `sqlite_sequence`, or a dedicated +single-row counter table like the existing `ps_tx` transaction counter. The checkpoint request +counter currently lives in `ps_kv` so it can persist across requests and be reconciled with service +state on connect. If we want stricter structure later, a dedicated checkpoint-request counter table +would be the closest match to a sequence. + +`powersync_control('local_target_op', op_id)` probes and optionally updates the local target. Like +`subscriptions`, this command is handled directly by `powersync_control` and can run outside an +active sync iteration: + +- `NULL` returns the current `local_target_op` without changing it. +- `0` clears `local_target_op`. +- A positive value stores `local_target_op`. +- Negative values and non-integer inputs are rejected. + +This command only updates the apply gate. It does not allocate, seed, or overwrite +`last_requested_checkpoint_request_id`. + +The command returns the previous target value in a `LocalTargetOp` result, or `NULL` if there was no +target. + +```text +previous = ps_kv['local_target_op'] + +if target_op == NULL: + return previous +if target_op == 0: + delete ps_kv['local_target_op'] +else: + ps_kv['local_target_op'] = target_op + +return previous +``` + +## Applying downloaded checkpoints + +The sync stream reports the checkpoint request id in `checkpoint.write_checkpoint`. After a full +checkpoint validates, core persists it as `last_seen_checkpoint_request_id`. + +```text +on full checkpoint with write_checkpoint: + ps_kv['last_seen_checkpoint_request_id'] = checkpoint.write_checkpoint +``` + +Before publishing downloaded rows, `sync_local` checks the local gate. Full checkpoints and +non-priority-0 partial checkpoints can only apply when: + +- `local_target_op` is absent, or it is less than or equal to `last_seen_checkpoint_request_id`. +- `ps_crud` is empty. + +Priority 0 partial syncs are the exception: they may publish while uploads are outstanding. + +```sql +SELECT 1 +FROM ps_kv AS target +LEFT JOIN ps_kv AS seen ON seen.key = 'last_seen_checkpoint_request_id' +WHERE target.key = 'local_target_op' + AND CAST(target.value AS INTEGER) > COALESCE(CAST(seen.value AS INTEGER), 0); +``` + +If a full checkpoint validated but cannot apply because local CRUD is pending, the state machine +keeps it as `validated_but_not_applied`. When the SDK later sends `completed_upload`, core retries +that checkpoint unless its `write_checkpoint` is older than the current `local_target_op`. + +```text +on completed_upload: + if pending_checkpoint.write_checkpoint >= local_target_op: + retry applying pending_checkpoint +``` + +After a full checkpoint applies, core stores the applied checkpoint request id as +`last_applied_checkpoint_request_id` and emits a `CheckpointRequestApplied` instruction. + +```text +after full checkpoint apply: + ps_kv['last_applied_checkpoint_request_id'] = checkpoint.write_checkpoint + emit CheckpointRequestApplied { request_id: checkpoint.write_checkpoint } +``` + +## Explicit checkpoint requests + +SDKs can expose a `requestCheckpoint()`-style API for callers that want to wait until the local +database has caught up to the service. The SDK creates a checkpoint request id through the connected +sync client and returns a `CheckpointRequest`-style waiter. + +This explicit API does not update `local_target_op`: it is a wait marker, not a local upload gate. +The returned object waits until core emits `CheckpointRequestApplied` for an id greater than or +equal to the requested id. + +```text +waitForSync() { + for instruction in syncInstructions { + return when instruction.CheckpointRequestApplied.request_id >= requestId + throw if sync status reports a sync error + } +} +``` + +The public database method requires an active or connecting sync client, because a disconnected +request could not be delivered to the service or observed in the sync stream. + +## `ps_kv` checkpoint state + +- `local_target_op`: The current apply gate. It is either `MAX_OP_ID` while local writes are + pending, a concrete checkpoint request id after upload completion, or absent when there is no + local write gate. +- `last_requested_checkpoint_request_id`: The last client-created checkpoint request id allocated + by `powersync_control('next_checkpoint_request_id', NULL)`. This is the counter used to allocate + increasing ids for each client-created checkpoint request, including multiple requests in one + client lifetime. The persisted value is also useful for debugging and for seeding the next + connection attempt. SDKs should reconcile it with the service on every connect, and should + tolerate it restarting when both the client and service have lost the previous value. +- `last_seen_checkpoint_request_id`: The latest full checkpoint `write_checkpoint` observed and + validated from the sync stream since the last local write. Local writes clear this key, so only + checkpoint request ids observed after the write can satisfy the apply gate. +- `last_applied_checkpoint_request_id`: The latest full checkpoint `write_checkpoint` that has been + applied locally since the last local write, which clears this key. Core persists this for + migration/downgrade state and debugging; SDKs should use `CheckpointRequestApplied` instructions + to resolve `CheckpointRequest` waits. + +## Migration from `$local` + +Migration v14 moves the old `$local` bucket state into `ps_kv`: + +- `$local.last_applied_op` becomes `last_applied_checkpoint_request_id`. +- `$local.last_op` becomes `last_seen_checkpoint_request_id`. +- Any positive `$local.target_op`, including `MAX_OP_ID`, becomes `local_target_op`. + +A concrete `$local.target_op` could be used to seed `last_requested_checkpoint_request_id`, but it +should be redundant because SDKs reconcile the request counter with service state on connect before +advancing it through `next_checkpoint_request_id`. + +After copying this state, the migration deletes the `$local` row — version 14 tracks this state +exclusively in `ps_kv`, so `ps_buckets` only contains real sync buckets — and drops +`ps_buckets.target_op`. Dropping the column intentionally makes older SDKs fail with a hard SQLite +error if they try to keep using a migrated database without first downgrading. + +The up migration first deletes any existing `last_applied_checkpoint_request_id`, +`last_seen_checkpoint_request_id` and `local_target_op` keys. Those can be present when a database +was previously on version 14 and then downgraded, because the down migration keeps the ps_kv keys +while rebuilding `$local`. Clearing them makes the `$local` row the source of truth on re-upgrade, +picking up any progress an older SDK made while downgraded. `last_requested_checkpoint_request_id` +is unrelated to `$local` and survives a downgrade/upgrade cycle unchanged. + +An absent `local_target_op` is safe: there is no local write gate waiting for a checkpoint, so an +SDK can seed the request counter on connect and start client-created checkpoint requests normally. +If neither the client nor service has a previous request id, the first allocated id is `1`. The sync +stream will only report that request id after the service has accepted and reached it. + +The ambiguous case is a migrated `local_target_op` of `MAX_OP_ID`. That means there is a pending +local write gate but no concrete request id to wait for yet. The `MAX_OP_ID` sentinel only says that +local writes dirtied the gate; it does not prove that no earlier uploads were already associated +with legacy service-created write checkpoints. In that state, the SDK should create one legacy write +checkpoint first, store the concrete id with `powersync_control('local_target_op', id)`, let that +gate resolve, and then switch to client-created checkpoint requests after the request counter has +been reconciled on connect. + +The down migration restores `ps_buckets.target_op` and rebuilds a `$local` row only when +`local_target_op` exists, using: + +- `last_seen_checkpoint_request_id` as `$local.last_op` +- `last_applied_checkpoint_request_id` as `$local.last_applied_op` +- `local_target_op` as `$local.target_op` + +This keeps older SDKs able to use the historic target-op gate after a downgrade without inventing a +synthetic `$local` bucket when there was no local target state.