From 3a76c136d977b9b7ccd01e1ff443761ca2813ee7 Mon Sep 17 00:00:00 2001
From: yuvalk
Date: Sun, 7 Jun 2026 15:05:34 +0300
Subject: [PATCH] feat(local-deploy): add autosync + hasEmbedding, built on
 db.ts abstraction
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Rebased cleanly on top of the db.ts driver refactor (PR #1).

- server/artifactory.ts: restore Artifactory client (diffSince, fetchArtifact,
  pushArtifact) — needed by autosync and capture_thought Artifactory write path
- server/db.ts: add hasEmbedding(id) method to Db interface and both drivers
  (supabase: checks embedding column via select; postgres: checks via
  IS NOT NULL query). Used by autosync to skip Ollama calls for
  already-embedded thoughts.
- server/index.ts: add autosync loop — fires on startup then every 5 min
  (AUTOSYNC_INTERVAL_MS env var). Uses in-memory cursor advanced after each
  successful run; calls diffSince(cursor) for incremental diff; skips
  embedding generation when hasEmbedding returns true.

Co-authored-by: Cursor
---
 server/artifactory.ts | 119 ++++++++++++++++++++++++++++++++++++++++++
 server/db.ts          |  26 +++++++++
 server/index.ts       |  67 ++++++++++++++++++++++++
 3 files changed, 212 insertions(+)
 create mode 100644 server/artifactory.ts

diff --git a/server/artifactory.ts b/server/artifactory.ts
new file mode 100644
index 0000000..d7c8e04
--- /dev/null
+++ b/server/artifactory.ts
@@ -0,0 +1,119 @@
+// server/artifactory.ts
+// JFrog CLI-backed Artifactory client for Open Brain artifact SOT write-path.
+//
+// All operations shell out to `jf rt` commands — no raw HTTP, no API key env vars.
+// The machine must have `jf` installed and configured (`jf config show` lists servers).
+//
+// Env vars:
+//   JF_SERVER_ID — jf server ID to use (default: "intro")
+//   RT_REPO — generic local repo name (default: "open-brain-memories")
+//
+// Artifact path convention inside the repo:
+//   thoughts/<sha256>.json
+//
+// Artifact payload (JSON) — embedding excluded, re-generated on sync:
+//   { id, content, metadata, created_at }
+
+const JF_SERVER_ID = Deno.env.get("JF_SERVER_ID") ?? "intro";
+const RT_REPO = Deno.env.get("RT_REPO") ?? "open-brain-memories";
+
+export type ThoughtArtifact = {
+  id: string;
+  content: string;
+  metadata: Record<string, unknown>;
+  created_at: string;
+};
+
+// jf search result item shape
+type JfSearchItem = {
+  path: string; // e.g. "open-brain-memories/thoughts/abc123.json"
+  created: string; // ISO 8601
+  name: string;
+  repo: string;
+};
+
+// Run `jf <args> --server-id <JF_SERVER_ID>` and capture its exit code plus
+// trimmed stdout/stderr text. A non-zero exit is reported via `code`, not thrown.
+async function runJf(args: string[]): Promise<{ code: number; stdout: string; stderr: string }> {
+  const cmd = new Deno.Command("jf", {
+    args: [...args, "--server-id", JF_SERVER_ID],
+    stdout: "piped",
+    stderr: "piped",
+  });
+  const { code, stdout, stderr } = await cmd.output();
+  const decoder = new TextDecoder();
+  const text = (bytes: Uint8Array) => decoder.decode(bytes).trim();
+  return { code, stdout: text(stdout), stderr: text(stderr) };
+}
+
+// Hex-encoded (lowercase) SHA-256 digest of the UTF-8 bytes of `text`.
+async function sha256Hex(text: string): Promise<string> {
+  const digest = await crypto.subtle.digest("SHA-256", new TextEncoder().encode(text));
+  const hexBytes = Array.from(new Uint8Array(digest), (b) => b.toString(16).padStart(2, "0"));
+  return hexBytes.join("");
+}
+
+// Push a thought artifact to Artifactory via `jf rt upload`.
+// Returns the artifact sub-path within the repo (e.g. "thoughts/<sha256>.json").
+// Idempotent: same content → same SHA-256 → same path.
+export async function pushArtifact(thought: ThoughtArtifact): Promise<string> {
+  const subPath = `thoughts/${await sha256Hex(thought.content)}.json`;
+  const tmpFile = await Deno.makeTempFile({ suffix: ".json" });
+  try {
+    await Deno.writeTextFile(tmpFile, JSON.stringify(thought, null, 2));
+    const { code, stderr } = await runJf(["rt", "upload", tmpFile, `${RT_REPO}/${subPath}`]);
+    if (code !== 0) throw new Error(`jf rt upload failed: ${stderr}`);
+    return subPath;
+  } finally {
+    await Deno.remove(tmpFile).catch(() => {}); // best-effort temp-file cleanup
+  }
+}
+
+// Order two timestamps by parsed instant so differing precision or UTC offsets
+// cannot mis-order them; if either fails to parse, fall back to string order.
+function compareCreated(a: string, b: string): number {
+  const delta = Date.parse(a) - Date.parse(b);
+  return Number.isNaN(delta) ? (a < b ? -1 : a > b ? 1 : 0) : delta;
+}
+
+// List all thought artifacts in the repo, sorted by created asc.
+// Throws if `jf rt search` fails or prints anything other than a JSON array.
+export async function listArtifacts(): Promise<{ path: string; created: string }[]> {
+  const { code, stdout, stderr } = await runJf(["rt", "search", `${RT_REPO}/thoughts/*.json`]);
+  if (code !== 0) throw new Error(`jf rt search failed: ${stderr}`);
+  const parsed: unknown = stdout ? JSON.parse(stdout) : []; // empty output → no artifacts
+  if (!Array.isArray(parsed)) throw new Error("jf rt search returned non-array JSON");
+  const repoPrefix = `${RT_REPO}/`; // strip only a leading repo segment
+  return (parsed as JfSearchItem[])
+    .map(({ path, created }) => ({
+      path: path.startsWith(repoPrefix) ? path.slice(repoPrefix.length) : path,
+      created,
+    }))
+    .sort((a, b) => compareCreated(a.created, b.created));
+}
+
+// Return artifacts created strictly after `sinceIso` (client-side filter of the
+// full listing; AQL date filtering via the jf CLI would need a spec file).
+export async function diffSince(sinceIso: string): Promise<{ path: string; created: string }[]> {
+  return (await listArtifacts()).filter(({ created }) => compareCreated(created, sinceIso) > 0);
+}
+
+// Fetch a single artifact by sub-path and return its parsed content.
+export async function fetchArtifact(subPath: string): Promise<ThoughtArtifact> {
+  const tmpDir = await Deno.makeTempDir();
+  try {
+    const { code, stderr } = await runJf([
+      "rt", "download",
+      `${RT_REPO}/${subPath}`,
+      `${tmpDir}/`,
+      "--flat=true", // download directly into tmpDir, no nested subdirs
+    ]);
+    if (code !== 0) throw new Error(`jf rt download failed: ${stderr}`);
+    // With --flat the file lands in tmpDir under the artifact's base name.
+    const fileName = subPath.split("/").pop()!;
+    const raw = await Deno.readTextFile(`${tmpDir}/${fileName}`);
+    return JSON.parse(raw) as ThoughtArtifact;
+  } finally {
+    await Deno.remove(tmpDir, { recursive: true }).catch(() => {});
+  }
+}
diff --git a/server/db.ts b/server/db.ts
index 086de3c..04f887d 100644
--- a/server/db.ts
+++ b/server/db.ts
@@ -64,6 +64,8 @@ export interface Db {
   ): Promise>;
 
   updateEmbedding(id: string, embedding: number[]): Promise>;
+
+  hasEmbedding(id: string): Promise>;
 }
 
 // ── Supabase driver ──────────────────────────────────────────────────────────
