Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 165 additions & 5 deletions crates/buzz-db/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use nostr::Event;
use sqlx::{PgPool, QueryBuilder, Row};
use uuid::Uuid;

use buzz_core::kind::{event_kind_i32, is_ephemeral, is_parameterized_replaceable, KIND_AUTH};
use buzz_core::kind::{
event_kind_i32, is_ephemeral, is_parameterized_replaceable, KIND_AUTH, KIND_EVENT_REMINDER,
};
use buzz_core::StoredEvent;

use crate::error::{DbError, Result};
Expand Down Expand Up @@ -96,6 +98,26 @@ pub fn extract_d_tag(event: &Event) -> Option<String> {
Some(val)
}

/// Extract the `not_before` timestamp for materialization in the `events` table.
///
/// Only applies to `kind:30300` (NIP-ER event reminders). Returns the first
/// valid `not_before` tag value as an `i64` Unix timestamp, or `None` if the
/// event is not a reminder or has no `not_before` tag.
pub fn extract_not_before(event: &Event) -> Option<i64> {
let kind_u32 = event.kind.as_u16() as u32;
if kind_u32 != KIND_EVENT_REMINDER {
return None;
}
event.tags.iter().find_map(|tag| {
let parts = tag.as_slice();
if parts.len() >= 2 && parts[0] == "not_before" {
parts[1].parse::<i64>().ok()
} else {
None
}
})
}

/// Insert a Nostr event. Rejects AUTH and ephemeral kinds.
///
/// Returns `(StoredEvent, was_inserted)` — `was_inserted` is `false` on duplicate.
Expand Down Expand Up @@ -125,10 +147,11 @@ pub async fn insert_event(
.ok_or(DbError::InvalidTimestamp(created_at_secs))?;
let received_at = Utc::now();
let d_tag = extract_d_tag(event);
let not_before = extract_not_before(event);
let result = sqlx::query(
r#"
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag, not_before)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT DO NOTHING
"#,
)
Expand All @@ -142,6 +165,7 @@ pub async fn insert_event(
.bind(received_at)
.bind(channel_id)
.bind(d_tag.as_deref())
.bind(not_before)
.execute(pool)
.await?;

Expand Down Expand Up @@ -842,13 +866,14 @@ pub async fn insert_event_with_thread_metadata(
.ok_or(DbError::InvalidTimestamp(created_at_secs))?;
let received_at = Utc::now();
let d_tag = extract_d_tag(event);
let not_before = extract_not_before(event);
let mut tx = pool.begin().await?;

// ── Insert event ──────────────────────────────────────────────────────────
let result = sqlx::query(
r#"
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag, not_before)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT DO NOTHING
"#,
)
Expand All @@ -862,6 +887,7 @@ pub async fn insert_event_with_thread_metadata(
.bind(received_at)
.bind(channel_id)
.bind(d_tag.as_deref())
.bind(not_before)
.execute(&mut *tx)
.await?;

Expand Down Expand Up @@ -981,6 +1007,101 @@ pub async fn insert_event_with_thread_metadata(
))
}

/// A due reminder row returned by [`query_due_reminders`].
#[derive(Debug)]
pub struct DueReminder {
/// The event's raw ID bytes.
pub id: Vec<u8>,
/// The event's pubkey bytes.
pub pubkey: Vec<u8>,
/// The event's `created_at` timestamp.
pub created_at: DateTime<Utc>,
/// The event's kind (always 30300).
pub kind: i32,
/// The event's JSONB tags.
pub tags: serde_json::Value,
/// The event's encrypted content.
pub content: String,
/// The event's signature bytes.
pub sig: Vec<u8>,
/// The channel ID (always None for reminders — global events).
pub channel_id: Option<Uuid>,
}

