Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 38 additions & 6 deletions crates/durable-runtime/src/plugin/durable/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,34 @@ impl Host for Task {
);
}

let timeout = std::time::Duration::from_nanos(timeout_ns);

// Persist the absolute deadline as its own recorded event *before* the
// result. This is what gives the timed wait a timer fallback: the deadline is
// - computed from the injected `Clock` (so a `DstClock` controls it), and
// - persisted, so it survives a suspend/replay cycle. On replay we read the
// recorded value back rather than recomputing a fresh deadline.
let deadline_options =
TransactionOptions::new("durable:core/notify.notification-blocking-timeout.deadline");
let deadline: DateTime<Utc> =
match self.state.enter::<DateTime<Utc>>(deadline_options).await? {
Some(deadline) => deadline,
None => {
let deadline = chrono::Duration::from_std(timeout)
.ok()
.and_then(|d| self.state.clock().now().checked_add_signed(d))
.unwrap_or(DateTime::<Utc>::MAX_UTC);
self.state.exit(&deadline).await?;
deadline
}
};

let options = TransactionOptions::new("durable:core/notify.notification-blocking-timeout");
if let Some(event) = self.state.enter::<Option<EventData>>(options).await? {
return Ok(event.map(Into::into));
}

let timeout = std::time::Duration::from_nanos(timeout_ns);
let user_deadline = Instant::now() + timeout;
let suspend_deadline = Instant::now() + self.state.config().suspend_timeout;
let suspend_timeout = self.state.config().suspend_timeout;
let task_id = self.state.task_id();
let mut rx = self.state.subscribe_notifications();

