Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions server/artifactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// server/artifactory.ts
// JFrog CLI-backed Artifactory client for Open Brain artifact SOT write-path.
//
// All operations shell out to `jf rt` commands — no raw HTTP, no API key env vars.
// The machine must have `jf` installed and configured (`jf config show` lists servers).
//
// Env vars:
// JF_SERVER_ID — jf server ID to use (default: "intro")
// RT_REPO — generic local repo name (default: "open-brain-memories")
//
// Artifact path convention inside the repo:
// thoughts/<sha256-of-content>.json
//
// Artifact payload (JSON) — embedding excluded, re-generated on sync:
// { id, content, metadata, created_at }

// Server ID appended to every `jf` invocation as `--server-id <id>` (see runJf).
// Falls back to "intro" when the JF_SERVER_ID env var is unset.
const JF_SERVER_ID = Deno.env.get("JF_SERVER_ID") ?? "intro";
// Repository name used as the leading path segment for upload, search and
// download targets. Falls back to "open-brain-memories" when RT_REPO is unset.
const RT_REPO = Deno.env.get("RT_REPO") ?? "open-brain-memories";

/**
 * JSON payload stored per artifact: pushArtifact serializes a value of this
 * shape and fetchArtifact parses one back (without runtime validation).
 */
export type ThoughtArtifact = {
id: string;
// Text of the thought; pushArtifact hashes this field (SHA-256) to derive the artifact path.
content: string;
metadata: Record<string, unknown>;
// Timestamp string — presumably ISO 8601; nothing in this file parses or checks it.
created_at: string;
};

// Shape assumed for one element of the JSON array printed by `jf rt search`.
// Only `path` and `created` are read in this file (see listArtifacts); the
// parsed output is cast to this type without runtime validation.
type JfSearchItem = {
path: string; // e.g. "open-brain-memories/thoughts/abc123.json"
created: string; // ISO 8601
name: string;
repo: string;
};

// Spawn the `jf` CLI with the given arguments, appending `--server-id <JF_SERVER_ID>`,
// and wait for it to exit. Both output streams are captured, decoded as text and
// trimmed. A non-zero exit code is returned to the caller, not thrown.
async function runJf(args: string[]): Promise<{ code: number; stdout: string; stderr: string }> {
  const fullArgs = args.concat(["--server-id", JF_SERVER_ID]);
  const command = new Deno.Command("jf", {
    args: fullArgs,
    stdout: "piped",
    stderr: "piped",
  });
  const result = await command.output();

  const decoder = new TextDecoder();
  const out = decoder.decode(result.stdout).trim();
  const err = decoder.decode(result.stderr).trim();
  return { code: result.code, stdout: out, stderr: err };
}

// Hash the UTF-8 encoding of `text` with SHA-256 and return the digest as a
// 64-character lowercase hex string.
async function sha256Hex(text: string): Promise<string> {
  const encoded = new TextEncoder().encode(text);
  const digest = new Uint8Array(await crypto.subtle.digest("SHA-256", encoded));

  let hex = "";
  for (let i = 0; i < digest.length; i++) {
    // Each byte becomes exactly two hex digits (zero-padded).
    hex += digest[i].toString(16).padStart(2, "0");
  }
  return hex;
}

// Upload one thought to Artifactory with `jf rt upload`.
// The thought is serialized as pretty-printed JSON into a temp file, uploaded to
// "<RT_REPO>/thoughts/<sha256-of-content>.json", and the temp file is removed
// afterwards whether or not the upload succeeded.
// Returns the sub-path inside the repo ("thoughts/<sha256>.json"); identical
// content always maps to the same path. Throws if `jf` exits non-zero.
export async function pushArtifact(thought: ThoughtArtifact): Promise<string> {
  const digest = await sha256Hex(thought.content);
  const subPath = "thoughts/" + digest + ".json";
  const target = RT_REPO + "/" + subPath;

  const scratchFile = await Deno.makeTempFile({ suffix: ".json" });
  try {
    const payload = JSON.stringify(thought, null, 2);
    await Deno.writeTextFile(scratchFile, payload);

    const upload = await runJf(["rt", "upload", scratchFile, target]);
    if (upload.code !== 0) {
      throw new Error(`jf rt upload failed: ${upload.stderr}`);
    }
    return subPath;
  } finally {
    // Best-effort cleanup: a failure to delete the temp file is ignored.
    await Deno.remove(scratchFile).catch(() => {});
  }
}

