Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/core/src/crud_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ impl SimpleCrudTransactionMode {

fn record_local_write(&mut self, db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
if !self.had_writes {
db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?;
db.exec_safe(formatcp!(
"INSERT OR REPLACE INTO ps_kv(key, value) VALUES('local_target_op', {MAX_OP_ID})"
))?;
self.had_writes = true;
}

Expand Down
46 changes: 46 additions & 0 deletions crates/core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,49 @@ macro_rules! create_sqlite_optional_text_fn {
}
};
}

#[macro_export]
macro_rules! create_sqlite_int_fn {
($fn_name:ident, $fn_impl_name:ident, $description:literal) => {
extern "C" fn $fn_name(
ctx: *mut sqlite::context,
argc: c_int,
argv: *mut *mut sqlite::value,
) {
let args = sqlite::args!(argc, argv);

let result = $fn_impl_name(ctx, args);

if let Err(err) = result {
PowerSyncError::from(err).apply_to_ctx($description, ctx);
} else if let Ok(r) = result {
ctx.result_int64(r);
}
}
};
}

#[macro_export]
macro_rules! create_sqlite_optional_int_fn {
($fn_name:ident, $fn_impl_name:ident, $description:literal) => {
extern "C" fn $fn_name(
ctx: *mut sqlite::context,
argc: c_int,
argv: *mut *mut sqlite::value,
) {
let args = sqlite::args!(argc, argv);

let result = $fn_impl_name(ctx, args);

if let Err(err) = result {
PowerSyncError::from(err).apply_to_ctx($description, ctx);
} else if let Ok(r) = result {
if let Some(i) = r {
ctx.result_int64(i);
} else {
ctx.result_null();
}
}
}
};
}
130 changes: 129 additions & 1 deletion crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::fix_data::apply_v035_fix;
use crate::schema::inspection::ExistingView;
use crate::sync::BucketPriority;

pub const LATEST_VERSION: i32 = 13;
pub const LATEST_VERSION: i32 = 14;

pub fn powersync_migrate(
ctx: *mut sqlite::context,
Expand Down Expand Up @@ -460,6 +460,134 @@ DROP TABLE ps_sync_state_old;
track_migration.exec()?;
}