/// Query due reminders: latest-per-address `kind:30300` rows where
/// `not_before <= now`, `deleted_at IS NULL`, `delivered_at IS NULL`.
///
/// Returns the latest head per `(pubkey, d_tag)` using canonical NIP-16
/// ordering (`created_at DESC, id ASC`).
pub async fn query_due_reminders(
pool: &PgPool,
now_secs: i64,
batch_limit: i64,
) -> Result<Vec<DueReminder>> {
let kind_i32 = KIND_EVENT_REMINDER as i32;
let rows = sqlx::query(
r#"
SELECT DISTINCT ON (pubkey, d_tag)
id, pubkey, created_at, kind, tags, content, sig, channel_id
FROM events
WHERE kind = $1
AND not_before IS NOT NULL
AND not_before <= $2
AND deleted_at IS NULL
AND delivered_at IS NULL
ORDER BY pubkey, d_tag, created_at DESC, id ASC
LIMIT $3
"#,
)
.bind(kind_i32)
.bind(now_secs)
.bind(batch_limit)
.fetch_all(pool)
.await?;

let results = rows
.into_iter()
.map(|row| DueReminder {
id: row.get("id"),
pubkey: row.get("pubkey"),
created_at: row.get("created_at"),
kind: row.get("kind"),
tags: row.get("tags"),
content: row.get("content"),
sig: row.get("sig"),
channel_id: row.get("channel_id"),
})
.collect();

Ok(results)
}

/// Atomically claim a due reminder for delivery. Returns `Some(id)` if this
/// caller won the claim (set `delivered_at`), or `None` if another pod already
/// claimed it. Mirrors the reaper's `archived_at IS NULL` guard for cross-pod
/// idempotency.
pub async fn claim_due_reminder(
pool: &PgPool,
event_id: &[u8],
event_created_at: DateTime<Utc>,
) -> Result<bool> {
let now_epoch = Utc::now().timestamp();
let result = sqlx::query(
r#"
UPDATE events
SET delivered_at = $1
WHERE created_at = $2 AND id = $3 AND delivered_at IS NULL
"#,
)
.bind(now_epoch)
.bind(event_created_at)
.bind(event_id)
.execute(pool)
.await?;

Ok(result.rows_affected() > 0)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1082,4 +1203,43 @@ mod tests {
assert_eq!(result.len(), 2048);
assert_eq!(result, long_val);
}

#[test]
fn extract_not_before_from_reminder() {
let event = make_event_with_kind_and_tags(
KIND_EVENT_REMINDER as u16,
vec![Tag::parse(["not_before", "1717000000"]).unwrap()],
);
assert_eq!(extract_not_before(&event), Some(1_717_000_000));
}

#[test]
fn extract_not_before_absent_returns_none() {
// A bookmark/terminal reminder carries no `not_before` tag.
let event = make_event_with_kind_and_tags(
KIND_EVENT_REMINDER as u16,
vec![Tag::parse(["d", "abc"]).unwrap()],
);
assert_eq!(extract_not_before(&event), None);
}

#[test]
fn extract_not_before_non_reminder_returns_none() {
// Only kind:30300 materializes `not_before`; other kinds stay NULL.
let event = make_event_with_kind_and_tags(
30023,
vec![Tag::parse(["not_before", "1717000000"]).unwrap()],
);
assert_eq!(extract_not_before(&event), None);
}

#[test]
fn extract_not_before_non_numeric_returns_none() {
// Malformed values are rejected by ingest; materialization just skips them.
let event = make_event_with_kind_and_tags(
KIND_EVENT_REMINDER as u16,
vec![Tag::parse(["not_before", "not-a-number"]).unwrap()],
);
assert_eq!(extract_not_before(&event), None);
}
}
25 changes: 23 additions & 2 deletions crates/buzz-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,26 @@ impl Db {
channel::reap_expired_ephemeral_channels(&self.pool).await
}

// ── Reminder scheduler ───────────────────────────────────────────────────

/// Query due reminders ready for delivery.
pub async fn query_due_reminders(
&self,
now_secs: i64,
batch_limit: i64,
) -> Result<Vec<event::DueReminder>> {
event::query_due_reminders(&self.pool, now_secs, batch_limit).await
}

/// Atomically claim a due reminder for delivery (cross-pod dedup).
pub async fn claim_due_reminder(
&self,
event_id: &[u8],
event_created_at: chrono::DateTime<chrono::Utc>,
) -> Result<bool> {
event::claim_due_reminder(&self.pool, event_id, event_created_at).await
}

// ── Users ────────────────────────────────────────────────────────────────

