Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

Controllers receive `consumer.Delivery` (subset interface without Ack/Nack) to enforce separation of business logic from infrastructure.

**Queue payloads: IDs within a boundary, full payloads across one.** When producer and consumer share a store (same service — e.g. `build`→`buildsignal`, `validate`→`mergeconflict`), put only the entity **ID** on the queue and reload from storage (the store is the source of truth, messages stay small, redelivery is idempotent). When a queue **crosses a service boundary** (the consumer cannot read the producer's store — e.g. orchestrator→runway), publish the **full payload** the consumer needs, and have the **client own the correlation ID** so it can record the in-flight work before publishing and match the async result on return. The queue's **owner defines the wire contract and topic keys** (in its own domain package); the other side imports them.

### Entities

Domain objects in `entity/`, organized by domain. Guidelines:
Expand Down
9 changes: 5 additions & 4 deletions doc/rfc/submitqueue/extension-contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Design notes for what SubmitQueue's pluggable extensions accept: orchestrator **

## Problem

Extension input granularity is inconsistent across the pipeline stages (see [workflow.md](workflow.md)). `conflict.Analyzer` takes identity (`entity.Batch`); `scorer`, `mergechecker`, `changeprovider`, `buildrunner`, `pusher` take controller-resolved `entity.Change`. The split caps what an extension can do:
Extension input granularity is inconsistent across the pipeline stages (see [workflow.md](workflow.md)). `conflict.Analyzer` takes identity (`entity.Batch`); `scorer`, `changeprovider`, `buildrunner`, `pusher` take controller-resolved `entity.Change`. The split caps what an extension can do:

- `ConflictType` already names `target_overlap`, but a real target-overlap analyzer **cannot be written** — the batch controller hands it identity-level batches (no changed targets) and the contract has nowhere to put them.
- `scorer` gets a URIs-only `Change`, so a heuristic scorer **cannot see** lines-changed / file-count.
Expand All @@ -21,7 +21,7 @@ Both unblock with the shape `conflict` already uses: accept identity, resolve in

| Stage | Loads | Resolves for the extension | Hands to the extension |
|---|---|---|---|
| `validate` | `entity.Request` | nothing — `request.Change` is already in hand (the change-store reads here serve duplicate detection) | `request.Change` → `mergechecker`, `changeprovider` |
| `validate` | `entity.Request` | nothing — `request.Change` is already in hand (the change-store reads here serve duplicate detection) | `request.Change` → `changeprovider` |
| `batch` | `entity.Request` + active `[]entity.Batch` | **nothing** — builds a batch whose `Contains` is `[requestID]` | `entity.Batch`, `[]entity.Batch` → `conflict` |
| `score` | `entity.Batch`, then each `entity.Request` | batch → requests | `request.Change` per request, then multiplies the scores → `scorer` |
| `build` | `entity.Batch`, then `collectChanges` | batch → requests → changes, **flattening batch boundaries** | base `[]Change`, head `[]Change` → `buildrunner` |
Expand All @@ -35,13 +35,14 @@ Two facts this grounds: `conflict` already resolves nothing (the baseline), and
|---|---|---|---|---|---|
| `conflict.Analyzer` | batch | identity (`Batch`, `[]Batch`) | unchanged — **the baseline** | conflicting in-flight batches (`[]Conflict`, `BatchID`-tagged) — unchanged | request store + change provider |
| `scorer.Scorer` | score | flat `Change`, per request | `entity.Batch` — resolve + reduce internally | one batch score (`float64`) — unchanged | request store + change provider |
| `mergechecker.MergeChecker` | validate | `Change` | `entity.Request` | mergeability (`Result`) — unchanged | none |
| `changeprovider.ChangeProvider` | validate | `Change` | `entity.Request` | per-URI change info (`[]ChangeInfo`, `URI`-tagged) — unchanged | none — it *is* the resolver |
| `buildrunner.BuildRunner` | build | base/head `[]Change` | base `[]entity.Batch` + head `entity.Batch` | build id, then status/cancel (`BuildID`, `BuildStatus`) — unchanged | request store + change provider |
| `pusher.Pusher` | merge | `[]Change` | ordered `[]entity.Batch` | **per-batch** outcomes (`Result` grouped by `BatchID`) — **changed** | request store + change provider |
| `storage`, `changestore`, `queueconfig` | — | keys + entities | unchanged — resolution targets | entities | — |

**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; five of the six return contracts — conflicts, score, mergeability, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes.
**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; four of the five return contracts — conflicts, score, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes.

The validate-time mergeability check is no longer in this catalog because it is no longer an in-process extension. It now runs **asynchronously and out-of-process** in runway: `validate` hands off to the `mergeconflict` controller, which publishes a full check request to the runway-owned `merge-conflict-checker` queue, and `mergeconflictsignal` consumes runway's result (see [workflow.md](workflow.md)). The `mergechecker` package stays in-tree but unused on the validate path; removing it is a follow-up.

Non-obvious points:

Expand Down
28 changes: 24 additions & 4 deletions doc/rfc/submitqueue/workflow.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Orchestrator Workflow

The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet.
The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. The merge-conflict check is the one stage that crosses a service boundary: `mergeconflict` publishes a full payload to the runway-owned `merge-conflict-checker` queue and `mergeconflictsignal` consumes a full payload back off `merge-conflict-checker-signal`, because runway cannot read submitqueue's storage. See the queue-payload-boundary rule in [CLAUDE.md](../../../CLAUDE.md).

The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`.

Expand All @@ -21,7 +21,25 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate`
│ ▼
│ ┌──────────────────────────────────┐
│ │ validate │
│ │ Check mergeability + change meta │
│ │ Dedup + fetch change metadata │
│ └────────────────┬─────────────────┘
│ │ RequestID
│ ▼
│ ┌──────────────────────────────────┐
│ │ mergeconflict │
│ │ Publish check request to runway │
│ └────────────────┬─────────────────┘
│ MergeConflictCheckRequest
│ ▼
│ ╔══════════════════════════════════╗
│ ║ runway (separate service) ║
│ ║ Attempt merge, emit result ║
│ ╚════════════════┬═════════════════╝
│ MergeConflictCheckResult
│ ▼
│ ┌──────────────────────────────────┐
│ │ mergeconflictsignal │
│ │ Correlate result, gate request │
│ └────────────────┬─────────────────┘
│ │ RequestID
│ ▼
Expand Down Expand Up @@ -71,7 +89,9 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate`
|---|---|---|---|
| **gateway/Land** | RPC | start | Accept request, mint ID, log Accepted, hand off async |
| **start** | LandRequest | validate, log | Persist Request and emit Started log |
| **validate** | RequestID | batch | Check mergeability and fetch change metadata |
| **validate** | RequestID | mergeconflict | Dedup, fetch change metadata, claim changes |
| **mergeconflict** | RequestID | merge-conflict-checker (runway) | Publish the full check request to runway, keyed by the request id (the correlation id) |
| **mergeconflictsignal** | MergeConflictCheckResult | batch | Correlate runway's result; advance if mergeable, fail if conflicted |
| **batch** | RequestID | score | Group request into a Batch with dependencies |
| **score** | BatchID | speculate, log | Score the batch (∏ per-request scores), persist score |
| **speculate** | BatchID | build, merge | (stub) Decide whether to verify via CI or land |
Expand All @@ -85,7 +105,7 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate`

Every *consumed* primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The `log` topic is the exception: the orchestrator only publishes to it (the gateway is the sole consumer that persists the request log), so it has no orchestrator-side subscription and therefore no DLQ. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress".

The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery.
The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, mergeconflict, batch, cancel) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. The `mergeconflictsignal` DLQ decodes a runway `MergeConflictCheckResult` whose `id` is the request id echoed back, so it fails that request directly. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery.

DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator.

Expand Down
6 changes: 3 additions & 3 deletions example/submitqueue/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//extension/counter/mysql",
"//extension/messagequeue",
"//extension/messagequeue/mysql",
"//runway/core/topickey",
"//submitqueue/core/changeset",
"//submitqueue/core/topickey",
"//submitqueue/entity",
Expand All @@ -33,9 +34,6 @@ go_library(
"//submitqueue/extension/conflict/fake",
"//submitqueue/extension/conflict/fileoverlap",
"//submitqueue/extension/conflict/none",
"//submitqueue/extension/mergechecker",
"//submitqueue/extension/mergechecker/fake",
"//submitqueue/extension/mergechecker/github",
"//submitqueue/extension/pusher",
"//submitqueue/extension/pusher/fake",
"//submitqueue/extension/pusher/git",
Expand All @@ -53,6 +51,8 @@ go_library(
"//submitqueue/orchestrator/controller/conclude",
"//submitqueue/orchestrator/controller/dlq",
"//submitqueue/orchestrator/controller/merge",
"//submitqueue/orchestrator/controller/mergeconflict",
"//submitqueue/orchestrator/controller/mergeconflictsignal",
"//submitqueue/orchestrator/controller/score",
"//submitqueue/orchestrator/controller/speculate",
"//submitqueue/orchestrator/controller/start",
Expand Down
Loading