if current_version < 14 && target_version >= 14 {
// Move the legacy `$local` checkpoint bookkeeping into ps_kv.
//
// In older databases, `$local.last_applied_op` represented the latest legacy write
// checkpoint that was actually applied, so it becomes the last applied checkpoint request.
// `$local.last_op` represented the latest legacy write checkpoint seen in the sync stream,
// so it becomes the last seen checkpoint request.
//
// `$local.target_op` can either be a concrete checkpoint request id or a sentinel such as
// i64::MAX while local writes are pending. Store it separately as `local_target_op`, but
// only treat concrete values as requested checkpoint ids. We intentionally don't seed
// `last_requested_checkpoint_request_id` from `$local.last_applied_op` because that is an
// applied value, not necessarily the current requested target.
//
// An absent local target can safely start client-created checkpoint requests from 1. The
// ambiguous case is an existing max-op local target without a concrete requested id:
// pending local writes may already be associated with legacy service-created write
// checkpoints, so SDKs should bridge once through the legacy endpoint before starting
// client-created checkpoint requests.
let up = "\
INSERT INTO ps_kv(key, value)
SELECT 'last_applied_checkpoint_request_id', last_applied_op
FROM ps_buckets
WHERE name = '$local'
AND last_applied_op > 0;

INSERT INTO ps_kv(key, value)
SELECT 'last_seen_checkpoint_request_id', last_op
FROM ps_buckets
WHERE name = '$local'
AND last_op > 0;

INSERT INTO ps_kv(key, value)
SELECT 'last_requested_checkpoint_request_id', target_op
FROM ps_buckets
WHERE name = '$local'
AND target_op > 0
AND target_op != 9223372036854775807;

INSERT INTO ps_kv(key, value)
SELECT 'local_target_op', target_op
FROM ps_buckets
WHERE name = '$local'
AND target_op > 0;

ALTER TABLE ps_buckets DROP COLUMN target_op;
";
local_db.exec_safe(up).into_db_result(local_db)?;

// Downgrading needs to rebuild the old `$local` row from the new ps_kv state so older SDKs
// can keep using their target-op based blocking behavior. In that model, `$local.last_op`
// tracked the latest seen legacy write checkpoint and was compared with `$local.target_op`
// to decide whether downloaded changes could be applied. The `$local.last_applied_op`
// value represented the checkpoint that had actually been applied locally.
// `$local.pending_delete = 1` marked this as a synthetic local-only bucket instead of a
// normal service bucket. Restore each old progress column from its matching ps_kv key.
// The 0 defaults cover a local target that exists before any checkpoint has been seen or
// applied. If `local_target_op` is absent, don't create a `$local` row: the old
// implementation also didn't have a `$local` bucket unless there was local target state to
// track.
const DOWN_STATEMENTS: &[&str] = &[
"ALTER TABLE ps_buckets RENAME TO ps_buckets_14",
"DROP INDEX ps_buckets_name",
"CREATE TABLE ps_buckets(
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
last_applied_op INTEGER NOT NULL DEFAULT 0,
last_op INTEGER NOT NULL DEFAULT 0,
target_op INTEGER NOT NULL DEFAULT 0,
add_checksum INTEGER NOT NULL DEFAULT 0,
op_checksum INTEGER NOT NULL DEFAULT 0,
pending_delete INTEGER NOT NULL DEFAULT 0
) STRICT",
"CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)",
"ALTER TABLE ps_buckets ADD COLUMN count_at_last INTEGER NOT NULL DEFAULT 0",
"ALTER TABLE ps_buckets ADD COLUMN count_since_last INTEGER NOT NULL DEFAULT 0",
"ALTER TABLE ps_buckets ADD COLUMN downloaded_size INTEGER NOT NULL DEFAULT 0",
"INSERT INTO ps_buckets(
id,
name,
last_applied_op,
last_op,
add_checksum,
op_checksum,
pending_delete,
count_at_last,
count_since_last,
downloaded_size
)
SELECT
id,
name,
last_applied_op,
last_op,
add_checksum,
op_checksum,
pending_delete,
count_at_last,
count_since_last,
downloaded_size
FROM ps_buckets_14",
"DROP TABLE ps_buckets_14",
"INSERT INTO ps_buckets(name, pending_delete, last_op, last_applied_op, target_op)
SELECT '$local', 1, seen, applied, target
FROM (
SELECT
IFNULL((SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = 'last_seen_checkpoint_request_id'), 0) AS seen,
IFNULL((SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = 'last_applied_checkpoint_request_id'), 0) AS applied,
(SELECT CAST(value AS INTEGER) FROM ps_kv WHERE key = 'local_target_op') AS target
)
WHERE EXISTS (
SELECT 1 FROM ps_kv WHERE key = 'local_target_op'
)
ON CONFLICT(name) DO UPDATE SET
pending_delete = excluded.pending_delete,
last_op = excluded.last_op,
last_applied_op = excluded.last_applied_op,
target_op = excluded.target_op",
"DELETE FROM ps_migration WHERE id >= 14",
];
let down = serialize_down_statements(DOWN_STATEMENTS)?;
let track_migration =
local_db.prepare_v2("INSERT INTO ps_migration(id, down_migrations) VALUES (?, ?)")?;
track_migration.bind_int(1, 14)?;
track_migration.bind_text(2, &down, Destructor::STATIC)?;
track_migration.exec()?;
}

Ok(())
}

