From 84dbb1c60af7a2a07b96029623fcd8494585bf0b Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Sat, 27 Jun 2026 00:17:32 +0530 Subject: [PATCH 1/5] feat(semantic-search): use Cloudflare Qwen embeddings --- apps/host-cloudflare/src/config.ts | 2 +- apps/host-cloudflare/src/plugins.ts | 6 +- apps/host-cloudflare/wrangler.jsonc | 2 + .../semantic-search/src/sdk/ai-search.test.ts | 77 ++++++++-- .../semantic-search/src/sdk/ai-search.ts | 140 ++++++++---------- .../src/sdk/embedder-cloudflare.test.ts | 71 +++++++++ .../src/sdk/embedder-cloudflare.ts | 96 ++++++++++++ .../src/sdk/embedding-service.ts | 9 ++ .../plugins/semantic-search/src/sdk/index.ts | 11 +- .../src/sdk/tool-search-backend.ts | 12 ++ 10 files changed, 327 insertions(+), 99 deletions(-) create mode 100644 packages/plugins/semantic-search/src/sdk/embedder-cloudflare.test.ts create mode 100644 packages/plugins/semantic-search/src/sdk/embedder-cloudflare.ts diff --git a/apps/host-cloudflare/src/config.ts b/apps/host-cloudflare/src/config.ts index 1976e3cc6..0a955bf01 100644 --- a/apps/host-cloudflare/src/config.ts +++ b/apps/host-cloudflare/src/config.ts @@ -1,11 +1,11 @@ import type { D1Database, DurableObjectNamespace, + AiSearchInstance, KVNamespace, R2Bucket, } from "@cloudflare/workers-types"; import type { AnalyticsEngineDataset } from "@executor-js/plugin-execution-metrics/cloudflare"; -import type { AiSearchInstance } from "@executor-js/plugin-semantic-search"; import { isValidOrgSlug } from "@executor-js/api"; import { missingPublicOriginWarning, resolvePublicOrigin } from "@executor-js/sdk/public-origin"; diff --git a/apps/host-cloudflare/src/plugins.ts b/apps/host-cloudflare/src/plugins.ts index 8037491a9..9cda735c4 100644 --- a/apps/host-cloudflare/src/plugins.ts +++ b/apps/host-cloudflare/src/plugins.ts @@ -1,3 +1,4 @@ +import type { AiSearchInstance } from "@cloudflare/workers-types"; import { openApiHttpPlugin } from "@executor-js/plugin-openapi/api"; import { googleHttpPlugin } from "@executor-js/plugin-google/api"; import { microsoftHttpPlugin } from "@executor-js/plugin-microsoft/api"; @@ -12,10 +13,7 @@ import { import { noopExecutionObserver } from "@executor-js/sdk"; import { serviceTokensPlugin } from "@executor-js/plugin-service-tokens/server"; import { semanticSearchHttpPlugin } from "@executor-js/plugin-semantic-search/api"; -import { - makeAiSearchToolSearchBackend, - type AiSearchInstance, -} from "@executor-js/plugin-semantic-search"; +import { makeAiSearchToolSearchBackend } from "@executor-js/plugin-semantic-search"; // --------------------------------------------------------------------------- // The Cloudflare host's plugin list — the same protocol/provider plugins as diff --git a/apps/host-cloudflare/wrangler.jsonc b/apps/host-cloudflare/wrangler.jsonc index 03bdc71bd..a7b8a0a4f 100644 --- a/apps/host-cloudflare/wrangler.jsonc +++ b/apps/host-cloudflare/wrangler.jsonc @@ -49,6 +49,8 @@ // Cloudflare AI Search is the preferred backend for semantic `tools.search`. // Uncomment after creating the instance, otherwise deploy will fail because // the binding target is absent. + // Create with: + // `wrangler ai-search create executor-tool-search --embedding-model @cf/qwen/qwen3-embedding-0.6b` // "ai_search": [{ "binding": "AI_SEARCH", "instance_name": "executor-tool-search" }], // The MCP session Durable Object: one addressable isolate per MCP session (the // DO id IS the session id) so a session survives across the Worker's stateless diff --git a/packages/plugins/semantic-search/src/sdk/ai-search.test.ts b/packages/plugins/semantic-search/src/sdk/ai-search.test.ts index 23f6f82e9..2c26eb342 100644 --- a/packages/plugins/semantic-search/src/sdk/ai-search.test.ts +++ b/packages/plugins/semantic-search/src/sdk/ai-search.test.ts @@ -1,11 +1,12 @@ import { describe, expect, it } from "@effect/vitest"; +import type { AiSearchInstance } from "@cloudflare/workers-types"; import { type PluginStorageCollectionFacade, type PluginStorageEntry } from "@executor-js/sdk/core"; import { Effect } from "effect"; import { + DEFAULT_AI_SEARCH_EMBEDDING_MODEL, makeAiSearchToolDiscoveryProvider, reindexAiSearch, - type AiSearchInstance, } from "./ai-search"; import { type aiSearchItems, type AiSearchItemRow } from "./collections"; @@ -59,16 +60,30 @@ const makeItemsCollection = (overrides: Partial): ItemsCollecti ...overrides, }); -const makeAiSearch = (): AiSearchInstance => ({ - items: { - upload: async (name) => ({ id: `item:${name}`, key: name }), - list: async () => ({ result: [], result_info: { total_count: 0, page: 1, per_page: 50 } }), +const makeAiSearchItems = () => + ({ + upload: async (name) => ({ id: `item:${name}`, key: name, status: "queued" }), + list: async () => ({ + result: [], + result_info: { count: 0, total_count: 0, page: 1, per_page: 50 }, + }), delete: async () => {}, - }, + uploadAndPoll: async (name) => ({ id: `item:${name}`, key: name, status: "queued" }), + get: () => expect.unreachable("Unexpected AI Search item lookup"), + }) satisfies Pick["items"]; + +const makeAiSearch = (): Pick => ({ + info: async () => ({ + id: "executor-tool-search", + embedding_model: DEFAULT_AI_SEARCH_EMBEDDING_MODEL, + }), + items: makeAiSearchItems(), search: async () => ({ + search_query: "create repo", chunks: [ { id: "chunk-1", + type: "text", score: 0.7, text: "create a repository", item: { @@ -83,6 +98,7 @@ const makeAiSearch = (): AiSearchInstance => ({ }, { id: "chunk-2", + type: "text", score: 0.9, text: "github repository creation", item: { @@ -97,6 +113,7 @@ const makeAiSearch = (): AiSearchInstance => ({ }, { id: "chunk-3", + type: "text", score: 0.8, text: "send a message", item: { @@ -210,10 +227,10 @@ describe("reindexAiSearch", () => { aiSearch: { ...makeAiSearch(), items: { - ...makeAiSearch().items, + ...makeAiSearchItems(), upload: async (name, content) => { uploadedContent = String(content); - return { id: `item:${name}`, key: name }; + return { id: `item:${name}`, key: name, status: "queued" }; }, }, }, @@ -237,6 +254,48 @@ describe("reindexAiSearch", () => { }), ); + it.effect( + "fails before indexing when the AI Search instance uses a different embedding model", + () => + Effect.gen(function* () { + const error = yield* Effect.flip( + reindexAiSearch({ + executor: { + tools: { + manifest: () => + Effect.succeed([ + { + path: "github.default.main.repos.create", + name: "repos.create", + description: "Create a repository", + integration: "github", + fingerprintVersion: "v1", + indexFingerprint: "fingerprint", + }, + ]), + }, + } as never, + aiSearch: { + ...makeAiSearch(), + info: async () => ({ + id: "executor-tool-search", + embedding_model: "@cf/baai/bge-base-en-v1.5", + }), + }, + items: makeItemsCollection({ + list: () => Effect.sync(() => expect.unreachable("list should not run")), + }), + owner: "org", + namespace: "org", + }), + ); + + expect(error).toMatchObject({ + message: expect.stringContaining(DEFAULT_AI_SEARCH_EMBEDDING_MODEL), + }); + }), + ); + it.effect("removes stale rows even when deleting the remote AI Search item fails", () => Effect.gen(function* () { const removed: string[] = []; @@ -249,7 +308,7 @@ describe("reindexAiSearch", () => { aiSearch: { ...makeAiSearch(), items: { - ...makeAiSearch().items, + ...makeAiSearchItems(), delete: async () => { // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: test double for rejected AI Search delete promise throw new Error("delete failed"); diff --git a/packages/plugins/semantic-search/src/sdk/ai-search.ts b/packages/plugins/semantic-search/src/sdk/ai-search.ts index 9ac29a9f6..ca93720aa 100644 --- a/packages/plugins/semantic-search/src/sdk/ai-search.ts +++ b/packages/plugins/semantic-search/src/sdk/ai-search.ts @@ -1,3 +1,8 @@ +import type { + AiSearchInstance, + AiSearchItemInfo, + AiSearchSearchResponse, +} from "@cloudflare/workers-types"; import { ExecutionToolError, type Executor, @@ -24,75 +29,12 @@ import type { } from "./tool-search-backend"; import type { ToolSearchIndex } from "./tool-search-index"; -export interface AiSearchUploadedItem { - readonly id: string; - readonly key: string; -} - -export interface AiSearchListedItem { - readonly id: string; - readonly key: string; - readonly status: string; - readonly metadata?: Readonly>; -} - -export interface AiSearchInstance { - readonly items: { - readonly upload: ( - name: string, - content: string | ArrayBuffer | ReadableStream, - options?: { readonly metadata?: Readonly> }, - ) => Promise; - readonly list: (input?: { - readonly page?: number; - readonly per_page?: number; - readonly status?: string; - readonly sort_by?: string; - readonly search?: string; - readonly source?: string; - }) => Promise<{ - readonly result: readonly AiSearchListedItem[]; - readonly result_info?: { - readonly count?: number; - readonly total_count?: number; - readonly page?: number; - readonly per_page?: number; - }; - }>; - readonly delete: (itemId: string) => Promise; - }; - readonly search: (input: { - readonly messages: readonly [{ readonly role: "user"; readonly content: string }]; - readonly ai_search_options?: { - readonly retrieval?: { - readonly retrieval_type?: "vector" | "keyword" | "hybrid"; - readonly max_num_results?: number; - readonly metadata_only?: boolean; - readonly return_on_failure?: boolean; - }; - readonly reranking?: { readonly enabled?: boolean }; - }; - }) => Promise; -} - -export interface AiSearchChunk { - readonly id: string; - readonly score: number; - readonly text?: string; - readonly item?: { - readonly key?: string; - readonly metadata?: Readonly>; - }; -} - -export interface AiSearchSearchResponse { - readonly search_query?: string; - readonly chunks?: readonly AiSearchChunk[]; -} +export const DEFAULT_AI_SEARCH_EMBEDDING_MODEL = "@cf/qwen/qwen3-embedding-0.6b"; export interface AiSearchToolSearchBackendOptions { - readonly aiSearch: AiSearchInstance | undefined; + readonly aiSearch: Pick | undefined; readonly namespace?: string; + readonly embeddingModel?: string; } type ItemsCollection = PluginStorageCollectionFacade; @@ -133,6 +75,27 @@ const notConfigured = (): Effect.Effect => }), ); +const requireEmbeddingModel = (input: { + readonly aiSearch: Pick; + readonly expectedModel: string; +}): Effect.Effect => + Effect.tryPromise({ + try: () => input.aiSearch.info(), + catch: (cause) => + new SemanticSearchError({ message: "Failed to read AI Search instance config.", cause }), + }).pipe( + Effect.flatMap((info) => { + const actualModel = + typeof info.embedding_model === "string" ? info.embedding_model : undefined; + if (!actualModel || actualModel === input.expectedModel) return Effect.void; + return Effect.fail( + new SemanticSearchError({ + message: `AI Search instance uses embedding model "${actualModel}", expected "${input.expectedModel}". Recreate or update the instance before indexing.`, + }), + ); + }), + ); + const unavailableIndex: ToolSearchIndex.Service = { create: () => notConfigured(), scan: () => notConfigured(), @@ -146,7 +109,7 @@ const unavailableIndex: ToolSearchIndex.Service = { }; const deleteItem = ( - aiSearch: AiSearchInstance, + aiSearch: Pick, itemId: string, ): Effect.Effect => Effect.tryPromise({ @@ -156,7 +119,7 @@ const deleteItem = ( }).pipe(Effect.asVoid); const deleteItemBestEffort = ( - aiSearch: AiSearchInstance, + aiSearch: Pick, itemId: string, ): Effect.Effect => deleteItem(aiSearch, itemId).pipe(Effect.catch(() => Effect.void)); @@ -164,7 +127,7 @@ const putIndexedItem = ( items: ItemsCollection, owner: "user" | "org", document: ToolSearchDocument, - uploaded: AiSearchUploadedItem, + uploaded: AiSearchItemInfo, ): Effect.Effect => items .put({ @@ -188,15 +151,20 @@ const putIndexedItem = ( export const reindexAiSearch = (input: { readonly executor: Executor; - readonly aiSearch: AiSearchInstance | undefined; + readonly aiSearch: Pick | undefined; readonly items: ItemsCollection; readonly owner: "user" | "org"; readonly namespace: string; + readonly embeddingModel?: string; readonly maxTools?: number; }): Effect.Effect => { if (!input.aiSearch) return notConfigured(); const aiSearch = input.aiSearch; return Effect.gen(function* () { + yield* requireEmbeddingModel({ + aiSearch, + expectedModel: input.embeddingModel ?? DEFAULT_AI_SEARCH_EMBEDDING_MODEL, + }); const manifests = yield* listToolManifests(input.executor, { maxTools: input.maxTools }); const livePaths = new Set(manifests.map((manifest) => manifest.path)); const existingEntries = yield* input.items @@ -245,10 +213,10 @@ export const reindexAiSearch = (input: { }; const listAiSearchItems = ( - aiSearch: AiSearchInstance, -): Effect.Effect => + aiSearch: Pick, +): Effect.Effect => Effect.gen(function* () { - const all: AiSearchListedItem[] = []; + const all: AiSearchItemInfo[] = []; let page = 1; while (true) { const result = yield* Effect.tryPromise({ @@ -267,7 +235,7 @@ const listAiSearchItems = ( }); export const statusAiSearch = (input: { - readonly aiSearch: AiSearchInstance; + readonly aiSearch: Pick; readonly items: ItemsCollection; readonly namespace: string; }): Effect.Effect => @@ -321,23 +289,33 @@ const rowToResult = (row: AiSearchItemRow, score: number): ToolDiscoveryResult = score, }); -const chunkToResult = (chunk: AiSearchChunk): ToolDiscoveryResult | null => { +const getStringMetadata = ( + metadata: Readonly> | undefined, + key: string, +) => { + const value = metadata?.[key]; + return typeof value === "string" ? value : undefined; +}; + +const chunkToResult = ( + chunk: AiSearchSearchResponse["chunks"][number], +): ToolDiscoveryResult | null => { const metadata = chunk.item?.metadata; - const path = metadata?.path; - const name = metadata?.name; - const integration = metadata?.integration; + const path = getStringMetadata(metadata, "path"); + const name = getStringMetadata(metadata, "name"); + const integration = getStringMetadata(metadata, "integration"); if (!path || !name || !integration) return null; return { path, name, - description: metadata.description, + description: getStringMetadata(metadata, "description"), integration, score: chunk.score, }; }; export const makeAiSearchToolDiscoveryProvider = (deps: { - readonly aiSearch: AiSearchInstance | undefined; + readonly aiSearch: Pick | undefined; readonly items: ItemsCollection | undefined; }): ToolDiscoveryProvider | undefined => { if (!deps.aiSearch) return undefined; @@ -414,6 +392,7 @@ export const makeAiSearchToolSearchBackend = ( options: AiSearchToolSearchBackendOptions, ): ToolSearchBackendFactory => { const namespace = options.namespace ?? "default"; + const embeddingModel = options.embeddingModel ?? DEFAULT_AI_SEARCH_EMBEDDING_MODEL; return { namespace, pluginStorage: { aiSearchItems }, @@ -437,6 +416,7 @@ export const makeAiSearchToolSearchBackend = ( items: storage.aiSearchItems, owner: storage.owner, namespace, + embeddingModel, }), sweep: () => Effect.succeed({ diff --git a/packages/plugins/semantic-search/src/sdk/embedder-cloudflare.test.ts b/packages/plugins/semantic-search/src/sdk/embedder-cloudflare.test.ts new file mode 100644 index 000000000..4026f3da1 --- /dev/null +++ b/packages/plugins/semantic-search/src/sdk/embedder-cloudflare.test.ts @@ -0,0 +1,71 @@ +import { describe, expect, it } from "@effect/vitest"; +import type { + Ai, + Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Input, + Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Output, +} from "@cloudflare/workers-types"; +import { Effect } from "effect"; + +import { + DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_DIMENSIONS, + DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_MODEL, + makeCloudflareWorkersAiEmbedder, +} from "./embedder-cloudflare"; + +describe("makeCloudflareWorkersAiEmbedder", () => { + it.effect("embeds documents and queries with the qwen model by default", () => + Effect.gen(function* () { + const mutableCalls: unknown[] = []; + const ai = { + run: (async ( + model: typeof DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_MODEL, + input: Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Input, + ): Promise => { + mutableCalls.push({ model, input }); + const values = input.documents ?? input.queries ?? input.text ?? []; + const count = Array.isArray(values) ? values.length : 1; + return { + data: Array.from({ length: count }, (_, index) => [index + 1, index + 2]), + shape: [count, 2], + }; + }) as Ai<{ + readonly "@cf/qwen/qwen3-embedding-0.6b": { + readonly inputs: Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Input; + readonly postProcessedOutputs: Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Output; + }; + }>["run"], + } satisfies Pick< + Ai<{ + readonly "@cf/qwen/qwen3-embedding-0.6b": { + readonly inputs: Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Input; + readonly postProcessedOutputs: Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Output; + }; + }>, + "run" + >; + const embedder = makeCloudflareWorkersAiEmbedder({ ai, dimensions: 2 }); + + const docs = yield* embedder.embedDocuments(["alpha", "beta"]); + const query = yield* embedder.embedQuery("alpha"); + + expect(embedder.model).toBe(DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_MODEL); + expect(embedder.dimensions).toBe(2); + expect(docs).toEqual([ + [1, 2], + [2, 3], + ]); + expect(query).toEqual([1, 2]); + expect(mutableCalls).toEqual([ + { + model: DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_MODEL, + input: { documents: ["alpha", "beta"] }, + }, + { + model: DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_MODEL, + input: { queries: ["alpha"] }, + }, + ]); + expect(DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_DIMENSIONS).toBe(1024); + }), + ); +}); diff --git a/packages/plugins/semantic-search/src/sdk/embedder-cloudflare.ts b/packages/plugins/semantic-search/src/sdk/embedder-cloudflare.ts new file mode 100644 index 000000000..c9ca7eb74 --- /dev/null +++ b/packages/plugins/semantic-search/src/sdk/embedder-cloudflare.ts @@ -0,0 +1,96 @@ +import type { + Ai, + Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Input, + Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Output, +} from "@cloudflare/workers-types"; +import { Effect } from "effect"; + +import type { ToolEmbedder } from "./embedder"; +import { SemanticSearchError } from "./errors"; + +export const DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_MODEL = "@cf/qwen/qwen3-embedding-0.6b"; +export const DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_DIMENSIONS = 1024; + +export interface CloudflareWorkersAiEmbedderOptions { + readonly ai: Pick< + Ai<{ + readonly "@cf/qwen/qwen3-embedding-0.6b": { + readonly inputs: Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Input; + readonly postProcessedOutputs: Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Output; + }; + }>, + "run" + >; + readonly dimensions?: number; + readonly batchSize?: number; + readonly documentInstruction?: string; + readonly queryInstruction?: string; +} + +const DEFAULT_BATCH_SIZE = 32; + +const chunk = (items: readonly A[], size: number): A[][] => { + const safe = Math.max(1, Math.floor(size)); + const out: A[][] = []; + for (let i = 0; i < items.length; i += safe) out.push(items.slice(i, i + safe)); + return out; +}; + +const normalizeVectors = ( + response: Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Output, + count: number, +): readonly (readonly number[])[] | undefined => { + if (response.data?.length === count) return response.data; + return undefined; +}; + +export const makeCloudflareWorkersAiEmbedder = ( + options: CloudflareWorkersAiEmbedderOptions, +): ToolEmbedder => { + const model = DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_MODEL; + const batchSize = options.batchSize ?? DEFAULT_BATCH_SIZE; + + const embedBatch = ( + kind: "documents" | "queries", + texts: readonly string[], + instruction: string | undefined, + ): Effect.Effect => { + if (texts.length === 0) return Effect.succeed([]); + return Effect.gen(function* () { + const response = yield* Effect.tryPromise({ + try: () => + options.ai.run(model, { + [kind]: [...texts], + ...(instruction ? { instruction } : {}), + } satisfies Ai_Cf_Qwen_Qwen3_Embedding_0_6B_Input), + catch: (cause) => + new SemanticSearchError({ + message: `Cloudflare Workers AI embedding failed for ${model}.`, + cause, + }), + }); + const embeddings = normalizeVectors(response, texts.length); + if (!embeddings) { + return yield* new SemanticSearchError({ + message: `Cloudflare Workers AI returned ${response.data?.length ?? 0} vectors for ${texts.length} inputs.`, + }); + } + return embeddings; + }); + }; + + return { + model, + dimensions: options.dimensions ?? DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_DIMENSIONS, + embedDocuments: (texts) => + Effect.forEach( + chunk(texts, batchSize), + (group) => embedBatch("documents", group, options.documentInstruction), + { concurrency: 1 }, + ).pipe(Effect.map((groups) => groups.flat())), + embedQuery: (text) => + embedBatch("queries", [text], options.queryInstruction).pipe( + Effect.map((vectors) => vectors[0] ?? []), + ), + }; +}; diff --git a/packages/plugins/semantic-search/src/sdk/embedding-service.ts b/packages/plugins/semantic-search/src/sdk/embedding-service.ts index 7daad9fd4..05eaea87b 100644 --- a/packages/plugins/semantic-search/src/sdk/embedding-service.ts +++ b/packages/plugins/semantic-search/src/sdk/embedding-service.ts @@ -1,6 +1,10 @@ import { Context, Layer } from "effect"; import { makeHashEmbedder } from "./embedder-hash"; +import { + makeCloudflareWorkersAiEmbedder, + type CloudflareWorkersAiEmbedderOptions, +} from "./embedder-cloudflare"; import { makeOpenAiCompatibleEmbedder, type OpenAiCompatibleEmbedderOptions, @@ -26,5 +30,10 @@ export const openAiCompatibleEmbedderLayer = ( ): Layer.Layer => Layer.succeed(EmbedderService)(makeOpenAiCompatibleEmbedder(options)); +export const cloudflareWorkersAiEmbedderLayer = ( + options: CloudflareWorkersAiEmbedderOptions, +): Layer.Layer => + Layer.succeed(EmbedderService)(makeCloudflareWorkersAiEmbedder(options)); + export const hashEmbedderLayer = (dimensions?: number): Layer.Layer => Layer.succeed(EmbedderService)(makeHashEmbedder(dimensions)); diff --git a/packages/plugins/semantic-search/src/sdk/index.ts b/packages/plugins/semantic-search/src/sdk/index.ts index 9b0c50a7f..76c7baa7f 100644 --- a/packages/plugins/semantic-search/src/sdk/index.ts +++ b/packages/plugins/semantic-search/src/sdk/index.ts @@ -17,15 +17,11 @@ export { export { makeAiSearchToolDiscoveryProvider, makeAiSearchToolSearchBackend, + DEFAULT_AI_SEARCH_EMBEDDING_MODEL, reindexAiSearch, statusAiSearch, - type AiSearchChunk, - type AiSearchInstance, - type AiSearchListedItem, - type AiSearchSearchResponse, type AiSearchToolSearchBackendOptions, type AiSearchToolSearchBackendStorage, - type AiSearchUploadedItem, } from "./ai-search"; export { makeVectorToolDiscoveryProvider } from "./provider"; export { @@ -35,6 +31,11 @@ export { type ToolEmbedder, type GeminiEmbedderOptions, } from "./embedder"; +export { + makeCloudflareWorkersAiEmbedder, + DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_DIMENSIONS, + DEFAULT_CLOUDFLARE_WORKERS_AI_EMBEDDING_MODEL, +} from "./embedder-cloudflare"; export { type VectorStore, type VectorMatch, type VectorMatches, type VectorInput } from "./store"; export { makeVectorizeStore, type VectorizeIndex, MAX_TOP_K } from "./store-cloudflare"; export { diff --git a/packages/plugins/semantic-search/src/sdk/tool-search-backend.ts b/packages/plugins/semantic-search/src/sdk/tool-search-backend.ts index 00fc6be78..12d43930b 100644 --- a/packages/plugins/semantic-search/src/sdk/tool-search-backend.ts +++ b/packages/plugins/semantic-search/src/sdk/tool-search-backend.ts @@ -11,6 +11,10 @@ import { Effect } from "effect"; import { type Chunker, makeFacetChunker } from "./chunker"; import { indexChunks, indexJobs, indexRuns, toolFingerprints } from "./collections"; +import { + makeCloudflareWorkersAiEmbedder, + type CloudflareWorkersAiEmbedderOptions, +} from "./embedder-cloudflare"; import { makeGeminiEmbedder, type GeminiEmbedderOptions, type ToolEmbedder } from "./embedder"; import { SemanticSearchError } from "./errors"; import { makeHybridToolDiscoveryProvider } from "./hybrid"; @@ -94,6 +98,7 @@ export interface ToolSearchBackendFactory { export interface VectorToolSearchBackendOptions { readonly namespace?: string; readonly store: VectorStore; + readonly workersAi?: CloudflareWorkersAiEmbedderOptions["ai"]; readonly geminiApiKey?: string; readonly model?: string; readonly dimensions?: number; @@ -125,6 +130,13 @@ export const unconfiguredIndex: ToolSearchIndex.Service = { const makeVectorEmbedder = (options: VectorToolSearchBackendOptions): ToolEmbedder | undefined => options.embedder ?? + (options.workersAi + ? makeCloudflareWorkersAiEmbedder({ + ai: options.workersAi, + dimensions: options.dimensions, + batchSize: options.embedderBatchSize, + }) + : undefined) ?? (options.geminiApiKey ? makeGeminiEmbedder({ apiKey: options.geminiApiKey, From fe469512e5f7e113a2170f8e6012737a2539e4da Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Sat, 27 Jun 2026 00:57:06 +0530 Subject: [PATCH 2/5] refactor(semantic-search): trust ai search model config Remove embedding model validation from the AI Search backend so the bound Cloudflare AI Search instance remains the source of truth for embedding model configuration. --- .../semantic-search/src/sdk/ai-search.test.ts | 54 +------------------ .../semantic-search/src/sdk/ai-search.ts | 35 +----------- .../plugins/semantic-search/src/sdk/index.ts | 1 - 3 files changed, 4 insertions(+), 86 deletions(-) diff --git a/packages/plugins/semantic-search/src/sdk/ai-search.test.ts b/packages/plugins/semantic-search/src/sdk/ai-search.test.ts index 2c26eb342..27673e503 100644 --- a/packages/plugins/semantic-search/src/sdk/ai-search.test.ts +++ b/packages/plugins/semantic-search/src/sdk/ai-search.test.ts @@ -3,11 +3,7 @@ import type { AiSearchInstance } from "@cloudflare/workers-types"; import { type PluginStorageCollectionFacade, type PluginStorageEntry } from "@executor-js/sdk/core"; import { Effect } from "effect"; -import { - DEFAULT_AI_SEARCH_EMBEDDING_MODEL, - makeAiSearchToolDiscoveryProvider, - reindexAiSearch, -} from "./ai-search"; +import { makeAiSearchToolDiscoveryProvider, reindexAiSearch } from "./ai-search"; import { type aiSearchItems, type AiSearchItemRow } from "./collections"; type ItemsCollection = PluginStorageCollectionFacade; @@ -72,11 +68,7 @@ const makeAiSearchItems = () => get: () => expect.unreachable("Unexpected AI Search item lookup"), }) satisfies Pick["items"]; -const makeAiSearch = (): Pick => ({ - info: async () => ({ - id: "executor-tool-search", - embedding_model: DEFAULT_AI_SEARCH_EMBEDDING_MODEL, - }), +const makeAiSearch = (): Pick => ({ items: makeAiSearchItems(), search: async () => ({ search_query: "create repo", @@ -254,48 +246,6 @@ describe("reindexAiSearch", () => { }), ); - it.effect( - "fails before indexing when the AI Search instance uses a different embedding model", - () => - Effect.gen(function* () { - const error = yield* Effect.flip( - reindexAiSearch({ - executor: { - tools: { - manifest: () => - Effect.succeed([ - { - path: "github.default.main.repos.create", - name: "repos.create", - description: "Create a repository", - integration: "github", - fingerprintVersion: "v1", - indexFingerprint: "fingerprint", - }, - ]), - }, - } as never, - aiSearch: { - ...makeAiSearch(), - info: async () => ({ - id: "executor-tool-search", - embedding_model: "@cf/baai/bge-base-en-v1.5", - }), - }, - items: makeItemsCollection({ - list: () => Effect.sync(() => expect.unreachable("list should not run")), - }), - owner: "org", - namespace: "org", - }), - ); - - expect(error).toMatchObject({ - message: expect.stringContaining(DEFAULT_AI_SEARCH_EMBEDDING_MODEL), - }); - }), - ); - it.effect("removes stale rows even when deleting the remote AI Search item fails", () => Effect.gen(function* () { const removed: string[] = []; diff --git a/packages/plugins/semantic-search/src/sdk/ai-search.ts b/packages/plugins/semantic-search/src/sdk/ai-search.ts index ca93720aa..f20cbc6ee 100644 --- a/packages/plugins/semantic-search/src/sdk/ai-search.ts +++ b/packages/plugins/semantic-search/src/sdk/ai-search.ts @@ -29,12 +29,9 @@ import type { } from "./tool-search-backend"; import type { ToolSearchIndex } from "./tool-search-index"; -export const DEFAULT_AI_SEARCH_EMBEDDING_MODEL = "@cf/qwen/qwen3-embedding-0.6b"; - export interface AiSearchToolSearchBackendOptions { - readonly aiSearch: Pick | undefined; + readonly aiSearch: Pick | undefined; readonly namespace?: string; - readonly embeddingModel?: string; } type ItemsCollection = PluginStorageCollectionFacade; @@ -75,27 +72,6 @@ const notConfigured = (): Effect.Effect => }), ); -const requireEmbeddingModel = (input: { - readonly aiSearch: Pick; - readonly expectedModel: string; -}): Effect.Effect => - Effect.tryPromise({ - try: () => input.aiSearch.info(), - catch: (cause) => - new SemanticSearchError({ message: "Failed to read AI Search instance config.", cause }), - }).pipe( - Effect.flatMap((info) => { - const actualModel = - typeof info.embedding_model === "string" ? info.embedding_model : undefined; - if (!actualModel || actualModel === input.expectedModel) return Effect.void; - return Effect.fail( - new SemanticSearchError({ - message: `AI Search instance uses embedding model "${actualModel}", expected "${input.expectedModel}". Recreate or update the instance before indexing.`, - }), - ); - }), - ); - const unavailableIndex: ToolSearchIndex.Service = { create: () => notConfigured(), scan: () => notConfigured(), @@ -151,20 +127,15 @@ const putIndexedItem = ( export const reindexAiSearch = (input: { readonly executor: Executor; - readonly aiSearch: Pick | undefined; + readonly aiSearch: Pick | undefined; readonly items: ItemsCollection; readonly owner: "user" | "org"; readonly namespace: string; - readonly embeddingModel?: string; readonly maxTools?: number; }): Effect.Effect => { if (!input.aiSearch) return notConfigured(); const aiSearch = input.aiSearch; return Effect.gen(function* () { - yield* requireEmbeddingModel({ - aiSearch, - expectedModel: input.embeddingModel ?? DEFAULT_AI_SEARCH_EMBEDDING_MODEL, - }); const manifests = yield* listToolManifests(input.executor, { maxTools: input.maxTools }); const livePaths = new Set(manifests.map((manifest) => manifest.path)); const existingEntries = yield* input.items @@ -392,7 +363,6 @@ export const makeAiSearchToolSearchBackend = ( options: AiSearchToolSearchBackendOptions, ): ToolSearchBackendFactory => { const namespace = options.namespace ?? "default"; - const embeddingModel = options.embeddingModel ?? DEFAULT_AI_SEARCH_EMBEDDING_MODEL; return { namespace, pluginStorage: { aiSearchItems }, @@ -416,7 +386,6 @@ export const makeAiSearchToolSearchBackend = ( items: storage.aiSearchItems, owner: storage.owner, namespace, - embeddingModel, }), sweep: () => Effect.succeed({ diff --git a/packages/plugins/semantic-search/src/sdk/index.ts b/packages/plugins/semantic-search/src/sdk/index.ts index 76c7baa7f..48079f88e 100644 --- a/packages/plugins/semantic-search/src/sdk/index.ts +++ b/packages/plugins/semantic-search/src/sdk/index.ts @@ -17,7 +17,6 @@ export { export { makeAiSearchToolDiscoveryProvider, makeAiSearchToolSearchBackend, - DEFAULT_AI_SEARCH_EMBEDDING_MODEL, reindexAiSearch, statusAiSearch, type AiSearchToolSearchBackendOptions, From 40c841e2a4c1707465b13cad2653c9f2218d16d7 Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Sat, 27 Jun 2026 14:55:04 +0530 Subject: [PATCH 3/5] fix(semantic-search): process ai search indexing in batches --- .../plugins/semantic-search/src/api/group.ts | 5 +- .../semantic-search/src/api/handlers.ts | 3 +- .../semantic-search/src/react/SearchPage.tsx | 42 ++- .../semantic-search/src/sdk/ai-search.test.ts | 349 +++++++++++++++++- .../semantic-search/src/sdk/ai-search.ts | 332 +++++++++++++---- .../semantic-search/src/sdk/documents.ts | 3 +- .../plugins/semantic-search/src/sdk/index.ts | 3 + .../semantic-search/src/sdk/plugin.test.ts | 2 + .../plugins/semantic-search/src/sdk/plugin.ts | 4 + .../src/sdk/tool-search-backend.ts | 17 + 10 files changed, 653 insertions(+), 107 deletions(-) diff --git a/packages/plugins/semantic-search/src/api/group.ts b/packages/plugins/semantic-search/src/api/group.ts index af47fa55d..4fa35cf4a 100644 --- a/packages/plugins/semantic-search/src/api/group.ts +++ b/packages/plugins/semantic-search/src/api/group.ts @@ -10,8 +10,7 @@ import { InternalError } from "@executor-js/api"; // execution-history/graphql convention — the per-request executor is already // owner-scoped at the host edge, so there is no `:scopeId` segment. // -// - reindex (POST) indexes the whole tool catalog for the scoped tenant -// through the same index refresh/chunk/embed pipeline. +// - reindex (POST) uploads changed tool documents into Cloudflare AI Search. // - search (GET) runs a live `tools.search` through the SAME provider the // engine uses, so the operator console sees what the agent would. // - status (GET) reports index population (vector fingerprints + lexical docs). @@ -20,7 +19,7 @@ import { InternalError } from "@executor-js/api"; // `capture` downgrades it to `InternalError`. // --------------------------------------------------------------------------- -/** Result of an index run: counts for each category of tool processed. */ +/** Result of submitting changed documents to the backend index. */ export const ReindexResponse = Schema.Struct({ namespace: Schema.String, total: Schema.Number, diff --git a/packages/plugins/semantic-search/src/api/handlers.ts b/packages/plugins/semantic-search/src/api/handlers.ts index 10a8dd1c9..688298287 100644 --- a/packages/plugins/semantic-search/src/api/handlers.ts +++ b/packages/plugins/semantic-search/src/api/handlers.ts @@ -15,8 +15,7 @@ import { SemanticSearchGroup } from "./group"; // `Layer.succeed(SemanticSearchExtensionService, executor.semanticSearch)`. // The handler also yields the per-request `ExecutorService` (the scoped // executor) and hands it to `reindex`, since the catalog lives on the executor, -// not the plugin ctx. `reindex` is a index run, not a legacy page -// reconciler. +// not the plugin ctx. // --------------------------------------------------------------------------- export class SemanticSearchExtensionService extends Context.Service< diff --git a/packages/plugins/semantic-search/src/react/SearchPage.tsx b/packages/plugins/semantic-search/src/react/SearchPage.tsx index cfab0fa6c..7affaf484 100644 --- a/packages/plugins/semantic-search/src/react/SearchPage.tsx +++ b/packages/plugins/semantic-search/src/react/SearchPage.tsx @@ -1,5 +1,6 @@ import { useState } from "react"; import { useAtomRefresh, useAtomSet, useAtomValue } from "@effect/atom-react"; +import * as Cause from "effect/Cause"; import * as Exit from "effect/Exit"; import * as AsyncResult from "effect/unstable/reactivity/AsyncResult"; import { RefreshCw, Search } from "lucide-react"; @@ -13,16 +14,18 @@ import { TableHeader, TableRow, } from "@executor-js/react/components/table"; +import type { InternalError } from "@executor-js/sdk/core"; import type { SearchResponseType, StatusResponseType } from "../api/group"; import { reindexMutation, searchAtom, statusAtom } from "./atoms"; const SEARCH_LIMIT = 25; +const internalErrorMessage = (cause: Cause.Cause): string => Cause.pretty(cause); + // Operator/debug page for the semantic-search plugin: a live `tools.search` box -// over the engine's discovery provider (Vectorize + FTS5/BM25 hybrid), an index -// status line, and an explicit reindex trigger. Read-only against the catalog — -// the only mutation is reindex. +// over the engine's discovery provider, an index status line, and an explicit +// reindex trigger. Read-only against the catalog, except for reindex. export function SearchPage() { const [input, setInput] = useState(""); const [submitted, setSubmitted] = useState(""); @@ -31,10 +34,10 @@ export function SearchPage() { const searchResult = useAtomValue( searchAtom({ q: submitted, limit: SEARCH_LIMIT }), - ) as AsyncResult.AsyncResult; + ) as AsyncResult.AsyncResult; const statusResult = useAtomValue(statusAtom) as AsyncResult.AsyncResult< StatusResponseType, - unknown + InternalError >; const refreshStatus = useAtomRefresh(statusAtom); const doReindex = useAtomSet(reindexMutation, { mode: "promiseExit" }); @@ -54,6 +57,11 @@ export function SearchPage() { onFailure: () => true, onSuccess: () => false, }); + const searchError = AsyncResult.match(searchResult, { + onInitial: () => null, + onFailure: ({ cause }) => internalErrorMessage(cause), + onSuccess: () => null, + }); const status = AsyncResult.match(statusResult, { onInitial: () => null, onFailure: () => null, @@ -67,12 +75,12 @@ export function SearchPage() { setNotice(null); const exit = await doReindex({ reactivityKeys: [] }); setReindexing(false); + refreshStatus(); if (Exit.isFailure(exit)) { - setNotice("Reindex failed — a full-catalog index run can exceed the Worker CPU limit."); + setNotice("Reindex failed. Check server logs for the trace id."); return; } - setNotice("Reindex complete."); - refreshStatus(); + setNotice("Reindex submitted."); }; return ( @@ -85,8 +93,8 @@ export function SearchPage() {

Run a live tools.search through the - engine's discovery provider — semantic (Vectorize) fused with lexical (FTS5/BM25) when - both are indexed. This is what the agent sees. + engine's configured discovery provider. On Cloudflare this uses the bound AI Search + instance. This is what the agent sees.