Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "serverish"
version = "2.0.3"
version = "2.0.4"
description = "helpers for server alike projects"
authors = ["Mikołaj Kałuszyński", "MMME team"]
readme = "README.md"
Expand Down
12 changes: 9 additions & 3 deletions serverish/messenger/msg_callback_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ async def subscribe(self, callback: Callable[[dict, dict], bool] | Callable[[dic
Args:
callback: a callback function to call on each message, may be asynchronous
callback is called with two arguments: message dict and metadata dict,
and should return True to continue reading messages, False to stop
and may return False to stop reading messages. Any other return value
(including None — i.e. callbacks with no explicit return — and True)
keeps the subscription running.
"""
self.callback = callback
if asyncio.iscoroutinefunction(callback):
Expand All @@ -59,7 +61,7 @@ async def _task_body(self,
assert scb is not None or acb is not None
assert not (scb is not None and acb is not None)
cb = scb or acb
cont = True
cont: object = True
log.debug(f"Entering sync interation{self}")
async for data, meta in self:
try:
Expand All @@ -74,7 +76,11 @@ async def _task_body(self,
break
except Exception as e:
log.exception(f'Error in callback {cb} for message {meta}{str(data):20}: {e}')
if not cont or self._stop_event.is_set() :
# Stop only on explicit False — None (the implicit "no return" value
# of a Python function) and any truthy value keep the subscription
# alive. Treating None as "stop" would silently kill any callback
# that doesn't bother returning anything, which is the common case.
if cont is False or self._stop_event.is_set():
break
log.debug(f"Exiting sync interation{self}")

Expand Down
92 changes: 92 additions & 0 deletions tests/test_messenger_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,95 @@ async def publisher_task(pub, n):
await asyncio.sleep(6)
await sub.wait_for_empty()
assert len(msgs) == 9


async def _wait_until(predicate, timeout: float = 5.0, interval: float = 0.1) -> bool:
"""Poll predicate up to timeout seconds; return True as soon as it holds."""
for _ in range(int(timeout / interval)):
if predicate():
return True
await asyncio.sleep(interval)
return predicate()


@pytest.mark.nats
async def test_async_callback_returning_none_keeps_subscription_alive(messenger, unique_subject):
"""Regression: async callback with no explicit return (returns None) must
not stop the subscription loop.

The buggy predicate `if not cont` fires for both False and None — so any
`async def cb(...)` that does not explicitly `return True` silently broke
the subscription after the first delivered message. Production symptom:
pms aggregator's 144 cache.put callbacks died after one replay message.
"""
received = []

async def cb(data, meta):
received.append(data['n'])
# implicit return None — the idiomatic case the bug breaks

pub = get_publisher(subject=unique_subject)
await messenger.purge(subject=unique_subject)

sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all')
async with sub:
await sub.subscribe(cb)
for i in range(5):
await pub.publish(data={'n': i})
await asyncio.sleep(0.05)
await _wait_until(lambda: len(received) >= 5, timeout=5.0)
assert received == [0, 1, 2, 3, 4], (
f"Async callback returning None must not stop the loop. "
f"Received {len(received)}/5: {received}"
)


@pytest.mark.nats
async def test_async_callback_returning_false_stops_subscription(messenger, unique_subject):
"""The documented stop-on-False contract must still hold after the fix."""
received = []

async def cb(data, meta):
received.append(data['n'])
return False # explicit stop after first message

pub = get_publisher(subject=unique_subject)
await messenger.purge(subject=unique_subject)

sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all')
async with sub:
await sub.subscribe(cb)
for i in range(5):
await pub.publish(data={'n': i})
await asyncio.sleep(0.05)
# Give the loop ample time to (incorrectly) keep going if broken
await asyncio.sleep(1.5)
assert received == [0], (
f"Returning False must stop the loop after one message. Received {received}"
)


@pytest.mark.nats
async def test_sync_callback_returning_none_keeps_subscription_alive(messenger, unique_subject):
"""Same regression as the async case, for sync callbacks — `def cb` with
no explicit return also returns None and must keep the loop alive."""
received = []

def cb(data, meta):
received.append(data['n'])
# implicit return None

pub = get_publisher(subject=unique_subject)
await messenger.purge(subject=unique_subject)

sub = get_callbacksubscriber(subject=unique_subject, deliver_policy='all')
async with sub:
await sub.subscribe(cb)
for i in range(5):
await pub.publish(data={'n': i})
await asyncio.sleep(0.05)
await _wait_until(lambda: len(received) >= 5, timeout=5.0)
assert received == [0, 1, 2, 3, 4], (
f"Sync callback returning None must not stop the loop. "
f"Received {len(received)}/5: {received}"
)
Loading