From 587fe93c5244b25819b3728478cfb9bcd15940d6 Mon Sep 17 00:00:00 2001 From: Daniel Szoke Date: Mon, 8 Jun 2026 11:09:05 +0200 Subject: [PATCH 1/2] ref(client): Create `EnvelopeSender` to wrap `Transport` Create a new `EnvelopeSender` type, which wraps the client's transport. `EnvelopeSender` is itself a lightweight wrapper around a `TransportSlot`, which itself is a replacement for `TransportArc` and was also introduced in this change. The `TransportSlot` abstracts the logic for managing whether the transport is present or not, while the `EnvelopeSender` abstracts away the whole transport itself. **Why the change?** Previously, code in several different places (the client, the batchers, and the session flusher) interacted with the transport directly. This makes it difficult for us to ensure that any preprocessing we want to do on all envelopes before they go to the transport happens in all the places we use the transport. With this change, we can perform any needed preprocessing in the `EnvelopeSender`'s `send_envelope` implementation. We intend to use this for sending client reports; we will check in `EnvelopeSender`'s `send_envelope` whether there are any pending reports, and attach them to the outgoing envelope if yes. Contributes to [#1002](https://github.com/getsentry/sentry-rust/issues/1002) Contributes to [RUST-154](https://linear.app/getsentry/issue/RUST-154/add-shared-client-report-aggregator-and-send-client-reports-option) Closes [#1155](https://github.com/getsentry/sentry-rust/issues/1155) Closes [RUST-230](https://linear.app/getsentry/issue/RUST-230) --- sentry-core/src/batcher.rs | 35 ++-- sentry-core/src/client/envelope_sender.rs | 164 +++++++++++++++++++ sentry-core/src/{client.rs => client/mod.rs} | 87 +++++----- sentry-core/src/session.rs | 38 ++--- 4 files changed, 244 insertions(+), 80 deletions(-) create mode 100644 sentry-core/src/client/envelope_sender.rs rename sentry-core/src/{client.rs => client/mod.rs} (91%) diff --git a/sentry-core/src/batcher.rs b/sentry-core/src/batcher.rs index 5a714588..28d19084 100644 --- a/sentry-core/src/batcher.rs +++ b/sentry-core/src/batcher.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Condvar, Mutex, MutexGuard}; use std::thread::JoinHandle; use std::time::{Duration, Instant}; -use crate::client::TransportArc; +use crate::client::EnvelopeSender; use crate::protocol::EnvelopeItem; use crate::Envelope; use sentry_types::protocol::v7::Log; @@ -50,7 +50,7 @@ impl Batch for Metric { /// Accumulates items in the queue and submits them through the transport when one of the flushing /// conditions is met. pub(crate) struct Batcher { - transport: TransportArc, + envelope_sender: EnvelopeSender, queue: Arc>>, shutdown: Arc<(Mutex, Condvar)>, worker: Option>, @@ -60,13 +60,13 @@ impl Batcher where T: Batch + Send + 'static, { - /// Creates a new Batcher that will submit envelopes to the given `transport`. - pub(crate) fn new(transport: TransportArc) -> Self { + /// Creates a new Batcher that will submit envelopes to the transport. + pub(crate) fn new(envelope_sender: EnvelopeSender) -> Self { let queue = Arc::new(Mutex::new(BatchQueue { items: Vec::new() })); #[allow(clippy::mutex_atomic)] let shutdown = Arc::new((Mutex::new(false), Condvar::new())); - let worker_transport = transport.clone(); + let worker_envelope_sender = envelope_sender.clone(); let worker_queue = queue.clone(); let worker_shutdown = shutdown.clone(); let worker = std::thread::Builder::new() @@ -90,7 +90,7 @@ where if last_flush.elapsed() >= FLUSH_INTERVAL { Batcher::flush_queue_internal( worker_queue.lock().unwrap(), - &worker_transport, + &worker_envelope_sender, ); last_flush = Instant::now(); } @@ -99,7 +99,7 @@ where .unwrap(); Self { - transport, + envelope_sender, queue, shutdown, worker: Some(worker), @@ -115,21 +115,24 @@ impl Batcher { let mut queue = self.queue.lock().unwrap(); queue.items.push(item); if queue.items.len() >= MAX_ITEMS { - Batcher::flush_queue_internal(queue, &self.transport); + Batcher::flush_queue_internal(queue, &self.envelope_sender); } } /// Flushes the queue to the transport. pub(crate) fn flush(&self) { let queue = self.queue.lock().unwrap(); - Batcher::flush_queue_internal(queue, &self.transport); + Batcher::flush_queue_internal(queue, &self.envelope_sender); } /// Flushes the queue to the transport. /// /// This is a static method as it will be called from both the background /// thread and the main thread on drop. - fn flush_queue_internal(mut queue_lock: MutexGuard>, transport: &TransportArc) { + fn flush_queue_internal( + mut queue_lock: MutexGuard>, + envelope_sender: &EnvelopeSender, + ) { let items = std::mem::take(&mut queue_lock.items); drop(queue_lock); @@ -139,12 +142,10 @@ impl Batcher { sentry_debug!("[Batcher({})] Flushing {} items", T::TYPE_NAME, items.len()); - if let Some(ref transport) = *transport.read().unwrap() { - let mut envelope = Envelope::new(); - let envelope_item = T::into_envelope_item(items); - envelope.add_item(envelope_item); - transport.send_envelope(envelope); - } + let mut envelope = Envelope::new(); + let envelope_item = T::into_envelope_item(items); + envelope.add_item(envelope_item); + envelope_sender.send_envelope(envelope); } } @@ -157,7 +158,7 @@ impl Drop for Batcher { if let Some(worker) = self.worker.take() { worker.join().ok(); } - Batcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport); + Batcher::flush_queue_internal(self.queue.lock().unwrap(), &self.envelope_sender); } } diff --git a/sentry-core/src/client/envelope_sender.rs b/sentry-core/src/client/envelope_sender.rs new file mode 100644 index 00000000..7f919715 --- /dev/null +++ b/sentry-core/src/client/envelope_sender.rs @@ -0,0 +1,164 @@ +//! Contains abstractions for sending envelopes. +//! +//! The most important type here is the [`EnvelopeSender`] struct, which wraps a [`Transport`] and +//! centralizes envelope sending logic. All code in this crate should send envelopes via the +//! [`EnvelopeSender`], not by using the [`Transport`] directly. + +use std::sync::Arc; +use std::time::Duration; + +use self::slot::TransportSlot; +use crate::{Envelope, Transport}; + +/// Sends envelopes through the client's transport. +/// +/// Cloning this sender has `Arc`-like semantics: clones share the same transport +/// slot and send to the same underlying transport until it is shut down. +/// +/// The [`Default`] implementation creates an [`EnvelopeSender`] without an underlying transport, +/// effectively rendering calls a no-op. +#[derive(Clone, Default)] +pub(crate) struct EnvelopeSender { + transport_slot: TransportSlot, +} + +impl EnvelopeSender { + /// Sends an envelope if the transport is still available. + pub(crate) fn send_envelope(&self, envelope: Envelope) { + self.transport_slot.send_envelope(envelope); + } + + /// Creates a sender using the transport returned by the provided builder callback. + pub(super) fn new(transport_builder: F) -> Self + where + F: FnOnce() -> Arc, + { + let transport_slot = TransportSlot::new(transport_builder()); + Self { transport_slot } + } + + /// Flushes the transport if it is still available. + pub(super) fn flush(&self, timeout: Duration) -> bool { + self.transport_slot.flush(timeout) + } + + /// Shuts down and removes the transport if it is still available. + pub(super) fn shutdown(&self, timeout: Duration) -> bool { + self.transport_slot.shutdown(timeout) + } + + pub(super) fn clone_with_new_transport_slot(&self) -> Self { + let transport_slot = self.transport_slot.clone_into_new_slot(); + Self { transport_slot } + } + + /// Returns whether this sender currently has an available transport. + pub(super) fn is_enabled(&self) -> bool { + self.transport_slot.is_occupied() + } +} + +mod slot { + use std::sync::{Arc, RwLock}; + use std::time::Duration; + + use sentry_types::protocol::v7::Envelope; + + use crate::Transport; + + const READ_EXPECT_MSG: &str = "could not acquire transport read lock"; + const WRITE_EXPECT_MSG: &str = "could not acquire transport write lock"; + + /// A transport slot, which may or may not wrap a [`Transport`]. + /// + /// When initially constructed with [`TransportSlot::new`], this type will be wrapping this + /// transport. As long as constructed with a [`Transport`], as intended, this type also + /// implements [`Transport`], and all of the method calls forward to the underlying transport. + /// However, after [`Transport::shutdown`] is called on this slot, the slot is emptied, and + /// all future [`Transport`] method calls become no-ops. The type provides + /// [`Self::is_occupied`] to check if the transport is still present. + /// + /// This type has [`Arc`]-like clone semantics: clones share the underlying transport, and also + /// share the slot occupied status. + #[derive(Debug)] + pub(super) struct TransportSlot { + inner: Arc>>>, + } + + impl TransportSlot { + /// Create a new, occupied [`TransportSlot`] wrapping the provided transport. + pub(super) fn new(transport: Arc) -> Self { + let inner = Arc::new(RwLock::new(Some(transport))); + + Self { inner } + } + + /// Determine whether the slot is occupied, i.e. whether there is a transport inside. + pub(super) fn is_occupied(&self) -> bool { + self.inner.read().expect(READ_EXPECT_MSG).is_some() + } + + /// Creates a new [`TransportSlot`] with the same underlying `Transport`, but in a new + /// slot. + /// + /// If there is no transport, then we just return a clone of this empty slot. As empty + /// slots cannot become occupied later, this has the same semantics as returning a new + /// empty slot. + pub(super) fn clone_into_new_slot(&self) -> Self { + self.inner + .read() + .expect(READ_EXPECT_MSG) + .as_ref() + .map(|transport| Self::new(transport.clone())) + .unwrap_or_else(|| self.clone()) + } + } + + impl Transport for TransportSlot + where + T: Transport + ?Sized, + { + fn send_envelope(&self, envelope: Envelope) { + if let Some(transport) = self.inner.read().expect(READ_EXPECT_MSG).as_deref() { + transport.send_envelope(envelope); + } + } + + fn flush(&self, timeout: Duration) -> bool { + self.inner + .read() + .expect(READ_EXPECT_MSG) + .as_deref() + .map(|transport| transport.flush(timeout)) + .unwrap_or(true) + } + + fn shutdown(&self, timeout: Duration) -> bool { + let transport_opt = self.inner.write().expect(WRITE_EXPECT_MSG).take(); + if let Some(transport) = transport_opt { + sentry_debug!("client close; request transport to shut down"); + transport.shutdown(timeout) + } else { + sentry_debug!("client close; no transport to shut down"); + true + } + } + } + + impl Clone for TransportSlot { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } + } + + impl Default for TransportSlot { + /// Creates an empty, no-op [`TransportSlot`]. + fn default() -> Self { + Self { + inner: Default::default(), + } + } + } +} diff --git a/sentry-core/src/client.rs b/sentry-core/src/client/mod.rs similarity index 91% rename from sentry-core/src/client.rs rename to sentry-core/src/client/mod.rs index 69f41905..9dec3cca 100644 --- a/sentry-core/src/client.rs +++ b/sentry-core/src/client/mod.rs @@ -4,7 +4,9 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt; use std::panic::RefUnwindSafe; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; +#[cfg(any(feature = "logs", feature = "metrics", feature = "release-health"))] +use std::sync::RwLock; use std::time::Duration; #[cfg(feature = "metrics")] @@ -23,7 +25,8 @@ use crate::session::SessionFlusher; use crate::types::{Dsn, Uuid}; #[cfg(feature = "release-health")] use crate::SessionMode; -use crate::{ClientOptions, Envelope, Hub, Integration, Scope, Transport}; +use crate::{ClientOptions, Envelope, Hub, Integration, Scope}; + #[cfg(feature = "logs")] use sentry_types::protocol::v7::Context; #[cfg(feature = "logs")] @@ -33,14 +36,16 @@ use sentry_types::protocol::v7::LogAttribute; #[cfg(feature = "metrics")] use sentry_types::protocol::v7::Metric; +mod envelope_sender; + +pub(crate) use self::envelope_sender::EnvelopeSender; + impl> From for Client { fn from(o: T) -> Client { Client::with_options(o.into()) } } -pub(crate) type TransportArc = Arc>>>; - /// The Sentry Client. /// /// The Client is responsible for event processing and sending events to the @@ -60,7 +65,7 @@ pub(crate) type TransportArc = Arc>>>; /// [Unified API]: https://develop.sentry.dev/sdk/unified-api/ pub struct Client { options: ClientOptions, - transport: TransportArc, + envelope_sender: EnvelopeSender, #[cfg(feature = "release-health")] session_flusher: RwLock>, #[cfg(feature = "logs")] @@ -86,17 +91,17 @@ impl fmt::Debug for Client { impl Clone for Client { fn clone(&self) -> Client { - let transport = Arc::new(RwLock::new(self.transport.read().unwrap().clone())); + let envelope_sender = self.envelope_sender.clone_with_new_transport_slot(); #[cfg(feature = "release-health")] let session_flusher = RwLock::new(Some(SessionFlusher::new( - transport.clone(), + envelope_sender.clone(), self.options.session_mode, ))); #[cfg(feature = "logs")] let logs_batcher = RwLock::new(if self.options.enable_logs { - Some(Batcher::new(transport.clone())) + Some(Batcher::new(envelope_sender.clone())) } else { None }); @@ -105,12 +110,12 @@ impl Clone for Client { let metrics_batcher = RwLock::new( self.options .enable_metrics - .then(|| Batcher::new(transport.clone())), + .then(|| Batcher::new(envelope_sender.clone())), ); Client { options: self.options.clone(), - transport, + envelope_sender, #[cfg(feature = "release-health")] session_flusher, #[cfg(feature = "logs")] @@ -161,14 +166,7 @@ impl Client { // See https://github.com/getsentry/sentry-rust/issues/237 Hub::with_current(|_| {}); - let create_transport = || { - options.dsn.as_ref()?; - let factory = options.transport.as_ref()?; - Some(factory.create_transport(&options)) - }; - - let transport = Arc::new(RwLock::new(create_transport())); - + let envelope_sender = build_envelope_sender(&options); let mut sdk_info = SDK_INFO.clone(); // NOTE: We do not filter out duplicate integrations based on their @@ -186,13 +184,13 @@ impl Client { #[cfg(feature = "release-health")] let session_flusher = RwLock::new(Some(SessionFlusher::new( - transport.clone(), + envelope_sender.clone(), options.session_mode, ))); #[cfg(feature = "logs")] let logs_batcher = RwLock::new(if options.enable_logs { - Some(Batcher::new(transport.clone())) + Some(Batcher::new(envelope_sender.clone())) } else { None }); @@ -201,13 +199,13 @@ impl Client { let metrics_batcher = RwLock::new( options .enable_metrics - .then(|| Batcher::new(transport.clone())), + .then(|| Batcher::new(envelope_sender.clone())), ); #[allow(unused_mut)] let mut client = Client { options, - transport, + envelope_sender, #[cfg(feature = "release-health")] session_flusher, #[cfg(feature = "logs")] @@ -414,12 +412,12 @@ impl Client { /// assert!(client.is_enabled()); /// ``` pub fn is_enabled(&self) -> bool { - self.options.dsn.is_some() && self.transport.read().unwrap().is_some() + self.options.dsn.is_some() && self.envelope_sender.is_enabled() } /// Captures an event and sends it to sentry. pub fn capture_event(&self, event: Event<'static>, scope: Option<&Scope>) -> Uuid { - if let Some(ref transport) = *self.transport.read().unwrap() { + if self.envelope_sender.is_enabled() { if let Some(event) = self.prepare_event(event, scope) { let event_id = event.event_id; let mut envelope: Envelope = event.into(); @@ -446,7 +444,7 @@ impl Client { } } - transport.send_envelope(envelope); + self.envelope_sender.send_envelope(envelope); return event_id; } } @@ -455,9 +453,7 @@ impl Client { /// Sends the specified [`Envelope`] to sentry. pub fn send_envelope(&self, envelope: Envelope) { - if let Some(ref transport) = *self.transport.read().unwrap() { - transport.send_envelope(envelope); - } + self.envelope_sender.send_envelope(envelope); } #[cfg(feature = "release-health")] @@ -481,11 +477,8 @@ impl Client { if let Some(ref batcher) = *self.metrics_batcher.read().unwrap() { batcher.flush(); } - if let Some(ref transport) = *self.transport.read().unwrap() { - transport.flush(timeout.unwrap_or(self.options.shutdown_timeout)) - } else { - true - } + self.envelope_sender + .flush(timeout.unwrap_or(self.options.shutdown_timeout)) } /// Drains all pending events and shuts down the transport behind the @@ -502,14 +495,8 @@ impl Client { drop(self.logs_batcher.write().unwrap().take()); #[cfg(feature = "metrics")] drop(self.metrics_batcher.write().unwrap().take()); - let transport_opt = self.transport.write().unwrap().take(); - if let Some(transport) = transport_opt { - sentry_debug!("client close; request transport to shut down"); - transport.shutdown(timeout.unwrap_or(self.options.shutdown_timeout)) - } else { - sentry_debug!("client close; no transport to shut down"); - true - } + self.envelope_sender + .shutdown(timeout.unwrap_or(self.options.shutdown_timeout)) } /// Returns a random boolean with a probability defined @@ -598,3 +585,21 @@ impl Client { // Make this unwind safe. It's not out of the box because of the // `BeforeCallback`s inside `ClientOptions`, and the contained Integrations impl RefUnwindSafe for Client {} + +/// Build an [`EnvelopeSender`] from the given [`ClientOptions`]. +/// +/// If either the `dsn` or the `transport` are `None`, a no-op [`EnvelopeSender`] is returned. +fn build_envelope_sender(client_options: &ClientOptions) -> EnvelopeSender { + let ClientOptions { + dsn, + transport: transport_factory, + .. + } = client_options; + + match (dsn.as_ref(), transport_factory.as_ref()) { + (Some(_), Some(transport_factory)) => { + EnvelopeSender::new(|| transport_factory.create_transport(client_options)) + } + _ => Default::default(), + } +} diff --git a/sentry-core/src/session.rs b/sentry-core/src/session.rs index baa8a22d..ebe3489c 100644 --- a/sentry-core/src/session.rs +++ b/sentry-core/src/session.rs @@ -15,7 +15,7 @@ mod session_impl { use std::thread::JoinHandle; use std::time::{Duration, Instant, SystemTime}; - use crate::client::TransportArc; + use crate::client::EnvelopeSender; use crate::clientoptions::SessionMode; use crate::protocol::{ EnvelopeItem, Event, Level, SessionAggregateItem, SessionAggregates, SessionAttributes, @@ -190,7 +190,7 @@ mod session_impl { /// It has its own background thread that will flush its queue once every /// `FLUSH_INTERVAL`. pub(crate) struct SessionFlusher { - transport: TransportArc, + envelope_sender: EnvelopeSender, mode: SessionMode, queue: Arc>, shutdown: Arc<(Mutex, Condvar)>, @@ -198,13 +198,13 @@ mod session_impl { } impl SessionFlusher { - /// Creates a new Flusher that will submit envelopes to the given `transport`. - pub fn new(transport: TransportArc, mode: SessionMode) -> Self { + /// Creates a new Flusher that will submit envelopes to the transport. + pub fn new(envelope_sender: EnvelopeSender, mode: SessionMode) -> Self { let queue = Arc::new(Mutex::new(Default::default())); #[allow(clippy::mutex_atomic)] let shutdown = Arc::new((Mutex::new(false), Condvar::new())); - let worker_transport = transport.clone(); + let worker_envelope_sender = envelope_sender.clone(); let worker_queue = queue.clone(); let worker_shutdown = shutdown.clone(); let worker = std::thread::Builder::new() @@ -230,7 +230,7 @@ mod session_impl { } SessionFlusher::flush_queue_internal( worker_queue.lock().unwrap(), - &worker_transport, + &worker_envelope_sender, ); last_flush = Instant::now(); } @@ -238,7 +238,7 @@ mod session_impl { .unwrap(); Self { - transport, + envelope_sender, mode, queue, shutdown, @@ -255,7 +255,7 @@ mod session_impl { if self.mode == SessionMode::Application || !session_update.init { queue.individual.push(session_update); if queue.individual.len() >= MAX_SESSION_ITEMS { - SessionFlusher::flush_queue_internal(queue, &self.transport); + SessionFlusher::flush_queue_internal(queue, &self.envelope_sender); } return; } @@ -304,7 +304,7 @@ mod session_impl { /// Flushes the queue to the transport. pub fn flush(&self) { let queue = self.queue.lock().unwrap(); - SessionFlusher::flush_queue_internal(queue, &self.transport); + SessionFlusher::flush_queue_internal(queue, &self.envelope_sender); } /// Flushes the queue to the transport. @@ -313,7 +313,7 @@ mod session_impl { /// thread and the main thread on drop. fn flush_queue_internal( mut queue_lock: MutexGuard, - transport: &TransportArc, + envelope_sender: &EnvelopeSender, ) { let queue = std::mem::take(&mut queue_lock.individual); let aggregate = queue_lock.aggregated.take(); @@ -321,11 +321,9 @@ mod session_impl { // send aggregates if let Some(aggregate) = aggregate { - if let Some(ref transport) = *transport.read().unwrap() { - let mut envelope = Envelope::new(); - envelope.add_item(aggregate); - transport.send_envelope(envelope); - } + let mut envelope = Envelope::new(); + envelope.add_item(aggregate); + envelope_sender.send_envelope(envelope); } // send individual items @@ -338,9 +336,7 @@ mod session_impl { for session_update in queue { if items >= MAX_SESSION_ITEMS { - if let Some(ref transport) = *transport.read().unwrap() { - transport.send_envelope(envelope); - } + envelope_sender.send_envelope(envelope); envelope = Envelope::new(); items = 0; } @@ -349,9 +345,7 @@ mod session_impl { items += 1; } - if let Some(ref transport) = *transport.read().unwrap() { - transport.send_envelope(envelope); - } + envelope_sender.send_envelope(envelope); } } @@ -364,7 +358,7 @@ mod session_impl { if let Some(worker) = self.worker.take() { worker.join().ok(); } - SessionFlusher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport); + SessionFlusher::flush_queue_internal(self.queue.lock().unwrap(), &self.envelope_sender); } } From 40e0905bfb69991118d2d27ee2c600098bfcf194 Mon Sep 17 00:00:00 2001 From: Daniel Szoke Date: Wed, 10 Jun 2026 15:28:20 +0200 Subject: [PATCH 2/2] fixup! ref(client): Create `EnvelopeSender` to wrap `Transport` --- sentry-core/src/client/envelope_sender.rs | 35 +++++++++++++++++++---- sentry-core/src/client/mod.rs | 16 +++++------ 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/sentry-core/src/client/envelope_sender.rs b/sentry-core/src/client/envelope_sender.rs index 7f919715..1d0d60d5 100644 --- a/sentry-core/src/client/envelope_sender.rs +++ b/sentry-core/src/client/envelope_sender.rs @@ -25,7 +25,21 @@ pub(crate) struct EnvelopeSender { impl EnvelopeSender { /// Sends an envelope if the transport is still available. pub(crate) fn send_envelope(&self, envelope: Envelope) { - self.transport_slot.send_envelope(envelope); + // This forwards to `send_envelope_with`; any envelope pre-processing should be + // centralized in the `send_envelope_with` function! + self.send_envelope_with(|| Some(envelope)); + } + + /// Builds and sends an envelope if the transport is still available. + /// + /// The builder is only executed if this sender is still active. This allows skipping over + /// logic that constructs the envelope when it cannot be sent. The builder can also return + /// [`None`], in which case, we don't send anything. + pub(super) fn send_envelope_with(&self, builder: F) + where + F: FnOnce() -> Option, + { + self.transport_slot.send_envelope_with(builder) } /// Creates a sender using the transport returned by the provided builder callback. @@ -114,17 +128,26 @@ mod slot { } } - impl Transport for TransportSlot + impl TransportSlot where T: Transport + ?Sized, { - fn send_envelope(&self, envelope: Envelope) { - if let Some(transport) = self.inner.read().expect(READ_EXPECT_MSG).as_deref() { + pub(super) fn send_envelope_with(&self, builder: F) + where + F: FnOnce() -> Option, + { + if let Some((transport, envelope)) = self + .inner + .read() + .expect(READ_EXPECT_MSG) + .as_deref() + .and_then(|transport| Some((transport, builder()?))) + { transport.send_envelope(envelope); } } - fn flush(&self, timeout: Duration) -> bool { + pub(super) fn flush(&self, timeout: Duration) -> bool { self.inner .read() .expect(READ_EXPECT_MSG) @@ -133,7 +156,7 @@ mod slot { .unwrap_or(true) } - fn shutdown(&self, timeout: Duration) -> bool { + pub(super) fn shutdown(&self, timeout: Duration) -> bool { let transport_opt = self.inner.write().expect(WRITE_EXPECT_MSG).take(); if let Some(transport) = transport_opt { sentry_debug!("client close; request transport to shut down"); diff --git a/sentry-core/src/client/mod.rs b/sentry-core/src/client/mod.rs index 9dec3cca..7ed8d542 100644 --- a/sentry-core/src/client/mod.rs +++ b/sentry-core/src/client/mod.rs @@ -417,9 +417,10 @@ impl Client { /// Captures an event and sends it to sentry. pub fn capture_event(&self, event: Event<'static>, scope: Option<&Scope>) -> Uuid { - if self.envelope_sender.is_enabled() { - if let Some(event) = self.prepare_event(event, scope) { - let event_id = event.event_id; + let mut event_id = Default::default(); + self.envelope_sender.send_envelope_with(|| { + self.prepare_event(event, scope).map(|event| { + event_id = event.event_id; let mut envelope: Envelope = event.into(); // For request-mode sessions, we aggregate them all instead of // flushing them out early. @@ -444,11 +445,10 @@ impl Client { } } - self.envelope_sender.send_envelope(envelope); - return event_id; - } - } - Default::default() + envelope + }) + }); + event_id } /// Sends the specified [`Envelope`] to sentry.