feat(broadcast): add unbounded policy#117
Draft
orthur2 wants to merge 2 commits into
Draft
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR implements the unbounded policy proposed in #95. Messages remain in the shared buffer until every active receiver has consumed them, or until receivers that have not consumed them are dropped. The channel has no capacity limit, so receivers do not lag and the receive error type has no Lagged variant.
The WaitSet cleanup needed by Recv::Drop was handled separately in #116. This PR is based on that change and focuses on the unbounded policy.
The channel uses a single Mutex to protect the VecDeque<Arc> buffer and the Slab containing receiver cursors. It also tracks how many receivers are at the current buffer head. When the last receiver at the head advances or is dropped, the channel finds the new minimum cursor and removes the corresponding messages. Removed values are dropped after releasing the lock.
Messages are stored as Arc internally, while recv returns a cloned T, so callers do not need to use Arc themselves. subscribe and resubscribe both create a receiver at the current tail without changing the backlog of existing receivers. Message versions use u64, and send panics if the version counter overflows.
By the way, I will update this branch if the WaitSet design changes.
Refs #95. Based on #116.