// List all thought artifacts in the repo, sorted by creation time ascending.
//
// Runs `jf rt search <RT_REPO>/thoughts/*.json` and maps each result to
// { path, created }, where `path` has the leading "<RT_REPO>/" removed.
// Returns [] when the search prints nothing or an empty array.
// Throws if `jf` exits non-zero, if its output is not valid JSON, or if the
// JSON is not an array.
export async function listArtifacts(): Promise<{ path: string; created: string }[]> {
  const { code, stdout, stderr } = await runJf([
    "rt", "search",
    `${RT_REPO}/thoughts/*.json`,
  ]);
  if (code !== 0) throw new Error(`jf rt search failed: ${stderr}`);
  if (!stdout || stdout === "[]") return [];

  const parsed: unknown = JSON.parse(stdout);
  if (!Array.isArray(parsed)) {
    throw new Error("jf rt search failed: output is not a JSON array");
  }
  const items = parsed as JfSearchItem[];

  // Strip the repo name only when it is the leading segment; String.replace
  // would also hit a later occurrence of "<repo>/" inside the path.
  const repoPrefix = `${RT_REPO}/`;
  const stripRepo = (p: string): string =>
    p.startsWith(repoPrefix) ? p.slice(repoPrefix.length) : p;

  // Order by the parsed instant so timestamps with different UTC offsets or
  // fractional precision sort chronologically. Fall back to plain string order
  // when a value cannot be parsed or the instants are equal.
  const byCreated = (a: { created: string }, b: { created: string }): number => {
    const aMs = Date.parse(a.created);
    const bMs = Date.parse(b.created);
    if (!Number.isNaN(aMs) && !Number.isNaN(bMs) && aMs !== bMs) return aMs - bMs;
    if (a.created === b.created) return 0;
    return a.created < b.created ? -1 : 1;
  };

  return items
    .map((item) => ({ path: stripRepo(item.path), created: item.created }))
    .sort(byCreated);
}

// Return artifacts created strictly after `sinceIso`.
// Filters client-side from the full list — Artifactory AQL date filtering
// via jf CLI requires a spec file; simpler to filter after a lightweight search.
//
// Timestamps are compared as parsed instants, so values that differ in UTC
// offset or fractional-second precision still compare chronologically. If
// either side cannot be parsed, the comparison falls back to plain string order.
export async function diffSince(sinceIso: string): Promise<{ path: string; created: string }[]> {
  const all = await listArtifacts();
  const sinceMs = Date.parse(sinceIso);

  return all.filter(({ created }) => {
    const createdMs = Date.parse(created);
    if (Number.isNaN(sinceMs) || Number.isNaN(createdMs)) {
      return created > sinceIso;
    }
    return createdMs > sinceMs;
  });
}

// Download one artifact (identified by its sub-path inside RT_REPO) into a fresh
// temp directory via `jf rt download --flat=true`, read it back and return the
// parsed JSON. The temp directory is removed afterwards in every case.
// Throws if `jf` exits non-zero, if the downloaded file cannot be read, or if
// its content is not valid JSON. The parsed value is not validated.
export async function fetchArtifact(subPath: string): Promise<ThoughtArtifact> {
  const scratchDir = await Deno.makeTempDir();
  try {
    const source = `${RT_REPO}/${subPath}`;
    const destination = `${scratchDir}/`;
    // --flat=true: download directly into the temp dir, no nested subdirs.
    const download = await runJf(["rt", "download", source, destination, "--flat=true"]);
    if (download.code !== 0) {
      throw new Error(`jf rt download failed: ${download.stderr}`);
    }

    // With --flat the file lands under its base name (text after the last "/").
    const baseName = subPath.slice(subPath.lastIndexOf("/") + 1);
    const text = await Deno.readTextFile(`${scratchDir}/${baseName}`);
    const artifact: ThoughtArtifact = JSON.parse(text);
    return artifact;
  } finally {
    // Best-effort cleanup: a failure to delete the temp dir is ignored.
    await Deno.remove(scratchDir, { recursive: true }).catch(() => {});
  }
}
26 changes: 26 additions & 0 deletions server/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export interface Db {
): Promise<DbResult<{ id: string }>>;

