A real-time RAG pipeline for incident response. Streams operational events (GitHub commits, Jira tickets, system logs) through Kafka into a pgvector knowledge base, then answers on-call queries using hybrid retrieval and LLM generation.
Standard RAG pipelines batch-process documents on a schedule, so during a live outage the commits that caused the incident may not have been indexed yet. OpsStream-RAG closes that gap: events are vectorized and queryable within seconds of occurrence.
Six services, fully containerized. A single `docker compose up --build -d` runs the entire stack.

      GitHub / Jira Webhooks                     On-Call Engineer
                 │                                       │
                 ▼                                       ▼
         ┌────────────────┐                      ┌────────────────┐
         │  API Gateway   │                      │  API Gateway   │
         │ POST /webhooks │                      │   POST /ask    │
         │   (Pydantic)   │                      │ (RAG generate) │
         └───────┬────────┘                      └───────┬────────┘
                 │                                       │
                 ▼                                       │
         ┌────────────────┐                              │
         │  Apache Kafka  │                              │
         │ (3 partitions) │                              │
         └───────┬────────┘                              │
                 │                                       │
          ┌──────┼──────┐                                │
          ▼      ▼      ▼                                │
    Consumer Consumer Consumer                           │
     (batch → embed → upsert)                            │
          │      │      │                                │
          └──────┼──────┘                                │
                 ▼                                       │
         ┌────────────────┐                              │
         │ Embedding API  │                              │
         │ Jina v2 local  │                              │
         │   (768-dim)    │                              │
         └───────┬────────┘                              │
                 ▼                                       │
         ┌────────────────┐                      ┌───────┴────────┐
         │ PostgreSQL 16  │◄─────────────────────┤ Retrieval API  │
         │ pgvector HNSW  │                      │ hybrid search  │
         │   + GIN FTS    │                      │  + time decay  │
         └────────────────┘                      └────────────────┘

| Service | Container | Port | What it does |
|---|---|---|---|
| API Gateway | opsstream_rag_api | 8897 | Receives webhooks, produces to Kafka. Orchestrates RAG answer generation. |
| Kafka | opsstream_broker | 9092 | KRaft-mode broker. 3-partition topic for parallel consumption. |
| Consumer | ×3 replicas | n/a | Polls Kafka, batches events, calls embedding API, upserts to Postgres. |
| Embedding API | opsstream_embedder | 8000 | Runs jina-embeddings-v2-base-code locally via PyTorch. Model baked into Docker image. |
| Retrieval API | opsstream_retrieval | 8085 | Hybrid vector + keyword search with temporal decay reranking. |
| PostgreSQL | opsstream_db | 5432 | pgvector with HNSW index for vectors + GIN index for full-text search. |
1. A webhook hits `POST /webhooks/github` or `POST /webhooks/jira`.
2. Pydantic validates the payload. Malformed requests are rejected with HTTP 422 and never touch Kafka.
3. A deterministic UUID v5 is generated from the event ID (same input → same UUID every time).
4. The event is produced to the Kafka topic `system-events` with key-based partitioning.
5. The API returns immediately; the webhook response takes under 5 ms.
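The deterministic ID in step 3 can be sketched with Python's standard `uuid.uuid5`. The namespace value and the `source:event_id` naming scheme below are assumptions for illustration; the project may derive its IDs differently.

```python
import uuid

# Hypothetical namespace: any fixed UUID works, as long as it never changes.
OPSSTREAM_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://example.com/opsstream")


def event_uuid(source: str, event_id: str) -> uuid.UUID:
    """Derive a stable UUID v5 from the source system and its native event ID."""
    return uuid.uuid5(OPSSTREAM_NAMESPACE, f"{source}:{event_id}")


first = event_uuid("github", "abc123")
second = event_uuid("github", "abc123")   # same input, same UUID
other = event_uuid("jira", "abc123")      # different source, different UUID
```

Because the ID depends only on its inputs, a redelivered webhook or a replayed Kafka message maps to the same primary key.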
On the consumer side:
6. 3 consumer replicas poll from Kafka with `enable.auto.commit: False`.
7. Events are buffered until 10 accumulate or 2 seconds pass.
8. The batch is sent to the local embedding API (Jina v2, 768-dim vectors).
9. Vectors are upserted to PostgreSQL with `ON CONFLICT (id) DO NOTHING`.
10. The Kafka offset is committed only after the database transaction succeeds.
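The size-or-time batching rule (10 events or 2 seconds) can be sketched as a small buffer class. `BatchBuffer` is a hypothetical name, and starting the timer at the first buffered event is an assumption; the consumer's real loop may measure the window differently.

```python
import time
from typing import Callable, List


class BatchBuffer:
    """Collects events until a size or age threshold is reached."""

    def __init__(self, max_size: int = 10, max_wait_s: float = 2.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size = max_size
        self.max_wait_s = max_wait_s
        self._clock = clock
        self._items: List[dict] = []
        self._first_at = 0.0

    def add(self, event: dict) -> None:
        if not self._items:
            # Assumption: the 2-second window starts with the first event.
            self._first_at = self._clock()
        self._items.append(event)

    def ready(self) -> bool:
        """True when the batch is full or its oldest event has waited long enough."""
        if not self._items:
            return False
        too_old = self._clock() - self._first_at >= self.max_wait_s
        return len(self._items) >= self.max_size or too_old

    def drain(self) -> List[dict]:
        """Return the buffered events and reset the buffer."""
        batch, self._items = self._items, []
        return batch
```

In a consumer loop, `ready()` would be checked after every poll, including polls that return nothing, so that a half-full batch is still flushed once the time limit passes.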
If a consumer crashes between steps 8 and 10, the offset stays uncommitted and Kafka replays the batch on restart. The deterministic UUIDs plus the conflict guard ensure that the replay adds no duplicate rows to the database.
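The replay guarantee can be demonstrated with a small runnable sketch. It uses an in-memory SQLite database as a stand-in for PostgreSQL (an assumption made so the example runs without a server; SQLite 3.24+ accepts the same `ON CONFLICT (id) DO NOTHING` clause), and the `event_id` field and namespace are hypothetical.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE incident_logs (id TEXT PRIMARY KEY, content TEXT)")


def upsert_batch(events):
    """Insert a batch in one transaction; rows whose id already exists are skipped."""
    rows = [
        (str(uuid.uuid5(uuid.NAMESPACE_URL, e["event_id"])), e["content"])
        for e in events
    ]
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany(
            "INSERT INTO incident_logs (id, content) VALUES (?, ?) "
            "ON CONFLICT (id) DO NOTHING",
            rows,
        )


batch = [
    {"event_id": "commit-1", "content": "fix connection pool leak"},
    {"event_id": "commit-2", "content": "bump database driver"},
]
upsert_batch(batch)  # first delivery
upsert_batch(batch)  # replay after a simulated crash before the offset commit
row_count = conn.execute("SELECT count(*) FROM incident_logs").fetchone()[0]
```

After both calls `row_count` is 2, not 4: the second delivery is absorbed by the primary-key conflict guard.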
The retrieval API runs two parallel index-backed queries against PostgreSQL, then merges the results:
Semantic search — HNSW index on the embedding column, ordered by cosine distance. Captures meaning-level similarity ("database crashed" matches "PostgreSQL connection refused").
Keyword search — GIN index on to_tsvector('english', content), ranked by ts_rank_cd. Catches exact matches that vector search misses: ticket IDs like Refs #33173, test names like test_ForeignKey_using_to_field, version numbers.
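As a rough sketch, the two queries might look like the SQL below, held as Python strings for a driver such as psycopg. The `incident_logs` table and the `content` and `embedding` columns come from this README; the `created_at` column, the use of `plainto_tsquery`, and the parameter names are assumptions. `<=>` is pgvector's cosine-distance operator.

```python
from typing import List

# Semantic leg: ordering by cosine distance lets pgvector use the HNSW index.
# 1 - distance turns the distance into a similarity where larger means closer.
SEMANTIC_SQL = """
SELECT id, content, created_at,
       1 - (embedding <=> %(query_vec)s::vector) AS semantic_score
FROM incident_logs
ORDER BY embedding <=> %(query_vec)s::vector
LIMIT %(k)s
"""

# Keyword leg: the @@ match on the same to_tsvector expression as the GIN index.
KEYWORD_SQL = """
SELECT id, content, created_at,
       ts_rank_cd(to_tsvector('english', content),
                  plainto_tsquery('english', %(query_text)s)) AS keyword_score
FROM incident_logs
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', %(query_text)s)
ORDER BY keyword_score DESC
LIMIT %(k)s
"""


def to_vector_literal(values: List[float]) -> str:
    """Format a Python list in pgvector's text form, e.g. '[0.5,1.0]'."""
    return "[" + ",".join(repr(float(v)) for v in values) + "]"
```

The keyword query repeats the exact `to_tsvector('english', content)` expression so that PostgreSQL can match it against an expression index built on it.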
The two result sets are merged and scored:
hybrid_score = 0.7 × semantic_score + 0.3 × keyword_score
A temporal decay factor is then applied to downrank old events:
final_score = hybrid_score × exp(−0.05 × age_in_days)
This gives a half-life of roughly 14 days — recent events surface naturally without hard date cutoffs that would discard potentially useful older context. The decay coefficient lives in the application layer, so tuning it doesn't require schema changes or index rebuilds.
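The scoring formulas above translate directly into a few lines of Python. The sketch assumes both input scores are already on comparable scales and that a document missing from one result set scores 0 for that leg; `merge_scores` and its dictionary inputs are hypothetical.

```python
import math
from typing import Dict

SEMANTIC_WEIGHT = 0.7
KEYWORD_WEIGHT = 0.3
DECAY_RATE = 0.05  # per day


def final_score(semantic: float, keyword: float, age_in_days: float) -> float:
    """Weighted hybrid score multiplied by an exponential time decay."""
    hybrid = SEMANTIC_WEIGHT * semantic + KEYWORD_WEIGHT * keyword
    return hybrid * math.exp(-DECAY_RATE * age_in_days)


def merge_scores(semantic_hits: Dict[str, float],
                 keyword_hits: Dict[str, float],
                 ages: Dict[str, float]) -> Dict[str, float]:
    """Union both result sets; a document missing from one leg scores 0 there."""
    ids = set(semantic_hits) | set(keyword_hits)
    return {
        doc_id: final_score(semantic_hits.get(doc_id, 0.0),
                            keyword_hits.get(doc_id, 0.0),
                            ages[doc_id])
        for doc_id in ids
    }


# Half-life: the age at which exp(-DECAY_RATE * age) equals 0.5.
half_life_days = math.log(2) / DECAY_RATE
```

`half_life_days` evaluates to about 13.86, which is where the "roughly 14 days" figure comes from.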
Before executing the search, the query itself is refined by an LLM call that extracts error codes, service names, and key terms from natural language input. This is a best-effort step — if it fails (rate limit, timeout), the raw query is used as fallback.
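The best-effort fallback can be sketched as a thin wrapper around whatever function performs the LLM call. `refine_query` and the two stand-in functions are hypothetical; the real refinement prompt and client are not shown in this README.

```python
from typing import Callable


def refine_query(raw_query: str, refine: Callable[[str], str]) -> str:
    """Return the refined query, or the raw query if refinement fails."""
    try:
        refined = refine(raw_query).strip()
    except Exception:
        # Best-effort step: a rate limit or timeout must not block retrieval.
        return raw_query
    # An empty refinement is treated as a failure too (an assumption).
    return refined or raw_query


def failing_llm(_: str) -> str:
    raise TimeoutError("simulated LLM timeout")


def fake_llm(_: str) -> str:
    return "PostgreSQL connection refused"


fallback = refine_query("db is down??", failing_llm)
refined = refine_query("db is down??", fake_llm)
```

Here `fallback` is the untouched raw query and `refined` is the stand-in model's output, so the search always has a usable query string.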
| What breaks | What happens |
|---|---|
| PostgreSQL goes down | Consumers retry with exponential backoff (5s → 10s → 20s). Kafka offsets remain uncommitted, so events are held on disk and replayed when DB returns. |
| Embedding API crashes | Consumer logs the error and crashes deliberately. Docker's restart: unless-stopped relaunches it. Uncommitted offsets trigger replay. |
| Bad message shape | If the embedding API returns 400/422 (fundamentally bad data), the consumer routes the batch to a Dead Letter Queue topic and commits the offset to avoid blocking the queue. |
| Consumer replica dies | Kafka detects heartbeat loss, triggers partition rebalance, and reassigns the orphaned partition to a surviving replica. |
| LLM rate limit on query | The API retries with exponential backoff (up to 5 attempts). The retrieval API's query refinement step degrades gracefully to the raw query. |
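The 5s → 10s → 20s retry schedule in the table can be sketched as a delay generator plus a retry wrapper. Both function names are hypothetical, and retrying on every exception type is an assumption; a real consumer would likely retry only on connection errors.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def backoff_delays(base_s: float = 5.0, factor: float = 2.0,
                   retries: int = 3) -> Iterator[float]:
    """Yield exponentially growing delays: 5.0, 10.0, 20.0 by default."""
    for attempt in range(retries):
        yield base_s * factor ** attempt


def retry_with_backoff(op: Callable[[], T], delays: Iterable[float],
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Run op; after each failure wait for the next delay, then try again."""
    for delay in delays:
        try:
            return op()
        except Exception:
            sleep(delay)
    # Final attempt once the delays are exhausted; any error propagates.
    return op()
```

Passing `sleep` as a parameter keeps the wrapper testable without real waiting. Because the offset is committed only after a successful write, giving up after the last attempt loses no data: the batch is replayed on the next start.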
Requirements: Docker Desktop (Compose V2+), Python 3.11+, an OpenRouter API key (free tier works).

    git clone https://github.com/YOUR_USERNAME/OpsStream-RAG.git
    cd OpsStream-RAG

Create .env:

    OPENROUTER_API_KEY=sk-or-v1-...
    LLM_MODEL=google/gemma-4-31b-it:free
    GITHUB_TOKEN=ghp_... # optional, raises GitHub API rate limit

Start the stack:

    docker compose up --build -d

First build takes a few minutes (downloads PyTorch, bakes the Jina model into the image). After that, starts are instant.

Ingest 1,000 commits from the Django repository (the activate command shown is for Windows; on macOS or Linux use `source venv/bin/activate`):

    python -m venv venv
    venv\Scripts\activate
    pip install requests python-dotenv
    python scripts/ingest_historical_github.py

Verify:

    docker exec opsstream_db psql -U admin -d incident_logs -c "SELECT count(*) FROM incident_logs;"

Ask a question:

    curl -X POST http://localhost:8897/ask \
      -H "Content-Type: application/json" \
      -d '{"question": "Who resolved the parallel test database destruction issue on Windows?"}'

Run the Streamlit dashboard:

    pip install streamlit psycopg[binary] pandas
    streamlit run scripts/app.py

Three tabs:
- Copilot Chat — ask questions, get answers grounded in retrieved logs
- Vector Search Explorer — inspect raw retrieval scores (semantic, keyword, time decay, final)
- Timeline Analytics — ingestion rate over time
| Platform | URL | Content Type |
|---|---|---|
| GitHub | http://YOUR_HOST:8897/webhooks/github | application/json |
| Jira | http://YOUR_HOST:8897/webhooks/jira | application/json |
Built-in evaluation harness using LLM-as-a-judge against a golden dataset (data/eval_set.json):

    python scripts/evaluate.py

Measures four metrics per question:
| Metric | Definition |
|---|---|
| Context Precision | Proportion of retrieved logs that are actually relevant |
| Context Recall | Whether the retrieved logs contain all information needed to answer |
| Faithfulness | Whether the generated answer is derived from context (not hallucinated) |
| Answer Relevance | Whether the answer directly addresses the question |
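As an illustration of the first metric, Context Precision as defined in the table is a simple proportion over per-log relevance verdicts. The function below is a hypothetical sketch that takes those verdicts as booleans; how `scripts/evaluate.py` obtains them from the judge model, and whether it weights by rank, is not shown here.

```python
from typing import Sequence


def context_precision(relevance: Sequence[bool]) -> float:
    """Fraction of retrieved logs judged relevant; 0.0 when nothing was retrieved."""
    if not relevance:
        return 0.0
    return sum(relevance) / len(relevance)


# Hypothetical judge verdicts for four retrieved logs.
score = context_precision([True, False, True, True])
```

With three of four logs judged relevant, `score` is 0.75.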

    OpsStream-RAG/
    ├── api/                  # Webhook ingestion + RAG orchestration
    │   ├── main.py           # Pydantic models, Kafka producer, /ask endpoint
    │   └── Dockerfile
    ├── consumer/             # Kafka consumer workers (3 replicas)
    │   ├── main.py           # Batching, embedding, upsert, DLQ routing
    │   └── Dockerfile
    ├── embedding_api/        # Local vector inference
    │   ├── main.py           # Jina v2 via sentence-transformers
    │   └── Dockerfile        # Model downloaded at build time
    ├── retrieval_api/        # Hybrid search engine
    │   ├── main.py           # HNSW + GIN queries, time decay, query refinement
    │   └── Dockerfile
    ├── scripts/
    │   ├── app.py            # Streamlit dashboard
    │   ├── evaluate.py       # RAG evaluation harness
    │   └── ingest_historical_github.py
    ├── data/
    │   └── eval_set.json     # Golden evaluation dataset
    ├── docker-compose.yaml   # 6 services, 8 containers
    └── .env                  # API keys (gitignored)

License: MIT
