fix: topic reference drop if topic.gossip_sender/receiver() is called…#26
… instead of split (everything stops) + initial_interval added to bubble and message_overlap workers + reduced custom max_join_peers for merge strategies (now individually configurable) + added bash script to run many instances at once in tmux
📝 Walkthrough
Added two merge configuration fields…
Changes
Sequence Diagram(s)
sequenceDiagram
participant Topic
participant ConfigBuilder
participant Merge (Bubble/MessageOverlap)
participant TokioTimer
participant MergeWorker
Note over Topic,ConfigBuilder: startup sequence
Topic->>ConfigBuilder: build merge config (initial_interval, max_join_peers)
ConfigBuilder-->>Topic: merge config
Topic->>Merge: new(config, initial_interval, max_join_peers, base_interval,...)
Merge->>TokioTimer: interval_at(now + initial_interval, base_interval)
TokioTimer-->>MergeWorker: first tick (after initial_interval)
MergeWorker->>MergeWorker: compute jitter → reset_after(base_interval + jitter)
MergeWorker->>Merge: perform merge round (respect max_join_peers)
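The timing in the diagram can be sketched without a runtime. This is a minimal model, assuming the worker fires its first round after `initial_interval` and then re-arms itself for `base_interval + jitter` on every tick; the function name `merge_round_offsets`, the 60s base interval, and the jitter values are illustrative assumptions, not taken from the PR.

```rust
use std::time::Duration;

/// Offsets (measured from worker start) at which merge rounds would fire.
/// The first round fires after `initial_interval`; each later round fires
/// `base_interval + jitter` after the previous one.
fn merge_round_offsets(
    initial_interval: Duration,
    base_interval: Duration,
    jitters: &[Duration],
) -> Vec<Duration> {
    let mut offsets = Vec::with_capacity(jitters.len() + 1);
    let mut next = initial_interval;
    offsets.push(next);
    for jitter in jitters {
        // Models reset_after(base_interval + jitter) following a tick.
        next += base_interval + *jitter;
        offsets.push(next);
    }
    offsets
}

fn main() {
    let offsets = merge_round_offsets(
        Duration::from_secs(30), // default initial_interval named in the review
        Duration::from_secs(60), // assumed base_interval, for illustration only
        &[Duration::from_secs(3), Duration::from_secs(7)],
    );
    for (round, offset) in offsets.iter().enumerate() {
        println!("round {round}: fires {}s after start", offset.as_secs());
    }
}
```

With `initial_interval` set to `Duration::ZERO` the first offset is zero, which matches the "start immediately" reading discussed in the `src/config.rs` comment.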
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~20 minutes
Possibly related PRs
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/gossip/topic/topic.rs (1)
125-132: 🧹 Nitpick | 🔵 Trivial
`split()` now double-sets `_topic_keep_alive`.
Since `gossip_sender()` (Line 135-145) and `gossip_receiver()` (Line 148-157) each now inject their own `Arc::new(self.clone())` into `_topic_keep_alive`, the subsequent assignments in `split()` overwrite a value that was already set moments earlier. Functionally correct, but two `Arc<Topic>` allocations per split are discarded. Consider simplifying:
♻️ Proposed simplification
 pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
-    let topic_ref = Arc::new(self.clone());
-    let mut sender = self.gossip_sender().await?;
-    let mut receiver = self.gossip_receiver().await?;
-    sender._topic_keep_alive = Some(topic_ref.clone());
-    receiver._topic_keep_alive = Some(topic_ref);
-    Ok((sender, receiver))
+    let sender = self.gossip_sender().await?;
+    let receiver = self.gossip_receiver().await?;
+    Ok((sender, receiver))
 }
Alternatively, if you prefer a single shared `Arc<Topic>` across sender+receiver from `split()` (slightly cheaper than two separate allocations), keep the current body but drop the now-redundant set inside `gossip_sender`/`gossip_receiver`: pick one place to own the keep-alive policy.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/gossip/topic/topic.rs` around lines 125 - 132, split() creates a single Arc of the topic and then overwrites _topic_keep_alive which is redundantly also set inside gossip_sender() and gossip_receiver(); to avoid double Arc allocations, remove the _topic_keep_alive assignment (the Arc::new(self.clone()) lines) from gossip_sender() and gossip_receiver() so split() is the single place that injects the shared Arc into sender._topic_keep_alive and receiver._topic_keep_alive, leaving gossip_sender/gossip_receiver to only construct and return their respective objects.
src/config.rs (1)
226-537: 🧹 Nitpick | 🔵 Trivial
Config additions look consistent; one minor API-surface nit.
The `initial_interval`/`max_join_peers` fields, defaults (30s / 2), getters, and builder setters mirror each other across `BubbleMergeConfigInner` and `MessageOverlapMergeConfigInner` cleanly, and both integrate with the merge constructors via `topic.rs`.
One inconsistency worth noting: `initial_interval` accepts any `Duration` (including `ZERO`) without validation, while the sibling `base_interval` setter no-ops on `ZERO` and documents the override semantics. Zero is actually a valid value here (fires immediately), so this is fine, but consider a one-line doc note on `initial_interval` clarifying that `Duration::ZERO` is accepted and means "start immediately", to preempt confusion with the `base_interval` no-op convention.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config.rs` around lines 226 - 537, Add a one-line doc clarification to the initial_interval API docs stating that Duration::ZERO is accepted and means "start immediately"; update the doc comments for the initial_interval methods and builders in BubbleMergeConfigInner (initial_interval method and BubbleMergeConfigBuilder::initial_interval) and MessageOverlapMergeConfigInner (initial_interval method and MessageOverlapMergeConfigBuilder::initial_interval) to note this behavior so it’s explicit alongside the existing base_interval zero-noop semantics.
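The two zero-handling conventions described in this comment can be put side by side in a small sketch. `MergeTimingSketch` is a hypothetical stand-in, not the crate's `BubbleMergeConfigBuilder` or `MessageOverlapMergeConfigBuilder`, and the 60s `base_interval` default is assumed purely for illustration; only the 30s `initial_interval` default and the two zero behaviors come from the review.

```rust
use std::time::Duration;

// Hypothetical stand-in for the merge config builders; not the crate's types.
#[derive(Debug, Clone, PartialEq)]
struct MergeTimingSketch {
    initial_interval: Duration,
    base_interval: Duration,
}

impl Default for MergeTimingSketch {
    fn default() -> Self {
        Self {
            initial_interval: Duration::from_secs(30), // default named in the review
            base_interval: Duration::from_secs(60),    // assumed value, illustration only
        }
    }
}

impl MergeTimingSketch {
    /// Accepts any duration. `Duration::ZERO` means "start immediately".
    fn initial_interval(mut self, d: Duration) -> Self {
        self.initial_interval = d;
        self
    }

    /// Ignores `Duration::ZERO` and keeps the previous value (no-op convention).
    fn base_interval(mut self, d: Duration) -> Self {
        if !d.is_zero() {
            self.base_interval = d;
        }
        self
    }
}

fn main() {
    let cfg = MergeTimingSketch::default()
        .initial_interval(Duration::ZERO)
        .base_interval(Duration::ZERO);
    // initial_interval becomes zero; base_interval keeps its previous value.
    println!("{cfg:?}");
}
```

Seen this way, the same argument value has two different effects depending on the setter, which is the confusion the suggested doc note is meant to prevent.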
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/gossip/topic/topic.rs`:
- Around line 490-579: Unify the timeouts and extend both tests
(test_topic_survives_topic_drop_split and
test_topic_survives_topic_drop_manual_sender_receiver) to prove the cancel token
only fires after all user-facing handles are dropped: after obtaining (_sender,
_receiver) from topic.split() or from
topic.gossip_sender()/topic.gossip_receiver(), drop(topic) as you already do,
then assert cancel_token is NOT cancelled within a single unified timeout
(choose e.g. 2s), then explicitly drop(_sender); drop(_receiver) and assert the
cancel_token does get cancelled (use tokio::time::timeout to wait for
cancel_token.cancelled() and expect Ok) to close the lifetime semantics loop.
In `@test_many_chats.sh`:
- Around line 1-20: Validate and harden the script by enabling safe bash flags
(add set -euo pipefail and IFS=$'\n\t'), remove the redundant shift after
reading N, and validate that N is a positive integer before using it (check N
with a regex like [[ $N =~ ^[0-9]+$ ]] and ensure N -ge 1), exiting with a clear
error message on failure; also check that tmux new-session (using CMD/SESSION)
succeeds before entering the loop (exit if it fails) so you don't attempt tmux
split-window/attach on failure.
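The lifetime semantics that the `topic.rs` inline comment asks the tests to prove can be sketched synchronously with plain `Arc` and `Drop`. This is a simplified model under stated assumptions: `TopicSketch`, `TopicInner`, `SenderSketch`, `ReceiverSketch`, and `CancelFlag` are hypothetical stand-ins, and only the `_topic_keep_alive` field name comes from the review. The real tests are async and observe a cancellation token with a timeout.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Stand-in for a cancellation token: a shared flag flipped on shutdown.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

// Shared topic state. Shutdown happens when the last owner goes away.
struct TopicInner {
    cancel: CancelFlag,
}

impl Drop for TopicInner {
    fn drop(&mut self) {
        self.cancel.0.store(true, Ordering::SeqCst);
    }
}

#[derive(Clone)]
struct TopicSketch(Arc<TopicInner>);

// Each handle carries its own keep-alive reference to the topic.
struct SenderSketch {
    _topic_keep_alive: Option<Arc<TopicSketch>>,
}

struct ReceiverSketch {
    _topic_keep_alive: Option<Arc<TopicSketch>>,
}

impl TopicSketch {
    fn new() -> (Self, CancelFlag) {
        let cancel = CancelFlag::default();
        let topic = TopicSketch(Arc::new(TopicInner { cancel: cancel.clone() }));
        (topic, cancel)
    }

    fn gossip_sender(&self) -> SenderSketch {
        SenderSketch { _topic_keep_alive: Some(Arc::new(self.clone())) }
    }

    fn gossip_receiver(&self) -> ReceiverSketch {
        ReceiverSketch { _topic_keep_alive: Some(Arc::new(self.clone())) }
    }
}

fn main() {
    let (topic, cancel) = TopicSketch::new();
    let sender = topic.gossip_sender();
    let receiver = topic.gossip_receiver();

    drop(topic);
    assert!(!cancel.is_cancelled()); // handles still keep the topic alive

    drop(sender);
    assert!(!cancel.is_cancelled()); // the receiver alone is enough

    drop(receiver);
    assert!(cancel.is_cancelled()); // last owner gone, shutdown fires
}
```

The order of assertions mirrors the requested test shape: not cancelled after `drop(topic)`, cancelled only once every user-facing handle is gone.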
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f3bd7ec6-481c-4bd9-98d3-9734ac4dd1ff
📒 Files selected for processing (7)
- README.md
- examples/full_config.rs
- src/config.rs
- src/gossip/merge/bubble.rs
- src/gossip/merge/message_overlap.rs
- src/gossip/topic/topic.rs
- test_many_chats.sh
@coderabbitai full review
✅ Actions performed: Full review triggered.
@coderabbitai full review
✅ Actions performed: Full review triggered.
Actionable comments posted: 2
♻️ Duplicate comments (1)
test_many_chats.sh (1)
1-15: ⚠️ Potential issue | 🟡 Minor
Harden argument and failure handling before creating/splitting tmux panes.
This still has the previously flagged robustness gaps: no strict bash mode, redundant `shift` (Line 9), no positive-integer validation for `N` (Lines 8/15), and no guard for `tmux new-session` failure (Line 13), which can cause cascading command failures.
🛠️ Suggested patch
 #!/bin/bash
+set -euo pipefail
+IFS=$'\n\t'

 if [ $# -lt 1 ]; then
   echo "Usage: $0 <n>"
   exit 1
 fi

 N=$1
-shift
+if ! [[ "$N" =~ ^[0-9]+$ ]] || [ "$N" -lt 1 ]; then
+  echo "Error: <n> must be a positive integer, got '$N'"
+  exit 1
+fi

 CMD="cargo run --example chat"
 SESSION="multi-$$"

-tmux new-session -d -s "$SESSION" "$CMD"
+tmux new-session -d -s "$SESSION" "$CMD" || {
+  echo "Error: failed to create tmux session '$SESSION'"
+  exit 1
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test_many_chats.sh` around lines 1 - 15, Enable strict bash modes (set -euo pipefail and IFS=$'\n\t') at the top, remove the redundant shift after assigning N, validate that N is a positive integer (check N =~ ^[1-9][0-9]*$) and error out with usage when invalid, and check the exit status of tmux new-session -d -s "$SESSION" "$CMD" (abort with a clear error if it fails) before attempting to split/create additional panes in the for loop; update references in the script to variables N, CMD, SESSION and the tmux new-session invocation accordingly so failures are caught early and handled cleanly.
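The validation rule itself is language-independent. As a hedged illustration in Rust (the script stays in bash), the check named in the prompt, `^[1-9][0-9]*$`, could look like this; `parse_pane_count` is a hypothetical name, and the `u32` upper bound is an added assumption that the bash version does not have.

```rust
/// Accepts only strings matching ^[1-9][0-9]*$ that also fit in a u32.
fn parse_pane_count(arg: &str) -> Result<u32, String> {
    let mut chars = arg.chars();
    // First character must be 1-9, so "", "0", "007" and "+5" are rejected.
    let first_ok = matches!(chars.next(), Some('1'..='9'));
    // Every remaining character must be an ASCII digit.
    if !first_ok || !chars.all(|c| c.is_ascii_digit()) {
        return Err(format!("<n> must be a positive integer, got '{arg}'"));
    }
    arg.parse::<u32>()
        .map_err(|e| format!("<n> is out of range: {e}"))
}

fn main() {
    for arg in ["3", "0", "007", "abc", ""] {
        println!("{arg:?} -> {:?}", parse_pane_count(arg));
    }
}
```

Note the small difference from the suggested patch: `^[0-9]+$` combined with an `-lt 1` check accepts an input such as `007`, while `^[1-9][0-9]*$` rejects leading zeros outright. Either is adequate for a pane count.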
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@Cargo.toml`:
- Line 10: Update the crate version in Cargo.toml from "0.3.2" to "0.4.0" to
reflect the breaking API change introduced by adding the required positional
parameter `initial_interval` to the public constructors `BubbleMerge::new()` and
`MessageOverlapMerge::new()` (exported in lib.rs); locate Cargo.toml and bump
the version string to "0.4.0" so semantic versioning correctly signals the
incompatible change to users.
In `@src/gossip/topic/topic.rs`:
- Around line 485-596: Extract the duplicated async test setup into a helper
async fn (e.g., make_topic) inside the test module that accepts a name: &str and
returns (crate::Topic, tokio_util::sync::CancellationToken) by performing the
secret_key/SigningKey generation, endpoint bind, iroh_gossip::net::Gossip spawn,
TopicId::new, RecordPublisher::new and crate::Topic::new calls currently
repeated in test_topic_full_shutdown_on_drop,
test_topic_survives_topic_drop_split, and
test_topic_survives_topic_drop_manual_sender_receiver; then update each test to
call make_topic(...) to obtain the Topic and its cancel_token and keep the
existing lifecycle assertions unchanged (references: Topic::new,
RecordPublisher::new, TopicId::new, topic.cancel_token(), topic.split(),
topic.gossip_sender(), topic.gossip_receiver()).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 14080ab8-22a5-4b0c-bf08-ae3b134aa170
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (12)
- Cargo.toml
- README.md
- examples/full_config.rs
- src/config.rs
- src/crypto/record.rs
- src/dht.rs
- src/gossip/merge/bubble.rs
- src/gossip/merge/message_overlap.rs
- src/gossip/topic/bootstrap.rs
- src/gossip/topic/publisher.rs
- src/gossip/topic/topic.rs
- test_many_chats.sh
 keywords = ["networking"]
 categories = ["network-programming"]
-version = "0.3.1"
+version = "0.3.2"
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Check whether the changed merge constructors are exported through the crate's public module tree.
set -euo pipefail
fd -e rs '^(lib|mod)$|gossip|merge' src -x sh -c '
echo "--- $1"
rg -nP -C3 "pub\s+(mod|use)\s+.*(gossip|merge|BubbleMerge|MessageOverlapMerge)|\b(BubbleMerge|MessageOverlapMerge)\b" "$1" || true
' sh {}
Repository: rustonbsd/distributed-topic-tracker
Length of output: 61
🏁 Script executed:
# First, find all Rust files and understand directory structure
fd -e rs --type f src | head -20
Repository: rustonbsd/distributed-topic-tracker
Length of output: 61
🏁 Script executed:
# Search for BubbleMerge and MessageOverlapMerge definitions
rg "struct\s+(BubbleMerge|MessageOverlapMerge)" -t rust
Repository: rustonbsd/distributed-topic-tracker
Length of output: 542
🏁 Script executed:
# Check the public exports in lib.rs
cat src/lib.rs 2>/dev/null || echo "lib.rs not found"
Repository: rustonbsd/distributed-topic-tracker
Length of output: 1579
🏁 Script executed:
# Look for the merge module structure
fd -path "*/merge*" -t f src
Repository: rustonbsd/distributed-topic-tracker
Length of output: 247
🏁 Script executed:
# Find the new() constructor implementations for BubbleMerge
rg -A 5 "impl\s+BubbleMerge" src/gossip/merge/bubble.rs -t rust
Repository: rustonbsd/distributed-topic-tracker
Length of output: 771
🏁 Script executed:
# Find the new() constructor implementations for MessageOverlapMerge
rg -A 5 "impl\s+MessageOverlapMerge" src/gossip/merge/message_overlap.rs -t rust
Repository: rustonbsd/distributed-topic-tracker
Length of output: 848
🏁 Script executed:
# Look at the actual constructor signature for BubbleMerge
sed -n '/impl.*BubbleMerge/,/^}/p' src/gossip/merge/bubble.rs | head -30
Repository: rustonbsd/distributed-topic-tracker
Length of output: 1220
🏁 Script executed:
# Look at the actual constructor signature for MessageOverlapMerge
sed -n '/impl.*MessageOverlapMerge/,/^}/p' src/gossip/merge/message_overlap.rs | head -30
Repository: rustonbsd/distributed-topic-tracker
Length of output: 1214
Change version bump to 0.4.0 to reflect the breaking API change.
Adding initial_interval as a required positional argument to BubbleMerge::new() and MessageOverlapMerge::new() breaks backward compatibility. These constructors are publicly exported from lib.rs, so existing ^0.3 users will experience compilation failures. A patch bump (0.3.2) is incompatible with breaking changes; use a minor pre-1.0 bump such as 0.4.0.
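Why 0.3.2 reaches existing users while 0.4.0 does not follows from Cargo's caret rule for version requirements. The sketch below is a simplified model of that rule which ignores pre-release and build metadata; `caret_allows` is a hypothetical helper written for illustration, not part of Cargo or of this crate.

```rust
/// Simplified Cargo caret rule for a requirement `^major.minor.patch`:
/// an update is allowed only if it does not change the left-most
/// non-zero component. Pre-release tags are ignored in this sketch.
fn caret_allows(req: (u64, u64, u64), candidate: (u64, u64, u64)) -> bool {
    // Never resolve to something older than the requirement.
    if candidate < req {
        return false;
    }
    match req {
        // ^0.0.z allows exactly 0.0.z
        (0, 0, _) => candidate == req,
        // ^0.y.z allows 0.y.* at or above the requirement
        (0, minor, _) => candidate.0 == 0 && candidate.1 == minor,
        // ^x.y.z allows x.*.* at or above the requirement
        (major, _, _) => candidate.0 == major,
    }
}

fn main() {
    let req = (0, 3, 0); // a dependency declared as "0.3"
    println!("0.3.2 allowed: {}", caret_allows(req, (0, 3, 2)));
    println!("0.4.0 allowed: {}", caret_allows(req, (0, 4, 0)));
}
```

Under this rule a `0.3` dependency picks up 0.3.2 on the next `cargo update`, so a signature change shipped in that release would surface as a compile error in downstream crates, whereas 0.4.0 requires an explicit opt-in.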
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@Cargo.toml` at line 10, Update the crate version in Cargo.toml from "0.3.2"
to "0.4.0" to reflect the breaking API change introduced by adding the required
positional parameter `initial_interval` to the public constructors
`BubbleMerge::new()` and `MessageOverlapMerge::new()` (exported in lib.rs);
locate Cargo.toml and bump the version string to "0.4.0" so semantic versioning
correctly signals the incompatible change to users.
#[tokio::test]
async fn test_topic_survives_topic_drop_split() {
    let secret_key = iroh::SecretKey::generate();
    let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
    let endpoint = iroh::Endpoint::builder(iroh::endpoint::presets::N0)
        .secret_key(secret_key.clone())
        .bind()
        .await
        .expect("failed to bind endpoint");
    let gossip = iroh_gossip::net::Gossip::builder().spawn(endpoint.clone());

    let topic_id = crate::TopicId::new("my-iroh-gossip-topic-survives-drop-split".to_string());
    let initial_secret = b"my-initial-secret".to_vec();

    let record_publisher = RecordPublisher::new(
        topic_id.clone(),
        signing_key.clone(),
        None,
        initial_secret,
        crate::config::Config::default(),
    );

    let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
        .await
        .expect("failed to create Topic");

    let cancel_token = topic.cancel_token();
    let (_sender, _receiver) = topic.split().await.expect("failed to split topic");

    drop(topic);

    assert!(
        tokio::time::timeout(std::time::Duration::from_secs(2), cancel_token.cancelled())
            .await
            .is_err()
    );

    assert!(!cancel_token.is_cancelled());

    drop(_sender);
    drop(_receiver);

    assert!(
        tokio::time::timeout(std::time::Duration::from_secs(2), cancel_token.cancelled())
            .await
            .is_ok()
    );

    assert!(cancel_token.is_cancelled());
}

#[tokio::test]
async fn test_topic_survives_topic_drop_manual_sender_receiver() {
    let secret_key = iroh::SecretKey::generate();
    let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
    let endpoint = iroh::Endpoint::builder(iroh::endpoint::presets::N0)
        .secret_key(secret_key.clone())
        .bind()
        .await
        .expect("failed to bind endpoint");
    let gossip = iroh_gossip::net::Gossip::builder().spawn(endpoint.clone());

    let topic_id = crate::TopicId::new(
        "my-iroh-gossip-topic-survives-drop-manual-sender-receiver".to_string(),
    );
    let initial_secret = b"my-initial-secret".to_vec();

    let record_publisher = RecordPublisher::new(
        topic_id.clone(),
        signing_key.clone(),
        None,
        initial_secret,
        crate::config::Config::default(),
    );

    let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
        .await
        .expect("failed to create Topic");

    let cancel_token = topic.cancel_token();
    let (_sender, _receiver) = (
        topic
            .gossip_sender()
            .await
            .expect("failed to get gossip sender"),
        topic
            .gossip_receiver()
            .await
            .expect("failed to get gossip receiver"),
    );

    drop(topic);

    assert!(
        tokio::time::timeout(std::time::Duration::from_secs(2), cancel_token.cancelled())
            .await
            .is_err()
    );

    assert!(!cancel_token.is_cancelled());

    drop(_sender);
    drop(_receiver);

    assert!(
        tokio::time::timeout(std::time::Duration::from_secs(2), cancel_token.cancelled())
            .await
            .is_ok()
    );

    assert!(cancel_token.is_cancelled());
}
🧹 Nitpick | 🔵 Trivial
Optional: de-duplicate the three survival/shutdown tests.
test_topic_full_shutdown_on_drop, test_topic_survives_topic_drop_split, and test_topic_survives_topic_drop_manual_sender_receiver share ~15 lines of identical setup (secret key → endpoint → gossip → RecordPublisher → Topic::new). A small async fn make_topic(name: &str) -> (Topic, CancellationToken) helper inside the test module would make each test a handful of lines focused on the actual lifecycle assertions.
No functional concern — prior feedback on unifying timeouts and asserting shutdown after dropping both handles is already incorporated (2s unified; drop(_sender); drop(_receiver); … .is_ok()).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/gossip/topic/topic.rs` around lines 485 - 596, Extract the duplicated
async test setup into a helper async fn (e.g., make_topic) inside the test
module that accepts a name: &str and returns (crate::Topic,
tokio_util::sync::CancellationToken) by performing the secret_key/SigningKey
generation, endpoint bind, iroh_gossip::net::Gossip spawn, TopicId::new,
RecordPublisher::new and crate::Topic::new calls currently repeated in
test_topic_full_shutdown_on_drop, test_topic_survives_topic_drop_split, and
test_topic_survives_topic_drop_manual_sender_receiver; then update each test to
call make_topic(...) to obtain the Topic and its cancel_token and keep the
existing lifecycle assertions unchanged (references: Topic::new,
RecordPublisher::new, TopicId::new, topic.cancel_token(), topic.split(),
topic.gossip_sender(), topic.gossip_receiver()).
Summary by CodeRabbit
New Features
Documentation
Tests
Chores