Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "serverish"
version = "2.0.2"
version = "2.0.3"
description = "helpers for server alike projects"
authors = ["Mikołaj Kałuszyński", "MMME team"]
readme = "README.md"
Expand Down
29 changes: 24 additions & 5 deletions serverish/messenger/msg_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,13 @@ async def read_batch(self) -> None:
log.debug(self.fmt(f"No messages available, finishing due to nowait"))
raise self.EndIterationException('nowait')

# Non-nowait mode: wait for NEW messages to arrive
# Break into shorter intervals to allow health checks and reconnection handling
blocking_interval = 10.0 # Check every 10 seconds
max_wait_cycles = 10 # Total wait: 10 * 10s = 100s
# Non-nowait mode: wait for NEW messages to arrive.
# Break into shorter intervals so the reconnect-needed
# check at the top of the loop runs often enough that
# low-rate subjects don't sit on a stale subscription
# for tens of seconds after a NATS reconnect.
blocking_interval = 2.0
max_wait_cycles = 50 # Total wait: 50 * 2s = 100s (unchanged)

for cycle in range(max_wait_cycles):
# Check for reconnection before each wait cycle
Expand Down Expand Up @@ -621,12 +624,28 @@ async def _reopen(self) -> None:
max_retries = 3
for attempt in range(max_retries):
try:
# Clean up existing subscription
# Clean up existing subscription. `unsubscribe()` only detaches the
# local handle; the JetStream consumer must be deleted explicitly
# or it leaks server-side across every reconnect (one orphan per
# reopen, accumulating until the stream's consumer cap or the
# process restarts).
if self.pull_subscription is not None:
try:
ci = await self.pull_subscription.consumer_info()
except Exception as e:
log.debug(f"Error fetching consumer_info during reopen attempt {attempt + 1}: {e}")
ci = None
try:
await self.pull_subscription.unsubscribe()
except Exception as e:
log.debug(f"Error unsubscribing during reopen attempt {attempt + 1}: {e}")
if ci is not None:
try:
await self.connection.js.delete_consumer(stream=ci.stream_name, consumer=ci.name)
except nats.js.errors.NotFoundError:
pass # already gone, nothing to delete
except Exception as e:
log.debug(f"Error deleting old consumer during reopen attempt {attempt + 1}: {e}")

# Create new consumer configuration
consumer_conf = await self._create_consumer_cfg()
Expand Down
Loading