diff --git a/sentry-core/src/batcher.rs b/sentry-core/src/batcher.rs index 5a714588..28d19084 100644 --- a/sentry-core/src/batcher.rs +++ b/sentry-core/src/batcher.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Condvar, Mutex, MutexGuard}; use std::thread::JoinHandle; use std::time::{Duration, Instant}; -use crate::client::TransportArc; +use crate::client::EnvelopeSender; use crate::protocol::EnvelopeItem; use crate::Envelope; use sentry_types::protocol::v7::Log; @@ -50,7 +50,7 @@ impl Batch for Metric { /// Accumulates items in the queue and submits them through the transport when one of the flushing /// conditions is met. pub(crate) struct Batcher { - transport: TransportArc, + envelope_sender: EnvelopeSender, queue: Arc>>, shutdown: Arc<(Mutex, Condvar)>, worker: Option>, @@ -60,13 +60,13 @@ impl Batcher where T: Batch + Send + 'static, { - /// Creates a new Batcher that will submit envelopes to the given `transport`. - pub(crate) fn new(transport: TransportArc) -> Self { + /// Creates a new Batcher that will submit envelopes to the transport. + pub(crate) fn new(envelope_sender: EnvelopeSender) -> Self { let queue = Arc::new(Mutex::new(BatchQueue { items: Vec::new() })); #[allow(clippy::mutex_atomic)] let shutdown = Arc::new((Mutex::new(false), Condvar::new())); - let worker_transport = transport.clone(); + let worker_envelope_sender = envelope_sender.clone(); let worker_queue = queue.clone(); let worker_shutdown = shutdown.clone(); let worker = std::thread::Builder::new() @@ -90,7 +90,7 @@ where if last_flush.elapsed() >= FLUSH_INTERVAL { Batcher::flush_queue_internal( worker_queue.lock().unwrap(), - &worker_transport, + &worker_envelope_sender, ); last_flush = Instant::now(); } @@ -99,7 +99,7 @@ where .unwrap(); Self { - transport, + envelope_sender, queue, shutdown, worker: Some(worker), @@ -115,21 +115,24 @@ impl Batcher { let mut queue = self.queue.lock().unwrap(); queue.items.push(item); if queue.items.len() >= MAX_ITEMS { - Batcher::flush_queue_internal(queue, &self.transport); + Batcher::flush_queue_internal(queue, &self.envelope_sender); } } /// Flushes the queue to the transport. pub(crate) fn flush(&self) { let queue = self.queue.lock().unwrap(); - Batcher::flush_queue_internal(queue, &self.transport); + Batcher::flush_queue_internal(queue, &self.envelope_sender); } /// Flushes the queue to the transport. /// /// This is a static method as it will be called from both the background /// thread and the main thread on drop. - fn flush_queue_internal(mut queue_lock: MutexGuard>, transport: &TransportArc) { + fn flush_queue_internal( + mut queue_lock: MutexGuard>, + envelope_sender: &EnvelopeSender, + ) { let items = std::mem::take(&mut queue_lock.items); drop(queue_lock); @@ -139,12 +142,10 @@ impl Batcher { sentry_debug!("[Batcher({})] Flushing {} items", T::TYPE_NAME, items.len()); - if let Some(ref transport) = *transport.read().unwrap() { - let mut envelope = Envelope::new(); - let envelope_item = T::into_envelope_item(items); - envelope.add_item(envelope_item); - transport.send_envelope(envelope); - } + let mut envelope = Envelope::new(); + let envelope_item = T::into_envelope_item(items); + envelope.add_item(envelope_item); + envelope_sender.send_envelope(envelope); } } @@ -157,7 +158,7 @@ impl Drop for Batcher { if let Some(worker) = self.worker.take() { worker.join().ok(); } - Batcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport); + Batcher::flush_queue_internal(self.queue.lock().unwrap(), &self.envelope_sender); } } diff --git a/sentry-core/src/client/envelope_sender.rs b/sentry-core/src/client/envelope_sender.rs new file mode 100644 index 00000000..1d0d60d5 --- /dev/null +++ b/sentry-core/src/client/envelope_sender.rs @@ -0,0 +1,187 @@ +//! Contains abstractions for sending envelopes. +//! +//! The most important type here is the [`EnvelopeSender`] struct, which wraps a [`Transport`] and +//! centralizes envelope sending logic. All code in this crate should send envelopes via the +//! [`EnvelopeSender`], not by using the [`Transport`] directly. + +use std::sync::Arc; +use std::time::Duration; + +use self::slot::TransportSlot; +use crate::{Envelope, Transport}; + +/// Sends envelopes through the client's transport. +/// +/// Cloning this sender has `Arc`-like semantics: clones share the same transport +/// slot and send to the same underlying transport until it is shut down. +/// +/// The [`Default`] implementation creates an [`EnvelopeSender`] without an underlying transport, +/// effectively rendering calls a no-op. +#[derive(Clone, Default)] +pub(crate) struct EnvelopeSender { + transport_slot: TransportSlot, +} + +impl EnvelopeSender { + /// Sends an envelope if the transport is still available. + pub(crate) fn send_envelope(&self, envelope: Envelope) { + // This forwards to `send_envelope_with`; any envelope pre-processing should be + // centralized in the `send_envelope_with` function! + self.send_envelope_with(|| Some(envelope)); + } + + /// Builds and sends an envelope if the transport is still available. + /// + /// The builder is only executed if this sender is still active. This allows skipping over + /// logic that constructs the envelope when it cannot be sent. The builder can also return + /// [`None`], in which case, we don't send anything. + pub(super) fn send_envelope_with(&self, builder: F) + where + F: FnOnce() -> Option, + { + self.transport_slot.send_envelope_with(builder) + } + + /// Creates a sender using the transport returned by the provided builder callback. + pub(super) fn new(transport_builder: F) -> Self + where + F: FnOnce() -> Arc, + { + let transport_slot = TransportSlot::new(transport_builder()); + Self { transport_slot } + } + + /// Flushes the transport if it is still available. + pub(super) fn flush(&self, timeout: Duration) -> bool { + self.transport_slot.flush(timeout) + } + + /// Shuts down and removes the transport if it is still available. + pub(super) fn shutdown(&self, timeout: Duration) -> bool { + self.transport_slot.shutdown(timeout) + } + + pub(super) fn clone_with_new_transport_slot(&self) -> Self { + let transport_slot = self.transport_slot.clone_into_new_slot(); + Self { transport_slot } + } + + /// Returns whether this sender currently has an available transport. + pub(super) fn is_enabled(&self) -> bool { + self.transport_slot.is_occupied() + } +} + +mod slot { + use std::sync::{Arc, RwLock}; + use std::time::Duration; + + use sentry_types::protocol::v7::Envelope; + + use crate::Transport; + + const READ_EXPECT_MSG: &str = "could not acquire transport read lock"; + const WRITE_EXPECT_MSG: &str = "could not acquire transport write lock"; + + /// A transport slot, which may or may not wrap a [`Transport`]. + /// + /// When initially constructed with [`TransportSlot::new`], this type will be wrapping this + /// transport. As long as constructed with a [`Transport`], as intended, this type also + /// implements [`Transport`], and all of the method calls forward to the underlying transport. + /// However, after [`Transport::shutdown`] is called on this slot, the slot is emptied, and + /// all future [`Transport`] method calls become no-ops. The type provides + /// [`Self::is_occupied`] to check if the transport is still present. + /// + /// This type has [`Arc`]-like clone semantics: clones share the underlying transport, and also + /// share the slot occupied status. + #[derive(Debug)] + pub(super) struct TransportSlot { + inner: Arc>>>, + } + + impl TransportSlot { + /// Create a new, occupied [`TransportSlot`] wrapping the provided transport. + pub(super) fn new(transport: Arc) -> Self { + let inner = Arc::new(RwLock::new(Some(transport))); + + Self { inner } + } + + /// Determine whether the slot is occupied, i.e. whether there is a transport inside. + pub(super) fn is_occupied(&self) -> bool { + self.inner.read().expect(READ_EXPECT_MSG).is_some() + } + + /// Creates a new [`TransportSlot`] with the same underlying `Transport`, but in a new + /// slot. + /// + /// If there is no transport, then we just return a clone of this empty slot. As empty + /// slots cannot become occupied later, this has the same semantics as returning a new + /// empty slot. + pub(super) fn clone_into_new_slot(&self) -> Self { + self.inner + .read() + .expect(READ_EXPECT_MSG) + .as_ref() + .map(|transport| Self::new(transport.clone())) + .unwrap_or_else(|| self.clone()) + } + } + + impl TransportSlot + where + T: Transport + ?Sized, + { + pub(super) fn send_envelope_with(&self, builder: F) + where + F: FnOnce() -> Option, + { + if let Some((transport, envelope)) = self + .inner + .read() + .expect(READ_EXPECT_MSG) + .as_deref() + .and_then(|transport| Some((transport, builder()?))) + { + transport.send_envelope(envelope); + } + } + + pub(super) fn flush(&self, timeout: Duration) -> bool { + self.inner + .read() + .expect(READ_EXPECT_MSG) + .as_deref() + .map(|transport| transport.flush(timeout)) + .unwrap_or(true) + } + + pub(super) fn shutdown(&self, timeout: Duration) -> bool { + let transport_opt = self.inner.write().expect(WRITE_EXPECT_MSG).take(); + if let Some(transport) = transport_opt { + sentry_debug!("client close; request transport to shut down"); + transport.shutdown(timeout) + } else { + sentry_debug!("client close; no transport to shut down"); + true + } + } + } + + impl Clone for TransportSlot { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } + } + + impl Default for TransportSlot { + /// Creates an empty, no-op [`TransportSlot`]. + fn default() -> Self { + Self { + inner: Default::default(), + } + } + } +} diff --git a/sentry-core/src/client.rs b/sentry-core/src/client/mod.rs similarity index 90% rename from sentry-core/src/client.rs rename to sentry-core/src/client/mod.rs index 69f41905..7ed8d542 100644 --- a/sentry-core/src/client.rs +++ b/sentry-core/src/client/mod.rs @@ -4,7 +4,9 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt; use std::panic::RefUnwindSafe; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; +#[cfg(any(feature = "logs", feature = "metrics", feature = "release-health"))] +use std::sync::RwLock; use std::time::Duration; #[cfg(feature = "metrics")] @@ -23,7 +25,8 @@ use crate::session::SessionFlusher; use crate::types::{Dsn, Uuid}; #[cfg(feature = "release-health")] use crate::SessionMode; -use crate::{ClientOptions, Envelope, Hub, Integration, Scope, Transport}; +use crate::{ClientOptions, Envelope, Hub, Integration, Scope}; + #[cfg(feature = "logs")] use sentry_types::protocol::v7::Context; #[cfg(feature = "logs")] @@ -33,14 +36,16 @@ use sentry_types::protocol::v7::LogAttribute; #[cfg(feature = "metrics")] use sentry_types::protocol::v7::Metric; +mod envelope_sender; + +pub(crate) use self::envelope_sender::EnvelopeSender; + impl> From for Client { fn from(o: T) -> Client { Client::with_options(o.into()) } } -pub(crate) type TransportArc = Arc>>>; - /// The Sentry Client. /// /// The Client is responsible for event processing and sending events to the @@ -60,7 +65,7 @@ pub(crate) type TransportArc = Arc>>>; /// [Unified API]: https://develop.sentry.dev/sdk/unified-api/ pub struct Client { options: ClientOptions, - transport: TransportArc, + envelope_sender: EnvelopeSender, #[cfg(feature = "release-health")] session_flusher: RwLock>, #[cfg(feature = "logs")] @@ -86,17 +91,17 @@ impl fmt::Debug for Client { impl Clone for Client { fn clone(&self) -> Client { - let transport = Arc::new(RwLock::new(self.transport.read().unwrap().clone())); + let envelope_sender = self.envelope_sender.clone_with_new_transport_slot(); #[cfg(feature = "release-health")] let session_flusher = RwLock::new(Some(SessionFlusher::new( - transport.clone(), + envelope_sender.clone(), self.options.session_mode, ))); #[cfg(feature = "logs")] let logs_batcher = RwLock::new(if self.options.enable_logs { - Some(Batcher::new(transport.clone())) + Some(Batcher::new(envelope_sender.clone())) } else { None }); @@ -105,12 +110,12 @@ impl Clone for Client { let metrics_batcher = RwLock::new( self.options .enable_metrics - .then(|| Batcher::new(transport.clone())), + .then(|| Batcher::new(envelope_sender.clone())), ); Client { options: self.options.clone(), - transport, + envelope_sender, #[cfg(feature = "release-health")] session_flusher, #[cfg(feature = "logs")] @@ -161,14 +166,7 @@ impl Client { // See https://github.com/getsentry/sentry-rust/issues/237 Hub::with_current(|_| {}); - let create_transport = || { - options.dsn.as_ref()?; - let factory = options.transport.as_ref()?; - Some(factory.create_transport(&options)) - }; - - let transport = Arc::new(RwLock::new(create_transport())); - + let envelope_sender = build_envelope_sender(&options); let mut sdk_info = SDK_INFO.clone(); // NOTE: We do not filter out duplicate integrations based on their @@ -186,13 +184,13 @@ impl Client { #[cfg(feature = "release-health")] let session_flusher = RwLock::new(Some(SessionFlusher::new( - transport.clone(), + envelope_sender.clone(), options.session_mode, ))); #[cfg(feature = "logs")] let logs_batcher = RwLock::new(if options.enable_logs { - Some(Batcher::new(transport.clone())) + Some(Batcher::new(envelope_sender.clone())) } else { None }); @@ -201,13 +199,13 @@ impl Client { let metrics_batcher = RwLock::new( options .enable_metrics - .then(|| Batcher::new(transport.clone())), + .then(|| Batcher::new(envelope_sender.clone())), ); #[allow(unused_mut)] let mut client = Client { options, - transport, + envelope_sender, #[cfg(feature = "release-health")] session_flusher, #[cfg(feature = "logs")] @@ -414,14 +412,15 @@ impl Client { /// assert!(client.is_enabled()); /// ``` pub fn is_enabled(&self) -> bool { - self.options.dsn.is_some() && self.transport.read().unwrap().is_some() + self.options.dsn.is_some() && self.envelope_sender.is_enabled() } /// Captures an event and sends it to sentry. pub fn capture_event(&self, event: Event<'static>, scope: Option<&Scope>) -> Uuid { - if let Some(ref transport) = *self.transport.read().unwrap() { - if let Some(event) = self.prepare_event(event, scope) { - let event_id = event.event_id; + let mut event_id = Default::default(); + self.envelope_sender.send_envelope_with(|| { + self.prepare_event(event, scope).map(|event| { + event_id = event.event_id; let mut envelope: Envelope = event.into(); // For request-mode sessions, we aggregate them all instead of // flushing them out early. @@ -446,18 +445,15 @@ impl Client { } } - transport.send_envelope(envelope); - return event_id; - } - } - Default::default() + envelope + }) + }); + event_id } /// Sends the specified [`Envelope`] to sentry. pub fn send_envelope(&self, envelope: Envelope) { - if let Some(ref transport) = *self.transport.read().unwrap() { - transport.send_envelope(envelope); - } + self.envelope_sender.send_envelope(envelope); } #[cfg(feature = "release-health")] @@ -481,11 +477,8 @@ impl Client { if let Some(ref batcher) = *self.metrics_batcher.read().unwrap() { batcher.flush(); } - if let Some(ref transport) = *self.transport.read().unwrap() { - transport.flush(timeout.unwrap_or(self.options.shutdown_timeout)) - } else { - true - } + self.envelope_sender + .flush(timeout.unwrap_or(self.options.shutdown_timeout)) } /// Drains all pending events and shuts down the transport behind the @@ -502,14 +495,8 @@ impl Client { drop(self.logs_batcher.write().unwrap().take()); #[cfg(feature = "metrics")] drop(self.metrics_batcher.write().unwrap().take()); - let transport_opt = self.transport.write().unwrap().take(); - if let Some(transport) = transport_opt { - sentry_debug!("client close; request transport to shut down"); - transport.shutdown(timeout.unwrap_or(self.options.shutdown_timeout)) - } else { - sentry_debug!("client close; no transport to shut down"); - true - } + self.envelope_sender + .shutdown(timeout.unwrap_or(self.options.shutdown_timeout)) } /// Returns a random boolean with a probability defined @@ -598,3 +585,21 @@ impl Client { // Make this unwind safe. It's not out of the box because of the // `BeforeCallback`s inside `ClientOptions`, and the contained Integrations impl RefUnwindSafe for Client {} + +/// Build an [`EnvelopeSender`] from the given [`ClientOptions`]. +/// +/// If either the `dsn` or the `transport` are `None`, a no-op [`EnvelopeSender`] is returned. +fn build_envelope_sender(client_options: &ClientOptions) -> EnvelopeSender { + let ClientOptions { + dsn, + transport: transport_factory, + .. + } = client_options; + + match (dsn.as_ref(), transport_factory.as_ref()) { + (Some(_), Some(transport_factory)) => { + EnvelopeSender::new(|| transport_factory.create_transport(client_options)) + } + _ => Default::default(), + } +} diff --git a/sentry-core/src/session.rs b/sentry-core/src/session.rs index baa8a22d..ebe3489c 100644 --- a/sentry-core/src/session.rs +++ b/sentry-core/src/session.rs @@ -15,7 +15,7 @@ mod session_impl { use std::thread::JoinHandle; use std::time::{Duration, Instant, SystemTime}; - use crate::client::TransportArc; + use crate::client::EnvelopeSender; use crate::clientoptions::SessionMode; use crate::protocol::{ EnvelopeItem, Event, Level, SessionAggregateItem, SessionAggregates, SessionAttributes, @@ -190,7 +190,7 @@ mod session_impl { /// It has its own background thread that will flush its queue once every /// `FLUSH_INTERVAL`. pub(crate) struct SessionFlusher { - transport: TransportArc, + envelope_sender: EnvelopeSender, mode: SessionMode, queue: Arc>, shutdown: Arc<(Mutex, Condvar)>, @@ -198,13 +198,13 @@ mod session_impl { } impl SessionFlusher { - /// Creates a new Flusher that will submit envelopes to the given `transport`. - pub fn new(transport: TransportArc, mode: SessionMode) -> Self { + /// Creates a new Flusher that will submit envelopes to the transport. + pub fn new(envelope_sender: EnvelopeSender, mode: SessionMode) -> Self { let queue = Arc::new(Mutex::new(Default::default())); #[allow(clippy::mutex_atomic)] let shutdown = Arc::new((Mutex::new(false), Condvar::new())); - let worker_transport = transport.clone(); + let worker_envelope_sender = envelope_sender.clone(); let worker_queue = queue.clone(); let worker_shutdown = shutdown.clone(); let worker = std::thread::Builder::new() @@ -230,7 +230,7 @@ mod session_impl { } SessionFlusher::flush_queue_internal( worker_queue.lock().unwrap(), - &worker_transport, + &worker_envelope_sender, ); last_flush = Instant::now(); } @@ -238,7 +238,7 @@ mod session_impl { .unwrap(); Self { - transport, + envelope_sender, mode, queue, shutdown, @@ -255,7 +255,7 @@ mod session_impl { if self.mode == SessionMode::Application || !session_update.init { queue.individual.push(session_update); if queue.individual.len() >= MAX_SESSION_ITEMS { - SessionFlusher::flush_queue_internal(queue, &self.transport); + SessionFlusher::flush_queue_internal(queue, &self.envelope_sender); } return; } @@ -304,7 +304,7 @@ mod session_impl { /// Flushes the queue to the transport. pub fn flush(&self) { let queue = self.queue.lock().unwrap(); - SessionFlusher::flush_queue_internal(queue, &self.transport); + SessionFlusher::flush_queue_internal(queue, &self.envelope_sender); } /// Flushes the queue to the transport. @@ -313,7 +313,7 @@ mod session_impl { /// thread and the main thread on drop. fn flush_queue_internal( mut queue_lock: MutexGuard, - transport: &TransportArc, + envelope_sender: &EnvelopeSender, ) { let queue = std::mem::take(&mut queue_lock.individual); let aggregate = queue_lock.aggregated.take(); @@ -321,11 +321,9 @@ mod session_impl { // send aggregates if let Some(aggregate) = aggregate { - if let Some(ref transport) = *transport.read().unwrap() { - let mut envelope = Envelope::new(); - envelope.add_item(aggregate); - transport.send_envelope(envelope); - } + let mut envelope = Envelope::new(); + envelope.add_item(aggregate); + envelope_sender.send_envelope(envelope); } // send individual items @@ -338,9 +336,7 @@ mod session_impl { for session_update in queue { if items >= MAX_SESSION_ITEMS { - if let Some(ref transport) = *transport.read().unwrap() { - transport.send_envelope(envelope); - } + envelope_sender.send_envelope(envelope); envelope = Envelope::new(); items = 0; } @@ -349,9 +345,7 @@ mod session_impl { items += 1; } - if let Some(ref transport) = *transport.read().unwrap() { - transport.send_envelope(envelope); - } + envelope_sender.send_envelope(envelope); } } @@ -364,7 +358,7 @@ mod session_impl { if let Some(worker) = self.worker.take() { worker.join().ok(); } - SessionFlusher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport); + SessionFlusher::flush_queue_internal(self.queue.lock().unwrap(), &self.envelope_sender); } }