diff --git a/docs/superpowers/plans/2026-06-08-307-slow-start-batching.md b/docs/superpowers/plans/2026-06-08-307-slow-start-batching.md new file mode 100644 index 00000000..f59e72c9 --- /dev/null +++ b/docs/superpowers/plans/2026-06-08-307-slow-start-batching.md @@ -0,0 +1,749 @@ +# Slow-Start Batching Primitive (#307) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build a standalone, pure `slowStartBatch` primitive that spawns N children in geometrically growing batches (1→2→4→8), halts on the first batch failure, and classifies failures into halt vs throttle-with-backoff. + +**Architecture:** One cohesive module `src/core/slow-start-batch.ts` exporting the runner, a `BatchStrategy` config type (canonical home for `spec.batchStrategy`) with validation, a bounded-backoff helper, an injectable failure classifier, and a `grove_spawn_batch_size` metric observer. No live wiring — the fan-out driver that calls this is owned by later issues (#306/#336/#337/#358). + +**Tech Stack:** TypeScript (ESM, `.js` import specifiers), `bun:test`, biome, tsc. Extends the existing `GroveError` hierarchy (`src/core/errors.ts`) and reuses `AdmissionRejectError` (`src/core/admission/errors.ts`). + +**Spec:** `docs/superpowers/specs/2026-06-08-307-slow-start-batching-design.md` + +--- + +## File Structure + +- **Create** `src/core/slow-start-batch.ts` — all types, validation, classifier, backoff helper, and the runner. Single file because `BatchStrategy` is the configuration for `slowStartBatch`; they are tightly coupled and small. +- **Create** `src/core/slow-start-batch.test.ts` — unit tests (`bun:test`), grown task-by-task. +- **Modify** `src/core/index.ts` — re-export the public surface (final task). + +## Conventions for every task + +- Tests: `import { describe, expect, test } from "bun:test";` and import from `"./slow-start-batch.js"` (ESM `.js` specifier even though the source is `.ts`). +- Run a single test file: `bun test src/core/slow-start-batch.test.ts` +- Commit with `git commit --no-verify` (worktree pre-commit biome hangs on full-repo; we lint targeted instead). +- Format/lint a single file: `biome check --write src/core/slow-start-batch.ts` + +--- + +### Task 1: BatchStrategy config + validation + +**Files:** +- Create: `src/core/slow-start-batch.ts` +- Test: `src/core/slow-start-batch.test.ts` + +- [ ] **Step 1: Write the failing test** + +Create `src/core/slow-start-batch.test.ts`: + +```ts +import { describe, expect, test } from "bun:test"; +import { normalizeBatchStrategy } from "./slow-start-batch.js"; + +describe("normalizeBatchStrategy", () => { + test("applies defaults for empty input", () => { + const s = normalizeBatchStrategy(); + expect(s.initialBatchSize).toBe(1); + expect(s.multiplier).toBe(2); + expect(s.maxBatchSize).toBe(Number.POSITIVE_INFINITY); + expect(s.backoff).toEqual({ baseMs: 1000, multiplier: 2, maxMs: 30_000 }); + }); + + test("passes through provided values", () => { + const s = normalizeBatchStrategy({ + initialBatchSize: 2, + multiplier: 3, + maxBatchSize: 16, + backoff: { baseMs: 500, multiplier: 4, maxMs: 5000 }, + }); + expect(s.initialBatchSize).toBe(2); + expect(s.multiplier).toBe(3); + expect(s.maxBatchSize).toBe(16); + expect(s.backoff).toEqual({ baseMs: 500, multiplier: 4, maxMs: 5000 }); + }); + + test("rejects invalid input with RangeError", () => { + expect(() => normalizeBatchStrategy({ initialBatchSize: 0 })).toThrow(RangeError); + expect(() => normalizeBatchStrategy({ initialBatchSize: 1.5 })).toThrow(RangeError); + expect(() => normalizeBatchStrategy({ multiplier: 0.5 })).toThrow(RangeError); + expect(() => normalizeBatchStrategy({ maxBatchSize: 4, initialBatchSize: 8 })).toThrow( + RangeError, + ); + expect(() => normalizeBatchStrategy({ backoff: { maxMs: 100, baseMs: 1000 } })).toThrow( + RangeError, + ); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: FAIL — `Cannot find module './slow-start-batch.js'`. + +- [ ] **Step 3: Write minimal implementation** + +Create `src/core/slow-start-batch.ts`: + +```ts +import { AdmissionRejectError } from "./admission/errors.js"; +import { GroveError } from "./errors.js"; + +export interface BackoffStrategy { + readonly baseMs?: number; + readonly multiplier?: number; + readonly maxMs?: number; +} + +export interface BatchStrategy { + readonly initialBatchSize?: number; + readonly multiplier?: number; + readonly maxBatchSize?: number; + readonly backoff?: BackoffStrategy; +} + +export interface NormalizedBatchStrategy { + readonly initialBatchSize: number; + readonly multiplier: number; + readonly maxBatchSize: number; // Number.POSITIVE_INFINITY when unbounded + readonly backoff: Required; +} + +const DEFAULT_INITIAL_BATCH_SIZE = 1; +const DEFAULT_MULTIPLIER = 2; +const DEFAULT_BACKOFF_BASE_MS = 1000; +const DEFAULT_BACKOFF_MULTIPLIER = 2; +const DEFAULT_BACKOFF_MAX_MS = 30_000; + +export function normalizeBatchStrategy(input?: BatchStrategy): NormalizedBatchStrategy { + const initialBatchSize = input?.initialBatchSize ?? DEFAULT_INITIAL_BATCH_SIZE; + const multiplier = input?.multiplier ?? DEFAULT_MULTIPLIER; + const maxBatchSize = input?.maxBatchSize ?? Number.POSITIVE_INFINITY; + const baseMs = input?.backoff?.baseMs ?? DEFAULT_BACKOFF_BASE_MS; + const backoffMultiplier = input?.backoff?.multiplier ?? DEFAULT_BACKOFF_MULTIPLIER; + const maxMs = input?.backoff?.maxMs ?? DEFAULT_BACKOFF_MAX_MS; + + requirePositiveInt(initialBatchSize, "initialBatchSize"); + requireFiniteMin(multiplier, 1, "multiplier"); + if (maxBatchSize !== Number.POSITIVE_INFINITY) { + requirePositiveInt(maxBatchSize, "maxBatchSize"); + if (maxBatchSize < initialBatchSize) { + throw new RangeError( + `maxBatchSize (${maxBatchSize}) must be >= initialBatchSize (${initialBatchSize})`, + ); + } + } + requirePositiveInt(baseMs, "backoff.baseMs"); + requireFiniteMin(backoffMultiplier, 1, "backoff.multiplier"); + requirePositiveInt(maxMs, "backoff.maxMs"); + if (maxMs < baseMs) { + throw new RangeError(`backoff.maxMs (${maxMs}) must be >= backoff.baseMs (${baseMs})`); + } + + return { + initialBatchSize, + multiplier, + maxBatchSize, + backoff: { baseMs, multiplier: backoffMultiplier, maxMs }, + }; +} + +function requirePositiveInt(value: number, name: string): void { + if (!Number.isInteger(value) || value < 1) { + throw new RangeError(`${name} must be an integer >= 1 (got ${value})`); + } +} + +function requireFiniteMin(value: number, min: number, name: string): void { + if (!Number.isFinite(value) || value < min) { + throw new RangeError(`${name} must be a finite number >= ${min} (got ${value})`); + } +} +``` + +> Note: `AdmissionRejectError` / `GroveError` are imported now to keep import order stable; they are used in Task 3. Biome may warn about unused imports until then — if it does, add the imports in Task 3 instead and remove them here. + +- [ ] **Step 4: Run test to verify it passes** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: PASS (3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add src/core/slow-start-batch.ts src/core/slow-start-batch.test.ts +git commit --no-verify -m "feat(orchestration): #307 BatchStrategy config + validation" +``` + +--- + +### Task 2: Bounded backoff helper + +**Files:** +- Modify: `src/core/slow-start-batch.ts` +- Test: `src/core/slow-start-batch.test.ts` + +- [ ] **Step 1: Write the failing test** + +Append to `src/core/slow-start-batch.test.ts`: + +```ts +import { computeBackoffMs } from "./slow-start-batch.js"; + +describe("computeBackoffMs", () => { + const backoff = { baseMs: 1000, multiplier: 2, maxMs: 30_000 } as const; + + test("attempt 0 returns baseMs", () => { + expect(computeBackoffMs(0, backoff)).toBe(1000); + }); + + test("grows geometrically", () => { + expect(computeBackoffMs(1, backoff)).toBe(2000); + expect(computeBackoffMs(2, backoff)).toBe(4000); + expect(computeBackoffMs(3, backoff)).toBe(8000); + }); + + test("clamps to maxMs", () => { + expect(computeBackoffMs(10, backoff)).toBe(30_000); + }); + + test("guards non-positive / non-finite attempts to baseMs", () => { + expect(computeBackoffMs(-5, backoff)).toBe(1000); + expect(computeBackoffMs(Number.NaN, backoff)).toBe(1000); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: FAIL — `computeBackoffMs is not a function` / not exported. + +- [ ] **Step 3: Write minimal implementation** + +Append to `src/core/slow-start-batch.ts`: + +```ts +export function computeBackoffMs(attempt: number, backoff: Required): number { + const n = Number.isFinite(attempt) && attempt > 0 ? Math.floor(attempt) : 0; + const raw = backoff.baseMs * backoff.multiplier ** n; + return Math.min(raw, backoff.maxMs); +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: PASS (Task 1 + Task 2 tests). + +- [ ] **Step 5: Commit** + +```bash +git add src/core/slow-start-batch.ts src/core/slow-start-batch.test.ts +git commit --no-verify -m "feat(orchestration): #307 bounded exponential backoff helper" +``` + +--- + +### Task 3: Failure classification + +**Files:** +- Modify: `src/core/slow-start-batch.ts` +- Test: `src/core/slow-start-batch.test.ts` + +- [ ] **Step 1: Write the failing test** + +Append to `src/core/slow-start-batch.test.ts`: + +```ts +import { AdmissionRejectError } from "./admission/errors.js"; +import { RuntimeUnavailableError, defaultFailureClassifier } from "./slow-start-batch.js"; + +describe("defaultFailureClassifier", () => { + test("classifies AdmissionRejectError as admission", () => { + const err = new AdmissionRejectError({ + ruleName: "max-fanout", + ruleType: "concurrency", + reason: "capacity exhausted", + }); + expect(defaultFailureClassifier(err)).toBe("admission"); + }); + + test("classifies RuntimeUnavailableError as backpressure", () => { + expect(defaultFailureClassifier(new RuntimeUnavailableError("runtime down"))).toBe( + "backpressure", + ); + }); + + test("classifies anything else as task", () => { + expect(defaultFailureClassifier(new Error("bad prompt"))).toBe("task"); + expect(defaultFailureClassifier("oops")).toBe("task"); + }); + + test("RuntimeUnavailableError carries a reason", () => { + const err = new RuntimeUnavailableError("runtime down", { reason: "provider-pressure" }); + expect(err.reason).toBe("provider-pressure"); + expect(err).toBeInstanceOf(Error); + }); +}); +``` + +> The `ruleType: "concurrency"` value must be a member of `NormalizedAdmissionRule["type"]`. If `bun test` reports a type error on that literal, open `src/core/admission/types.ts`, pick any valid member of that union, and use it — the classifier only checks `instanceof`, not the rule type. + +- [ ] **Step 2: Run test to verify it fails** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: FAIL — `RuntimeUnavailableError` / `defaultFailureClassifier` not exported. + +- [ ] **Step 3: Write minimal implementation** + +Append to `src/core/slow-start-batch.ts`: + +```ts +export type FailureClass = "task" | "backpressure" | "admission"; + +export class RuntimeUnavailableError extends GroveError { + readonly reason: string; + + constructor(message: string, options?: { reason?: string }) { + super(message); + this.name = "RuntimeUnavailableError"; + this.reason = options?.reason ?? message; + } +} + +export interface ClassifiedFailure { + readonly index: number; // index into the input items array + readonly batchIndex: number; // 0-based batch the item was in + readonly class: FailureClass; + readonly reason: string; + readonly error: unknown; +} + +export type FailureClassifier = (error: unknown) => FailureClass; + +export const defaultFailureClassifier: FailureClassifier = (error) => { + if (error instanceof AdmissionRejectError) return "admission"; + if (error instanceof RuntimeUnavailableError) return "backpressure"; + return "task"; +}; + +function failureReason(error: unknown): string { + if (error instanceof AdmissionRejectError) return error.reason; + if (error instanceof RuntimeUnavailableError) return error.reason; + if (error instanceof Error) return error.message; + return String(error); +} +``` + +> `failureReason` is unused until Task 4. If biome flags it, proceed — Task 4 consumes it. (Do not delete it.) + +- [ ] **Step 4: Run test to verify it passes** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: PASS (Tasks 1–3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add src/core/slow-start-batch.ts src/core/slow-start-batch.test.ts +git commit --no-verify -m "feat(orchestration): #307 failure classification + RuntimeUnavailableError" +``` + +--- + +### Task 4: slowStartBatch runner — success paths + metric + +**Files:** +- Modify: `src/core/slow-start-batch.ts` +- Test: `src/core/slow-start-batch.test.ts` + +- [ ] **Step 1: Write the failing test** + +Append to `src/core/slow-start-batch.test.ts`: + +```ts +import { + type SpawnBatchMetric, + normalizeBatchStrategy as norm, + slowStartBatch, +} from "./slow-start-batch.js"; + +describe("slowStartBatch — success", () => { + test("doubling sequence 1,2,4,8 over 15 items", async () => { + const batches: number[][] = []; + let current: number[] = []; + const calls: number[] = []; + // Record which items landed in which batch by observing onSpawnBatch boundaries. + const items = Array.from({ length: 15 }, (_, i) => i); + const sizes: number[] = []; + const result = await slowStartBatch( + items, + async (item) => { + calls.push(item); + current.push(item); + }, + norm(), + { + onSpawnBatch: (m: SpawnBatchMetric) => { + sizes.push(m.batchSize); + batches.push(current); + current = []; + }, + }, + ); + expect(sizes).toEqual([1, 2, 4, 8]); + expect(result.outcome).toBe("completed"); + expect(result.succeeded).toBe(15); + expect(result.attempted).toBe(15); + expect(result.failures).toEqual([]); + expect(calls.sort((a, b) => a - b)).toEqual(items); + }); + + test("empty input completes with no spawn/metric calls", async () => { + let spawned = 0; + let metrics = 0; + const result = await slowStartBatch( + [], + async () => { + spawned += 1; + }, + norm(), + { onSpawnBatch: () => { metrics += 1; } }, + ); + expect(result.outcome).toBe("completed"); + expect(result.attempted).toBe(0); + expect(spawned).toBe(0); + expect(metrics).toBe(0); + }); + + test("maxBatchSize caps growth: cap 3 over 15 items", async () => { + const sizes: number[] = []; + await slowStartBatch( + Array.from({ length: 15 }, (_, i) => i), + async () => {}, + norm({ maxBatchSize: 3 }), + { onSpawnBatch: (m) => sizes.push(m.batchSize) }, + ); + expect(sizes).toEqual([1, 2, 3, 3, 3, 3]); + }); + + test("metric reports succeeded/failed and forwards taskGroupId", async () => { + const seen: SpawnBatchMetric[] = []; + await slowStartBatch( + [1, 2], + async () => {}, + norm({ initialBatchSize: 2 }), + { taskGroupId: "tg-1", onSpawnBatch: (m) => seen.push(m) }, + ); + expect(seen).toHaveLength(1); + expect(seen[0]).toMatchObject({ + taskGroupId: "tg-1", + batchIndex: 0, + batchSize: 2, + attempted: 2, + succeeded: 2, + failed: 0, + }); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: FAIL — `slowStartBatch` not exported. + +- [ ] **Step 3: Write minimal implementation** + +Append to `src/core/slow-start-batch.ts`: + +```ts +export interface SpawnBatchMetric { + readonly taskGroupId?: string | undefined; + readonly batchIndex: number; + readonly batchSize: number; // == grove_spawn_batch_size + readonly attempted: number; // items dispatched this batch (== batchSize) + readonly succeeded: number; + readonly failed: number; +} + +export type SpawnBatchObserver = (metric: SpawnBatchMetric) => void; + +export interface SlowStartHooks { + readonly onSpawnBatch?: SpawnBatchObserver | undefined; + readonly classify?: FailureClassifier | undefined; + readonly taskGroupId?: string | undefined; +} + +export type SlowStartOutcome = "completed" | "halted" | "throttled"; + +export interface SlowStartResult { + readonly outcome: SlowStartOutcome; + readonly attempted: number; + readonly succeeded: number; + readonly failures: readonly ClassifiedFailure[]; + readonly retryAfterMs?: number | undefined; +} + +export async function slowStartBatch( + items: readonly T[], + spawn: (item: T, batchIndex: number) => Promise, + strategy: NormalizedBatchStrategy, + hooks?: SlowStartHooks, +): Promise { + const classify = hooks?.classify ?? defaultFailureClassifier; + let attempted = 0; + let succeeded = 0; + let offset = 0; + let batchIndex = 0; + let size = Math.min(strategy.initialBatchSize, items.length - offset); + + while (size > 0) { + const batch = items.slice(offset, offset + size); + const settled = await Promise.allSettled( + // async wrapper converts a synchronous throw into a rejection + batch.map(async (item) => spawn(item, batchIndex)), + ); + attempted += size; + + const failures: ClassifiedFailure[] = []; + let batchSucceeded = 0; + settled.forEach((res, i) => { + if (res.status === "fulfilled") { + batchSucceeded += 1; + } else { + failures.push({ + index: offset + i, + batchIndex, + class: classify(res.reason), + reason: failureReason(res.reason), + error: res.reason, + }); + } + }); + succeeded += batchSucceeded; + + hooks?.onSpawnBatch?.({ + taskGroupId: hooks.taskGroupId, + batchIndex, + batchSize: size, + attempted: size, + succeeded: batchSucceeded, + failed: failures.length, + }); + + if (failures.length > 0) { + const anyTerminal = failures.some((f) => f.class === "task" || f.class === "admission"); + if (anyTerminal) { + return { outcome: "halted", attempted, succeeded, failures }; + } + return { + outcome: "throttled", + attempted, + succeeded, + failures, + retryAfterMs: computeBackoffMs(0, strategy.backoff), + }; + } + + offset += size; + batchIndex += 1; + const remaining = items.length - offset; + size = Math.min(Math.floor(size * strategy.multiplier), strategy.maxBatchSize, remaining); + } + + return { outcome: "completed", attempted, succeeded, failures: [] }; +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: PASS (Tasks 1–4 tests). + +- [ ] **Step 5: Commit** + +```bash +git add src/core/slow-start-batch.ts src/core/slow-start-batch.test.ts +git commit --no-verify -m "feat(orchestration): #307 slowStartBatch runner + grove_spawn_batch_size metric" +``` + +--- + +### Task 5: slowStartBatch runner — failure paths + +**Files:** +- Modify: `src/core/slow-start-batch.test.ts` (tests only; runner already complete) + +- [ ] **Step 1: Write the failing test** + +Append to `src/core/slow-start-batch.test.ts`: + +```ts +describe("slowStartBatch — failure handling", () => { + test("task failure in batch 1 halts; later batches never fire", async () => { + const spawned: number[] = []; + const result = await slowStartBatch( + Array.from({ length: 15 }, (_, i) => i), + async (item) => { + spawned.push(item); + if (item === 0) throw new Error("bad prompt"); + }, + norm(), + ); + expect(result.outcome).toBe("halted"); + expect(result.attempted).toBe(1); + expect(result.succeeded).toBe(0); + expect(spawned).toEqual([0]); // only batch 1 (size 1) ran + expect(result.failures).toHaveLength(1); + expect(result.failures[0]).toMatchObject({ index: 0, batchIndex: 0, class: "task" }); + }); + + test("RuntimeUnavailable in batch 1 throttles with backoff; no batch 2", async () => { + const spawned: number[] = []; + const result = await slowStartBatch( + Array.from({ length: 15 }, (_, i) => i), + async (item) => { + spawned.push(item); + if (item === 0) throw new RuntimeUnavailableError("runtime down"); + }, + norm(), + ); + expect(result.outcome).toBe("throttled"); + expect(result.retryAfterMs).toBe(1000); // computeBackoffMs(0) === baseMs + expect(spawned).toEqual([0]); + expect(result.failures[0]).toMatchObject({ class: "backpressure" }); + }); + + test("AdmissionReject halts and surfaces the actionable reason", async () => { + const result = await slowStartBatch( + [0], + async () => { + throw new AdmissionRejectError({ + ruleName: "max-fanout", + ruleType: "concurrency", + reason: "capacity exhausted", + }); + }, + norm(), + ); + expect(result.outcome).toBe("halted"); + expect(result.failures[0]).toMatchObject({ class: "admission", reason: "capacity exhausted" }); + }); + + test("terminal wins on a mixed batch (backpressure + task)", async () => { + const result = await slowStartBatch( + [0, 1], + async (item) => { + if (item === 0) throw new RuntimeUnavailableError("runtime down"); + if (item === 1) throw new Error("bad prompt"); + }, + norm({ initialBatchSize: 2 }), + ); + expect(result.outcome).toBe("halted"); // not throttled + expect(result.failures).toHaveLength(2); + }); + + test("custom classifier overrides the default", async () => { + const result = await slowStartBatch( + [0], + async () => { + throw new Error("treat-me-as-backpressure"); + }, + norm(), + { classify: () => "backpressure" }, + ); + expect(result.outcome).toBe("throttled"); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it passes immediately** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: PASS — the runner from Task 4 already implements these paths. (This task is the failure-path test contract; if any test fails, fix the runner in `slow-start-batch.ts`, not the test.) + +- [ ] **Step 3: Commit** + +```bash +git add src/core/slow-start-batch.test.ts src/core/slow-start-batch.ts +git commit --no-verify -m "test(orchestration): #307 slowStartBatch halt/throttle/terminal-wins coverage" +``` + +--- + +### Task 6: Public exports + full verification + +**Files:** +- Modify: `src/core/index.ts` +- Verify: whole-project typecheck, targeted lint, module tests + +- [ ] **Step 1: Add the barrel exports** + +In `src/core/index.ts`, add (near the other `./task-controller.js` / `./agent-task.js` exports, following the existing split between value exports and `export type`): + +```ts +export { + computeBackoffMs, + defaultFailureClassifier, + normalizeBatchStrategy, + RuntimeUnavailableError, + slowStartBatch, +} from "./slow-start-batch.js"; +export type { + BackoffStrategy, + BatchStrategy, + ClassifiedFailure, + FailureClass, + FailureClassifier, + NormalizedBatchStrategy, + SlowStartHooks, + SlowStartOutcome, + SlowStartResult, + SpawnBatchMetric, + SpawnBatchObserver, +} from "./slow-start-batch.js"; +``` + +- [ ] **Step 2: Typecheck the whole project** + +Run: `tsc --noEmit` +Expected: exit 0, no errors. (If `RuntimeUnavailableError` is reported as both a value and type export collision, keep it only in the value-export block above — it is a class.) + +- [ ] **Step 3: Lint the touched files (targeted — do NOT run full-repo biome in a worktree)** + +Run: `biome check --write src/core/slow-start-batch.ts src/core/slow-start-batch.test.ts src/core/index.ts` +Expected: no remaining diagnostics (auto-fixes import ordering). + +- [ ] **Step 4: Run the module tests once more** + +Run: `bun test src/core/slow-start-batch.test.ts` +Expected: PASS — all tests across Tasks 1–5. + +- [ ] **Step 5: Commit** + +```bash +git add src/core/index.ts src/core/slow-start-batch.ts src/core/slow-start-batch.test.ts +git commit --no-verify -m "feat(orchestration): #307 export slow-start batching primitive from core barrel" +``` + +--- + +## Self-Review + +**Spec coverage:** +- `slowStartBatch` geometric runner + halt-on-first-batch-failure → Tasks 4, 5. ✓ +- `BatchStrategy` / `normalizeBatchStrategy` (`spec.batchStrategy` home) → Task 1. ✓ +- `computeBackoffMs` bounded backoff → Task 2. ✓ +- Failure taxonomy `task|backpressure|admission` + injectable classifier + `RuntimeUnavailableError` → Tasks 3, 5. ✓ +- `grove_spawn_batch_size` via `onSpawnBatch` observer → Task 4. ✓ +- Acceptance: failure in first batch stops later batches → Task 5 test 1. ✓ +- Acceptance: `RuntimeUnavailable` → throttled, no batch 2 → Task 5 test 2. ✓ +- Acceptance: admission terminal → halt + reason → Task 5 test 3. ✓ +- Out of scope (no TaskGroup entity, no live wiring, no exporter) → honored; only `src/core/slow-start-batch.ts` + barrel export touched. ✓ + +**Placeholder scan:** No TBD/TODO; every step has concrete code or an exact command. ✓ + +**Type consistency:** `NormalizedBatchStrategy.backoff` is `Required`, matching `computeBackoffMs`'s second param. `SlowStartResult.retryAfterMs` set only on `throttled`. `ClassifiedFailure` shape identical across definition (Task 3) and usage (Task 4). Metric field names (`batchSize`/`attempted`/`succeeded`/`failed`/`taskGroupId`/`batchIndex`) identical across Task 4 definition and Task 4/5 assertions. ✓ diff --git a/docs/superpowers/specs/2026-06-08-307-slow-start-batching-design.md b/docs/superpowers/specs/2026-06-08-307-slow-start-batching-design.md new file mode 100644 index 00000000..534a889c --- /dev/null +++ b/docs/superpowers/specs/2026-06-08-307-slow-start-batching-design.md @@ -0,0 +1,255 @@ +# D6: Slow-start batching for fan-out spawns — Design + +**Issue**: #307 (Epic D — Orchestration, parent #285) +**Depends on**: #299 (Task controller, closed) +**Reference**: `kubernetes/pkg/controller/replicaset/replica_set.go` `slowStartBatch` +**Date**: 2026-06-08 + +## Problem + +When a parent fans out N parallel children, spawning all N at once amplifies +transient failures into spawn storms: a bad prompt, an unavailable runtime, or +an admission rejection hits all N before anyone notices. Kubernetes solves this +in the ReplicaSet controller with `slowStartBatch` — create children in +geometrically growing batches (1 → 2 → 4 → 8 …), halting the moment a batch sees +a failure, so a systemic problem stops after the first small batch instead of +after N. + +Grove needs the same primitive for agent fan-out, plus a failure taxonomy: +not every failure should halt. A task/prompt failure is the spawner's fault and +should halt and wait for a human. A runtime/backpressure failure is transient +and should pause-and-requeue with bounded backoff. An admission rejection is +terminal and should halt with an actionable reason. + +## Scope + +A **standalone, pure primitive** — no new entity, no live wiring. This matches +how the rest of Epic D landed (`#297` spec/status split, `#299` task controller): +each issue ships a focused, unit-tested module plus a design doc. The fan-out +*driver* that calls this primitive is owned by later issues (`#306` ownerRefs + +cascade GC, `#358` plan/shard/merge, `#336` runtime preflight admission gate, +`#337` backpressure conditions + queue-depth metrics). + +### In scope +- `slowStartBatch` — the geometric-batch runner with halt-on-first-failure. +- `BatchStrategy` config type (canonical home for `spec.batchStrategy`) + + `normalizeBatchStrategy` validation + `computeSlowStartBackoffMs` bounded-backoff helper. +- Failure classification: `task` | `backpressure` | `admission` with an + injectable classifier and a sensible default. +- `grove_spawn_batch_size` metric surfaced via an `onSpawnBatch` observer + callback (the codebase has no Prometheus/OTEL registry; emission mirrors + `TaskController`'s existing `onTransition`/`onError` callback pattern). +- Full unit-test coverage. + +### Explicitly out of scope (YAGNI / deferred to owning issues) +- No `TaskGroup` entity (spec/status/store/controller). +- No `OwnerKind: "task"` extension or cascade-GC changes (#306). +- No live spawn-path edits to `TaskController` or `SessionOrchestrator`. +- No real metrics exporter / `/metrics` endpoint (#337). +- No requeue/backoff *scheduling* loop — the primitive returns the + classification + backoff hint; the future controller does the requeuing. + +## Module layout + +Single cohesive module, since `BatchStrategy` is the configuration for +`slowStartBatch` and the two are tightly coupled: + +- `src/core/slow-start-batch.ts` — algorithm, types, classifier, helpers, errors. +- `src/core/slow-start-batch.test.ts` — unit tests. +- Re-export public surface from `src/core/index.ts`. + +## Public surface + +```ts +// ---- configuration (canonical home for spec.batchStrategy) ---- +export interface BatchStrategy { + readonly initialBatchSize?: number; // default 1 + readonly multiplier?: number; // default 2 + readonly maxBatchSize?: number; // default unbounded (capped by remaining) + readonly backoff?: BackoffStrategy; +} +export interface BackoffStrategy { + readonly baseMs?: number; // default 1000 + readonly multiplier?: number; // default 2 + readonly maxMs?: number; // default 30000 +} +export interface NormalizedBatchStrategy { + readonly initialBatchSize: number; + readonly multiplier: number; + readonly maxBatchSize: number; // Number.POSITIVE_INFINITY when unbounded + readonly backoff: Required; +} + +// Applies defaults; throws RangeError on invalid input +// (matching validateResyncIntervalMs / validateWorkerCount style). +export function normalizeBatchStrategy(input?: BatchStrategy): NormalizedBatchStrategy; + +// Bounded exponential backoff for the Nth requeue attempt (attempt >= 0). +// computeSlowStartBackoffMs(0) === baseMs; grows × multiplier, clamped to maxMs. +export function computeSlowStartBackoffMs(attempt: number, backoff: Required): number; + +// ---- failure classification ---- +export type FailureClass = "task" | "backpressure" | "admission"; +// task → halted (report, wait for user) +// backpressure → throttled (pause + requeue with bounded backoff) +// admission → halted (terminal; surface actionable reason) + +export interface ClassifiedFailure { + readonly index: number; // index into the input items array + readonly batchIndex: number; // which batch (0-based) the item was in + readonly class: FailureClass; + readonly reason: string; // actionable message (esp. admission) + readonly error: unknown; // original thrown value +} + +export type FailureClassifier = (error: unknown) => FailureClass; + +// Recoverable runtime/backpressure signal. Callers throw this from `spawn` +// to mark a failure as transient. (AdmissionRejectError from +// src/core/admission/errors.ts is recognized as "admission".) +export class RuntimeUnavailableError extends GroveError { + constructor(message: string, options?: { reason?: string }); +} + +// Default classifier: +// AdmissionRejectError -> "admission" +// RuntimeUnavailableError -> "backpressure" +// anything else -> "task" +export const defaultFailureClassifier: FailureClassifier; + +// ---- metric observer ---- +export interface SpawnBatchMetric { + readonly taskGroupId?: string; // satisfies "per TaskGroup" (caller-supplied) + readonly batchIndex: number; // 0-based + readonly batchSize: number; // == grove_spawn_batch_size + readonly attempted: number; // == batchSize (items dispatched this batch) + readonly succeeded: number; + readonly failed: number; +} +export type SpawnBatchObserver = (metric: SpawnBatchMetric) => void; + +// ---- the runner ---- +export interface SlowStartHooks { + readonly onSpawnBatch?: SpawnBatchObserver | undefined; + readonly classify?: FailureClassifier | undefined; // default: defaultFailureClassifier + readonly taskGroupId?: string | undefined; // forwarded into the metric +} + +export type SlowStartOutcome = "completed" | "halted" | "throttled"; + +export interface SlowStartResult { + readonly outcome: SlowStartOutcome; + readonly attempted: number; // total items dispatched across all batches run + readonly succeeded: number; // total successful spawns + readonly failures: readonly ClassifiedFailure[]; // [] when completed + readonly retryAfterMs?: number; // present iff outcome === "throttled" +} + +export async function slowStartBatch( + items: readonly T[], + spawn: (item: T, batchIndex: number) => Promise, + strategy: NormalizedBatchStrategy, + hooks?: SlowStartHooks, +): Promise; +``` + +## Algorithm + +``` +remaining = items +batchIndex = 0 +size = min(initialBatchSize, remaining.length) +while size > 0: + batch = next `size` items + results = await Promise.allSettled(batch.map((it, i) => spawn(it, batchIndex))) + succeeded = count fulfilled + failures = rejected → classify(error) → ClassifiedFailure + onSpawnBatch({ taskGroupId, batchIndex, batchSize: size, + attempted: size, succeeded, failed: failures.length }) + if failures non-empty: + if any failure.class in {task, admission}: # terminal wins + return { outcome: "halted", failures, attempted, succeeded } + else: # all backpressure + return { outcome: "throttled", failures, attempted, succeeded, + retryAfterMs: computeSlowStartBackoffMs(0, backoff) } + remaining -= batch + batchIndex += 1 + size = min(size * multiplier, maxBatchSize, remaining.length) +return { outcome: "completed", failures: [], attempted, succeeded } +``` + +Notes: +- The doubling stops short on the last partial batch (`min(..., remaining.length)`), + exactly like k8s — e.g. 15 items → batches of 1, 2, 4, 8. +- `attempted`/`succeeded` accumulate only over batches that actually ran; batches + after a halt/throttle never dispatch (acceptance #1). +- `retryAfterMs` uses `attempt = 0` here because the primitive is single-shot. + The requeuing controller owns the attempt counter and calls `computeSlowStartBackoffMs` + with the live attempt number on each requeue to get the escalating delay. +- `spawn` rejections are the only failure signal; the runner never throws for a + spawn failure (it classifies and returns). It *does* propagate a programming + error thrown synchronously by a hook, unchanged. + +## Failure-classification rationale + +Injectable classifier (not hard-coded `instanceof`) so: +- The primitive stays decoupled from concrete error types beyond the default. +- The future controller can supply a richer taxonomy (e.g. distinguishing + provider-pressure from local-capacity backpressure) without forking the runner. + +The default recognizes the two error types that already mean something in the +codebase or that this module introduces: +- `AdmissionRejectError` (existing, `src/core/admission/errors.ts`) → `admission` + (terminal; its `reason` is surfaced into `ClassifiedFailure.reason`). +- `RuntimeUnavailableError` (new, exported here) → `backpressure`. +- Everything else → `task` (halt; the conservative default — an unknown failure + is treated as the spawner's fault, not silently retried). + +## Acceptance mapping + +| Acceptance criterion | Covered by | +| --- | --- | +| Inject failure in first batch → subsequent batches don't fire | `task` failure in batch 1 → `halted`, only 1 item attempted | +| Metric `grove_spawn_batch_size` exposed per TaskGroup | `onSpawnBatch` emits `batchSize` + `taskGroupId` each batch | +| Configurable via spec | `BatchStrategy` type + `normalizeBatchStrategy` (canonical `spec.batchStrategy` home) | +| (comment) `RuntimeUnavailable` in batch-1 → no batch-2, recoverable throttled state, resumes on recovery | `backpressure` → `throttled` + `retryAfterMs`; batch 2 never dispatched; caller requeues | +| (comment) Admission reject (terminal) → halt + actionable reason | `admission` → `halted`; `ClassifiedFailure.reason` carries the rule reason | + +## Testing + +`src/core/slow-start-batch.test.ts`: + +- **Doubling sequence**: 15 items, fake `spawn` records the item set per + `batchIndex` → assert batch sizes `[1, 2, 4, 8]` and full coverage on success. +- **Halt on batch-1 task failure**: `spawn` throws plain error for one item in + batch 1 → `outcome: "halted"`, `attempted === 1`, no further `spawn` calls. +- **Backpressure throttle**: `spawn` throws `RuntimeUnavailableError` in batch 1 + → `outcome: "throttled"`, `retryAfterMs === baseMs`, batch 2 never dispatched. +- **Admission halt**: `spawn` throws `AdmissionRejectError` → `outcome: "halted"`, + failure `class === "admission"`, `reason` surfaced. +- **Terminal wins on mixed batch**: one `RuntimeUnavailableError` + one plain + error in the same batch → `outcome: "halted"` (not throttled). +- **Metric emission**: `onSpawnBatch` called once per batch with correct + `batchSize`/`succeeded`/`failed`; `taskGroupId` forwarded. +- **`normalizeBatchStrategy`**: defaults applied for empty input; `RangeError` + for `initialBatchSize < 1`, non-integer sizes, `multiplier < 1`, + `maxBatchSize < initialBatchSize`, negative backoff fields. +- **`maxBatchSize` cap**: growth clamped (e.g. cap 3 over 15 items → `[1,2,3,3,3,3]`). +- **`computeSlowStartBackoffMs`**: `attempt 0 === baseMs`; geometric growth; clamped to + `maxMs`; `attempt` guarded against negatives. +- **Empty input**: `items: []` → `outcome: "completed"`, no `spawn`/metric calls. + +## Risks / open questions + +- **`grove_*` naming**: no metrics registry exists yet, so the name lives only in + the `SpawnBatchMetric` doc contract until `#337` binds it to an exporter. The + callback shape is chosen so `#337` can map it directly. +- **Backoff statefulness**: bounded exponential backoff across requeues requires + an attempt counter the primitive cannot hold (it is single-shot). Resolved by + exposing `computeSlowStartBackoffMs(attempt, …)` for the controller to drive. +- **Backoff vs `src/core/backoff.ts`**: a `computeBackoffMs` already exists there, + but it is *full-jitter / randomized* with a hardcoded ×2 factor (claim-retry, + thundering-herd prevention). Slow-start deliberately needs a *deterministic*, + configurable-multiplier delay so requeues are predictable and unit-testable, so + ours is a separate `computeSlowStartBackoffMs`. If a future controller wants + jitter on the requeue, it can layer `backoff.ts` on top of this value. diff --git a/src/core/index.ts b/src/core/index.ts index 56585125..d40f9b24 100644 --- a/src/core/index.ts +++ b/src/core/index.ts @@ -366,6 +366,26 @@ export type { SessionConfig, } from "./session-orchestrator.js"; export { mergeRuntimeConfig, SessionOrchestrator } from "./session-orchestrator.js"; +export type { + BackoffStrategy, + BatchStrategy, + ClassifiedFailure, + FailureClass, + FailureClassifier, + NormalizedBatchStrategy, + SlowStartHooks, + SlowStartOutcome, + SlowStartResult, + SpawnBatchMetric, + SpawnBatchObserver, +} from "./slow-start-batch.js"; +export { + computeSlowStartBackoffMs, + defaultFailureClassifier, + normalizeBatchStrategy, + RuntimeUnavailableError, + slowStartBatch, +} from "./slow-start-batch.js"; export type { ActiveClaimFilter, AgentTaskQuery, diff --git a/src/core/slow-start-batch.test.ts b/src/core/slow-start-batch.test.ts new file mode 100644 index 00000000..e8bb2da7 --- /dev/null +++ b/src/core/slow-start-batch.test.ts @@ -0,0 +1,302 @@ +import { describe, expect, test } from "bun:test"; +import { AdmissionRejectError } from "./admission/errors.js"; +import { + computeSlowStartBackoffMs, + defaultFailureClassifier, + normalizeBatchStrategy, + RuntimeUnavailableError, + type SpawnBatchMetric, + slowStartBatch, +} from "./slow-start-batch.js"; + +const norm = normalizeBatchStrategy; + +describe("normalizeBatchStrategy", () => { + test("applies defaults for empty input", () => { + const s = normalizeBatchStrategy(); + expect(s.initialBatchSize).toBe(1); + expect(s.multiplier).toBe(2); + expect(s.maxBatchSize).toBe(Number.POSITIVE_INFINITY); + expect(s.backoff).toEqual({ baseMs: 1000, multiplier: 2, maxMs: 30_000 }); + }); + + test("passes through provided values", () => { + const s = normalizeBatchStrategy({ + initialBatchSize: 2, + multiplier: 3, + maxBatchSize: 16, + backoff: { baseMs: 500, multiplier: 4, maxMs: 5000 }, + }); + expect(s.initialBatchSize).toBe(2); + expect(s.multiplier).toBe(3); + expect(s.maxBatchSize).toBe(16); + expect(s.backoff).toEqual({ baseMs: 500, multiplier: 4, maxMs: 5000 }); + }); + + test("rejects invalid input with RangeError", () => { + expect(() => normalizeBatchStrategy({ initialBatchSize: 0 })).toThrow(RangeError); + expect(() => normalizeBatchStrategy({ initialBatchSize: 1.5 })).toThrow(RangeError); + expect(() => normalizeBatchStrategy({ multiplier: 0.5 })).toThrow(RangeError); + expect(() => normalizeBatchStrategy({ maxBatchSize: 4, initialBatchSize: 8 })).toThrow( + RangeError, + ); + expect(() => normalizeBatchStrategy({ backoff: { baseMs: 0 } })).toThrow(RangeError); + expect(() => normalizeBatchStrategy({ backoff: { maxMs: 0 } })).toThrow(RangeError); + expect(() => normalizeBatchStrategy({ backoff: { maxMs: 100, baseMs: 1000 } })).toThrow( + RangeError, + ); + }); + + test("accepts multiplier=1 (fixed-size batches / constant backoff are valid configs)", () => { + const s = normalizeBatchStrategy({ multiplier: 1, backoff: { multiplier: 1 } }); + expect(s.multiplier).toBe(1); + expect(s.backoff.multiplier).toBe(1); + }); +}); + +describe("computeSlowStartBackoffMs", () => { + const backoff = { baseMs: 1000, multiplier: 2, maxMs: 30_000 } as const; + + test("attempt 0 returns baseMs", () => { + expect(computeSlowStartBackoffMs(0, backoff)).toBe(1000); + }); + + test("grows geometrically", () => { + expect(computeSlowStartBackoffMs(1, backoff)).toBe(2000); + expect(computeSlowStartBackoffMs(2, backoff)).toBe(4000); + expect(computeSlowStartBackoffMs(3, backoff)).toBe(8000); + }); + + test("clamps to maxMs", () => { + expect(computeSlowStartBackoffMs(10, backoff)).toBe(30_000); + }); + + test("guards non-positive / non-finite attempts to baseMs", () => { + expect(computeSlowStartBackoffMs(-5, backoff)).toBe(1000); + expect(computeSlowStartBackoffMs(Number.NaN, backoff)).toBe(1000); + }); +}); + +describe("defaultFailureClassifier", () => { + test("classifies AdmissionRejectError as admission", () => { + const err = new AdmissionRejectError({ + ruleName: "max-fanout", + ruleType: "shell", + reason: "capacity exhausted", + }); + expect(defaultFailureClassifier(err)).toBe("admission"); + }); + + test("classifies RuntimeUnavailableError as backpressure", () => { + expect(defaultFailureClassifier(new RuntimeUnavailableError("runtime down"))).toBe( + "backpressure", + ); + }); + + test("classifies anything else as task", () => { + expect(defaultFailureClassifier(new Error("bad prompt"))).toBe("task"); + expect(defaultFailureClassifier("oops")).toBe("task"); + }); + + test("RuntimeUnavailableError carries a reason", () => { + const err = new RuntimeUnavailableError("runtime down", { reason: "provider-pressure" }); + expect(err.reason).toBe("provider-pressure"); + expect(err).toBeInstanceOf(Error); + }); +}); + +describe("slowStartBatch — success", () => { + test("doubling sequence 1,2,4,8 over 15 items", async () => { + const batches: number[][] = []; + let current: number[] = []; + const calls: number[] = []; + const items = Array.from({ length: 15 }, (_, i) => i); + const sizes: number[] = []; + const result = await slowStartBatch( + items, + async (item) => { + calls.push(item); + current.push(item); + }, + norm(), + { + onSpawnBatch: (m: SpawnBatchMetric) => { + sizes.push(m.batchSize); + batches.push(current); + current = []; + }, + }, + ); + expect(sizes).toEqual([1, 2, 4, 8]); + expect(result.outcome).toBe("completed"); + expect(result.succeeded).toBe(15); + expect(result.attempted).toBe(15); + expect(result.failures).toEqual([]); + expect(calls.sort((a, b) => a - b)).toEqual(items); + }); + + test("empty input completes with no spawn/metric calls", async () => { + let spawned = 0; + let metrics = 0; + const result = await slowStartBatch( + [], + async () => { + spawned += 1; + }, + norm(), + { + onSpawnBatch: () => { + metrics += 1; + }, + }, + ); + expect(result.outcome).toBe("completed"); + expect(result.attempted).toBe(0); + expect(spawned).toBe(0); + expect(metrics).toBe(0); + }); + + test("maxBatchSize caps growth: cap 3 over 15 items", async () => { + const sizes: number[] = []; + await slowStartBatch( + Array.from({ length: 15 }, (_, i) => i), + async () => { + /* no-op spawn */ + }, + norm({ maxBatchSize: 3 }), + { onSpawnBatch: (m) => sizes.push(m.batchSize) }, + ); + expect(sizes).toEqual([1, 2, 3, 3, 3, 3]); + }); + + test("multiplier=1 yields fixed-size batches and still covers all items", async () => { + const sizes: number[] = []; + const calls: number[] = []; + const items = Array.from({ length: 7 }, (_, i) => i); + const result = await slowStartBatch( + items, + async (item) => { + calls.push(item); + }, + norm({ initialBatchSize: 1, multiplier: 1 }), + { onSpawnBatch: (m) => sizes.push(m.batchSize) }, + ); + expect(sizes).toEqual([1, 1, 1, 1, 1, 1, 1]); + expect(result.outcome).toBe("completed"); + expect(calls.sort((a, b) => a - b)).toEqual(items); + }); + + test("non-power-of-two growth clamps the final partial batch", async () => { + const sizes: number[] = []; + await slowStartBatch( + Array.from({ length: 15 }, (_, i) => i), + async () => { + /* no-op spawn */ + }, + norm({ initialBatchSize: 2, multiplier: 3 }), + { onSpawnBatch: (m) => sizes.push(m.batchSize) }, + ); + expect(sizes).toEqual([2, 6, 7]); + }); + + test("metric reports succeeded/failed and forwards taskGroupId", async () => { + const seen: SpawnBatchMetric[] = []; + await slowStartBatch( + [1, 2], + async () => { + /* no-op spawn */ + }, + norm({ initialBatchSize: 2 }), + { + taskGroupId: "tg-1", + onSpawnBatch: (m) => seen.push(m), + }, + ); + expect(seen).toHaveLength(1); + expect(seen[0]).toMatchObject({ + taskGroupId: "tg-1", + batchIndex: 0, + batchSize: 2, + attempted: 2, + succeeded: 2, + failed: 0, + }); + }); +}); + +describe("slowStartBatch — failure handling", () => { + test("task failure in batch 1 halts; later batches never fire", async () => { + const spawned: number[] = []; + const result = await slowStartBatch( + Array.from({ length: 15 }, (_, i) => i), + async (item) => { + spawned.push(item); + if (item === 0) throw new Error("bad prompt"); + }, + norm(), + ); + expect(result.outcome).toBe("halted"); + expect(result.attempted).toBe(1); + expect(result.succeeded).toBe(0); + expect(spawned).toEqual([0]); // only batch 1 (size 1) ran + expect(result.failures).toHaveLength(1); + expect(result.failures[0]).toMatchObject({ index: 0, batchIndex: 0, class: "task" }); + }); + + test("RuntimeUnavailable in batch 1 throttles with backoff; no batch 2", async () => { + const spawned: number[] = []; + const result = await slowStartBatch( + Array.from({ length: 15 }, (_, i) => i), + async (item) => { + spawned.push(item); + if (item === 0) throw new RuntimeUnavailableError("runtime down"); + }, + norm(), + ); + expect(result.outcome).toBe("throttled"); + expect(result.retryAfterMs).toBe(1000); // computeSlowStartBackoffMs(0) === baseMs + expect(spawned).toEqual([0]); + expect(result.failures[0]).toMatchObject({ class: "backpressure" }); + }); + + test("AdmissionReject halts and surfaces the actionable reason", async () => { + const result = await slowStartBatch( + [0], + async () => { + throw new AdmissionRejectError({ + ruleName: "max-fanout", + ruleType: "shell", + reason: "capacity exhausted", + }); + }, + norm(), + ); + expect(result.outcome).toBe("halted"); + expect(result.failures[0]).toMatchObject({ class: "admission", reason: "capacity exhausted" }); + }); + + test("terminal wins on a mixed batch (backpressure + task)", async () => { + const result = await slowStartBatch( + [0, 1], + async (item) => { + if (item === 0) throw new RuntimeUnavailableError("runtime down"); + if (item === 1) throw new Error("bad prompt"); + }, + norm({ initialBatchSize: 2 }), + ); + expect(result.outcome).toBe("halted"); // not throttled + expect(result.failures).toHaveLength(2); + }); + + test("custom classifier overrides the default", async () => { + const result = await slowStartBatch( + [0], + async () => { + throw new Error("treat-me-as-backpressure"); + }, + norm(), + { classify: () => "backpressure" }, + ); + expect(result.outcome).toBe("throttled"); + }); +}); diff --git a/src/core/slow-start-batch.ts b/src/core/slow-start-batch.ts new file mode 100644 index 00000000..cedc852f --- /dev/null +++ b/src/core/slow-start-batch.ts @@ -0,0 +1,226 @@ +import { AdmissionRejectError } from "./admission/errors.js"; +import { GroveError } from "./errors.js"; + +export interface BackoffStrategy { + readonly baseMs?: number; + /** + * Exponential growth factor applied to the delay after each requeue attempt. + * `1` is permitted and yields a constant (flat) retry interval. + */ + readonly multiplier?: number; + readonly maxMs?: number; +} + +export interface BatchStrategy { + readonly initialBatchSize?: number; + /** + * Geometric growth factor applied to the batch size after each clean batch. + * `1` is permitted and yields fixed-size batches (a constant concurrency cap). + */ + readonly multiplier?: number; + readonly maxBatchSize?: number; + readonly backoff?: BackoffStrategy; +} + +export interface NormalizedBatchStrategy { + readonly initialBatchSize: number; + readonly multiplier: number; + readonly maxBatchSize: number; // Number.POSITIVE_INFINITY when unbounded + readonly backoff: Required; +} + +const DEFAULT_INITIAL_BATCH_SIZE = 1; +const DEFAULT_MULTIPLIER = 2; +const DEFAULT_MAX_BATCH_SIZE = Number.POSITIVE_INFINITY; +const DEFAULT_BACKOFF_BASE_MS = 1000; +const DEFAULT_BACKOFF_MULTIPLIER = 2; +const DEFAULT_BACKOFF_MAX_MS = 30_000; + +export function normalizeBatchStrategy(input?: BatchStrategy): NormalizedBatchStrategy { + const initialBatchSize = input?.initialBatchSize ?? DEFAULT_INITIAL_BATCH_SIZE; + const multiplier = input?.multiplier ?? DEFAULT_MULTIPLIER; + const maxBatchSize = input?.maxBatchSize ?? DEFAULT_MAX_BATCH_SIZE; + const baseMs = input?.backoff?.baseMs ?? DEFAULT_BACKOFF_BASE_MS; + const backoffMultiplier = input?.backoff?.multiplier ?? DEFAULT_BACKOFF_MULTIPLIER; + const maxMs = input?.backoff?.maxMs ?? DEFAULT_BACKOFF_MAX_MS; + + requirePositiveInt(initialBatchSize, "initialBatchSize"); + requireFiniteMin(multiplier, 1, "multiplier"); + if (maxBatchSize !== DEFAULT_MAX_BATCH_SIZE) { + requirePositiveInt(maxBatchSize, "maxBatchSize"); + if (maxBatchSize < initialBatchSize) { + throw new RangeError( + `maxBatchSize (${maxBatchSize}) must be >= initialBatchSize (${initialBatchSize})`, + ); + } + } + requirePositiveInt(baseMs, "backoff.baseMs"); + requireFiniteMin(backoffMultiplier, 1, "backoff.multiplier"); + requirePositiveInt(maxMs, "backoff.maxMs"); + if (maxMs < baseMs) { + throw new RangeError(`backoff.maxMs (${maxMs}) must be >= backoff.baseMs (${baseMs})`); + } + + return { + initialBatchSize, + multiplier, + maxBatchSize, + backoff: { baseMs, multiplier: backoffMultiplier, maxMs }, + }; +} + +export function computeSlowStartBackoffMs( + attempt: number, + backoff: Required, +): number { + const n = Number.isFinite(attempt) && attempt > 0 ? Math.floor(attempt) : 0; + const raw = backoff.baseMs * backoff.multiplier ** n; + return Math.min(raw, backoff.maxMs); +} + +function requirePositiveInt(value: number, name: string): void { + if (!Number.isInteger(value) || value < 1) { + throw new RangeError(`${name} must be an integer >= 1 (got ${value})`); + } +} + +function requireFiniteMin(value: number, min: number, name: string): void { + if (!Number.isFinite(value) || value < min) { + throw new RangeError(`${name} must be a finite number >= ${min} (got ${value})`); + } +} + +export type FailureClass = "task" | "backpressure" | "admission"; + +export class RuntimeUnavailableError extends GroveError { + readonly reason: string; + + constructor(message: string, options?: { reason?: string }) { + super(message); + this.name = "RuntimeUnavailableError"; + this.reason = options?.reason ?? message; + } +} + +export interface ClassifiedFailure { + readonly index: number; // index into the input items array + readonly batchIndex: number; // 0-based batch the item was in + readonly class: FailureClass; + readonly reason: string; + readonly error: unknown; +} + +export type FailureClassifier = (error: unknown) => FailureClass; + +export const defaultFailureClassifier: FailureClassifier = (error) => { + if (error instanceof AdmissionRejectError) return "admission"; + if (error instanceof RuntimeUnavailableError) return "backpressure"; + return "task"; +}; + +export interface SpawnBatchMetric { + readonly taskGroupId?: string | undefined; + readonly batchIndex: number; + readonly batchSize: number; // == grove_spawn_batch_size + readonly attempted: number; // items dispatched this batch (== batchSize) + readonly succeeded: number; + readonly failed: number; +} + +export type SpawnBatchObserver = (metric: SpawnBatchMetric) => void; + +export interface SlowStartHooks { + readonly onSpawnBatch?: SpawnBatchObserver | undefined; + readonly classify?: FailureClassifier | undefined; + readonly taskGroupId?: string | undefined; +} + +export type SlowStartOutcome = "completed" | "halted" | "throttled"; + +export interface SlowStartResult { + readonly outcome: SlowStartOutcome; + readonly attempted: number; + readonly succeeded: number; + readonly failures: readonly ClassifiedFailure[]; + readonly retryAfterMs?: number | undefined; +} + +export async function slowStartBatch( + items: readonly T[], + spawn: (item: T, batchIndex: number) => Promise, + strategy: NormalizedBatchStrategy, + hooks?: SlowStartHooks, +): Promise { + const classify = hooks?.classify ?? defaultFailureClassifier; + let attempted = 0; + let succeeded = 0; + let offset = 0; + let batchIndex = 0; + let size = Math.min(strategy.initialBatchSize, items.length); + + while (size > 0) { + const batch = items.slice(offset, offset + size); + const settled = await Promise.allSettled( + // async wrapper converts a synchronous throw into a rejection + batch.map(async (item) => spawn(item, batchIndex)), + ); + attempted += size; + + const failures: ClassifiedFailure[] = []; + let batchSucceeded = 0; + settled.forEach((res, i) => { + if (res.status === "fulfilled") { + batchSucceeded += 1; + } else { + failures.push({ + index: offset + i, + batchIndex, + class: classify(res.reason), + reason: failureReason(res.reason), + error: res.reason, + }); + } + }); + succeeded += batchSucceeded; + + hooks?.onSpawnBatch?.({ + taskGroupId: hooks.taskGroupId, + batchIndex, + batchSize: size, + attempted: size, + succeeded: batchSucceeded, + failed: failures.length, + }); + + if (failures.length > 0) { + const anyTerminal = failures.some((f) => f.class === "task" || f.class === "admission"); + if (anyTerminal) { + return { outcome: "halted", attempted, succeeded, failures }; + } + return { + outcome: "throttled", + attempted, + succeeded, + failures, + // attempt 0 here: this runner is single-shot. The requeuing controller + // owns the live attempt counter and re-derives an escalating delay via + // computeSlowStartBackoffMs(liveAttempt, strategy.backoff). + retryAfterMs: computeSlowStartBackoffMs(0, strategy.backoff), + }; + } + + offset += size; + batchIndex += 1; + const remaining = items.length - offset; + size = Math.min(Math.floor(size * strategy.multiplier), strategy.maxBatchSize, remaining); + } + + return { outcome: "completed", attempted, succeeded, failures: [] }; +} + +function failureReason(error: unknown): string { + if (error instanceof AdmissionRejectError) return error.reason; + if (error instanceof RuntimeUnavailableError) return error.reason; + if (error instanceof Error) return error.message; + return String(error); +}