@@ -151,6 +153,15 @@ function makeSupabaseDb(): Db {
         .eq("id", id);
       return { data: null, error };
     },
+
+    async hasEmbedding(id) {
+      const { data, error } = await sb()
+        .from("thoughts")
+        .select("embedding")
+        .eq("id", id)
+        .single();
+      return { data: data ? data.embedding !== null : false, error };
+    },
   };
 }
 
@@ -307,6 +318,21 @@ function makePostgresDb(): Db {
         client.release();
       }
     },
+
+    async hasEmbedding(id) {
+      const client = await pool.connect();
+      try {
+        const result = await client.queryObject<{ has_embedding: boolean }>(
+          `SELECT (embedding IS NOT NULL) AS has_embedding FROM thoughts WHERE id = $1`,
+          [id],
+        );
+        return { data: result.rows[0]?.has_embedding ?? false, error: null };
+      } catch (e) {
+        return { data: null, error: { message: (e as Error).message } };
+      } finally {
+        client.release();
+      }
+    },
   };
 }
 
diff --git a/server/index.ts b/server/index.ts
index ae388fa..5914436 100644
--- a/server/index.ts
+++ b/server/index.ts
@@ -6,6 +6,7 @@ import { Hono } from "hono";
 import { z } from "zod";
 import { getEmbedding, extractMetadata } from "./llm.ts";
 import { db } from "./db.ts";
+import { diffSince, fetchArtifact } from "./artifactory.ts";
 
 const MCP_ACCESS_KEY = Deno.env.get("MCP_ACCESS_KEY")!;
 
@@ -430,4 +431,117 @@ app.all("*", async (c) => {
   return transport.handleRequest(c);
 });
 
