11 changes: 6 additions & 5 deletions .bumpversion.toml
@@ -3,7 +3,7 @@
# https://peps.python.org/pep-0440/

[tool.bumpversion]
current_version = "0.4.4"
current_version = "1.0.0.dev16"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.
@@ -14,17 +14,18 @@
)? # Release section is optional (for final releases)
"""

# Single serialize pattern - dot is encoded in release_type values
# Most complete format MUST be first — bump-my-version derives the
# list of bumpable parts from the first serialization pattern.
serialize = [
"{major}.{minor}.{patch}",
"{major}.{minor}.{patch}{release_type}{release_num}",
"{major}.{minor}.{patch}",
]

# Configuration for version parts
[tool.bumpversion.parts.release_type]
# Release progression: .dev -> a -> b -> rc -> final -> .post
# ORDER MATTERS: PEP 440 release progression — do not sort alphabetically
optional_value = "final" # When "final", the release_type is omitted
values = [ ".dev", ".post", "a", "b", "final", "rc" ]
values = [ ".dev", "a", "b", "rc", "final", ".post" ]

# Files to update when bumping version
[[tool.bumpversion.files]]
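The reordering of `values` in `.bumpversion.toml` can be illustrated with a small sketch. It assumes that a bump moves `release_type` to the next entry in the list, which is why list order has to follow the PEP 440 progression. `RELEASE_ORDER`, `ALPHABETICAL`, and `next_release_type` are hypothetical names used only for illustration; the code does not call bump-my-version.

```python
from typing import Optional

# Progression copied from the `values` list that starts with ".dev" and ends with ".post".
RELEASE_ORDER = [".dev", "a", "b", "rc", "final", ".post"]


def next_release_type(current: str) -> Optional[str]:
    """Return the entry that follows `current` in the list, or None at the end."""
    index = RELEASE_ORDER.index(current)
    if index + 1 < len(RELEASE_ORDER):
        return RELEASE_ORDER[index + 1]
    return None


# Sorting the same values alphabetically yields the other ordering shown in the diff,
# where ".post" would directly follow ".dev".
ALPHABETICAL = sorted(RELEASE_ORDER)
```

With the progression order, `rc` is followed by `final`; with the alphabetical order, the entry after `.dev` would be `.post`, which skips every pre-release stage.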
1 change: 1 addition & 0 deletions .dockerignore
@@ -51,6 +51,7 @@ LICENSE
.bumpversion.toml
.report.json
examples
!examples/bench_module
scripts
certs
*.md
7 changes: 2 additions & 5 deletions .github/workflows/ci.yml
@@ -94,10 +94,10 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.12"]
test-name: ["Unit Tests", "Smoke Tests", "gRPC Tests", "Validation Tests", "Edge Case Tests", "Regression Tests", "Integration Tests", "Taskiq Tests"]
test-name: ["Unit Tests", "Smoke Tests", "gRPC Tests", "Validation Tests", "Edge Case Tests", "Regression Tests", "Integration Tests"]
include:
- test-name: "Unit Tests"
test-marker: "not integration and not grpc and not smoke and not validation and not edge_case and not regression and not taskiq"
test-marker: "not integration and not grpc and not smoke and not validation and not edge_case and not regression"
test-description: "Basic unit tests without markers"
- test-name: "Smoke Tests"
test-marker: "smoke"
@@ -117,9 +117,6 @@ jobs:
- test-name: "Integration Tests"
test-marker: "integration"
test-description: "Tests requiring external service connections"
- test-name: "Taskiq Tests"
test-marker: "taskiq"
test-description: "Taskiq distributed execution and pickle/unpickle behavior"

env:
PYTHON_VERSION: ${{ matrix.python-version }}
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
@@ -16,18 +16,18 @@ repos:
- id: check-toml

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.11
rev: v0.15.16
hooks:
- id: ruff-check
args: [--fix]
- id: ruff-format

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.20.2
rev: v2.1.0
hooks:
- id: mypy
additional_dependencies: [types-protobuf]
exclude: "^(tests/|examples/)"
exclude: "^(tests/|examples/|scripts/)"

# Add pytest as a local hook
- repo: local
17 changes: 3 additions & 14 deletions CLAUDE.md
@@ -81,15 +81,6 @@ uv run mkdocs build
uv run mike deploy --push --update-aliases 0.3 latest
```

### Taskiq (Distributed Job Execution)
```bash
# Enable RabbitMQ stream capability (required for Taskiq)
sudo rabbitmq-plugins enable rabbitmq_stream

# Start Taskiq worker
task start-taskiq
```

## Architecture Overview

### Core Components
@@ -115,7 +106,6 @@ task start-taskiq
**Job Management** (`src/digitalkin/core/job_manager/`)
- `BaseJobManager`: Abstract base extending TaskManager
- `SingleJobManager`: In-memory execution for single-server deployments
- `TaskiqJobManager`: Distributed execution using Taskiq + RabbitMQ for horizontal scaling
- Jobs stream output via asyncio.Queue and callbacks

**Task Management** (`src/digitalkin/core/task_manager/`)
@@ -219,7 +209,7 @@ Keep docstrings lean and professional. No flowery language, no numbered steps, n
- **No ClassVar for single-use**: Don't create class attributes for values used only once

### IDs
IDs flow through the entire system: `job_id`, `mission_id`, `setup_id`, `setup_version_id`. Always propagate these correctly.
Propagate `task_id`, `setup_id`, and `mission_id` through the system whenever they are available.

### Pydantic Models
All data models use Pydantic for validation and serialization. JSON schemas are generated for module introspection.
@@ -231,7 +221,7 @@ Most operations are async/await. Use `async def` for handlers and module methods
Comprehensive type hints are used throughout. Always add type annotations to new code.

### Structured Logging
The `extra` parameter is **only for global context IDs** that help correlate logs across the system (e.g., `job_id`, `mission_id`, `setup_id`, `setup_version_id`, `task_id`). These IDs are typically available via `self.session_ids` or `context.session.current_ids()`.
The `extra` parameter is **only for global context IDs** that help correlate logs across the system (e.g., `task_id`, `setup_id`, `mission_id`). These IDs are typically available via `self.session_ids` or `context.session.current_ids()`.
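A minimal sketch of this rule using the standard `logging` module. The logger name, the `log_batch_written` function, and the `batch_size` value are hypothetical; `session_ids` stands in for whatever `self.session_ids` or `context.session.current_ids()` returns and is assumed here to be a plain dict of ID strings.

```python
import logging

logger = logging.getLogger("digitalkin.example")  # hypothetical logger name


def log_batch_written(session_ids: dict[str, str], batch_size: int) -> None:
    """Log one event: the local value goes in the message, the IDs go in `extra`."""
    # batch_size is local scope, so it is formatted into the message itself.
    # session_ids holds only correlation IDs (task_id, setup_id, mission_id).
    logger.info("Wrote history batch of %d messages", batch_size, extra=session_ids)
```

Each key passed through `extra` becomes an attribute on the log record, so a formatter or handler can emit `task_id`, `setup_id`, and `mission_id` as structured fields.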

**Local-scope variables go in the log message, not in `extra`:**
```python
@@ -275,10 +265,9 @@ Use `pytest.mark.asyncio` for async tests. The `asyncio_mode = "auto"` setting i

## Integration Points

- **RabbitMQ** (via Taskiq): Distributed job execution, message streaming
- **Redis**: Durable message passing via Redis Streams, session state, signal pub/sub
- **gRPC**: All inter-service communication
- **Protobuf**: Message definitions from `digitalkin-proto` package
- **Taskiq**: Optional distributed task execution (install with `pip install digitalkin[taskiq]`)

## Examples

34 changes: 10 additions & 24 deletions README.md
@@ -26,8 +26,7 @@ communicate over gRPC, register with a service mesh, and scale independently.
- **Profiling** — optional `[profiling]` extra with asyncio-inspector,
pyinstrument, viztracer, and yappi
- **Batched history writes** — efficient storage writes for conversation history
- **TaskIQ integration** — optional distributed task execution backed by
RabbitMQ and Redis (`[taskiq]` extra)
- **Redis Streams** — durable message passing, crash recovery, and reconnection

## Installation

@@ -42,11 +41,11 @@ pip install digitalkin
**Optional extras:**

```bash
# Distributed task execution (RabbitMQ + Redis)
uv add "digitalkin[taskiq]"

# Async profiling tools
uv add "digitalkin[profiling]"

# uvloop for faster event loop
uv add "digitalkin[performance]"
```

## Quick Start
@@ -124,26 +123,15 @@ async def main() -> None:
asyncio.run(main())
```

## TaskIQ with RabbitMQ

TaskIQ integration allows the module to scale for heavy CPU tasks by
distributing requests to stateless worker instances.
## Redis Gateway

- **Decoupled Scalability**: RabbitMQ brokers messages, letting producers and
consumers scale independently.
- **Reliability**: Durable queues, acknowledgements, and dead-lettering ensure
tasks aren't lost.
- **Concurrency Control**: TaskIQ's worker pool manages parallel execution
without custom schedulers.
- **Flexibility**: Built-in retries, exponential backoff, and Redis
result-backend for resilient workflows.
The embedded gateway enables real-time bidirectional communication between
modules via Redis Streams, with crash recovery and horizontal scaling.

To enable RabbitMQ streaming:
- **Durable Streaming**: Output persisted to Redis Streams — reconnection via `from_seq`.
- **Zero-Copy Proto**: Binary proto serialization to Redis — no JSON intermediary.
- **Horizontal Scaling**: Each module instance embeds its own gateway. Scale by adding replicas behind a load balancer.

```bash
sudo rabbitmq-plugins enable rabbitmq_stream
task start-taskiq
```
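The "reconnection via `from_seq`" idea from the Redis Gateway bullets can be sketched in memory. This is an illustration only: `OutputLog` and its methods are hypothetical, it does not use Redis or digitalkin's gateway API, and it assumes `from_seq` names the first sequence number the reconnecting client still needs (inclusive).

```python
from dataclasses import dataclass, field


@dataclass
class OutputLog:
    """Hypothetical append-only log standing in for a durable output stream."""

    entries: list[bytes] = field(default_factory=list)

    def append(self, payload: bytes) -> int:
        """Store a payload and return its 0-based sequence number."""
        self.entries.append(payload)
        return len(self.entries) - 1

    def read(self, from_seq: int = 0) -> list[tuple[int, bytes]]:
        """Return (seq, payload) pairs starting at from_seq (inclusive, by assumption)."""
        return list(enumerate(self.entries))[from_seq:]
```

A client that saw sequence numbers 0 and 1 before disconnecting would call `read(from_seq=2)` after reconnecting and receive only the entries it missed, because the log keeps output independently of any one connection.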

## Development

@@ -176,8 +164,6 @@ task docs-serve # Serve docs locally (mkdocs)
task docs-build # Build docs

task generate-certificates # Generate mTLS certs for gRPC
task start-taskiq # Start TaskIQ worker

task clean # Remove build artifacts + __pycache__
task clean-all # Above + remove .venv
```
59 changes: 30 additions & 29 deletions docker-compose.yml
@@ -1,37 +1,39 @@
services:
tests-rabbitmq:
container_name: digitalkin-tests-rabbitmq
profiles: ["taskiq"]
build:
context: ${RABBITMQ_CONTEXT:-.}
dockerfile: ${RABBITMQ_DOCKERFILE:-dockerfiles/Dockerfile_rabbitmq}
args:
RABBITMQ_URL: ${RABBITMQ_URL:-digitalkin-tests-rabbitmq}
RABBITMQ_CERTIFICATE_VOLUME: ${RABBITMQ_CERTIFICATE_VOLUME:-/certificates/digitalkin-tests-rabbitmq}
RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER:-guest}
RABBITMQ_DEFAULT_PASSWORD: ${RABBITMQ_DEFAULT_PASSWORD:-guest}
RABBITMQ_RSTREAM_ADVERTISED_HOST: ${RABBITMQ_RSTREAM_ADVERTISED_HOST:-localhost}
RABBITMQ_RSTREAM_ADVERTISED_PORT: ${RABBITMQ_RSTREAM_ADVERTISED_PORT:-5553}
RABBITMQ_BROKER_PORT: ${RABBITMQ_BROKER_PORT:-5553}
RABBITMQ_MANAGEMENT_PORT: ${RABBITMQ_MANAGEMENT_PORT:-16573}
RABBITMQ_RSTREAM_PORT: ${RABBITMQ_RSTREAM_PORT:-5673}
tests-redis:
container_name: digitalkin-tests-redis
profiles: ["redis"]
image: redis:7-alpine
ports:
- ${RABBITMQ_BROKER_PORT:-5673}:${RABBITMQ_BROKER_PORT:-5673}
- ${RABBITMQ_MANAGEMENT_PORT:-15673}:${RABBITMQ_MANAGEMENT_PORT:-15673}
- ${RABBITMQ_RSTREAM_PORT:-5553}:${RABBITMQ_RSTREAM_PORT:-5553}
- "${REDIS_PORT:-6399}:6379"
networks:
- services-network
volumes:
- rabbitmq-data:/var/lib/rabbitmq
environment:
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER:-guest}
- RABBITMQ_DEFAULT_PASSWORD=${RABBITMQ_DEFAULT_PASSWORD:-guest}
command: >
redis-server
--maxmemory 256mb
--maxmemory-policy allkeys-lru
--save ""
--appendonly no
--protected-mode no
--loglevel warning
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 10s
timeout: 5s
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 5

tests-toxiproxy:
container_name: digitalkin-tests-toxiproxy
profiles: ["chaos"]
image: ghcr.io/shopify/toxiproxy:2.9.0
ports:
- "8474:8474"
- "26379:26379"
networks:
- services-network
depends_on:
tests-redis:
condition: service_healthy

tests:
container_name: digitalkin-tests
build:
@@ -54,5 +56,4 @@ networks:
services-network:
driver: bridge

volumes:
rabbitmq-data:
volumes: {}
2 changes: 1 addition & 1 deletion docker/Dockerfile.test
@@ -21,7 +21,7 @@ COPY src/ /app/src/
COPY tests/ /app/tests/

# install project (editable) + extras; this layer installs heavy deps one time
RUN uv pip install --system -e ".[taskiq]" && \
RUN uv pip install --system -e "." && \
uv pip install --system --group dev --group tests

# copy entrypoint script