updateEmbedding(id: string, embedding: number[]): Promise<DbResult<null>>;

hasEmbedding(id: string): Promise<DbResult<boolean>>;
}

// ── Supabase driver ──────────────────────────────────────────────────────────
Expand Down Expand Up @@ -151,6 +153,15 @@ function makeSupabaseDb(): Db {
.eq("id", id);
return { data: null, error };
},

// Report whether the `thoughts` row with this id has a non-null `embedding`.
// `data` is false when no row data came back; the query's `error` is passed
// through unchanged alongside it.
async hasEmbedding(id) {
  const response = await sb()
    .from("thoughts")
    .select("embedding")
    .eq("id", id)
    .single();

  const row = response.data;
  let present = false;
  if (row) {
    present = row.embedding !== null;
  }
  return { data: present, error: response.error };
},
};
}

Expand Down Expand Up @@ -307,6 +318,21 @@ function makePostgresDb(): Db {
client.release();
}
},

// Report whether the `thoughts` row with this id has a non-null `embedding`.
// Resolves with data=false when no row matches, and with data=null plus an
// error message when the query throws. The pooled client is always released.
async hasEmbedding(id) {
  const conn = await pool.connect();
  try {
    const { rows } = await conn.queryObject<{ has_embedding: boolean }>(
      `SELECT (embedding IS NOT NULL) AS has_embedding FROM thoughts WHERE id = $1`,
      [id],
    );
    const first = rows[0];
    return { data: first?.has_embedding ?? false, error: null };
  } catch (e) {
    const message = (e as Error).message;
    return { data: null, error: { message } };
  } finally {
    conn.release();
  }
},
};
}

Expand Down
67 changes: 67 additions & 0 deletions server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Hono } from "hono";
import { z } from "zod";
import { getEmbedding, extractMetadata } from "./llm.ts";
import { db } from "./db.ts";
import { diffSince, fetchArtifact } from "./artifactory.ts";

// Read from the MCP_ACCESS_KEY environment variable. The non-null assertion (`!`)
// only satisfies the type checker: if the variable is unset, this is `undefined`
// at runtime and no error is raised here.
const MCP_ACCESS_KEY = Deno.env.get("MCP_ACCESS_KEY")!;

Expand Down Expand Up @@ -430,4 +431,70 @@ app.all("*", async (c) => {
return transport.handleRequest(c);
});

// --- Autosync: Artifactory → Postgres every 5 minutes ---
// In-memory cursor: advances after each successful run so only genuinely new
// Artifactory artifacts are fetched. Resets to epoch on server restart (safe —
// upsert_thought is idempotent; hasEmbedding skips re-embedding).

// Sync period used when AUTOSYNC_INTERVAL_MS is unset or invalid: 5 minutes.
const DEFAULT_AUTOSYNC_INTERVAL_MS = 5 * 60 * 1000;

// Sync period in milliseconds, read from the AUTOSYNC_INTERVAL_MS env var.
// A value that does not parse to a positive integer falls back to the default:
// passing NaN or a non-positive delay to setInterval would make the sync fire
// almost continuously.
const AUTOSYNC_INTERVAL_MS = (() => {
  const raw = Deno.env.get("AUTOSYNC_INTERVAL_MS");
  if (raw === undefined) return DEFAULT_AUTOSYNC_INTERVAL_MS;

  const parsed = parseInt(raw, 10);
  if (Number.isFinite(parsed) && parsed > 0) return parsed;

  console.error(
    `[autosync] invalid AUTOSYNC_INTERVAL_MS "${raw}"; using default ${DEFAULT_AUTOSYNC_INTERVAL_MS}ms`,
  );
  return DEFAULT_AUTOSYNC_INTERVAL_MS;
})();

