diff --git a/package-lock.json b/package-lock.json index 05b9ab2..b45a122 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@withone/cli", - "version": "1.47.11", + "version": "1.47.12", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@withone/cli", - "version": "1.47.11", + "version": "1.47.12", "dependencies": { "@clack/prompts": "^0.9.1", "commander": "^13.1.0", diff --git a/package.json b/package.json index aebc0ab..9870eaf 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@withone/cli", - "version": "1.47.11", + "version": "1.47.12", "description": "CLI for managing One", "type": "module", "files": [ diff --git a/profiles/fathom/meetings.json b/profiles/fathom/meetings.json index 7b15a85..1288cc9 100644 --- a/profiles/fathom/meetings.json +++ b/profiles/fathom/meetings.json @@ -17,6 +17,10 @@ "queryParams": { "include_action_items": "true" }, + "identityKeys": [ + { "prefix": "email", "path": "recorded_by.email" }, + { "prefix": "email", "path": "calendar_invitees[].email" } + ], "enrich": { "actionId": "conn_mod_def::GIpBYFV5Mog::G6aag6ykQ-uD6XCeUUnq7Q", "pathVars": { diff --git a/profiles/gmail/gmailThreads.json b/profiles/gmail/gmailThreads.json index b3adba9..6c1535d 100644 --- a/profiles/gmail/gmailThreads.json +++ b/profiles/gmail/gmailThreads.json @@ -15,6 +15,11 @@ "defaultLimit": 100, "pathVars": { "userId": "me" }, "queryParams": { "q": "category:primary" }, + "identityKeys": [ + { "prefix": "email", "path": "messages[].payload.headers[name=From].value" }, + { "prefix": "email", "path": "messages[].payload.headers[name=To].value" }, + { "prefix": "email", "path": 
"messages[].payload.headers[name=Cc].value" } + ], "enrich": { "actionId": "conn_mod_def::GJ3ok0Eq0R8::AAzgZVLqTg2iBuITKpJLZg", "pathVars": { "userId": "me", "id": "{id}" }, diff --git a/profiles/google-calendar/events.json b/profiles/google-calendar/events.json index d29bc37..fb1b1ae 100644 --- a/profiles/google-calendar/events.json +++ b/profiles/google-calendar/events.json @@ -13,5 +13,9 @@ }, "pathVars": { "calendarId": "primary" }, "defaultLimit": 250, - "limitParam": "maxResults" + "limitParam": "maxResults", + "identityKeys": [ + { "prefix": "email", "path": "organizer.email" }, + { "prefix": "email", "path": "attendees[].email" } + ] } diff --git a/src/commands/mem.ts b/src/commands/mem.ts index 78312d5..24a8f1a 100644 --- a/src/commands/mem.ts +++ b/src/commands/mem.ts @@ -27,6 +27,7 @@ import { memLinkedCommand, memSourcesCommand, memFindBySourceCommand, + memFindByKeyCommand, } from './mem/records.js'; import { memDoctorCommand } from './mem/doctor.js'; import { memExportCommand, memImportCommand } from './mem/export.js'; @@ -190,6 +191,13 @@ export function registerMemoryCommands(program: Command): void { .description('Look up the record owning "/:"') .action(memFindBySourceCommand); + mem.command('find-by-key [secondKey]') + .description('Records sharing an identity key (e.g. email:jane@acme.com), grouped by type. Two keys = intersection.') + .option('--type ', 'Only this record type (e.g. gmail/gmailThreads)') + .option('--limit ', 'Max records shown per type (default 10)') + .action((key: string, secondKey: string | undefined, flags: { type?: string; limit?: string }) => + memFindByKeyCommand(key, secondKey, flags)); + // Sync subverb: mounted as a full alias of `one sync`. Same handlers, same // options — only the command path differs. Keeps `one mem sync` feeling // native inside the memory subsystem without forking the implementation. 
diff --git a/src/commands/mem/find-by-key.test.ts b/src/commands/mem/find-by-key.test.ts new file mode 100644 index 0000000..f3a7fe8 --- /dev/null +++ b/src/commands/mem/find-by-key.test.ts @@ -0,0 +1,93 @@ +import { describe, it, before, after } from 'node:test'; +import assert from 'node:assert/strict'; +import fs from 'node:fs'; +import path from 'node:path'; +import os from 'node:os'; +import { memFindByKeyCommand } from './records.js'; +import { getBackend, resetBackendSingleton } from '../../lib/memory/runtime.js'; +import { registerBackend } from '../../lib/memory/plugins.js'; +import { pglitePlugin } from '../../lib/memory/plugins/pglite/index.js'; +import { updateMemoryConfig, DEFAULT_MEMORY_CONFIG } from '../../lib/memory/config.js'; +import { writeConfig } from '../../lib/config.js'; + +// #131: `one mem find-by-key []` — exercises the full command path +// (getBackend → findByKeys → group-by-type → agent JSON) against a real PGlite +// backend seeded with identity_keys. + +/** Force --agent mode and capture the JSON the command writes to stdout. 
*/ +async function runAgent(fn: () => Promise): Promise { + const prev = process.env.ONE_AGENT; + process.env.ONE_AGENT = '1'; + const orig = process.stdout.write.bind(process.stdout); + let buf = ''; + (process.stdout as unknown as { write: (c: string) => boolean }).write = (chunk: string) => { buf += chunk; return true; }; + try { + await fn(); + } finally { + (process.stdout as unknown as { write: typeof orig }).write = orig; + if (prev === undefined) delete process.env.ONE_AGENT; else process.env.ONE_AGENT = prev; + } + const lines = buf.trim().split('\n').filter(Boolean); + return JSON.parse(lines[lines.length - 1]); +} + +describe('mem find-by-key command (#131)', () => { + let tmpHome: string; + + before(async () => { + tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), 'find-by-key-test-')); + process.env.HOME = tmpHome; + fs.mkdirSync(path.join(tmpHome, '.one'), { mode: 0o700 }); + writeConfig({ apiKey: 'sk_test_dummy', installedAgents: [], createdAt: new Date().toISOString() }); + registerBackend(pglitePlugin); + updateMemoryConfig({ ...DEFAULT_MEMORY_CONFIG, backend: 'pglite', pglite: { dbPath: path.join(tmpHome, 'mem.pglite') } }); + resetBackendSingleton(); + const backend = await getBackend(); + + const now = new Date().toISOString(); + await backend.upsertByKeys({ type: 'attio/people', data: { name: 'Jane Smith' }, keys: ['attio/people:J1'], identity_keys: ['email:jane@acme.com'], sources: { 'attio/people:J1': { last_synced_at: now } } }); + await backend.upsertByKeys({ type: 'gmail/gmailThreads', data: { subject: 'Q2 pricing' }, keys: ['gmail/gmailThreads:T1'], identity_keys: ['email:jane@acme.com', 'email:bob@acme.com'], sources: { 'gmail/gmailThreads:T1': { last_synced_at: now } } }); + await backend.upsertByKeys({ type: 'gmail/gmailThreads', data: { subject: 'intro' }, keys: ['gmail/gmailThreads:T2'], identity_keys: ['email:jane@acme.com'], sources: { 'gmail/gmailThreads:T2': { last_synced_at: now } } }); + }); + + after(async () => { + const 
backend = await getBackend(); + await backend.close(); + resetBackendSingleton(); + try { fs.rmSync(tmpHome, { recursive: true, force: true }); } catch { /* ignore */ } + }); + + it('groups records by type with counts (single key)', async () => { + const out = await runAgent(() => memFindByKeyCommand('email:jane@acme.com', undefined, {})); + assert.deepEqual(out.keys, ['email:jane@acme.com']); + assert.equal(out.total, 3); + assert.equal(out.byType['attio/people'].count, 1); + assert.equal(out.byType['gmail/gmailThreads'].count, 2); + assert.equal(out.byType['gmail/gmailThreads'].items.length, 2); + }); + + it('--type filters to one record type', async () => { + const out = await runAgent(() => memFindByKeyCommand('email:jane@acme.com', undefined, { type: 'gmail/gmailThreads' })); + assert.equal(out.total, 2); + assert.deepEqual(Object.keys(out.byType), ['gmail/gmailThreads']); + }); + + it('two keys return the intersection', async () => { + const out = await runAgent(() => memFindByKeyCommand('email:jane@acme.com', 'email:bob@acme.com', {})); + assert.deepEqual(out.keys, ['email:jane@acme.com', 'email:bob@acme.com']); + assert.equal(out.total, 1); + assert.equal(out.byType['gmail/gmailThreads'].count, 1); + }); + + it('--limit caps items shown per type (count stays accurate)', async () => { + const out = await runAgent(() => memFindByKeyCommand('email:jane@acme.com', undefined, { type: 'gmail/gmailThreads', limit: '1' })); + assert.equal(out.byType['gmail/gmailThreads'].count, 2, 'true count preserved'); + assert.equal(out.byType['gmail/gmailThreads'].items.length, 1, 'items capped by --limit'); + }); + + it('returns empty for an unknown key', async () => { + const out = await runAgent(() => memFindByKeyCommand('email:nobody@nowhere.com', undefined, {})); + assert.equal(out.total, 0); + assert.deepEqual(out.byType, {}); + }); +}); diff --git a/src/commands/mem/records.ts b/src/commands/mem/records.ts index a416d33..d5a6af2 100644 --- a/src/commands/mem/records.ts 
+++ b/src/commands/mem/records.ts @@ -4,10 +4,12 @@ * link, unlink, linked. */ +import pc from 'picocolors'; import * as output from '../../lib/output.js'; import { getBackend, addRecord } from '../../lib/memory/runtime.js'; import { embed } from '../../lib/memory/embedding.js'; import { getMemoryConfigOrDefault } from '../../lib/memory/index.js'; +import type { MemRecord } from '../../lib/memory/types.js'; import { okJson, parseCsv, parseJsonArg, parsePositiveInt, printList, printRecord, requireMemoryInit, semanticSearchUpgradeHint, semanticSearchUpgradeLine } from './util.js'; interface AddFlags { @@ -261,3 +263,86 @@ export async function memFindBySourceCommand(sourceKey: string): Promise { if (!record) output.error(`No record owns source "${sourceKey}"`); printRecord(record as unknown as Record); } + +interface FindByKeyFlags { + type?: string; + limit?: string; +} + +/** Best-effort human label for a record (the first present common title-ish field). */ +function summarizeRecord(r: MemRecord): string { + const d = r.data ?? {}; + for (const field of ['title', 'subject', 'name', 'full_name', 'display_name', 'summary', 'headline', 'email']) { + const v = (d as Record)[field]; + if (typeof v === 'string' && v.trim()) return v.trim().slice(0, 80); + } + return pc.dim(r.id.slice(0, 8)); +} + +/** Compact "2d ago" style relative time. 
*/ +function relativeTime(iso?: string): string { + if (!iso) return ''; + const then = new Date(iso).getTime(); + if (Number.isNaN(then)) return ''; + const s = Math.max(0, Math.floor((Date.now() - then) / 1000)); + if (s < 60) return `${s}s ago`; + const m = Math.floor(s / 60); if (m < 60) return `${m}m ago`; + const h = Math.floor(m / 60); if (h < 24) return `${h}h ago`; + const d = Math.floor(h / 24); if (d < 30) return `${d}d ago`; + const mo = Math.floor(d / 30); if (mo < 12) return `${mo}mo ago`; + return `${Math.floor(mo / 12)}y ago`; +} + +/** + * `one mem find-by-key []` — list every record whose `keys[]` or + * `identity_keys[]` carries the given identity key (e.g. `email:jane@acme.com`), + * grouped by type. Two keys → the intersection (records carrying BOTH). This is the + * cross-platform-join query surface for #131 (issue proposed `mem linked`, but + * that name is already taken by the relation-graph command, so this mirrors the + * existing `find-by-source`). Works for any prefix — `email:`, `domain:`, etc. + */ +export async function memFindByKeyCommand(key: string, secondKey: string | undefined, flags: FindByKeyFlags): Promise { + requireMemoryInit(); + const backend = await getBackend(); + const keys = secondKey ? [key, secondKey] : [key]; + const perType = parsePositiveInt(flags.limit, 10, '--limit'); + const records = await backend.findByKeys(keys, { type: flags.type }); + + // Group by type, preserving the query's type-then-recency ordering. + const byType = new Map(); + for (const r of records) { + const list = byType.get(r.type) ?? 
[]; + list.push(r); + byType.set(r.type, list); + } + + if (output.isAgentMode()) { + const grouped: Record = {}; + for (const [type, list] of byType) { + grouped[type] = { count: list.length, items: list.slice(0, perType) }; + } + output.json({ keys, total: records.length, byType: grouped }); + return; + } + + const label = keys.map(k => pc.cyan(k)).join(pc.dim(' + ')); + if (records.length === 0) { + console.log(`\n No records linked to ${label}.\n`); + return; + } + + console.log(); + console.log(` ${label} ${pc.dim('—')} ${records.length} record${records.length === 1 ? '' : 's'} across ${byType.size} type${byType.size === 1 ? '' : 's'}`); + console.log(); + const typeWidth = Math.min(30, Math.max(...[...byType.keys()].map(t => t.length))); + for (const [type, list] of byType) { + console.log(` ${pc.bold(type.padEnd(typeWidth))} ${pc.dim(`${list.length} record${list.length === 1 ? '' : 's'}`)}`); + for (const r of list.slice(0, perType)) { + console.log(` ${pc.dim('·')} ${summarizeRecord(r)} ${pc.dim(relativeTime(r.updated_at))}`); + } + if (list.length > perType) { + console.log(` ${pc.dim(`… and ${list.length - perType} more (raise --limit)`)}`); + } + } + console.log(); +} diff --git a/src/lib/guide-content.ts b/src/lib/guide-content.ts index e1b9c63..b87e7c7 100644 --- a/src/lib/guide-content.ts +++ b/src/lib/guide-content.ts @@ -815,22 +815,38 @@ Transform, exclude, identityKey, and hooks all fire in **both** phases. In Phase ## Cross-Platform Identity -Add \`identityKey\` to a sync profile to extract a stable cross-platform identifier (e.g. email) into a normalized \`_identity\` column: +Two ways to tag a record with a cross-platform identifier (e.g. email), depending on whether the record IS an entity or INVOLVES many: + +**\`identityKey\` (singular)** — "this record IS this entity". 
The value goes into the record's \`keys[]\`, which the store uses to MERGE records for the same entity across platforms (Attio + HubSpot for the same person collapse into one): \`\`\`json {"platform": "hubspot", "model": "contacts", "identityKey": "properties.email"} -{"platform": "stripe", "model": "customers", "identityKey": "email"} {"platform": "attio", "model": "attioPeople", "identityKey": "email_addresses[0].email_address"} \`\`\` -The value is lowercased and trimmed, stored as a prefixed key on the mem record (e.g. \`email:jane@acme.com\`). Look up across platforms: +**\`identityKeys\` (plural, #128)** — "this record INVOLVES these people". For records with N participants (Gmail thread From/To/Cc, calendar attendees, meeting invitees) where a single key can't capture everyone. These go into a separate \`identity_keys[]\` column that does NOT merge — so a thread with many participants stays its own record: + +\`\`\`json +{"platform": "google-calendar", "model": "events", "identityKeys": [ + {"prefix": "email", "path": "organizer.email"}, + {"prefix": "email", "path": "attendees[].email"} +]} +\`\`\` + +Each \`path\` supports \`[]\` wildcards (one key per element) and a \`[name=From]\` equality filter (e.g. Gmail \`messages[].payload.headers[name=From].value\`). \`email\`-prefixed values are email-extracted, so display-name headers (\`"Jane "\`) and comma-lists normalize cleanly. Values are lowercased/trimmed/deduped. \`sync test\` previews how many identity keys each record resolves. 
+ +Query everything sharing an identity key, grouped by type: \`\`\`bash +one --agent mem find-by-key email:jane@acme.com # all records carrying this key +one --agent mem find-by-key email:jane@acme.com --type gmail/gmailThreads +one --agent mem find-by-key email:jane@acme.com email:bob@acme.com # intersection (both keys) +# Look up the single record owning a source key: one --agent mem find-by-source hubspot/contacts: -# Or via the dotted --where path on the identity key: -one --agent sync query hubspot/contacts --where 'email=jane@acme.com' \`\`\` +\`find-by-key\` spans BOTH columns — entity keys in \`keys[]\` and association keys in \`identity_keys[]\`. + \`sync sql\` was retired in the unified memory cutover — a raw-SQL surface can't safely span the embedded Postgres, remote Postgres, and third-party backends without leaking specifics. Use \`mem search\` / \`sync search\` / \`sync query\` with dotted \`--where\` paths instead. ## Exclude Fields @@ -918,7 +934,8 @@ Every \`sync X\` command is also exposed as \`mem sync X\` — same handlers, sa | limitLocation | no | "query" (default) or "body" for POST endpoints | | enrich | no | Detail endpoint config for record enrichment (actionId, pathVars, concurrency) | | transform | no | Shell command to transform records (stdin: JSON array, stdout: JSON array) | -| identityKey | no | Dot-path to cross-platform identifier (e.g. email) → stored as \`_identity\` column | +| identityKey | no | "This record IS this entity" — dot-path to a cross-platform id (e.g. email) → \`keys[]\` (drives entity merge) | +| identityKeys | no | "This record INVOLVES these people" — \`[{prefix, path}]\` for N participants (#128) → non-merging \`identity_keys[]\`; query with \`mem find-by-key\` | | exclude | no | Dot-path fields to strip before storing (e.g. 
\`["messages[].body"]\`) | | onInsert/onUpdate/onChange | no | Change hooks (shell command, "log", or flow) | diff --git a/src/lib/memory/backend.ts b/src/lib/memory/backend.ts index cd20e3f..4651048 100644 --- a/src/lib/memory/backend.ts +++ b/src/lib/memory/backend.ts @@ -116,6 +116,14 @@ export interface MemBackend { addSource(recordId: string, ref: SourceRefInput): Promise; removeSource(recordId: string, sourceKey: string): Promise; findBySource(sourceKey: string): Promise; + /** + * Find every record whose `keys[]` and `identity_keys[]` together contain ALL + * of the given keys (cross-platform identity join, #131). One key → every + * record carrying it; multiple keys → the intersection. Excludes archived + * records unless `status` says otherwise. Results are ordered by type, then + * most-recent, and capped by `limit`. Powers `one mem find-by-key`. + */ + findByKeys(keys: string[], opts?: { type?: string; status?: 'active' | 'archived' | 'all'; limit?: number }): Promise; listSources(recordId: string): Promise; // Sync state diff --git a/src/lib/memory/plugins/embedded-postgres/index.ts b/src/lib/memory/plugins/embedded-postgres/index.ts index f9e26a9..f58f1fa 100644 --- a/src/lib/memory/plugins/embedded-postgres/index.ts +++ b/src/lib/memory/plugins/embedded-postgres/index.ts @@ -374,6 +374,7 @@ class LazyEmbeddedPostgresBackend implements MemBackend { async addSource(...a: Parameters): ReturnType { return (await this.ensure()).addSource(...a); } async removeSource(...a: Parameters): ReturnType { return (await this.ensure()).removeSource(...a); } async findBySource(...a: Parameters): ReturnType { return (await this.ensure()).findBySource(...a); } + async findByKeys(...a: Parameters): ReturnType { return (await this.ensure()).findByKeys(...a); } async listSources(...a: Parameters): ReturnType { return (await this.ensure()).listSources(...a); } async getSyncState(...a: Parameters): ReturnType { return (await this.ensure()).getSyncState(...a); } diff --git 
a/src/lib/memory/plugins/pglite/index.ts b/src/lib/memory/plugins/pglite/index.ts index 37da54b..8fdf684 100644 --- a/src/lib/memory/plugins/pglite/index.ts +++ b/src/lib/memory/plugins/pglite/index.ts @@ -182,6 +182,7 @@ class LazyPgliteBackend implements MemBackend { async addSource(...a: Parameters): ReturnType { return (await this.ensure()).addSource(...a); } async removeSource(...a: Parameters): ReturnType { return (await this.ensure()).removeSource(...a); } async findBySource(...a: Parameters): ReturnType { return (await this.ensure()).findBySource(...a); } + async findByKeys(...a: Parameters): ReturnType { return (await this.ensure()).findByKeys(...a); } async listSources(...a: Parameters): ReturnType { return (await this.ensure()).listSources(...a); } async getSyncState(...a: Parameters): ReturnType { return (await this.ensure()).getSyncState(...a); } diff --git a/src/lib/memory/plugins/pglite/pglite.test.ts b/src/lib/memory/plugins/pglite/pglite.test.ts index 7f61195..6be12d3 100644 --- a/src/lib/memory/plugins/pglite/pglite.test.ts +++ b/src/lib/memory/plugins/pglite/pglite.test.ts @@ -33,7 +33,7 @@ describe('PGlite plugin — live integration', () => { it('reports the schema version after ensureSchema', async () => { const v = await backend.getSchemaVersion(); - assert.equal(v, '2.1.0'); + assert.equal(v, '2.2.0'); }); it('advertises capabilities the CoreBackend relies on', () => { @@ -256,4 +256,61 @@ describe('PGlite plugin — live integration', () => { assert.ok(s.activeCount >= 0); assert.equal(s.recordCount, s.activeCount + s.archivedCount); }); + + // #128/#131: identity_keys[] is a SEPARATE column that must NOT drive the + // upsert overlap-merge. This is the regression guard for the bug that + // motivated the redesign — participant emails in keys[] collapsed + // multi-participant records into each other / into contacts. 
+ it('identity_keys do NOT merge records that share one (the #128 fix)', async () => { + const attio = await backend.upsertByKeys({ + type: 'attio/people', + data: { name: 'Jane' }, + keys: ['attio/people:JANE'], + identity_keys: ['email:jane@acme.com'], + sources: { 'attio/people:JANE': { last_synced_at: new Date().toISOString() } }, + }); + const thread = await backend.upsertByKeys({ + type: 'gmail/gmailThreads', + data: { subject: 'hello' }, + keys: ['gmail/gmailThreads:T1'], + identity_keys: ['email:jane@acme.com', 'email:moe@withone.ai'], + sources: { 'gmail/gmailThreads:T1': { last_synced_at: new Date().toISOString() } }, + }); + assert.notEqual(thread.record.id, attio.record.id, 'thread must NOT merge into the contact'); + assert.equal(thread.record.type, 'gmail/gmailThreads', 'thread keeps its own type'); + assert.deepEqual((thread.record.identity_keys ?? []).sort(), ['email:jane@acme.com', 'email:moe@withone.ai'].sort()); + }); + + it('findByKeys joins records across types by a shared identity key (#131)', async () => { + const key = 'email:link@acme.com'; + await backend.upsertByKeys({ type: 'attio/people', data: { name: 'Link Person' }, keys: ['attio/people:LP'], identity_keys: [key], sources: { 'attio/people:LP': { last_synced_at: new Date().toISOString() } } }); + await backend.upsertByKeys({ type: 'gmail/gmailThreads', data: { subject: 'one' }, keys: ['gmail/gmailThreads:LT1'], identity_keys: [key], sources: { 'gmail/gmailThreads:LT1': { last_synced_at: new Date().toISOString() } } }); + await backend.upsertByKeys({ type: 'gmail/gmailThreads', data: { subject: 'two' }, keys: ['gmail/gmailThreads:LT2'], identity_keys: [key], sources: { 'gmail/gmailThreads:LT2': { last_synced_at: new Date().toISOString() } } }); + + const found = await backend.findByKeys([key]); + assert.equal(found.length, 3, 'three records share the identity key'); + const types = new Set(found.map(r => r.type)); + assert.ok(types.has('attio/people') && 
types.has('gmail/gmailThreads')); + + // --type filter + const onlyThreads = await backend.findByKeys([key], { type: 'gmail/gmailThreads' }); + assert.equal(onlyThreads.length, 2); + }); + + it('findByKeys with two keys returns the intersection (#131)', async () => { + await backend.upsertByKeys({ type: 'gmail/gmailThreads', data: { subject: 'jane+bob' }, keys: ['gmail/gmailThreads:IX1'], identity_keys: ['email:jane@acme.com', 'email:bob@acme.com'], sources: { 'gmail/gmailThreads:IX1': { last_synced_at: new Date().toISOString() } } }); + await backend.upsertByKeys({ type: 'gmail/gmailThreads', data: { subject: 'jane only' }, keys: ['gmail/gmailThreads:IX2'], identity_keys: ['email:jane@acme.com'], sources: { 'gmail/gmailThreads:IX2': { last_synced_at: new Date().toISOString() } } }); + + const both = await backend.findByKeys(['email:jane@acme.com', 'email:bob@acme.com']); + assert.ok(both.every(r => (r.identity_keys ?? []).includes('email:bob@acme.com')), 'only records with BOTH keys'); + assert.ok(both.some(r => r.data.subject === 'jane+bob')); + assert.ok(!both.some(r => r.data.subject === 'jane only')); + }); + + it('findByKeys also matches entity keys in keys[] (union of both columns)', async () => { + // A contact whose own email is the singular identityKey lands in keys[]. 
+ await backend.upsertByKeys({ type: 'attio/people', data: { name: 'Entity Keyed' }, keys: ['attio/people:EK', 'email:entity@acme.com'], sources: { 'attio/people:EK': { last_synced_at: new Date().toISOString() } } }); + const found = await backend.findByKeys(['email:entity@acme.com']); + assert.ok(found.some(r => r.data.name === 'Entity Keyed'), 'find-by-key spans keys[] and identity_keys[]'); + }); }); diff --git a/src/lib/memory/plugins/postgres-core/backend.ts b/src/lib/memory/plugins/postgres-core/backend.ts index d579335..4cabfd3 100644 --- a/src/lib/memory/plugins/postgres-core/backend.ts +++ b/src/lib/memory/plugins/postgres-core/backend.ts @@ -68,6 +68,7 @@ interface RecordRow { data: Record; tags: string[] | null; keys: string[] | null; + identity_keys: string[] | null; sources: SourcesMap; searchable_text: string | null; embedding: unknown; @@ -90,6 +91,7 @@ function toRecord(row: RecordRow): MemRecord { data: row.data, tags: row.tags ?? undefined, keys: row.keys ?? undefined, + identity_keys: row.identity_keys ?? undefined, sources: row.sources ?? {}, searchable_text: row.searchable_text, embedded_at: row.embedded_at, @@ -184,14 +186,14 @@ export class CoreBackend implements MemBackend { const embedding = vectorLiteral(row.embedding ?? null); const sql = this.caps.vectorSearch ? 
`INSERT INTO mem_records - (type, data, tags, keys, sources, searchable_text, content_hash, weight, + (type, data, tags, keys, identity_keys, sources, searchable_text, content_hash, weight, embedding, embedded_at, embedding_model) - VALUES ($1, $2::jsonb, $3::text[], $4::text[], $5::jsonb, $6, $7, $8, - $9::vector, CASE WHEN $9 IS NOT NULL THEN NOW() ELSE NULL END, $10) + VALUES ($1, $2::jsonb, $3::text[], $4::text[], $5::text[], $6::jsonb, $7, $8, $9, + $10::vector, CASE WHEN $10 IS NOT NULL THEN NOW() ELSE NULL END, $11) RETURNING *` : `INSERT INTO mem_records - (type, data, tags, keys, sources, searchable_text, content_hash, weight) - VALUES ($1, $2::jsonb, $3::text[], $4::text[], $5::jsonb, $6, $7, $8) + (type, data, tags, keys, identity_keys, sources, searchable_text, content_hash, weight) + VALUES ($1, $2::jsonb, $3::text[], $4::text[], $5::text[], $6::jsonb, $7, $8, $9) RETURNING *`; const params = this.caps.vectorSearch ? [ @@ -199,6 +201,7 @@ export class CoreBackend implements MemBackend { JSON.stringify(row.data), row.tags ?? null, row.keys ?? null, + row.identity_keys ?? null, JSON.stringify(row.sources ?? {}), row.searchable_text ?? null, row.content_hash ?? null, @@ -211,6 +214,7 @@ export class CoreBackend implements MemBackend { JSON.stringify(row.data), row.tags ?? null, row.keys ?? null, + row.identity_keys ?? null, JSON.stringify(row.sources ?? {}), row.searchable_text ?? null, row.content_hash ?? null, @@ -227,7 +231,7 @@ export class CoreBackend implements MemBackend { const res = await this.client.query<{ id: string; action: 'inserted' | 'updated' }>( `SELECT id, action FROM mem_upsert_by_keys( $1::text, $2::jsonb, $3::text[], $4::text[], $5::jsonb, $6::text, $7::text, - $8::integer, $9::text, $10::text, $11::boolean + $8::integer, $9::text, $10::text, $11::boolean, $12::text[] )`, [ row.type, @@ -241,6 +245,7 @@ export class CoreBackend implements MemBackend { embedding, embeddingModel, opts.replace ?? false, + row.identity_keys ?? 
null, ], ); const { id, action } = res.rows[0]; @@ -599,6 +604,37 @@ export class CoreBackend implements MemBackend { return res.rows[0] ? toRecord(res.rows[0]) : null; } + async findByKeys( + keys: string[], + opts: { type?: string; status?: 'active' | 'archived' | 'all'; limit?: number } = {}, + ): Promise { + if (!keys.length) return []; + // A match means the record carries ALL given keys across EITHER column — + // `keys[]` (entity/source keys) or `identity_keys[]` (participant + // associations, #128). One key → every record carrying it; many → the + // intersection. Params only — no string interpolation. + const params: unknown[] = [keys]; + const where: string[] = [`(COALESCE(keys, '{}'::text[]) || COALESCE(identity_keys, '{}'::text[])) @> $1::text[]`]; + const status = opts.status ?? 'active'; + if (status !== 'all') { + params.push(status); + where.push(`status = $${params.length}`); + } + if (opts.type) { + params.push(opts.type); + where.push(`type = $${params.length}`); + } + params.push(Math.min(opts.limit ?? 
2000, 5000)); + const res = await this.client.query( + `SELECT * FROM mem_records + WHERE ${where.join(' AND ')} + ORDER BY type ASC, updated_at DESC NULLS LAST + LIMIT $${params.length}`, + params, + ); + return res.rows.map(toRecord); + } + async listSources(recordId: string): Promise { const res = await this.client.query<{ sources: SourcesMap }>( `SELECT sources FROM mem_records WHERE id = $1`, diff --git a/src/lib/memory/plugins/postgres/index.ts b/src/lib/memory/plugins/postgres/index.ts index af00a88..0fb3003 100644 --- a/src/lib/memory/plugins/postgres/index.ts +++ b/src/lib/memory/plugins/postgres/index.ts @@ -152,6 +152,7 @@ class LazyPostgresBackend implements MemBackend { async addSource(...a: Parameters): ReturnType { return (await this.ensure()).addSource(...a); } async removeSource(...a: Parameters): ReturnType { return (await this.ensure()).removeSource(...a); } async findBySource(...a: Parameters): ReturnType { return (await this.ensure()).findBySource(...a); } + async findByKeys(...a: Parameters): ReturnType { return (await this.ensure()).findByKeys(...a); } async listSources(...a: Parameters): ReturnType { return (await this.ensure()).listSources(...a); } async getSyncState(...a: Parameters): ReturnType { return (await this.ensure()).getSyncState(...a); } diff --git a/src/lib/memory/schema.ts b/src/lib/memory/schema.ts index ca61267..a9ec204 100644 --- a/src/lib/memory/schema.ts +++ b/src/lib/memory/schema.ts @@ -12,7 +12,7 @@ * See docs/plans/unified-memory.md §4 for design rationale. */ -export const SCHEMA_VERSION = '2.1.0'; +export const SCHEMA_VERSION = '2.2.0'; // `pg_trgm` was in the original mem schema but nothing in the unified query // layer uses it; dropped to keep optional-extension backends happy. 
Can be @@ -42,6 +42,14 @@ CREATE TABLE IF NOT EXISTS mem_records ( data JSONB NOT NULL, tags TEXT[], keys TEXT[], + -- Cross-platform identity keys (#128): queryable associations like + -- email:jane@acme.com for every participant of a record. UNLIKE keys[], + -- these are NOT merge identifiers — they are exempt from the + -- key-uniqueness trigger and the upsert overlap-merge, so a Gmail thread + -- carrying many participant emails stays its own record instead of + -- collapsing into a contact (or into every other thread that shares a + -- participant). Queried by "mem find-by-key". + identity_keys TEXT[], sources JSONB NOT NULL DEFAULT '{}', @@ -90,6 +98,12 @@ CREATE TABLE IF NOT EXISTS mem_meta ( value TEXT NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); + +-- Additive migration for stores created before identity_keys existed (#128). +-- Safe to re-run; existing rows get identity_keys populated lazily on their +-- next sync. CREATE TABLE IF NOT EXISTS above won't add columns to a table +-- that already exists, so the ALTER is required for upgrades. 
+ALTER TABLE mem_records ADD COLUMN IF NOT EXISTS identity_keys TEXT[]; `; /** @@ -110,6 +124,7 @@ export const INDEXES_SQL = ` CREATE INDEX IF NOT EXISTS idx_records_type ON mem_records(type); CREATE INDEX IF NOT EXISTS idx_records_status ON mem_records(status); CREATE INDEX IF NOT EXISTS idx_records_keys ON mem_records USING GIN(keys); +CREATE INDEX IF NOT EXISTS idx_records_identity_keys ON mem_records USING GIN(identity_keys); CREATE INDEX IF NOT EXISTS idx_records_tags ON mem_records USING GIN(tags); CREATE INDEX IF NOT EXISTS idx_records_data ON mem_records USING GIN(data jsonb_path_ops); CREATE INDEX IF NOT EXISTS idx_records_sources ON mem_records USING GIN(sources jsonb_path_ops); @@ -250,7 +265,8 @@ CREATE OR REPLACE FUNCTION mem_upsert_by_keys( p_weight INTEGER DEFAULT NULL, p_embedding TEXT DEFAULT NULL, p_embedding_model TEXT DEFAULT NULL, - p_replace BOOLEAN DEFAULT FALSE + p_replace BOOLEAN DEFAULT FALSE, + p_identity_keys TEXT[] DEFAULT NULL ) RETURNS TABLE (id UUID, action TEXT) LANGUAGE plpgsql AS $$ DECLARE existing_id UUID; @@ -284,6 +300,12 @@ BEGIN ELSE (SELECT ARRAY(SELECT DISTINCT unnest FROM unnest(COALESCE(r.tags, '{}') || COALESCE(p_tags, '{}')))) END, keys = (SELECT ARRAY(SELECT DISTINCT unnest FROM unnest(COALESCE(r.keys, '{}') || COALESCE(p_keys, '{}')))), + -- identity_keys: replace on a replace-sync, otherwise union (never + -- drop an association). NOT part of the overlap-merge above. 
+ identity_keys = CASE + WHEN p_replace THEN COALESCE(p_identity_keys, '{}'::text[]) + ELSE (SELECT ARRAY(SELECT DISTINCT unnest FROM unnest(COALESCE(r.identity_keys, '{}') || COALESCE(p_identity_keys, '{}')))) + END, sources = r.sources || COALESCE(p_sources, '{}'::jsonb), searchable_text = CASE WHEN p_replace THEN p_searchable_text @@ -302,12 +324,13 @@ BEGIN result_action := 'updated'; ELSE INSERT INTO mem_records ( - type, data, tags, keys, sources, searchable_text, content_hash, weight + type, data, tags, keys, identity_keys, sources, searchable_text, content_hash, weight ) VALUES ( p_type, p_data, p_tags, p_keys, + p_identity_keys, COALESCE(p_sources, '{}'::jsonb), p_searchable_text, p_content_hash, @@ -342,7 +365,8 @@ CREATE OR REPLACE FUNCTION mem_upsert_by_keys( p_weight INTEGER DEFAULT NULL, p_embedding TEXT DEFAULT NULL, p_embedding_model TEXT DEFAULT NULL, - p_replace BOOLEAN DEFAULT FALSE + p_replace BOOLEAN DEFAULT FALSE, + p_identity_keys TEXT[] DEFAULT NULL ) RETURNS TABLE (id UUID, action TEXT) LANGUAGE plpgsql AS $$ DECLARE existing_id UUID; @@ -371,6 +395,12 @@ BEGIN ELSE (SELECT ARRAY(SELECT DISTINCT unnest FROM unnest(COALESCE(r.tags, '{}') || COALESCE(p_tags, '{}')))) END, keys = (SELECT ARRAY(SELECT DISTINCT unnest FROM unnest(COALESCE(r.keys, '{}') || COALESCE(p_keys, '{}')))), + -- identity_keys: replace on replace-sync, else union. Not part of + -- the overlap-merge (see no-vector variant). 
+ identity_keys = CASE + WHEN p_replace THEN COALESCE(p_identity_keys, '{}'::text[]) + ELSE (SELECT ARRAY(SELECT DISTINCT unnest FROM unnest(COALESCE(r.identity_keys, '{}') || COALESCE(p_identity_keys, '{}')))) + END, sources = r.sources || COALESCE(p_sources, '{}'::jsonb), searchable_text = CASE WHEN p_replace THEN p_searchable_text @@ -392,13 +422,14 @@ BEGIN result_action := 'updated'; ELSE INSERT INTO mem_records ( - type, data, tags, keys, sources, searchable_text, content_hash, + type, data, tags, keys, identity_keys, sources, searchable_text, content_hash, weight, embedding, embedded_at, embedding_model ) VALUES ( p_type, p_data, p_tags, p_keys, + p_identity_keys, COALESCE(p_sources, '{}'::jsonb), p_searchable_text, p_content_hash, diff --git a/src/lib/memory/sync/identity-keys.test.ts b/src/lib/memory/sync/identity-keys.test.ts new file mode 100644 index 0000000..115f4a4 --- /dev/null +++ b/src/lib/memory/sync/identity-keys.test.ts @@ -0,0 +1,215 @@ +import { describe, it } from 'node:test'; +import assert from 'node:assert/strict'; +import { collectIdentityKeys } from './mem-writer.js'; +import { loadBuiltinProfile } from './builtin-profiles.js'; +import type { SyncProfile } from './types.js'; + +// #128: identityKeys (plural) — multiple cross-platform identity keys per +// record, with [] wildcard fan-out, normalization, and dedupe. The singular +// identityKey must keep its original scalar behavior (backwards compatible). 
+ +type IdProfile = Pick; + +describe('collectIdentityKeys — singular identityKey (backwards compatible) (#128)', () => { + it('extracts a scalar identity key, lowercased + trimmed, with derived prefix', () => { + const keys = collectIdentityKeys({ email: ' Jane@Acme.COM ' }, { identityKey: 'email' } as IdProfile); + assert.deepEqual(keys, ['email:jane@acme.com']); + }); + + it('derives prefix from the path (email/phone/domain/id)', () => { + assert.deepEqual(collectIdentityKeys({ work_phone: '+1-555' }, { identityKey: 'work_phone' } as IdProfile), ['phone:+1-555']); + assert.deepEqual(collectIdentityKeys({ company_domain: 'Acme.com' }, { identityKey: 'company_domain' } as IdProfile), ['domain:acme.com']); + assert.deepEqual(collectIdentityKeys({ ref: 'ABC' }, { identityKey: 'ref' } as IdProfile), ['id:abc']); + }); + + it('resolves dotted + numeric-index paths', () => { + const rec = { email_addresses: [{ email_address: 'a@b.com' }] }; + assert.deepEqual(collectIdentityKeys(rec, { identityKey: 'email_addresses[0].email_address' } as IdProfile), ['email:a@b.com']); + }); + + it('produces no key when the value is missing/empty/object', () => { + assert.deepEqual(collectIdentityKeys({}, { identityKey: 'email' } as IdProfile), []); + assert.deepEqual(collectIdentityKeys({ email: '' }, { identityKey: 'email' } as IdProfile), []); + assert.deepEqual(collectIdentityKeys({ email: { nested: 1 } }, { identityKey: 'email' } as IdProfile), []); + }); + + it('returns [] when no identity config is set', () => { + assert.deepEqual(collectIdentityKeys({ email: 'a@b.com' }, {} as IdProfile), []); + }); +}); + +describe('collectIdentityKeys — plural identityKeys with [] wildcard (#128)', () => { + it('fans out a [] wildcard path to one key per element', () => { + const rec = { attendees: [{ email: 'A@x.com' }, { email: 'b@x.com' }, { email: 'c@x.com' }] }; + const keys = collectIdentityKeys(rec, { identityKeys: [{ prefix: 'email', path: 'attendees[].email' }] } as IdProfile); + 
assert.deepEqual(keys, ['email:a@x.com', 'email:b@x.com', 'email:c@x.com']); + }); + + it('handles nested [] wildcards (messages[].headers[].value)', () => { + const rec = { + messages: [ + { headers: [{ value: 'one@x.com' }, { value: 'two@x.com' }] }, + { headers: [{ value: 'three@x.com' }] }, + ], + }; + const keys = collectIdentityKeys(rec, { identityKeys: [{ prefix: 'email', path: 'messages[].headers[].value' }] } as IdProfile); + assert.deepEqual(keys, ['email:one@x.com', 'email:two@x.com', 'email:three@x.com']); + }); + + it('dedupes repeats within and across entries (order-preserving)', () => { + const rec = { + organizer: { email: 'host@x.com' }, + attendees: [{ email: 'host@x.com' }, { email: 'guest@x.com' }, { email: 'GUEST@x.com' }], + }; + const keys = collectIdentityKeys(rec, { + identityKeys: [ + { prefix: 'email', path: 'organizer.email' }, + { prefix: 'email', path: 'attendees[].email' }, + ], + } as IdProfile); + assert.deepEqual(keys, ['email:host@x.com', 'email:guest@x.com']); + }); + + it('respects each entry\'s prefix', () => { + const rec = { primary: 'a@x.com', site: 'acme.com' }; + const keys = collectIdentityKeys(rec, { + identityKeys: [ + { prefix: 'email', path: 'primary' }, + { prefix: 'domain', path: 'site' }, + ], + } as IdProfile); + assert.deepEqual(keys, ['email:a@x.com', 'domain:acme.com']); + }); + + it('skips null/empty elements in a wildcard array', () => { + const rec = { attendees: [{ email: 'a@x.com' }, { email: '' }, { email: null }, { other: 1 }, { email: 'b@x.com' }] }; + const keys = collectIdentityKeys(rec, { identityKeys: [{ prefix: 'email', path: 'attendees[].email' }] } as IdProfile); + assert.deepEqual(keys, ['email:a@x.com', 'email:b@x.com']); + }); + + it('ignores malformed entries (missing prefix or path)', () => { + const rec = { email: 'a@x.com' }; + const keys = collectIdentityKeys(rec, { + identityKeys: [ + { prefix: '', path: 'email' } as any, + { prefix: 'email', path: '' } as any, + { prefix: 'email', path: 
'email' }, + ], + } as IdProfile); + assert.deepEqual(keys, ['email:a@x.com']); + }); +}); + +describe('collectIdentityKeys — email extraction from header values (#129)', () => { + it('strips display names and lowercases', () => { + const rec = { from: 'Jane Smith ' }; + const keys = collectIdentityKeys(rec, { identityKeys: [{ prefix: 'email', path: 'from' }] } as IdProfile); + assert.deepEqual(keys, ['email:jane@acme.com']); + }); + + it('extracts every address from a comma-list (To/Cc)', () => { + const rec = { to: 'a@x.com, Bob , c@z.com' }; + const keys = collectIdentityKeys(rec, { identityKeys: [{ prefix: 'email', path: 'to' }] } as IdProfile); + assert.deepEqual(keys, ['email:a@x.com', 'email:b@y.com', 'email:c@z.com']); + }); + + it('passes already-clean emails through unchanged (gcal attendees — #130)', () => { + const rec = { attendees: [{ email: 'jane@acme.com' }, { email: 'BOB@acme.com' }] }; + const keys = collectIdentityKeys(rec, { identityKeys: [{ prefix: 'email', path: 'attendees[].email' }] } as IdProfile); + assert.deepEqual(keys, ['email:jane@acme.com', 'email:bob@acme.com']); + }); + + it('yields nothing for an email-prefixed value with no address', () => { + const rec = { from: 'mailer-daemon (no address)' }; + assert.deepEqual(collectIdentityKeys(rec, { identityKeys: [{ prefix: 'email', path: 'from' }] } as IdProfile), []); + }); +}); + +describe('collectIdentityKeys — [name=From] header filter (#129 Gmail shape)', () => { + const thread = { + messages: [ + { payload: { headers: [ + { name: 'From', value: 'Moe ' }, + { name: 'To', value: 'anish@intently.ai, jane@acme.com' }, + { name: 'Subject', value: 'pricing for vip@whale.com' }, // must NOT leak + { name: 'Received', value: 'by mail-server@google.com' }, // must NOT leak + ] } }, + { payload: { headers: [ + { name: 'From', value: 'anish@intently.ai' }, + { name: 'Cc', value: 'Boss ' }, + ] } }, + ], + }; + + it('extracts From/To/Cc participants across all messages, ignoring other headers', 
() => { + const keys = collectIdentityKeys(thread, { + identityKeys: [ + { prefix: 'email', path: "messages[].payload.headers[name=From].value" }, + { prefix: 'email', path: "messages[].payload.headers[name=To].value" }, + { prefix: 'email', path: "messages[].payload.headers[name=Cc].value" }, + ], + } as IdProfile); + assert.deepEqual(keys, [ + 'email:moe@withone.ai', + 'email:anish@intently.ai', + 'email:jane@acme.com', + 'email:boss@acme.com', + ]); + // Subject/Received emails must be absent + assert.ok(!keys.includes('email:vip@whale.com')); + assert.ok(!keys.includes('email:mail-server@google.com')); + }); + + it('filter is case-insensitive on the field value', () => { + const rec = { headers: [{ name: 'from', value: 'x@y.com' }] }; + const keys = collectIdentityKeys(rec, { identityKeys: [{ prefix: 'email', path: 'headers[name=From].value' }] } as IdProfile); + assert.deepEqual(keys, ['email:x@y.com']); + }); +}); + +describe('built-in profiles declare working identity keys (#129/#130)', () => { + it('gmail/gmailThreads resolves From/To/Cc participants across the thread', () => { + const profile = loadBuiltinProfile('gmail', 'gmailThreads') as unknown as SyncProfile; + assert.ok(profile?.identityKeys?.length, 'gmail profile declares identityKeys'); + const thread = { + messages: [ + { payload: { headers: [ + { name: 'From', value: 'Moe ' }, + { name: 'To', value: 'anish@intently.ai, jane@acme.com' }, + { name: 'Subject', value: 'note to self@nope.com' }, + ] } }, + { payload: { headers: [{ name: 'Cc', value: 'Boss ' }] } }, + ], + }; + const keys = collectIdentityKeys(thread, profile); + assert.deepEqual(keys.sort(), ['email:anish@intently.ai', 'email:boss@acme.com', 'email:jane@acme.com', 'email:moe@withone.ai'].sort()); + assert.ok(!keys.includes('email:self@nope.com'), 'Subject email must not leak'); + }); + + it('google-calendar/events resolves organizer + attendees', () => { + const profile = loadBuiltinProfile('google-calendar', 'events') as unknown as 
SyncProfile; + assert.ok(profile?.identityKeys?.length, 'gcal profile declares identityKeys'); + const event = { organizer: { email: 'Host@acme.com' }, attendees: [{ email: 'a@x.com' }, { email: 'b@y.com' }] }; + assert.deepEqual(collectIdentityKeys(event, profile).sort(), ['email:a@x.com', 'email:b@y.com', 'email:host@acme.com'].sort()); + }); + + it('fathom/meetings resolves host + calendar invitees', () => { + const profile = loadBuiltinProfile('fathom', 'meetings') as unknown as SyncProfile; + assert.ok(profile?.identityKeys?.length, 'fathom profile declares identityKeys'); + const meeting = { recorded_by: { email: 'rec@acme.com' }, calendar_invitees: [{ email: 'x@y.com' }] }; + assert.deepEqual(collectIdentityKeys(meeting, profile).sort(), ['email:rec@acme.com', 'email:x@y.com'].sort()); + }); +}); + +describe('collectIdentityKeys — singular + plural combined (#128)', () => { + it('merges both sources and dedupes the overlap (prefix from singular path name)', () => { + // Singular `from_email` derives the `email` prefix (path name contains + // "email"), so it dedupes against the plural `email:` keys. + const rec = { from_email: 'me@x.com', to: [{ email: 'me@x.com' }, { email: 'you@x.com' }] }; + const keys = collectIdentityKeys(rec, { + identityKey: 'from_email', + identityKeys: [{ prefix: 'email', path: 'to[].email' }], + } as IdProfile); + assert.deepEqual(keys, ['email:me@x.com', 'email:you@x.com']); + }); +}); diff --git a/src/lib/memory/sync/index.ts b/src/lib/memory/sync/index.ts index 264f680..f142878 100644 --- a/src/lib/memory/sync/index.ts +++ b/src/lib/memory/sync/index.ts @@ -478,6 +478,21 @@ async function syncTestCommand( } } + if (report.identityKeysPreview) { + const { perRecord, sampleKeys } = report.identityKeysPreview; + const total = perRecord.reduce((a, b) => a + b, 0); + const min = perRecord.length ? Math.min(...perRecord) : 0; + const max = perRecord.length ? Math.max(...perRecord) : 0; + const mark = total === 0 ? 
pc.yellow('~') : pc.green('✓'); + console.log(`\n ${pc.bold('Identity keys')} ${pc.dim(`(cross-platform — #128)`)}`); + console.log(` ${mark} ${perRecord.length} sample${perRecord.length === 1 ? '' : 's'}, ${min}–${max} key${max === 1 ? '' : 's'} per record`); + if (sampleKeys.length > 0) { + console.log(` ${pc.dim('e.g.')} ${sampleKeys.slice(0, 8).map(k => pc.cyan(k)).join(', ')}`); + } else { + console.log(` ${pc.yellow('note:')} no identity keys resolved on these samples — check the identityKey/identityKeys paths.`); + } + } + if (searchablePreview) { console.log(`\n ${pc.bold('Searchable preview')} ${pc.dim(`(${searchablePreview.mode}, ${searchablePreview.sampledRecords} sample${searchablePreview.sampledRecords === 1 ? '' : 's'})`)}`); console.log(` ${pc.dim('length:')} ${searchablePreview.length} chars (first sample)`); diff --git a/src/lib/memory/sync/mem-writer.ts b/src/lib/memory/sync/mem-writer.ts index 46414ea..bfef47e 100644 --- a/src/lib/memory/sync/mem-writer.ts +++ b/src/lib/memory/sync/mem-writer.ts @@ -166,7 +166,6 @@ export async function writePageToMemory( updates: [], }; const type = `${profile.platform}/${profile.model}`; - const identityKey = profile.identityKey; // `--embed` on `sync run` wins over the profile's memory.embed flag. // Lets users flip on embeddings for one run (e.g. backfilling after // a first sync done with embedOnSync: false) without editing the @@ -194,14 +193,12 @@ export async function writePageToMemory( } const sourceKey = `${type}:${String(externalId)}`; - const keys = [sourceKey]; - - if (identityKey) { - const raw = getByDotPath(record, identityKey); - if (raw !== null && raw !== undefined && raw !== '') { - keys.push(`${deriveIdentityPrefix(identityKey)}:${String(raw).toLowerCase().trim()}`); - } - } + // keys[] = entity/merge identifiers (source key + singular identityKey). 
+ // identity_keys[] = participant associations (plural identityKeys, #128) — + // a SEPARATE column that never triggers merge, so a Gmail thread carrying + // many participant emails stays its own record. See schema.ts. + const keys = [sourceKey, ...collectEntityKeys(record, profile)]; + const identityKeys = collectAssociationKeys(record, profile); // Strip sync-internal bookkeeping from the payload const data = { ...record }; @@ -222,6 +219,7 @@ export async function writePageToMemory( type, data, keys, + identity_keys: identityKeys.length ? identityKeys : undefined, searchable_text, sources: { [sourceKey]: { @@ -265,3 +263,164 @@ function deriveIdentityPrefix(dotPath: string): string { if (lower.includes('domain')) return 'domain'; return 'id'; } + +// Liberal email matcher — used to pull addresses out of mail-header values like +// `"Jane Smith "` or comma-lists `"a@x.com, Bob "` (#129). +const EMAIL_RE = /[A-Za-z0-9._%+'-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}/g; + +/** + * Turn one resolved raw value into zero or more normalized key values for the + * given prefix. For `email` keys we extract every email address found in the + * value (so display-name headers and comma-lists work, and already-clean + * emails pass through unchanged). For every other prefix we lowercase/trim the + * whole scalar value. Objects/arrays/empties yield nothing. + */ +function identityValuesFor(prefix: string, raw: unknown): string[] { + if (raw === null || raw === undefined || typeof raw === 'object') return []; + const s = String(raw); + if (prefix === 'email') { + return (s.match(EMAIL_RE) ?? []).map(e => e.toLowerCase()); + } + const v = s.toLowerCase().trim(); + return v ? [v] : []; +} + +type PathToken = + | { type: 'field'; name: string } + | { type: 'index'; i: number } + | { type: 'wild' } + | { type: 'filter'; field: string; value: string }; + +/** + * Tokenize an identity dot-path. 
Beyond plain fields it supports: + * `[]` array wildcard (fan out over every element) + * `[0]` numeric index + * `[name=From]` equality filter — keep array elements whose `name` field + * equals `From` (case-insensitive; quotes optional). Needed + * for Gmail headers (#129): `payload.headers[name=From].value`. + */ +function tokenizeIdentityPath(path: string): PathToken[] { + const tokens: PathToken[] = []; + const re = /([^.[\]]+)|\[([^\]]*)\]/g; + let m: RegExpExecArray | null; + while ((m = re.exec(path)) !== null) { + if (m[1] !== undefined) { + tokens.push({ type: 'field', name: m[1] }); + } else { + const inner = m[2]; + if (inner === '') tokens.push({ type: 'wild' }); + else if (/^\d+$/.test(inner)) tokens.push({ type: 'index', i: Number(inner) }); + else { + const eq = inner.indexOf('='); + if (eq > 0) { + const field = inner.slice(0, eq).trim(); + const value = inner.slice(eq + 1).trim().replace(/^['"]|['"]$/g, ''); + tokens.push({ type: 'filter', field, value }); + } + // Unknown bracket content is ignored (no match → no keys). + } + } + } + return tokens; +} + +/** + * Resolve an identity path against a record, returning a flat list of leaf + * values. Walks a "frontier" of values so `[]` wildcards and `[name=From]` + * filters fan out naturally. Superset of the plain dot-path / `[]` resolver. 
+ */ +function resolveIdentityPath(root: unknown, path: string): unknown[] { + let frontier: unknown[] = [root]; + for (const tok of tokenizeIdentityPath(path)) { + const next: unknown[] = []; + for (const cur of frontier) { + if (cur === null || cur === undefined) continue; + if (tok.type === 'field') { + if (typeof cur === 'object' && !Array.isArray(cur)) next.push((cur as Record)[tok.name]); + } else if (tok.type === 'index') { + if (Array.isArray(cur)) next.push(cur[tok.i]); + } else if (tok.type === 'wild') { + if (Array.isArray(cur)) next.push(...cur); + } else { // filter + if (Array.isArray(cur)) { + for (const el of cur) { + if (el && typeof el === 'object' && + String((el as Record)[tok.field] ?? '').toLowerCase() === tok.value.toLowerCase()) { + next.push(el); + } + } + } + } + } + frontier = next; + } + return frontier; +} + +/** Resolve one {prefix, path} entry to its (possibly many) prefixed keys. */ +function keysForEntry(record: Record, prefix: string, path: string): string[] { + if (!prefix || !path) return []; + const out: string[] = []; + for (const raw of resolveIdentityPath(record, path)) { + for (const value of identityValuesFor(prefix, raw)) out.push(`${prefix}:${value}`); + } + return out; +} + +function dedupe(keys: string[]): string[] { + const seen = new Set(); + const out: string[] = []; + for (const k of keys) { + if (seen.has(k)) continue; + seen.add(k); + out.push(k); + } + return out; +} + +/** + * ENTITY keys — from a profile's singular `identityKey`. These mean "this + * record IS this entity" and go into `keys[]`, where the store uses them to + * merge cross-platform records for the same entity (Attio + HubSpot for the + * same person). Scalar/dot-path resolve, `email`-aware extraction. Deduped. 
+ */ +export function collectEntityKeys( + record: Record, + profile: Pick, +): string[] { + if (!profile.identityKey) return []; + return dedupe(keysForEntry(record, deriveIdentityPrefix(profile.identityKey), profile.identityKey)); +} + +/** + * ASSOCIATION keys — from a profile's plural `identityKeys` (#128). These mean + * "this record INVOLVES these people" (thread participants, event attendees) + * and go into the separate `identity_keys[]` column, which does NOT drive + * merge/uniqueness — so a many-participant record keeps its own identity. + * Each entry's `path` supports `[]` wildcard + `[name=From]` filter fan-out; + * `email`-prefixed values are email-extracted (handles `"Jane "` and + * comma-lists). Deduped, first-seen order preserved. + */ +export function collectAssociationKeys( + record: Record, + profile: Pick, +): string[] { + const out: string[] = []; + for (const entry of profile.identityKeys ?? []) { + if (!entry || !entry.path || !entry.prefix) continue; + out.push(...keysForEntry(record, entry.prefix, entry.path)); + } + return dedupe(out); +} + +/** + * All cross-platform identity keys for a record (entity ∪ association), + * deduped. Used by `sync test` previews and tests; the writer routes the two + * kinds into their separate columns via the functions above. 
+ */ +export function collectIdentityKeys( + record: Record, + profile: Pick, +): string[] { + return dedupe([...collectEntityKeys(record, profile), ...collectAssociationKeys(record, profile)]); +} diff --git a/src/lib/memory/sync/test.ts b/src/lib/memory/sync/test.ts index 9eca9f7..0dfc9f3 100644 --- a/src/lib/memory/sync/test.ts +++ b/src/lib/memory/sync/test.ts @@ -6,6 +6,7 @@ import { getNextPageParams } from './pagination.js'; import type { SyncProfile } from './types.js'; import { extractRecords, isRootPath } from './extract.js'; import { resolveProfileConnectionKey } from './profile.js'; +import { collectIdentityKeys } from './mem-writer.js'; export interface SyncTestCheck { name: string; @@ -30,6 +31,15 @@ export interface SyncTestReport { paginationPreview?: Record; /** Fields that sync test auto-fixed from the real response (e.g. resultsPath). */ autoFixed?: Record; + /** + * Resolved cross-platform identity keys across the sampled records (#128): + * how many keys each sampled record produced, and a few example values. Only + * present when the profile declares `identityKey` and/or `identityKeys`. + */ + identityKeysPreview?: { + perRecord: number[]; + sampleKeys: string[]; + }; } /** How many records --show-searchable averages over when reporting hit rates. */ @@ -319,6 +329,23 @@ export async function testSyncProfile(api: OneApi, profile: SyncProfile): Promis })); report.sample = first; report.samples = (records as Record[]).slice(0, SEARCHABLE_SAMPLE_SIZE); + + // Preview resolved cross-platform identity keys (#128) so authors can see + // how many keys each record yields (e.g. all thread participants) and a few + // sample values, without running a real sync. 
+ if (profile.identityKey || (profile.identityKeys && profile.identityKeys.length > 0)) { + const perRecord: number[] = []; + const sampleKeys = new Set(); + for (const rec of report.samples) { + const keys = collectIdentityKeys(rec, profile); + perRecord.push(keys.length); + for (const k of keys) { + if (sampleKeys.size < 8) sampleKeys.add(k); + } + } + report.identityKeysPreview = { perRecord, sampleKeys: [...sampleKeys] }; + } + report.ok = checks.every(c => c.ok); return report; diff --git a/src/lib/memory/sync/types.ts b/src/lib/memory/sync/types.ts index b10be4d..0f1a7d1 100644 --- a/src/lib/memory/sync/types.ts +++ b/src/lib/memory/sync/types.ts @@ -68,12 +68,27 @@ export interface SyncProfile { limitLocation?: 'query' | 'body'; /** * Dot-path to a field that identifies this record across platforms. - * The value is extracted, lowercased, and stored as `_identity` on every - * record. Use a stable cross-platform identifier like email address. + * The value is extracted, lowercased, and stored as a prefixed entry in the + * record's `keys[]` array (e.g. `email:jane@acme.com`). Use a stable + * cross-platform identifier like email address. * * Example: "properties.email", "email", "email_addresses[0].email_address" */ identityKey?: string; + /** + * Multiple cross-platform identity keys per record (issue #128). Use for + * records with N participants — a Gmail thread's From/To/Cc, calendar + * attendees, meeting invitees — where a single `identityKey` can't capture + * everyone. Each entry's `path` resolves via the dot-path walker, with `[]` + * wildcards expanding to ONE key per array element; each resolved value is + * lowercased/trimmed and emitted as `${prefix}:${value}` into the record's + * `identity_keys[]` column (deduped; never used for merging). Combine freely with `identityKey`.
+ * + * Example: + * [{ "prefix": "email", "path": "attendees[].email" }, + * { "prefix": "email", "path": "organizer.email" }] + */ + identityKeys?: Array<{ prefix: string; path: string }>; /** * Dot-path field names to strip from each record before storing. * Supports array notation: "messages[].body" strips `body` from each diff --git a/src/lib/memory/types.ts b/src/lib/memory/types.ts index 8015848..683eb40 100644 --- a/src/lib/memory/types.ts +++ b/src/lib/memory/types.ts @@ -32,6 +32,8 @@ export interface MemRecord { data: Record; tags?: string[]; keys?: string[]; + /** Cross-platform identity associations (#128) — see RecordInput.identity_keys. */ + identity_keys?: string[]; sources: SourcesMap; searchable_text?: string | null; @@ -98,6 +100,13 @@ export interface RecordInput { data: Record; tags?: string[]; keys?: string[]; + /** + * Cross-platform identity associations (#128) — e.g. `email:jane@acme.com` + * for each participant. Stored in the separate `identity_keys[]` column, + * which (unlike `keys[]`) does NOT drive upsert-merge or key uniqueness, so a + * record can carry many without collapsing into other records. + */ + identity_keys?: string[]; sources?: SourcesMap; searchable_text?: string | null; content_hash?: string | null;