From 0fad44f66c4a26493211e5d09775590b380659c3 Mon Sep 17 00:00:00 2001 From: bomanaps Date: Mon, 22 Jun 2026 10:09:18 +0100 Subject: [PATCH] feat(audit): emit events for export, purge, reindex --- crates/audit/src/lib.rs | 2 +- crates/audit/src/sinks/memory.rs | 111 ++++ crates/audit/src/sinks/mod.rs | 2 + crates/hfs/src/main.rs | 228 ++++--- .../src/core/bulk_export_worker.rs | 132 +++- crates/persistence/src/search/reindex.rs | 115 ++++ crates/rest/Cargo.toml | 2 +- crates/rest/src/handlers/bulk_export.rs | 302 ++++++++-- crates/rest/src/handlers/mod.rs | 6 + crates/rest/src/handlers/purge.rs | 252 ++++++++ crates/rest/src/handlers/reindex.rs | 306 ++++++++++ crates/rest/src/lib.rs | 69 +++ crates/rest/src/reindex.rs | 56 ++ crates/rest/src/routing/fhir_routes.rs | 21 + crates/rest/src/state.rs | 37 ++ crates/rest/tests/audit_persistence.rs | 568 ++++++++++++++++++ 16 files changed, 2073 insertions(+), 136 deletions(-) create mode 100644 crates/audit/src/sinks/memory.rs create mode 100644 crates/rest/src/handlers/purge.rs create mode 100644 crates/rest/src/handlers/reindex.rs create mode 100644 crates/rest/src/reindex.rs create mode 100644 crates/rest/tests/audit_persistence.rs diff --git a/crates/audit/src/lib.rs b/crates/audit/src/lib.rs index b91341195..03cac854e 100644 --- a/crates/audit/src/lib.rs +++ b/crates/audit/src/lib.rs @@ -73,4 +73,4 @@ pub use middleware::{AuditAgent, AuditMiddlewareState, AuditResponseContext}; pub use sink::AuditSink; #[cfg(feature = "cloudwatch")] pub use sinks::CloudWatchLogsSink; -pub use sinks::{DatabaseSink, FileSink, NullSink}; +pub use sinks::{DatabaseSink, FileSink, InMemoryAuditSink, NullSink}; diff --git a/crates/audit/src/sinks/memory.rs b/crates/audit/src/sinks/memory.rs new file mode 100644 index 000000000..084f374ab --- /dev/null +++ b/crates/audit/src/sinks/memory.rs @@ -0,0 +1,111 @@ +//! In-memory audit sink for tests. +//! +//! Buffers every recorded [`AuditEvent`] in a `Vec` behind a `Mutex` so test +//! code can assert on emitted events. Not intended for production use. + +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; + +use crate::fhir_model::AuditEvent; +use crate::sink::AuditSink; + +/// Test-only sink that retains every recorded [`AuditEvent`] in memory. +/// +/// Clones share the same underlying buffer (events recorded through any clone +/// are visible to all of them), which makes it convenient to register the sink +/// in an [`AppState`] while still holding a handle for assertions. +#[derive(Clone, Default)] +pub struct InMemoryAuditSink { + events: Arc>>, +} + +impl InMemoryAuditSink { + /// Creates a new, empty in-memory sink. + pub fn new() -> Self { + Self::default() + } + + /// Returns a snapshot of every event recorded so far. + pub fn events(&self) -> Vec { + self.events + .lock() + .expect("audit sink mutex poisoned") + .clone() + } + + /// Returns the number of events currently buffered. + pub fn len(&self) -> usize { + self.events.lock().expect("audit sink mutex poisoned").len() + } + + /// Returns `true` when no events have been recorded. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Removes all buffered events. + pub fn clear(&self) { + self.events + .lock() + .expect("audit sink mutex poisoned") + .clear(); + } +} + +#[async_trait] +impl AuditSink for InMemoryAuditSink { + async fn record(&self, event: AuditEvent) { + self.events + .lock() + .expect("audit sink mutex poisoned") + .push(event); + } + + async fn flush(&self) {} + + fn name(&self) -> &str { + "memory" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::builder::AuditEventBuilder; + + #[tokio::test] + async fn test_record_stores_event() { + let sink = InMemoryAuditSink::new(); + let event = AuditEventBuilder::new("Device/hfs") + .detail("audit-operation", "test") + .build(); + sink.record(event).await; + assert_eq!(sink.len(), 1); + } + + #[tokio::test] + async fn test_clones_share_buffer() { + let sink = InMemoryAuditSink::new(); + let clone = sink.clone(); + clone + .record(AuditEventBuilder::new("Device/hfs").build()) + .await; + assert_eq!(sink.len(), 1); + assert_eq!(clone.len(), 1); + } + + #[tokio::test] + async fn test_clear_resets_buffer() { + let sink = InMemoryAuditSink::new(); + sink.record(AuditEventBuilder::new("Device/hfs").build()) + .await; + sink.clear(); + assert!(sink.is_empty()); + } + + #[test] + fn test_name() { + assert_eq!(InMemoryAuditSink::new().name(), "memory"); + } +} diff --git a/crates/audit/src/sinks/mod.rs b/crates/audit/src/sinks/mod.rs index 3f3418717..e9ce52ab7 100644 --- a/crates/audit/src/sinks/mod.rs +++ b/crates/audit/src/sinks/mod.rs @@ -4,10 +4,12 @@ pub mod cloudwatch; pub mod database; pub mod file; +pub mod memory; pub mod null; #[cfg(feature = "cloudwatch")] pub use cloudwatch::CloudWatchLogsSink; pub use database::DatabaseSink; pub use file::FileSink; +pub use memory::InMemoryAuditSink; pub use null::NullSink; diff --git a/crates/hfs/src/main.rs b/crates/hfs/src/main.rs index 5c4376b37..555be7f83 100644 --- a/crates/hfs/src/main.rs +++ b/crates/hfs/src/main.rs @@ -31,14 +31,20 @@ use helios_rest::{ use tracing::info; use helios_persistence::backends::local_fs::LocalFsOutputStore; +#[cfg(any(feature = "sqlite", feature = "postgres"))] +use helios_persistence::core::storage::PurgableStorage; use helios_persistence::core::{ BulkExportJobStore, BulkSubmitJobStore, DefaultExportWorker, DefaultSubmitWorker, ExportOutputStore, SubmitInputFetcher, WorkerId, }; #[cfg(any(feature = "sqlite", feature = "postgres"))] +use helios_persistence::search::reindex::{ReindexOperation, ReindexableStorage}; +#[cfg(any(feature = "sqlite", feature = "postgres"))] use helios_rest::bulk_export_auth::BearerScopeAuth; #[cfg(any(feature = "sqlite", feature = "postgres"))] -use helios_rest::create_app_with_auth_and_bulk; +use helios_rest::create_app_with_auth_and_bulk_and_ops; +#[cfg(any(feature = "sqlite", feature = "postgres"))] +use helios_rest::{OperationsBundle, reindex::ReindexController}; #[cfg(feature = "sqlite")] use helios_persistence::backends::sqlite::{SqliteBackend, SqliteBackendConfig}; @@ -504,12 +510,15 @@ async fn start_mongodb( let backend = Arc::new(backend); let serve_audit_state = audit_state.clone(); - // MongoDB primary; embedded SQLite sidecar for job state. + // MongoDB doesn't implement PurgableStorage/ReindexableStorage — the + // empty operations bundle makes $purge / $reindex return 501. #[cfg(feature = "sqlite")] { let jobs = build_embedded_job_store(&config)?; - if let Some(bundle) = build_bulk_export(&config, backend.clone(), jobs).await? { - let app = create_app_with_auth_and_bulk( + if let Some(bundle) = + build_bulk_export(&config, backend.clone(), jobs, audit_state.as_ref()).await? + { + let app = create_app_with_auth_and_bulk_and_ops( backend, config.clone(), auth_config, @@ -517,6 +526,7 @@ async fn start_mongodb( audit_state, Some(bundle), None, + OperationsBundle::default(), ); return serve(app, &config, serve_audit_state).await; } @@ -843,29 +853,28 @@ async fn start_sqlite( let serve_audit_state = audit_state.clone(); let backend = Arc::new(create_sqlite_backend(&config)?); - let export_bundle = build_bulk_export(&config, backend.clone(), backend.clone()).await?; + let export_bundle = build_bulk_export( + &config, + backend.clone(), + backend.clone(), + audit_state.as_ref(), + ) + .await?; let submit_bundle = build_bulk_submit(&config, backend.clone()).await?; - if export_bundle.is_some() || submit_bundle.is_some() { - let app = create_app_with_auth_and_bulk( - backend, - config.clone(), - auth_config, - auth_state, - audit_state, - export_bundle, - submit_bundle, - ); - return serve(app, &config, serve_audit_state).await; - } - - let app = create_app_with_auth( - Arc::try_unwrap(backend).unwrap_or_else(|_| { - unreachable!("backend Arc is uniquely owned when bulk export is disabled") - }), + let ops_bundle = build_ops_bundle( + backend.clone(), + backend.search_extractor().clone(), + audit_state.as_ref(), + ); + let app = create_app_with_auth_and_bulk_and_ops( + backend, config.clone(), auth_config, auth_state, audit_state, + export_bundle, + submit_bundle, + ops_bundle, ); serve(app, &config, serve_audit_state).await } @@ -938,6 +947,7 @@ async fn build_bulk_export( config: &ServerConfig, data: Arc, jobs: Arc, + audit_state: Option<&Arc>, ) -> anyhow::Result> where Dp: helios_persistence::core::ExportResourceProvider + 'static, @@ -1008,7 +1018,21 @@ where other => anyhow::bail!("invalid HFS_BULK_EXPORT_OUTPUT_BACKEND '{other}'"), }; - spawn_export_workers(jobs.clone(), data, output.clone(), &cfg); + let (worker_audit_sink, worker_audit_observer) = match audit_state { + Some(state) => ( + Some(Arc::clone(&state.sink)), + state.config.source_observer.clone(), + ), + None => (None, "Device/hfs".to_string()), + }; + spawn_export_workers( + jobs.clone(), + data, + output.clone(), + &cfg, + worker_audit_sink, + worker_audit_observer, + ); Ok(Some(helios_rest::BulkExportBundle { jobs, @@ -1017,6 +1041,34 @@ where })) } +/// Builds an [`OperationsBundle`] from a backend that implements both +/// [`PurgableStorage`] and [`ReindexableStorage`]. SQLite and Postgres +/// backends satisfy these bounds; other backends are wired through +/// [`OperationsBundle::default`] and the corresponding handlers return +/// `501 Not Implemented`. +#[cfg(any(feature = "sqlite", feature = "postgres"))] +fn build_ops_bundle( + backend: Arc, + search_extractor: Arc, + audit_state: Option<&Arc>, +) -> OperationsBundle +where + B: PurgableStorage + ReindexableStorage + 'static, +{ + let reindex_op = ReindexOperation::new(Arc::clone(&backend), search_extractor); + let reindex_op = match audit_state { + Some(state) => reindex_op.with_audit( + Arc::clone(&state.sink), + state.config.source_observer.clone(), + ), + None => reindex_op, + }; + OperationsBundle { + purge: Some(backend as Arc), + reindex: Some(Arc::new(reindex_op) as Arc), + } +} + /// Spawns the in-process export worker pool and the periodic cleanup task. #[cfg(any(feature = "sqlite", feature = "postgres"))] fn spawn_export_workers( @@ -1024,6 +1076,8 @@ fn spawn_export_workers( data: Arc, output: Arc, cfg: &helios_rest::config::BulkExportConfig, + audit_sink: Option>, + audit_source_observer: String, ) where Dp: helios_persistence::core::ExportResourceProvider + 'static, { @@ -1038,9 +1092,15 @@ fn spawn_export_workers( let output = output.clone(); let worker_id = WorkerId::new(format!("hfs-worker-{i}")); let exclude_newly_added = cfg.since_newly_added.eq_ignore_ascii_case("exclude"); + let worker_audit_sink = audit_sink.clone(); + let worker_audit_observer = audit_source_observer.clone(); tokio::spawn(async move { - let worker = DefaultExportWorker::new(jobs.clone(), data, output, worker_id.clone()) - .with_exclude_since_newly_added(exclude_newly_added); + let mut worker = + DefaultExportWorker::new(jobs.clone(), data, output, worker_id.clone()) + .with_exclude_since_newly_added(exclude_newly_added); + if let Some(sink) = worker_audit_sink { + worker = worker.with_audit(sink, worker_audit_observer); + } loop { match jobs.claim_next(&worker_id, lease).await { Ok(Some(claimed)) => { @@ -1396,29 +1456,28 @@ async fn start_sqlite_elasticsearch( let serve_audit_state = audit_state.clone(); let composite = Arc::new(composite); - let export_bundle = build_bulk_export(&config, sqlite.clone(), sqlite.clone()).await?; + let export_bundle = build_bulk_export( + &config, + sqlite.clone(), + sqlite.clone(), + audit_state.as_ref(), + ) + .await?; let submit_bundle = build_bulk_submit(&config, sqlite.clone()).await?; - if export_bundle.is_some() || submit_bundle.is_some() { - let app = create_app_with_auth_and_bulk( - composite, - config.clone(), - auth_config, - auth_state, - audit_state, - export_bundle, - submit_bundle, - ); - return serve(app, &config, serve_audit_state).await; - } - - let app = create_app_with_auth( - Arc::try_unwrap(composite).unwrap_or_else(|_| { - unreachable!("composite Arc is uniquely owned when bulk export is disabled") - }), + let ops_bundle = build_ops_bundle( + sqlite.clone(), + sqlite.search_extractor().clone(), + audit_state.as_ref(), + ); + let app = create_app_with_auth_and_bulk_and_ops( + composite, config.clone(), auth_config, auth_state, audit_state, + export_bundle, + submit_bundle, + ops_bundle, ); serve(app, &config, serve_audit_state).await } @@ -1464,29 +1523,28 @@ async fn start_postgres( let backend = Arc::new(backend); let serve_audit_state = audit_state.clone(); - let export_bundle = build_bulk_export(&config, backend.clone(), backend.clone()).await?; + let export_bundle = build_bulk_export( + &config, + backend.clone(), + backend.clone(), + audit_state.as_ref(), + ) + .await?; let submit_bundle = build_bulk_submit(&config, backend.clone()).await?; - if export_bundle.is_some() || submit_bundle.is_some() { - let app = create_app_with_auth_and_bulk( - backend, - config.clone(), - auth_config, - auth_state, - audit_state, - export_bundle, - submit_bundle, - ); - return serve(app, &config, serve_audit_state).await; - } - - let app = create_app_with_auth( - Arc::try_unwrap(backend).unwrap_or_else(|_| { - unreachable!("backend Arc is uniquely owned when bulk export is disabled") - }), + let ops_bundle = build_ops_bundle( + backend.clone(), + backend.search_extractor().clone(), + audit_state.as_ref(), + ); + let app = create_app_with_auth_and_bulk_and_ops( + backend, config.clone(), auth_config, auth_state, audit_state, + export_bundle, + submit_bundle, + ops_bundle, ); serve(app, &config, serve_audit_state).await } @@ -1630,29 +1688,23 @@ async fn start_postgres_elasticsearch( let serve_audit_state = audit_state.clone(); let composite = Arc::new(composite); - let export_bundle = build_bulk_export(&config, pg.clone(), pg.clone()).await?; + let export_bundle = + build_bulk_export(&config, pg.clone(), pg.clone(), audit_state.as_ref()).await?; let submit_bundle = build_bulk_submit(&config, pg.clone()).await?; - if export_bundle.is_some() || submit_bundle.is_some() { - let app = create_app_with_auth_and_bulk( - composite, - config.clone(), - auth_config, - auth_state, - audit_state, - export_bundle, - submit_bundle, - ); - return serve(app, &config, serve_audit_state).await; - } - - let app = create_app_with_auth( - Arc::try_unwrap(composite).unwrap_or_else(|_| { - unreachable!("composite Arc is uniquely owned when bulk export is disabled") - }), + let ops_bundle = build_ops_bundle( + pg.clone(), + pg.search_extractor().clone(), + audit_state.as_ref(), + ); + let app = create_app_with_auth_and_bulk_and_ops( + composite, config.clone(), auth_config, auth_state, audit_state, + export_bundle, + submit_bundle, + ops_bundle, ); serve(app, &config, serve_audit_state).await } @@ -1788,12 +1840,14 @@ async fn start_mongodb_elasticsearch( let serve_audit_state = audit_state.clone(); let composite = Arc::new(composite); - // MongoDB primary; embedded SQLite sidecar for job state. + // See `start_mongodb` for why the operations bundle is empty. #[cfg(feature = "sqlite")] { let jobs = build_embedded_job_store(&config)?; - if let Some(bundle) = build_bulk_export(&config, mongo.clone(), jobs).await? { - let app = create_app_with_auth_and_bulk( + if let Some(bundle) = + build_bulk_export(&config, mongo.clone(), jobs, audit_state.as_ref()).await? + { + let app = create_app_with_auth_and_bulk_and_ops( composite, config.clone(), auth_config, @@ -1801,6 +1855,7 @@ async fn start_mongodb_elasticsearch( audit_state, Some(bundle), None, + OperationsBundle::default(), ); return serve(app, &config, serve_audit_state).await; } @@ -2077,12 +2132,14 @@ async fn start_s3_elasticsearch( let serve_audit_state = audit_state.clone(); let composite = Arc::new(composite); - // S3 primary; embedded SQLite sidecar for job state. + // See `start_mongodb` for why the operations bundle is empty. #[cfg(feature = "sqlite")] { let jobs = build_embedded_job_store(&config)?; - if let Some(bundle) = build_bulk_export(&config, s3.clone(), jobs).await? { - let app = create_app_with_auth_and_bulk( + if let Some(bundle) = + build_bulk_export(&config, s3.clone(), jobs, audit_state.as_ref()).await? + { + let app = create_app_with_auth_and_bulk_and_ops( composite, config.clone(), auth_config, @@ -2090,6 +2147,7 @@ async fn start_s3_elasticsearch( audit_state, Some(bundle), None, + OperationsBundle::default(), ); return serve(app, &config, serve_audit_state).await; } diff --git a/crates/persistence/src/core/bulk_export_worker.rs b/crates/persistence/src/core/bulk_export_worker.rs index eb69cf62c..f543a1544 100644 --- a/crates/persistence/src/core/bulk_export_worker.rs +++ b/crates/persistence/src/core/bulk_export_worker.rs @@ -252,6 +252,13 @@ pub struct DefaultExportWorker { /// before `_since` for patients added to the Group after `_since` /// (using `Group.member.period.start`). pub exclude_since_newly_added: bool, + /// Optional audit sink for emitting worker-level `AuditEvent` records + /// at job completion / failure / cancellation. + #[cfg(feature = "audit")] + pub audit_sink: Option>, + /// `AuditEvent.source.observer` reference used for worker-emitted events. + #[cfg(feature = "audit")] + pub audit_source_observer: String, } impl DefaultExportWorker @@ -268,6 +275,10 @@ where output, worker_id, exclude_since_newly_added: false, + #[cfg(feature = "audit")] + audit_sink: None, + #[cfg(feature = "audit")] + audit_source_observer: "Device/hfs".to_string(), } } @@ -277,16 +288,104 @@ where self } + /// Wires an audit sink so the worker emits a BALP `AuditEvent` at job + /// completion, failure, and cancellation. + #[cfg(feature = "audit")] + pub fn with_audit( + mut self, + sink: std::sync::Arc, + source_observer: impl Into, + ) -> Self { + self.audit_sink = Some(sink); + self.audit_source_observer = source_observer.into(); + self + } + + /// Emits a worker-level bulk-export `AuditEvent` when an audit sink is + /// configured. No-op otherwise. + #[cfg(feature = "audit")] + async fn emit_worker_audit( + &self, + operation: &str, + job_id: &ExportJobId, + level: Option<&ExportLevel>, + agent: Option<&str>, + outcome: &str, + outcome_desc: Option<&str>, + ) { + let Some(sink) = self.audit_sink.as_ref() else { + return; + }; + let mut builder = helios_audit::AuditEventBuilder::new(&self.audit_source_observer) + .event_type( + "http://terminology.hl7.org/CodeSystem/audit-event-type", + "object", + ) + .action(helios_audit::AuditAction::Execute) + .outcome(outcome) + .detail("audit-operation", "bulk-export") + .detail("bulk-export-operation", operation) + .detail("job-id", job_id.as_str()); + if let Some(l) = level { + builder = builder.detail("export-level", l.to_string()); + } + if let Some(desc) = outcome_desc { + builder = builder.outcome_desc(desc); + } + if let Some(a) = agent { + builder = builder.agent(a, None, true); + } + sink.record(builder.build()).await; + } + /// Runs the export job described by `lease` to completion. /// /// Every job-state mutation is fenced by `lease.worker_id` + /// `lease.fencing_token`; any `LeaseError::LeaseLost` aborts the run /// silently (the worker that reclaimed the job now owns it). pub async fn run_job(&self, lease: ExportJobLease) -> StorageResult<()> { - match self.run_job_inner(&lease).await { - Ok(()) => Ok(()), + let inner_result = self.run_job_inner(&lease).await; + #[cfg(feature = "audit")] + let agent_subject = self + .jobs + .get_export_job_metadata(&lease.tenant, &lease.job_id) + .await + .ok() + .and_then(|m| m.owner_subject); + match inner_result { + Ok(WorkerRunOutcome::Completed { level }) => { + #[cfg(feature = "audit")] + self.emit_worker_audit( + "worker-complete", + &lease.job_id, + Some(&level), + agent_subject.as_deref(), + "0", + None, + ) + .await; + #[cfg(not(feature = "audit"))] + let _ = level; + Ok(()) + } + Ok(WorkerRunOutcome::Cancelled { level }) => { + #[cfg(feature = "audit")] + self.emit_worker_audit( + "worker-cancelled", + &lease.job_id, + Some(&level), + agent_subject.as_deref(), + "4", + Some("export job cancelled mid-run"), + ) + .await; + #[cfg(not(feature = "audit"))] + let _ = level; + Ok(()) + } Err(LeaseError::LeaseLost { .. }) => { - // Another worker owns the job now — stop silently. + // Another worker owns the job now — stop silently. No audit + // emission: the worker that reclaimed the job will emit one. Ok(()) } Err(LeaseError::Storage(e)) => { @@ -301,12 +400,22 @@ where &e.to_string(), ) .await; + #[cfg(feature = "audit")] + self.emit_worker_audit( + "worker-failed", + &lease.job_id, + None, + agent_subject.as_deref(), + "8", + Some(&e.to_string()), + ) + .await; Err(e) } } } - async fn run_job_inner(&self, lease: &ExportJobLease) -> Result<(), LeaseError> { + async fn run_job_inner(&self, lease: &ExportJobLease) -> Result { let tenant = &lease.tenant; let job_id = &lease.job_id; let wid = &lease.worker_id; @@ -388,7 +497,9 @@ where // Cooperative cancellation check. if let Ok(progress) = self.jobs.get_export_status(tenant, job_id).await { if progress.status == ExportStatus::Cancelled { - return Ok(()); + return Ok(WorkerRunOutcome::Cancelled { + level: view.level.clone(), + }); } } @@ -511,10 +622,19 @@ where self.jobs .finish_export_job(tenant, job_id, wid, token) .await?; - Ok(()) + Ok(WorkerRunOutcome::Completed { level: view.level }) } } +/// Terminal state of a [`DefaultExportWorker::run_job_inner`] call, used by +/// `run_job` to pick the right audit phase after the lease/cancellation +/// branching resolves. +#[derive(Debug, Clone)] +enum WorkerRunOutcome { + Completed { level: ExportLevel }, + Cancelled { level: ExportLevel }, +} + /// Applies `_elements` projection to an NDJSON line. /// /// When `elements` is non-empty, keeps `resourceType`, `id`, `meta` and the diff --git a/crates/persistence/src/search/reindex.rs b/crates/persistence/src/search/reindex.rs index c641266db..a5812159b 100644 --- a/crates/persistence/src/search/reindex.rs +++ b/crates/persistence/src/search/reindex.rs @@ -363,6 +363,13 @@ pub struct ReindexOperation { jobs: Arc>>, /// Cancellation channels. cancel_channels: Arc>>>, + /// Optional audit sink for emitting BALP `AuditEvent` records at each + /// reindex lifecycle transition. + #[cfg(feature = "audit")] + audit_sink: Option>, + /// `AuditEvent.source.observer` reference used for emitted events. + #[cfg(feature = "audit")] + audit_source_observer: String, } impl ReindexOperation { @@ -373,9 +380,26 @@ impl ReindexOperation { extractor, jobs: Arc::new(RwLock::new(HashMap::new())), cancel_channels: Arc::new(RwLock::new(HashMap::new())), + #[cfg(feature = "audit")] + audit_sink: None, + #[cfg(feature = "audit")] + audit_source_observer: "Device/hfs".to_string(), } } + /// Wires an audit sink so each reindex lifecycle transition emits a + /// BALP-compliant `AuditEvent`. + #[cfg(feature = "audit")] + pub fn with_audit( + mut self, + sink: Arc, + source_observer: impl Into, + ) -> Self { + self.audit_sink = Some(sink); + self.audit_source_observer = source_observer.into(); + self + } + /// Starts a reindex operation. /// /// Returns immediately with a job ID. The reindex runs in the background. @@ -401,6 +425,29 @@ impl ReindexOperation { let extractor = self.extractor.clone(); let jobs = self.jobs.clone(); let job_id_clone = job_id.clone(); + #[cfg(feature = "audit")] + let request_types = request.resource_types.clone().unwrap_or_default(); + #[cfg(feature = "audit")] + let audit_sink = self.audit_sink.clone(); + #[cfg(feature = "audit")] + let audit_source_observer = self.audit_source_observer.clone(); + + // Emitted before `tokio::spawn` so the start event is always recorded + // before any concurrent terminal event from the background task. + #[cfg(feature = "audit")] + if let Some(ref sink) = audit_sink { + audit::record_reindex_event( + sink.as_ref(), + &audit_source_observer, + None, + &job_id, + "start", + &request_types, + 0, + "0", + ) + .await; + } // Spawn background task tokio::spawn(async move { @@ -412,6 +459,12 @@ impl ReindexOperation { extractor, jobs, cancel_rx, + #[cfg(feature = "audit")] + audit_sink, + #[cfg(feature = "audit")] + audit_source_observer, + #[cfg(feature = "audit")] + request_types, ) .await; }); @@ -420,7 +473,69 @@ impl ReindexOperation { } /// Runs the reindex operation in the background. + /// + /// Wraps [`Self::run_reindex_inner`] with terminal-state audit emission: + /// after the inner driver returns (success, cancellation, or failure), the + /// final [`ReindexProgress`] is read and a BALP `AuditEvent` is emitted + /// via [`audit::record_reindex_event`]. + #[allow(clippy::too_many_arguments)] async fn run_reindex( + job_id: String, + tenant: TenantContext, + request: ReindexRequest, + storage: Arc, + extractor: Arc, + jobs: Arc>>, + cancel_rx: mpsc::Receiver<()>, + #[cfg(feature = "audit")] audit_sink: Option>, + #[cfg(feature = "audit")] audit_source_observer: String, + #[cfg(feature = "audit")] audit_resource_types: Vec, + ) { + Self::run_reindex_inner( + job_id.clone(), + tenant, + request, + storage, + extractor, + jobs.clone(), + cancel_rx, + ) + .await; + + #[cfg(feature = "audit")] + if let Some(sink) = audit_sink.as_ref() { + let (status, processed) = { + let guard = jobs.read(); + let progress = guard.get(&job_id); + ( + progress.map(|p| p.status).unwrap_or(ReindexStatus::Failed), + progress.map(|p| p.processed_resources).unwrap_or(0), + ) + }; + let (phase, outcome) = match status { + ReindexStatus::Completed => ("complete", "0"), + ReindexStatus::Cancelled => ("cancel", "4"), + ReindexStatus::Failed => ("fail", "8"), + // In-flight states should not be observable post-return, but + // record them as failures rather than dropping the event. + ReindexStatus::Queued | ReindexStatus::InProgress => ("fail", "8"), + }; + audit::record_reindex_event( + sink.as_ref(), + &audit_source_observer, + None, + &job_id, + phase, + &audit_resource_types, + processed, + outcome, + ) + .await; + } + } + + /// Inner driver — the original reindex loop with no audit wiring. + async fn run_reindex_inner( job_id: String, tenant: TenantContext, request: ReindexRequest, diff --git a/crates/rest/Cargo.toml b/crates/rest/Cargo.toml index 88e83ecb6..2e369c61f 100644 --- a/crates/rest/Cargo.toml +++ b/crates/rest/Cargo.toml @@ -44,7 +44,7 @@ bulk-submit-jwe = ["dep:aes-gcm"] [dependencies] # Core dependencies helios-fhir = { path = "../fhir", version = "0.2.0", default-features = false } -helios-persistence = { path = "../persistence", version = "0.2.0", default-features = false } +helios-persistence = { path = "../persistence", version = "0.2.0", default-features = false, features = ["audit"] } helios-serde = { path = "../serde", version = "0.2.0", optional = true, default-features = false } helios-sof = { path = "../sof", version = "0.2.0", default-features = false } helios-auth = { path = "../auth", version = "0.2.0" } diff --git a/crates/rest/src/handlers/bulk_export.rs b/crates/rest/src/handlers/bulk_export.rs index b09367dfc..309f11da6 100644 --- a/crates/rest/src/handlers/bulk_export.rs +++ b/crates/rest/src/handlers/bulk_export.rs @@ -216,17 +216,42 @@ where resource_types.clone() }; for t in &types_to_check { - helios_auth::SmartScopePolicy::check(p, t, helios_auth::FhirOperation::Read).map_err( - |e| RestError::Forbidden { - message: e.to_string(), - }, - )?; + if let Err(e) = + helios_auth::SmartScopePolicy::check(p, t, helios_auth::FhirOperation::Read) + { + let msg = e.to_string(); + emit_export_audit_with_desc( + state, + Some(p), + "kickoff", + "", + &level, + &resource_types, + "8", + Some(&msg), + ) + .await; + return Err(RestError::Forbidden { message: msg }); + } } if matches!(level, ExportLevel::Group { .. }) { - helios_auth::SmartScopePolicy::check(p, "Group", helios_auth::FhirOperation::Read) - .map_err(|e| RestError::Forbidden { - message: e.to_string(), - })?; + if let Err(e) = + helios_auth::SmartScopePolicy::check(p, "Group", helios_auth::FhirOperation::Read) + { + let msg = e.to_string(); + emit_export_audit_with_desc( + state, + Some(p), + "kickoff", + "", + &level, + &resource_types, + "8", + Some(&msg), + ) + .await; + return Err(RestError::Forbidden { message: msg }); + } } } @@ -236,12 +261,22 @@ where .await .map_err(map_storage_err)?; if active >= cfg.max_concurrent_per_tenant as u64 { - return Err(RestError::BadRequest { - message: format!( - "too many concurrent exports for this tenant (max {})", - cfg.max_concurrent_per_tenant - ), - }); + let msg = format!( + "too many concurrent exports for this tenant (max {})", + cfg.max_concurrent_per_tenant + ); + emit_export_audit_with_desc( + state, + principal, + "kickoff", + "", + &level, + &resource_types, + "4", + Some(&msg), + ) + .await; + return Err(RestError::BadRequest { message: msg }); } let request = ExportRequest { @@ -266,10 +301,24 @@ where }; let request_clone = input.request.clone(); - let job_id = jobs - .start_export(tenant.context(), input) - .await - .map_err(map_storage_err)?; + let job_id = match jobs.start_export(tenant.context(), input).await { + Ok(id) => id, + Err(e) => { + let msg = e.to_string(); + emit_export_audit_with_desc( + state, + principal, + "kickoff", + "", + &request_clone.level, + &request_clone.resource_types, + "8", + Some(&msg), + ) + .await; + return Err(map_storage_err(e)); + } + }; emit_export_audit( state, @@ -447,6 +496,17 @@ where { Ok(m) => m, Err(_) => { + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "status", + job_id.as_str(), + &ExportLevel::System, + &[], + "8", + Some("export job not found"), + ) + .await; return Err(RestError::NotFound { resource_type: "export-job".to_string(), id: job_id.to_string(), @@ -454,6 +514,17 @@ where } }; if !owns_job(principal.as_ref(), meta.owner_subject.as_deref()) { + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "status", + job_id.as_str(), + &meta.level, + &[], + "8", + Some("requester is not the job owner"), + ) + .await; return Err(RestError::NotFound { resource_type: "export-job".to_string(), id: job_id.to_string(), @@ -462,6 +533,8 @@ where match meta.status { ExportStatus::Accepted | ExportStatus::InProgress => { + // In-progress polls are not audited individually (the BALP-relevant + // events are kickoff, terminal-status observation, cancel, download). let progress = jobs .get_export_status(tenant.context(), &job_id) .await @@ -525,6 +598,16 @@ where let body = serde_json::to_vec(&manifest).map_err(|e| RestError::InternalError { message: e.to_string(), })?; + emit_export_audit( + &state, + principal.as_ref(), + "status-complete", + job_id.as_str(), + &meta.level, + &[], + "0", + ) + .await; Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") @@ -533,13 +616,39 @@ where message: e.to_string(), }) } - ExportStatus::Error => Err(RestError::InternalError { - message: "export job failed".to_string(), - }), - ExportStatus::Cancelled => Err(RestError::NotFound { - resource_type: "export-job".to_string(), - id: job_id.to_string(), - }), + ExportStatus::Error => { + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "status-error", + job_id.as_str(), + &meta.level, + &[], + "8", + Some("export job failed"), + ) + .await; + Err(RestError::InternalError { + message: "export job failed".to_string(), + }) + } + ExportStatus::Cancelled => { + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "status-cancelled", + job_id.as_str(), + &meta.level, + &[], + "4", + Some("export job was cancelled"), + ) + .await; + Err(RestError::NotFound { + resource_type: "export-job".to_string(), + id: job_id.to_string(), + }) + } } } @@ -568,6 +677,17 @@ where { Ok(m) => m, Err(_) => { + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "delete", + job_id.as_str(), + &ExportLevel::System, + &[], + "8", + Some("export job not found"), + ) + .await; return Err(RestError::NotFound { resource_type: "export-job".to_string(), id: job_id.to_string(), @@ -575,6 +695,17 @@ where } }; if !owns_job(principal.as_ref(), meta.owner_subject.as_deref()) { + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "delete", + job_id.as_str(), + &meta.level, + &[], + "8", + Some("requester is not the job owner"), + ) + .await; return Err(RestError::NotFound { resource_type: "export-job".to_string(), id: job_id.to_string(), @@ -586,13 +717,36 @@ where let _ = jobs.cancel_export(tenant.context(), &job_id).await; } // REST owns the two-step teardown: outputs first, then job rows. - output - .delete_job_outputs(tenant.context(), &job_id) - .await - .map_err(map_storage_err)?; - jobs.delete_export(tenant.context(), &job_id) - .await - .map_err(map_storage_err)?; + if let Err(e) = output.delete_job_outputs(tenant.context(), &job_id).await { + let msg = e.to_string(); + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "delete", + job_id.as_str(), + &meta.level, + &[], + "8", + Some(&msg), + ) + .await; + return Err(map_storage_err(e)); + } + if let Err(e) = jobs.delete_export(tenant.context(), &job_id).await { + let msg = e.to_string(); + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "delete", + job_id.as_str(), + &meta.level, + &[], + "8", + Some(&msg), + ) + .await; + return Err(map_storage_err(e)); + } emit_export_audit( &state, @@ -633,15 +787,31 @@ where let principal = request.extensions().get::().cloned(); let job_id = ExportJobId::from_string(job_id); - let file_meta = jobs + let file_meta = match jobs .get_export_file_metadata(tenant.context(), &job_id, &part) .await - .map_err(|_| RestError::NotFound { - resource_type: "export-file".to_string(), - id: format!("{job_id}/{part}"), - })?; + { + Ok(m) => m, + Err(_) => { + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "download", + job_id.as_str(), + &ExportLevel::System, + &[], + "8", + Some("export file not found"), + ) + .await; + return Err(RestError::NotFound { + resource_type: "export-file".to_string(), + id: format!("{job_id}/{part}"), + }); + } + }; - file_auth + if let Err(e) = file_auth .authorize_download( principal.as_ref(), tenant.context(), @@ -649,9 +819,21 @@ where &file_meta, ) .await - .map_err(|e| RestError::Forbidden { - message: e.to_string(), - })?; + { + let msg = e.to_string(); + emit_export_audit_with_desc( + &state, + principal.as_ref(), + "download", + job_id.as_str(), + &ExportLevel::System, + std::slice::from_ref(&file_meta.resource_type), + "8", + Some(&msg), + ) + .await; + return Err(RestError::Forbidden { message: msg }); + } emit_export_audit( &state, @@ -685,6 +867,7 @@ where } /// Emits a bulk-export lifecycle `AuditEvent` when an audit sink is configured. +#[allow(clippy::too_many_arguments)] async fn emit_export_audit( state: &AppState, principal: Option<&Principal>, @@ -695,6 +878,34 @@ async fn emit_export_audit( outcome: &str, ) where S: ResourceStorage, +{ + emit_export_audit_with_desc( + state, + principal, + operation, + job_id, + level, + resource_types, + outcome, + None, + ) + .await; +} + +/// Variant of [`emit_export_audit`] that records an outcome description (used +/// on failure paths to capture the rejection reason in the audit trail). +#[allow(clippy::too_many_arguments)] +async fn emit_export_audit_with_desc( + state: &AppState, + principal: Option<&Principal>, + operation: &str, + job_id: &str, + level: &ExportLevel, + resource_types: &[String], + outcome: &str, + outcome_desc: Option<&str>, +) where + S: ResourceStorage, { let Some(sink) = state.audit_sink() else { return; @@ -708,11 +919,16 @@ async fn emit_export_audit( .outcome(outcome) .detail("audit-operation", "bulk-export") .detail("bulk-export-operation", operation) - .detail("job-id", job_id) .detail("export-level", level.to_string()); + if !job_id.is_empty() { + builder = builder.detail("job-id", job_id); + } if !resource_types.is_empty() { builder = builder.detail("resource-types", resource_types.join(",")); } + if let Some(desc) = outcome_desc { + builder = builder.outcome_desc(desc); + } if let Some(p) = principal { builder = builder.agent(&p.subject, None, true); } diff --git a/crates/rest/src/handlers/mod.rs b/crates/rest/src/handlers/mod.rs index 7808d1c8c..8ffef3d78 100644 --- a/crates/rest/src/handlers/mod.rs +++ b/crates/rest/src/handlers/mod.rs @@ -26,7 +26,9 @@ pub mod delete; pub mod health; pub mod history; pub mod patch; +pub mod purge; pub mod read; +pub mod reindex; pub mod search; pub mod smart_discovery; pub mod sof; @@ -74,7 +76,11 @@ pub use history::{ history_system_handler, history_type_handler, }; pub use patch::patch_handler; +pub use purge::{purge_instance_handler, purge_type_handler}; pub use read::{head_read_handler, read_handler}; +pub use reindex::{ + reindex_cancel_handler, reindex_status_handler, reindex_system_handler, reindex_type_handler, +}; pub use search::{search_get_handler, search_post_handler}; pub use update::{conditional_update_handler, update_handler}; pub use versions::versions_handler; diff --git a/crates/rest/src/handlers/purge.rs b/crates/rest/src/handlers/purge.rs new file mode 100644 index 000000000..41c7cfb2f --- /dev/null +++ b/crates/rest/src/handlers/purge.rs @@ -0,0 +1,252 @@ +//! `$purge` operation handlers. +//! +//! Permanent deletion of FHIR resources (and their history) — a destructive, +//! irreversible operation distinct from the soft-delete [`delete_handler`] +//! exposed by the standard FHIR REST API. Purge is opt-in: when no +//! [`PurgableStorage`] provider is wired into [`AppState`], the handlers return +//! `501 Not Implemented`. +//! +//! Each invocation emits a BALP-compliant `AuditEvent` (action `Delete`) via +//! [`helios_persistence::core::storage::audit::record_purge_event`]. +//! +//! # Routes +//! +//! - `DELETE /{resource_type}/{id}/$purge` — purge a single resource and its +//! complete version history. +//! - `POST /{resource_type}/$purge` — purge every resource of a type within +//! the current tenant. Returns the number of resources purged. +//! +//! # Authorization +//! +//! A SMART-on-FHIR `*.write` (or wildcard) scope on the target resource type is +//! required when authentication is enabled. Operations targeting `AuditEvent` +//! are rejected: the audit trail is immutable. +//! +//! [`delete_handler`]: super::delete::delete_handler +//! [`PurgableStorage`]: helios_persistence::core::storage::PurgableStorage + +use axum::{ + body::Body, + extract::{Path, Request, State}, + http::StatusCode, + response::Response, +}; +use helios_auth::Principal; +use helios_persistence::core::ResourceStorage; + +use crate::error::{RestError, RestResult}; +use crate::extractors::TenantExtractor; +use crate::state::AppState; + +fn not_implemented() -> RestError { + RestError::NotImplemented { + feature: "$purge is not supported by the configured storage backend".to_string(), + } +} + +fn reject_audit_event(resource_type: &str) -> Option { + if resource_type == "AuditEvent" { + Some(RestError::MethodNotAllowed { + method: "PURGE".to_string(), + resource_type: resource_type.to_string(), + }) + } else { + None + } +} + +/// `DELETE /{resource_type}/{id}/$purge` — permanently delete a single resource. +pub async fn purge_instance_handler( + State(state): State>, + Path((resource_type, id)): Path<(String, String)>, + tenant: TenantExtractor, + request: Request, +) -> RestResult +where + S: ResourceStorage + Send + Sync + 'static, +{ + if let Some(e) = reject_audit_event(&resource_type) { + return Err(e); + } + let provider = state.purge_provider().ok_or_else(not_implemented)?; + let principal = request.extensions().get::().cloned(); + + if let Some(p) = principal.as_ref() { + if let Err(e) = helios_auth::SmartScopePolicy::check( + p, + &resource_type, + helios_auth::FhirOperation::Delete, + ) { + let msg = e.to_string(); + emit_purge_audit( + &state, + principal.as_ref(), + &resource_type, + Some(&id), + 0, + "8", + Some(&msg), + ) + .await; + return Err(RestError::Forbidden { message: msg }); + } + } + + match provider.purge(tenant.context(), &resource_type, &id).await { + Ok(()) => { + emit_purge_audit( + &state, + principal.as_ref(), + &resource_type, + Some(&id), + 1, + "0", + None, + ) + .await; + Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty()) + .map_err(|e| RestError::InternalError { + message: e.to_string(), + }) + } + Err(e) => { + let msg = e.to_string(); + emit_purge_audit( + &state, + principal.as_ref(), + &resource_type, + Some(&id), + 0, + "8", + Some(&msg), + ) + .await; + Err(e.into()) + } + } +} + +/// `POST /{resource_type}/$purge` — permanently delete every resource of a type +/// within the current tenant. +pub async fn purge_type_handler( + State(state): State>, + Path(resource_type): Path, + tenant: TenantExtractor, + request: Request, +) -> RestResult +where + S: ResourceStorage + Send + Sync + 'static, +{ + if let Some(e) = reject_audit_event(&resource_type) { + return Err(e); + } + let provider = state.purge_provider().ok_or_else(not_implemented)?; + let principal = request.extensions().get::().cloned(); + + if let Some(p) = principal.as_ref() { + if let Err(e) = helios_auth::SmartScopePolicy::check( + p, + &resource_type, + helios_auth::FhirOperation::Delete, + ) { + let msg = e.to_string(); + emit_purge_audit( + &state, + principal.as_ref(), + &resource_type, + None, + 0, + "8", + Some(&msg), + ) + .await; + return Err(RestError::Forbidden { message: msg }); + } + } + + match provider.purge_all(tenant.context(), &resource_type).await { + Ok(count) => { + emit_purge_audit( + &state, + principal.as_ref(), + &resource_type, + None, + count, + "0", + None, + ) + .await; + let body = serde_json::json!({ + "resourceType": "Parameters", + "parameter": [ + {"name": "resourceType", "valueString": resource_type}, + {"name": "purged", "valueInteger": count}, + ] + }); + let bytes = serde_json::to_vec(&body).map_err(|e| RestError::InternalError { + message: e.to_string(), + })?; + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/fhir+json") + .body(Body::from(bytes)) + .map_err(|e| RestError::InternalError { + message: e.to_string(), + }) + } + Err(e) => { + let msg = e.to_string(); + emit_purge_audit( + &state, + principal.as_ref(), + &resource_type, + None, + 0, + "8", + Some(&msg), + ) + .await; + Err(e.into()) + } + } +} + +/// Emits a `$purge` `AuditEvent` when an audit sink is configured. +async fn emit_purge_audit( + state: &AppState, + principal: Option<&Principal>, + resource_type: &str, + resource_id: Option<&str>, + count: u64, + outcome: &str, + outcome_desc: Option<&str>, +) where + S: ResourceStorage, +{ + let Some(sink) = state.audit_sink() else { + return; + }; + let mut builder = helios_audit::AuditEventBuilder::new(state.audit_source_observer()) + .event_type( + "http://terminology.hl7.org/CodeSystem/audit-event-type", + "object", + ) + .action(helios_audit::AuditAction::Delete) + .outcome(outcome) + .detail("audit-operation", "purge") + .detail("count", count.to_string()); + if let Some(id) = resource_id { + builder = builder.resource(resource_type, id); + } else { + builder = builder.detail("resource-type", resource_type); + } + if let Some(desc) = outcome_desc { + builder = builder.outcome_desc(desc); + } + if let Some(p) = principal { + builder = builder.agent(&p.subject, None, true); + } + sink.record(builder.build()).await; +} diff --git a/crates/rest/src/handlers/reindex.rs b/crates/rest/src/handlers/reindex.rs new file mode 100644 index 000000000..77ebc0ef1 --- /dev/null +++ b/crates/rest/src/handlers/reindex.rs @@ -0,0 +1,306 @@ +//! `$reindex` operation handlers. +//! +//! Asynchronous reindex kick-off, status polling, and cancellation. The actual +//! reindex work is driven by a [`ReindexController`] held in [`AppState`]; +//! when none is wired, the handlers return `501 Not Implemented`. +//! +//! At kickoff and at every job lifecycle transition, the handler emits a +//! BALP-compliant `AuditEvent` (action `Execute`) via the helper in +//! [`helios_persistence::search::reindex::audit`]. +//! +//! # Routes +//! +//! - `POST /$reindex` — system-level reindex (every resource type in tenant). +//! - `POST /{resource_type}/$reindex` — type-scoped reindex. +//! - `GET /$reindex-status/{job_id}` — poll job progress. +//! - `DELETE /$reindex-status/{job_id}` — cancel a running job. +//! +//! # Request Body +//! +//! Either a [`ReindexRequest`] JSON object or an empty body (uses defaults). +//! Type-scoped kickoff overrides the `resource_types` field with the path +//! component to prevent privilege escalation. +//! +//! [`ReindexController`]: crate::reindex::ReindexController + +use axum::{ + body::Body, + extract::{Path, Request, State}, + http::StatusCode, + response::Response, +}; +use helios_auth::Principal; +use helios_persistence::core::ResourceStorage; +use helios_persistence::search::reindex::ReindexRequest; + +use crate::error::{RestError, RestResult}; +use crate::extractors::TenantExtractor; +use crate::state::AppState; + +fn not_implemented() -> RestError { + RestError::NotImplemented { + feature: "$reindex is not supported by the configured storage backend".to_string(), + } +} + +async fn parse_reindex_body(request: Request) -> Result { + let bytes = axum::body::to_bytes(request.into_body(), 1024 * 1024) + .await + .map_err(|e| RestError::BadRequest { + message: format!("failed to read request body: {e}"), + })?; + if bytes.is_empty() { + return Ok(ReindexRequest::default()); + } + serde_json::from_slice(&bytes).map_err(|e| RestError::BadRequest { + message: format!("invalid $reindex Parameters JSON: {e}"), + }) +} + +/// `POST /$reindex` — system-level reindex kick-off. +pub async fn reindex_system_handler( + State(state): State>, + tenant: TenantExtractor, + request: Request, +) -> RestResult +where + S: ResourceStorage + Send + Sync + 'static, +{ + run_reindex_kickoff(state, tenant, request, None).await +} + +/// `POST /{resource_type}/$reindex` — type-scoped reindex kick-off. +pub async fn reindex_type_handler( + State(state): State>, + Path(resource_type): Path, + tenant: TenantExtractor, + request: Request, +) -> RestResult +where + S: ResourceStorage + Send + Sync + 'static, +{ + run_reindex_kickoff(state, tenant, request, Some(resource_type)).await +} + +async fn run_reindex_kickoff( + state: AppState, + tenant: TenantExtractor, + request: Request, + scoped_type: Option, +) -> RestResult +where + S: ResourceStorage + Send + Sync + 'static, +{ + let controller = state.reindex_controller().ok_or_else(not_implemented)?; + let principal = request.extensions().get::().cloned(); + + // Authorization: an update scope on the target type (or wildcard) is + // required when auth is enabled — reindex mutates search index entries. + if let Some(p) = principal.as_ref() { + let auth_target = scoped_type.as_deref().unwrap_or("*"); + if let Err(e) = + helios_auth::SmartScopePolicy::check(p, auth_target, helios_auth::FhirOperation::Update) + { + let msg = e.to_string(); + let scoped_vec: Vec = scoped_type.clone().into_iter().collect(); + emit_reindex_audit( + &state, + principal.as_ref(), + "", + "kickoff", + &scoped_vec, + 0, + "8", + Some(&msg), + ) + .await; + return Err(RestError::Forbidden { message: msg }); + } + } + + let mut req = parse_reindex_body(request).await?; + if let Some(rt) = scoped_type.as_ref() { + // Path takes precedence over body for type-scoped kickoff. + req.resource_types = Some(vec![rt.clone()]); + } + let types_for_audit: Vec = req.resource_types.clone().unwrap_or_default(); + + match controller.start(tenant.context().clone(), req).await { + Ok(job_id) => { + emit_reindex_audit( + &state, + principal.as_ref(), + &job_id, + "kickoff", + &types_for_audit, + 0, + "0", + None, + ) + .await; + let status_url = format!( + "{}/$reindex-status/{}", + state.base_url().trim_end_matches('/'), + job_id + ); + let body = serde_json::json!({ + "resourceType": "Parameters", + "parameter": [ + {"name": "jobId", "valueString": job_id}, + ] + }); + let bytes = serde_json::to_vec(&body).map_err(|e| RestError::InternalError { + message: e.to_string(), + })?; + Response::builder() + .status(StatusCode::ACCEPTED) + .header("Content-Location", status_url) + .header("Content-Type", "application/fhir+json") + .body(Body::from(bytes)) + .map_err(|e| RestError::InternalError { + message: e.to_string(), + }) + } + Err(e) => { + let msg = e.to_string(); + emit_reindex_audit( + &state, + principal.as_ref(), + "", + "kickoff", + &types_for_audit, + 0, + "8", + Some(&msg), + ) + .await; + Err(RestError::InternalError { message: msg }) + } + } +} + +/// `GET /$reindex-status/{job_id}` — return progress as a FHIR `Parameters`. +pub async fn reindex_status_handler( + State(state): State>, + Path(job_id): Path, + _tenant: TenantExtractor, +) -> RestResult +where + S: ResourceStorage + Send + Sync + 'static, +{ + let controller = state.reindex_controller().ok_or_else(not_implemented)?; + let progress = controller + .progress(&job_id) + .await + .ok_or_else(|| RestError::NotFound { + resource_type: "reindex-job".to_string(), + id: job_id.clone(), + })?; + let body = progress.to_parameters(); + let bytes = serde_json::to_vec(&body).map_err(|e| RestError::InternalError { + message: e.to_string(), + })?; + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/fhir+json") + .body(Body::from(bytes)) + .map_err(|e| RestError::InternalError { + message: e.to_string(), + }) +} + +/// `DELETE /$reindex-status/{job_id}` — cooperatively cancel a running job. +pub async fn reindex_cancel_handler( + State(state): State>, + Path(job_id): Path, + _tenant: TenantExtractor, + request: Request, +) -> RestResult +where + S: ResourceStorage + Send + Sync + 'static, +{ + let controller = state.reindex_controller().ok_or_else(not_implemented)?; + let principal = request.extensions().get::().cloned(); + + match controller.cancel(&job_id).await { + Ok(()) => { + emit_reindex_audit( + &state, + principal.as_ref(), + &job_id, + "cancel", + &[], + 0, + "0", + None, + ) + .await; + Response::builder() + .status(StatusCode::ACCEPTED) + .body(Body::empty()) + .map_err(|e| RestError::InternalError { + message: e.to_string(), + }) + } + Err(e) => { + let msg = e.to_string(); + emit_reindex_audit( + &state, + principal.as_ref(), + &job_id, + "cancel", + &[], + 0, + "8", + Some(&msg), + ) + .await; + Err(RestError::NotFound { + resource_type: "reindex-job".to_string(), + id: job_id, + }) + } + } +} + +/// Emits a `$reindex` `AuditEvent` when an audit sink is configured. +#[allow(clippy::too_many_arguments)] +async fn emit_reindex_audit( + state: &AppState, + principal: Option<&Principal>, + job_id: &str, + phase: &str, + resource_types: &[String], + resources_processed: u64, + outcome: &str, + outcome_desc: Option<&str>, +) where + S: ResourceStorage, +{ + let Some(sink) = state.audit_sink() else { + return; + }; + let mut builder = helios_audit::AuditEventBuilder::new(state.audit_source_observer()) + .event_type( + "http://terminology.hl7.org/CodeSystem/audit-event-type", + "object", + ) + .action(helios_audit::AuditAction::Execute) + .outcome(outcome) + .detail("audit-operation", "reindex") + .detail("phase", phase) + .detail("resources-processed", resources_processed.to_string()); + if !job_id.is_empty() { + builder = builder.detail("job-id", job_id); + } + if !resource_types.is_empty() { + builder = builder.detail("resource-types", resource_types.join(",")); + } + if let Some(desc) = outcome_desc { + builder = builder.outcome_desc(desc); + } + if let Some(p) = principal { + builder = builder.agent(&p.subject, None, true); + } + sink.record(builder.build()).await; +} diff --git a/crates/rest/src/lib.rs b/crates/rest/src/lib.rs index 77d958436..04e9c7427 100644 --- a/crates/rest/src/lib.rs +++ b/crates/rest/src/lib.rs @@ -156,6 +156,7 @@ pub mod extractors; pub mod fhir_types; pub mod handlers; pub mod middleware; +pub mod reindex; pub mod responses; pub mod routing; pub mod state; @@ -301,6 +302,18 @@ pub struct BulkSubmitBundle { pub file_auth: Arc, } +/// Optional providers for persistence-layer operations exposed over REST. +/// +/// Each field defaults to `None`, in which case the corresponding handler +/// returns 501. +#[derive(Default)] +pub struct OperationsBundle { + /// Backs the `$purge` operation. + pub purge: Option>, + /// Backs the `$reindex` operation. + pub reindex: Option>, +} + /// Creates the Axum application with custom configuration and optional authentication. /// /// When `auth_state` is `Some`, authentication and authorization middleware @@ -340,6 +353,7 @@ where audit_state, None, None, + OperationsBundle::default(), ) } @@ -378,6 +392,7 @@ where audit_state, Some(bulk_export), None, + OperationsBundle::default(), ) } @@ -419,11 +434,55 @@ where audit_state, bulk_export, bulk_submit, + OperationsBundle::default(), + ) +} + +/// Variant of [`create_app_with_auth_and_bulk`] that also wires the +/// persistence-layer operations bundle (`$purge`, `$reindex`). +#[allow(clippy::too_many_arguments)] +pub fn create_app_with_auth_and_bulk_and_ops( + storage: Arc, + config: ServerConfig, + auth_config: helios_auth::AuthConfig, + auth_state: Option>, + audit_state: Option>, + bulk_export: Option, + bulk_submit: Option, + operations: OperationsBundle, +) -> Router +where + S: ResourceStorage + + ConditionalStorage + + SearchProvider + + IncludeProvider + + RevincludeProvider + + InstanceHistoryProvider + + TypeHistoryProvider + + SystemHistoryProvider + + BundleProvider + + helios_persistence::core::ExportDataProvider + + helios_persistence::core::PatientExportProvider + + helios_persistence::core::GroupExportProvider + + Send + + Sync + + 'static, +{ + build_app( + storage, + config, + auth_config, + auth_state, + audit_state, + bulk_export, + bulk_submit, + operations, ) } /// Internal app builder shared by [`create_app_with_auth`] and /// [`create_app_with_auth_and_bulk_export`]. +#[allow(clippy::too_many_arguments)] fn build_app( storage: Arc, config: ServerConfig, @@ -432,6 +491,7 @@ fn build_app( audit_state: Option>, bulk_export: Option, bulk_submit: Option, + operations: OperationsBundle, ) -> Router where S: ResourceStorage @@ -595,6 +655,15 @@ where None => state, }; + let state = match operations.purge { + Some(provider) => state.with_purge_provider(provider), + None => state, + }; + let state = match operations.reindex { + Some(controller) => state.with_reindex_controller(controller), + None => state, + }; + // Inject subscription engine if enabled #[cfg(feature = "subscriptions")] let state = { diff --git a/crates/rest/src/reindex.rs b/crates/rest/src/reindex.rs new file mode 100644 index 000000000..06a72474f --- /dev/null +++ b/crates/rest/src/reindex.rs @@ -0,0 +1,56 @@ +//! Reindex job controller surface for the `$reindex` REST handler. +//! +//! [`ReindexController`] is a small dyn-safe handle over a +//! [`helios_persistence::search::reindex::ReindexOperation`] (or any equivalent +//! orchestrator). The handler interacts with the controller, not directly with +//! the typed operation, so AppState can carry the optional dependency as an +//! `Arc` without leaking the `ReindexableStorage` type +//! parameter into every generic bound. + +use async_trait::async_trait; +use helios_persistence::search::ReindexError; +use helios_persistence::search::reindex::{ + ReindexOperation, ReindexProgress, ReindexRequest, ReindexableStorage, +}; +use helios_persistence::tenant::TenantContext; + +/// Dyn-safe handle over a reindex orchestrator. +#[async_trait] +pub trait ReindexController: Send + Sync + 'static { + /// Starts a reindex job. Returns the job ID immediately; the work runs in + /// the background. + async fn start( + &self, + tenant: TenantContext, + request: ReindexRequest, + ) -> Result; + + /// Returns the current progress for a job, if it exists. + async fn progress(&self, job_id: &str) -> Option; + + /// Cancels a running job (idempotent: completed jobs return `Ok`). + async fn cancel(&self, job_id: &str) -> Result<(), ReindexError>; +} + +/// Blanket implementation for the persistence-layer [`ReindexOperation`]. +#[async_trait] +impl ReindexController for ReindexOperation +where + S: ReindexableStorage + 'static, +{ + async fn start( + &self, + tenant: TenantContext, + request: ReindexRequest, + ) -> Result { + ReindexOperation::start(self, tenant, request).await + } + + async fn progress(&self, job_id: &str) -> Option { + ReindexOperation::get_progress(self, job_id).await + } + + async fn cancel(&self, job_id: &str) -> Result<(), ReindexError> { + ReindexOperation::cancel(self, job_id).await + } +} diff --git a/crates/rest/src/routing/fhir_routes.rs b/crates/rest/src/routing/fhir_routes.rs index 5e7295262..a4f8f0e47 100644 --- a/crates/rest/src/routing/fhir_routes.rs +++ b/crates/rest/src/routing/fhir_routes.rs @@ -276,6 +276,13 @@ where "/bulk-submit-file/{poll_token}/{part}", get(handlers::bulk_submit_file_handler::), ) + // $reindex — operation routes precede the catch-all type/instance routes. + .route("/$reindex", post(handlers::reindex_system_handler::)) + .route( + "/$reindex-status/{job_id}", + get(handlers::reindex_status_handler::) + .delete(handlers::reindex_cancel_handler::), + ) // Type-level routes .route("/{resource_type}", get(handlers::search_get_handler::)) .route("/{resource_type}", post(handlers::create_handler::)) @@ -297,6 +304,20 @@ where "/{resource_type}/_history", get(handlers::history_type_handler::), ) + // $reindex / $purge — type-scoped and instance-scoped routes must + // precede the `/{resource_type}/{id}` catch-all below. + .route( + "/{resource_type}/$reindex", + post(handlers::reindex_type_handler::), + ) + .route( + "/{resource_type}/$purge", + post(handlers::purge_type_handler::), + ) + .route( + "/{resource_type}/{id}/$purge", + delete(handlers::purge_instance_handler::), + ) // Instance-level routes .route("/{resource_type}/{id}", get(handlers::read_handler::)) // HEAD for read - returns headers without body diff --git a/crates/rest/src/state.rs b/crates/rest/src/state.rs index f4fee7956..aaed3fe75 100644 --- a/crates/rest/src/state.rs +++ b/crates/rest/src/state.rs @@ -9,10 +9,13 @@ use std::sync::Arc; use helios_audit::AuditSink; use helios_auth::AuthConfig; use helios_persistence::core::sof_runner::SofRunner; +use helios_persistence::core::storage::PurgableStorage; use helios_persistence::core::{ BulkExportJobStore, BulkSubmitJobStore, ExportOutputStore, ResourceStorage, SubmitInputFetcher, }; +use crate::reindex::ReindexController; + use crate::bulk_export_auth::ExportFileAuth; use crate::config::{BulkExportConfig, BulkSubmitConfig, ServerConfig}; use crate::export::ExportJobController; @@ -93,6 +96,12 @@ pub struct AppState { /// Bulk submit configuration. bulk_submit_config: Arc, + + /// Backs the `$purge` operation; `None` ⇒ handler returns 501. + purge_provider: Option>, + + /// Backs the `$reindex` operation; `None` ⇒ handler returns 501. + reindex_controller: Option>, } // Manually implement Clone since S is wrapped in Arc and doesn't need to be Clone @@ -118,6 +127,8 @@ impl Clone for AppState { bulk_submit_output: self.bulk_submit_output.clone(), bulk_submit_file_auth: self.bulk_submit_file_auth.clone(), bulk_submit_config: Arc::clone(&self.bulk_submit_config), + purge_provider: self.purge_provider.clone(), + reindex_controller: self.reindex_controller.clone(), } } } @@ -152,6 +163,8 @@ impl AppState { bulk_submit_output: None, bulk_submit_file_auth: None, bulk_submit_config, + purge_provider: None, + reindex_controller: None, } } @@ -196,9 +209,33 @@ impl AppState { bulk_submit_output: None, bulk_submit_file_auth: None, bulk_submit_config, + purge_provider: None, + reindex_controller: None, } } + /// Wires a [`PurgableStorage`] provider that backs the `$purge` operation. + pub fn with_purge_provider(mut self, provider: Arc) -> Self { + self.purge_provider = Some(provider); + self + } + + /// Returns the configured purge provider, if any. + pub fn purge_provider(&self) -> Option<&Arc> { + self.purge_provider.as_ref() + } + + /// Wires a [`ReindexController`] that backs the `$reindex` operation. + pub fn with_reindex_controller(mut self, controller: Arc) -> Self { + self.reindex_controller = Some(controller); + self + } + + /// Returns the configured reindex controller, if any. + pub fn reindex_controller(&self) -> Option<&Arc> { + self.reindex_controller.as_ref() + } + /// Sets the SQL-on-FHIR runner for this application state. /// /// Typically called at startup after creating the state, once the runner has been diff --git a/crates/rest/tests/audit_persistence.rs b/crates/rest/tests/audit_persistence.rs new file mode 100644 index 000000000..b6946990c --- /dev/null +++ b/crates/rest/tests/audit_persistence.rs @@ -0,0 +1,568 @@ +//! Integration tests verifying that bulk export, `$purge`, and `$reindex` +//! emit BALP-compliant `AuditEvent` records via the configured audit sink. +//! +//! Each test wires an [`InMemoryAuditSink`] into the app state, exercises a +//! handler (and, for bulk export, the worker), and asserts that the expected +//! `AuditEvent` resources show up in the sink buffer with the right +//! `audit-operation`, action code, outcome, and lifecycle phase. +//! +//! Gated on `feature = "R4"` because the assertions reach into +//! `helios_fhir::r4::AuditEvent` directly and the fixture uses +//! `FhirVersion::default()` (which is also R4-gated). For single-version +//! minimal builds without R4, this file compiles away to nothing. + +#![cfg(feature = "R4")] +#![allow(missing_docs)] + +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use axum::http::StatusCode; +use axum_test::TestServer; +use helios_audit::InMemoryAuditSink; +use helios_fhir::FhirVersion; +use helios_persistence::backends::local_fs::LocalFsOutputStore; +use helios_persistence::backends::sqlite::{SqliteBackend, SqliteBackendConfig}; +use helios_persistence::core::storage::PurgableStorage; +use helios_persistence::core::{ + BulkExportJobStore, DefaultExportWorker, ExportClaimStrategy, ExportOutputStore, + ResourceStorage, WorkerId, +}; +use helios_persistence::search::reindex::ReindexOperation; +use helios_persistence::tenant::{TenantContext, TenantId, TenantPermissions}; +use helios_rest::ServerConfig; +use helios_rest::bulk_export_auth::BearerScopeAuth; +use helios_rest::config::{MultitenancyConfig, TenantRoutingMode}; +use helios_rest::reindex::ReindexController; +use serde_json::json; + +/// Source observer reference used by every test fixture. +const SOURCE_OBSERVER: &str = "Device/hfs"; + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +fn test_tenant() -> TenantContext { + TenantContext::new( + TenantId::new("test-tenant"), + TenantPermissions::full_access(), + ) +} + +struct AuditedServer { + server: TestServer, + sink: Arc, + backend: Arc, + output: Arc, + _tmp: tempfile::TempDir, +} + +/// Builds an Axum test server with bulk export, purge, and reindex all wired, +/// plus an [`InMemoryAuditSink`] threaded into the app state. +async fn create_audited_server() -> AuditedServer { + let data_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(|p| p.parent()) + .map(|p| p.join("data")) + .unwrap_or_else(|| PathBuf::from("data")); + + let backend_config = SqliteBackendConfig { + data_dir: Some(data_dir), + ..Default::default() + }; + let backend = Arc::new( + SqliteBackend::with_config(":memory:", backend_config).expect("create SQLite backend"), + ); + backend.init_schema().expect("init schema"); + + let tmp = tempfile::tempdir().expect("tempdir"); + let output = Arc::new(LocalFsOutputStore::new(tmp.path(), "http://localhost:8080")); + let file_auth = Arc::new(BearerScopeAuth); + + let config = ServerConfig { + multitenancy: MultitenancyConfig { + routing_mode: TenantRoutingMode::HeaderOnly, + ..Default::default() + }, + base_url: "http://localhost:8080".to_string(), + default_tenant: "test-tenant".to_string(), + ..ServerConfig::for_testing() + }; + + let sink = Arc::new(InMemoryAuditSink::new()); + + let reindex_op = ReindexOperation::new(backend.clone(), backend.search_extractor().clone()) + .with_audit( + sink.clone() as Arc, + SOURCE_OBSERVER, + ); + + let state = helios_rest::AppState::with_auth_and_audit( + Arc::clone(&backend), + config, + helios_auth::AuthConfig::default(), + None, + Some(sink.clone() as Arc), + SOURCE_OBSERVER, + ) + .with_bulk_export( + backend.clone() as Arc, + output.clone() as Arc, + file_auth, + ) + .with_purge_provider(backend.clone() as Arc) + .with_reindex_controller(Arc::new(reindex_op) as Arc); + + let app = helios_rest::routing::fhir_routes::create_routes(state); + let server = TestServer::new(app).expect("create test server"); + + AuditedServer { + server, + sink, + backend, + output, + _tmp: tmp, + } +} + +async fn seed_patients(backend: &Arc, n: usize) { + let tenant = test_tenant(); + for i in 0..n { + backend + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "id": format!("p{i}")}), + FhirVersion::default(), + ) + .await + .expect("seed patient"); + } +} + +/// Drains every claimable export job. The worker is constructed with the +/// audit sink so worker-level lifecycle events are captured. +async fn drain_workers_with_audit( + backend: &Arc, + output: &Arc, + sink: &Arc, +) { + let worker_id = WorkerId::new("test-worker"); + let worker = DefaultExportWorker::new( + backend.clone(), + backend.clone(), + output.clone(), + worker_id.clone(), + ) + .with_audit( + sink.clone() as Arc, + SOURCE_OBSERVER, + ); + while let Some(lease) = backend + .claim_next(&worker_id, Duration::from_secs(60)) + .await + .expect("claim_next") + { + worker.run_job(lease).await.expect("run_job"); + } +} + +// --------------------------------------------------------------------------- +// AuditEvent inspection helpers +// --------------------------------------------------------------------------- + +type AuditEvent = helios_fhir::r4::AuditEvent; + +fn action_code(event: &AuditEvent) -> Option { + event + .action + .as_ref() + .and_then(|a| a.value.as_ref()) + .cloned() +} + +fn outcome_code(event: &AuditEvent) -> Option { + event + .outcome + .as_ref() + .and_then(|o| o.value.as_ref()) + .cloned() +} + +fn detail_value(event: &AuditEvent, name: &str) -> Option { + use helios_fhir::r4::AuditEventEntityDetailValue; + let entities = event.entity.as_ref()?; + for ent in entities { + let details = match ent.detail.as_ref() { + Some(d) => d, + None => continue, + }; + for d in details { + if d.r#type.value.as_deref() == Some(name) { + return match &d.value { + Some(AuditEventEntityDetailValue::String(s)) => s.value.clone(), + _ => None, + }; + } + } + } + None +} + +/// Returns every event whose `audit-operation` detail equals the given value. +fn events_for_operation(sink: &InMemoryAuditSink, op: &str) -> Vec { + sink.events() + .into_iter() + .filter(|e| detail_value(e, "audit-operation").as_deref() == Some(op)) + .collect() +} + +/// Returns every bulk-export event with a specific `bulk-export-operation` +/// (kickoff / status-complete / delete / download / worker-complete / ...). +fn export_events(sink: &InMemoryAuditSink, op: &str) -> Vec { + sink.events() + .into_iter() + .filter(|e| { + detail_value(e, "audit-operation").as_deref() == Some("bulk-export") + && detail_value(e, "bulk-export-operation").as_deref() == Some(op) + }) + .collect() +} + +// --------------------------------------------------------------------------- +// Bulk export +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn bulk_export_kickoff_emits_audit_event() { + let env = create_audited_server().await; + seed_patients(&env.backend, 1).await; + + let resp = env + .server + .get("/$export") + .add_header("x-tenant-id", "test-tenant") + .add_header("prefer", "respond-async") + .add_query_param("_type", "Patient") + .await; + assert_eq!(resp.status_code(), StatusCode::ACCEPTED); + + let kickoffs = export_events(&env.sink, "kickoff"); + assert_eq!(kickoffs.len(), 1, "exactly one kickoff event"); + let event = &kickoffs[0]; + assert_eq!(action_code(event).as_deref(), Some("E")); + assert_eq!(outcome_code(event).as_deref(), Some("0")); + assert_eq!( + detail_value(event, "export-level").as_deref(), + Some("system") + ); + assert_eq!( + detail_value(event, "resource-types").as_deref(), + Some("Patient") + ); + assert!(detail_value(event, "job-id").is_some(), "job-id captured"); +} + +#[tokio::test] +async fn bulk_export_status_unknown_job_emits_failure() { + let env = create_audited_server().await; + let resp = env + .server + .get("/export-status/nonexistent") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::NOT_FOUND); + + let events = export_events(&env.sink, "status"); + assert_eq!(events.len(), 1); + assert_eq!(outcome_code(&events[0]).as_deref(), Some("8")); +} + +#[tokio::test] +async fn bulk_export_cancel_unknown_job_emits_failure() { + let env = create_audited_server().await; + let resp = env + .server + .delete("/export-status/nonexistent") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::NOT_FOUND); + + let events = export_events(&env.sink, "delete"); + assert_eq!(events.len(), 1); + assert_eq!(outcome_code(&events[0]).as_deref(), Some("8")); +} + +#[tokio::test] +async fn bulk_export_download_unknown_file_emits_failure() { + let env = create_audited_server().await; + let resp = env + .server + .get("/export-file/nonexistent/Patient-0") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::NOT_FOUND); + + let events = export_events(&env.sink, "download"); + assert_eq!(events.len(), 1); + assert_eq!(outcome_code(&events[0]).as_deref(), Some("8")); +} + +#[tokio::test] +async fn bulk_export_full_lifecycle_emits_lifecycle_events() { + let env = create_audited_server().await; + seed_patients(&env.backend, 2).await; + + let resp = env + .server + .get("/$export") + .add_header("x-tenant-id", "test-tenant") + .add_header("prefer", "respond-async") + .add_query_param("_type", "Patient") + .await; + assert_eq!(resp.status_code(), StatusCode::ACCEPTED); + let status_path = resp + .headers() + .get("content-location") + .unwrap() + .to_str() + .unwrap() + .strip_prefix("http://localhost:8080") + .unwrap() + .to_string(); + + // Run the worker to completion. + drain_workers_with_audit(&env.backend, &env.output, &env.sink).await; + + // Worker-level completion event emitted. + let worker_complete = export_events(&env.sink, "worker-complete"); + assert_eq!(worker_complete.len(), 1, "worker emits a completion event"); + assert_eq!(outcome_code(&worker_complete[0]).as_deref(), Some("0")); + + // Successful status poll (terminal "Complete" branch). + let done = env + .server + .get(&status_path) + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(done.status_code(), StatusCode::OK); + let status_complete = export_events(&env.sink, "status-complete"); + assert_eq!(status_complete.len(), 1); + assert_eq!(outcome_code(&status_complete[0]).as_deref(), Some("0")); + + // Delete the job. + let deleted = env + .server + .delete(&status_path) + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(deleted.status_code(), StatusCode::ACCEPTED); + let delete_events = export_events(&env.sink, "delete"); + assert!( + delete_events + .iter() + .any(|e| outcome_code(e).as_deref() == Some("0")), + "delete success event present" + ); +} + +// --------------------------------------------------------------------------- +// $purge +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn purge_instance_success_emits_audit_event() { + let env = create_audited_server().await; + seed_patients(&env.backend, 1).await; + + let resp = env + .server + .delete("/Patient/p0/$purge") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::NO_CONTENT); + + let events = events_for_operation(&env.sink, "purge"); + assert_eq!(events.len(), 1); + let event = &events[0]; + assert_eq!(action_code(event).as_deref(), Some("D")); + assert_eq!(outcome_code(event).as_deref(), Some("0")); + assert_eq!(detail_value(event, "count").as_deref(), Some("1")); +} + +#[tokio::test] +async fn purge_instance_unknown_resource_emits_failure() { + let env = create_audited_server().await; + let resp = env + .server + .delete("/Patient/missing/$purge") + .add_header("x-tenant-id", "test-tenant") + .await; + // SqliteBackend::purge returns StorageError::Resource(NotFound), which + // RestError::From maps to 404. + assert_eq!(resp.status_code(), StatusCode::NOT_FOUND); + + let events = events_for_operation(&env.sink, "purge"); + assert_eq!(events.len(), 1); + let event = &events[0]; + assert_eq!(action_code(event).as_deref(), Some("D")); + assert_eq!(outcome_code(event).as_deref(), Some("8")); +} + +#[tokio::test] +async fn purge_type_success_emits_audit_event_with_count() { + let env = create_audited_server().await; + seed_patients(&env.backend, 3).await; + + let resp = env + .server + .post("/Patient/$purge") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::OK); + + let events = events_for_operation(&env.sink, "purge"); + assert_eq!(events.len(), 1); + let event = &events[0]; + assert_eq!(outcome_code(event).as_deref(), Some("0")); + assert_eq!(detail_value(event, "count").as_deref(), Some("3")); + assert_eq!( + detail_value(event, "resource-type").as_deref(), + Some("Patient") + ); +} + +#[tokio::test] +async fn purge_audit_event_is_blocked() { + let env = create_audited_server().await; + let resp = env + .server + .delete("/AuditEvent/anything/$purge") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::METHOD_NOT_ALLOWED); + // No purge AuditEvent should be emitted for blocked AuditEvent purges. + assert!(events_for_operation(&env.sink, "purge").is_empty()); +} + +// --------------------------------------------------------------------------- +// $reindex +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn reindex_kickoff_emits_kickoff_and_start_events() { + // Both events fire synchronously before `controller.start().await` + // returns: the persistence-layer "start" event is recorded in + // `ReindexOperation::start` *before* the background task is spawned, and + // the REST "kickoff" event is recorded right after `start` resolves. + // No timing slack is needed. + let env = create_audited_server().await; + seed_patients(&env.backend, 1).await; + + let resp = env + .server + .post("/$reindex") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::ACCEPTED); + + let events = events_for_operation(&env.sink, "reindex"); + let kickoff = events + .iter() + .find(|e| detail_value(e, "phase").as_deref() == Some("kickoff")) + .expect("REST handler emitted a kickoff event"); + assert_eq!(outcome_code(kickoff).as_deref(), Some("0")); + + let start = events + .iter() + .find(|e| detail_value(e, "phase").as_deref() == Some("start")) + .expect("ReindexOperation emitted a start event"); + assert_eq!(outcome_code(start).as_deref(), Some("0")); +} + +#[tokio::test] +async fn reindex_type_scoped_kickoff_records_resource_types() { + let env = create_audited_server().await; + seed_patients(&env.backend, 1).await; + + let resp = env + .server + .post("/Patient/$reindex") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::ACCEPTED); + + let kickoffs: Vec<_> = events_for_operation(&env.sink, "reindex") + .into_iter() + .filter(|e| detail_value(e, "phase").as_deref() == Some("kickoff")) + .collect(); + assert_eq!(kickoffs.len(), 1); + assert_eq!( + detail_value(&kickoffs[0], "resource-types").as_deref(), + Some("Patient") + ); +} + +#[tokio::test] +async fn reindex_cancel_unknown_job_emits_failure() { + let env = create_audited_server().await; + let resp = env + .server + .delete("/$reindex-status/nonexistent") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::NOT_FOUND); + + let cancels: Vec<_> = events_for_operation(&env.sink, "reindex") + .into_iter() + .filter(|e| detail_value(e, "phase").as_deref() == Some("cancel")) + .collect(); + assert_eq!(cancels.len(), 1); + assert_eq!(outcome_code(&cancels[0]).as_deref(), Some("8")); +} + +#[tokio::test] +async fn reindex_unavailable_when_controller_missing() { + // Build a server WITHOUT the reindex controller and confirm the handler + // returns 501 with no audit emission (the early bail beats the audit + // path, mirroring bulk-export's "disabled" behavior). + let backend_config = SqliteBackendConfig::default(); + let backend = Arc::new( + SqliteBackend::with_config(":memory:", backend_config).expect("create SQLite backend"), + ); + backend.init_schema().expect("init schema"); + + let config = ServerConfig { + multitenancy: MultitenancyConfig { + routing_mode: TenantRoutingMode::HeaderOnly, + ..Default::default() + }, + base_url: "http://localhost:8080".to_string(), + default_tenant: "test-tenant".to_string(), + ..ServerConfig::for_testing() + }; + let sink = Arc::new(InMemoryAuditSink::new()); + let state = helios_rest::AppState::with_auth_and_audit( + Arc::clone(&backend), + config, + helios_auth::AuthConfig::default(), + None, + Some(sink.clone() as Arc), + SOURCE_OBSERVER, + ); + + let app = helios_rest::routing::fhir_routes::create_routes(state); + let server = TestServer::new(app).expect("create test server"); + + let resp = server + .post("/$reindex") + .add_header("x-tenant-id", "test-tenant") + .await; + assert_eq!(resp.status_code(), StatusCode::NOT_IMPLEMENTED); + assert!( + events_for_operation(&sink, "reindex").is_empty(), + "no audit event when controller is absent" + ); +}