diff --git a/.changeset/plugin-storage-bulk-upserts.md b/.changeset/plugin-storage-bulk-upserts.md new file mode 100644 index 000000000..4265a60e9 --- /dev/null +++ b/.changeset/plugin-storage-bulk-upserts.md @@ -0,0 +1,7 @@ +--- +"@executor-js/fumadb": patch +"@executor-js/sdk": patch +--- + +Add a FumaDB bulk upsert query path and route plugin-storage bulk writes through +it so existing rows are updated without delete/reinsert churn. diff --git a/packages/core/fumadb/src/adapters/drizzle/query.ts b/packages/core/fumadb/src/adapters/drizzle/query.ts index 9471ecda0..20eb39efb 100644 --- a/packages/core/fumadb/src/adapters/drizzle/query.ts +++ b/packages/core/fumadb/src/adapters/drizzle/query.ts @@ -121,6 +121,16 @@ function buildWhere( ); } +function countConditionParameters(condition: Condition): number { + if (condition.type === ConditionType.Compare) { + if (condition.b instanceof Column) return 0; + if (Array.isArray(condition.b)) return condition.b.length; + return 1; + } + if (condition.type === ConditionType.Not) return countConditionParameters(condition.item); + return condition.items.reduce((count, item) => count + countConditionParameters(item), 0); +} + function mapValues( values: Record, table: AnyTable @@ -303,6 +313,78 @@ export function fromDrizzle( await this.createMany(table, [v.create]); } }, + async upsertMany(table, v) { + if (v.values.length === 0) return; + if (v.target.length === 0) { + // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape + throw new Error("[FumaDB] upsertMany requires at least one target column."); + } + if (v.update.length === 0) { + // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape + throw new Error("[FumaDB] upsertMany requires at least one update column."); + } + if (provider !== "sqlite" && provider !== "postgresql") { + for (const value of v.values) { + const targetCondition: Condition = { + type: ConditionType.And, + items: v.target.map((column) => ({ + type: ConditionType.Compare, + a: column, + operator: "=", + b: value[column.ormName], + })), + }; + await this.upsert(table, { + where: v.where + ? { type: ConditionType.And, items: [targetCondition, v.where] } + : targetCondition, + update: Object.fromEntries( + v.update.map((column) => [column.ormName, value[column.ormName]]), + ), + create: value, + }); + } + return; + } + + const drizzleTable = toDrizzle(table); + const values = v.values.map((value) => mapValues(value, table)); + const where = v.where ? buildWhere(toDrizzleColumn, v.where) : undefined; + const whereParameters = v.where ? countConditionParameters(v.where) : 0; + const columnsPerRow = values.length > 0 ? Math.max(1, Object.keys(values[0]!).length) : 1; + const batchSize = maxBoundParameters + ? Math.max( + 1, + Math.min( + CREATE_MANY_BATCH_SIZE, + Math.floor(Math.max(1, maxBoundParameters - whereParameters) / columnsPerRow), + ), + ) + : CREATE_MANY_BATCH_SIZE; + const target = v.target.map((column) => drizzleTable[column.names.drizzle]); + const set = Object.fromEntries( + v.update.map((column) => [ + column.names.drizzle, + Drizzle.sql.raw(`excluded.${column.names.sql}`), + ]), + ); + + for (let i = 0; i < values.length; i += batchSize) { + const batch = values.slice(i, i + batchSize); + const insert = db.insert(drizzleTable).values(batch) as unknown as { + onConflictDoUpdate: (input: { + readonly target: typeof target; + readonly set: typeof set; + readonly where?: typeof where; + }) => Promise; + }; + await insert.onConflictDoUpdate({ + target, + set, + ...(where === undefined ? {} : { where }), + }); + } + }, async findMany(table, v) { return ( await db.query[table.names.drizzle].findMany(buildQueryConfig(table, v)) diff --git a/packages/core/fumadb/src/adapters/memory/index.ts b/packages/core/fumadb/src/adapters/memory/index.ts index a596373f5..5c34debc5 100644 --- a/packages/core/fumadb/src/adapters/memory/index.ts +++ b/packages/core/fumadb/src/adapters/memory/index.ts @@ -174,6 +174,35 @@ export function memoryAdapter(options: MemoryAdapterOptions = {}): FumaDBAdapter } await this.create(table, v.create); }, + async upsertMany(table, v) { + if (v.target.length === 0) { + // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape + throw new Error("[FumaDB] upsertMany requires at least one target column."); + } + if (v.update.length === 0) { + // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape + throw new Error("[FumaDB] upsertMany requires at least one update column."); + } + for (const value of v.values) { + const existing = tableRows(db, table).find( + (row) => + matchesCondition(row, v.where) && + v.target.every((column) => row[column.ormName] === value[column.ormName]), + ); + if (existing) { + Object.assign( + existing, + cloneValue( + Object.fromEntries( + v.update.map((column) => [column.ormName, value[column.ormName]]), + ), + ), + ); + continue; + } + await this.create(table, value); + } + }, async create(table, values) { const row = applyDefaults(table, values); tableRows(db, table).push(row); diff --git a/packages/core/fumadb/src/query/index.ts b/packages/core/fumadb/src/query/index.ts index 2128adb0e..45633dfb9 100644 --- a/packages/core/fumadb/src/query/index.ts +++ b/packages/core/fumadb/src/query/index.ts @@ -174,6 +174,22 @@ export interface AbstractQuery { } ) => Promise; + /** + * Bulk upsert rows by a unique column target. + * + * Adapters with native conflict support should implement this as one or more + * `INSERT ... ON CONFLICT ... DO UPDATE` statements. Other adapters may fall + * back to per-row `upsert`. + */ + upsertMany: ( + table: TableName, + v: { + target: (keyof S["tables"][TableName]["columns"])[]; + update: (keyof S["tables"][TableName]["columns"])[]; + values: TableToInsertValues[]; + } + ) => Promise; + /** * Note: you cannot update the id of a row, some databases don't support that (including MongoDB). */ diff --git a/packages/core/fumadb/src/query/orm/index.ts b/packages/core/fumadb/src/query/orm/index.ts index f0bac42e7..fae537445 100644 --- a/packages/core/fumadb/src/query/orm/index.ts +++ b/packages/core/fumadb/src/query/orm/index.ts @@ -4,6 +4,7 @@ import type { AnySchema, AnyTable, } from "../../schema"; +import { Column } from "../../schema"; import type { AbstractQuery, AnySelectClause, @@ -12,7 +13,12 @@ import type { JoinBuilder, OrderBy, } from ".."; -import { buildCondition, createBuilder, type Condition } from "../condition-builder"; +import { + buildCondition, + createBuilder, + type Condition, + ConditionType, +} from "../condition-builder"; export interface CompiledJoin { relation: AnyRelation; @@ -231,6 +237,27 @@ const applyUpdatePolicies = async ( return nextWhere; }; +const conditionKey = (condition: Condition | undefined): string => { + if (!condition) return "none"; + if (condition.type === ConditionType.Compare) { + const right = + condition.b instanceof Column ? { column: condition.b.ormName } : { value: condition.b }; + return JSON.stringify({ + type: "compare", + left: condition.a.ormName, + operator: condition.operator, + right, + }); + } + if (condition.type === ConditionType.Not) { + return JSON.stringify({ type: "not", item: conditionKey(condition.item) }); + } + return JSON.stringify({ + type: condition.type === ConditionType.And ? "and" : "or", + items: condition.items.map(conditionKey), + }); +}; + const applyDeletePolicies = async ( table: AnyTable, where: Condition | undefined, @@ -284,6 +311,16 @@ export interface ORMAdapter { }, ) => Promise; + upsertMany?: ( + table: AnyTable, + v: { + target: AnyColumn[]; + update: AnyColumn[]; + values: Record[]; + where?: Condition; + }, + ) => Promise; + create: ( table: AnyTable, values: Record, @@ -367,6 +404,93 @@ export function toORM( ...options, }); }, + async upsertMany(name, { target, update, values }) { + const table = toTable(name); + if (values.length === 0) return; + if (target.length === 0) { + // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: public query rejects invalid upsert shape + throw new Error("[FumaDB] upsertMany requires at least one target column."); + } + if (update.length === 0) { + // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: public query rejects invalid upsert shape + throw new Error("[FumaDB] upsertMany requires at least one update column."); + } + + const targetColumns = target.map((columnName) => { + const column = table.columns[columnName as string]; + if (!column) throw new Error(`[FumaDB] unknown column name ${String(columnName)}.`); + return column; + }); + const updateColumns = update.map((columnName) => { + const column = table.columns[columnName as string]; + if (!column) throw new Error(`[FumaDB] unknown column name ${String(columnName)}.`); + return column; + }); + + const builder = createBuilder(table.columns); + const permittedRows: { + readonly value: Record; + readonly where: Condition | undefined; + }[] = []; + for (const value of values) { + const updateValues = Object.fromEntries( + updateColumns.map((column) => [column.ormName, value[column.ormName]]), + ); + const constrainedWhere = await applyUpdatePolicies( + table, + undefined, + updateValues, + context, + "upsert", + value, + ); + if (constrainedWhere === false) continue; + await runCreatePolicies(table, value, context); + permittedRows.push({ value, where: constrainedWhere }); + } + if (permittedRows.length === 0) return; + + if (internal.upsertMany) { + const groups = new Map< + string, + { readonly where: Condition | undefined; readonly values: Record[] } + >(); + for (const row of permittedRows) { + const key = conditionKey(row.where); + const group = groups.get(key); + if (group) { + group.values.push(row.value); + } else { + groups.set(key, { where: row.where, values: [row.value] }); + } + } + for (const group of groups.values()) { + await internal.upsertMany(table, { + target: targetColumns, + update: updateColumns, + values: group.values, + where: group.where, + }); + } + return; + } + + for (const row of permittedRows) { + const value = row.value; + const targetWhere = builder.and( + ...targetColumns.map((column) => builder(column.ormName, "=", value[column.ormName])), + ); + const where = builder.and(targetWhere, row.where ?? true); + if (where === false) continue; + await internal.upsert(table, { + where: where === true ? undefined : where, + update: Object.fromEntries( + updateColumns.map((column) => [column.ormName, value[column.ormName]]), + ), + create: value, + }); + } + }, async create(name, values) { const table = toTable(name); await runCreatePolicies(table, values, context); diff --git a/packages/core/fumadb/src/query/table-policy.test.ts b/packages/core/fumadb/src/query/table-policy.test.ts index 51d0c2b8c..688ba06f2 100644 --- a/packages/core/fumadb/src/query/table-policy.test.ts +++ b/packages/core/fumadb/src/query/table-policy.test.ts @@ -382,6 +382,24 @@ describe("FumaDB table policies", () => { title: "A Three", }, }); + await tenantA.upsertMany("posts", { + target: ["id"], + update: ["title"], + values: [ + { + id: "post-a-1", + tenantId: "tenant-a", + authorId: "author-a", + title: "tenant-a-bulk-upserted", + }, + { + id: "post-a-4", + tenantId: "tenant-a", + authorId: "author-a", + title: "A Four", + }, + ], + }); await expect( tenantA.findMany("posts", { @@ -391,7 +409,7 @@ describe("FumaDB table policies", () => { ).resolves.toEqual([ { id: "post-a-1", - title: "tenant-a-updated", + title: "tenant-a-bulk-upserted", }, { id: "post-a-2", @@ -401,6 +419,10 @@ describe("FumaDB table policies", () => { id: "post-a-3", title: "A Three", }, + { + id: "post-a-4", + title: "A Four", + }, ]); expect(tenantAContext.observed).toEqual( @@ -460,6 +482,37 @@ describe("FumaDB table policies", () => { }), ); + it.effect("rejects invalid bulk upsert conflict shapes", () => + useHarness(async (orm) => { + await seedTenants(orm); + const tenantA = withQueryContext(orm, makeContext(["tenant-a"], "tenant-a")); + const values = [ + { + id: "post-a-bulk-upsert", + tenantId: "tenant-a", + authorId: "author-a", + title: "A bulk upsert", + }, + ]; + + await expect( + tenantA.upsertMany("posts", { + target: [], + update: ["title"], + values, + }), + ).rejects.toThrow("[FumaDB] upsertMany requires at least one target column."); + + await expect( + tenantA.upsertMany("posts", { + target: ["id"], + update: [], + values, + }), + ).rejects.toThrow("[FumaDB] upsertMany requires at least one update column."); + }), + ); + it.effect("fails closed when a query wrapper does not forward context rebinding", () => useHarness(async (orm) => { const wrapped = { ...orm }; @@ -471,7 +524,7 @@ describe("FumaDB table policies", () => { ); it.effect( - "rejects out-of-context writes across createMany, updateMany, upsert, and transactions", + "rejects out-of-context writes across createMany, updateMany, upsert, upsertMany, and transactions", () => useHarness(async (orm) => { await seedTenants(orm); @@ -524,6 +577,47 @@ describe("FumaDB table policies", () => { }), ).rejects.toThrow("tenant tenant-b is not allowed for posts"); + await expect( + tenantA.upsertMany("posts", { + target: ["id"], + update: ["title"], + values: [ + { + id: "post-a-bulk-upsert", + tenantId: "tenant-a", + authorId: "author-a", + title: "A bulk upsert", + }, + { + id: "post-b-bulk-upsert", + tenantId: "tenant-b", + authorId: "author-b", + title: "B bulk upsert", + }, + ], + }), + ).rejects.toThrow("tenant tenant-b is not allowed for posts"); + await expect( + tenantA.findFirst("posts", { + where: (builder) => builder("id", "=", "post-a-bulk-upsert"), + }), + ).resolves.toBeNull(); + + await expect( + tenantA.upsertMany("posts", { + target: ["id"], + update: ["tenantId"], + values: [ + { + id: "post-a-1", + tenantId: "tenant-b", + authorId: "author-b", + title: "tenant move", + }, + ], + }), + ).rejects.toThrow("tenant tenant-b is not allowed for posts"); + await expect( tenantA.transaction(async (tx) => { await tx.create("posts", { diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 38e12a3c6..33cc393ed 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -143,7 +143,6 @@ import { connectionIdentifier } from "./connection-name-identifier"; import { annotateToolResultOutcome } from "./tool-result"; const PLUGIN_STORAGE_DELETE_KEY_BATCH_SIZE = 90; -const PLUGIN_STORAGE_CREATE_ROW_BATCH_SIZE = 90; const MAX_APPROVAL_ARGUMENT_PREVIEW_CHARS = 4_000; // --------------------------------------------------------------------------- @@ -601,6 +600,14 @@ type LooseStorageDb = { tableName: string, rows: readonly Record[], ) => Promise; + readonly upsertMany: ( + tableName: string, + options: { + readonly target: readonly string[]; + readonly update: readonly string[]; + readonly values: readonly Record[]; + }, + ) => Promise; readonly deleteMany: (tableName: string, options?: unknown) => Promise; readonly findFirst: ( tableName: string, @@ -637,6 +644,19 @@ const makeCoreDb = (fuma: ReturnType) => ({ : fuma .use(`${tableName}.createMany`, (db) => asLooseStorageDb(db).createMany(tableName, rows)) .pipe(Effect.asVoid), + upsertMany: ( + tableName: TName, + options: { + readonly target: readonly string[]; + readonly update: readonly string[]; + readonly values: readonly Record[]; + }, + ): Effect.Effect => + options.values.length === 0 + ? Effect.void + : fuma.use(`${tableName}.upsertMany`, (db) => + asLooseStorageDb(db).upsertMany(tableName, options), + ), deleteMany: ( tableName: TName, options: { readonly where?: CoreWhere } = {}, @@ -986,33 +1006,22 @@ const makePluginStorageFacade = (input: { const uniqueEntries = [...entriesById.values()]; if (uniqueEntries.length === 0) return; - yield* deleteManyImpl(owner, os.subject, uniqueEntries); - const now = new Date(); - for ( - let offset = 0; - offset < uniqueEntries.length; - offset += PLUGIN_STORAGE_CREATE_ROW_BATCH_SIZE - ) { - const batchEntries = uniqueEntries.slice( - offset, - offset + PLUGIN_STORAGE_CREATE_ROW_BATCH_SIZE, - ); - yield* input.core.createMany( - "plugin_storage", - batchEntries.map((entry) => ({ - tenant, - owner: os.owner, - subject: os.subject, - plugin_id: input.pluginId, - collection: entry.collection, - key: entry.key, - data: entry.data, - created_at: now, - updated_at: now, - })), - ); - } + yield* input.core.upsertMany("plugin_storage", { + target: ["tenant", "owner", "subject", "plugin_id", "collection", "key"], + update: ["data", "updated_at"], + values: uniqueEntries.map((entry) => ({ + tenant, + owner: os.owner, + subject: os.subject, + plugin_id: input.pluginId, + collection: entry.collection, + key: entry.key, + data: entry.data, + created_at: now, + updated_at: now, + })), + }); }); const removeManyImpl = ( @@ -1109,10 +1118,24 @@ const makePluginStorageFacade = (input: { PluginStorageEntry>, StorageFailure >, + putMany: (storageInput) => + putManyImpl( + storageInput.owner, + storageInput.entries.map((entry) => ({ + collection: definition.name, + key: entry.key, + data: entry.data, + })), + ), query: (storageInput) => queryCollection(definition, storageInput), count: (storageInput) => queryCollection(definition, storageInput).pipe(Effect.map((rows) => rows.length)), remove: (storageInput) => removeImpl(storageInput.owner, definition.name, storageInput.key), + removeMany: (storageInput) => + removeManyImpl( + storageInput.owner, + storageInput.keys.map((key) => ({ collection: definition.name, key })), + ), }), get: (storageInput) => getVisible(storageInput.collection, storageInput.key), getForOwner: (storageInput) => diff --git a/packages/core/sdk/src/fuma-runtime.ts b/packages/core/sdk/src/fuma-runtime.ts index e560b1b09..e0a1dfa2c 100644 --- a/packages/core/sdk/src/fuma-runtime.ts +++ b/packages/core/sdk/src/fuma-runtime.ts @@ -143,6 +143,7 @@ const makeSafeFumaQuery = ( db.transaction((transactionDb) => run(makeSafeFumaQuery(transactionDb, options))), updateMany: (name, value) => db.updateMany(table(name), value), upsert: (name, value) => db.upsert(table(name), value), + upsertMany: (name, value) => db.upsertMany(table(name), value), }; return Object.freeze(query); diff --git a/packages/core/sdk/src/plugin-storage.test.ts b/packages/core/sdk/src/plugin-storage.test.ts index a547d06a5..a7624c0c4 100644 --- a/packages/core/sdk/src/plugin-storage.test.ts +++ b/packages/core/sdk/src/plugin-storage.test.ts @@ -65,18 +65,14 @@ const executionHistoryPlugin = definePlugin(() => ({ owner: Owner, rows: readonly { readonly key: string; readonly data: ToolCall }[], ) => - ctx.pluginStorage.putMany({ + ctx.storage.toolCalls.putMany({ owner, - entries: rows.map((row) => ({ - collection: toolCalls.name, - key: row.key, - data: row.data, - })), + entries: rows, }), removeMany: (owner: Owner, keys: readonly string[]) => - ctx.pluginStorage.removeMany({ + ctx.storage.toolCalls.removeMany({ owner, - entries: keys.map((key) => ({ collection: toolCalls.name, key })), + keys, }), get: (key: string) => ctx.storage.toolCalls.get({ key }), getForOwner: (owner: Owner, key: string) => ctx.storage.toolCalls.getForOwner({ owner, key }), diff --git a/packages/core/sdk/src/plugin-storage.ts b/packages/core/sdk/src/plugin-storage.ts index e854feb51..1c0eec046 100644 --- a/packages/core/sdk/src/plugin-storage.ts +++ b/packages/core/sdk/src/plugin-storage.ts @@ -135,6 +135,21 @@ export interface PluginStorageCollectionScopedKeyInput extends PluginStorageColl readonly owner: Owner; } +export interface PluginStorageCollectionPutManyEntry { + readonly key: string; + readonly data: TData; +} + +export interface PluginStorageCollectionPutManyInput { + readonly owner: Owner; + readonly entries: readonly PluginStorageCollectionPutManyEntry[]; +} + +export interface PluginStorageCollectionRemoveManyInput { + readonly owner: Owner; + readonly keys: readonly string[]; +} + export interface PluginStorageCollectionListInput { readonly keyPrefix?: string; } @@ -188,6 +203,9 @@ export interface PluginStorageCollectionFacade< readonly put: ( input: PluginStorageCollectionPutInput>, ) => Effect.Effect>, StorageFailure>; + readonly putMany: ( + input: PluginStorageCollectionPutManyInput>, + ) => Effect.Effect; readonly query: ( input?: PluginStorageCollectionQueryInput, ) => Effect.Effect< @@ -200,6 +218,9 @@ export interface PluginStorageCollectionFacade< readonly remove: ( input: PluginStorageCollectionScopedKeyInput, ) => Effect.Effect; + readonly removeMany: ( + input: PluginStorageCollectionRemoveManyInput, + ) => Effect.Effect; } export interface PluginStorageFacade { diff --git a/packages/core/sdk/src/test-config.ts b/packages/core/sdk/src/test-config.ts index 7aba38ff0..45d906c39 100644 --- a/packages/core/sdk/src/test-config.ts +++ b/packages/core/sdk/src/test-config.ts @@ -53,6 +53,14 @@ const makeLazyTestFumaDb = (options: { transaction: async (run) => (await start()).db.internal.transaction(run), updateMany: async (table, value) => (await start()).db.internal.updateMany(table, value), upsert: async (table, value) => (await start()).db.internal.upsert(table, value), + upsertMany: async (table, value) => { + const actual = await start(); + if (!actual.db.internal.upsertMany) { + // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: lazy test DB must expose the current FumaDB adapter surface + throw new Error("[FumaDB] upsertMany is not supported by this adapter."); + } + return actual.db.internal.upsertMany(table, value); + }, }; const queryMethods = new Set([ @@ -65,6 +73,7 @@ const makeLazyTestFumaDb = (options: { "transaction", "updateMany", "upsert", + "upsertMany", ]); const makeDb = (context?: ExecutorOwnerPolicyContext): FumaDb => diff --git a/packages/plugins/openapi/src/sdk/store.test.ts b/packages/plugins/openapi/src/sdk/store.test.ts index ef4a4ed2c..24b7494f0 100644 --- a/packages/plugins/openapi/src/sdk/store.test.ts +++ b/packages/plugins/openapi/src/sdk/store.test.ts @@ -49,9 +49,11 @@ describe("OpenAPI operation store", () => { data: input.data, }), ), + putMany: () => Effect.void, query: () => Effect.succeed([]), count: () => Effect.succeed(0), remove: () => Effect.void, + removeMany: () => Effect.void, }), get: (input: { readonly collection: string; readonly key: string }) => Effect.succeed(