diff --git a/cache/persistent.go b/cache/persistent.go index d615a42..c80d27a 100644 --- a/cache/persistent.go +++ b/cache/persistent.go @@ -1,6 +1,7 @@ package cache import ( + "encoding/binary" "encoding/json" "fmt" "io" @@ -13,9 +14,11 @@ import ( log "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" + bbolterrors "go.etcd.io/bbolt/errors" ) const bucketName = "cache" +const countersBucket = "counters" // PersistentCache wraps BoltDB for persistent storage // Note: No in-memory cache layer - BoltDB uses mmap so OS handles caching @@ -75,6 +78,15 @@ func NewPersistentCache(dbPath string, backupPath string, compressionEnabled boo return nil, fmt.Errorf("failed to create cache bucket: %v", err) } + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(countersBucket)) + return err + }) + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to create counters bucket: %v", err) + } + pc := &PersistentCache{ db: db, dbPath: dbPath, @@ -165,13 +177,24 @@ func (pc *PersistentCache) Set(key, value string) error { if b == nil { return fmt.Errorf("bucket not found") } + counters := tx.Bucket([]byte(countersBucket)) + if counters == nil { + return fmt.Errorf("counters bucket not found") + } data, err := json.Marshal(entry) if err != nil { return err } - return b.Put([]byte(key), data) + isNew := b.Get([]byte(key)) == nil + if err := b.Put([]byte(key), data); err != nil { + return err + } + if isNew { + return adjustCounter(counters, prefixOf(key), +1) + } + return nil }) } @@ -182,18 +205,39 @@ func (pc *PersistentCache) Delete(key string) error { if b == nil { return fmt.Errorf("bucket not found") } - return b.Delete([]byte(key)) + counters := tx.Bucket([]byte(countersBucket)) + if counters == nil { + return fmt.Errorf("counters bucket not found") + } + + existed := b.Get([]byte(key)) != nil + if err := b.Delete([]byte(key)); err != nil { + return err + } + if existed { + return adjustCounter(counters, prefixOf(key), -1) + } + return nil }) } -// Clear removes all entries from cache +// Clear removes all entries from cache and resets per-prefix counters in the +// same transaction so counts stay consistent with the wiped cache bucket. func (pc *PersistentCache) Clear() error { return pc.db.Update(func(tx *bolt.Tx) error { if err := tx.DeleteBucket([]byte(bucketName)); err != nil { return err } - _, err := tx.CreateBucket([]byte(bucketName)) - return err + if _, err := tx.CreateBucket([]byte(bucketName)); err != nil { + return err + } + if err := tx.DeleteBucket([]byte(countersBucket)); err != nil && err != bbolterrors.ErrBucketNotFound { + return err + } + if _, err := tx.CreateBucket([]byte(countersBucket)); err != nil { + return err + } + return nil }) } @@ -240,6 +284,96 @@ func (pc *PersistentCache) Stats() (numKeys int, sizeInKB int) { return } +// SizeKB returns the on-disk size of the database file in KB. +func (pc *PersistentCache) SizeKB() int { + info, err := os.Stat(pc.dbPath) + if err != nil { + return 0 + } + return int(info.Size() / 1024) +} + +// Counts returns the current per-prefix key counts read from the counters +// bucket. Always non-nil. Microseconds to execute regardless of cache size. +func (pc *PersistentCache) Counts() map[string]int64 { + counts := make(map[string]int64) + if err := pc.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(countersBucket)) + if b == nil { + return nil + } + return b.ForEach(func(k, v []byte) error { + if len(v) != 8 { + return nil + } + counts[string(k)] = int64(binary.BigEndian.Uint64(v)) + return nil + }) + }); err != nil { + log.Errorf("%s Failed to read counters: %v", logcolors.LogCache, err) + } + return counts +} + +// ReconcileCounters walks the entire cache bucket, recomputes the per-prefix +// counts, and atomically replaces the counters bucket contents. Expensive: cost +// scales with leaf-page count of the cache bucket (multi-minute on multi-GB +// DBs). Safe to call concurrently with Set/Delete: the swap happens in one txn. +// Note: any Set/Delete deltas applied between the scan and the swap will be +// overwritten by the snapshot. The next reconcile run self-corrects. +func (pc *PersistentCache) ReconcileCounters() error { + fresh := make(map[string]int64) + if err := pc.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucketName)) + if b == nil { + return nil + } + return b.ForEach(func(k, _ []byte) error { + fresh[prefixOf(string(k))]++ + return nil + }) + }); err != nil { + return fmt.Errorf("reconcile: scan failed: %w", err) + } + + if err := pc.db.Update(func(tx *bolt.Tx) error { + if err := tx.DeleteBucket([]byte(countersBucket)); err != nil && err != bbolterrors.ErrBucketNotFound { + return err + } + b, err := tx.CreateBucket([]byte(countersBucket)) + if err != nil { + return err + } + for name, count := range fresh { + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], uint64(count)) + if err := b.Put([]byte(name), buf[:]); err != nil { + return err + } + } + return nil + }); err != nil { + return fmt.Errorf("reconcile: swap failed: %w", err) + } + return nil +} + +// adjustCounter applies delta (typically +1 or -1) to the named counter inside +// the given counters bucket. Initializes the counter at delta if absent. +func adjustCounter(b *bolt.Bucket, name string, delta int64) error { + var current int64 + if v := b.Get([]byte(name)); len(v) == 8 { + current = int64(binary.BigEndian.Uint64(v)) + } + current += delta + if current < 0 { + current = 0 + } + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], uint64(current)) + return b.Put([]byte(name), buf[:]) +} + // Backup creates a backup of the cache database file // Returns the backup file path func (pc *PersistentCache) Backup() (string, error) { diff --git a/cache/persistent_test.go b/cache/persistent_test.go index dbfcede..a967ef5 100644 --- a/cache/persistent_test.go +++ b/cache/persistent_test.go @@ -1,10 +1,14 @@ package cache import ( + "encoding/binary" + "fmt" "os" "path/filepath" "testing" "time" + + bolt "go.etcd.io/bbolt" ) // setupTestCache creates a temporary cache for testing @@ -607,3 +611,185 @@ func TestDeleteBackup_InvalidFile(t *testing.T) { t.Error("Expected error when deleting non-.db file") } } + +func TestCountersBucketExists(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + err := pc.db.View(func(tx *bolt.Tx) error { + if b := tx.Bucket([]byte("counters")); b == nil { + return fmt.Errorf("counters bucket missing") + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestCounts_EmptyCacheReturnsEmptyMap(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + counts := pc.Counts() + if len(counts) != 0 { + t.Errorf("expected empty counts on fresh cache, got %v", counts) + } +} + +func TestSet_IncrementsCounterOnNewKey(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + if err := pc.Set("ttml_lyrics:viva la vida coldplay", ""); err != nil { + t.Fatal(err) + } + if err := pc.Set("kugou_lyrics:foo", "lrc"); err != nil { + t.Fatal(err) + } + if err := pc.Set("no_lyrics:bar", `{"reason":"x"}`); err != nil { + t.Fatal(err) + } + + counts := pc.Counts() + if counts["ttml"] != 1 || counts["kugou"] != 1 || counts["negative"] != 1 { + t.Errorf("got counts %v, want ttml=1 kugou=1 negative=1", counts) + } +} + +func TestSet_DoesNotDoubleCountOnReSet(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + key := "ttml_lyrics:same key" + if err := pc.Set(key, "v1"); err != nil { + t.Fatal(err) + } + if err := pc.Set(key, "v2"); err != nil { + t.Fatal(err) + } + + if got := pc.Counts()["ttml"]; got != 1 { + t.Errorf("expected ttml=1 after re-Set, got %d", got) + } +} + +func TestDelete_DecrementsCounter(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + key := "ttml_lyrics:song" + if err := pc.Set(key, "v"); err != nil { + t.Fatal(err) + } + if got := pc.Counts()["ttml"]; got != 1 { + t.Fatalf("setup: expected ttml=1, got %d", got) + } + if err := pc.Delete(key); err != nil { + t.Fatal(err) + } + if got := pc.Counts()["ttml"]; got != 0 { + t.Errorf("after delete: expected ttml=0, got %d", got) + } +} + +func TestDelete_OnMissingKeyIsNoop(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + if err := pc.Delete("ttml_lyrics:does not exist"); err != nil { + t.Fatal(err) + } + if got := pc.Counts()["ttml"]; got != 0 { + t.Errorf("expected ttml=0, got %d", got) + } +} + +func TestReconcileCounters_CorrectsDrift(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + // Insert directly via the underlying bucket so counters are NOT bumped. + // This simulates pre-counter data or drift. + keys := []string{ + "ttml_lyrics:a", "ttml_lyrics:b", "ttml_lyrics:c", + "kugou_lyrics:x", + "no_lyrics:y", "no_lyrics:z", + } + if err := pc.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucketName)) + for _, k := range keys { + if err := b.Put([]byte(k), []byte(`{}`)); err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatal(err) + } + + if got := pc.Counts()["ttml"]; got != 0 { + t.Fatalf("setup invariant: counters should still be empty, got ttml=%d", got) + } + + if err := pc.ReconcileCounters(); err != nil { + t.Fatal(err) + } + + counts := pc.Counts() + if counts["ttml"] != 3 || counts["kugou"] != 1 || counts["negative"] != 2 { + t.Errorf("after reconcile: got %v, want ttml=3 kugou=1 negative=2", counts) + } +} + +func TestClear_ResetsCounters(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + if err := pc.Set("ttml_lyrics:x", "v"); err != nil { + t.Fatal(err) + } + if got := pc.Counts()["ttml"]; got != 1 { + t.Fatalf("setup: ttml=%d, want 1", got) + } + if err := pc.Clear(); err != nil { + t.Fatal(err) + } + if got := pc.Counts(); len(got) != 0 { + t.Errorf("after Clear: counts should be empty, got %v", got) + } +} + +func TestReconcileCounters_WipesStaleCounters(t *testing.T) { + pc, _, cleanup := setupTestCache(t, false) + defer cleanup() + + // Seed a deliberately-wrong counter value to ensure reconcile wipes it. + if err := pc.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(countersBucket)) + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], 999) + return b.Put([]byte("ttml"), buf[:]) + }); err != nil { + t.Fatal(err) + } + if got := pc.Counts()["ttml"]; got != 999 { + t.Fatalf("setup: expected ttml=999, got %d", got) + } + + // Insert one real key directly, bypassing Set, so reconcile must compute the + // truth from the cache bucket alone. + if err := pc.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket([]byte(bucketName)).Put([]byte("ttml_lyrics:real"), []byte(`{}`)) + }); err != nil { + t.Fatal(err) + } + + if err := pc.ReconcileCounters(); err != nil { + t.Fatal(err) + } + + if got := pc.Counts()["ttml"]; got != 1 { + t.Errorf("after reconcile: expected ttml=1 (wiped from 999), got %d", got) + } +} diff --git a/cache/prefix.go b/cache/prefix.go new file mode 100644 index 0000000..e4cd9b9 --- /dev/null +++ b/cache/prefix.go @@ -0,0 +1,19 @@ +package cache + +import "strings" + +// prefixOf returns the public counter name for a cache key. The cache key format +// is ":". Known prefixes get mapped to short public names; the +// special "no_lyrics" prefix becomes "negative". Anything else (including the +// no-colon case) becomes "unknown". +func prefixOf(key string) string { + idx := strings.IndexByte(key, ':') + if idx <= 0 { + return "unknown" + } + prefix := key[:idx] + if prefix == "no_lyrics" { + return "negative" + } + return strings.TrimSuffix(prefix, "_lyrics") +} diff --git a/cache/prefix_test.go b/cache/prefix_test.go new file mode 100644 index 0000000..3c936d0 --- /dev/null +++ b/cache/prefix_test.go @@ -0,0 +1,25 @@ +package cache + +import "testing" + +func TestPrefixOf(t *testing.T) { + cases := []struct { + in, want string + }{ + {"ttml_lyrics:song artist", "ttml"}, + {"kugou_lyrics:foo", "kugou"}, + {"qq_lyrics:foo", "qq"}, + {"legacy_lyrics:foo", "legacy"}, + {"no_lyrics:song artist", "negative"}, + {"weird_no_colon_key", "unknown"}, + {"", "unknown"}, + {":empty_prefix", "unknown"}, + {"future_provider_lyrics:x", "future_provider"}, + } + for _, tc := range cases { + got := prefixOf(tc.in) + if got != tc.want { + t.Errorf("prefixOf(%q) = %q, want %q", tc.in, got, tc.want) + } + } +} diff --git a/cache/stats_cache.go b/cache/stats_cache.go index 19c4c52..8b5de17 100644 --- a/cache/stats_cache.go +++ b/cache/stats_cache.go @@ -11,42 +11,41 @@ import ( ) const ( - StatsStatusComputing = "computing" - StatsStatusReady = "ready" + StatsStatusSeeding = "seeding" + StatsStatusReady = "ready" + StatsStatusError = "error" ) -// CachedStats is an immutable snapshot of cache statistics. +// CachedStats reports the lifecycle state of the counter-reconciliation loop. +// The actual key counts are NOT held here: callers read them live from +// PersistentCache.Counts(), which is microseconds. type CachedStats struct { - NumKeys int `json:"num_keys"` - SizeKB int `json:"size_kb"` - ComputedAt time.Time `json:"computed_at"` - DurationMs int64 `json:"duration_ms"` - Status string `json:"status"` + Status string `json:"status"` + LastReconciledAt time.Time `json:"last_reconciled_at"` + LastDurationMs int64 `json:"last_duration_ms"` + LastError string `json:"last_error,omitempty"` } -// StatsCache holds the most recent stats snapshot computed in the background. -// Reads are O(1) and lock-free; writes are serialized via a TryLock so concurrent -// refreshes collapse into a single scan. type StatsCache struct { value atomic.Pointer[CachedStats] cache *PersistentCache refreshMu sync.Mutex } -// NewStatsCache returns a StatsCache seeded with a "computing" snapshot. func NewStatsCache(c *PersistentCache) *StatsCache { sc := &StatsCache{cache: c} - sc.value.Store(&CachedStats{Status: StatsStatusComputing}) + sc.value.Store(&CachedStats{Status: StatsStatusSeeding}) return sc } -// Get returns the most recent snapshot. Always non-nil. func (sc *StatsCache) Get() *CachedStats { return sc.value.Load() } -// Refresh computes a fresh snapshot and stores it. If a refresh is already in -// flight, the call is a no-op (the in-flight scan's result will be published). +// Refresh runs ReconcileCounters and updates the lifecycle state. +// No-op if another refresh is already in flight. On failure the snapshot moves +// to StatsStatusError, preserves LastReconciledAt from the previous good run, +// and records the error message in LastError. func (sc *StatsCache) Refresh() { if !sc.refreshMu.TryLock() { return @@ -54,24 +53,35 @@ func (sc *StatsCache) Refresh() { defer sc.refreshMu.Unlock() start := time.Now() - keys, sizeKB := sc.cache.Stats() + if err := sc.cache.ReconcileCounters(); err != nil { + log.Errorf("%s Reconcile failed: %v", logcolors.LogCache, err) + prev := sc.value.Load() + sc.value.Store(&CachedStats{ + Status: StatsStatusError, + LastReconciledAt: prev.LastReconciledAt, + LastDurationMs: time.Since(start).Milliseconds(), + LastError: err.Error(), + }) + return + } sc.value.Store(&CachedStats{ - NumKeys: keys, - SizeKB: sizeKB, - ComputedAt: time.Now(), - DurationMs: time.Since(start).Milliseconds(), - Status: StatsStatusReady, + Status: StatsStatusReady, + LastReconciledAt: time.Now(), + LastDurationMs: time.Since(start).Milliseconds(), }) } -// StartBackgroundRefresh kicks off an immediate scan in a goroutine and then -// re-scans every interval. Stops when stop is closed. +// StartBackgroundRefresh runs an immediate seed-reconcile in a goroutine and +// then re-reconciles every interval. Stops when stop is closed. func (sc *StatsCache) StartBackgroundRefresh(interval time.Duration, stop <-chan struct{}) { go func() { - log.Infof("%s Computing initial stats snapshot (refresh every %s)", logcolors.LogCache, interval) + log.Infof("%s Seeding counters (reconcile cadence: %s)", logcolors.LogCache, interval) sc.Refresh() - snap := sc.Get() - log.Infof("%s Initial stats snapshot ready: %d keys, %d KB (took %dms)", logcolors.LogCache, snap.NumKeys, snap.SizeKB, snap.DurationMs) + if snap := sc.Get(); snap.Status == StatsStatusReady { + log.Infof("%s Counter seed complete (took %dms)", logcolors.LogCache, snap.LastDurationMs) + } else { + log.Errorf("%s Counter seed FAILED (status=%s): %s", logcolors.LogCache, snap.Status, snap.LastError) + } ticker := time.NewTicker(interval) defer ticker.Stop() @@ -79,8 +89,11 @@ func (sc *StatsCache) StartBackgroundRefresh(interval time.Duration, stop <-chan select { case <-ticker.C: sc.Refresh() - snap := sc.Get() - log.Infof("%s Stats snapshot refreshed: %d keys, %d KB (took %dms)", logcolors.LogCache, snap.NumKeys, snap.SizeKB, snap.DurationMs) + if snap := sc.Get(); snap.Status == StatsStatusReady { + log.Infof("%s Counters reconciled (took %dms)", logcolors.LogCache, snap.LastDurationMs) + } else { + log.Errorf("%s Counter reconcile FAILED (status=%s): %s", logcolors.LogCache, snap.Status, snap.LastError) + } case <-stop: return } diff --git a/cache/stats_cache_test.go b/cache/stats_cache_test.go index b15afbf..0836209 100644 --- a/cache/stats_cache_test.go +++ b/cache/stats_cache_test.go @@ -6,34 +6,26 @@ import ( "time" ) -func TestStatsCache_InitialStateIsComputing(t *testing.T) { +func TestStatsCache_InitialStateIsSeeding(t *testing.T) { pc, _, cleanup := setupTestCache(t, false) defer cleanup() sc := NewStatsCache(pc) snap := sc.Get() - - if snap == nil { - t.Fatal("expected snapshot, got nil") - } - if snap.Status != StatsStatusComputing { - t.Errorf("expected status %q, got %q", StatsStatusComputing, snap.Status) + if snap.Status != StatsStatusSeeding { + t.Errorf("status = %q, want %q", snap.Status, StatsStatusSeeding) } - if snap.NumKeys != 0 { - t.Errorf("expected 0 keys before first refresh, got %d", snap.NumKeys) - } - if !snap.ComputedAt.IsZero() { - t.Errorf("expected zero ComputedAt before first refresh, got %v", snap.ComputedAt) + if !snap.LastReconciledAt.IsZero() { + t.Errorf("LastReconciledAt should be zero before first reconcile, got %v", snap.LastReconciledAt) } } -func TestStatsCache_RefreshPopulatesFromUnderlyingCache(t *testing.T) { +func TestStatsCache_RefreshMovesStatusToReady(t *testing.T) { pc, _, cleanup := setupTestCache(t, false) defer cleanup() - pc.Set("a", "1") - pc.Set("b", "2") - pc.Set("c", "3") + pc.Set("ttml_lyrics:a", "1") + pc.Set("kugou_lyrics:b", "2") sc := NewStatsCache(pc) before := time.Now() @@ -41,89 +33,101 @@ func TestStatsCache_RefreshPopulatesFromUnderlyingCache(t *testing.T) { snap := sc.Get() if snap.Status != StatsStatusReady { - t.Errorf("expected status %q after refresh, got %q", StatsStatusReady, snap.Status) + t.Errorf("status = %q, want %q", snap.Status, StatsStatusReady) } - if snap.NumKeys != 3 { - t.Errorf("expected 3 keys, got %d", snap.NumKeys) + if snap.LastReconciledAt.Before(before) { + t.Errorf("LastReconciledAt = %v, expected >= %v", snap.LastReconciledAt, before) } - if snap.ComputedAt.Before(before) { - t.Errorf("expected ComputedAt >= %v, got %v", before, snap.ComputedAt) + + counts := pc.Counts() + if counts["ttml"] != 1 || counts["kugou"] != 1 { + t.Errorf("after reconcile: got %v, want ttml=1 kugou=1", counts) } } -func TestStatsCache_RefreshReflectsCacheGrowth(t *testing.T) { +func TestStatsCache_ConcurrentRefreshIsSafe(t *testing.T) { pc, _, cleanup := setupTestCache(t, false) defer cleanup() + pc.Set("ttml_lyrics:a", "1") sc := NewStatsCache(pc) - sc.Refresh() - if got := sc.Get().NumKeys; got != 0 { - t.Fatalf("expected 0 keys initially, got %d", got) - } - pc.Set("a", "1") - pc.Set("b", "2") - sc.Refresh() + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + go func() { + defer wg.Done() + sc.Refresh() + }() + } + wg.Wait() - if got := sc.Get().NumKeys; got != 2 { - t.Errorf("expected 2 keys after adding entries, got %d", got) + if sc.Get().Status != StatsStatusReady { + t.Errorf("status = %q, want ready", sc.Get().Status) } } -func TestStatsCache_GetIsConcurrentSafe(t *testing.T) { +func TestStatsCache_RefreshOnClosedDBPublishesError(t *testing.T) { pc, _, cleanup := setupTestCache(t, false) - defer cleanup() + // Don't defer cleanup, we close the DB manually below. - pc.Set("a", "1") sc := NewStatsCache(pc) + // Drive the cache into a state where reconcile must fail by closing the + // underlying DB. + if err := pc.Close(); err != nil { + t.Fatal(err) + } + sc.Refresh() - var wg sync.WaitGroup - for i := 0; i < 100; i++ { - wg.Add(1) - go func() { - defer wg.Done() - snap := sc.Get() - if snap.NumKeys != 1 { - t.Errorf("expected 1 key, got %d", snap.NumKeys) - } - }() + snap := sc.Get() + if snap.Status != StatsStatusError { + t.Errorf("status = %q, want %q", snap.Status, StatsStatusError) } - wg.Wait() + if snap.LastError == "" { + t.Error("expected non-empty LastError") + } + _ = cleanup // not used, db already closed } -func TestStatsCache_ConcurrentRefreshIsSafe(t *testing.T) { +func TestStatsCache_RefreshErrorPreservesLastReconciledAt(t *testing.T) { pc, _, cleanup := setupTestCache(t, false) - defer cleanup() - pc.Set("a", "1") + pc.Set("ttml_lyrics:a", "1") sc := NewStatsCache(pc) + sc.Refresh() - var wg sync.WaitGroup - for i := 0; i < 20; i++ { - wg.Add(1) - go func() { - defer wg.Done() - sc.Refresh() - }() + goodSnap := sc.Get() + if goodSnap.Status != StatsStatusReady { + t.Fatalf("setup: status = %q, want %q", goodSnap.Status, StatsStatusReady) + } + if goodSnap.LastReconciledAt.IsZero() { + t.Fatal("setup: LastReconciledAt should be set after successful reconcile") } - wg.Wait() - snap := sc.Get() - if snap.Status != StatsStatusReady { - t.Errorf("expected status %q, got %q", StatsStatusReady, snap.Status) + if err := pc.Close(); err != nil { + t.Fatal(err) + } + + sc.Refresh() + + errSnap := sc.Get() + if errSnap.Status != StatsStatusError { + t.Errorf("status = %q, want %q", errSnap.Status, StatsStatusError) } - if snap.NumKeys != 1 { - t.Errorf("expected 1 key, got %d", snap.NumKeys) + if !errSnap.LastReconciledAt.Equal(goodSnap.LastReconciledAt) { + t.Errorf("LastReconciledAt = %v, want %v (preserved from last good reconcile)", + errSnap.LastReconciledAt, goodSnap.LastReconciledAt) } + _ = cleanup } -func TestStatsCache_StartBackgroundRefreshSeedsInitialScan(t *testing.T) { +func TestStatsCache_StartBackgroundRefreshTriggersInitialReconcile(t *testing.T) { pc, _, cleanup := setupTestCache(t, false) defer cleanup() - pc.Set("a", "1") - pc.Set("b", "2") + pc.Set("ttml_lyrics:a", "1") + pc.Set("ttml_lyrics:b", "2") sc := NewStatsCache(pc) stop := make(chan struct{}) @@ -138,11 +142,7 @@ func TestStatsCache_StartBackgroundRefreshSeedsInitialScan(t *testing.T) { time.Sleep(10 * time.Millisecond) } - snap := sc.Get() - if snap.Status != StatsStatusReady { - t.Fatalf("expected background refresh to complete within deadline; status %q", snap.Status) - } - if snap.NumKeys != 2 { - t.Errorf("expected 2 keys, got %d", snap.NumKeys) + if sc.Get().Status != StatsStatusReady { + t.Fatalf("initial reconcile did not complete; status %q", sc.Get().Status) } } diff --git a/handlers.go b/handlers.go index 688c78e..85ee327 100644 --- a/handlers.go +++ b/handlers.go @@ -505,16 +505,23 @@ func getStats(w http.ResponseWriter, r *http.Request) { s := stats.Get() snapshot := s.Snapshot() - // Add cache storage info. Reads the cached snapshot computed in the background - // every 6h so this endpoint never blocks on a full bucket scan. + // Add cache storage info. Reads live counters maintained by PersistentCache, + // so this endpoint never blocks on a full bucket scan. cs := cacheStats.Get() + counts := persistentCache.Counts() + var total int64 + for _, n := range counts { + total += n + } + sizeKB := persistentCache.SizeKB() snapshot["cache_storage"] = map[string]interface{}{ - "keys": cs.NumKeys, - "size_kb": cs.SizeKB, - "size_mb": float64(cs.SizeKB) / 1024, - "status": cs.Status, - "computed_at": cs.ComputedAt, - "duration_ms": cs.DurationMs, + "keys_total": total, + "keys_by_provider": counts, + "size_kb": sizeKB, + "size_mb": float64(sizeKB) / 1024, + "status": cs.Status, + "last_reconciled_at": cs.LastReconciledAt, + "last_duration_ms": cs.LastDurationMs, } // Add circuit breaker status @@ -714,15 +721,21 @@ func restoreCache(w http.ResponseWriter, r *http.Request) { // Refresh the cached stats snapshot so /stats reflects the restored state. cacheStats.Refresh() - cs := cacheStats.Get() + counts := persistentCache.Counts() + var total int64 + for _, n := range counts { + total += n + } + sizeKB := persistentCache.SizeKB() log.Infof("%s Cache restored from backup: %s", logcolors.LogCacheRestore, backupFileName) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ - "message": "Cache restored successfully", - "restored_from": backupFileName, - "keys_restored": cs.NumKeys, - "size_kb": cs.SizeKB, + "message": "Cache restored successfully", + "restored_from": backupFileName, + "keys_total": total, + "keys_by_provider": counts, + "size_kb": sizeKB, }) } diff --git a/main.go b/main.go index eb0c359..6424a06 100644 --- a/main.go +++ b/main.go @@ -96,10 +96,11 @@ func main() { // Initialize metadata and indexes buckets (separate from cache bucket) initMetadataBuckets() - // Start background stats refresh (24h interval). Reads /stats hit the cached - // snapshot instead of triggering a full scan. + // Counter reconciliation loop. Counters are live (updated transactionally with + // Set/Delete) so /stats is microseconds. The weekly reconcile only corrects + // drift from rare type-flips. cacheStats = cache.NewStatsCache(persistentCache) - cacheStats.StartBackgroundRefresh(24*time.Hour, nil) + cacheStats.StartBackgroundRefresh(7*24*time.Hour, nil) // Start bearer token auto-scraper (proactive refresh based on JWT expiry) ttml.StartBearerTokenMonitor()