/// Ensure a user record exists (upsert).
Expand Down Expand Up @@ -1732,8 +1752,8 @@ impl Db {
let received_at = chrono::Utc::now();

let insert_result = sqlx::query(
"INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \
"INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag, not_before) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \
ON CONFLICT DO NOTHING",
)
.bind(event.id.as_bytes().as_slice())
Expand All @@ -1746,6 +1766,7 @@ impl Db {
.bind(received_at)
.bind(channel_id)
.bind(d_tag)
.bind(event::extract_not_before(event))
.execute(&mut *tx)
.await?;

Expand Down
45 changes: 40 additions & 5 deletions crates/buzz-db/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ mod tests {
const TEST_DB_URL: &str = "postgres://buzz:buzz_dev@localhost:5432/buzz";

#[test]
fn embedded_migrator_contains_initial_schema_and_d_tag_backfill() {
fn embedded_migrator_contains_all_schema_migrations() {
let migrations: Vec<_> = MIGRATOR.iter().collect();

assert_eq!(migrations.len(), 2);
assert_eq!(migrations.len(), 3);
assert_eq!(migrations[0].version, 1);
assert_eq!(&*migrations[0].description, "initial schema");
assert!(
Expand All @@ -149,6 +149,17 @@ mod tests {
migrations[1].sql.as_str().contains("UPDATE events"),
"second migration should backfill existing event rows"
);

assert_eq!(migrations[2].version, 3);
assert_eq!(&*migrations[2].description, "event reminders");
assert!(
migrations[2]
.sql
.as_str()
.contains("ADD COLUMN not_before BIGINT")
&& migrations[2].sql.as_str().contains("idx_events_not_before"),
"third migration should add the NIP-ER reminder columns and index"
);
}

async fn connect_test_pool() -> PgPool {
Expand Down Expand Up @@ -181,6 +192,26 @@ mod tests {
.expect("read applied migrations")
}

/// Returns `schema/schema.sql` with the NIP-ER reminder DDL removed, so it
/// models a pre-stack deployment whose `events` table lacks the reminder
/// columns and index. The strip is asserted: if the snapshot text drifts so
/// these fragments no longer match, the test fails loudly rather than
/// silently loading a snapshot that already carries the reminder columns
/// (which would make migration 0003 collide on re-add).
fn pre_reminder_schema_snapshot() -> String {
const REMINDER_COLUMNS: &str = " not_before BIGINT,\n delivered_at BIGINT,\n";
const REMINDER_INDEX: &str = "CREATE INDEX idx_events_not_before ON events (not_before)\n WHERE not_before IS NOT NULL AND deleted_at IS NULL AND delivered_at IS NULL;\n";

assert!(
SCHEMA_SQL.contains(REMINDER_COLUMNS) && SCHEMA_SQL.contains(REMINDER_INDEX),
"schema.sql reminder DDL drifted; update pre_reminder_schema_snapshot to match"
);

SCHEMA_SQL
.replace(REMINDER_COLUMNS, "")
.replace(REMINDER_INDEX, "")
}

#[tokio::test]
#[ignore = "requires Postgres"]
async fn run_migrations_applies_embedded_versions_on_fresh_database() {
Expand All @@ -189,7 +220,7 @@ mod tests {

run_migrations(&pool).await.expect("run migrations");

assert_eq!(applied_versions(&pool).await, vec![1, 2]);
assert_eq!(applied_versions(&pool).await, vec![1, 2, 3]);
let events_exists = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'events')",
)
Expand All @@ -204,7 +235,11 @@ mod tests {
async fn run_migrations_baselines_existing_schema_and_preserves_allowlist_backfill_path() {
let pool = connect_test_pool().await;
reset_public_schema(&pool).await;
sqlx::raw_sql(SCHEMA_SQL)
// Load a pre-stack snapshot (without the NIP-ER reminder DDL) so the
// events table matches a real pre-SQLx deployment, which never had the
// reminder columns. Migration 0003 must then add them — proving the
// genuine prod-upgrade path, not a snapshot that already carries them.
sqlx::raw_sql(sqlx::AssertSqlSafe(pre_reminder_schema_snapshot()))
.execute(&pool)
.await
.expect("load pre-SQLx schema snapshot");
Expand All @@ -218,7 +253,7 @@ mod tests {

run_migrations(&pool).await.expect("baseline migrations");

assert_eq!(applied_versions(&pool).await, vec![1, 2]);
assert_eq!(applied_versions(&pool).await, vec![1, 2, 3]);
let allowlist_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM pubkey_allowlist")
.fetch_one(&pool)
.await
Expand Down
Loading