feat!: Redis-first task transport and minimal gateway surface #209

Open
xaviave wants to merge 1 commit into dev from feat/mvp-redis
@xaviave xaviave commented Jun 9, 2026

Replace the gRPC-loopback / Taskiq task path with a Redis Streams + pub/sub architecture. Gateway<->module communication now flows entirely through Redis: modules write output directly to Redis and the gateway exposes only a small external consumer surface.

Task transport (Redis-first)

  • Add core/task_manager/redis/ package: redis_client, redis_streams, redis_signal, redis_state, redis_idempotency, redis_checkpoint, proto_streams, plus instrumented/shadow profiling wrappers
  • Add core/task_manager/module_runner.py as the single module runner (replaces the SingleJobManager / job-manager path)
  • Add services/task_manager/redis_task_manager.py: signals over Redis pub/sub
  • Add models/core/redis.py and models/settings/redis.py
  • Remove gRPC loopback transport (services/task_manager/grpc_task_manager.py) and the Taskiq broker/job-manager (core/job_manager/taskiq_*.py)
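The split between Streams for module output and pub/sub for signals can be sketched as follows. This is a minimal illustration, not the package's code: the key and channel names (`task:<id>:output`, `task:<id>:signals`) and the helper functions are hypothetical. The `client` argument is assumed to be a redis-py style client, and only its `xadd`, `xread`, and `publish` methods are used.

```python
from __future__ import annotations

from typing import Any, Protocol


class RedisLike(Protocol):
    """Subset of the redis-py client surface used in this sketch."""

    def xadd(self, name: str, fields: dict[str, Any]) -> Any: ...
    def xread(self, streams: dict[str, str], count: int | None = None) -> Any: ...
    def publish(self, channel: str, message: str) -> Any: ...


def output_stream_key(task_id: str) -> str:
    # Hypothetical key layout; the PR does not state the real one.
    return f"task:{task_id}:output"


def signal_channel(task_id: str) -> str:
    # Hypothetical channel name for control signals.
    return f"task:{task_id}:signals"


def write_output(client: RedisLike, task_id: str, payload: str) -> Any:
    """Module side: append one output entry to the task's stream."""
    return client.xadd(output_stream_key(task_id), {"payload": payload})


def read_output(
    client: RedisLike, task_id: str, last_id: str = "0-0", count: int = 100
) -> list[tuple[Any, dict[str, Any]]]:
    """Consumer side: return entries with an ID greater than last_id."""
    reply = client.xread({output_stream_key(task_id): last_id}, count=count)
    entries: list[tuple[Any, dict[str, Any]]] = []
    for _stream_name, items in reply or []:
        entries.extend(items)
    return entries


def send_signal(client: RedisLike, task_id: str, action: str) -> Any:
    """Fire-and-forget control signal over pub/sub (not persisted)."""
    return client.publish(signal_channel(task_id), action)
```

Stream entries persist and can be re-read from any ID, which suits output that a consumer may need to replay. Pub/sub messages reach only the subscribers connected at publish time, which is why this sketch uses it for signals only.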

Gateway

  • Add grpc_servers/gateway_servicer.py: 3-RPC surface (StartStream, Stream, SendSignal), flat messages, in-band sentinel lifecycle (stream.start/end/error/warn) with seq-based resume
  • Add stream_registry.py and stream_session.py for session tracking
  • Add m2m_call_registry.py and models/grpc_servers/m2m.py for module-to-module calls
  • Add models/grpc_servers/stream_error_codes.py
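To illustrate what an in-band sentinel lifecycle with seq-based resume might look like, here is an in-memory sketch. The sentinel names (`stream.start`, `stream.end`, `stream.error`, `stream.warn`) come from the description above. The `StreamEvent` and `StreamLog` types, the `data` kind, and the `after_seq` parameter are assumptions made for illustration and are not the gateway's actual messages.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Iterator

# Sentinels after which no further events are delivered (assumed semantics).
TERMINAL_KINDS = {"stream.end", "stream.error"}


@dataclass(frozen=True)
class StreamEvent:
    seq: int           # monotonically increasing position in the stream
    kind: str          # "stream.start", "data", "stream.warn", "stream.end", ...
    payload: str = ""


@dataclass
class StreamLog:
    events: list[StreamEvent] = field(default_factory=list)

    def append(self, kind: str, payload: str = "") -> StreamEvent:
        """Record an event and assign it the next sequence number."""
        event = StreamEvent(seq=len(self.events) + 1, kind=kind, payload=payload)
        self.events.append(event)
        return event

    def replay(self, after_seq: int = 0) -> Iterator[StreamEvent]:
        """Yield events with seq > after_seq, stopping at a terminal sentinel."""
        for event in self.events:
            if event.seq <= after_seq:
                continue
            yield event
            if event.kind in TERMINAL_KINDS:
                return
```

A client that disconnected after seeing seq 2 would call `replay(after_seq=2)` and receive only what it missed. Because lifecycle markers travel in the same ordered sequence as data, the client needs no separate status channel to learn that the stream ended.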

Resilience

  • Add core/resilience/bulkhead.py and task_supervisor.py
  • Add circuit breaker: grpc_servers/interceptors/circuit_breaker_interceptor.py, grpc_servers/utils/circuit_breaker.py, models/grpc_servers/circuit_breaker.py
  • Add grpc_servers/utils/validators.py
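For readers unfamiliar with the pattern, a circuit breaker can be sketched in a few lines. This is a generic illustration under assumed thresholds and state names. It is not the content of `grpc_servers/utils/circuit_breaker.py`, and the `CircuitBreaker` and `CircuitOpenError` names are hypothetical.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock          # injectable so the timeout is testable
        self._failures = 0
        self._opened_at: float | None = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half_open"       # allow a trial call through
        return "open"

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenError("circuit is open; call rejected")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

In a gRPC server, the same check would typically sit in an interceptor so that every RPC passes through it. That placement is consistent with the `circuit_breaker_interceptor.py` file name but is not confirmed by this description.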

Settings

  • Migrate configuration to pydantic-settings under models/settings/ (consumer, gateway, grpc_client, log, module, profiling, queue, redis, resilience, task_manager, server/servicer), each with a scoped DIGITALKIN_<SCOPE>_ env_prefix and an @lru_cache get_*_settings() factory
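The scoped-prefix plus cached-factory pattern can be shown with a standard-library stand-in. It deliberately uses a dataclass instead of pydantic-settings so that it runs without extra dependencies. The `DIGITALKIN_REDIS_` prefix spelling and the `url` and `stream_maxlen` fields are assumptions, not the real settings model.

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from functools import lru_cache
from typing import Mapping

# Assumed prefix for the "redis" scope.
ENV_PREFIX = "DIGITALKIN_REDIS_"


@dataclass(frozen=True)
class RedisSettings:
    url: str = "redis://localhost:6379/0"   # hypothetical field
    stream_maxlen: int = 10_000             # hypothetical field

    @classmethod
    def from_env(cls, environ: Mapping[str, str] | None = None) -> "RedisSettings":
        """Read only variables that carry this scope's prefix."""
        env = os.environ if environ is None else environ
        return cls(
            url=env.get(f"{ENV_PREFIX}URL", cls.url),
            stream_maxlen=int(env.get(f"{ENV_PREFIX}STREAM_MAXLEN", cls.stream_maxlen)),
        )


@lru_cache
def get_redis_settings() -> RedisSettings:
    """Build the settings once; later calls return the same instance."""
    return RedisSettings.from_env()
```

Because the factory is cached, environment changes made after the first call are not picked up until `get_redis_settings.cache_clear()` is called. This matters mainly in tests that patch the environment.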

Exceptions

  • Add per-service exceptions.py (communication, cost, filesystem, setup, storage, task_manager, user_profile), core/exceptions.py, utils/exceptions.py, top-level exceptions.py
  • Rename grpc_servers/utils/exceptions.py -> grpc_servers/exceptions.py

Profiling & schema

  • Add core/profiling/step_timer.py and models/settings/profiling.py
  • Add models/utils/dynamic_schema.py for dynamic setup-field resolution

AG-UI / agno

  • Refine AG-UI event/module models and the agno adapter (add community/agno/models.py)

Removals (dead layers)

  • Delete services/agent/ and services/snapshot/
  • Delete mixins/callback_mixin.py and mixins/chat_history_mixin.py
  • Delete core/profiling/asyncio_monitor.py

Tests

  • Add suites: tests/gateway, tests/core/redis, tests/integration/redis, tests/chaos, tests/canary, tests/stability, tests/advanced, tests/observability, tests/benchmarks
  • Remove obsolete Taskiq / gRPC-task-manager / shared-poller tests

BREAKING CHANGE: The gRPC loopback and Taskiq transports are removed; Redis is now a required dependency for task dispatch and signals. The agent and snapshot services and the callback and chat-history mixins are removed. The grpc_servers/utils/exceptions module is relocated to grpc_servers/exceptions.

@xaviave xaviave self-assigned this Jun 9, 2026