+// --- Autosync: Artifactory → Postgres every 5 minutes ---
+// In-memory cursor: the `created` stamp of the newest artifact synced so far,
+// so each run only asks for artifacts newer than that. Resets to epoch on
+// server restart (safe — upsert_thought is idempotent; hasEmbedding skips
+// re-embedding).
+
+const DEFAULT_AUTOSYNC_INTERVAL_MS = 5 * 60 * 1000;
+
+// Read AUTOSYNC_INTERVAL_MS, falling back to the default when it is unset or
+// not a positive integer — NaN or 0 must never reach setInterval as the delay.
+function readAutosyncIntervalMs(): number {
+  const raw = Deno.env.get("AUTOSYNC_INTERVAL_MS");
+  if (raw === undefined) return DEFAULT_AUTOSYNC_INTERVAL_MS;
+  const parsed = Number.parseInt(raw, 10);
+  if (Number.isFinite(parsed) && parsed > 0) return parsed;
+  console.error(`[autosync] ignoring invalid AUTOSYNC_INTERVAL_MS="${raw}"`);
+  return DEFAULT_AUTOSYNC_INTERVAL_MS;
+}
+
+const AUTOSYNC_INTERVAL_MS = readAutosyncIntervalMs();
+let _syncCursor = "1970-01-01T00:00:00.000Z";
+let _syncInFlight = false; // true while a sync pass is running
+
+// Sync one artifact: upsert its thought, then generate and store an embedding
+// unless one is already present. Logs and returns false if any step fails.
+async function syncArtifact(path: string): Promise<boolean> {
+  try {
+    const artifact = await fetchArtifact(path);
+
+    const { data: upsertData, error: upsertErr } = await db.upsertThought(
+      artifact.content,
+      { metadata: { ...artifact.metadata, artifact_path: path } },
+    );
+    if (upsertErr || !upsertData) {
+      console.error(`[autosync] upsert failed for ${path}:`, upsertErr?.message);
+      return false;
+    }
+
+    const { data: alreadyEmbedded } = await db.hasEmbedding(upsertData.id);
+    if (alreadyEmbedded) {
+      console.log(`[autosync] skip embed (already present): ${artifact.content.slice(0, 60)}`);
+      return true;
+    }
+
+    const embedding = await getEmbedding(artifact.content);
+    const { error: embedErr } = await db.updateEmbedding(upsertData.id, embedding);
+    if (embedErr) {
+      console.error(`[autosync] embedding update failed for ${path}:`, embedErr.message);
+      return false;
+    }
+    console.log(`[autosync] embedded: ${artifact.content.slice(0, 60)}`);
+    return true;
+  } catch (err) {
+    console.error(`[autosync] error on ${path}:`, (err as Error).message);
+    return false;
+  }
+}
+
+// One sync pass. The cursor only moves when every artifact in the pass synced,
+// so a failed artifact is retried next time instead of being skipped for good
+// (re-visiting the others is safe: idempotent upsert, embedding check).
+async function runAutoSync(): Promise<void> {
+  // setInterval does not wait for an earlier pass to finish — never overlap.
+  if (_syncInFlight) {
+    console.log("[autosync] previous run still in progress — skipping");
+    return;
+  }
+  _syncInFlight = true;
+  try {
+    const newArtifacts = await diffSince(_syncCursor);
+
+    if (!newArtifacts.length) {
+      console.log(`[autosync] up to date (cursor: ${_syncCursor.slice(0, 19)})`);
+      return;
+    }
+
+    console.log(`[autosync] ${newArtifacts.length} new artifact(s) since ${_syncCursor.slice(0, 19)}`);
+
+    let failures = 0;
+    let newest = _syncCursor;
+    for (const { path, created } of newArtifacts) {
+      if (!(await syncArtifact(path))) failures++;
+      newest = created; // diffSince yields oldest-first, so the last is newest
+    }
+
+    if (failures > 0) {
+      console.error(`[autosync] ${failures} artifact(s) failed — cursor unchanged, will retry`);
+      return;
+    }
+
+    // Advance to the newest artifact's own timestamp rather than the local
+    // clock: anything uploaded while this pass was running is newer than
+    // `newest` and is therefore still picked up by the next pass.
+    _syncCursor = newest;
+    console.log(`[autosync] cursor → ${_syncCursor.slice(0, 19)}`);
+  } finally {
+    _syncInFlight = false;
+  }
+}
+
+// Fire immediately on startup, then on the configured interval.
+runAutoSync().catch((e) =>
+  console.error("[autosync] startup sync failed:", (e as Error).message)
+);
+setInterval(
+  () =>
+    runAutoSync().catch((e) =>
+      console.error("[autosync] interval sync failed:", (e as Error).message)
+    ),
+  AUTOSYNC_INTERVAL_MS,
+);
+console.log(`[autosync] started — interval ${AUTOSYNC_INTERVAL_MS / 1000}s`);
+
 Deno.serve(app.fetch);