From e21983d125b537689f381a0de3834471510f1994 Mon Sep 17 00:00:00 2001 From: npub1mn7jgtj4w2pd0g0zeuhxsa6jy6p0rewxz4kujt98my82ahfmp72sxjexk7 Date: Wed, 10 Jun 2026 14:04:46 -0400 Subject: [PATCH 1/3] feat(relay): add NIP-ER push scheduler, ingest relaxation, and NIP-11 advertisement The relay now proactively delivers due reminders via Redis pub/sub. Cross-pod dedup uses an atomic delivered_at claim (mirrors the reaper's archived_at guard). Ingest relaxed to allow kind:30300 without not_before (bookmarks/terminal states). max_not_before_delta enforced to prevent unbounded-future scheduling abuse. Schema: nullable not_before + delivered_at columns on events, partial index for scheduler queries. NIP-11: supported_extensions ["nip-er"], due_delivery_mode "push", max_not_before_delta advertised. Spec (NIP-ER.md) amended: not_before optional on kind:30300, required only for pending reminders that may become due. Co-authored-by: Will Pfleger Signed-off-by: Will Pfleger --- crates/buzz-db/src/event.rs | 170 +++++++++++++++++- crates/buzz-db/src/lib.rs | 25 ++- crates/buzz-relay/src/handlers/ingest.rs | 25 +++ crates/buzz-relay/src/main.rs | 97 +++++++++- crates/buzz-relay/src/nip11.rs | 17 ++ .../tests/e2e_event_reminder.rs | 27 +++ docs/nips/NIP-ER.md | 13 +- schema/schema.sql | 4 + 8 files changed, 368 insertions(+), 10 deletions(-) diff --git a/crates/buzz-db/src/event.rs b/crates/buzz-db/src/event.rs index f1558a99f..4393d2614 100644 --- a/crates/buzz-db/src/event.rs +++ b/crates/buzz-db/src/event.rs @@ -9,7 +9,9 @@ use nostr::Event; use sqlx::{PgPool, QueryBuilder, Row}; use uuid::Uuid; -use buzz_core::kind::{event_kind_i32, is_ephemeral, is_parameterized_replaceable, KIND_AUTH}; +use buzz_core::kind::{ + event_kind_i32, is_ephemeral, is_parameterized_replaceable, KIND_AUTH, KIND_EVENT_REMINDER, +}; use buzz_core::StoredEvent; use crate::error::{DbError, Result}; @@ -96,6 +98,26 @@ pub fn extract_d_tag(event: &Event) -> Option { Some(val) } +/// Extract the `not_before` timestamp for materialization in the `events` table. +/// +/// Only applies to `kind:30300` (NIP-ER event reminders). Returns the first +/// valid `not_before` tag value as an `i64` Unix timestamp, or `None` if the +/// event is not a reminder or has no `not_before` tag. +pub fn extract_not_before(event: &Event) -> Option { + let kind_u32 = event.kind.as_u16() as u32; + if kind_u32 != KIND_EVENT_REMINDER { + return None; + } + event.tags.iter().find_map(|tag| { + let parts = tag.as_slice(); + if parts.len() >= 2 && parts[0] == "not_before" { + parts[1].parse::().ok() + } else { + None + } + }) +} + /// Insert a Nostr event. Rejects AUTH and ephemeral kinds. /// /// Returns `(StoredEvent, was_inserted)` — `was_inserted` is `false` on duplicate. @@ -125,10 +147,11 @@ pub async fn insert_event( .ok_or(DbError::InvalidTimestamp(created_at_secs))?; let received_at = Utc::now(); let d_tag = extract_d_tag(event); + let not_before = extract_not_before(event); let result = sqlx::query( r#" - INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag, not_before) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING "#, ) @@ -142,6 +165,7 @@ pub async fn insert_event( .bind(received_at) .bind(channel_id) .bind(d_tag.as_deref()) + .bind(not_before) .execute(pool) .await?; @@ -842,13 +866,14 @@ pub async fn insert_event_with_thread_metadata( .ok_or(DbError::InvalidTimestamp(created_at_secs))?; let received_at = Utc::now(); let d_tag = extract_d_tag(event); + let not_before = extract_not_before(event); let mut tx = pool.begin().await?; // ── Insert event ────────────────────────────────────────────────────────── let result = sqlx::query( r#" - INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag, not_before) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING "#, ) @@ -862,6 +887,7 @@ pub async fn insert_event_with_thread_metadata( .bind(received_at) .bind(channel_id) .bind(d_tag.as_deref()) + .bind(not_before) .execute(&mut *tx) .await?; @@ -981,6 +1007,101 @@ pub async fn insert_event_with_thread_metadata( )) } +/// A due reminder row returned by [`query_due_reminders`]. +#[derive(Debug)] +pub struct DueReminder { + /// The event's raw ID bytes. + pub id: Vec, + /// The event's pubkey bytes. + pub pubkey: Vec, + /// The event's `created_at` timestamp. + pub created_at: DateTime, + /// The event's kind (always 30300). + pub kind: i32, + /// The event's JSONB tags. + pub tags: serde_json::Value, + /// The event's encrypted content. + pub content: String, + /// The event's signature bytes. + pub sig: Vec, + /// The channel ID (always None for reminders — global events). + pub channel_id: Option, +} + +/// Query due reminders: latest-per-address `kind:30300` rows where +/// `not_before <= now`, `deleted_at IS NULL`, `delivered_at IS NULL`. +/// +/// Returns the latest head per `(pubkey, d_tag)` using canonical NIP-16 +/// ordering (`created_at DESC, id ASC`). +pub async fn query_due_reminders( + pool: &PgPool, + now_secs: i64, + batch_limit: i64, +) -> Result> { + let kind_i32 = KIND_EVENT_REMINDER as i32; + let rows = sqlx::query( + r#" + SELECT DISTINCT ON (pubkey, d_tag) + id, pubkey, created_at, kind, tags, content, sig, channel_id + FROM events + WHERE kind = $1 + AND not_before IS NOT NULL + AND not_before <= $2 + AND deleted_at IS NULL + AND delivered_at IS NULL + ORDER BY pubkey, d_tag, created_at DESC, id ASC + LIMIT $3 + "#, + ) + .bind(kind_i32) + .bind(now_secs) + .bind(batch_limit) + .fetch_all(pool) + .await?; + + let results = rows + .into_iter() + .map(|row| DueReminder { + id: row.get("id"), + pubkey: row.get("pubkey"), + created_at: row.get("created_at"), + kind: row.get("kind"), + tags: row.get("tags"), + content: row.get("content"), + sig: row.get("sig"), + channel_id: row.get("channel_id"), + }) + .collect(); + + Ok(results) +} + +/// Atomically claim a due reminder for delivery. Returns `Some(id)` if this +/// caller won the claim (set `delivered_at`), or `None` if another pod already +/// claimed it. Mirrors the reaper's `archived_at IS NULL` guard for cross-pod +/// idempotency. +pub async fn claim_due_reminder( + pool: &PgPool, + event_id: &[u8], + event_created_at: DateTime, +) -> Result { + let now_epoch = Utc::now().timestamp(); + let result = sqlx::query( + r#" + UPDATE events + SET delivered_at = $1 + WHERE created_at = $2 AND id = $3 AND delivered_at IS NULL + "#, + ) + .bind(now_epoch) + .bind(event_created_at) + .bind(event_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + #[cfg(test)] mod tests { use super::*; @@ -1082,4 +1203,43 @@ mod tests { assert_eq!(result.len(), 2048); assert_eq!(result, long_val); } + + #[test] + fn extract_not_before_from_reminder() { + let event = make_event_with_kind_and_tags( + KIND_EVENT_REMINDER as u16, + vec![Tag::parse(["not_before", "1717000000"]).unwrap()], + ); + assert_eq!(extract_not_before(&event), Some(1_717_000_000)); + } + + #[test] + fn extract_not_before_absent_returns_none() { + // A bookmark/terminal reminder carries no `not_before` tag. + let event = make_event_with_kind_and_tags( + KIND_EVENT_REMINDER as u16, + vec![Tag::parse(["d", "abc"]).unwrap()], + ); + assert_eq!(extract_not_before(&event), None); + } + + #[test] + fn extract_not_before_non_reminder_returns_none() { + // Only kind:30300 materializes `not_before`; other kinds stay NULL. + let event = make_event_with_kind_and_tags( + 30023, + vec![Tag::parse(["not_before", "1717000000"]).unwrap()], + ); + assert_eq!(extract_not_before(&event), None); + } + + #[test] + fn extract_not_before_non_numeric_returns_none() { + // Malformed values are rejected by ingest; materialization just skips them. + let event = make_event_with_kind_and_tags( + KIND_EVENT_REMINDER as u16, + vec![Tag::parse(["not_before", "not-a-number"]).unwrap()], + ); + assert_eq!(extract_not_before(&event), None); + } } diff --git a/crates/buzz-db/src/lib.rs b/crates/buzz-db/src/lib.rs index e51a61d7f..d6c45f2d3 100644 --- a/crates/buzz-db/src/lib.rs +++ b/crates/buzz-db/src/lib.rs @@ -539,6 +539,26 @@ impl Db { channel::reap_expired_ephemeral_channels(&self.pool).await } + // ── Reminder scheduler ─────────────────────────────────────────────────── + + /// Query due reminders ready for delivery. + pub async fn query_due_reminders( + &self, + now_secs: i64, + batch_limit: i64, + ) -> Result> { + event::query_due_reminders(&self.pool, now_secs, batch_limit).await + } + + /// Atomically claim a due reminder for delivery (cross-pod dedup). + pub async fn claim_due_reminder( + &self, + event_id: &[u8], + event_created_at: chrono::DateTime, + ) -> Result { + event::claim_due_reminder(&self.pool, event_id, event_created_at).await + } + // ── Users ──────────────────────────────────────────────────────────────── /// Ensure a user record exists (upsert). @@ -1732,8 +1752,8 @@ impl Db { let received_at = chrono::Utc::now(); let insert_result = sqlx::query( - "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ + "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag, not_before) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ ON CONFLICT DO NOTHING", ) .bind(event.id.as_bytes().as_slice()) @@ -1746,6 +1766,7 @@ impl Db { .bind(received_at) .bind(channel_id) .bind(d_tag) + .bind(event::extract_not_before(event)) .execute(&mut *tx) .await?; diff --git a/crates/buzz-relay/src/handlers/ingest.rs b/crates/buzz-relay/src/handlers/ingest.rs index 8f1627bcb..3b3123e64 100644 --- a/crates/buzz-relay/src/handlers/ingest.rs +++ b/crates/buzz-relay/src/handlers/ingest.rs @@ -1037,6 +1037,20 @@ fn validate_event_reminder(event: &Event) -> Result<(), &'static str> { // `not_before` is optional — terminal states (done/cancelled) and bookmarks // omit it. The ordering check only applies when both are present. if let Some(nb) = not_before { + // Reject reminders scheduled beyond the configured horizon. The same + // SPROUT_MAX_NOT_BEFORE_DELTA env var is advertised in NIP-11. + let max_delta: u64 = std::env::var("SPROUT_MAX_NOT_BEFORE_DELTA") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(31_536_000); // 1 year default + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + if nb > now + max_delta { + return Err("not_before too far in future"); + } + if let Some(exp) = expiration { if let Ok(exp) = exp.parse::() { if exp <= nb { @@ -2458,6 +2472,17 @@ mod tests { assert!(validate_event_reminder(&ev).is_ok()); } + #[test] + fn reminder_rejects_not_before_too_far_in_future() { + // `not_before` beyond the max horizon (default 1 year) is rejected. + let far_future = (chrono::Utc::now().timestamp() as u64) + 63_072_000; // ~2 years + let ev = make_reminder(&[&["d", "abc"], &["not_before", &far_future.to_string()]]); + assert_eq!( + validate_event_reminder(&ev), + Err("not_before too far in future") + ); + } + #[test] fn reminder_rejects_duplicate_not_before() { let ev = make_reminder(&[ diff --git a/crates/buzz-relay/src/main.rs b/crates/buzz-relay/src/main.rs index 20437a92b..d2bafd300 100644 --- a/crates/buzz-relay/src/main.rs +++ b/crates/buzz-relay/src/main.rs @@ -1,7 +1,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; -use tracing::{error, info}; +use tracing::{error, info, warn}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use buzz_audit::AuditService; @@ -404,6 +404,86 @@ async fn main() -> anyhow::Result<()> { }); } + // NIP-ER reminder scheduler — polls for due reminders and publishes them + // to Redis pub/sub for cross-pod fan-out. Each pod's existing + // subscribe_local consumer picks them up and applies the author-only gate. + // Mirrors the channel reaper pattern. Cross-pod dedup via `delivered_at` + // column: only the pod that wins the atomic claim publishes. + { + let scheduler_state = Arc::clone(&state); + let scheduler_interval_secs: u64 = std::env::var("SPROUT_REMINDER_SCHEDULER_INTERVAL_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(10); + let scheduler_batch_limit: i64 = std::env::var("SPROUT_REMINDER_SCHEDULER_BATCH_LIMIT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(100); + tokio::spawn(async move { + info!( + interval_secs = scheduler_interval_secs, + batch_limit = scheduler_batch_limit, + "NIP-ER reminder scheduler started" + ); + loop { + tokio::time::sleep(std::time::Duration::from_secs(scheduler_interval_secs)).await; + + let now_secs = chrono::Utc::now().timestamp(); + let due = match scheduler_state + .db + .query_due_reminders(now_secs, scheduler_batch_limit) + .await + { + Ok(reminders) => reminders, + Err(e) => { + error!("Reminder scheduler tick failed: {e}"); + continue; + } + }; + + if due.is_empty() { + continue; + } + + info!(count = due.len(), "Reminder scheduler: due reminders found"); + + for reminder in due { + // Publish first, then claim. If publish fails the reminder + // stays unclaimed and will be retried next tick. If claim + // fails after a successful publish, duplicate fan-out on the + // next tick is harmless (subscribers dedup by event ID). + if let Err(e) = scheduler_state + .pubsub + .publish_event(uuid::Uuid::nil(), &reminder_to_event(&reminder)) + .await + { + error!( + event_id = hex::encode(&reminder.id), + "Reminder scheduler: Redis publish failed, skipping claim: {e}" + ); + continue; + } + + // Atomic cross-pod claim — only the winner marks it delivered. + match scheduler_state + .db + .claim_due_reminder(&reminder.id, reminder.created_at) + .await + { + Ok(true) => {} + Ok(false) => {} // Another pod claimed it; duplicate publish is harmless. + Err(e) => { + warn!( + event_id = hex::encode(&reminder.id), + "Reminder scheduler: claim failed after publish (duplicate delivery possible): {e}" + ); + } + } + } + } + }); + } + // Multi-node fan-out consumer: receive events from Redis pub/sub // (published by other relay instances) and fan out to local WS subscribers. { @@ -635,6 +715,21 @@ async fn shutdown_signal() { tokio::signal::ctrl_c().await.ok(); } } +/// Reconstruct a `nostr::Event` from a [`DueReminder`] row for Redis pub/sub. +fn reminder_to_event(reminder: &buzz_db::event::DueReminder) -> nostr::Event { + let event_json = serde_json::json!({ + "id": hex::encode(&reminder.id), + "pubkey": hex::encode(&reminder.pubkey), + "created_at": reminder.created_at.timestamp(), + "kind": reminder.kind as u16, + "tags": reminder.tags, + "content": reminder.content, + "sig": hex::encode(&reminder.sig), + }); + + serde_json::from_value(event_json).expect("valid event JSON from DB row") +} + #[cfg(test)] mod tests { use super::buzz_auto_migrate_enabled; diff --git a/crates/buzz-relay/src/nip11.rs b/crates/buzz-relay/src/nip11.rs index 7fb11cafa..d797dc807 100644 --- a/crates/buzz-relay/src/nip11.rs +++ b/crates/buzz-relay/src/nip11.rs @@ -32,6 +32,9 @@ pub struct RelayInfo { pub contact: Option, /// NIPs supported by this relay. pub supported_nips: Vec, + /// Draft/extension protocol identifiers supported by this relay. + #[serde(skip_serializing_if = "Option::is_none")] + pub supported_extensions: Option>, /// URL of the relay software repository. pub software: String, /// Relay software version string. @@ -65,6 +68,12 @@ pub struct RelayLimitation { pub payment_required: bool, /// Whether writes are restricted to authorized pubkeys. pub restricted_writes: bool, + /// NIP-ER: how the relay delivers due reminders ("push" or "lazy"). + #[serde(skip_serializing_if = "Option::is_none")] + pub due_delivery_mode: Option, + /// NIP-ER: maximum allowed `not_before` horizon in seconds from now. + #[serde(skip_serializing_if = "Option::is_none")] + pub max_not_before_delta: Option, } /// Canonical `RelayLimitation` advertised by this relay. @@ -74,6 +83,11 @@ pub struct RelayLimitation { /// `AuthState::Authenticated`. This is independent of the REST API token /// toggle (`config.require_auth_token`). fn relay_limitation() -> RelayLimitation { + let max_not_before_delta: u64 = std::env::var("SPROUT_MAX_NOT_BEFORE_DELTA") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(31_536_000); // 1 year default + RelayLimitation { max_message_length: Some(MAX_FRAME_BYTES as u64), max_subscriptions: Some(1024), @@ -84,6 +98,8 @@ fn relay_limitation() -> RelayLimitation { auth_required: true, payment_required: false, restricted_writes: true, + due_delivery_mode: Some("push".to_string()), + max_not_before_delta: Some(max_not_before_delta), } } @@ -119,6 +135,7 @@ impl RelayInfo { pubkey: None, contact: None, supported_nips, + supported_extensions: Some(vec!["nip-er".to_string()]), software: "https://github.com/block/buzz".to_string(), version: env!("CARGO_PKG_VERSION").to_string(), limitation: Some(relay_limitation()), diff --git a/crates/buzz-test-client/tests/e2e_event_reminder.rs b/crates/buzz-test-client/tests/e2e_event_reminder.rs index 66126c4e5..ae3a00f7a 100644 --- a/crates/buzz-test-client/tests/e2e_event_reminder.rs +++ b/crates/buzz-test-client/tests/e2e_event_reminder.rs @@ -1009,3 +1009,30 @@ async fn test_ws_count_returns_zero_for_other_users_reminders() { ws_other.disconnect().await.expect("disconnect"); } + +#[tokio::test] +#[ignore] +async fn test_reminder_rejected_not_before_too_far_in_future() { + let client = http_client(); + let keys = Keys::generate(); + let d_tag = uuid::Uuid::new_v4().to_string(); + + // Set not_before to 2 years from now (exceeds default 1-year max_not_before_delta) + let two_years_from_now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + + 63_072_000; // ~2 years + + let event = build_reminder( + &keys, + &d_tag, + vec![Tag::parse(["not_before", &two_years_from_now.to_string()]).unwrap()], + ); + let (accepted, msg) = submit_event_http(&client, &keys, &event).await; + assert!(!accepted, "should reject not_before too far in future"); + assert!( + msg.contains("not_before too far in future"), + "unexpected message: {msg}" + ); +} diff --git a/docs/nips/NIP-ER.md b/docs/nips/NIP-ER.md index a52bb0e06..f384ff7ef 100644 --- a/docs/nips/NIP-ER.md +++ b/docs/nips/NIP-ER.md @@ -6,7 +6,7 @@ Event Reminders `draft` `optional` `relay` -This NIP defines encrypted, author-only reminders as `kind:30300` addressable events. A reminder carries a public `not_before` tag that tells supporting relays when the reminder is due, while the reminder target, note, and state are encrypted to the author with [NIP-44](44.md). +This NIP defines encrypted, author-only reminders as `kind:30300` addressable events. A pending reminder carries a public `not_before` tag that tells supporting relays when the reminder is due, while the reminder target, note, and state are encrypted to the author with [NIP-44](44.md). A reminder without `not_before` is a bookmark (saved item with no due time) or a terminal state (done/cancelled). The relay learns that an author has a reminder due at a time. It does not learn what the reminder is about. @@ -55,9 +55,18 @@ Required tags for a reminder that may become due: ] ``` +For bookmarks (saved items) or terminal states (done/cancelled), `not_before` is omitted: + +```jsonc +[ + ["d", ""], + ["alt", "Encrypted reminder"] +] +``` + `d` MUST be an opaque random value with at least 128 bits of entropy and MUST NOT be derived from the target event, reminder text, or reminder time. Events with no `d` tag, an empty `d` tag, or more than one `d` tag are invalid. -`not_before` MUST be a decimal Unix timestamp string. It MUST contain only ASCII digits, with no sign, whitespace, decimal point, or leading zero except `"0"`. It MUST parse exactly as an integer in the range 0 through 9007199254740991 inclusive. Implementations MUST NOT parse it through lossy floating-point conversion, and MUST treat values outside this range or values that overflow their parser as malformed. Events MUST contain at most one `not_before` tag. Supporting relays SHOULD reject events with an invalid or duplicate `not_before` tag using `invalid: malformed not_before`. Clients MUST ignore pending reminders without exactly one valid `not_before`. +`not_before` MUST be a decimal Unix timestamp string. It MUST contain only ASCII digits, with no sign, whitespace, decimal point, or leading zero except `"0"`. It MUST parse exactly as an integer in the range 0 through 9007199254740991 inclusive. Implementations MUST NOT parse it through lossy floating-point conversion, and MUST treat values outside this range or values that overflow their parser as malformed. Events MUST contain at most one `not_before` tag. Supporting relays SHOULD reject events with an invalid or duplicate `not_before` tag using `invalid: malformed not_before`. A pending reminder that may become due MUST include exactly one valid `not_before`. Bookmarks and terminal states (done/cancelled) MUST omit `not_before`. Clients MUST ignore pending reminders without exactly one valid `not_before`. `alt` is RECOMMENDED for [NIP-31](31.md) fallback text. diff --git a/schema/schema.sql b/schema/schema.sql index 274fd6774..2a839adf6 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -105,6 +105,8 @@ CREATE TABLE events ( channel_id UUID, deleted_at TIMESTAMPTZ, d_tag TEXT, + not_before BIGINT, + delivered_at BIGINT, PRIMARY KEY (created_at, id) ) PARTITION BY RANGE (created_at); @@ -132,6 +134,8 @@ CREATE INDEX idx_events_id ON events (id); CREATE INDEX idx_events_deleted ON events (deleted_at); CREATE INDEX idx_events_addressable ON events (kind, pubkey, channel_id, deleted_at); CREATE INDEX idx_events_parameterized ON events (kind, pubkey, d_tag, deleted_at) WHERE d_tag IS NOT NULL; +CREATE INDEX idx_events_not_before ON events (not_before) + WHERE not_before IS NOT NULL AND deleted_at IS NULL AND delivered_at IS NULL; -- ── Event mentions ──────────────────────────────────────────────────────────── From a6ecd23dc35ffe31ec629f93dfd4dbd07a185237 Mon Sep 17 00:00:00 2001 From: npub1mn7jgtj4w2pd0g0zeuhxsa6jy6p0rewxz4kujt98my82ahfmp72sxjexk7 Date: Mon, 15 Jun 2026 22:46:15 -0400 Subject: [PATCH 2/3] test(relay): cover scheduler-tick delivery of due reminders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing 28 e2e reminder tests exercise validation and author-only reads, but all run against the relay without depending on the scheduler — the due-time push path that is the scheduler's reason to exist had no coverage. Add a positive test that isolates the scheduler from ingest fan-out: submit a reminder due a few seconds out with no socket connected (ingest fan-out has no recipient), then subscribe and drain the historical EOSE before not_before passes (scheduler cannot have fired). The post-EOSE live delivery can only come from the scheduler claiming and publishing the due reminder, so the test fails by timeout when the scheduler is disabled. Co-authored-by: Will Pfleger Signed-off-by: Will Pfleger --- .../tests/e2e_event_reminder.rs | 182 ++++++++++++++++-- 1 file changed, 168 insertions(+), 14 deletions(-) diff --git a/crates/buzz-test-client/tests/e2e_event_reminder.rs b/crates/buzz-test-client/tests/e2e_event_reminder.rs index ae3a00f7a..a2d41fb27 100644 --- a/crates/buzz-test-client/tests/e2e_event_reminder.rs +++ b/crates/buzz-test-client/tests/e2e_event_reminder.rs @@ -5,6 +5,8 @@ //! expiration ordering //! - Read-path filtering: author-only enforcement on REQ, COUNT, and the //! HTTP bridge (/query, /count) +//! - Scheduler delivery: the due-reminder poll pushes a reminder to the +//! author's live subscription when `not_before` passes //! //! # Running //! @@ -17,7 +19,7 @@ use std::time::Duration; use buzz_test_client::{BuzzTestClient, RelayMessage}; -use nostr::{EventBuilder, Filter, Keys, Kind, Tag}; +use nostr::{EventBuilder, Filter, Keys, Kind, Tag, Timestamp}; use reqwest::Client; use serde_json::Value; @@ -64,6 +66,30 @@ fn build_reminder(keys: &Keys, d_tag: &str, extra_tags: Vec) -> nostr::Even .unwrap() } +/// Build a reminder with an explicit `created_at`, so replacement-ordering +/// tests are deterministic instead of racing the whole-second timestamp the +/// default builder stamps. +fn build_reminder_at( + keys: &Keys, + d_tag: &str, + created_at: Timestamp, + extra_tags: Vec, +) -> nostr::Event { + let mut tags = vec![ + Tag::parse(["d", d_tag]).unwrap(), + Tag::parse(["alt", "Encrypted reminder"]).unwrap(), + ]; + tags.extend(extra_tags); + EventBuilder::new( + Kind::Custom(KIND_EVENT_REMINDER), + "nip44-ciphertext-placeholder", + ) + .tags(tags) + .custom_created_at(created_at) + .sign_with_keys(keys) + .unwrap() +} + /// Submit an event via the HTTP bridge and return (accepted, message). async fn submit_event_http(client: &Client, keys: &Keys, event: &nostr::Event) -> (bool, String) { let pubkey_hex = keys.public_key().to_hex(); @@ -75,10 +101,19 @@ async fn submit_event_http(client: &Client, keys: &Keys, event: &nostr::Event) - .send() .await .expect("submit event"); + let status = resp.status().as_u16(); let body: Value = resp.json().await.expect("parse response"); - let accepted = body["accepted"].as_bool().unwrap_or(false); - let message = body["message"].as_str().unwrap_or("").to_string(); - (accepted, message) + if status == 200 { + let accepted = body["accepted"].as_bool().unwrap_or(false); + let message = body["message"].as_str().unwrap_or("").to_string(); + (accepted, message) + } else { + // Rejections come back as `api_error` → `{"error": msg}` with no + // `accepted`/`message` fields (see relay api/mod.rs). Mirror the + // sibling `count_events_http`, which reads `error` on non-200. + let message = body["error"].as_str().unwrap_or("").to_string(); + (false, message) + } } /// Query events via the HTTP bridge. Returns the JSON array of events. @@ -756,18 +791,28 @@ async fn test_reminder_not_before_zero_accepted() { #[tokio::test] #[ignore] -async fn test_reminder_not_before_max_safe_integer_accepted() { +async fn test_reminder_not_before_max_safe_integer_rejected_too_far_in_future() { let client = http_client(); let keys = Keys::generate(); let d_tag = uuid::Uuid::new_v4().to_string(); + // MAX_SAFE_INTEGER is structurally valid (NIP-ER.md:60, range 0..=2^53-1), + // but ~285M years out exceeds the relay's default max_not_before horizon + // (NIP-ER.md:130), so the relay correctly rejects it. let event = build_reminder( &keys, &d_tag, vec![Tag::parse(["not_before", "9007199254740991"]).unwrap()], ); let (accepted, msg) = submit_event_http(&client, &keys, &event).await; - assert!(accepted, "MAX_SAFE_INTEGER should be valid: {msg}"); + assert!( + !accepted, + "MAX_SAFE_INTEGER exceeds horizon, should reject: {msg}" + ); + assert!( + msg.contains("too far in future"), + "unexpected message: {msg}" + ); } #[tokio::test] @@ -779,22 +824,27 @@ async fn test_reminder_replacement_semantics() { let pubkey_hex = keys.public_key().to_hex(); let d_tag = uuid::Uuid::new_v4().to_string(); - // First version - let event1 = build_reminder( + // First version. Explicit distinct created_at makes replacement ordering + // deterministic: the default builder stamps whole-second timestamps, so v1 + // and v2 landing in the same second would let the stale-write tiebreak + // (created_at == existing && incoming_id >= existing_id) keep v1 and fail + // the assert. v2 two seconds later can never collide. + let v1_ts = Timestamp::now(); + let v2_ts = v1_ts + 2u64; + let event1 = build_reminder_at( &keys, &d_tag, + v1_ts, vec![Tag::parse(["not_before", "1717000000"]).unwrap()], ); let (accepted, msg) = submit_event_http(&client, &keys, &event1).await; assert!(accepted, "first version rejected: {msg}"); - // Small delay to ensure created_at differs - tokio::time::sleep(Duration::from_millis(100)).await; - // Second version (snooze — later not_before) - let event2 = build_reminder( + let event2 = build_reminder_at( &keys, &d_tag, + v2_ts, vec![Tag::parse(["not_before", "1718000000"]).unwrap()], ); let (accepted, msg) = submit_event_http(&client, &keys, &event2).await; @@ -851,8 +901,16 @@ async fn test_fanout_isolation_other_user_does_not_receive_reminder() { .await .expect("connect other"); let sid = sub_id("fanout-isolation"); - // Subscribe to all kinds (wildcard) — no kind filter, no channel filter. - let filter = Filter::new(); + // Subscribe with a mixed-kind filter authored by B itself. This passes both + // the p-gate (neither kind:9 nor 30300 is p-gated) and the author-only + // pre-filter (authors=[self=B]), so the subscription registers for fan-out. + // Reminder isolation is then proven by the per-event `is_author_only_event` + // omission: A's reminder is silently dropped from B's live delivery rather + // than rejected up front. A kindless wildcard would instead trip the + // anti-harvest p-gate guard (CLOSED), proving the wrong mechanism. + let filter = Filter::new() + .kinds(vec![Kind::Custom(9), Kind::Custom(KIND_EVENT_REMINDER)]) + .author(other_keys.public_key()); ws_other .subscribe(&sid, vec![filter]) .await @@ -1036,3 +1094,99 @@ async fn test_reminder_rejected_not_before_too_far_in_future() { "unexpected message: {msg}" ); } + +// ── Scheduler delivery test ────────────────────────────────────────────────── + +/// True if the event carries a `d` tag equal to `d_tag`. +fn has_d_tag(event: &nostr::Event, d_tag: &str) -> bool { + event.tags.iter().any(|t| { + let parts = t.as_slice(); + parts.len() >= 2 && parts[0] == "d" && parts[1] == d_tag + }) +} + +/// Wait for a live `EVENT` on `sub_id` whose `d` tag is `d_tag`, draining the +/// initial EOSE and any historical matches first. The post-EOSE delivery is the +/// scheduler's push — with the scheduler disabled, no such frame ever arrives +/// and this returns `Timeout`. +async fn await_scheduler_push( + ws: &mut BuzzTestClient, + sub_id: &str, + d_tag: &str, + timeout_dur: Duration, +) -> Result<(), String> { + // Drain the stored-events phase up to EOSE; the reminder may appear here as + // history, which proves storage but not the scheduler. + ws.collect_until_eose(sub_id, Duration::from_secs(5)) + .await + .map_err(|e| format!("EOSE phase failed: {e:?}"))?; + + let deadline = tokio::time::Instant::now() + timeout_dur; + loop { + let remaining = deadline + .checked_duration_since(tokio::time::Instant::now()) + .unwrap_or(Duration::ZERO); + if remaining.is_zero() { + return Err("no live scheduler push received before timeout".to_string()); + } + match ws.recv_event(remaining).await { + Ok(RelayMessage::Event { + subscription_id, + event, + }) if subscription_id == sub_id && has_d_tag(&event, d_tag) => return Ok(()), + Ok(_) => {} + Err(e) => return Err(format!("recv failed: {e:?}")), + } + } +} + +/// The scheduler tick delivers a due reminder to the author's live subscription. +/// +/// Submits a reminder whose `not_before` is a few seconds out *before* any +/// WebSocket is connected, so the ingest-time fan-out has no subscriber to +/// deliver to. The author then subscribes and drains the historical EOSE while +/// the reminder is still not due — so the scheduler cannot have fired yet. Once +/// `not_before` passes, the only path that reaches the live subscription is the +/// scheduler polling `query_due_reminders` and publishing the event for +/// fan-out. +/// +/// Requires a low scheduler interval; run the relay with +/// `SPROUT_REMINDER_SCHEDULER_INTERVAL_SECS=1`. +#[tokio::test] +#[ignore] +async fn test_scheduler_delivers_due_reminder_to_author_subscription() { + let url = relay_url(); + let keys = Keys::generate(); + let d_tag = uuid::Uuid::new_v4().to_string(); + + // Submit a reminder due a few seconds out, with no WebSocket connected — the + // ingest fan-out therefore has zero live recipients, and the reminder is not + // yet due so the scheduler will not have claimed it before we subscribe. + let due_at = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + + 3; + let client = http_client(); + let event = build_reminder( + &keys, + &d_tag, + vec![Tag::parse(["not_before", &due_at.to_string()]).unwrap()], + ); + let (accepted, msg) = submit_event_http(&client, &keys, &event).await; + assert!(accepted, "setup failed: {msg}"); + + // Now subscribe as the author and wait for the scheduler to push the due + // reminder live (after the historical EOSE). + let mut ws = BuzzTestClient::connect(&url, &keys).await.expect("connect"); + let sid = sub_id("scheduler-delivery"); + let filter = Filter::new() + .kind(Kind::Custom(KIND_EVENT_REMINDER)) + .author(keys.public_key()); + ws.subscribe(&sid, vec![filter]).await.expect("subscribe"); + + let result = await_scheduler_push(&mut ws, &sid, &d_tag, Duration::from_secs(15)).await; + + ws.disconnect().await.expect("disconnect"); + result.expect("scheduler should deliver the due reminder to the author"); +} From 72b44afe43fef6fbdca017095ab1e170c0482a41 Mon Sep 17 00:00:00 2001 From: npub1mn7jgtj4w2pd0g0zeuhxsa6jy6p0rewxz4kujt98my82ahfmp72sxjexk7 Date: Tue, 16 Jun 2026 12:06:03 -0400 Subject: [PATCH 3/3] feat(relay): add event-reminder columns as sqlx migration #988 moved the schema source-of-truth from schema/schema.sql to sqlx migrations under migrations/, run on relay startup via BUZZ_AUTO_MIGRATE. The NIP-ER stack added not_before, delivered_at, and idx_events_not_before to the snapshot only, so an auto-migrated relay had no reminder columns and the scheduler failed at runtime. This migration adds them to the runtime path, byte-equivalent to the snapshot. Co-authored-by: Will Pfleger Signed-off-by: Will Pfleger --- crates/buzz-db/src/migration.rs | 45 +++++++++++++++++++++++++---- migrations/0003_event_reminders.sql | 17 +++++++++++ 2 files changed, 57 insertions(+), 5 deletions(-) create mode 100644 migrations/0003_event_reminders.sql diff --git a/crates/buzz-db/src/migration.rs b/crates/buzz-db/src/migration.rs index aa0a96a5e..f4f3c9ab3 100644 --- a/crates/buzz-db/src/migration.rs +++ b/crates/buzz-db/src/migration.rs @@ -125,10 +125,10 @@ mod tests { const TEST_DB_URL: &str = "postgres://buzz:buzz_dev@localhost:5432/buzz"; #[test] - fn embedded_migrator_contains_initial_schema_and_d_tag_backfill() { + fn embedded_migrator_contains_all_schema_migrations() { let migrations: Vec<_> = MIGRATOR.iter().collect(); - assert_eq!(migrations.len(), 2); + assert_eq!(migrations.len(), 3); assert_eq!(migrations[0].version, 1); assert_eq!(&*migrations[0].description, "initial schema"); assert!( @@ -149,6 +149,17 @@ mod tests { migrations[1].sql.as_str().contains("UPDATE events"), "second migration should backfill existing event rows" ); + + assert_eq!(migrations[2].version, 3); + assert_eq!(&*migrations[2].description, "event reminders"); + assert!( + migrations[2] + .sql + .as_str() + .contains("ADD COLUMN not_before BIGINT") + && migrations[2].sql.as_str().contains("idx_events_not_before"), + "third migration should add the NIP-ER reminder columns and index" + ); } async fn connect_test_pool() -> PgPool { @@ -181,6 +192,26 @@ mod tests { .expect("read applied migrations") } + /// Returns `schema/schema.sql` with the NIP-ER reminder DDL removed, so it + /// models a pre-stack deployment whose `events` table lacks the reminder + /// columns and index. The strip is asserted: if the snapshot text drifts so + /// these fragments no longer match, the test fails loudly rather than + /// silently loading a snapshot that already carries the reminder columns + /// (which would make migration 0003 collide on re-add). + fn pre_reminder_schema_snapshot() -> String { + const REMINDER_COLUMNS: &str = " not_before BIGINT,\n delivered_at BIGINT,\n"; + const REMINDER_INDEX: &str = "CREATE INDEX idx_events_not_before ON events (not_before)\n WHERE not_before IS NOT NULL AND deleted_at IS NULL AND delivered_at IS NULL;\n"; + + assert!( + SCHEMA_SQL.contains(REMINDER_COLUMNS) && SCHEMA_SQL.contains(REMINDER_INDEX), + "schema.sql reminder DDL drifted; update pre_reminder_schema_snapshot to match" + ); + + SCHEMA_SQL + .replace(REMINDER_COLUMNS, "") + .replace(REMINDER_INDEX, "") + } + #[tokio::test] #[ignore = "requires Postgres"] async fn run_migrations_applies_embedded_versions_on_fresh_database() { @@ -189,7 +220,7 @@ mod tests { run_migrations(&pool).await.expect("run migrations"); - assert_eq!(applied_versions(&pool).await, vec![1, 2]); + assert_eq!(applied_versions(&pool).await, vec![1, 2, 3]); let events_exists = sqlx::query_scalar::<_, bool>( "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'events')", ) @@ -204,7 +235,11 @@ mod tests { async fn run_migrations_baselines_existing_schema_and_preserves_allowlist_backfill_path() { let pool = connect_test_pool().await; reset_public_schema(&pool).await; - sqlx::raw_sql(SCHEMA_SQL) + // Load a pre-stack snapshot (without the NIP-ER reminder DDL) so the + // events table matches a real pre-SQLx deployment, which never had the + // reminder columns. Migration 0003 must then add them — proving the + // genuine prod-upgrade path, not a snapshot that already carries them. + sqlx::raw_sql(sqlx::AssertSqlSafe(pre_reminder_schema_snapshot())) .execute(&pool) .await .expect("load pre-SQLx schema snapshot"); @@ -218,7 +253,7 @@ mod tests { run_migrations(&pool).await.expect("baseline migrations"); - assert_eq!(applied_versions(&pool).await, vec![1, 2]); + assert_eq!(applied_versions(&pool).await, vec![1, 2, 3]); let allowlist_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM pubkey_allowlist") .fetch_one(&pool) .await diff --git a/migrations/0003_event_reminders.sql b/migrations/0003_event_reminders.sql new file mode 100644 index 000000000..846928c30 --- /dev/null +++ b/migrations/0003_event_reminders.sql @@ -0,0 +1,17 @@ +-- Add NIP-ER event-reminder columns and due-delivery index to the events table. +-- +-- `not_before` is the reminder's scheduled delivery time (Unix seconds); +-- `delivered_at` records when the scheduler published it. Both are nullable — +-- non-reminder events leave them NULL. The partial index covers only +-- undelivered, live reminders so the scheduler's due-query stays cheap. +-- +-- `events` is partitioned by RANGE (created_at); ALTER TABLE on the parent +-- cascades the columns to every partition, and CREATE INDEX on the parent +-- builds a partitioned index that propagates to each partition. +-- +-- Managed by sqlx migrations. + +ALTER TABLE events ADD COLUMN not_before BIGINT; +ALTER TABLE events ADD COLUMN delivered_at BIGINT; +CREATE INDEX idx_events_not_before ON events (not_before) + WHERE not_before IS NOT NULL AND deleted_at IS NULL AND delivered_at IS NULL;