From 4b7b3591b1463c1c52f817be373ca678f7589c76 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 3 May 2026 05:21:49 +0000 Subject: [PATCH 1/4] Initial plan From 0ea13691e481c44666591a9b7854424d067e0b57 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 3 May 2026 05:26:49 +0000 Subject: [PATCH 2/4] Add MsgReader config validation and fatal NATS error handling Agent-Logs-Url: https://github.com/AkondLab/serverish/sessions/a9738867-cb3e-436c-bf7f-c8dc0e76b428 Co-authored-by: majkelx <8569721+majkelx@users.noreply.github.com> --- serverish/base/exceptions.py | 9 +++ serverish/messenger/msg_reader.py | 25 +++++++- tests/test_delivery_policies.py | 99 +++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 1 deletion(-) diff --git a/serverish/base/exceptions.py b/serverish/base/exceptions.py index afe4bce..ced4cb5 100644 --- a/serverish/base/exceptions.py +++ b/serverish/base/exceptions.py @@ -7,6 +7,15 @@ class MessengerNotConnected(Exception): class MessengerReaderStopped(Exception): pass +class MessengerReaderConfigError(ValueError): + """Raised when a MsgReader is misconfigured (e.g. missing required start marker). + + This is a fatal error — the same configuration will never succeed against + the NATS server. The reader stops and this exception propagates out of + ``async for`` loops so callers fail loudly instead of retrying forever. + """ + pass + class MessengerReaderAlreadyOpen(RuntimeError): pass diff --git a/serverish/messenger/msg_reader.py b/serverish/messenger/msg_reader.py index a01436f..c1ac284 100644 --- a/serverish/messenger/msg_reader.py +++ b/serverish/messenger/msg_reader.py @@ -12,12 +12,14 @@ import nats.errors +import nats.js.errors import param from nats.js import JetStreamContext from nats.js.api import DeliverPolicy, ConsumerConfig from serverish.base import wait_for_psce -from serverish.base.exceptions import (MessengerReaderStopped, MessengerNotConnected, +from serverish.base.exceptions import (MessengerReaderStopped, MessengerReaderConfigError, + MessengerNotConnected, MessengerReaderAlreadyOpen, MessengerRequestTimeout) from serverish.base.fifoset import FifoSet from serverish.messenger import Messenger @@ -105,6 +107,17 @@ def __init__(self, subject, parent = None, super().__init__(subject=subject, parent=parent, deliver_policy=deliver_policy, opt_start_time=opt_start_time, consumer_cfg=consumer_cfg_defaults, **kwargs) + # Validate deliver_policy ↔ start-marker consistency up front so + # callers get a clear error at construction time rather than an + # opaque NATS 400 error buried inside the read loop. + if self.deliver_policy == 'by_start_time' and self.opt_start_time is None: + raise ValueError( + "deliver_policy='by_start_time' requires opt_start_time to be set" + ) + if self.deliver_policy == 'by_start_sequence' and self.consumer_cfg.get('opt_start_seq') is None: + raise ValueError( + "deliver_policy='by_start_sequence' requires opt_start_seq to be set in consumer_cfg" + ) log.debug(f"Created {self}") @@ -384,6 +397,16 @@ def logput(self, msg: str) -> None: st.logput(f'{e.task}-err') st.error = e.error self._last_error = e.error # Track for health monitoring + # Fatal NATS API errors (4xx): the same request will never + # succeed — stop retrying immediately and surface a clear + # MessengerReaderConfigError to the caller. + if isinstance(e.error, (nats.js.errors.BadRequestError, + nats.js.errors.NotFoundError)): + log.error(st.fmt(f"fatal NATS error (not retrying): {e.error}")) + self._stop.set() + raise MessengerReaderConfigError( + f"Fatal NATS error on {self.subject}: {e.error}" + ) from e.error match self.error_behavior: case 'RAISE': log.error(st.fmt(f"raising read_next error: {e.error}")) diff --git a/tests/test_delivery_policies.py b/tests/test_delivery_policies.py index 0a661ea..ea5c774 100644 --- a/tests/test_delivery_policies.py +++ b/tests/test_delivery_policies.py @@ -7,6 +7,7 @@ from serverish.messenger import Messenger, get_publisher, get_reader from serverish.messenger.msg_reader import MsgReader +from serverish.base.exceptions import MessengerReaderConfigError @pytest.mark.nats @@ -310,3 +311,101 @@ async def test_consumer_cfg_nonzero_microsecond_is_unchanged(): cfg = await rdr._create_consumer_cfg() assert cfg.opt_start_time == '2026-04-25T16:00:00.123456Z' + + +# --------------------------------------------------------------------------- +# Unit tests for deliver_policy validation (no NATS needed) +# --------------------------------------------------------------------------- + +def test_by_start_time_without_opt_start_time_raises(): + """MsgReader must raise ValueError at construction when deliver_policy='by_start_time' + but opt_start_time is not provided.""" + m = Messenger() + with pytest.raises(ValueError, match="opt_start_time"): + MsgReader('test.unit', parent=m, deliver_policy='by_start_time') + + +def test_by_start_time_with_opt_start_time_succeeds(): + """MsgReader must succeed when deliver_policy='by_start_time' and opt_start_time is set.""" + m = Messenger() + t = datetime.datetime(2026, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) + rdr = MsgReader('test.unit', parent=m, deliver_policy='by_start_time', opt_start_time=t) + assert rdr.deliver_policy == 'by_start_time' + + +def test_by_start_sequence_without_opt_start_seq_raises(): + """MsgReader must raise ValueError at construction when deliver_policy='by_start_sequence' + but opt_start_seq is not provided.""" + m = Messenger() + with pytest.raises(ValueError, match="opt_start_seq"): + MsgReader('test.unit', parent=m, deliver_policy='by_start_sequence') + + +def test_by_start_sequence_with_opt_start_seq_succeeds(): + """MsgReader must succeed when deliver_policy='by_start_sequence' and opt_start_seq is set.""" + m = Messenger() + rdr = MsgReader('test.unit', parent=m, deliver_policy='by_start_sequence', + consumer_cfg={'opt_start_seq': 42}) + assert rdr.deliver_policy == 'by_start_sequence' + + +def test_messenger_get_reader_by_start_time_without_time_raises(): + """Messenger.get_reader must raise ValueError when by_start_time is used without opt_start_time.""" + with pytest.raises(ValueError, match="opt_start_time"): + Messenger.get_reader('test.unit', deliver_policy='by_start_time') + + +# --------------------------------------------------------------------------- +# Unit test: MessengerReaderConfigError is exported from serverish.base +# --------------------------------------------------------------------------- + +def test_messenger_reader_config_error_is_exported(): + """MessengerReaderConfigError must be importable from serverish.base.""" + from serverish.base import MessengerReaderConfigError as Err # noqa: F401 + assert issubclass(Err, ValueError) + + +# --------------------------------------------------------------------------- +# Unit test: fatal NATS errors (BadRequestError / NotFoundError) raise +# MessengerReaderConfigError from __anext__ (no live NATS needed) +# --------------------------------------------------------------------------- + +async def test_fatal_nats_bad_request_raises_config_error(): + """When open() raises nats.js.errors.BadRequestError the read loop must + stop and raise MessengerReaderConfigError instead of retrying forever.""" + import nats.js.errors + + m = Messenger() + t = datetime.datetime(2026, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) + rdr = MsgReader('test.unit', parent=m, deliver_policy='by_start_time', + opt_start_time=t, error_behavior='WAIT') + + # Patch open() so it raises BadRequestError immediately + async def _bad_open(): + raise nats.js.errors.BadRequestError() + + rdr.open = _bad_open + + with pytest.raises(MessengerReaderConfigError): + async for _ in rdr: + pass # pragma: no cover + + +async def test_fatal_nats_not_found_raises_config_error(): + """When open() raises nats.js.errors.NotFoundError the read loop must + stop and raise MessengerReaderConfigError.""" + import nats.js.errors + + m = Messenger() + rdr = MsgReader('test.unit', parent=m, deliver_policy='all', + error_behavior='WAIT') + + async def _not_found_open(): + raise nats.js.errors.NotFoundError() + + rdr.open = _not_found_open + + with pytest.raises(MessengerReaderConfigError): + async for _ in rdr: + pass # pragma: no cover + From 5f0a6a7c059ed5ea70bc5b7ece4d568517e7ab9d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 3 May 2026 06:05:26 +0000 Subject: [PATCH 3/4] Fix test_messenger_issue5: expect MessengerReaderConfigError for no-stream subject Agent-Logs-Url: https://github.com/AkondLab/serverish/sessions/28471008-e43d-4089-bda4-f971abf2c7e6 Co-authored-by: majkelx <8569721+majkelx@users.noreply.github.com> --- tests/test_messenger_issue5.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/test_messenger_issue5.py b/tests/test_messenger_issue5.py index baec2d2..257a618 100644 --- a/tests/test_messenger_issue5.py +++ b/tests/test_messenger_issue5.py @@ -1,6 +1,7 @@ import nats import pytest +from serverish.base.exceptions import MessengerReaderConfigError from serverish.messenger import Messenger, get_reader @@ -14,7 +15,11 @@ async def test_messenger_issue5_subject_not_in_stream(messenger): reader.error_behavior = "RAISE" try: cfg = await reader.read_next() - except nats.js.errors.NotFoundError: + except MessengerReaderConfigError: + # "No stream found" is a fatal configuration error — it is surfaced as + # MessengerReaderConfigError (wrapping the original NotFoundError) so + # that callers get a clear, actionable exception instead of a raw NATS + # 404 that would otherwise retry forever in WAIT mode. pass else: - assert False, 'Should raise NotFoundError' + assert False, 'Should raise MessengerReaderConfigError' From bd6deb726b3e61ebdba1afb31d9e31f2a495e1f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ka=C5=82uszy=C5=84ski?= Date: Sun, 3 May 2026 02:15:54 -0400 Subject: [PATCH 4/4] Bump version from 2.0.4 to 2.0.5 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 1fdf5b4..7670b0b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "serverish" -version = "2.0.4" +version = "2.0.5" description = "helpers for server alike projects" authors = ["Mikołaj Kałuszyński", "MMME team"] readme = "README.md"