From a3eaa3538e3391e0cb59d3df4d008ed644e9e9bd Mon Sep 17 00:00:00 2001 From: Dzmitry Kalabuk Date: Mon, 29 Jun 2026 20:34:42 +0300 Subject: [PATCH] Stop collecting logs once the buffer is full Avoids the network round-trips, deserialization, and signature checks for workers whose logs would only be dropped. The buffer drains each round, so skipped workers are picked up next time. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 2 +- crates/logs-collector/Cargo.toml | 2 +- crates/logs-collector/src/collector.rs | 6 ++++++ crates/logs-collector/src/server.rs | 10 ++++++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 21cfb7e..730e9ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3783,7 +3783,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" [[package]] name = "logs-collector" -version = "2.1.9" +version = "2.1.10" dependencies = [ "anyhow", "clap", diff --git a/crates/logs-collector/Cargo.toml b/crates/logs-collector/Cargo.toml index 675ae18..80bb2ce 100644 --- a/crates/logs-collector/Cargo.toml +++ b/crates/logs-collector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "logs-collector" -version = "2.1.9" +version = "2.1.10" edition = "2021" [dependencies] diff --git a/crates/logs-collector/src/collector.rs b/crates/logs-collector/src/collector.rs index 96bee9c..d6f7289 100644 --- a/crates/logs-collector/src/collector.rs +++ b/crates/logs-collector/src/collector.rs @@ -74,6 +74,12 @@ impl LogsCollector { } } + /// Whether the buffer has reached its memory limit. Callers use this to stop + /// collecting more logs that would only be dropped (see `buffer_logs`). + pub fn is_full(&self) -> bool { + self.buffer.lock().size >= self.max_buffer_size + } + pub async fn dump_buffer(&mut self) -> anyhow::Result<()> { let logs = { let buffer = self.buffer.get_mut(); diff --git a/crates/logs-collector/src/server.rs b/crates/logs-collector/src/server.rs index a06c78d..600fd60 100644 --- a/crates/logs-collector/src/server.rs +++ b/crates/logs-collector/src/server.rs @@ -117,6 +117,12 @@ where } async fn collect_logs(&self, worker_id: PeerId, mut from_timestamp_ms: u64) { + // Don't even request logs we'd have to drop. The buffer drains every round, + // so these workers are picked up again next time. + if self.logs_collector.is_full() { + log::debug!("Buffer full, skipping log collection from {worker_id}"); + return; + } let mut last_query_id = None; for page in 0..MAX_PAGES { if page == 0 { @@ -163,6 +169,10 @@ where if !logs.has_more { return; } + if self.logs_collector.is_full() { + log::debug!("Buffer full, stopping log collection from {worker_id}"); + return; + } } log::warn!("Logs from {worker_id} didn't fit in {MAX_PAGES} pages, giving up"); }