Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 139 additions & 5 deletions cache/persistent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
Expand All @@ -13,9 +14,11 @@ import (

log "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
bbolterrors "go.etcd.io/bbolt/errors"
)

const bucketName = "cache"
const countersBucket = "counters"

// PersistentCache wraps BoltDB for persistent storage
// Note: No in-memory cache layer - BoltDB uses mmap so OS handles caching
Expand Down Expand Up @@ -75,6 +78,15 @@ func NewPersistentCache(dbPath string, backupPath string, compressionEnabled boo
return nil, fmt.Errorf("failed to create cache bucket: %v", err)
}

err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(countersBucket))
return err
})
if err != nil {
db.Close()
return nil, fmt.Errorf("failed to create counters bucket: %v", err)
}

pc := &PersistentCache{
db: db,
dbPath: dbPath,
Expand Down Expand Up @@ -165,13 +177,24 @@ func (pc *PersistentCache) Set(key, value string) error {
if b == nil {
return fmt.Errorf("bucket not found")
}
counters := tx.Bucket([]byte(countersBucket))
if counters == nil {
return fmt.Errorf("counters bucket not found")
}

data, err := json.Marshal(entry)
if err != nil {
return err
}

return b.Put([]byte(key), data)
isNew := b.Get([]byte(key)) == nil
if err := b.Put([]byte(key), data); err != nil {
return err
}
if isNew {
return adjustCounter(counters, prefixOf(key), +1)
}
return nil
})
}

Expand All @@ -182,18 +205,39 @@ func (pc *PersistentCache) Delete(key string) error {
if b == nil {
return fmt.Errorf("bucket not found")
}
return b.Delete([]byte(key))
counters := tx.Bucket([]byte(countersBucket))
if counters == nil {
return fmt.Errorf("counters bucket not found")
}

existed := b.Get([]byte(key)) != nil
if err := b.Delete([]byte(key)); err != nil {
return err
}
if existed {
return adjustCounter(counters, prefixOf(key), -1)
}
return nil
})
}

// Clear removes all entries from cache
// Clear removes all entries from cache and resets per-prefix counters in the
// same transaction so counts stay consistent with the wiped cache bucket.
func (pc *PersistentCache) Clear() error {
return pc.db.Update(func(tx *bolt.Tx) error {
if err := tx.DeleteBucket([]byte(bucketName)); err != nil {
return err
}
_, err := tx.CreateBucket([]byte(bucketName))
return err
if _, err := tx.CreateBucket([]byte(bucketName)); err != nil {
return err
}
if err := tx.DeleteBucket([]byte(countersBucket)); err != nil && err != bbolterrors.ErrBucketNotFound {
return err
}
if _, err := tx.CreateBucket([]byte(countersBucket)); err != nil {
return err
}
return nil
})
}

Expand Down Expand Up @@ -240,6 +284,96 @@ func (pc *PersistentCache) Stats() (numKeys int, sizeInKB int) {
return
}

// SizeKB returns the on-disk size of the database file in KB.
func (pc *PersistentCache) SizeKB() int {
info, err := os.Stat(pc.dbPath)
if err != nil {
return 0
}
return int(info.Size() / 1024)
}

// Counts returns the current per-prefix key counts read from the counters
// bucket. Always non-nil. Microseconds to execute regardless of cache size.
func (pc *PersistentCache) Counts() map[string]int64 {
counts := make(map[string]int64)
if err := pc.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(countersBucket))
if b == nil {
return nil
}
return b.ForEach(func(k, v []byte) error {
if len(v) != 8 {
return nil
}
counts[string(k)] = int64(binary.BigEndian.Uint64(v))
return nil
})
}); err != nil {
log.Errorf("%s Failed to read counters: %v", logcolors.LogCache, err)
}
return counts
}

// ReconcileCounters walks the entire cache bucket, recomputes the per-prefix
// counts, and atomically replaces the counters bucket contents. Expensive: cost
// scales with leaf-page count of the cache bucket (multi-minute on multi-GB
// DBs). Safe to call concurrently with Set/Delete: the swap happens in one txn.
// Note: any Set/Delete deltas applied between the scan and the swap will be
// overwritten by the snapshot. The next reconcile run self-corrects.
func (pc *PersistentCache) ReconcileCounters() error {
fresh := make(map[string]int64)
if err := pc.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
return nil
}
return b.ForEach(func(k, _ []byte) error {
fresh[prefixOf(string(k))]++
return nil
})
}); err != nil {
return fmt.Errorf("reconcile: scan failed: %w", err)
}

if err := pc.db.Update(func(tx *bolt.Tx) error {
if err := tx.DeleteBucket([]byte(countersBucket)); err != nil && err != bbolterrors.ErrBucketNotFound {
return err
}
b, err := tx.CreateBucket([]byte(countersBucket))
if err != nil {
return err
}
for name, count := range fresh {
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(count))
if err := b.Put([]byte(name), buf[:]); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("reconcile: swap failed: %w", err)
}
return nil
}

// adjustCounter applies delta (typically +1 or -1) to the named counter inside
// the given counters bucket. Initializes the counter at delta if absent.
func adjustCounter(b *bolt.Bucket, name string, delta int64) error {
var current int64
if v := b.Get([]byte(name)); len(v) == 8 {
current = int64(binary.BigEndian.Uint64(v))
}
current += delta
if current < 0 {
current = 0
}
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(current))
return b.Put([]byte(name), buf[:])
}

// Backup creates a backup of the cache database file
// Returns the backup file path
func (pc *PersistentCache) Backup() (string, error) {
Expand Down
Loading
Loading