Expand All @@ -138,6 +158,16 @@ impl Host for Task {

tx.rollback().await?;

// Compute the time remaining until the user deadline using the
// injected clock. If the deadline has already passed (e.g. we were
// revived by the wakeup timer at `deadline`), the negative remainder is
// clamped to zero, so `user_deadline` is effectively "now" and the
// select below resolves the timeout immediately.
let remaining = (deadline - self.state.clock().now())
.to_std()
.unwrap_or(std::time::Duration::ZERO);
let user_deadline = Instant::now() + remaining;
let suspend_deadline = Instant::now() + suspend_timeout;

// Wait for either a notification, the user timeout, or the suspend
// timeout — whichever comes first.
enum Expired {
Expand Down Expand Up @@ -186,14 +216,16 @@ impl Host for Task {
break None;
}

// The suspend timeout expired. Attempt to suspend the task so
// we free up the worker slot, just like notification_blocking.
// The suspend timeout expired. Suspend the task to free up the
// worker slot, recording `deadline` as the wakeup time so the
// task is revived by the timer even if its notification is never
// re-delivered.
Some(Expired::Suspend) => {
let mut tx = self.state.pool().begin().await?;

self.state
.storage()
.suspend_task_no_wakeup(&mut tx, self.task_id())
.suspend_task(&mut tx, self.task_id(), Some(deadline))
.await?;

if poll_notification(&mut *self, &mut tx).await?.is_some() {
Expand Down
28 changes: 25 additions & 3 deletions crates/durable-runtime/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,30 +63,47 @@ pub(crate) trait Storage: Send + Sync + 'static {
#[allow(dead_code)]
fn pool(&self) -> &sqlx::PgPool;

async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result<i64>;
/// `now` is the runtime's [`Clock`](crate::clock::Clock) view of the
/// current time, used to seed the worker's initial heartbeat so that
/// liveness is tracked on the injected clock's timeline rather than the
/// database wall clock.
async fn insert_worker(&self, conn: &mut PgConnection, now: DateTime<Utc>)
-> sqlx::Result<i64>;

async fn delete_worker(
&self,
conn: &mut PgConnection,
worker_id: i64,
) -> sqlx::Result<PgQueryResult>;

/// Records a heartbeat at `now` (the injected clock's current time).
///
/// Returns `true` if the row still exists.
async fn heartbeat_worker(&self, conn: &mut PgConnection, worker_id: i64)
-> sqlx::Result<bool>;
async fn heartbeat_worker(
&self,
conn: &mut PgConnection,
worker_id: i64,
now: DateTime<Utc>,
) -> sqlx::Result<bool>;

/// `now` is the injected clock's current time, against which heartbeat
/// expiry is measured (rather than the database wall clock).
async fn delete_following_expired_worker(
&self,
conn: &mut PgConnection,
following: i64,
timeout: PgInterval,
now: DateTime<Utc>,
) -> sqlx::Result<PgQueryResult>;

/// `now` is the injected clock's current time, against which heartbeat
/// expiry is measured (rather than the database wall clock).
async fn delete_other_expired_workers(
&self,
conn: &mut PgConnection,
worker_id: i64,
timeout: PgInterval,
now: DateTime<Utc>,
) -> sqlx::Result<PgQueryResult>;

/// See `Worker::validate_workers` for the algorithm.
Expand All @@ -99,10 +116,15 @@ pub(crate) trait Storage: Send + Sync + 'static {
/// The leader is the oldest worker.
async fn load_leader_id(&self, conn: &mut PgConnection) -> sqlx::Result<Option<i64>>;

/// `now` is the injected clock's current time. Suspended tasks are woken
/// once `wakeup_at` (which is also written from the injected clock) is at
/// least `suspend_margin` in the past relative to `now`, so that
/// timer-based wakeup is driven entirely by the runtime's clock.
async fn wake_suspended_tasks(
&self,
conn: &mut PgConnection,
suspend_margin: PgInterval,
now: DateTime<Utc>,
) -> sqlx::Result<PgQueryResult>;

async fn next_wakeup_at(&self, conn: &mut PgConnection) -> sqlx::Result<Option<DateTime<Utc>>>;
Expand Down
35 changes: 24 additions & 11 deletions crates/durable-runtime/src/storage/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,18 @@ impl Storage for PgStorage {
&self.pool
}

async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result<i64> {
async fn insert_worker(
&self,
conn: &mut PgConnection,
now: DateTime<Utc>,
) -> sqlx::Result<i64> {
let record = sqlx::query!(
"
INSERT INTO durable.worker(heartbeat_at)
VALUES (CURRENT_TIMESTAMP)
VALUES ($1)
RETURNING id
"
",
now
)
.fetch_one(&mut *conn)
.await?;
Expand All @@ -61,13 +66,15 @@ impl Storage for PgStorage {
&self,
conn: &mut PgConnection,
worker_id: i64,
now: DateTime<Utc>,
) -> sqlx::Result<bool> {
let record = sqlx::query!(
"UPDATE durable.worker
SET heartbeat_at = CURRENT_TIMESTAMP
SET heartbeat_at = $2
WHERE id = $1
RETURNING id",
worker_id
worker_id,
now
)
.fetch_optional(&mut *conn)
.await?;
Expand All @@ -80,15 +87,17 @@ impl Storage for PgStorage {
conn: &mut PgConnection,
following: i64,
timeout: PgInterval,
now: DateTime<Utc>,
) -> sqlx::Result<PgQueryResult> {
sqlx::query!(
"
DELETE FROM durable.worker
WHERE id = $1
AND CURRENT_TIMESTAMP - heartbeat_at > $2
AND $3::timestamptz - heartbeat_at > $2
",
following,
timeout
timeout,
now
)
.execute(&mut *conn)
.await
Expand All @@ -99,15 +108,17 @@ impl Storage for PgStorage {
conn: &mut PgConnection,
worker_id: i64,
timeout: PgInterval,
now: DateTime<Utc>,
) -> sqlx::Result<PgQueryResult> {
sqlx::query!(
"
DELETE FROM durable.worker
WHERE CURRENT_TIMESTAMP - heartbeat_at > $2
WHERE $3::timestamptz - heartbeat_at > $2
AND NOT id = $1
",
worker_id,
timeout
timeout,
now
)
.execute(&mut *conn)
.await
Expand Down Expand Up @@ -177,6 +188,7 @@ impl Storage for PgStorage {
&self,
conn: &mut PgConnection,
suspend_margin: PgInterval,
now: DateTime<Utc>,
) -> sqlx::Result<PgQueryResult> {
sqlx::query!(
"
Expand All @@ -190,9 +202,10 @@ impl Storage for PgStorage {
LIMIT 1
)
WHERE state = 'suspended'
AND wakeup_at <= (NOW() - $1::interval)
AND wakeup_at <= ($2::timestamptz - $1::interval)
",
suspend_margin
suspend_margin,
now
)
.execute(&mut *conn)
.await
Expand Down
9 changes: 8 additions & 1 deletion crates/durable-runtime/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,14 @@ impl TaskState {
std::any::type_name::<T>()
)
}) {
Ok(value) => Ok(Some(value)),
Ok(value) => {
// Advance past the replayed event, mirroring the database hit
// path in `enter_impl` (the guest does not call `exit` on
// replay, so nothing else advances the index). Without this a
// second cached `enter` would re-read this same event.
self.txn_index += 1;
Ok(Some(value))
}
Err(e) => {
self.events.clear();
Err(e)
Expand Down
Loading
Loading