diff --git a/pyproject.toml b/pyproject.toml index 7f58980..9c2186a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "serverish" -version = "2.0.2" +version = "2.0.3" description = "helpers for server alike projects" authors = ["Mikołaj Kałuszyński", "MMME team"] readme = "README.md" diff --git a/serverish/messenger/msg_reader.py b/serverish/messenger/msg_reader.py index 51a4d6e..a01436f 100644 --- a/serverish/messenger/msg_reader.py +++ b/serverish/messenger/msg_reader.py @@ -292,10 +292,13 @@ async def read_batch(self) -> None: log.debug(self.fmt(f"No messages available, finishing due to nowait")) raise self.EndIterationException('nowait') - # Non-nowait mode: wait for NEW messages to arrive - # Break into shorter intervals to allow health checks and reconnection handling - blocking_interval = 10.0 # Check every 10 seconds - max_wait_cycles = 10 # Total wait: 10 * 10s = 100s + # Non-nowait mode: wait for NEW messages to arrive. + # Break into shorter intervals so the reconnect-needed + # check at the top of the loop runs often enough that + # low-rate subjects don't sit on a stale subscription + # for tens of seconds after a NATS reconnect. + blocking_interval = 2.0 + max_wait_cycles = 50 # Total wait: 50 * 2s = 100s (unchanged) for cycle in range(max_wait_cycles): # Check for reconnection before each wait cycle @@ -621,12 +624,28 @@ async def _reopen(self) -> None: max_retries = 3 for attempt in range(max_retries): try: - # Clean up existing subscription + # Clean up existing subscription. `unsubscribe()` only detaches the + # local handle; the JetStream consumer must be deleted explicitly + # or it leaks server-side across every reconnect (one orphan per + # reopen, accumulating until the stream's consumer cap or the + # process restarts). if self.pull_subscription is not None: + try: + ci = await self.pull_subscription.consumer_info() + except Exception as e: + log.debug(f"Error fetching consumer_info during reopen attempt {attempt + 1}: {e}") + ci = None try: await self.pull_subscription.unsubscribe() except Exception as e: log.debug(f"Error unsubscribing during reopen attempt {attempt + 1}: {e}") + if ci is not None: + try: + await self.connection.js.delete_consumer(stream=ci.stream_name, consumer=ci.name) + except nats.js.errors.NotFoundError: + pass # already gone, nothing to delete + except Exception as e: + log.debug(f"Error deleting old consumer during reopen attempt {attempt + 1}: {e}") # Create new consumer configuration consumer_conf = await self._create_consumer_cfg()