From d565b915ffb5eb4adeed65759999c6795d1009a0 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Mon, 15 Jun 2026 15:42:36 -0700 Subject: [PATCH] feat(mergeconflict): make merge-conflict check asynchronous via runway MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? The merge-conflict check ran synchronously inside the `validate` consumer by calling the `mergechecker` extension inline. A real merge attempt is slow and I/O-heavy, so doing it on the partition lease blocks the pipeline and couples SubmitQueue to the checker's latency. This moves the check to an asynchronous round-trip with runway, modelled on `build`/`buildsignal` but across a service boundary. ### What? The pipeline gains `validate → mergeconflict ⇢ (runway) ⇢ mergeconflictsignal → batch`: - `validate` drops the inline `mergechecker` call and now publishes the request id to the internal `mergeconflict` topic (dedup + change-metadata + claim are unchanged). - `mergeconflict` (new) publishes the full `MergeConflictCheckRequest` to the runway-owned `merge-conflict-checker` queue, keyed by the request id as the client-owned correlation id. No local record is needed — the request id round-trips, so the result correlates straight back to the request (unlike `build`, whose server-generated id needs a mapping store). - `mergeconflictsignal` (new) consumes runway's `MergeConflictCheckResult` off `merge-conflict-checker-signal`, advances the request to `batch` when mergeable, or fails it (user error) when conflicted. - DLQ reconcilers drive the request to `Error` on dead-letter; the signal DLQ reads the request id straight off the result. Crossing the runway boundary is why these payloads carry full data rather than entity IDs; the new queue-payload-boundary rule is documented in CLAUDE.md, with the pipeline diagram and stage table updated in workflow.md and the superseded `mergechecker` validate-path row noted in extension-contract.md. 
The `mergechecker` package is left in-tree (unused on the validate path); removing it is a follow-up. Runway's service implementation is out of scope — only its contract (added in the parent PR) is consumed here. ## Test Plan ✅ `bazel build //...` ✅ `bazel test //... --test_tag_filters=-integration,-e2e` (54 tests pass) ✅ `make gazelle` clean --- CLAUDE.md | 2 + doc/rfc/submitqueue/extension-contract.md | 9 +- doc/rfc/submitqueue/workflow.md | 28 ++- .../orchestrator/server/BUILD.bazel | 6 +- .../submitqueue/orchestrator/server/main.go | 111 ++++----- submitqueue/core/topickey/topickey.go | 5 + .../orchestrator/controller/dlq/BUILD.bazel | 5 + .../controller/dlq/mergeconflictsignal.go | 113 +++++++++ .../dlq/mergeconflictsignal_test.go | 89 +++++++ .../controller/mergeconflict/BUILD.bazel | 44 ++++ .../controller/mergeconflict/mergeconflict.go | 193 +++++++++++++++ .../mergeconflict/mergeconflict_test.go | 158 +++++++++++++ .../mergeconflictsignal/BUILD.bazel | 41 ++++ .../mergeconflictsignal.go | 219 ++++++++++++++++++ .../mergeconflictsignal_test.go | 170 ++++++++++++++ .../controller/validate/BUILD.bazel | 3 - .../controller/validate/validate.go | 46 +--- .../controller/validate/validate_test.go | 157 ++++++------- 18 files changed, 1208 insertions(+), 191 deletions(-) create mode 100644 submitqueue/orchestrator/controller/dlq/mergeconflictsignal.go create mode 100644 submitqueue/orchestrator/controller/dlq/mergeconflictsignal_test.go create mode 100644 submitqueue/orchestrator/controller/mergeconflict/BUILD.bazel create mode 100644 submitqueue/orchestrator/controller/mergeconflict/mergeconflict.go create mode 100644 submitqueue/orchestrator/controller/mergeconflict/mergeconflict_test.go create mode 100644 submitqueue/orchestrator/controller/mergeconflictsignal/BUILD.bazel create mode 100644 submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal.go create mode 100644 
submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal_test.go diff --git a/CLAUDE.md b/CLAUDE.md index d4075e45..5d44452c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -99,6 +99,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er Controllers receive `consumer.Delivery` (subset interface without Ack/Nack) to enforce separation of business logic from infrastructure. +**Queue payloads: IDs within a boundary, full payloads across one.** When producer and consumer share a store (same service — e.g. `batch`→`score`, `validate`→`mergeconflict`), put only the entity **ID** on the queue and reload from storage (the store is the source of truth, messages stay small, redelivery is idempotent). When a queue **crosses a service boundary** (the consumer cannot read the producer's store — e.g. orchestrator→runway), publish the **full payload** the consumer needs, and have the **client own the correlation ID** so it can record the in-flight work before publishing and match the async result on return. The queue's **owner defines the wire contract and topic keys** (in its own domain package); the other side imports them. + ### Entities Domain objects in `entity/`, organized by domain. Guidelines: diff --git a/doc/rfc/submitqueue/extension-contract.md b/doc/rfc/submitqueue/extension-contract.md index 1fae9134..b2f05799 100644 --- a/doc/rfc/submitqueue/extension-contract.md +++ b/doc/rfc/submitqueue/extension-contract.md @@ -4,7 +4,7 @@ Design notes for what SubmitQueue's pluggable extensions accept: orchestrator ** ## Problem -Extension input granularity is inconsistent across the pipeline stages (see [workflow.md](workflow.md)). `conflict.Analyzer` takes identity (`entity.Batch`); `scorer`, `mergechecker`, `changeprovider`, `buildrunner`, `pusher` take controller-resolved `entity.Change`. 
The split caps what an extension can do: +Extension input granularity is inconsistent across the pipeline stages (see [workflow.md](workflow.md)). `conflict.Analyzer` takes identity (`entity.Batch`); `scorer`, `changeprovider`, `buildrunner`, `pusher` take controller-resolved `entity.Change`. The split caps what an extension can do: - `ConflictType` already names `target_overlap`, but a real target-overlap analyzer **cannot be written** — the batch controller hands it identity-level batches (no changed targets) and the contract has nowhere to put them. - `scorer` gets a URIs-only `Change`, so a heuristic scorer **cannot see** lines-changed / file-count. @@ -21,7 +21,7 @@ Both unblock with the shape `conflict` already uses: accept identity, resolve in | Stage | Loads | Resolves for the extension | Hands to the extension | |---|---|---|---| -| `validate` | `entity.Request` | nothing — `request.Change` is already in hand (the change-store reads here serve duplicate detection) | `request.Change` → `mergechecker`, `changeprovider` | +| `validate` | `entity.Request` | nothing — `request.Change` is already in hand (the change-store reads here serve duplicate detection) | `request.Change` → `changeprovider` | | `batch` | `entity.Request` + active `[]entity.Batch` | **nothing** — builds a batch whose `Contains` is `[requestID]` | `entity.Batch`, `[]entity.Batch` → `conflict` | | `score` | `entity.Batch`, then each `entity.Request` | batch → requests | `request.Change` per request, then multiplies the scores → `scorer` | | `build` | `entity.Batch`, then `collectChanges` | batch → requests → changes, **flattening batch boundaries** | base `[]Change`, head `[]Change` → `buildrunner` | @@ -35,13 +35,14 @@ Two facts this grounds: `conflict` already resolves nothing (the baseline), and |---|---|---|---|---|---| | `conflict.Analyzer` | batch | identity (`Batch`, `[]Batch`) | unchanged — **the baseline** | conflicting in-flight batches (`[]Conflict`, `BatchID`-tagged) — unchanged | 
request store + change provider | | `scorer.Scorer` | score | flat `Change`, per request | `entity.Batch` — resolve + reduce internally | one batch score (`float64`) — unchanged | request store + change provider | -| `mergechecker.MergeChecker` | validate | `Change` | `entity.Request` | mergeability (`Result`) — unchanged | none | | `changeprovider.ChangeProvider` | validate | `Change` | `entity.Request` | per-URI change info (`[]ChangeInfo`, `URI`-tagged) — unchanged | none — it *is* the resolver | | `buildrunner.BuildRunner` | build | base/head `[]Change` | base `[]entity.Batch` + head `entity.Batch` | build id, then status/cancel (`BuildID`, `BuildStatus`) — unchanged | request store + change provider | | `pusher.Pusher` | merge | `[]Change` | ordered `[]entity.Batch` | **per-batch** outcomes (`Result` grouped by `BatchID`) — **changed** | request store + change provider | | `storage`, `changestore`, `queueconfig` | — | keys + entities | unchanged — resolution targets | entities | — | -**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; five of the six return contracts — conflicts, score, mergeability, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes. +**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; four of the five return contracts — conflicts, score, change info, build id/status — are exactly what they are today. 
`pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes. + +The validate-time mergeability check is no longer in this catalog because it is no longer an in-process extension. It now runs **asynchronously and out-of-process** in runway: `validate` hands off to the `mergeconflict` controller, which publishes a full check request to the runway-owned `merge-conflict-checker` queue, and `mergeconflictsignal` consumes runway's result (see [workflow.md](workflow.md)). The `mergechecker` package stays in-tree but unused on the validate path; removing it is a follow-up. Non-obvious points: diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index 52e1aff5..39d4dca4 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -1,6 +1,6 @@ # Orchestrator Workflow -The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. +The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. 
Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. The merge-conflict check is the one stage that crosses a service boundary: `mergeconflict` publishes a full payload to the runway-owned `merge-conflict-checker` queue and `mergeconflictsignal` consumes a full payload back off `merge-conflict-checker-signal`, because runway cannot read submitqueue's storage. See the queue-payload-boundary rule in [CLAUDE.md](../../../CLAUDE.md). The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. @@ -21,7 +21,25 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` │ ▼ │ ┌──────────────────────────────────┐ │ │ validate │ - │ │ Check mergeability + change meta │ + │ │ Dedup + fetch change metadata │ + │ └────────────────┬─────────────────┘ + │ │ RequestID + │ ▼ + │ ┌──────────────────────────────────┐ + │ │ mergeconflict │ + │ │ Publish check request to runway │ + │ └────────────────┬─────────────────┘ + │ MergeConflictCheckRequest + │ ▼ + │ ╔══════════════════════════════════╗ + │ ║ runway (separate service) ║ + │ ║ Attempt merge, emit result ║ + │ ╚════════════════┬═════════════════╝ + │ MergeConflictCheckResult + │ ▼ + │ ┌──────────────────────────────────┐ + │ │ mergeconflictsignal │ + │ │ Correlate result, gate request │ │ └────────────────┬─────────────────┘ │ │ RequestID │ ▼ @@ -71,7 +89,9 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` |---|---|---|---| | **gateway/Land** | RPC | start | Accept request, mint ID, log Accepted, hand off async | | **start** | LandRequest | validate, log | Persist Request and 
emit Started log | -| **validate** | RequestID | batch | Check mergeability and fetch change metadata | +| **validate** | RequestID | mergeconflict | Dedup, fetch change metadata, claim changes | +| **mergeconflict** | RequestID | merge-conflict-checker (runway) | Publish the full check request to runway, keyed by the request id (the correlation id) | +| **mergeconflictsignal** | MergeConflictCheckResult | batch | Correlate runway's result; advance if mergeable, fail if conflicted | | **batch** | RequestID | score | Group request into a Batch with dependencies | | **score** | BatchID | speculate, log | Score the batch (∏ per-request scores), persist score | | **speculate** | BatchID | build, merge | (stub) Decide whether to verify via CI or land | @@ -85,7 +105,7 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` Every *consumed* primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The `log` topic is the exception: the orchestrator only publishes to it (the gateway is the sole consumer that persists the request log), so it has no orchestrator-side subscription and therefore no DLQ. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress". -The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. 
State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. +The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, mergeconflict, batch, cancel) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. The `mergeconflictsignal` DLQ decodes a runway `MergeConflictCheckResult` whose `id` is the request id echoed back, so it fails that request directly. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch causes the message to be redelivered. DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator. 
diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index a1b07cc4..797dfb44 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//extension/counter/mysql", "//extension/messagequeue", "//extension/messagequeue/mysql", + "//runway/core/topickey", "//submitqueue/core/changeset", "//submitqueue/core/topickey", "//submitqueue/entity", @@ -33,9 +34,6 @@ go_library( "//submitqueue/extension/conflict/fake", "//submitqueue/extension/conflict/fileoverlap", "//submitqueue/extension/conflict/none", - "//submitqueue/extension/mergechecker", - "//submitqueue/extension/mergechecker/fake", - "//submitqueue/extension/mergechecker/github", "//submitqueue/extension/pusher", "//submitqueue/extension/pusher/fake", "//submitqueue/extension/pusher/git", @@ -53,6 +51,8 @@ go_library( "//submitqueue/orchestrator/controller/conclude", "//submitqueue/orchestrator/controller/dlq", "//submitqueue/orchestrator/controller/merge", + "//submitqueue/orchestrator/controller/mergeconflict", + "//submitqueue/orchestrator/controller/mergeconflictsignal", "//submitqueue/orchestrator/controller/score", "//submitqueue/orchestrator/controller/speculate", "//submitqueue/orchestrator/controller/start", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 5f15d7b5..a5962eec 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -39,6 +39,7 @@ import ( mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" + runwaytopickey "github.com/uber/submitqueue/runway/core/topickey" "github.com/uber/submitqueue/submitqueue/core/changeset" 
"github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" @@ -52,9 +53,6 @@ import ( conflictfake "github.com/uber/submitqueue/submitqueue/extension/conflict/fake" "github.com/uber/submitqueue/submitqueue/extension/conflict/fileoverlap" "github.com/uber/submitqueue/submitqueue/extension/conflict/none" - "github.com/uber/submitqueue/submitqueue/extension/mergechecker" - mcfake "github.com/uber/submitqueue/submitqueue/extension/mergechecker/fake" - githubchecker "github.com/uber/submitqueue/submitqueue/extension/mergechecker/github" "github.com/uber/submitqueue/submitqueue/extension/pusher" pushfake "github.com/uber/submitqueue/submitqueue/extension/pusher/fake" gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git" @@ -72,6 +70,8 @@ import ( "github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflict" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflictsignal" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/start" @@ -239,7 +239,6 @@ func run() error { } // Per-extension factories all resolve against the registry by queue name. 
- mcf := mergeCheckerFactory{queues} cpf := changeProviderFactory{queues} pshf := pusherFactory{queues} brf := buildRunnerFactory{queues} @@ -247,7 +246,7 @@ func run() error { cof := analyzerFactory{queues} // Register controllers - primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, mcf, cpf, pshf, brf, scf, cof, cnt, store) + primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, pshf, brf, scf, cof, cnt, store) if err != nil { return err } @@ -377,6 +376,8 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe {topickey.TopicKeyStart, "start", "orchestrator-start"}, {topickey.TopicKeyCancel, "cancel", "orchestrator-cancel"}, {topickey.TopicKeyValidate, "validate", "orchestrator-validate"}, + {topickey.TopicKeyMergeConflict, "mergeconflict", "orchestrator-mergeconflict"}, + {runwaytopickey.TopicKeyMergeConflictCheckSignal, "merge-conflict-checker-signal", "orchestrator-mergeconflictsignal"}, {topickey.TopicKeyBatch, "batch", "orchestrator-batch"}, {topickey.TopicKeyScore, "score", "orchestrator-score"}, {topickey.TopicKeySpeculate, "speculate", "orchestrator-speculate"}, @@ -430,17 +431,35 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe Queue: q, }) + // Publish-only: the orchestrator hands merge-conflict check requests to + // runway via the runway-owned merge-conflict-checker queue. Runway is the + // sole consumer, so the orchestrator registers no consuming subscription + // (and no DLQ) here — the inbound result arrives on the separate + // merge-conflict-checker-signal queue, which is a consumed primary topic + // above. 
+ configs = append(configs, consumer.TopicConfig{ + Key: runwaytopickey.TopicKeyMergeConflictCheck, + Name: "merge-conflict-checker", + Queue: q, + }) + return consumer.NewTopicRegistry(configs) } // registerPrimaryControllers creates all pipeline controllers and registers // them with the primary consumer. Pipeline: // -// request → validate → batch → score → speculate → build → buildsignal ─┐ -// ↑ ↘ ↻ poll │ -// │ merge → conclude │ -// │ │ │ -// └────────┴───────────────────────┘ +// request → validate → mergeconflict ⇢ (runway) ⇢ mergeconflictsignal → batch → score → speculate → build → buildsignal ─┐ +// ↑ ↘ ↻ poll │ +// │ merge → conclude │ +// │ │ │ +// └────────┴───────────────────────┘ +// +// The merge-conflict check is asynchronous and crosses a service boundary: +// mergeconflict publishes the full check request to the runway-owned +// merge-conflict-checker queue (⇢); runway performs the merge attempt and +// publishes the result to merge-conflict-checker-signal, which mergeconflictsignal +// consumes, advancing the request to batch when mergeable and failing it otherwise. // TODO(wiring abstraction): queueExtensions + queueRegistry currently live here // as example-local wiring. Evaluate promoting them into a defined abstraction in @@ -459,7 +478,6 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // read as "for this queue, here are its scorer, analyzer, pusher, …", and lets // a queue profile start from a baseline and override only what differs. type queueExtensions struct { - mergeChecker mergechecker.MergeChecker changeProvider changeprovider.ChangeProvider pusher pusher.Pusher buildRunner buildrunner.BuildRunner @@ -486,12 +504,6 @@ func (r queueRegistry) get(queue string) queueExtensions { // The per-extension factories below are thin adapters: each satisfies its // extension's Factory contract by resolving the queue's profile from the // registry. All routing logic lives here in the wiring layer. 
-type mergeCheckerFactory struct{ reg queueRegistry } - -func (f mergeCheckerFactory) For(cfg mergechecker.Config) (mergechecker.MergeChecker, error) { - return f.reg.get(cfg.QueueName).mergeChecker, nil -} - type changeProviderFactory struct{ reg queueRegistry } func (f changeProviderFactory) For(cfg changeprovider.Config) (changeprovider.ChangeProvider, error) { @@ -522,7 +534,7 @@ func (f analyzerFactory) For(cfg conflict.Config) (conflict.Analyzer, error) { return f.reg.get(cfg.QueueName).analyzer, nil } -func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mcf mergechecker.Factory, cpf changeprovider.Factory, pshf pusher.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) { +func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, pshf pusher.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) { var count int requestController := start.NewController( logger, @@ -555,7 +567,6 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope, store, registry, - mcf, cpf, topickey.TopicKeyValidate, "orchestrator-validate", @@ -565,6 +576,33 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, } count++ + mergeconflictController := mergeconflict.NewController( + logger, + scope, + store, + registry, + runwaytopickey.TopicKeyMergeConflictCheck, + topickey.TopicKeyMergeConflict, + "orchestrator-mergeconflict", + ) + if err := c.Register(mergeconflictController); err != nil { + return count, fmt.Errorf("failed to register mergeconflict controller: %w", err) + } + count++ + + mergeconflictsignalController := mergeconflictsignal.NewController( + logger, + scope, + store, + 
registry, + runwaytopickey.TopicKeyMergeConflictCheckSignal, + "orchestrator-mergeconflictsignal", + ) + if err := c.Register(mergeconflictsignalController); err != nil { + return count, fmt.Errorf("failed to register mergeconflictsignal controller: %w", err) + } + count++ + batchController := batch.NewController( logger, scope, @@ -678,6 +716,8 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop {"start_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeLandRequestID, dlq.TopicKey(topickey.TopicKeyStart), "orchestrator-start-dlq")}, {"cancel_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeCancelRequestID, dlq.TopicKey(topickey.TopicKeyCancel), "orchestrator-cancel-dlq")}, {"validate_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(topickey.TopicKeyValidate), "orchestrator-validate-dlq")}, + {"mergeconflict_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(topickey.TopicKeyMergeConflict), "orchestrator-mergeconflict-dlq")}, + {"mergeconflictsignal_dlq", dlq.NewDLQMergeConflictSignalController(logger, dlqScope, store, dlq.TopicKey(runwaytopickey.TopicKeyMergeConflictCheckSignal), "orchestrator-mergeconflictsignal-dlq")}, {"batch_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(topickey.TopicKeyBatch), "orchestrator-batch-dlq")}, {"score_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyScore), "orchestrator-score-dlq")}, {"speculate_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeySpeculate), "orchestrator-speculate-dlq")}, @@ -717,34 +757,6 @@ func parseTimeout(envVal string, defaultVal time.Duration) time.Duration { return defaultVal } -// newMergeChecker creates a MergeChecker for GitHub (github.com), configured via -// GITHUB_BASE_URL, GITHUB_TOKEN, and GITHUB_TIMEOUT. 
When GITHUB_TOKEN is unset -// it returns the fake merge checker (every change mergeable unless a URI carries -// a failure marker, see mergechecker/fake), keeping the example runnable without -// GitHub and letting e2e tests drive unmergeable scenarios via request payloads. -func newMergeChecker(logger *zap.Logger, scope tally.Scope) (mergechecker.MergeChecker, error) { - if os.Getenv("GITHUB_TOKEN") == "" { - logger.Warn("GITHUB_TOKEN not set; using fake merge checker (every change mergeable unless URI-marked)") - return mcfake.New(), nil - } - - client, err := httpclient.NewClient(getEnv("GITHUB_BASE_URL", "https://api.github.com")) - if err != nil { - return nil, fmt.Errorf("failed to build GitHub HTTP client: %w", err) - } - - ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: os.Getenv("GITHUB_TOKEN")}) - client.Transport = &oauth2.Transport{Source: ts, Base: client.Transport} - - client.Timeout = parseTimeout(os.Getenv("GITHUB_TIMEOUT"), 30*time.Second) - - return githubchecker.NewMergeChecker(githubchecker.Params{ - HTTPClient: client, - Logger: logger.Sugar(), - MetricsScope: scope.SubScope("mergechecker"), - }), nil -} - // newChangeProvider creates a ChangeProvider for GitHub (github.com), configured // via GITHUB_BASE_URL, GITHUB_TOKEN, and GITHUB_TIMEOUT. When GITHUB_TOKEN is // unset it returns the fake change provider (one empty ChangeInfo per URI unless @@ -801,10 +813,6 @@ func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolve // baseline. This is the one place queue topology lives; extension packages stay // queue-agnostic. 
func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (queueRegistry, error) { - mc, err := newMergeChecker(logger, scope) - if err != nil { - return queueRegistry{}, fmt.Errorf("failed to create merge checker: %w", err) - } cp, err := newChangeProvider(logger, scope) if err != nil { return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err) @@ -833,7 +841,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset. // on a queue to exercise the analyzer error path, as e2e-conflict-error-queue // below does. base := queueExtensions{ - mergeChecker: mc, changeProvider: cp, pusher: psh, buildRunner: buildfake.New(resolver), diff --git a/submitqueue/core/topickey/topickey.go b/submitqueue/core/topickey/topickey.go index b4fbbe42..cee9ef11 100644 --- a/submitqueue/core/topickey/topickey.go +++ b/submitqueue/core/topickey/topickey.go @@ -27,6 +27,11 @@ const ( TopicKeyCancel TopicKey = "cancel" // TopicKeyValidate is the pipeline stage where requests are published for validation. TopicKeyValidate TopicKey = "validate" + // TopicKeyMergeConflict is the internal pipeline stage where validated + // requests are published to start an asynchronous merge-conflict check. The + // mergeconflict controller consumes it and publishes the full request to + // runway's merge-conflict-checker queue. + TopicKeyMergeConflict TopicKey = "mergeconflict" // TopicKeyBatch is the pipeline stage where validated requests are published for batching. TopicKeyBatch TopicKey = "batch" // TopicKeyScore is the pipeline stage where batches are published for scoring. 
diff --git a/submitqueue/orchestrator/controller/dlq/BUILD.bazel b/submitqueue/orchestrator/controller/dlq/BUILD.bazel index f38da3af..8935c704 100644 --- a/submitqueue/orchestrator/controller/dlq/BUILD.bazel +++ b/submitqueue/orchestrator/controller/dlq/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "buildsignal.go", "dlq.go", "log.go", + "mergeconflictsignal.go", "request.go", ], importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq", @@ -14,6 +15,7 @@ go_library( deps = [ "//core/consumer", "//core/metrics", + "//runway/entity", "//submitqueue/entity", "//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", @@ -28,6 +30,7 @@ go_test( "buildsignal_test.go", "dlq_test.go", "log_test.go", + "mergeconflictsignal_test.go", "request_test.go", ], embed = [":dlq"], @@ -36,6 +39,8 @@ go_test( "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", + "//runway/core/topickey", + "//runway/entity", "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", diff --git a/submitqueue/orchestrator/controller/dlq/mergeconflictsignal.go b/submitqueue/orchestrator/controller/dlq/mergeconflictsignal.go new file mode 100644 index 00000000..1fd8dffb --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/mergeconflictsignal.go @@ -0,0 +1,113 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dlq + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" + runwayentity "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// mergeConflictSignalController is the DLQ reconciler for the +// mergeconflictsignal topic. Its payload carries a runway +// MergeConflictCheckResult whose id is the request id echoed back, so +// reconciliation fails that request directly via failRequest. +type mergeConflictSignalController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify mergeConflictSignalController implements consumer.Controller at compile time. +var _ consumer.Controller = (*mergeConflictSignalController)(nil) + +// NewDLQMergeConflictSignalController builds a DLQ controller for the +// mergeconflictsignal topic. +func NewDLQMergeConflictSignalController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + topicKey consumer.TopicKey, + consumerGroup string, +) consumer.Controller { + name := string(topicKey) + "_controller" + return &mergeConflictSignalController{ + logger: logger.Named(name), + metricsScope: scope.SubScope(name), + store: store, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process reconciles a single DLQ delivery for the mergeconflictsignal topic. 
+func (c *mergeConflictSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + result, err := runwayentity.MergeConflictCheckResultFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to decode merge conflict check result from dlq payload: %w", err) + } + if result.ID == "" { + metrics.NamedCounter(c.metricsScope, opName, "empty_id_errors", 1) + return fmt.Errorf("dlq payload decoded to empty request id") + } + + dmeta := delivery.Metadata() + c.logger.Warnw("dlq message received", + "request_id", result.ID, + "attempt", delivery.Attempt(), + "dlq_original_topic", dmeta["dlq.original_topic"], + "dlq_failure_count", dmeta["dlq.failure_count"], + "dlq_last_error", dmeta["dlq.last_error"], + ) + + if err := failRequest(ctx, c.store, c.logger, result.ID); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1) + return err + } + + metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1) + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *mergeConflictSignalController) Name() string { + return string(c.topicKey) +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *mergeConflictSignalController) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. 
+func (c *mergeConflictSignalController) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/dlq/mergeconflictsignal_test.go b/submitqueue/orchestrator/controller/dlq/mergeconflictsignal_test.go new file mode 100644 index 00000000..2f60b82b --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/mergeconflictsignal_test.go @@ -0,0 +1,89 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dlq + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/core/consumer" + runwaytopickey "github.com/uber/submitqueue/runway/core/topickey" + runwayentity "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func TestDLQMergeConflictSignalController_InterfaceAndAccessors(t *testing.T) { + ctrl := gomock.NewController(t) + store := storagemock.NewMockStorage(ctrl) + + c := NewDLQMergeConflictSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaytopickey.TopicKeyMergeConflictCheckSignal), "orchestrator-mergeconflictsignal-dlq") + + assert.Equal(t, "merge-conflict-checker-signal_dlq", c.Name()) + assert.Equal(t, consumer.TopicKey("merge-conflict-checker-signal_dlq"), c.TopicKey()) + assert.Equal(t, "orchestrator-mergeconflictsignal-dlq", c.ConsumerGroup()) +} + +func TestDLQMergeConflictSignalController_Process_ReconcilesRequest(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 1, State: entity.RequestStateProcessing, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(1), int32(2), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQMergeConflictSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaytopickey.TopicKeyMergeConflictCheckSignal), 
"orchestrator-mergeconflictsignal-dlq") + + payload, err := runwayentity.MergeConflictCheckResult{ID: "q/1", Mergeable: false, Reason: "boom"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQMergeConflictSignalController_Process_EmptyIDFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + c := NewDLQMergeConflictSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaytopickey.TopicKeyMergeConflictCheckSignal), "orchestrator-mergeconflictsignal-dlq") + + payload, err := runwayentity.MergeConflictCheckResult{ID: "", Mergeable: false}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.Error(t, c.Process(context.Background(), delivery)) +} + +func TestDLQMergeConflictSignalController_Process_MalformedPayloadFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + c := NewDLQMergeConflictSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaytopickey.TopicKeyMergeConflictCheckSignal), "orchestrator-mergeconflictsignal-dlq") + + delivery := newMockDelivery(ctrl, []byte("garbage")) + require.Error(t, c.Process(context.Background(), delivery)) +} diff --git a/submitqueue/orchestrator/controller/mergeconflict/BUILD.bazel b/submitqueue/orchestrator/controller/mergeconflict/BUILD.bazel new file mode 100644 index 00000000..c3be1d93 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflict/BUILD.bazel @@ -0,0 +1,44 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mergeconflict", + srcs = ["mergeconflict.go"], + importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflict", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//core/errs", + "//core/metrics", + 
"//entity/change", + "//entity/messagequeue", + "//runway/entity", + "//submitqueue/entity", + "//submitqueue/extension/storage", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "mergeconflict_test", + srcs = ["mergeconflict_test.go"], + embed = [":mergeconflict"], + deps = [ + "//core/consumer", + "//core/errs", + "//entity/change", + "//entity/mergestrategy", + "//entity/messagequeue", + "//extension/messagequeue/mock", + "//runway/core/topickey", + "//runway/entity", + "//submitqueue/core/topickey", + "//submitqueue/entity", + "//submitqueue/extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/submitqueue/orchestrator/controller/mergeconflict/mergeconflict.go b/submitqueue/orchestrator/controller/mergeconflict/mergeconflict.go new file mode 100644 index 00000000..ec7c10ff --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflict/mergeconflict.go @@ -0,0 +1,193 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mergeconflict implements the trigger stage for the asynchronous +// merge-conflict check. 
It consumes a validated request and publishes the full +// check request to runway's merge-conflict-checker queue, using the request id +// as the client-owned correlation id. Runway performs the check out of process +// and publishes the result to the signal queue, which the mergeconflictsignal +// stage consumes and correlates back to the request by that id. +package mergeconflict + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/entity/change" + entityqueue "github.com/uber/submitqueue/entity/messagequeue" + runwayentity "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// Controller handles mergeconflict queue messages. Implements consumer.Controller. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + registry consumer.TopicRegistry + runwayTopicKey consumer.TopicKey + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new mergeconflict controller for the orchestrator. +// runwayTopicKey is the runway-owned topic this controller publishes check +// requests to (TopicKeyMergeConflictCheck). 
+func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + registry consumer.TopicRegistry, + runwayTopicKey consumer.TopicKey, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("mergeconflict_controller"), + metricsScope: scope.SubScope("mergeconflict_controller"), + store: store, + registry: registry, + runwayTopicKey: runwayTopicKey, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process publishes the full check request to runway. Returns nil to ack +// (success), or error to nack/reject. +// +// Error classification: deserialize and storage failures are non-retryable +// (reject to DLQ). The publish to runway is retryable — it is the hand-off that +// keeps the check alive, so a transient enqueue blip should replay rather than +// strand the request. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + rid, err := entity.RequestIDFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to deserialize request ID: %w", err) + } + + request, err := c.store.GetRequestStore().Get(ctx, rid.ID) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get request %s: %w", rid.ID, err) + } + + c.logger.Infow("received mergeconflict event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // Short-circuit halted requests (terminal or cancelling): no external check + // should be kicked off for a request that will not proceed. 
+ if entity.IsRequestStateHalted(request.State) { + metrics.NamedCounter(c.metricsScope, opName, "skipped_halted", 1) + c.logger.Infow("skipping mergeconflict for halted request", + "request_id", request.ID, + "state", string(request.State), + ) + return nil + } + + // Build the full payload runway needs to attempt the merge. The request id is + // the client-owned correlation id, so a redelivery republishes the same id and + // runway dedupes on it; the result is matched straight back to the request. At + // validate time the check is a single step (candidate vs target branch); a + // future in-flight caller layers earlier steps before the candidate. + req := runwayentity.MergeConflictCheckRequest{ + ID: request.ID, + QueueName: request.Queue, + Steps: []runwayentity.MergeStep{ + { + StepID: request.ID, + Changes: []change.Change{request.Change}, + Strategy: request.LandStrategy, + }, + }, + } + + if err := c.publish(ctx, c.runwayTopicKey, req, request.Queue); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + // Retryable: the hand-off to runway is what keeps this check alive. + return errs.NewRetryableError(fmt.Errorf("failed to publish to runway merge-conflict-checker: %w", err)) + } + + c.logger.Infow("published merge conflict check to runway", + "request_id", request.ID, + "topic_key", c.runwayTopicKey, + ) + + return nil // Success - message will be acked +} + +// publish serializes the runway check request and publishes it to the given +// topic key, partitioned by queue. 
+func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, req runwayentity.MergeConflictCheckRequest, partitionKey string) error { + payload, err := req.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize merge conflict check request: %w", err) + } + + msg := entityqueue.NewMessage(req.ID, payload, partitionKey, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "mergeconflict" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/mergeconflict/mergeconflict_test.go b/submitqueue/orchestrator/controller/mergeconflict/mergeconflict_test.go new file mode 100644 index 00000000..844298e7 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflict/mergeconflict_test.go @@ -0,0 +1,158 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package mergeconflict + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/entity/change" + "github.com/uber/submitqueue/entity/mergestrategy" + entityqueue "github.com/uber/submitqueue/entity/messagequeue" + queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" + runwaytopickey "github.com/uber/submitqueue/runway/core/topickey" + runwayentity "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/submitqueue/core/topickey" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func ridPayload(t *testing.T, id string) []byte { + payload, err := entity.RequestID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +func newDelivery(ctrl *gomock.Controller, msg entityqueue.Message) *queuemock.MockDelivery { + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d +} + +func TestProcess_PublishesFullPayloadToRunway(t *testing.T) { + ctrl := gomock.NewController(t) + + request := entity.Request{ + ID: "test-queue/1", + Queue: "test-queue", + Change: change.Change{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}, + LandStrategy: mergestrategy.MergeStrategySquashRebase, + State: entity.RequestStateStarted, + Version: 1, + } + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + + store := storagemock.NewMockStorage(ctrl) + 
store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaytopickey.TopicKeyMergeConflictCheck, Name: "merge-conflict-checker", Queue: q}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaytopickey.TopicKeyMergeConflictCheck, topickey.TopicKeyMergeConflict, "orchestrator-mergeconflict") + + msg := entityqueue.NewMessage(request.ID, ridPayload(t, request.ID), request.Queue, nil) + require.NoError(t, controller.Process(context.Background(), newDelivery(ctrl, msg))) + + // Full payload published to runway, keyed by the request id (the correlation id). 
+ assert.Equal(t, "merge-conflict-checker", gotTopic) + got, err := runwayentity.MergeConflictCheckRequestFromBytes(gotPayload) + require.NoError(t, err) + assert.Equal(t, request.ID, got.ID) + assert.Equal(t, request.Queue, got.QueueName) + require.Len(t, got.Steps, 1) + assert.Equal(t, request.ID, got.Steps[0].StepID) + assert.Equal(t, []change.Change{request.Change}, got.Steps[0].Changes) + assert.Equal(t, mergestrategy.MergeStrategySquashRebase, got.Steps[0].Strategy) +} + +func TestProcess_HaltedRequestSkips(t *testing.T) { + ctrl := gomock.NewController(t) + + request := entity.Request{ID: "test-queue/1", Queue: "test-queue", State: entity.RequestStateCancelled, Version: 3} + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + // No Publish expectation is set on the publisher: gomock fails the test if a halted request is published.
+ pub := queuemock.NewMockPublisher(ctrl) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaytopickey.TopicKeyMergeConflictCheck, Name: "merge-conflict-checker", Queue: q}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaytopickey.TopicKeyMergeConflictCheck, topickey.TopicKeyMergeConflict, "orchestrator-mergeconflict") + + msg := entityqueue.NewMessage(request.ID, ridPayload(t, request.ID), request.Queue, nil) + require.NoError(t, controller.Process(context.Background(), newDelivery(ctrl, msg))) +} + +func TestProcess_PublishFailureIsRetryable(t *testing.T) { + ctrl := gomock.NewController(t) + + request := entity.Request{ID: "test-queue/1", Queue: "test-queue", State: entity.RequestStateStarted, Version: 1} + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("enqueue failed")) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaytopickey.TopicKeyMergeConflictCheck, Name: "merge-conflict-checker", Queue: q}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaytopickey.TopicKeyMergeConflictCheck, topickey.TopicKeyMergeConflict, "orchestrator-mergeconflict") + + msg := entityqueue.NewMessage(request.ID, ridPayload(t, request.ID), request.Queue, nil) + err = controller.Process(context.Background(), newDelivery(ctrl, msg)) + require.Error(t, err) + assert.True(t, 
errs.IsRetryable(err)) +} diff --git a/submitqueue/orchestrator/controller/mergeconflictsignal/BUILD.bazel b/submitqueue/orchestrator/controller/mergeconflictsignal/BUILD.bazel new file mode 100644 index 00000000..41073ac0 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflictsignal/BUILD.bazel @@ -0,0 +1,41 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mergeconflictsignal", + srcs = ["mergeconflictsignal.go"], + importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflictsignal", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//core/metrics", + "//entity/messagequeue", + "//runway/entity", + "//submitqueue/core/request", + "//submitqueue/core/topickey", + "//submitqueue/entity", + "//submitqueue/extension/storage", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "mergeconflictsignal_test", + srcs = ["mergeconflictsignal_test.go"], + embed = [":mergeconflictsignal"], + deps = [ + "//core/consumer", + "//entity/messagequeue", + "//extension/messagequeue/mock", + "//runway/core/topickey", + "//runway/entity", + "//submitqueue/core/topickey", + "//submitqueue/entity", + "//submitqueue/extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal.go b/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal.go new file mode 100644 index 00000000..f1d16249 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal.go @@ -0,0 +1,219 @@ +// Copyright (c) 2025 Uber Technologies, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mergeconflictsignal consumes merge-conflict check results from runway's +// signal queue, correlates them to the request by the echoed id, and either +// advances the request to the batch stage (mergeable) or fails it (conflicted). +// Unlike buildsignal it is purely result-driven — runway pushes the result, so +// there is no poll loop or self-reschedule. +package mergeconflictsignal + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" + entityqueue "github.com/uber/submitqueue/entity/messagequeue" + runwayentity "github.com/uber/submitqueue/runway/entity" + corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/core/topickey" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// Controller handles mergeconflictsignal queue messages. Implements consumer.Controller. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + registry consumer.TopicRegistry + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. 
+var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new mergeconflictsignal controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + registry consumer.TopicRegistry, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("mergeconflictsignal_controller"), + metricsScope: scope.SubScope("mergeconflictsignal_controller"), + store: store, + registry: registry, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process consumes a runway check result and advances or fails the request. +// Returns nil to ack, or error to nack/reject. +// +// A not-mergeable verdict is an expected outcome of the check, not a failure: +// the request is driven to terminal Error inline and the message is acked. Only +// infrastructure faults — deserialize, storage, the terminal transition, and the +// batch publish — return an error and reject to the DLQ, where the request is +// reconciled to Error. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + // The runway result carries full data (it crosses the service boundary). Its + // id is the request id echoed back, so correlate straight to the request. 
+ result, err := runwayentity.MergeConflictCheckResultFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to deserialize merge conflict check result: %w", err) + } + + request, err := c.store.GetRequestStore().Get(ctx, result.ID) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get request %s: %w", result.ID, err) + } + + c.logger.Infow("received mergeconflict signal", + "request_id", request.ID, + "mergeable", result.Mergeable, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // Short-circuit halted requests: the cancel path owns driving them terminal. + if entity.IsRequestStateHalted(request.State) { + metrics.NamedCounter(c.metricsScope, opName, "skipped_halted", 1) + c.logger.Infow("skipping mergeconflict signal for halted request", + "request_id", request.ID, + "state", string(request.State), + ) + return nil + } + + if !result.Mergeable { + metrics.NamedCounter(c.metricsScope, opName, "not_mergeable", 1) + c.logger.Infow("request not mergeable", + "request_id", request.ID, + "reason", result.Reason, + ) + // Expected terminal outcome, not a failure: mark the request Error inline + // and ack. 
+ if err := c.failRequest(ctx, request, result.Reason); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "fail_errors", 1) + return fmt.Errorf("failed to fail request %s: %w", request.ID, err) + } + return nil + } + + if err := c.publishRequestID(ctx, topickey.TopicKeyBatch, request.ID, request.Queue); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish to batch: %w", err) + } + + c.logger.Infow("published request to batch", + "request_id", request.ID, + "topic_key", topickey.TopicKeyBatch, + ) + + return nil // Success - message will be acked +} + +// failRequest drives the request to terminal RequestStateError and records the +// conflict reason on the request log. A not-mergeable verdict is an expected +// terminal outcome of the check, so the request is concluded here directly. +// +// Idempotent under at-least-once delivery: a redelivery whose request is already +// in Error skips the state CAS but still publishes the log (so a prior attempt +// that flipped the state but failed before logging is repaired); a request that +// reached a different terminal state (e.g. a racing cancel) is left untouched. +func (c *Controller) failRequest(ctx context.Context, request entity.Request, reason string) error { + switch { + case request.State == entity.RequestStateError: + // Idempotent retry: a prior delivery already wrote Error. Fall through to + // the log publish. 
+ case entity.IsRequestStateTerminal(request.State): + c.logger.Warnw("request already in different terminal state, skipping fail", + "request_id", request.ID, + "state", string(request.State), + ) + return nil + default: + newVersion := request.Version + 1 + if err := c.store.GetRequestStore().UpdateState(ctx, request.ID, request.Version, newVersion, entity.RequestStateError); err != nil { + return fmt.Errorf("failed to update request %s state to error: %w", request.ID, err) + } + request.Version = newVersion + request.State = entity.RequestStateError + } + + logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusError, request.Version, reason, nil) + if err := corerequest.PublishLog(ctx, c.registry, logEntry, request.ID); err != nil { + return fmt.Errorf("failed to publish request log for %s: %w", request.ID, err) + } + return nil +} + +// publishRequestID publishes a request ID to the given topic key, partitioned by queue. +func (c *Controller) publishRequestID(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { + payload, err := entity.RequestID{ID: requestID}.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request ID: %w", err) + } + + msg := entityqueue.NewMessage(requestID, payload, partitionKey, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "mergeconflictsignal" +} + +// TopicKey returns the topic key this controller subscribes to. 
+func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal_test.go b/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal_test.go new file mode 100644 index 00000000..d726d58b --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal_test.go @@ -0,0 +1,170 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mergeconflictsignal + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" + entityqueue "github.com/uber/submitqueue/entity/messagequeue" + queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" + runwaytopickey "github.com/uber/submitqueue/runway/core/topickey" + runwayentity "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/submitqueue/core/topickey" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func resultPayload(t *testing.T, res runwayentity.MergeConflictCheckResult) []byte { + payload, err := res.ToBytes() + require.NoError(t, err) + return payload +} + +func newDelivery(ctrl *gomock.Controller, msg entityqueue.Message) *queuemock.MockDelivery { + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d +} + +const ( + testRequestID = "test-queue/1" + testQueue = "test-queue" +) + +func TestProcess_MergeablePublishesToBatch(t *testing.T) { + ctrl := gomock.NewController(t) + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), testRequestID).Return( + entity.Request{ID: testRequestID, Queue: testQueue, State: entity.RequestStateStarted, Version: 1}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + 
q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: topickey.TopicKeyBatch, Name: "batch", Queue: q}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaytopickey.TopicKeyMergeConflictCheckSignal, "orchestrator-mergeconflictsignal") + + res := runwayentity.MergeConflictCheckResult{ID: testRequestID, Mergeable: true} + msg := entityqueue.NewMessage(testRequestID, resultPayload(t, res), testQueue, nil) + require.NoError(t, controller.Process(context.Background(), newDelivery(ctrl, msg))) + + assert.Equal(t, "batch", gotTopic) + rid, err := entity.RequestIDFromBytes(gotPayload) + require.NoError(t, err) + assert.Equal(t, testRequestID, rid.ID) +} + +func TestProcess_NotMergeableMarksRequestError(t *testing.T) { + ctrl := gomock.NewController(t) + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), testRequestID).Return( + entity.Request{ID: testRequestID, Queue: testQueue, State: entity.RequestStateStarted, Version: 1}, nil) + // The request is driven to terminal Error inline (version 1 -> 2). + reqStore.EXPECT().UpdateState(gomock.Any(), testRequestID, int32(1), int32(2), entity.RequestStateError).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + // One publish is expected — the terminal log entry to the log topic. A publish + // to the batch topic would be a bug (gomock fails on the unexpected 2nd call). 
+ var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: topickey.TopicKeyBatch, Name: "batch", Queue: q}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: q}, + }, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaytopickey.TopicKeyMergeConflictCheckSignal, "orchestrator-mergeconflictsignal") + + res := runwayentity.MergeConflictCheckResult{ID: testRequestID, Mergeable: false, Reason: "conflict in foo.go"} + msg := entityqueue.NewMessage(testRequestID, resultPayload(t, res), testQueue, nil) + // Not-mergeable is an expected terminal outcome, so Process acks (no error). + require.NoError(t, controller.Process(context.Background(), newDelivery(ctrl, msg))) + + // The single publish is the terminal log entry carrying the conflict reason. 
+ assert.Equal(t, "log", gotTopic) + logEntry, err := entity.RequestLogFromBytes(gotPayload) + require.NoError(t, err) + assert.Equal(t, entity.RequestStatusError, logEntry.Status) + assert.Equal(t, int32(2), logEntry.RequestVersion) + assert.Equal(t, "conflict in foo.go", logEntry.LastError) +} + +func TestProcess_HaltedRequestSkips(t *testing.T) { + ctrl := gomock.NewController(t) + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), testRequestID).Return( + entity.Request{ID: testRequestID, Queue: testQueue, State: entity.RequestStateCancelled, Version: 4}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + // No publish: gomock fails if a batch publish runs for a halted request. + pub := queuemock.NewMockPublisher(ctrl) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: topickey.TopicKeyBatch, Name: "batch", Queue: q}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaytopickey.TopicKeyMergeConflictCheckSignal, "orchestrator-mergeconflictsignal") + + res := runwayentity.MergeConflictCheckResult{ID: testRequestID, Mergeable: true} + msg := entityqueue.NewMessage(testRequestID, resultPayload(t, res), testQueue, nil) + require.NoError(t, controller.Process(context.Background(), newDelivery(ctrl, msg))) +} diff --git a/submitqueue/orchestrator/controller/validate/BUILD.bazel b/submitqueue/orchestrator/controller/validate/BUILD.bazel index 5c14f306..e1d9ebe9 100644 --- a/submitqueue/orchestrator/controller/validate/BUILD.bazel +++ b/submitqueue/orchestrator/controller/validate/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/changeprovider", - "//submitqueue/extension/mergechecker", 
"//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", @@ -34,8 +33,6 @@ go_test( "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/changeprovider/mock", - "//submitqueue/extension/mergechecker", - "//submitqueue/extension/mergechecker/mock", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/orchestrator/controller/validate/validate.go b/submitqueue/orchestrator/controller/validate/validate.go index f7a9e8a7..120737e1 100644 --- a/submitqueue/orchestrator/controller/validate/validate.go +++ b/submitqueue/orchestrator/controller/validate/validate.go @@ -28,21 +28,20 @@ import ( "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/changeprovider" - "github.com/uber/submitqueue/submitqueue/extension/mergechecker" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" ) // Controller handles validate queue messages. -// It consumes requests, performs validation checks (duplicate detection via the change store, -// merge conflicts, change metadata fetch), and publishes to the batch stage. Validation logic -// is extensible to support additional checks. Implements consumer.Controller. +// It consumes requests, performs local validation checks (duplicate detection via the change store +// and change metadata fetch), and publishes to the mergeconflict stage, which delegates the +// asynchronous merge-conflict check to runway. Validation logic is extensible to support additional +// checks. Implements consumer.Controller. 
type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage registry consumer.TopicRegistry - mergeCheckers mergechecker.Factory changeProviders changeprovider.Factory topicKey consumer.TopicKey consumerGroup string @@ -57,7 +56,6 @@ func NewController( scope tally.Scope, store storage.Storage, registry consumer.TopicRegistry, - mergeCheckers mergechecker.Factory, changeProviders changeprovider.Factory, topicKey consumer.TopicKey, consumerGroup string, @@ -67,7 +65,6 @@ func NewController( metricsScope: scope.SubScope("validate_controller"), store: store, registry: registry, - mergeCheckers: mergeCheckers, changeProviders: changeProviders, topicKey: topicKey, consumerGroup: consumerGroup, @@ -75,7 +72,8 @@ func NewController( } // Process processes a validate delivery from the queue. -// Runs duplicate detection, merge-conflict check, change metadata fetch, then publishes to batch. +// Runs duplicate detection, change metadata fetch, and change claiming, then publishes to the +// mergeconflict stage (which delegates the async merge-conflict check to runway). // Returns nil to ack (success or non-retryable rejection), error to nack (retry). 
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { op := coremetrics.Begin(c.metricsScope, "process") @@ -135,27 +133,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return errs.NewUserError(fmt.Errorf("request %s is a duplicate of in-flight request %s", request.ID, dupID)) } - // Merge conflict check - mergeChecker, err := c.mergeCheckers.For(mergechecker.Config{QueueName: request.Queue}) - if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "merge_check_errors", 1) - return fmt.Errorf("failed to build merge checker for queue %s: %w", request.Queue, err) - } - mergeResult, err := mergeChecker.Check(ctx, request) - if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "merge_check_errors", 1) - return fmt.Errorf("merge check failed: %w", err) - } - if !mergeResult.Mergeable { - c.logger.Infow("request not mergeable", - "request_id", request.ID, - "queue", request.Queue, - "reason", mergeResult.Reason, - ) - coremetrics.NamedCounter(c.metricsScope, "process", "not_mergeable", 1) - return errs.NewUserError(fmt.Errorf("request %s is not mergeable: %s", request.ID, mergeResult.Reason)) - } - // Fetch change metadata changeProvider, err := c.changeProviders.For(changeprovider.Config{QueueName: request.Queue}) if err != nil { @@ -184,15 +161,16 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to claim change records for request %s: %w", request.ID, err) } - // Publish to batch topic - if err := c.publish(ctx, topickey.TopicKeyBatch, request.ID, request.Queue); err != nil { + // Publish to the mergeconflict stage, which delegates the asynchronous + // merge-conflict check to runway (no local record is kept for the check). 
+ if err := c.publish(ctx, topickey.TopicKeyMergeConflict, request.ID, request.Queue); err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "publish_errors", 1) - return fmt.Errorf("failed to publish to batch: %w", err) + return fmt.Errorf("failed to publish to mergeconflict: %w", err) } - c.logger.Infow("published request to batch", + c.logger.Infow("published request to mergeconflict", "request_id", request.ID, - "topic_key", topickey.TopicKeyBatch, + "topic_key", topickey.TopicKeyMergeConflict, ) return nil // Success - message will be acked diff --git a/submitqueue/orchestrator/controller/validate/validate_test.go b/submitqueue/orchestrator/controller/validate/validate_test.go index d72d5e0b..05f837d5 100644 --- a/submitqueue/orchestrator/controller/validate/validate_test.go +++ b/submitqueue/orchestrator/controller/validate/validate_test.go @@ -31,8 +31,6 @@ import ( "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" changeprovidermock "github.com/uber/submitqueue/submitqueue/extension/changeprovider/mock" - "github.com/uber/submitqueue/submitqueue/extension/mergechecker" - mergecheckermock "github.com/uber/submitqueue/submitqueue/extension/mergechecker/mock" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" @@ -66,13 +64,6 @@ func (m *mockChangeProvider) Get(ctx context.Context, request entity.Request) ([ }, nil } -// newMergeableMock returns a mock MergeChecker that always returns mergeable. -func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecker { - mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(entity.MergeResult{Mergeable: true}, nil).AnyTimes() - return mc -} - // newMockStorage creates a MockStorage with a MockRequestStore that returns the given request on Get. 
// The returned MockRequestStore is exposed so individual tests can layer additional Get expectations. func newMockStorage(ctrl *gomock.Controller, request entity.Request) (*storagemock.MockStorage, *storagemock.MockRequestStore) { @@ -100,7 +91,6 @@ func newTestController( ctrl *gomock.Controller, store *storagemock.MockStorage, cs *storagemock.MockChangeStore, - mc mergechecker.MergeChecker, publishErr error, ) *Controller { logger := zaptest.NewLogger(t).Sugar() @@ -119,23 +109,19 @@ func newTestController( mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: topickey.TopicKeyBatch, Name: "batch", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyMergeConflict, Name: "mergeconflict", Queue: mockQ}}, ) require.NoError(t, err) cp := &mockChangeProvider{} - - mcFactory := mergecheckermock.NewMockFactory(ctrl) - mcFactory.EXPECT().For(gomock.Any()).Return(mc, nil).AnyTimes() cpFactory := changeprovidermock.NewMockFactory(ctrl) cpFactory.EXPECT().For(gomock.Any()).Return(cp, nil).AnyTimes() - return NewController(logger, scope, store, registry, mcFactory, cpFactory, topickey.TopicKeyValidate, "orchestrator-validate") + return NewController(logger, scope, store, registry, cpFactory, topickey.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -145,7 +131,7 @@ func TestNewController(t *testing.T) { Version: 1, } store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) require.NotNil(t, controller) assert.Equal(t, topickey.TopicKeyValidate, controller.TopicKey()) @@ -155,7 +141,6 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t 
*testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ ID: "test-queue/123", @@ -166,7 +151,7 @@ func TestController_Process_Success(t *testing.T) { Version: 1, } store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) msg := entityqueue.NewMessage("test-queue/123", requestIDPayload(t, request.ID), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -176,12 +161,62 @@ func TestController_Process_Success(t *testing.T) { require.NoError(t, controller.Process(context.Background(), delivery)) } +// TestController_Process_PublishesToMergeConflict verifies the request ID is +// published to the mergeconflict stage (not batch) on the happy path. +func TestController_Process_PublishesToMergeConflict(t *testing.T) { + ctrl := gomock.NewController(t) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: change.Change{URIs: []string{"github://uber/service/pull/456/abcdef0123456789abcdef0123456789abcdef01"}}, + LandStrategy: mergestrategy.MergeStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } + store, _ := newMockStorage(ctrl, request) + store.EXPECT().GetChangeStore().Return(newMockChangeStore(ctrl)).AnyTimes() + + logger := zaptest.NewLogger(t).Sugar() + + var gotTopic string + var gotPayload []byte + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: topickey.TopicKeyMergeConflict, Name: "mergeconflict", Queue: mockQ}}, + ) + 
require.NoError(t, err) + cpFactory := changeprovidermock.NewMockFactory(ctrl) + cpFactory.EXPECT().For(gomock.Any()).Return(&mockChangeProvider{}, nil).AnyTimes() + + controller := NewController(logger, tally.NoopScope, store, registry, cpFactory, topickey.TopicKeyValidate, "orchestrator-validate") + + msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.NoError(t, controller.Process(context.Background(), delivery)) + assert.Equal(t, "mergeconflict", gotTopic) + rid, err := entity.RequestIDFromBytes(gotPayload) + require.NoError(t, err) + assert.Equal(t, request.ID, rid.ID) +} + // TestController_Process_ClaimsChangeRecordsWithDetails verifies that, on the happy // path, validate creates a change record per fetched change, capturing the provider // details in a single immutable Create. func TestController_Process_ClaimsChangeRecordsWithDetails(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) // The request's URI matches the URI the mock change provider returns, so the // claim carries that change's details. 
@@ -214,7 +249,7 @@ func TestController_Process_ClaimsChangeRecordsWithDetails(t *testing.T) { }, ) - controller := newTestController(t, ctrl, store, cs, mc, nil) + controller := newTestController(t, ctrl, store, cs, nil) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -226,14 +261,13 @@ func TestController_Process_ClaimsChangeRecordsWithDetails(t *testing.T) { func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(entity.Request{}, fmt.Errorf("db connection lost")) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) msg := entityqueue.NewMessage("test-queue/123", requestIDPayload(t, "test-queue/123"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -247,7 +281,6 @@ func TestController_Process_StorageFailure(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ ID: "test-queue/123", @@ -259,7 +292,7 @@ func TestController_Process_PublishFailure(t *testing.T) { } store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), fmt.Errorf("publish failed")) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -271,68 +304,13 @@ func TestController_Process_PublishFailure(t *testing.T) { func 
TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ID: "test-queue/123", Queue: "test-queue"} store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) var _ consumer.Controller = controller } -func TestController_Process_NotMergeable(t *testing.T) { - ctrl := gomock.NewController(t) - - mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(entity.MergeResult{Mergeable: false}, nil) - - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: change.Change{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}, - LandStrategy: mergestrategy.MergeStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) - - msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err := controller.Process(context.Background(), delivery) - require.Error(t, err) - assert.True(t, errs.IsUserError(err)) -} - -func TestController_Process_MergeCheckError(t *testing.T) { - ctrl := gomock.NewController(t) - - mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(entity.MergeResult{}, fmt.Errorf("merge check failed")) - - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: change.Change{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}, - LandStrategy: mergestrategy.MergeStrategyRebase, - State: 
entity.RequestStateStarted, - Version: 1, - } - store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) - - msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err := controller.Process(context.Background(), delivery) - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) -} - func TestController_Process_DuplicateDetection(t *testing.T) { const ( queueName = "test-queue" @@ -356,7 +334,7 @@ func TestController_Process_DuplicateDetection(t *testing.T) { wantUnexpected bool }{ { - name: "no overlap proceeds to merge check", + name: "no overlap proceeds", byURI: map[string][]entity.ChangeRecord{uriA: nil}, }, { @@ -445,7 +423,6 @@ func TestController_Process_DuplicateDetection(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) uris := tt.requestURIs if uris == nil { @@ -485,7 +462,7 @@ func TestController_Process_DuplicateDetection(t *testing.T) { // and claims each fetched change via Create. Accept any Create. 
cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - controller := newTestController(t, ctrl, store, cs, mc, nil) + controller := newTestController(t, ctrl, store, cs, nil) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -510,7 +487,6 @@ func TestController_Process_DuplicateDetection(t *testing.T) { func TestController_Process_ChangeStoreQueryFailure(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ ID: "test-queue/123", @@ -525,7 +501,7 @@ func TestController_Process_ChangeStoreQueryFailure(t *testing.T) { cs := storagemock.NewMockChangeStore(ctrl) cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("change store down")) - controller := newTestController(t, ctrl, store, cs, mc, nil) + controller := newTestController(t, ctrl, store, cs, nil) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -540,8 +516,8 @@ func TestController_Process_ChangeStoreQueryFailure(t *testing.T) { // A request already in a terminal state (e.g. cancelled while the validate // message was in flight) must be short-circuited before any extension is // touched and before any publish happens. We verify this by registering a -// merge checker and change store with NO expectations — gomock fails the test -// if either is called — and a publisher that returns an error if invoked. +// change store with NO expectations — gomock fails the test if it is called — +// and a publisher that returns an error if invoked. 
func TestController_Process_TerminalShortCircuit(t *testing.T) { for _, state := range []entity.RequestState{ entity.RequestStateCancelled, @@ -559,13 +535,12 @@ func TestController_Process_TerminalShortCircuit(t *testing.T) { } store, _ := newMockStorage(ctrl, request) - // No EXPECTs on merge checker or change store: gomock will fail if either is called. - mc := mergecheckermock.NewMockMergeChecker(ctrl) + // No EXPECTs on change store: gomock will fail if it is called. cs := storagemock.NewMockChangeStore(ctrl) // Sentinel publish error: if Process publishes, it returns a non-nil err, // which the require.NoError below will catch. - controller := newTestController(t, ctrl, store, cs, mc, fmt.Errorf("should not publish")) + controller := newTestController(t, ctrl, store, cs, fmt.Errorf("should not publish")) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl)