Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions server/src/drivers/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,19 @@ export interface DbDriver {
* accept a MQL request object. Routes branch on this to validate the body shape.
*/
readonly queryMode: 'sql' | 'mql';
/** Run a query, optionally switching schema first (atomically on one connection). */
query(sql: string, params?: unknown[], schema?: string): Promise<QueryResult>;
/**
* Run a query, optionally switching schema first (atomically on one connection).
* When `timeoutMs` is set the driver cancels the statement on the server after
* that many ms (MySQL `KILL QUERY`, Postgres `pg_cancel_backend`, MongoDB
* `maxTimeMS`) and rejects with a `transient` DriverError.
*/
query(sql: string, params?: unknown[], schema?: string, timeoutMs?: number): Promise<QueryResult>;
/**
* Run one or more semicolon-separated statements and return a result per statement.
* Implemented by sql-mode drivers; mql-mode drivers can omit it.
* Implemented by sql-mode drivers; mql-mode drivers can omit it. `timeoutMs` has
* the same cancel-on-deadline semantics as {@link query}.
*/
queryAll?(sql: string, schema?: string): Promise<QueryResult[]>;
queryAll?(sql: string, schema?: string, timeoutMs?: number): Promise<QueryResult[]>;
getSchemas(): Promise<string[]>;
getSchema(schema: string): Promise<SchemaInfo>;
/** Targeted lookup for a single table — returns null if it doesn't exist. */
Expand Down
6 changes: 3 additions & 3 deletions server/src/drivers/mongodb.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ describe('MongoDBDriver.query – find', () => {
);
expect(mockClient.db).toHaveBeenCalledWith('app');
expect(mockDb.collection).toHaveBeenCalledWith('users');
expect(mockCollection.find).toHaveBeenCalledWith({ active: true });
expect(mockCollection.find).toHaveBeenCalledWith({ active: true }, {});
expect(r.rows).toEqual([
{ _id: '507f1f77bcf86cd799439011', name: 'Alice', when: '2024-01-02 03:04:05' },
]);
Expand Down Expand Up @@ -402,7 +402,7 @@ describe('MongoDBDriver.query – findOne / aggregate / count', () => {
);
expect(mockCollection.aggregate).toHaveBeenCalledWith([
{ $group: { _id: '$status', n: { $sum: 1 } } },
]);
], {});
expect(r.rows).toEqual([{ _id: 'A', n: 2 }]);
});

Expand All @@ -411,7 +411,7 @@ describe('MongoDBDriver.query – findOne / aggregate / count', () => {
const r = await makeDriver().query(
JSON.stringify({ collection: 'users', operation: 'count', filter: { active: true } }),
);
expect(mockCollection.countDocuments).toHaveBeenCalledWith({ active: true });
expect(mockCollection.countDocuments).toHaveBeenCalledWith({ active: true }, {});
expect(r.rows).toEqual([{ count: 42 }]);
});
});
Expand Down
23 changes: 19 additions & 4 deletions server/src/drivers/mongodb.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { MongoClient, ObjectId, MongoNetworkError, MongoServerSelectionError, MongoParseError } from 'mongodb';
import type { Db, Document } from 'mongodb';
import { DriverError } from './interface.js';
import { timeoutError } from './timeout.js';
import type {
DbDriver,
ConnectionConfig,
Expand Down Expand Up @@ -253,7 +254,15 @@ export class MongoDBDriver implements DbDriver {
}
}

async query(sql: string, _params?: unknown[], schema?: string): Promise<QueryResult> {
async query(sql: string, _params?: unknown[], schema?: string, timeoutMs?: number): Promise<QueryResult> {
// MongoDB has no out-of-band "kill query" like SQL's KILL/pg_cancel_backend;
// `maxTimeMS` is the idiomatic equivalent — the server aborts the operation
// itself once the deadline passes. Only the read operations honour it; the
// single-document writes return promptly.
const maxTimeMS = timeoutMs && timeoutMs > 0 ? timeoutMs : undefined;
// Spread into the read operations' options; empty (no `maxTimeMS` key) when
// no timeout is armed, so the deadline is opt-in per query.
const readOpts = maxTimeMS ? { maxTimeMS } : {};
try {
await this.ensureConnected();
const req = parseRequest(sql);
Expand All @@ -262,7 +271,7 @@ export class MongoDBDriver implements DbDriver {

switch (req.operation) {
case 'find': {
let cursor = coll.find(req.filter ?? {});
let cursor = coll.find(req.filter ?? {}, readOpts);
if (req.projection) cursor = cursor.project(req.projection) as typeof cursor;
if (req.sort) cursor = cursor.sort(req.sort);
if (typeof req.skip === 'number') cursor = cursor.skip(req.skip);
Expand All @@ -274,15 +283,16 @@ export class MongoDBDriver implements DbDriver {
const doc = await coll.findOne(req.filter ?? {}, {
projection: req.projection,
sort: req.sort as Document | undefined,
...readOpts,
});
return this.toQueryResult(doc ? [doc] : []);
}
case 'aggregate': {
const docs = await coll.aggregate(req.pipeline ?? []).toArray();
const docs = await coll.aggregate(req.pipeline ?? [], readOpts).toArray();
return this.toQueryResult(docs);
}
case 'count': {
const n = await coll.countDocuments(req.filter ?? {});
const n = await coll.countDocuments(req.filter ?? {}, readOpts);
return {
rows: [{ count: n }],
columnMeta: [
Expand Down Expand Up @@ -343,6 +353,11 @@ export class MongoDBDriver implements DbDriver {
}
} catch (err) {
if (err instanceof DriverError) throw err;
// MaxTimeMSExpired (code 50) means our deadline fired server-side; surface
// it with the same wording the SQL drivers use for a timed-out query.
if (maxTimeMS && (err as { code?: number }).code === 50) {
throw timeoutError(maxTimeMS, err);
}
throw classifyMongoError(err);
}
}
Expand Down
56 changes: 49 additions & 7 deletions server/src/drivers/mysql.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import mysql from 'mysql2/promise';
import type { RowDataPacket, FieldPacket, ResultSetHeader } from 'mysql2/promise';
import type { DbDriver, ConnectionConfig, QueryResult, ColumnMeta, ColumnInfo, SchemaInfo, TableInfo } from './interface.js';
import { withQueryTimeout } from './timeout.js';

function buildMysqlPoolOptions(config: ConnectionConfig): mysql.PoolOptions {
return {
Expand Down Expand Up @@ -162,14 +163,49 @@ export class MysqlDriver implements DbDriver {
await this.pool.end();
}

async query(sql: string, params?: unknown[], schema?: string): Promise<QueryResult> {
/**
* `KILL QUERY` aborts the statement running on `threadId` without dropping the
* connection. Issued on a separate pooled connection because the one running
* the timed-out query is busy. Best-effort: the statement may have already
* finished, or no pool slot may be free — the timeout error surfaces either way.
*/
private async killQuery(threadId: number): Promise<void> {
let conn: mysql.PoolConnection | undefined;
try {
conn = await this.pool.getConnection();
await conn.query(`KILL QUERY ${threadId}`);
} catch {
/* best-effort cancel */
} finally {
conn?.release();
}
}

/**
* Return `conn` to the pool, but `destroy` it instead when the query timed out:
* the `KILL`ed statement is still draining on that socket, and a released
* connection could be handed to the next caller mid-interrupt with a stale
* result set. mysql2 opens a fresh connection on the next request.
*/
private finishConnection(conn: mysql.PoolConnection, timedOut: boolean): void {
if (timedOut) conn.destroy();
else conn.release();
}

async query(sql: string, params?: unknown[], schema?: string, timeoutMs?: number): Promise<QueryResult> {
const conn = await this.pool.getConnection();
const threadId = conn.threadId;
let timedOut = false;
try {
if (schema) {
await conn.query(`USE \`${schema.replace(/`/g, '')}\``);
}

const [rows, fields] = await conn.query(sql, params) as [RowDataPacket[] | ResultSetHeader, FieldPacket[]];
const work = conn.query(sql, params) as Promise<[RowDataPacket[] | ResultSetHeader, FieldPacket[]]>;
const [rows, fields] = await withQueryTimeout(work, timeoutMs, () => {
timedOut = true;
if (threadId != null) void this.killQuery(threadId);
});
// With multipleStatements enabled, mysql2 returns arrays-of-arrays when
// the SQL contains more than one statement. Internal callers (insertRow,
// updateCell, deleteRow, …) only ever pass single statements, so flatten
Expand All @@ -182,20 +218,26 @@ export class MysqlDriver implements DbDriver {
}
return buildMysqlResult(rows, fields);
} finally {
conn.release();
this.finishConnection(conn, timedOut);
}
}

async queryAll(sql: string, schema?: string): Promise<QueryResult[]> {
async queryAll(sql: string, schema?: string, timeoutMs?: number): Promise<QueryResult[]> {
const conn = await this.pool.getConnection();
const threadId = conn.threadId;
let timedOut = false;
try {
if (schema) {
await conn.query(`USE \`${schema.replace(/`/g, '')}\``);
}
const [rawRows, rawFields] = await conn.query(sql) as [
const work = conn.query(sql) as Promise<[
RowDataPacket[] | ResultSetHeader | (RowDataPacket[] | ResultSetHeader)[],
FieldPacket[] | FieldPacket[][],
];
]>;
const [rawRows, rawFields] = await withQueryTimeout(work, timeoutMs, () => {
timedOut = true;
if (threadId != null) void this.killQuery(threadId);
});
// Single statement: mysql2 returns the result directly. Multi-statement: it
// returns parallel arrays — one rowset and one fields array per statement.
const isMulti = Array.isArray(rawRows) && rawRows.length > 0 && Array.isArray((rawRows as unknown[])[0]);
Expand All @@ -206,7 +248,7 @@ export class MysqlDriver implements DbDriver {
const fieldSets = rawFields as FieldPacket[][];
return rowSets.map((r, i) => buildMysqlResult(r, fieldSets[i] ?? []));
} finally {
conn.release();
this.finishConnection(conn, timedOut);
}
}

Expand Down
68 changes: 54 additions & 14 deletions server/src/drivers/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pg from 'pg';
import { DriverError } from './interface.js';
import type { DbDriver, ConnectionConfig, QueryResult, ColumnMeta, ColumnInfo, SchemaInfo, TableInfo } from './interface.js';
import { withQueryTimeout } from './timeout.js';

// Return DATE / TIME / TIMESTAMP / TIMESTAMPTZ / TIMETZ as raw strings rather
// than JS Dates. pg's default parsers route through `new Date(...)`, which
Expand Down Expand Up @@ -142,10 +143,29 @@ export class PostgresDriver implements DbDriver {
await this.pool.end();
}

async query(sql: string, params?: unknown[], schema?: string): Promise<QueryResult> {
/**
* `pg_cancel_backend` cancels the statement running on `pid` without dropping
* the connection. Run on a separate pooled client because the one running the
* timed-out query is busy. Best-effort — the timeout error surfaces regardless.
*/
private async cancelBackend(pid: number): Promise<void> {
let client: pg.PoolClient | undefined;
try {
client = await this.pool.connect();
await client.query('SELECT pg_cancel_backend($1)', [pid]);
} catch {
/* best-effort cancel */
} finally {
client?.release();
}
}

async query(sql: string, params?: unknown[], schema?: string, timeoutMs?: number): Promise<QueryResult> {
try {
const client = await this.pool.connect();
const pid = (client as { processID?: number }).processID;
let searchPathSet = false;
let timedOut = false;
try {
if (schema) {
await client.query(`SET search_path TO ${this.escapeIdent(schema)}`);
Expand All @@ -159,53 +179,73 @@ export class PostgresDriver implements DbDriver {
let n = 0;
pgSql = sql.replace(/\?/g, () => `$${++n}`);
}
const result = await client.query({ text: pgSql, values: params?.length ? params : undefined });
const work = client.query({ text: pgSql, values: params?.length ? params : undefined });
const result = await withQueryTimeout(work, timeoutMs, () => {
timedOut = true;
if (typeof pid === 'number') void this.cancelBackend(pid);
});
// node-pg's simple query protocol returns an array of results when the SQL
// contains multiple statements. `query()` keeps the legacy single-result
// contract — `queryAll` is the multi-statement entry point — so collapse
// an unexpected array down to its last element.
const single = Array.isArray(result) ? (result as pg.QueryResult[])[result.length - 1] : result;
return buildPgResult(single);
} finally {
// pg.Pool reuses clients without resetting session state, so search_path
// would leak to the next caller on this same client.
if (searchPathSet) {
try { await client.query('SET search_path TO DEFAULT'); } catch { /* fall through to release */ }
}
client.release();
await this.finishClient(client, searchPathSet, timedOut);
}
} catch (err) {
if (err instanceof DriverError) throw err;
throw classifyPgError(err);
}
}

async queryAll(sql: string, schema?: string): Promise<QueryResult[]> {
async queryAll(sql: string, schema?: string, timeoutMs?: number): Promise<QueryResult[]> {
try {
const client = await this.pool.connect();
const pid = (client as { processID?: number }).processID;
let searchPathSet = false;
let timedOut = false;
try {
if (schema) {
await client.query(`SET search_path TO ${this.escapeIdent(schema)}`);
searchPathSet = true;
}
// Always go through the simple query protocol (no values) so multi-statement
// SQL fans out to one result per statement.
const raw = await client.query(sql);
const work = client.query(sql);
const raw = await withQueryTimeout(work, timeoutMs, () => {
timedOut = true;
if (typeof pid === 'number') void this.cancelBackend(pid);
});
const results = Array.isArray(raw) ? (raw as pg.QueryResult[]) : [raw];
return results.map(buildPgResult);
} finally {
if (searchPathSet) {
try { await client.query('SET search_path TO DEFAULT'); } catch { /* fall through to release */ }
}
client.release();
await this.finishClient(client, searchPathSet, timedOut);
}
} catch (err) {
if (err instanceof DriverError) throw err;
throw classifyPgError(err);
}
}

/**
* Reset session state and return the client to the pool — but on timeout, skip
* the reset (it would queue behind the statement still being cancelled) and
* `release(true)` to discard the client so the pool never reuses it mid-cancel.
* pg.Pool reuses clients without resetting state, so search_path would
* otherwise leak to the next caller on this same client.
*/
private async finishClient(client: pg.PoolClient, searchPathSet: boolean, timedOut: boolean): Promise<void> {
if (timedOut) {
client.release(true);
return;
}
if (searchPathSet) {
try { await client.query('SET search_path TO DEFAULT'); } catch { /* fall through to release */ }
}
client.release();
}

async getSchemas(): Promise<string[]> {
const result = await this.pool.query<{ name: string }>(
`SELECT schema_name AS name
Expand Down
Loading
Loading