From 8719d256bcdbcf2f2bb3a0fce6f99ea6e8018957 Mon Sep 17 00:00:00 2001 From: majkelx Date: Sat, 2 May 2026 03:53:05 -0400 Subject: [PATCH 1/3] test(messenger): regression test for callback subscriber stopping on None return MsgCallbackSubscriber._task_body uses `if not cont` to decide whether to stop the loop, which fires for both False (the documented "stop" return) and None (the implicit return value of any callback that doesn't end with `return True`). Result: any `async def cb(...)` or `def cb(...)` that just does its work and returns nothing silently kills the subscription after the first delivered message. Three tests added: - async callback returning None must keep the loop alive across 5 messages - sync callback returning None must keep the loop alive across 5 messages - explicit `return False` must still stop the loop after one message (regression check that the fix preserves the documented stop contract) The first two fail on master; the third passes. The fix on feat/checking-log-burst-debug (dcf5aed) makes all three pass. --- tests/test_messenger_sub.py | 92 +++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/tests/test_messenger_sub.py b/tests/test_messenger_sub.py index 9e2d34a..7608148 100644 --- a/tests/test_messenger_sub.py +++ b/tests/test_messenger_sub.py @@ -38,3 +38,95 @@ async def publisher_task(pub, n): await asyncio.sleep(6) await sub.wait_for_empty() assert len(msgs) == 9 + + +async def _wait_until(predicate, timeout: float = 5.0, interval: float = 0.1) -> bool: + """Poll predicate up to timeout seconds; return True as soon as it holds.""" + for _ in range(int(timeout / interval)): + if predicate(): + return True + await asyncio.sleep(interval) + return predicate() + + +@pytest.mark.nats +async def test_async_callback_returning_none_keeps_subscription_alive(messenger, unique_subject): + """Regression: async callback with no explicit return (returns None) must + not stop the subscription loop. 
+ + The buggy predicate `if not cont` fires for both False and None — so any + `async def cb(...)` that does not explicitly `return True` silently broke + the subscription after the first delivered message. Production symptom: + pms aggregator's 144 cache.put callbacks died after one replay message. + """ + received = [] + + async def cb(data, meta): + received.append(data['n']) + # implicit return None — the idiomatic case the bug breaks + + pub = get_publisher(subject=unique_subject) + await messenger.purge(subject=unique_subject) + + sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all') + async with sub: + await sub.subscribe(cb) + for i in range(5): + await pub.publish(data={'n': i}) + await asyncio.sleep(0.05) + await _wait_until(lambda: len(received) >= 5, timeout=5.0) + assert received == [0, 1, 2, 3, 4], ( + f"Async callback returning None must not stop the loop. " + f"Received {len(received)}/5: {received}" + ) + + +@pytest.mark.nats +async def test_async_callback_returning_false_stops_subscription(messenger, unique_subject): + """The documented stop-on-False contract must still hold after the fix.""" + received = [] + + async def cb(data, meta): + received.append(data['n']) + return False # explicit stop after first message + + pub = get_publisher(subject=unique_subject) + await messenger.purge(subject=unique_subject) + + sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all') + async with sub: + await sub.subscribe(cb) + for i in range(5): + await pub.publish(data={'n': i}) + await asyncio.sleep(0.05) + # Give the loop ample time to (incorrectly) keep going if broken + await asyncio.sleep(1.5) + assert received == [0], ( + f"Returning False must stop the loop after one message. 
Received {received}" + ) + + +@pytest.mark.nats +async def test_sync_callback_returning_none_keeps_subscription_alive(messenger, unique_subject): + """Same regression as the async case, for sync callbacks — `def cb` with + no explicit return also returns None and must keep the loop alive.""" + received = [] + + def cb(data, meta): + received.append(data['n']) + # implicit return None + + pub = get_publisher(subject=unique_subject) + await messenger.purge(subject=unique_subject) + + sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all') + async with sub: + await sub.subscribe(cb) + for i in range(5): + await pub.publish(data={'n': i}) + await asyncio.sleep(0.05) + await _wait_until(lambda: len(received) >= 5, timeout=5.0) + assert received == [0, 1, 2, 3, 4], ( + f"Sync callback returning None must not stop the loop. " + f"Received {len(received)}/5: {received}" + ) From e365389606d7c069efe43805fef524305001cf4f Mon Sep 17 00:00:00 2001 From: majkelx Date: Sat, 2 May 2026 02:23:31 -0400 Subject: [PATCH 2/3] Fix: callback subscriber stopped after first message when callback returned None MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MsgCallbackSubscriber._task_body treated None (the implicit return value of a Python function with no explicit return statement) the same as False — so any async def callback that didn't end with `return True` exited the subscription loop after the very first delivered message. Symptom (pms aggregator, 176 subscribers via asyncio.gather): - 32 subjects with no replay-time messages stayed in the wait loop and worked correctly. - 144 subjects that received an initial replay message had their callback fire once, return None implicitly, and break the loop. The subscription task exited silently (DEBUG-level "Exiting sync interation" [sic] log was the only trace). 
The aggregator's value cache then never updated again, trends stayed null for hours, and the watchdog flagged "144/176 silent > 600s" exactly as observed in production. Fix: stop only on explicit False; None and any truthy value keep the subscription alive. Docstring updated to match the corrected contract. --- serverish/messenger/msg_callback_sub.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/serverish/messenger/msg_callback_sub.py b/serverish/messenger/msg_callback_sub.py index 5519133..b226513 100644 --- a/serverish/messenger/msg_callback_sub.py +++ b/serverish/messenger/msg_callback_sub.py @@ -42,7 +42,9 @@ async def subscribe(self, callback: Callable[[dict, dict], bool] | Callable[[dic Args: callback: a callback function to call on each message, may be asynchronous callback is called with two arguments: message dict and metadata dict, - and should return True to continue reading messages, False to stop + and may return False to stop reading messages. Any other return value + (including None — i.e. callbacks with no explicit return — and True) + keeps the subscription running. """ self.callback = callback if asyncio.iscoroutinefunction(callback): @@ -59,7 +61,7 @@ async def _task_body(self, assert scb is not None or acb is not None assert not (scb is not None and acb is not None) cb = scb or acb - cont = True + cont: object = True log.debug(f"Entering sync interation{self}") async for data, meta in self: try: @@ -74,7 +76,11 @@ async def _task_body(self, break except Exception as e: log.exception(f'Error in callback {cb} for message {meta}{str(data):20}: {e}') - if not cont or self._stop_event.is_set() : + # Stop only on explicit False — None (the implicit "no return" value + # of a Python function) and any truthy value keep the subscription + # alive. Treating None as "stop" would silently kill any callback + # that doesn't bother returning anything, which is the common case. 
+ if cont is False or self._stop_event.is_set(): break log.debug(f"Exiting sync interation{self}") From a7e7126a6db11e2097d0f809136e3bcf016ba0a2 Mon Sep 17 00:00:00 2001 From: majkelx Date: Sat, 2 May 2026 04:12:59 -0400 Subject: [PATCH 3/3] chore: bump to 2.0.4 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9c2186a..1fdf5b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "serverish" -version = "2.0.3" +version = "2.0.4" description = "helpers for server alike projects" authors = ["Mikołaj Kałuszyński", "MMME team"] readme = "README.md"