diff --git a/pyproject.toml b/pyproject.toml index 9c2186a..1fdf5b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "serverish" -version = "2.0.3" +version = "2.0.4" description = "helpers for server alike projects" authors = ["Mikołaj Kałuszyński", "MMME team"] readme = "README.md" diff --git a/serverish/messenger/msg_callback_sub.py b/serverish/messenger/msg_callback_sub.py index 5519133..b226513 100644 --- a/serverish/messenger/msg_callback_sub.py +++ b/serverish/messenger/msg_callback_sub.py @@ -42,7 +42,9 @@ async def subscribe(self, callback: Callable[[dict, dict], bool] | Callable[[dic Args: callback: a callback function to call on each message, may be asynchronous callback is called with two arguments: message dict and metadata dict, - and should return True to continue reading messages, False to stop + and may return False to stop reading messages. Any other return value + (including None — i.e. callbacks with no explicit return — and True) + keeps the subscription running. """ self.callback = callback if asyncio.iscoroutinefunction(callback): @@ -59,7 +61,7 @@ async def _task_body(self, assert scb is not None or acb is not None assert not (scb is not None and acb is not None) cb = scb or acb - cont = True + cont: object = True log.debug(f"Entering sync interation{self}") async for data, meta in self: try: @@ -74,7 +76,11 @@ async def _task_body(self, break except Exception as e: log.exception(f'Error in callback {cb} for message {meta}{str(data):20}: {e}') - if not cont or self._stop_event.is_set() : + # Stop only on explicit False — None (the implicit "no return" value + # of a Python function) and any truthy value keep the subscription + # alive. Treating None as "stop" would silently kill any callback + # that doesn't bother returning anything, which is the common case. + if cont is False or self._stop_event.is_set(): break log.debug(f"Exiting sync interation{self}") diff --git a/tests/test_messenger_sub.py b/tests/test_messenger_sub.py index 9e2d34a..7608148 100644 --- a/tests/test_messenger_sub.py +++ b/tests/test_messenger_sub.py @@ -38,3 +38,95 @@ async def publisher_task(pub, n): await asyncio.sleep(6) await sub.wait_for_empty() assert len(msgs) == 9 + + +async def _wait_until(predicate, timeout: float = 5.0, interval: float = 0.1) -> bool: + """Poll predicate up to timeout seconds; return True as soon as it holds.""" + for _ in range(int(timeout / interval)): + if predicate(): + return True + await asyncio.sleep(interval) + return predicate() + + +@pytest.mark.nats +async def test_async_callback_returning_none_keeps_subscription_alive(messenger, unique_subject): + """Regression: async callback with no explicit return (returns None) must + not stop the subscription loop. + + The buggy predicate `if not cont` fires for both False and None — so any + `async def cb(...)` that does not explicitly `return True` silently broke + the subscription after the first delivered message. Production symptom: + pms aggregator's 144 cache.put callbacks died after one replay message. + """ + received = [] + + async def cb(data, meta): + received.append(data['n']) + # implicit return None — the idiomatic case the bug breaks + + pub = get_publisher(subject=unique_subject) + await messenger.purge(subject=unique_subject) + + sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all') + async with sub: + await sub.subscribe(cb) + for i in range(5): + await pub.publish(data={'n': i}) + await asyncio.sleep(0.05) + await _wait_until(lambda: len(received) >= 5, timeout=5.0) + assert received == [0, 1, 2, 3, 4], ( + f"Async callback returning None must not stop the loop. " + f"Received {len(received)}/5: {received}" + ) + + +@pytest.mark.nats +async def test_async_callback_returning_false_stops_subscription(messenger, unique_subject): + """The documented stop-on-False contract must still hold after the fix.""" + received = [] + + async def cb(data, meta): + received.append(data['n']) + return False # explicit stop after first message + + pub = get_publisher(subject=unique_subject) + await messenger.purge(subject=unique_subject) + + sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all') + async with sub: + await sub.subscribe(cb) + for i in range(5): + await pub.publish(data={'n': i}) + await asyncio.sleep(0.05) + # Give the loop ample time to (incorrectly) keep going if broken + await asyncio.sleep(1.5) + assert received == [0], ( + f"Returning False must stop the loop after one message. Received {received}" + ) + + +@pytest.mark.nats +async def test_sync_callback_returning_none_keeps_subscription_alive(messenger, unique_subject): + """Same regression as the async case, for sync callbacks — `def cb` with + no explicit return also returns None and must keep the loop alive.""" + received = [] + + def cb(data, meta): + received.append(data['n']) + # implicit return None + + pub = get_publisher(subject=unique_subject) + await messenger.purge(subject=unique_subject) + + sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all') + async with sub: + await sub.subscribe(cb) + for i in range(5): + await pub.publish(data={'n': i}) + await asyncio.sleep(0.05) + await _wait_until(lambda: len(received) >= 5, timeout=5.0) + assert received == [0, 1, 2, 3, 4], ( + f"Sync callback returning None must not stop the loop. " + f"Received {len(received)}/5: {received}" + )