Summary
Two related shortcomings in serverish.messenger.MsgReader, surfaced while debugging an unrelated event-loop-starvation bug in oca_monitor:
- No upfront validation of deliver_policy ↔ start-marker consistency. MsgReader silently accepts, at construction, combinations where the start marker required by the chosen policy is missing. NATS rejects the consumer creation on the first read attempt, but by then the reader is already wired into the caller's async for loop.
- The retry loop never gives up on configuration errors. The recovery path in _reopen_pull_subscribtion rebuilds the same malformed ConsumerConfig and sends it to the server, which keeps replying 400. The backoff grows without bound; the reader stays silent forever and the iterator never raises.
Repro
Against any reachable NATS with JetStream:
```python
import asyncio
from serverish.messenger import Messenger

async def go():
    msg = Messenger()
    await msg.open(host="nats.example", port=4222)
    rdr = msg.get_reader("telemetry.weather.davis", deliver_policy="by_start_time")
    # deliver_policy="by_start_time" is set, but opt_start_time is
    # intentionally omitted; this should be a hard error
    async for data, meta in rdr:
        print(data)
        break

asyncio.run(go())
```
Expected: a ValueError (or similar) at construction time saying opt_start_time is required when deliver_policy='by_start_time'.
Actual: construction succeeds, then this line repeats with growing backoff forever:
```
[WARNING] msg_reader: (N)[PULL]MsgReader_xxx [...] read_next error,
(retry in N.Ns): nats: BadRequestError: code=400 err_code=10094
description='consumer delivery policy is deliver by start time,
but optional start time is not set'
elapsed: NN.Ns hist: ensure_open-err
```
The same shape applies to deliver_policy='by_start_sequence' without opt_start_seq (NATS replies with err_code=10095).
Root cause
In messenger/msg_reader.py:
- __init__ (≈line 75) trusts the param descriptors and never cross-validates deliver_policy against opt_start_time / consumer_cfg.
- _create_consumer_cfg (≈line 390) builds the config from self.consumer_cfg.copy() and only copies opt_start_time when it is set, silently producing a malformed config.
- _reopen_pull_subscribtion (≈line 330) calls _create_consumer_cfg again on every retry. There is no fatal-error exit.
Suggested fix
(a) Validate at construction
In MsgReader.__init__, after the param coercion, raise ValueError if either of the following holds:
- deliver_policy == 'by_start_time' and opt_start_time is None
- deliver_policy == 'by_start_sequence' and consumer_cfg.get('opt_start_seq') is None
Suggested error message, e.g. "deliver_policy='by_start_time' requires opt_start_time".
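A minimal sketch of check (a), written as a standalone function so it can be exercised without a NATS server. The name `validate_start_marker` is hypothetical; inside `MsgReader.__init__` the same two checks would run after param coercion, reading `opt_start_seq` from `consumer_cfg` as proposed above. The actual attribute names inside serverish may differ.

```python
from typing import Any, Mapping, Optional


def validate_start_marker(
    deliver_policy: str,
    opt_start_time: Any = None,
    consumer_cfg: Optional[Mapping[str, Any]] = None,
) -> None:
    """Hypothetical helper: fail fast when a start-marker policy lacks its marker.

    Mirrors the two rules proposed for MsgReader.__init__.
    """
    cfg = consumer_cfg or {}
    if deliver_policy == "by_start_time" and opt_start_time is None:
        raise ValueError("deliver_policy='by_start_time' requires opt_start_time")
    if deliver_policy == "by_start_sequence" and cfg.get("opt_start_seq") is None:
        raise ValueError(
            "deliver_policy='by_start_sequence' requires consumer_cfg['opt_start_seq']"
        )


# Usage: valid combinations pass silently; the repro's combination fails fast.
validate_start_marker("all")
validate_start_marker("by_start_sequence", consumer_cfg={"opt_start_seq": 1})
try:
    validate_start_marker("by_start_time")
except ValueError as exc:
    print(exc)  # deliver_policy='by_start_time' requires opt_start_time
```

Raising from the constructor means the mistake in the repro surfaces before the reader is ever handed to an `async for` loop.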
(b) Classify NATS API errors as fatal vs transient
nats-py already exposes the classification we need — its nats.js.errors hierarchy maps NATS' HTTP-style status codes:
```
APIError                           # base
├── BadRequestError (400)          # config error → fatal
├── NotFoundError (404)            # stream/consumer missing → fatal
├── ServerError (500)              # server bug → transient
└── ServiceUnavailableError (503)  # overload/disconnect → transient
```
Plus nats.errors.TimeoutError and ConnectionClosedError for transport-level transients.
Proposed rule, simple to implement and aligned with NATS docs:
- Fatal (stop retrying, mark the reader as stopped, raise from __anext__): BadRequestError, NotFoundError. These mean the request itself is wrong; the same payload will never succeed.
- Transient (keep the current backoff retry): ServerError, ServiceUnavailableError, TimeoutError, ConnectionClosedError, and any generic APIError whose code is 5xx.
The err_code field (e.g. 10094, 10095) gives even finer granularity if needed in the future, but the 4xx/5xx split is correct and matches the NATS JetStream error reference.
For surfacing the fatal case to callers, a new exception type — e.g. MessengerReaderConfigError(MessengerError) — raised from __anext__ would let async for consumers fail loudly rather than silently iterate forever.
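A sketch of how the fatal path could surface to callers, assuming a retry loop that wraps a single read attempt. `read_with_retry` and `read_once` are hypothetical stand-ins for MsgReader's internals; `MessengerError` is defined locally as a stand-in for serverish's base class, and `MessengerReaderConfigError` is the name suggested above. The cap on the delay (`max_delay`) is an optional design choice, not part of the proposal.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

# Fatal codes per the proposed rule: 400 (BadRequestError), 404 (NotFoundError).
FATAL_API_CODES = frozenset({400, 404})


class MessengerError(Exception):
    """Local stand-in for serverish's base messenger exception."""


class MessengerReaderConfigError(MessengerError):
    """Proposed: the reader's consumer request is invalid; retrying cannot help."""


async def read_with_retry(
    read_once: Callable[[], Awaitable[T]],
    initial_delay: float = 0.5,
    max_delay: float = 30.0,
) -> T:
    """Retry transient failures with exponential backoff; raise on fatal ones."""
    delay = initial_delay
    while True:
        try:
            return await read_once()
        except Exception as exc:  # CancelledError is a BaseException on 3.8+, so not caught
            code = getattr(exc, "code", None)
            if isinstance(code, int) and code in FATAL_API_CODES:
                # Stop retrying: the same ConsumerConfig would be rejected again.
                raise MessengerReaderConfigError(str(exc)) from exc
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)
```

Because the fatal case raises instead of sleeping, an `async for` loop built on this would end with `MessengerReaderConfigError` (with the original NATS error as `__cause__`) on the repro above, while 5xx and transport errors keep being retried.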
Why it matters
Today an oca-side mistake (forgetting opt_start_time) is invisible to ops — the reader just stays silent. Worse, the same fatal-vs-transient gap means a transient configuration problem (e.g. someone briefly deletes a stream and recreates it under a new name) can leave the reader stuck in a no-progress retry storm even after the underlying issue clears, because each retry resends an outdated config.
(Surfaced while chasing a separate oca_monitor issue where ~50 readers all timed out concurrently during a slow synchronous startup tick. The freeze itself recovers cleanly when the params are correct — but if any reader's params were bad the retry loop just hides it.)
I'm happy to follow up with a PR for (a) and (b) once we agree on the exception hierarchy / public name. Let me know if you'd prefer a single combined PR or two.