From 1c8edaf18dd343dec05fc86b110f300951f21677 Mon Sep 17 00:00:00 2001 From: "dobby-yivi-agent[bot]" <275734547+dobby-yivi-agent[bot]@users.noreply.github.com> Date: Wed, 17 Jun 2026 17:12:04 +0000 Subject: [PATCH] feat: persist rolling-quota usage to SQLite (usage_db) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The config schema already accepts `usage_db = ""`, but `CryptifyConfig` never parsed it and the rolling-quota state lived only in the in-memory `Store.shared.state.usage` map, so any pod restart or redeploy wiped everyone's rolling quota. - Parse `usage_db: Option<String>` in `RawCryptifyConfig` / `CryptifyConfig` with a `usage_db()` accessor. - Back the usage map with SQLite (`rusqlite`, bundled) at that path. The DB is the source of truth; the in-memory map is a cache. On startup existing usage is loaded into the cache; every accounted upload is written through and that sender's stale rows outside the rolling window are pruned. - `usage_db` unset → in-memory only, preserving prior behaviour. A configured-but-unopenable DB panics at startup (loud config error). - Tests: persistence across a simulated restart, continued accumulation after restart, rolling-window eviction persisted to the DB, lazy in-memory eviction after reload, and config parsing. 
Closes #134 Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 90 ++++++++++++++++- Cargo.toml | 1 + src/config.rs | 48 +++++++++ src/main.rs | 1 + src/store.rs | 272 +++++++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 409 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26e6fb8..dfd358c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -476,6 +476,7 @@ dependencies = [ "reqwest 0.13.3", "rocket", "rocket_cors", + "rusqlite", "serde", "serde_json", "sha2", @@ -673,6 +674,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.4.1" @@ -728,6 +741,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -987,7 +1006,16 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "foldhash", + "foldhash 0.1.5", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", ] [[package]] @@ -995,6 +1023,18 @@ name = "hashbrown" version 
= "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +dependencies = [ + "foldhash 0.2.0", +] + +[[package]] +name = "hashlink" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5081f264ed7adee96ea4b4778b6bb9da0a7228b084587aa3bd3ff05da7c5a3b" +dependencies = [ + "hashbrown 0.17.0", +] [[package]] name = "heck" @@ -1560,6 +1600,17 @@ version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" +[[package]] +name = "libsqlite3-sys" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6c19a05435c21ac299d71b6a9c13db3e3f47c520517d58990a462a1397a61db" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -2424,6 +2475,31 @@ dependencies = [ "uncased", ] +[[package]] +name = "rsqlite-vfs" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c51c9ae4df8a7fba42103df5c621fa3c37eccf3a3c650879e90fc48b11cc192c" +dependencies = [ + "hashbrown 0.16.1", + "thiserror 2.0.18", +] + +[[package]] +name = "rusqlite" +version = "0.40.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11438310b19e3109b6446c33d1ed5e889428cf2e278407bc7896bc4aaea43323" +dependencies = [ + "bitflags 2.11.1", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", + "sqlite-wasm-rs", +] + [[package]] name = "rustc-hash" version = "2.1.2" @@ -2771,6 +2847,18 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "sqlite-wasm-rs" +version = "0.5.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc3efc0da82635d7e1ced0053bbbfa8c7ab9645d0bf36ceb4f7127bb85315d75" +dependencies = [ + "cc", + "js-sys", + "rsqlite-vfs", + "wasm-bindgen", +] + [[package]] name = "stable-pattern" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 57d5670..ef11d2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ url = "2.5.7" tokio-util = { version = "0.7.17", features = ["compat"] } pg-core = { version = "0.6.1", features = ["rust", "stream"] } minreq = { version = "2.14.1", features = ["json-using-serde", "https-native"]} +rusqlite = { version = "0.40.1", features = ["bundled"] } [dev-dependencies] # Enables `pg_core::test::TestSetup` for building real verifying keys and diff --git a/src/config.rs b/src/config.rs index 7773c41..c0a5c59 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,6 +16,7 @@ pub struct RawCryptifyConfig { chunk_size: Option, session_ttl_secs: Option, staging_mode: Option, + usage_db: Option, } #[derive(Debug, Deserialize)] @@ -35,6 +36,11 @@ pub struct CryptifyConfig { chunk_size: u64, session_ttl_secs: u64, staging_mode: bool, + /// Filesystem path to the SQLite database backing the rolling-quota + /// usage state. When set, per-sender usage survives process restarts + /// (the in-memory map in `Store` is only a cache). `None` keeps usage + /// entirely in memory, as it was before persistence was added. + usage_db: Option, } impl From for CryptifyConfig { @@ -57,6 +63,7 @@ impl From for CryptifyConfig { chunk_size: config.chunk_size.unwrap_or(5_000_000), session_ttl_secs: config.session_ttl_secs.unwrap_or(3600), staging_mode: config.staging_mode.unwrap_or(false), + usage_db: config.usage_db, } } } @@ -118,6 +125,12 @@ impl CryptifyConfig { self.staging_mode } + /// Path to the SQLite database backing rolling-quota usage, if + /// configured. `None` means usage is kept in memory only. 
+ pub fn usage_db(&self) -> Option<&str> { + self.usage_db.as_deref() + } + #[cfg(test)] pub(crate) fn for_test(server_url: &str, staging_mode: bool) -> Self { CryptifyConfig { @@ -135,6 +148,41 @@ impl CryptifyConfig { chunk_size: 5_000_000, session_ttl_secs: 3600, staging_mode, + usage_db: None, } } } + +#[cfg(test)] +mod tests { + use super::*; + use rocket::figment::{providers::Serialized, Figment}; + + fn base_config() -> serde_json::Value { + serde_json::json!({ + "server_url": "http://localhost", + "data_dir": "/tmp/data", + "email_from": "Test ", + "smtp_url": "localhost", + "smtp_port": 1025u16, + "allowed_origins": ".*", + "pkg_url": "http://localhost", + }) + } + + #[test] + fn usage_db_is_parsed_when_present() { + let mut raw = base_config(); + raw["usage_db"] = serde_json::json!("/app/data/usage.db"); + let config: CryptifyConfig = Figment::from(Serialized::defaults(raw)).extract().unwrap(); + assert_eq!(config.usage_db(), Some("/app/data/usage.db")); + } + + #[test] + fn usage_db_defaults_to_none_when_absent() { + let config: CryptifyConfig = Figment::from(Serialized::defaults(base_config())) + .extract() + .unwrap(); + assert_eq!(config.usage_db(), None); + } +} diff --git a/src/main.rs b/src/main.rs index 5e48209..7720153 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1262,6 +1262,7 @@ pub fn build_rocket(figment: Figment, vk: Parameters) -> Rocket, +} + +impl UsageDb { + /// Open (creating if necessary) the SQLite database at `path` and ensure + /// the schema exists. + fn open(path: &str) -> rusqlite::Result { + let conn = rusqlite::Connection::open(path)?; + // WAL keeps writes from blocking the (rare) concurrent reads and + // survives an unclean pod kill better than the default rollback + // journal. 
+ conn.pragma_update(None, "journal_mode", "WAL")?; + conn.execute( + "CREATE TABLE IF NOT EXISTS usage ( + email TEXT NOT NULL, + timestamp INTEGER NOT NULL, + bytes INTEGER NOT NULL + )", + [], + )?; + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_usage_email_ts ON usage (email, timestamp)", + [], + )?; + Ok(UsageDb { + conn: std::sync::Mutex::new(conn), + }) + } + + /// Load every persisted record into an in-memory map, grouped by email + /// and ordered oldest-first so the resulting `VecDeque`s match what the + /// in-memory path would have built. Stale records are intentionally not + /// pruned here: pruning is relative to the caller-supplied `now`, which + /// only the request path knows. + fn load_all(&self) -> rusqlite::Result>> { + let conn = self.conn.lock().unwrap(); + let mut stmt = + conn.prepare("SELECT email, timestamp, bytes FROM usage ORDER BY timestamp ASC")?; + let rows = stmt.query_map([], |row| { + let email: String = row.get(0)?; + let timestamp: i64 = row.get(1)?; + let bytes: i64 = row.get(2)?; + Ok((email, timestamp, bytes)) + })?; + + let mut map: HashMap> = HashMap::new(); + for row in rows { + let (email, timestamp, bytes) = row?; + map.entry(email).or_default().push_back(UploadRecord { + timestamp, + bytes: bytes as u64, + }); + } + Ok(map) + } + + /// Persist one accounted upload and drop any rows for the same email that + /// have fallen outside the rolling window, keeping the table bounded for + /// active senders. Errors are logged rather than propagated: a database + /// hiccup must not fail an otherwise-successful upload, and the in-memory + /// cache still reflects the record for the lifetime of the process. 
+ fn record(&self, email: &str, bytes: u64, now: i64) { + let conn = self.conn.lock().unwrap(); + if let Err(e) = conn.execute( + "INSERT INTO usage (email, timestamp, bytes) VALUES (?1, ?2, ?3)", + rusqlite::params![email, now, bytes as i64], + ) { + log::error!("Failed to persist usage record for {}: {}", email, e); + return; + } + let cutoff = now - ROLLING_WINDOW_SECS; + if let Err(e) = conn.execute( + "DELETE FROM usage WHERE email = ?1 AND timestamp < ?2", + rusqlite::params![email, cutoff], + ) { + log::error!("Failed to prune usage records for {}: {}", email, e); + } + } +} + struct StoreState { files: HashMap>>, expirations: BTreeMap<(Instant, u64), String>, @@ -115,6 +207,10 @@ struct SharedState { notify: Notify, idle_ttl: Duration, metrics: Arc, + /// SQLite source of truth for rolling-quota usage. `None` keeps usage in + /// memory only (the pre-persistence behaviour, used by unit tests and + /// when `usage_db` is unset in config). + usage_db: Option, } pub struct Store { @@ -127,23 +223,56 @@ impl Store { Self::with_idle_ttl( Duration::from_secs(DEFAULT_UPLOAD_SESSION_IDLE_TIMEOUT_SECS), metrics, + None, ) } - pub fn with_idle_ttl(idle_ttl: Duration, metrics: Arc) -> Self { + /// Construct a store with the given idle-eviction window. When + /// `usage_db` is `Some(path)` the rolling-quota state is backed by a + /// SQLite database at that path: existing usage is loaded from disk on + /// startup and every accounted upload is written through, so quota + /// survives process restarts. A configured-but-unopenable database is a + /// deployment error and panics here, the same way a malformed config + /// does — better a loud startup failure than silently losing quota + /// persistence. 
+ pub fn with_idle_ttl( + idle_ttl: Duration, + metrics: Arc, + usage_db: Option<&str>, + ) -> Self { + let (usage_db, usage) = match usage_db { + Some(path) => { + let db = UsageDb::open(path) + .unwrap_or_else(|e| panic!("Failed to open usage database at {}: {}", path, e)); + let usage = db.load_all().unwrap_or_else(|e| { + panic!("Failed to load usage records from {}: {}", path, e) + }); + let records: usize = usage.values().map(VecDeque::len).sum(); + log::info!( + "Loaded {} usage record(s) for {} sender(s) from {}", + records, + usage.len(), + path + ); + (Some(db), usage) + } + None => (None, HashMap::new()), + }; + let result = Store { shared: Arc::new(SharedState { state: std::sync::Mutex::new(StoreState { files: HashMap::new(), expirations: BTreeMap::new(), expiration_keys: HashMap::new(), - usage: HashMap::new(), + usage, next_id: 0, shutdown: false, }), notify: Notify::new(), idle_ttl, metrics, + usage_db, }), }; @@ -213,6 +342,12 @@ impl Store { } pub fn record_upload(&self, email: String, bytes: u64, now: i64) { + // Persist to the source of truth first so a crash between the two + // updates loses nothing: the cache is rebuilt from the database on + // the next startup anyway. + if let Some(db) = &self.shared.usage_db { + db.record(&email, bytes, now); + } let mut state = self.shared.state.lock().unwrap(); let entry = state.usage.entry(email).or_default(); prune_records(entry, now); @@ -446,6 +581,139 @@ mod tests { assert!(s.expiration_keys.is_empty()); } + /// Unique temp path for a test database, cleaned up by [`TempDbPath`]. + struct TempDbPath { + path: std::path::PathBuf, + } + + impl TempDbPath { + fn new() -> Self { + let path = + std::env::temp_dir().join(format!("cryptify-usage-{}.db", uuid::Uuid::new_v4())); + TempDbPath { path } + } + + fn as_str(&self) -> &str { + self.path.to_str().unwrap() + } + } + + impl Drop for TempDbPath { + fn drop(&mut self) { + // Remove the database file and any WAL/SHM sidecars. 
+ let _ = std::fs::remove_file(&self.path); + for ext in ["-wal", "-shm"] { + let mut p = self.path.clone().into_os_string(); + p.push(ext); + let _ = std::fs::remove_file(p); + } + } + } + + fn store_with_db(path: &str) -> Store { + Store::with_idle_ttl( + Duration::from_secs(DEFAULT_UPLOAD_SESSION_IDLE_TIMEOUT_SECS), + Arc::new(Metrics::new()), + Some(path), + ) + } + + #[rocket::async_test] + async fn usage_survives_simulated_restart() { + let db = TempDbPath::new(); + let now: i64 = 2_000_000; + + { + let store = store_with_db(db.as_str()); + store.record_upload("a@example.com".into(), 1_000_000_000, now - 3600); + store.record_upload("a@example.com".into(), 2_000_000_000, now - 60); + store.record_upload("b@example.com".into(), 500, now - 10); + // store dropped here — simulates the pod going away. + } + + // Fresh Store opening the same database file — simulates restart. + let store = store_with_db(db.as_str()); + let snap = store.get_usage("a@example.com", now); + assert_eq!( + snap.used_bytes, 3_000_000_000, + "usage for a@ must be reloaded from the database after restart" + ); + assert_eq!( + snap.oldest_expires_at, + Some(now - 3600 + ROLLING_WINDOW_SECS) + ); + assert_eq!( + store.get_usage("b@example.com", now).used_bytes, + 500, + "per-sender usage stays isolated across a restart" + ); + } + + #[rocket::async_test] + async fn restart_continues_accumulating() { + let db = TempDbPath::new(); + let now: i64 = 2_000_000; + + { + let store = store_with_db(db.as_str()); + store.record_upload("a@example.com".into(), 1_000, now - 100); + } + + let store = store_with_db(db.as_str()); + // A record made after the restart must add to the reloaded total. 
+ store.record_upload("a@example.com".into(), 2_000, now); + assert_eq!(store.get_usage("a@example.com", now).used_bytes, 3_000); + } + + #[rocket::async_test] + async fn rolling_window_eviction_persists_across_restart() { + let db = TempDbPath::new(); + let now: i64 = 2_000_000; + + { + let store = store_with_db(db.as_str()); + // One record well outside the window, one inside. + store.record_upload( + "c@example.com".into(), + 9_000, + now - ROLLING_WINDOW_SECS - 10, + ); + store.record_upload("c@example.com".into(), 1_000, now - 60); + // A later record at `now` triggers the database-side prune of the + // stale row (DELETE WHERE timestamp < now - window). + store.record_upload("c@example.com".into(), 2_000, now); + } + + // After restart only the two in-window records should remain — the + // expired one must have been evicted from the database, not just the + // in-memory cache. + let store = store_with_db(db.as_str()); + assert_eq!( + store.get_usage("c@example.com", now).used_bytes, + 3_000, + "stale record must not resurrect from the database after restart" + ); + } + + #[rocket::async_test] + async fn rolling_window_evicts_in_memory_after_reload() { + let db = TempDbPath::new(); + let now: i64 = 2_000_000; + + { + let store = store_with_db(db.as_str()); + // Record that is in-window now but will fall out by `later`. + store.record_upload("d@example.com".into(), 4_000, now); + } + + let store = store_with_db(db.as_str()); + // Immediately after reload the record counts. + assert_eq!(store.get_usage("d@example.com", now).used_bytes, 4_000); + // Far in the future it has rolled out of the window. + let later = now + ROLLING_WINDOW_SECS + 1; + assert_eq!(store.get_usage("d@example.com", later).used_bytes, 0); + } + #[rocket::async_test] async fn pruning_removes_only_expired_records() { let store = Store::new(Arc::new(Metrics::new()));