// Timestamp cursor handed to diffSince by runAutoSync; starts at the Unix epoch
// and lives only in memory.
let _syncCursor = "1970-01-01T00:00:00.000Z";

// One sync pass: pull every artifact created after the cursor from Artifactory,
// upsert it into the database and embed it when no embedding is stored yet.
//
// Cursor handling:
//  - The candidate cursor is captured BEFORE listing, so an artifact created
//    while this pass is still running falls after it and is picked up by the
//    next pass instead of being skipped.
//  - The cursor only advances when every artifact in the pass succeeded. After
//    any failure it stays put and the whole batch is retried on the next pass;
//    re-processing relies on upsertThought being idempotent and on hasEmbedding
//    skipping rows that are already embedded.
//
// Per-artifact errors are logged and do not abort the pass. An error from
// diffSince itself propagates to the caller.
async function runAutoSync(): Promise<void> {
  const runStartedAt = new Date().toISOString();
  const newArtifacts = await diffSince(_syncCursor);

  if (!newArtifacts.length) {
    console.log(`[autosync] up to date (cursor: ${_syncCursor.slice(0, 19)})`);
    return;
  }

  console.log(`[autosync] ${newArtifacts.length} new artifact(s) since ${_syncCursor.slice(0, 19)}`);

  let failures = 0;

  for (const { path } of newArtifacts) {
    try {
      const artifact = await fetchArtifact(path);

      const { data: upsertData, error: upsertErr } = await db.upsertThought(
        artifact.content,
        { metadata: { ...artifact.metadata, artifact_path: path } },
      );

      if (upsertErr || !upsertData) {
        failures++;
        console.error(`[autosync] upsert failed for ${path}:`, upsertErr?.message);
        continue;
      }

      // A failed lookup is treated as "not embedded": embedding again is
      // redundant work but harmless.
      const { data: alreadyEmbedded } = await db.hasEmbedding(upsertData.id);
      if (alreadyEmbedded) {
        console.log(`[autosync] skip embed (already present): ${artifact.content.slice(0, 60)}`);
        continue;
      }

      const embedding = await getEmbedding(artifact.content);
      const { error: embedErr } = await db.updateEmbedding(upsertData.id, embedding);
      if (embedErr) {
        failures++;
        console.error(`[autosync] embedding update failed for ${path}:`, embedErr.message);
        continue;
      }
      console.log(`[autosync] embedded: ${artifact.content.slice(0, 60)}`);
    } catch (err) {
      failures++;
      const message = err instanceof Error ? err.message : String(err);
      console.error(`[autosync] error on ${path}:`, message);
    }
  }

  if (failures > 0) {
    console.error(
      `[autosync] ${failures} artifact(s) failed; cursor kept at ${_syncCursor.slice(0, 19)} for retry`,
    );
    return;
  }

  // Advance to the moment this pass started, not "now" (see header comment).
  _syncCursor = runStartedAt;
  console.log(`[autosync] cursor → ${_syncCursor.slice(0, 19)}`);
}

// Run one sync right away at startup; a failure is logged, never rethrown.
runAutoSync().catch((err) => {
  console.error("[autosync] startup sync failed:", (err as Error).message);
});

// Then repeat every AUTOSYNC_INTERVAL_MS, again logging instead of rethrowing.
setInterval(() => {
  runAutoSync().catch((err) => {
    console.error("[autosync] interval sync failed:", (err as Error).message);
  });
}, AUTOSYNC_INTERVAL_MS);

console.log(`[autosync] started — interval ${AUTOSYNC_INTERVAL_MS / 1000}s`);

// Start the HTTP server with the Hono app's fetch handler.
Deno.serve(app.fetch);