From 1a1ac0ebee583b324d3b3ffc777ca68164511dba Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 12 Jun 2026 23:56:31 +0000 Subject: [PATCH 1/2] Fix timed notification waits being stranded on suspend under a custom Clock A task that waits on a notification with a timeout and whose in-memory suspend timeout elapses first was suspended via `suspend_task_no_wakeup`, i.e. with no `wakeup_at`. Such a task had no timer fallback and could only be revived by a re-delivered notification; a single missed delivery stranded it indefinitely. This also surfaced under a custom `Clock` (DST) because the storage timers compared against the database wall clock rather than the injected clock. Changes: - notify.rs: the timed wait now durably records its absolute deadline (computed from the injected `Clock`) as a recorded event before the result, so it survives suspend/replay. On suspend it records that deadline as `wakeup_at` (via `suspend_task`) so the timer revives the task even if the notification is never re-delivered, and on replay it resolves the remaining time against the clock and returns `None` once the deadline has passed. - storage: thread the injected clock's `now()` into the heartbeat (`insert_worker`, `heartbeat_worker`, `delete_*_expired_worker*`) and suspend-wakeup (`wake_suspended_tasks`) queries, replacing `CURRENT_TIMESTAMP`/`NOW()` so a `DstClock` fully controls these timers. - worker leader: fix the inverted wakeup delay (`now - wakeup_at`, always zero for a future wakeup) to sleep until `wakeup_at + suspend_margin`. - task replay: advance `txn_index` on a cached `enter` hit, matching the database and record paths. Without this a second cached `enter` in one host call re-reads the same event; the timed-wait deadline event is the first operation to record two events across a suspend. - Add a DST regression test plus a workflow that times out, driving the clock past the deadline to prove timer-based wakeup fires under a custom clock with no notification delivered. Regenerate the sqlx offline query cache for the changed queries. --- ...e0777d8bf75348fef1b0e6e7e4042e4c420a.json} | 7 +- ...14a608b144c5da9f2123d95447a1bb1f8ce6.json} | 7 +- ...ae3751a8a94f990d247bc8cb8fd12093a81f.json} | 7 +- ...cbf6f8033d0ab87cae1628d13b4d8afd3e50.json} | 8 +- ...4cb218c8fbb1fcd3c49cad3d0fa39e7e0dd5.json} | 7 +- .../src/plugin/durable/notify.rs | 44 +++++- crates/durable-runtime/src/storage/mod.rs | 28 +++- crates/durable-runtime/src/storage/pg.rs | 35 +++-- crates/durable-runtime/src/task.rs | 9 +- crates/durable-runtime/src/worker.rs | 36 ++++- .../src/bin/notify-wait-timeout-expire.rs | 17 +++ crates/durable-test/tests/it/dst_notify.rs | 137 ++++++++++++++++++ 12 files changed, 299 insertions(+), 43 deletions(-) rename .sqlx/{query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json => query-1d609f7967405346d14bb7187c1de0777d8bf75348fef1b0e6e7e4042e4c420a.json} (71%) rename .sqlx/{query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json => query-26461b368e299400d8af2555b0eb14a608b144c5da9f2123d95447a1bb1f8ce6.json} (51%) rename .sqlx/{query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json => query-4965f02d5eae28d41446e216c8aeae3751a8a94f990d247bc8cb8fd12093a81f.json} (74%) rename .sqlx/{query-7f746180eb4a64c9d07789d04ea2e173479e1a82efe0f289bb4baa03642a32b1.json => query-57df875fa3c32f17e9de02c7457fcbf6f8033d0ab87cae1628d13b4d8afd3e50.json} (68%) rename .sqlx/{query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json => query-c9e163b93b396a00e90309fbad934cb218c8fbb1fcd3c49cad3d0fa39e7e0dd5.json} (51%) create mode 100644 crates/durable-test-workflows/src/bin/notify-wait-timeout-expire.rs diff --git a/.sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json b/.sqlx/query-1d609f7967405346d14bb7187c1de0777d8bf75348fef1b0e6e7e4042e4c420a.json similarity index 71% rename from .sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json rename to .sqlx/query-1d609f7967405346d14bb7187c1de0777d8bf75348fef1b0e6e7e4042e4c420a.json index d81f9c1..d504702 100644 --- a/.sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json +++ b/.sqlx/query-1d609f7967405346d14bb7187c1de0777d8bf75348fef1b0e6e7e4042e4c420a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE durable.worker\n SET heartbeat_at = CURRENT_TIMESTAMP\n WHERE id = $1\n RETURNING id", + "query": "UPDATE durable.worker\n SET heartbeat_at = $2\n WHERE id = $1\n RETURNING id", "describe": { "columns": [ { @@ -17,12 +17,13 @@ ], "parameters": { "Left": [ - "Int8" + "Int8", + "Timestamptz" ] }, "nullable": [ false ] }, - "hash": "50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9" + "hash": "1d609f7967405346d14bb7187c1de0777d8bf75348fef1b0e6e7e4042e4c420a" } diff --git a/.sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json b/.sqlx/query-26461b368e299400d8af2555b0eb14a608b144c5da9f2123d95447a1bb1f8ce6.json similarity index 51% rename from .sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json rename to .sqlx/query-26461b368e299400d8af2555b0eb14a608b144c5da9f2123d95447a1bb1f8ce6.json index a0072b5..d60055f 100644 --- a/.sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json +++ b/.sqlx/query-26461b368e299400d8af2555b0eb14a608b144c5da9f2123d95447a1bb1f8ce6.json @@ -1,15 +1,16 @@ { "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.worker\n WHERE CURRENT_TIMESTAMP - heartbeat_at > $2\n AND NOT id = $1\n ", + "query": "\n DELETE FROM durable.worker\n WHERE $3::timestamptz - heartbeat_at > $2\n AND NOT id = $1\n ", "describe": { "columns": [], "parameters": { "Left": [ "Int8", - "Interval" + "Interval", + "Timestamptz" ] }, "nullable": [] }, - "hash": "fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596" + "hash": "26461b368e299400d8af2555b0eb14a608b144c5da9f2123d95447a1bb1f8ce6" } diff --git a/.sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json b/.sqlx/query-4965f02d5eae28d41446e216c8aeae3751a8a94f990d247bc8cb8fd12093a81f.json similarity index 74% rename from .sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json rename to .sqlx/query-4965f02d5eae28d41446e216c8aeae3751a8a94f990d247bc8cb8fd12093a81f.json index c758d10..db2e9d2 100644 --- a/.sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json +++ b/.sqlx/query-4965f02d5eae28d41446e216c8aeae3751a8a94f990d247bc8cb8fd12093a81f.json @@ -1,14 +1,15 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready',\n wakeup_at = NULL,\n running_on = (\n SELECT id\n FROM durable.worker\n ORDER BY random() + task.id\n LIMIT 1\n )\n WHERE state = 'suspended'\n AND wakeup_at <= (NOW() - $1::interval)\n ", + "query": "\n UPDATE durable.task\n SET state = 'ready',\n wakeup_at = NULL,\n running_on = (\n SELECT id\n FROM durable.worker\n ORDER BY random() + task.id\n LIMIT 1\n )\n WHERE state = 'suspended'\n AND wakeup_at <= ($2::timestamptz - $1::interval)\n ", "describe": { "columns": [], "parameters": { "Left": [ - "Interval" + "Interval", + "Timestamptz" ] }, "nullable": [] }, - "hash": "f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf" + "hash": "4965f02d5eae28d41446e216c8aeae3751a8a94f990d247bc8cb8fd12093a81f" } diff --git a/.sqlx/query-7f746180eb4a64c9d07789d04ea2e173479e1a82efe0f289bb4baa03642a32b1.json b/.sqlx/query-57df875fa3c32f17e9de02c7457fcbf6f8033d0ab87cae1628d13b4d8afd3e50.json similarity index 68% rename from .sqlx/query-7f746180eb4a64c9d07789d04ea2e173479e1a82efe0f289bb4baa03642a32b1.json rename to .sqlx/query-57df875fa3c32f17e9de02c7457fcbf6f8033d0ab87cae1628d13b4d8afd3e50.json index 2136af2..be90435 100644 --- a/.sqlx/query-7f746180eb4a64c9d07789d04ea2e173479e1a82efe0f289bb4baa03642a32b1.json +++ b/.sqlx/query-57df875fa3c32f17e9de02c7457fcbf6f8033d0ab87cae1628d13b4d8afd3e50.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO durable.worker(heartbeat_at)\n VALUES (CURRENT_TIMESTAMP)\n RETURNING id\n ", + "query": "\n INSERT INTO durable.worker(heartbeat_at)\n VALUES ($1)\n RETURNING id\n ", "describe": { "columns": [ { @@ -16,11 +16,13 @@ } ], "parameters": { - "Left": [] + "Left": [ + "Timestamptz" + ] }, "nullable": [ false ] }, - "hash": "7f746180eb4a64c9d07789d04ea2e173479e1a82efe0f289bb4baa03642a32b1" + "hash": "57df875fa3c32f17e9de02c7457fcbf6f8033d0ab87cae1628d13b4d8afd3e50" } diff --git a/.sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json b/.sqlx/query-c9e163b93b396a00e90309fbad934cb218c8fbb1fcd3c49cad3d0fa39e7e0dd5.json similarity index 51% rename from .sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json rename to .sqlx/query-c9e163b93b396a00e90309fbad934cb218c8fbb1fcd3c49cad3d0fa39e7e0dd5.json index 0793b49..0c72de3 100644 --- a/.sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json +++ b/.sqlx/query-c9e163b93b396a00e90309fbad934cb218c8fbb1fcd3c49cad3d0fa39e7e0dd5.json @@ -1,15 +1,16 @@ { "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.worker\n WHERE id = $1\n AND CURRENT_TIMESTAMP - heartbeat_at > $2\n ", + "query": "\n DELETE FROM durable.worker\n WHERE id = $1\n AND $3::timestamptz - heartbeat_at > $2\n ", "describe": { "columns": [], "parameters": { "Left": [ "Int8", - "Interval" + "Interval", + "Timestamptz" ] }, "nullable": [] }, - "hash": "6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed" + "hash": "c9e163b93b396a00e90309fbad934cb218c8fbb1fcd3c49cad3d0fa39e7e0dd5" } diff --git a/crates/durable-runtime/src/plugin/durable/notify.rs b/crates/durable-runtime/src/plugin/durable/notify.rs index 8125f54..dee0685 100644 --- a/crates/durable-runtime/src/plugin/durable/notify.rs +++ b/crates/durable-runtime/src/plugin/durable/notify.rs @@ -114,14 +114,34 @@ impl Host for Task { ); } + let timeout = std::time::Duration::from_nanos(timeout_ns); + + // Durably record the absolute deadline as a recorded event *before* the + // result. This is what gives the timed wait a timer fallback: it is + // - computed from the injected `Clock` (so a `DstClock` controls it), and + // - persisted, so it survives a suspend/replay cycle. On replay we read + // the recorded value back rather than recomputing a fresh deadline. + let deadline_options = + TransactionOptions::new("durable:core/notify.notification-blocking-timeout.deadline"); + let deadline: DateTime = + match self.state.enter::>(deadline_options).await? { + Some(deadline) => deadline, + None => { + let deadline = chrono::Duration::from_std(timeout) + .ok() + .and_then(|d| self.state.clock().now().checked_add_signed(d)) + .unwrap_or(DateTime::::MAX_UTC); + self.state.exit(&deadline).await?; + deadline + } + }; + let options = TransactionOptions::new("durable:core/notify.notification-blocking-timeout"); if let Some(event) = self.state.enter::>(options).await? { return Ok(event.map(Into::into)); } - let timeout = std::time::Duration::from_nanos(timeout_ns); - let user_deadline = Instant::now() + timeout; - let suspend_deadline = Instant::now() + self.state.config().suspend_timeout; + let suspend_timeout = self.state.config().suspend_timeout; let task_id = self.state.task_id(); let mut rx = self.state.subscribe_notifications(); @@ -138,6 +158,16 @@ impl Host for Task { tx.rollback().await?; + // Compute the time remaining until the user deadline using the + // injected clock. If it has already elapsed (e.g. we were revived by + // the wakeup timer at `deadline`), `user_deadline` is now and the + // select below resolves the timeout immediately. + let remaining = (deadline - self.state.clock().now()) + .to_std() + .unwrap_or(std::time::Duration::ZERO); + let user_deadline = Instant::now() + remaining; + let suspend_deadline = Instant::now() + suspend_timeout; + // Wait for either a notification, the user timeout, or the suspend // timeout — whichever comes first. enum Expired { @@ -186,14 +216,16 @@ impl Host for Task { break None; } - // The suspend timeout expired. Attempt to suspend the task so - // we free up the worker slot, just like notification_blocking. + // The suspend timeout expired. Suspend the task to free up the + // worker slot, recording `deadline` as the wakeup time so the + // task is revived by the timer even if its notification is never + // re-delivered. Some(Expired::Suspend) => { let mut tx = self.state.pool().begin().await?; self.state .storage() - .suspend_task_no_wakeup(&mut tx, self.task_id()) + .suspend_task(&mut tx, self.task_id(), Some(deadline)) .await?; if poll_notification(&mut *self, &mut tx).await?.is_some() { diff --git a/crates/durable-runtime/src/storage/mod.rs b/crates/durable-runtime/src/storage/mod.rs index 1aedfee..b7cf5b6 100644 --- a/crates/durable-runtime/src/storage/mod.rs +++ b/crates/durable-runtime/src/storage/mod.rs @@ -63,7 +63,12 @@ pub(crate) trait Storage: Send + Sync + 'static { #[allow(dead_code)] fn pool(&self) -> &sqlx::PgPool; - async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result; + /// `now` is the runtime's [`Clock`](crate::clock::Clock) view of the + /// current time, used to seed the worker's initial heartbeat so that + /// liveness is tracked on the injected clock's timeline rather than the + /// database wall clock. + async fn insert_worker(&self, conn: &mut PgConnection, now: DateTime) + -> sqlx::Result; async fn delete_worker( &self, @@ -71,22 +76,34 @@ pub(crate) trait Storage: Send + Sync + 'static { worker_id: i64, ) -> sqlx::Result; + /// Records a heartbeat at `now` (the injected clock's current time). + /// /// Returns `true` if the row still exists. - async fn heartbeat_worker(&self, conn: &mut PgConnection, worker_id: i64) - -> sqlx::Result; + async fn heartbeat_worker( + &self, + conn: &mut PgConnection, + worker_id: i64, + now: DateTime, + ) -> sqlx::Result; + /// `now` is the injected clock's current time, against which heartbeat + /// expiry is measured (rather than the database wall clock). async fn delete_following_expired_worker( &self, conn: &mut PgConnection, following: i64, timeout: PgInterval, + now: DateTime, ) -> sqlx::Result; + /// `now` is the injected clock's current time, against which heartbeat + /// expiry is measured (rather than the database wall clock). async fn delete_other_expired_workers( &self, conn: &mut PgConnection, worker_id: i64, timeout: PgInterval, + now: DateTime, ) -> sqlx::Result; /// See `Worker::validate_workers` for the algorithm. @@ -99,10 +116,15 @@ pub(crate) trait Storage: Send + Sync + 'static { /// The leader is the oldest worker. async fn load_leader_id(&self, conn: &mut PgConnection) -> sqlx::Result>; + /// `now` is the injected clock's current time. Suspended tasks are woken + /// once `wakeup_at` (which is also written from the injected clock) is at + /// least `suspend_margin` in the past relative to `now`, so that + /// timer-based wakeup is driven entirely by the runtime's clock. async fn wake_suspended_tasks( &self, conn: &mut PgConnection, suspend_margin: PgInterval, + now: DateTime, ) -> sqlx::Result; async fn next_wakeup_at(&self, conn: &mut PgConnection) -> sqlx::Result>>; diff --git a/crates/durable-runtime/src/storage/pg.rs b/crates/durable-runtime/src/storage/pg.rs index 1406b9c..5487035 100644 --- a/crates/durable-runtime/src/storage/pg.rs +++ b/crates/durable-runtime/src/storage/pg.rs @@ -33,13 +33,18 @@ impl Storage for PgStorage { &self.pool } - async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result { + async fn insert_worker( + &self, + conn: &mut PgConnection, + now: DateTime, + ) -> sqlx::Result { let record = sqlx::query!( " INSERT INTO durable.worker(heartbeat_at) - VALUES (CURRENT_TIMESTAMP) + VALUES ($1) RETURNING id - " + ", + now ) .fetch_one(&mut *conn) .await?; @@ -61,13 +66,15 @@ impl Storage for PgStorage { &self, conn: &mut PgConnection, worker_id: i64, + now: DateTime, ) -> sqlx::Result { let record = sqlx::query!( "UPDATE durable.worker - SET heartbeat_at = CURRENT_TIMESTAMP + SET heartbeat_at = $2 WHERE id = $1 RETURNING id", - worker_id + worker_id, + now ) .fetch_optional(&mut *conn) .await?; @@ -80,15 +87,17 @@ impl Storage for PgStorage { conn: &mut PgConnection, following: i64, timeout: PgInterval, + now: DateTime, ) -> sqlx::Result { sqlx::query!( " DELETE FROM durable.worker WHERE id = $1 - AND CURRENT_TIMESTAMP - heartbeat_at > $2 + AND $3::timestamptz - heartbeat_at > $2 ", following, - timeout + timeout, + now ) .execute(&mut *conn) .await @@ -99,15 +108,17 @@ impl Storage for PgStorage { conn: &mut PgConnection, worker_id: i64, timeout: PgInterval, + now: DateTime, ) -> sqlx::Result { sqlx::query!( " DELETE FROM durable.worker - WHERE CURRENT_TIMESTAMP - heartbeat_at > $2 + WHERE $3::timestamptz - heartbeat_at > $2 AND NOT id = $1 ", worker_id, - timeout + timeout, + now ) .execute(&mut *conn) .await @@ -177,6 +188,7 @@ impl Storage for PgStorage { &self, conn: &mut PgConnection, suspend_margin: PgInterval, + now: DateTime, ) -> sqlx::Result { sqlx::query!( " @@ -190,9 +202,10 @@ impl Storage for PgStorage { LIMIT 1 ) WHERE state = 'suspended' - AND wakeup_at <= (NOW() - $1::interval) + AND wakeup_at <= ($2::timestamptz - $1::interval) ", - suspend_margin + suspend_margin, + now ) .execute(&mut *conn) .await diff --git a/crates/durable-runtime/src/task.rs b/crates/durable-runtime/src/task.rs index 77c793d..c825b9f 100644 --- a/crates/durable-runtime/src/task.rs +++ b/crates/durable-runtime/src/task.rs @@ -420,7 +420,14 @@ impl TaskState { std::any::type_name::() ) }) { - Ok(value) => Ok(Some(value)), + Ok(value) => { + // Advance past the replayed event, mirroring the database hit + // path in `enter_impl` (the guest does not call `exit` on + // replay, so nothing else advances the index). Without this a + // second cached `enter` would re-read this same event. + self.txn_index += 1; + Ok(Some(value)) + } Err(e) => { self.events.clear(); Err(e) diff --git a/crates/durable-runtime/src/worker.rs b/crates/durable-runtime/src/worker.rs index d1ad7cf..b40bc1a 100644 --- a/crates/durable-runtime/src/worker.rs +++ b/crates/durable-runtime/src/worker.rs @@ -442,7 +442,11 @@ impl Worker { pub async fn run(&mut self) -> anyhow::Result<()> { let mut conn = self.shared.acquire().await?; - self.worker_id = self.shared.storage.insert_worker(&mut conn).await?; + self.worker_id = self + .shared + .storage + .insert_worker(&mut conn, self.shared.clock.now()) + .await?; drop(conn); tracing::info!("durable worker id is {}", self.worker_id); @@ -534,7 +538,7 @@ impl Worker { let mut conn = shared.acquire().await?; let alive = shared .storage - .heartbeat_worker(&mut conn, worker_id) + .heartbeat_worker(&mut conn, worker_id, shared.clock.now()) .await?; drop(conn); @@ -598,7 +602,12 @@ impl Worker { let mut result = if let Some(following) = following.take() { shared .storage - .delete_following_expired_worker(&mut tx, following, timeout) + .delete_following_expired_worker( + &mut tx, + following, + timeout, + shared.clock.now(), + ) .await? } else { Default::default() @@ -608,7 +617,12 @@ impl Worker { result.extend(std::iter::once( shared .storage - .delete_other_expired_workers(&mut tx, worker_id, timeout) + .delete_other_expired_workers( + &mut tx, + worker_id, + timeout, + shared.clock.now(), + ) .await?, )); @@ -697,7 +711,11 @@ impl Worker { // If we don't do that all the rows here get the same random number. let result = shared .storage - .wake_suspended_tasks(&mut conn, shared.config.suspend_margin.into_pg_interval()) + .wake_suspended_tasks( + &mut conn, + shared.config.suspend_margin.into_pg_interval(), + shared.clock.now(), + ) .await?; let count = result.rows_affected(); @@ -710,8 +728,12 @@ impl Worker { let now = shared.clock.now(); let delay = match wakeup_at { - Some(wakeup_at) => now - .signed_duration_since(wakeup_at) + // A task becomes wakeable once `now >= wakeup_at + suspend_margin` + // (see `wake_suspended_tasks`), so sleep until that point rather + // than `now - wakeup_at`, which is negative for a future wakeup + // and would otherwise collapse to zero and busy-loop. + Some(wakeup_at) => (wakeup_at + shared.config.suspend_margin) + .signed_duration_since(now) .to_std() .unwrap_or(Duration::ZERO), None => Duration::from_secs(60), diff --git a/crates/durable-test-workflows/src/bin/notify-wait-timeout-expire.rs b/crates/durable-test-workflows/src/bin/notify-wait-timeout-expire.rs new file mode 100644 index 0000000..37d7af1 --- /dev/null +++ b/crates/durable-test-workflows/src/bin/notify-wait-timeout-expire.rs @@ -0,0 +1,17 @@ +use std::time::Duration; + +use durable::notify; + +fn main() { + // Wait with a long timeout and no notification will ever be delivered. The + // call is expected to time out and return `None`. The test harness drives a + // custom clock past the deadline; the only way this workflow completes is if + // the timer-based wakeup fallback fires (i.e. the task was suspended with a + // `wakeup_at` and the runtime resolves that timer against the injected + // clock). + let result = notify::wait_with_timeout(Duration::from_secs(120)); + assert!( + result.is_none(), + "expected the wait to time out with no notification" + ); +} diff --git a/crates/durable-test/tests/it/dst_notify.rs b/crates/durable-test/tests/it/dst_notify.rs index 4bea2a8..f1cb47a 100644 --- a/crates/durable-test/tests/it/dst_notify.rs +++ b/crates/durable-test/tests/it/dst_notify.rs @@ -395,6 +395,143 @@ async fn dst_notify_timeout_recovers_from_lag(pool: sqlx::PgPool) -> anyhow::Res Ok(()) } +/// Regression test for timed notification waits being stranded when they +/// suspend (iopsystems/systemslab#5412). +/// +/// A task that waits on a notification *with a timeout* and suspends must +/// record a `wakeup_at` so the timer can revive it even if its notification is +/// never re-delivered. Crucially, that timer must be resolved against the +/// injected [`DstClock`] rather than the database wall clock — otherwise a +/// custom clock can never drive the wakeup. +/// +/// This test never delivers a notification. The task suspends, and we advance +/// the simulated clock past the user deadline. The runtime must then wake the +/// task purely on the basis of the clock, and the wait must return `None` +/// (timeout) so the workflow completes successfully. +#[sqlx::test] +async fn dst_notify_timeout_timer_wakeup_under_clock(pool: sqlx::PgPool) -> anyhow::Result<()> { + let scheduler = Arc::new(DstScheduler::new(42)); + let start = Utc::now(); + let clock = Arc::new(DstClock::new(start)); + let entropy = Arc::new(DstEntropy::new(42)); + let (event_source, event_handle) = DstEventSource::new(); + + // Zero suspend timeout/margin: the task suspends as soon as it enters the + // wait loop, and becomes wakeable the instant the clock reaches its + // recorded `wakeup_at`. + let config = Config::new() + .suspend_margin(Duration::ZERO) + .suspend_timeout(Duration::ZERO); + + let _guard = durable_test::spawn_worker_with_dst_events( + pool.clone(), + config, + scheduler.clone(), + clock.clone(), + entropy.clone(), + Box::new(event_source), + ) + .await?; + + let client = DurableClient::new(pool.clone())?; + let program = crate::load_binary(&client, "notify-wait-timeout-expire.wasm").await?; + + let task = client + .launch("dst timer wakeup", &program, &serde_json::json!(null)) + .await?; + + // Start the task. + event_handle.send_task(task.id(), None); + + // Wait for the task to suspend, and confirm it recorded a `wakeup_at`. The + // workflow waits with a 120s timeout, so the deadline should land ~120s + // after the simulated start time. Without the fix the suspended row would + // have a NULL `wakeup_at` and could only be revived by a re-delivered + // notification. + let wakeup_at = timeout(Duration::from_secs(30), async { + loop { + let row = sqlx::query!( + r#" + SELECT state = 'suspended' as "suspended!", wakeup_at + FROM durable.task + WHERE id = $1 + "#, + task.id() + ) + .fetch_one(&pool) + .await?; + + if row.suspended { + break anyhow::Ok(row.wakeup_at); + } + + tokio::task::yield_now().await; + } + }) + .await + .context("task did not suspend within 30s")??; + + let wakeup_at = wakeup_at.context("suspended task must record a wakeup_at for the timer")?; + assert!( + wakeup_at >= start + Duration::from_secs(60), + "wakeup_at ({wakeup_at}) should be ~120s after the simulated start ({start})" + ); + + // Advance the simulated clock past the user deadline. This is the crux: the + // only timeline that has moved is the injected clock, so the runtime must + // resolve the wakeup timer against it. We never deliver a notification. + clock.advance(Duration::from_secs(121)); + + // The leader should now mark the task ready purely on the basis of the + // clock. Poke the worker so it re-claims the readied task. + timeout(Duration::from_secs(30), async { + loop { + let suspended = sqlx::query_scalar!( + r#" + SELECT state = 'suspended' as "suspended!" + FROM durable.task + WHERE id = $1 + "#, + task.id() + ) + .fetch_one(&pool) + .await?; + + if !suspended { + break anyhow::Ok(()); + } + + event_handle.send_task(task.id(), None); + tokio::task::yield_now().await; + } + }) + .await + .context("clock-driven wakeup did not move the task out of 'suspended' within 30s")??; + + // Keep nudging the worker until it re-claims and runs the task to + // completion. The wait must return `None` (timeout), so the workflow + // asserts and exits successfully. + let status = timeout(Duration::from_secs(30), async { + loop { + event_handle.send_task(task.id(), None); + if let Ok(status) = + tokio::time::timeout(Duration::from_millis(250), task.wait(&client)).await + { + break status; + } + } + }) + .await + .context("task did not complete after clock-driven wakeup")??; + + assert!( + status.success(), + "task should time out cleanly and complete successfully" + ); + + Ok(()) +} + /// Similar to the lag recovery test above, but the notification arrives /// *after* the lag event. This exercises the path where lag causes a re-poll /// that finds nothing, the task loops back into the select, and then the From 299582402ff12518a314a1e6bf6be67a059e5431 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 15 Jun 2026 18:26:52 +0000 Subject: [PATCH 2/2] Reflow comment to satisfy nightly rustfmt --- crates/durable-runtime/src/plugin/durable/notify.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/durable-runtime/src/plugin/durable/notify.rs b/crates/durable-runtime/src/plugin/durable/notify.rs index dee0685..35e30f7 100644 --- a/crates/durable-runtime/src/plugin/durable/notify.rs +++ b/crates/durable-runtime/src/plugin/durable/notify.rs @@ -119,8 +119,8 @@ impl Host for Task { // Durably record the absolute deadline as a recorded event *before* the // result. This is what gives the timed wait a timer fallback: it is // - computed from the injected `Clock` (so a `DstClock` controls it), and - // - persisted, so it survives a suspend/replay cycle. On replay we read - // the recorded value back rather than recomputing a fresh deadline. + // - persisted, so it survives a suspend/replay cycle. On replay we read the + // recorded value back rather than recomputing a fresh deadline. let deadline_options = TransactionOptions::new("durable:core/notify.notification-blocking-timeout.deadline"); let deadline: DateTime =