Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/plugin-storage-bulk-upserts.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@executor-js/fumadb": patch
"@executor-js/sdk": patch
---

Add a FumaDB bulk upsert query path and route plugin-storage bulk writes through
it so existing rows are updated without delete/reinsert churn.
82 changes: 82 additions & 0 deletions packages/core/fumadb/src/adapters/drizzle/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ function buildWhere(
);
}

function countConditionParameters(condition: Condition): number {
if (condition.type === ConditionType.Compare) {
if (condition.b instanceof Column) return 0;
if (Array.isArray(condition.b)) return condition.b.length;
return 1;
}
if (condition.type === ConditionType.Not) return countConditionParameters(condition.item);
return condition.items.reduce((count, item) => count + countConditionParameters(item), 0);
}

function mapValues(
values: Record<string, unknown>,
table: AnyTable
Expand Down Expand Up @@ -303,6 +313,78 @@ export function fromDrizzle(
await this.createMany(table, [v.create]);
}
},
async upsertMany(table, v) {
if (v.values.length === 0) return;
if (v.target.length === 0) {
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape
throw new Error("[FumaDB] upsertMany requires at least one target column.");
}
if (v.update.length === 0) {
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape
throw new Error("[FumaDB] upsertMany requires at least one update column.");
}
if (provider !== "sqlite" && provider !== "postgresql") {
for (const value of v.values) {
const targetCondition: Condition = {
type: ConditionType.And,
items: v.target.map((column) => ({
type: ConditionType.Compare,
a: column,
operator: "=",
b: value[column.ormName],
})),
};
await this.upsert(table, {
where: v.where
? { type: ConditionType.And, items: [targetCondition, v.where] }
: targetCondition,
update: Object.fromEntries(
v.update.map((column) => [column.ormName, value[column.ormName]]),
),
create: value,
});
}
return;
}

const drizzleTable = toDrizzle(table);
const values = v.values.map((value) => mapValues(value, table));
const where = v.where ? buildWhere(toDrizzleColumn, v.where) : undefined;
const whereParameters = v.where ? countConditionParameters(v.where) : 0;
const columnsPerRow = values.length > 0 ? Math.max(1, Object.keys(values[0]!).length) : 1;
const batchSize = maxBoundParameters
? Math.max(
1,
Math.min(
CREATE_MANY_BATCH_SIZE,
Math.floor(Math.max(1, maxBoundParameters - whereParameters) / columnsPerRow),
),
)
: CREATE_MANY_BATCH_SIZE;
const target = v.target.map((column) => drizzleTable[column.names.drizzle]);
const set = Object.fromEntries(
v.update.map((column) => [
column.names.drizzle,
Drizzle.sql.raw(`excluded.${column.names.sql}`),
]),
);

for (let i = 0; i < values.length; i += batchSize) {
const batch = values.slice(i, i + batchSize);
const insert = db.insert(drizzleTable).values(batch) as unknown as {
onConflictDoUpdate: (input: {
readonly target: typeof target;
readonly set: typeof set;
readonly where?: typeof where;
}) => Promise<unknown>;
};
await insert.onConflictDoUpdate({
target,
set,
...(where === undefined ? {} : { where }),
});
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
},
async findMany(table, v) {
return (
await db.query[table.names.drizzle].findMany(buildQueryConfig(table, v))
Expand Down
29 changes: 29 additions & 0 deletions packages/core/fumadb/src/adapters/memory/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,35 @@ export function memoryAdapter(options: MemoryAdapterOptions = {}): FumaDBAdapter
}
await this.create(table, v.create);
},
async upsertMany(table, v) {
if (v.target.length === 0) {
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape
throw new Error("[FumaDB] upsertMany requires at least one target column.");
}
if (v.update.length === 0) {
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape
throw new Error("[FumaDB] upsertMany requires at least one update column.");
}
for (const value of v.values) {
const existing = tableRows(db, table).find(
(row) =>
matchesCondition(row, v.where) &&
v.target.every((column) => row[column.ormName] === value[column.ormName]),
);
if (existing) {
Object.assign(
existing,
cloneValue(
Object.fromEntries(
v.update.map((column) => [column.ormName, value[column.ormName]]),
),
),
);
continue;
}
await this.create(table, value);
}
},
Comment thread
greptile-apps[bot] marked this conversation as resolved.
async create(table, values) {
const row = applyDefaults(table, values);
tableRows(db, table).push(row);
Expand Down
16 changes: 16 additions & 0 deletions packages/core/fumadb/src/query/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,22 @@ export interface AbstractQuery<S extends AnySchema> {
}
) => Promise<void>;

/**
* Bulk upsert rows by a unique column target.
*
* Adapters with native conflict support should implement this as one or more
* `INSERT ... ON CONFLICT ... DO UPDATE` statements. Other adapters may fall
* back to per-row `upsert`.
*/
upsertMany: <TableName extends keyof S["tables"]>(
table: TableName,
v: {
target: (keyof S["tables"][TableName]["columns"])[];
update: (keyof S["tables"][TableName]["columns"])[];
values: TableToInsertValues<S["tables"][TableName]>[];
}
) => Promise<void>;

/**
* Note: you cannot update the id of a row, some databases don't support that (including MongoDB).
*/
Expand Down
126 changes: 125 additions & 1 deletion packages/core/fumadb/src/query/orm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
AnySchema,
AnyTable,
} from "../../schema";
import { Column } from "../../schema";
import type {
AbstractQuery,
AnySelectClause,
Expand All @@ -12,7 +13,12 @@ import type {
JoinBuilder,
OrderBy,
} from "..";
import { buildCondition, createBuilder, type Condition } from "../condition-builder";
import {
buildCondition,
createBuilder,
type Condition,
ConditionType,
} from "../condition-builder";

export interface CompiledJoin {
relation: AnyRelation;
Expand Down Expand Up @@ -231,6 +237,27 @@ const applyUpdatePolicies = async (
return nextWhere;
};

const conditionKey = (condition: Condition | undefined): string => {
if (!condition) return "none";
if (condition.type === ConditionType.Compare) {
const right =
condition.b instanceof Column ? { column: condition.b.ormName } : { value: condition.b };
return JSON.stringify({
type: "compare",
left: condition.a.ormName,
operator: condition.operator,
right,
});
}
if (condition.type === ConditionType.Not) {
return JSON.stringify({ type: "not", item: conditionKey(condition.item) });
}
return JSON.stringify({
type: condition.type === ConditionType.And ? "and" : "or",
items: condition.items.map(conditionKey),
});
};

const applyDeletePolicies = async (
table: AnyTable,
where: Condition | undefined,
Expand Down Expand Up @@ -284,6 +311,16 @@ export interface ORMAdapter<S extends AnySchema = AnySchema> {
},
) => Promise<void>;

upsertMany?: (
table: AnyTable,
v: {
target: AnyColumn[];
update: AnyColumn[];
values: Record<string, unknown>[];
where?: Condition;
},
) => Promise<void>;

create: (
table: AnyTable,
values: Record<string, unknown>,
Expand Down Expand Up @@ -367,6 +404,93 @@ export function toORM<S extends AnySchema>(
...options,
});
},
async upsertMany(name, { target, update, values }) {
const table = toTable(name);
if (values.length === 0) return;
if (target.length === 0) {
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: public query rejects invalid upsert shape
throw new Error("[FumaDB] upsertMany requires at least one target column.");
}
if (update.length === 0) {
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: public query rejects invalid upsert shape
throw new Error("[FumaDB] upsertMany requires at least one update column.");
}

const targetColumns = target.map((columnName) => {
const column = table.columns[columnName as string];
if (!column) throw new Error(`[FumaDB] unknown column name ${String(columnName)}.`);
return column;
});
const updateColumns = update.map((columnName) => {
const column = table.columns[columnName as string];
if (!column) throw new Error(`[FumaDB] unknown column name ${String(columnName)}.`);
return column;
});

const builder = createBuilder(table.columns);
const permittedRows: {
readonly value: Record<string, unknown>;
readonly where: Condition | undefined;
}[] = [];
for (const value of values) {
const updateValues = Object.fromEntries(
updateColumns.map((column) => [column.ormName, value[column.ormName]]),
);
const constrainedWhere = await applyUpdatePolicies(
table,
undefined,
updateValues,
context,
"upsert",
value,
);
if (constrainedWhere === false) continue;
await runCreatePolicies(table, value, context);
permittedRows.push({ value, where: constrainedWhere });
}
if (permittedRows.length === 0) return;

if (internal.upsertMany) {
const groups = new Map<
string,
{ readonly where: Condition | undefined; readonly values: Record<string, unknown>[] }
>();
for (const row of permittedRows) {
const key = conditionKey(row.where);
const group = groups.get(key);
if (group) {
group.values.push(row.value);
} else {
groups.set(key, { where: row.where, values: [row.value] });
}
}
for (const group of groups.values()) {
await internal.upsertMany(table, {
target: targetColumns,
update: updateColumns,
values: group.values,
where: group.where,
});
}
return;
}

for (const row of permittedRows) {
const value = row.value;
const targetWhere = builder.and(
...targetColumns.map((column) => builder(column.ormName, "=", value[column.ormName])),
);
const where = builder.and(targetWhere, row.where ?? true);
if (where === false) continue;
await internal.upsert(table, {
where: where === true ? undefined : where,
update: Object.fromEntries(
updateColumns.map((column) => [column.ormName, value[column.ormName]]),
),
create: value,
});
}
},
async create(name, values) {
const table = toTable(name);
await runCreatePolicies(table, values, context);
Expand Down
Loading
Loading