Expand Down
72 changes: 71 additions & 1 deletion crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::sync::diagnostics::{DiagnosticOptions, DiagnosticsEvent};
use crate::sync::subscriptions::{StreamKey, apply_subscriptions};
use alloc::borrow::Cow;
use alloc::boxed::Box;
use alloc::format;
use alloc::rc::Rc;
use alloc::{string::String, vec::Vec};
use powersync_sqlite_nostd::bindings::SQLITE_RESULT_SUBTYPE;
Expand Down Expand Up @@ -76,6 +77,8 @@ pub enum SyncControlRequest<'a> {
StartSyncStream(StartSyncStream),
/// The client requests to stop the current sync iteration.
StopSyncStream,
/// The client requests a new checkpoint request id.
NextCheckpointRequestId,
/// The client is forwading a sync event to the core extension.
SyncEvent(SyncEvent<'a>),
}
Expand All @@ -89,6 +92,10 @@ pub enum SyncEvent<'a> {
///
/// In response, we'll stop the current iteration to begin another one with the new token.
DidRefreshToken,
/// Seeds the checkpoint request counter from service state.
SeedCheckpointRequestId {
request_id: Option<i64>,
},
/// Notifies the sync client that the current CRUD upload (for which the client SDK is
/// responsible) has finished.
///
Expand Down Expand Up @@ -128,13 +135,26 @@ pub enum Instruction {
},
/// Connect to the sync service using the [StreamingSyncRequest] created by the core extension,
/// and then forward received lines via [SyncEvent::TextLine] and [SyncEvent::BinaryLine].
EstablishSyncStream { request: StreamingSyncRequest },
EstablishSyncStream {
request: StreamingSyncRequest,
/// The latest checkpoint request id known locally before opening this stream.
///
/// SDKs can use a missing value as a cue to fetch checkpoint request state from the service
/// and report it back with `seed_checkpoint_request_id`.
last_checkpoint_request_id: Option<i64>,
},
FetchCredentials {
/// Whether the credentials currently used have expired.
///
/// If false, this is a pre-fetch.
did_expire: bool,
},
/// Return a newly allocated checkpoint request id to the SDK.
CheckpointRequestId { request_id: i64 },
/// Notify the SDK that a checkpoint request id has been applied locally.
CheckpointRequestApplied { request_id: i64 },
/// Return the local target op value observed before an optional update.
LocalTargetOp { target_op: Option<i64> },
// These are defined like this because deserializers in Kotlin can't support either an
// object or a literal value
/// Close the websocket / HTTP stream to the sync service.
Expand Down Expand Up @@ -232,6 +252,24 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
}
}),
"stop" => SyncControlRequest::StopSyncStream,
"next_checkpoint_request_id" => SyncControlRequest::NextCheckpointRequestId,
"local_target_op" => {
let target_op = parse_optional_i64_payload(
*payload,
"local target op",
"local target op must be an integer, integer string, or null",
)?;
let adapter = state.storage_adapter(db)?;
let target_op = adapter.probe_local_target_op(target_op)?;
let formatted =
serde_json::to_string(&alloc::vec![Instruction::LocalTargetOp {
target_op
},])
.map_err(PowerSyncError::internal)?;
ctx.result_text_transient(&formatted);
ctx.result_subtype(SUBTYPE_JSON);
return Ok(());
}
"line_text" => SyncControlRequest::SyncEvent(SyncEvent::TextLine {
data: if payload.value_type() == ColumnType::Text {
payload.text()
Expand All @@ -251,6 +289,15 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
},
}),
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
"seed_checkpoint_request_id" => {
SyncControlRequest::SyncEvent(SyncEvent::SeedCheckpointRequestId {
request_id: parse_optional_i64_payload(
*payload,
"checkpoint request id",
"checkpoint request id must be an integer, integer string, or null",
)?,
})
}
"completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished),
"update_subscriptions" => {
SyncControlRequest::SyncEvent(SyncEvent::DidUpdateSubscriptions {
Expand Down Expand Up @@ -345,3 +392,26 @@ create_sqlite_text_fn!(
powersync_offline_sync_status_impl,
"powersync_offline_sync_status"
);

fn parse_optional_i64_payload(
payload: *mut sqlite::value,
name: &'static str,
type_error: &'static str,
) -> Result<Option<i64>, PowerSyncError> {
let value = match payload.value_type() {
ColumnType::Null => return Ok(None),
ColumnType::Integer => payload.int64(),
ColumnType::Text => payload.text().parse::<i64>().map_err(|_| {
PowerSyncError::argument_error(format!("{name} must be an integer string"))
})?,
_ => return Err(PowerSyncError::argument_error(type_error)),
};

if value < 0 {
return Err(PowerSyncError::argument_error(format!(
"{name} must be a non-negative integer"
)));
}

Ok(Some(value))
}
Loading
Loading