diff --git a/python/CHANGELOG.md b/python/CHANGELOG.md index 407e657a7..4b9205bbd 100644 --- a/python/CHANGELOG.md +++ b/python/CHANGELOG.md @@ -7,6 +7,36 @@ This project adheres to [Semantic Versioning](http://semver.org/). ### What's New +#### Faster `get_data` pagination + +Up to a ~80x speedup for some get_data calls. + +#### Shared on-disk cache (opt-out, on by default) + +`client.channels.get_data(...)` now caches the channel windows it returns to disk by default. Subsequent calls covering the same channel/time range — including from a fresh process — read straight out of the cache instead of going to the wire. This also bounds memory: nothing is held in process after the call returns, which fixes the OOM seen on long sustained pulls (~5–7 GB of cache for a 145M-point pull in earlier versions). + +The cache lives on the `SiftClient` as a single shared store: every cache-aware resource writes to one global byte budget at one path, with one LRU policy. The default location is `/sift-data-cache`, capped at 4 GiB with LRU eviction. If the default path can't be opened (read-only filesystem, restricted container, etc.), the client logs a warning and continues with caching disabled — `get_data` still works, it just always goes to the wire. + +`ignore_cache=True` on `client.channels.get_data(...)` now skips writing into the cache as well as reading from it. Previously a "non-caching" workload still appended to the shared cache on every call. + +Configuration lives on the new `client.cache` namespace — knobs are global because the store is shared: + +```python +# Opt out — no data persisted to disk; every get_data call goes to the wire. +client.cache.disable_disk() + +# Reconfigure the location or byte cap. +client.cache.enable_disk(path="/data/sift-cache", max_bytes=2 * 1024 ** 3) + +# Remove a stale or corrupted cache directory. +client.cache.clear_disk() # default tmp path +client.cache.clear_disk("/data/sift-cache") # custom path +``` + +`enable_disk` is also the way to turn the cache back on after a prior `disable_disk` call. + +The cache is powered by [`diskcache`](https://grantjenks.com/docs/diskcache/) (pure-Python, SQLite-backed) with LRU eviction. + #### Resource and principal attributes (ABAC) Added a public API for attribute based access control (ABAC) attributes. `client.resource_attributes` manages attribute keys assigned to entities (assets, channels, runs), and `client.principal_attributes` manages attribute keys assigned to principals (users and user groups). Both are available synchronously and asynchronously via `client.async_`. diff --git a/python/lib/sift_client/_internal/cache_namespace.py b/python/lib/sift_client/_internal/cache_namespace.py new file mode 100644 index 000000000..c76ccaeb9 --- /dev/null +++ b/python/lib/sift_client/_internal/cache_namespace.py @@ -0,0 +1,114 @@ +"""User-facing surface for the shared on-disk cache. + +This module hosts the small bag of methods exposed as ``client.cache``. +The cache itself (a :class:`~sift_client._internal.disk_cache.DiskCache`) +lives on :class:`~sift_client.client.SiftClient` so every resource that +wants to persist results across calls can reach into one shared store. + +The namespace deliberately mirrors :class:`DiskCache` rather than the +old per-resource API (``client.channels.enable_data_cache_disk(...)``): +since the store is shared, configuration is global. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from sift_client._internal.disk_cache import DiskCache + +if TYPE_CHECKING: + import os + + from sift_client.client import SiftClient + +logger = logging.getLogger(__name__) + + +class CacheNamespace: + """Resource-agnostic surface for the on-disk cache shared by all resources. + + Exposed as ``client.cache``. The actual handle (:class:`DiskCache`) is + constructed lazily on first use so importing :mod:`sift_client` doesn't + pay the diskcache cost up front. Configuration changes made before + that first use are recorded against the + :class:`~sift_client._internal.disk_cache_config.DiskCacheConfig` on the + client and applied when the store opens; changes after first use are + routed directly to the live :class:`DiskCache`. + + Default policy: disk caching is **opt-out** (the ``DiskCacheConfig`` is + constructed with ``enabled=True``). Users who don't want any state on + disk call :meth:`disable_disk` to silence it; users who want a custom + location or byte cap call :meth:`enable_disk` with arguments. + """ + + def __init__(self, client: SiftClient): + self._client = client + + def enable_disk( + self, + *, + path: str | os.PathLike[str] | None = None, + max_bytes: int | None = None, + ) -> None: + """Enable (or reconfigure) on-disk caching. + + Disk caching is **on by default** at :attr:`DiskCache.DEFAULT_DISK_PATH`; + use this method to override the path or size, or to turn the cache + back on after a prior :meth:`disable_disk` call. + + Reconfiguring a live cache (different ``path`` or ``max_bytes``) + closes the previous handle and opens a new one. Existing entries + at the new path become available as cache hits. + + An explicit ``path`` that can't be opened (permission denied, + read-only filesystem, ...) raises so the caller knows their + request didn't take. The default-path open does *not* raise — see + :meth:`SiftClient._get_disk_cache` for the silent fall-back. + + Args: + path: Directory to persist to. ``None`` (the default) uses + :attr:`DiskCache.DEFAULT_DISK_PATH`. + max_bytes: Byte cap on disk usage. ``None`` uses + :attr:`DiskCache.DEFAULT_DISK_MAX_BYTES` (4 GiB). When the + bound is reached, ``diskcache``'s LRU eviction takes over. + + Example: + client.cache.enable_disk(path="/data/sift-cache") + client.cache.enable_disk(max_bytes=1024 ** 3) # 1 GiB + """ + client = self._client + client._disk_cache_config.enable(path=path, max_bytes=max_bytes) + if client._disk_cache is not None: + client._disk_cache.enable_disk(path=path, max_bytes=max_bytes) + + def disable_disk(self) -> None: + """Opt out of on-disk caching (no reads or writes). + + Caching is on by default; call this when you don't want any + cached data written to or read from disk. Closes any open cache + file handle. The on-disk directory is NOT deleted — use + :meth:`clear_disk` to wipe it. + """ + client = self._client + client._disk_cache_config.disable() + if client._disk_cache is not None: + client._disk_cache.disable_disk() + + def clear_disk(self, path: str | os.PathLike[str] | None = None) -> None: + """Delete a previously-persisted on-disk cache directory. + + Drops stale caches from previous sessions, recovers from a + corrupt cache, or reclaims disk space. Removes the directory + entirely; if disk caching is on, the next access re-opens an + empty cache at the same path. + + Args: + path: Directory of the cache to clear. ``None`` (the default) + targets :attr:`DiskCache.DEFAULT_DISK_PATH`. + + Raises: + ValueError: If ``path`` exists but does not look like a sift + data cache directory. + """ + DiskCache.clear_disk(path) diff --git a/python/lib/sift_client/_internal/disk_cache.py b/python/lib/sift_client/_internal/disk_cache.py new file mode 100644 index 000000000..0118c6477 --- /dev/null +++ b/python/lib/sift_client/_internal/disk_cache.py @@ -0,0 +1,336 @@ +"""Shared on-disk key/value store used by every resource that wants to cache results. + +One :class:`DiskCache` instance lives on the :class:`SiftClient` (see +``client._disk_cache``). Resources don't construct their own — they receive +a reference and wrap it in a typed adapter that namespaces keys (e.g. +``ChannelDataCache`` in ``low_level_wrappers/data.py``). The store itself +is deliberately value-agnostic: callers hand in ``size_bytes`` for the +oversize guard, ``diskcache`` pickles whatever object the caller supplied, +and the store never needs to know what's inside. + +This module is the sibling of :mod:`._disk_cache_config` — the config +holds user intent (enabled / path / max_bytes) and the store is the live +handle keyed off that intent. + +Key behaviours pinned here so the adapter layer can stay thin: + +* Default path lives under :func:`tempfile.gettempdir` and is shared + across processes, so a fresh session reads previously-cached entries. +* The byte cap is one global budget; LRU eviction spans all resources + sharing the store (channels, calculated channels, exports, ...). +* :meth:`clear_disk` (classmethod) refuses to delete a directory that + doesn't look like a sift cache (no diskcache marker), so a typo'd + path can't take out the user's documents. +* Oversized entries are skipped with a one-shot warning per key — + otherwise diskcache's eviction loop would drain every other row + trying to fit an unfittable entry. +* Construction with ``disk_path=None`` (or after :meth:`disable_disk`) + is a silent no-op store. Callers don't need to branch on disabled + state; reads always miss and writes are dropped. +""" + +from __future__ import annotations + +import logging +import os +import shutil +import tempfile +from pathlib import Path +from typing import TYPE_CHECKING, Any, Iterator, cast + +if TYPE_CHECKING: + import diskcache + +logger = logging.getLogger(__name__) + + +class DiskCache: + """Process-wide disk-backed key/value store. + + Wraps a :class:`diskcache.Cache` with the lifecycle management and + safety rails sift resources rely on. The instance is shared — each + resource adapter namespaces its keys (e.g. ``channel:``) so multiple + resources can write to the same store without colliding. + + When ``disk_path`` is ``None``, the instance is a silent no-op: every + ``get`` misses, every ``put`` is dropped, and ``__contains__`` is + always ``False``. This lets callers treat "caching disabled" the same + as a cold cache, with no branching needed at the read/write site. + + Args: + disk_path: Directory to persist to. ``None`` keeps the store + disabled. A previously-populated directory is reused, so a + fresh process reading the same path sees existing entries. + disk_max_bytes: Byte cap on the store. ``None`` falls back to + :attr:`DEFAULT_DISK_MAX_BYTES`. Ignored when ``disk_path`` + is ``None``. + """ + + #: Default directory for the shared cache. Lives under + #: :func:`tempfile.gettempdir` so it survives across sessions of the + #: same user but doesn't pollute the home directory. The suffix is + #: fixed so multiple ``SiftClient`` instances naturally share the + #: same store and pick up each other's prior sessions. + DEFAULT_DISK_PATH: str = os.path.join(tempfile.gettempdir(), "sift-data-cache") + + #: Default byte cap when :meth:`enable_disk` is called without an + #: explicit ``max_bytes``. 4 GiB is generous for the typical ``/tmp`` + #: filesystem; ``diskcache`` enforces the cap with its own SQLite- + #: backed LRU eviction once the bound is reached. + DEFAULT_DISK_MAX_BYTES: int = 4 * 1024 * 1024 * 1024 + + #: Marker file ``diskcache`` writes inside every cache directory. The + #: classmethod :meth:`clear_disk` checks for this before any + #: ``shutil.rmtree`` so a typo'd path can't wipe out an unrelated + #: directory. + _DISKCACHE_MARKER: str = "cache.db" + + def __init__( + self, + *, + disk_path: str | os.PathLike[str] | None = None, + disk_max_bytes: int | None = None, + ): + # Keys we've already logged an "entry exceeds disk cap" warning + # for. Tracks the full namespaced key (e.g. ``channel:foo``), not + # the resource-side id, so two adapters that happen to share an + # id space don't collide on dedup. A successful normal put + # clears the bit so a future regression re-warns. + self._oversized_warned: set[str] = set() + self._disk: diskcache.Cache | None = None + self._disk_path: str | None = None + self._disk_max_bytes: int | None = None + if disk_path is not None: + self._open_disk( + str(disk_path), + disk_max_bytes if disk_max_bytes is not None else self.DEFAULT_DISK_MAX_BYTES, + ) + + @classmethod + def clear_disk(cls, path: str | os.PathLike[str] | None = None) -> None: + """Delete a previously-persisted on-disk cache directory. + + Use this to drop stale caches from previous sessions, recover + from a corrupt cache, or reclaim disk space. The directory is + removed entirely; a future :meth:`enable_disk` call at the same + path opens a fresh empty cache. + + Args: + path: Directory of the cache to clear. ``None`` (the default) + targets :attr:`DEFAULT_DISK_PATH`. + + Raises: + ValueError: If ``path`` exists but does not look like a sift + cache directory (missing the ``diskcache`` marker file). + The guard makes accidental misuse a hard error rather + than silent data loss. + """ + target = Path(path) if path is not None else Path(cls.DEFAULT_DISK_PATH) + if not target.exists(): + return + if not (target / cls._DISKCACHE_MARKER).exists(): + raise ValueError( + f"{str(target)!r} does not look like a sift data cache " + f"directory (missing {cls._DISKCACHE_MARKER!r} marker). " + f"Refusing to delete." + ) + shutil.rmtree(target) + + @property + def disk_enabled(self) -> bool: + """Whether a disk handle is currently open.""" + return self._disk is not None + + @property + def disk_path(self) -> str | None: + """Filesystem path of the cache when enabled, else ``None``.""" + return self._disk_path + + @property + def disk_max_bytes(self) -> int | None: + """Configured byte cap on disk usage, or ``None`` when disabled.""" + return self._disk_max_bytes + + def __contains__(self, key: str) -> bool: + """True if ``key`` is cached. Always ``False`` when disabled.""" + if self._disk is None: + return False + return key in self._disk + + def __iter__(self) -> Iterator[str]: + """Yield cached keys. Lets adapters scope a clear to their prefix. + + Yields nothing when disabled. The underlying diskcache iterator + is snapshot-style, but callers that intend to mutate during + iteration should still wrap with ``list(...)`` to be safe. + + ``diskcache.Cache`` is typed as yielding ``bytes | str | ...`` + because it supports arbitrary key types; the cast narrows to the + ``str`` contract this layer enforces. Adapters never write + non-string keys. + """ + if self._disk is None: + return + for key in self._disk: + yield cast("str", key) + + def enable_disk( + self, + *, + path: str | os.PathLike[str] | None = None, + max_bytes: int | None = None, + ) -> None: + """Open the disk handle, replacing any previous one. + + Reconfiguring to a different ``path`` or ``max_bytes`` closes the + prior handle first. Existing entries at the new path become + available via :meth:`get` without further setup. + + Args: + path: Directory to persist to. ``None`` uses + :attr:`DEFAULT_DISK_PATH`. + max_bytes: Byte cap (``None`` → :attr:`DEFAULT_DISK_MAX_BYTES`). + """ + target_path = str(path) if path is not None else self.DEFAULT_DISK_PATH + target_max = max_bytes if max_bytes is not None else self.DEFAULT_DISK_MAX_BYTES + if ( + self._disk is not None + and self._disk_path == target_path + and self._disk_max_bytes == target_max + ): + return + self._close_disk() + self._open_disk(target_path, target_max) + + def disable_disk(self) -> None: + """Close the disk handle (if open). Does not touch on-disk contents. + + Use :meth:`clear_disk` to remove a directory from disk. + """ + self._close_disk() + + def get(self, key: str) -> Any | None: + """Return the cached value for ``key`` or ``None`` on a miss. + + Returns ``None`` for misses, decoded values for hits, and ``None`` + (after self-invalidating the row) for corrupt entries surfaced + by ``diskcache`` as ``sqlite3.DatabaseError`` or similar. The + caller is expected to ``isinstance``-check the result against + the type they wrote. + """ + if self._disk is None: + return None + try: + return self._disk.get(key, default=None, retry=True) + except Exception: + # diskcache surfaces ``sqlite3.DatabaseError`` (and friends) + # for corrupt or partially-written entries from a prior + # session. Treat as a miss and force-drop the bad row so + # we don't repeatedly trip the same path. + logger.warning("disk cache read failed for %s; invalidating", key) + try: + del self._disk[key] + except Exception: + pass + return None + + def put(self, key: str, value: Any, *, size_bytes: int) -> None: + """Write ``value`` under ``key``. No-op when disabled. + + Entries whose ``size_bytes`` exceeds :attr:`disk_max_bytes` are + skipped with a one-shot warning per key, since diskcache's + eviction loop would otherwise drain every other row trying — and + failing — to fit an oversized entry. Callers are responsible + for measuring the size; the store stays value-agnostic. + + Args: + key: Namespaced key (e.g. ``"channel:"``). Adapters are + responsible for picking a prefix that won't collide with + other adapters writing to the same store. + value: Anything ``diskcache`` can pickle. + size_bytes: Caller-measured size used for the oversize guard. + """ + if self._disk is None: + return + if self._disk_max_bytes is not None and size_bytes > self._disk_max_bytes: + if key not in self._oversized_warned: + logger.warning( + "Entry for %s (%d bytes) is larger than the disk " + "cache cap (%d bytes); skipping disk cache for this " + "entry so other entries aren't evicted. Raise the " + "cap via ``client.cache.enable_disk(max_bytes=...)`` " + "to cache this entry on disk.", + key, + size_bytes, + self._disk_max_bytes, + ) + self._oversized_warned.add(key) + try: + self._disk.delete(key, retry=True) + except Exception: + pass + return + try: + self._disk.set(key, value, retry=True) + self._oversized_warned.discard(key) + except Exception: + # Best-effort persistence: keep going on disk errors so the + # caller's request still succeeds. Drop the (possibly + # partial) disk row. + logger.warning("disk cache write failed for %s; invalidating", key) + try: + self._disk.delete(key, retry=True) + except Exception: + pass + + def invalidate(self, key: str) -> None: + """Remove ``key`` from the cache. Safe to call when absent.""" + # Invalidation is a fresh start for this key; the next put should + # re-evaluate against the current cap and re-warn if still too big. + self._oversized_warned.discard(key) + if self._disk is not None: + try: + self._disk.delete(key, retry=True) + except Exception: + pass + + def clear(self) -> None: + """Wipe every entry from the store. The directory itself remains. + + Spans all adapters sharing the store — typically used at test + teardown or for full reset. Adapters that want to wipe only their + own namespace should iterate ``self`` and call :meth:`invalidate` + on matching keys. + """ + self._oversized_warned.clear() + if self._disk is not None: + self._disk.clear() + + def close(self) -> None: + """Release the disk file handle. Safe to call when disabled.""" + self._close_disk() + + def _open_disk(self, path: str, max_bytes: int) -> None: + import diskcache + + os.makedirs(path, exist_ok=True) + self._disk = diskcache.Cache( + directory=path, + size_limit=max_bytes, + eviction_policy="least-recently-used", + statistics=0, + tag_index=0, + ) + self._disk_path = path + self._disk_max_bytes = max_bytes + + def _close_disk(self) -> None: + if self._disk is None: + return + try: + self._disk.close() + except Exception: + pass + self._disk = None + self._disk_path = None + self._disk_max_bytes = None diff --git a/python/lib/sift_client/_internal/disk_cache_config.py b/python/lib/sift_client/_internal/disk_cache_config.py new file mode 100644 index 000000000..c49eaf442 --- /dev/null +++ b/python/lib/sift_client/_internal/disk_cache_config.py @@ -0,0 +1,87 @@ +"""User-expressed configuration for a resource's optional disk-cache tier.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import os + + +class DiskCacheConfig: + """Holds a resource's disk-cache enable/path/max-bytes intent. + + Resources own one instance, mutate it via :meth:`enable` / :meth:`disable` + in response to user calls, and read the properties at lazy-init time to + decide what kwargs to forward to their cache-aware wrapper. + + The :attr:`using_default_path` property is the key invariant for the + silent-fallback-vs-loud-raise distinction in resource lazy-init code: + if the user picked a specific path and opening fails, the failure + surfaces; if the user left the default and opening fails, the resource + falls back to memory-only without disrupting the call. + + Args: + enabled: Initial enabled state. Pass ``True`` for opt-out (the disk + tier is on by default and users call ``disable`` to turn it off); + pass ``False`` for opt-in (users call ``enable`` to turn it on). + """ + + def __init__(self, *, enabled: bool = True) -> None: + self._enabled = enabled + self._path: str | None = None + self._max_bytes: int | None = None + + @property + def enabled(self) -> bool: + """Whether the disk tier should be opened on the next lazy init.""" + return self._enabled + + @property + def path(self) -> str | None: + """User-supplied disk-cache path, or ``None`` to defer to the cache's default.""" + return self._path + + @property + def max_bytes(self) -> int | None: + """User-supplied disk-cache byte cap, or ``None`` to defer to the cache's default.""" + return self._max_bytes + + @property + def using_default_path(self) -> bool: + """``True`` when the disk tier is enabled *and* the path is the cache's default. + + Resources use this to decide whether to silently fall back to memory + on a disk-open failure (default path: the user didn't ask for it + specifically, so degrade gracefully) or to re-raise (explicit path: + the user asked for it, so failure must surface). + """ + return self._enabled and self._path is None + + def enable( + self, + *, + path: str | os.PathLike[str] | None = None, + max_bytes: int | None = None, + ) -> None: + """Mark the disk tier as enabled, optionally with a custom path or byte cap. + + Args: + path: Directory to persist to. ``None`` leaves the cache's + default in effect. + max_bytes: Byte cap on the disk tier. ``None`` leaves the + cache's default in effect. + """ + self._enabled = True + self._path = str(path) if path is not None else None + self._max_bytes = max_bytes + + def disable(self) -> None: + """Mark the disk tier as disabled and clear any custom path / byte cap. + + Subsequent :meth:`enable` calls re-enable at the cache's defaults + unless overrides are supplied. + """ + self._enabled = False + self._path = None + self._max_bytes = None diff --git a/python/lib/sift_client/_internal/low_level_wrappers/data.py b/python/lib/sift_client/_internal/low_level_wrappers/data.py index 57b24e398..c524a9e03 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/data.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/data.py @@ -16,6 +16,7 @@ ) from sift.data.v2.data_pb2_grpc import DataServiceStub +from sift_client._internal.disk_cache import DiskCache from sift_client._internal.low_level_wrappers.base import LowLevelClientBase from sift_client._internal.time import to_timestamp_nanos from sift_client.sift_types.channel import Channel, ChannelDataType @@ -40,11 +41,114 @@ class ChannelCacheEntry(BaseModel): data: pd.DataFrame start_time: datetime end_time: datetime + size_bytes: int + + +def _new_cache_entry( + data: pd.DataFrame, start_time: datetime, end_time: datetime +) -> ChannelCacheEntry: + return ChannelCacheEntry( + data=data, + start_time=start_time, + end_time=end_time, + size_bytes=int(data.memory_usage(deep=True).sum()), + ) + + +class ChannelDataCache: + """Channel-side adapter over the shared :class:`DiskCache` store. + + The store is owned by :class:`~sift_client.client.SiftClient` and + shared by every cache-aware resource; this adapter is the typed, + namespaced view of it that the channel data path uses. + + Responsibilities the adapter holds onto: + + * **Key namespacing.** Every read/write goes through :meth:`_key`, + which prefixes the channel id with ``channel:``. That keeps a + future calculated-channels or exports adapter on the same store + from colliding on raw resource ids. + * **Typing.** ``put`` only accepts :class:`ChannelCacheEntry`; + ``get`` ``isinstance``-checks the raw value before handing it back, + so a corrupt or cross-adapter row reads as a miss instead of + blowing up downstream pandas code. + * **Size measurement.** The store stays value-agnostic; the adapter + already computes ``size_bytes`` on the entry via + :func:`_new_cache_entry` (``DataFrame.memory_usage(deep=True)``) so + it just forwards that to the store's oversize guard. + * **Resource-side state.** :attr:`name_id_map` lives here because + it's channel-specific bookkeeping needed to wire raw fetch + responses (keyed by channel *name*) back to the cache (keyed by + channel *id*). + + The :class:`DiskCacheAdapter` ``Protocol`` is intentionally not + declared yet — there's only one adapter shape so far. When a second + resource grows its own adapter, extract the Protocol from the two + real shapes rather than guessing from one. + """ + + #: Namespace prefix for keys this adapter writes to the shared + #: :class:`DiskCache`. Picked at class scope so adapters in other + #: resources can pick distinct prefixes without runtime negotiation. + KEY_PREFIX: str = "channel:" + + def __init__(self, store: DiskCache): + """Wrap ``store`` with channel-data semantics. + + Args: + store: The shared :class:`DiskCache` instance owned by the + :class:`SiftClient`. Multiple adapters may share one store. + """ + self._store = store + self.name_id_map: dict[str, str] = {} + + def _key(self, channel_id: str) -> str: + return f"{self.KEY_PREFIX}{channel_id}" + + @property + def store(self) -> DiskCache: + """The shared underlying store. Tests reach in for store-level state.""" + return self._store + def __contains__(self, channel_id: str) -> bool: + """True if the channel is cached. False when the store is disabled.""" + return self._key(channel_id) in self._store -class ChannelCache(BaseModel): - name_id_map: dict[str, str] - channels: dict[str, ChannelCacheEntry] + def get(self, channel_id: str) -> ChannelCacheEntry | None: + """Return the entry for ``channel_id`` if cached, otherwise None. + + Type-checks the raw value before returning so a row written by a + different adapter (or a corrupt entry that survived) reads as a + miss instead of being handed back as the wrong type. + """ + raw = self._store.get(self._key(channel_id)) + if not isinstance(raw, ChannelCacheEntry): + return None + return raw + + def put(self, channel_id: str, entry: ChannelCacheEntry) -> None: + """Insert or replace ``channel_id`` on disk. + + Forwards :attr:`ChannelCacheEntry.size_bytes` to the store so its + oversize guard can decide whether to write or skip+warn. No-op + when the underlying store is disabled. + """ + self._store.put(self._key(channel_id), entry, size_bytes=entry.size_bytes) + + def invalidate(self, channel_id: str) -> None: + """Remove ``channel_id`` from the cache. Safe when absent.""" + self._store.invalidate(self._key(channel_id)) + + def clear(self) -> None: + """Wipe every channel entry. Other adapters' entries are preserved. + + Walks the shared store's keyspace once and drops anything under + :attr:`KEY_PREFIX`. ``list(...)`` snapshots the iterator since + we mutate during iteration. + """ + for key in list(self._store): + if key.startswith(self.KEY_PREFIX): + self._store.invalidate(key) class DataLowLevelClient(LowLevelClientBase, WithGrpcClient): @@ -53,15 +157,29 @@ class DataLowLevelClient(LowLevelClientBase, WithGrpcClient): This class provides a thin wrapper around the autogenerated bindings for the DataAPI. """ - channel_cache: ChannelCache = ChannelCache(name_id_map={}, channels={}) - - def __init__(self, grpc_client: GrpcClient): + def __init__( + self, + grpc_client: GrpcClient, + *, + channel_cache: ChannelDataCache | None = None, + ): """Initialize the DataLowLevelClient. Args: grpc_client: The gRPC client to use for making API calls. + channel_cache: Adapter wrapping the shared :class:`DiskCache` the + :class:`SiftClient` owns. When ``None`` (only the unit-test + construction path), the wrapper falls back to a no-op store + so cache reads/writes are silent. Production callers always + pass an adapter built from ``client._get_disk_cache()``. """ super().__init__(grpc_client) + # Production wires the shared store in via the resource. The fallback + # here lets a bare ``DataLowLevelClient(MagicMock())`` keep working + # in unit tests without forcing every site to plumb a store. + if channel_cache is None: + channel_cache = ChannelDataCache(DiskCache()) + self.channel_cache = channel_cache def _update_name_id_map(self, channels: list[Channel]): """Update the name id map with the new channels.""" @@ -109,7 +227,7 @@ def _filter_cached_channels(self, channel_ids: list[str]) -> tuple[list[str], li cached_channels = [] not_cached_channels = [] for channel_id in channel_ids: - if self.channel_cache.channels.get(channel_id): + if channel_id in self.channel_cache: cached_channels.append(channel_id) else: not_cached_channels.append(channel_id) @@ -139,7 +257,7 @@ def _check_cache( A tuple of (data, start_time, end_time) where data is a pandas dataframe and start and end times are what should be used for the next call based on what is not covered by the cached data. """ - cached_data = self.channel_cache.channels.get(channel_id) + cached_data = self.channel_cache.get(channel_id) ret_start_time = start_time ret_end_time = end_time ret_data = None @@ -204,24 +322,21 @@ def _update_cache( # So we just don't update the cache. continue - if channel_id in self.channel_cache.channels: - self.channel_cache.channels[channel_id].data = ( - pd.concat([self.channel_cache.channels[channel_id].data, data]) - .groupby(level=0) - .last() - ) - self.channel_cache.channels[channel_id].start_time = min( - suggested_start_time, self.channel_cache.channels[channel_id].start_time - ) - self.channel_cache.channels[channel_id].end_time = max( - end_time, self.channel_cache.channels[channel_id].end_time + existing = self.channel_cache.get(channel_id) + if existing is not None: + merged_data = pd.concat([existing.data, data]).groupby(level=0).last() + entry = _new_cache_entry( + data=merged_data, + start_time=min(suggested_start_time, existing.start_time), + end_time=max(end_time, existing.end_time), ) else: - self.channel_cache.channels[channel_id] = ChannelCacheEntry( + entry = _new_cache_entry( data=data, start_time=suggested_start_time, end_time=end_time, ) + self.channel_cache.put(channel_id, entry) async def get_channel_data( self, @@ -298,22 +413,47 @@ async def get_channel_data( tasks.append(task) pages = await asyncio.gather(*tasks) - # Flatten the data - for page in pages: - for data in page: - page_results = self.try_deserialize_channel_data(data) - for name, df in page_results.items(): - if name not in ret_data: - ret_data[name] = df - else: - ret_data[name] = pd.concat([ret_data[name], df]).groupby(level=0).last() + ret_data = self._merge_pages(pages, initial=ret_data) - self._update_cache( - channel_data=ret_data, start_time=start_time, end_time=end_time, run_id=run_id - ) + if not ignore_cache: + self._update_cache( + channel_data=ret_data, start_time=start_time, end_time=end_time, run_id=run_id + ) return ret_data + def _merge_pages( + self, + pages: list[list[Any]], + *, + initial: dict[str, pd.DataFrame], + ) -> dict[str, pd.DataFrame]: + """Flatten paged channel data + any cached slices into one DataFrame per channel. + + ``initial`` carries any cached slices already populated by + ``_check_cache``. Cached entries are folded in as the first frame for + their channel so they participate in the same final concat; + ``groupby(level=0).last()`` preserves the previous behavior of letting + a later-positioned (fresher) value win on duplicate timestamps. + """ + per_channel_frames: dict[str, list[pd.DataFrame]] = {} + for page in pages: + for data in page: + for name, df in self.try_deserialize_channel_data(data).items(): + per_channel_frames.setdefault(name, []).append(df) + + ret_data: dict[str, pd.DataFrame] = dict(initial) + for name, frames in per_channel_frames.items(): + if name in ret_data: + # Cached slice goes first so fresher pages (positioned later + # in the list) win on overlapping timestamps after groupby. + frames.insert(0, ret_data[name]) + if len(frames) == 1: + ret_data[name] = frames[0] + else: + ret_data[name] = pd.concat(frames).groupby(level=0).last() + return ret_data + @staticmethod def try_deserialize_channel_data(channel_data: Any) -> dict[str, pd.DataFrame]: """Deserialize a channel data object into a numpy array.""" diff --git a/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_data.py b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_data.py new file mode 100644 index 000000000..6e28bd2bd --- /dev/null +++ b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_data.py @@ -0,0 +1,590 @@ +"""Tests for :mod:`sift_client._internal.low_level_wrappers.data`. + +Four classes, narrowest scope first: + +* :class:`TestChannelDataCache` — the typed adapter over the shared + :class:`DiskCache`. Covers key namespacing, the isinstance guard on + ``get``, and the prefix-scoped ``clear``. +* :class:`TestMergePages` — ``DataLowLevelClient._merge_pages``, the + per-channel concat helper. +* :class:`TestDataLowLevelClient` — constructor wiring and per-instance + isolation. +* :class:`TestGetChannelData` — end-to-end on the public + ``get_channel_data`` API against a mocked ``_get_data_impl``. + +Storage-layer behaviour (oversize guards, marker-checked clear, +cross-session reload) lives in ``_tests/_internal/test_disk_cache.py``; +this file stays focused on the channel-data path. + +The OOM regression that motivated this code happened because the cache +was a class attribute that grew without bound. ``test_per_instance_isolation`` +is the canary that catches anyone re-introducing that pattern, even though +ownership has since moved to the client. +""" + +from __future__ import annotations + +from contextlib import contextmanager +from datetime import datetime, timedelta, timezone +from typing import Any, Iterator +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest + +from sift_client._internal.disk_cache import DiskCache +from sift_client._internal.low_level_wrappers.data import ( + ChannelCacheEntry, + ChannelDataCache, + DataLowLevelClient, + _new_cache_entry, +) +from sift_client.sift_types.channel import Channel, ChannelDataType + +_NOW = datetime(2025, 1, 1, tzinfo=timezone.utc) +_WINDOW_END = _NOW + timedelta(days=1) + + +# ---------- shared helpers ----------- + + +def _frame( + cid: str = "value", + *, + rows: int = 5, + start: datetime = _NOW, + offset: int = 0, + freq: str = "ms", + value_dtype: str = "float64", +) -> pd.DataFrame: + """DataFrame indexed by a tz-aware DatetimeIndex with ``rows`` rows.""" + index = pd.date_range(start, periods=rows, freq=freq, tz=timezone.utc) + return pd.DataFrame( + {cid: [(offset + i) * 1.0 for i in range(rows)]}, + index=index, + ).astype({cid: value_dtype}) + + +def _entry(*, rows: int = 5, value_dtype: str = "float64") -> ChannelCacheEntry: + """``ChannelCacheEntry`` wrapping a small generated DataFrame.""" + data = _frame(rows=rows, value_dtype=value_dtype) + return _new_cache_entry( + data=data, + start_time=data.index[0].to_pydatetime(), + end_time=data.index[-1].to_pydatetime(), + ) + + +def _channel(cid: str) -> Channel: + """Minimal ``Channel`` with required fields populated.""" + return Channel( + id_=cid, + name=cid, + data_type=ChannelDataType.DOUBLE, + description="", + unit="", + asset_id="a1", + is_archived=False, + created_date=_NOW, + modified_date=_NOW, + created_by_user_id="u1", + modified_by_user_id="u1", + ) + + +def _client_with_cache(tmp_path, subdir: str = "cache") -> DataLowLevelClient: + """Build a ``DataLowLevelClient`` whose adapter points at ``tmp_path``. + + Tests that exercise cache behaviour (hits/misses) need an actual + disk-backed adapter, so the store has to be opened explicitly. A + plain ``DataLowLevelClient(MagicMock())`` defaults to a no-op store + and would silently turn every cache test into a wire-path test. + """ + store = DiskCache(disk_path=tmp_path / subdir) + return DataLowLevelClient(MagicMock(), channel_cache=ChannelDataCache(store)) + + +def _patch_deserializer(sentinel_to_frames: dict[str, dict[str, pd.DataFrame]]) -> Any: + """Patch ``try_deserialize_channel_data`` to translate string sentinels. + + Lets tests pass strings in lieu of protos. Returned object is a context + manager; callers use ``with _patch_deserializer(...):``. + """ + return patch.object( + DataLowLevelClient, + "try_deserialize_channel_data", + staticmethod(lambda s: sentinel_to_frames[s]), + ) + + +@contextmanager +def _fake_grpc( + client: DataLowLevelClient, + channel_to_pages: dict[str, list[pd.DataFrame]], +) -> Iterator[list[dict[str, Any]]]: + """Mock the gRPC boundary so each "page" is a sentinel string. + + ``_get_data_impl`` is replaced with a coroutine that pops one DataFrame + off ``channel_to_pages[cid]`` per call per channel, until exhausted. + ``try_deserialize_channel_data`` is patched to map the sentinel back to + the corresponding ``{channel: DataFrame}`` dict. + + Yields a ``call_log`` list so tests can assert which channels actually + hit the wire. The patch is torn down and ``_get_data_impl`` restored on + exit. + """ + sentinel_to_frames: dict[str, dict[str, pd.DataFrame]] = {} + next_page_index: dict[str, int] = dict.fromkeys(channel_to_pages, 0) + call_log: list[dict[str, Any]] = [] + + async def fake_impl( + *, + channel_ids: list[str], + page_size: int | None = None, + page_token: str | None = None, + order_by: str | None = None, + **kwargs: Any, + ) -> tuple[list[str], str]: + call_log.append({"channel_ids": list(channel_ids), **kwargs}) + data: list[str] = [] + more_remaining = False + for cid in channel_ids: + i = next_page_index[cid] + if i >= len(channel_to_pages[cid]): + continue # this channel is exhausted; just emit nothing + sentinel = f"{cid}|{i}" + sentinel_to_frames[sentinel] = {cid: channel_to_pages[cid][i]} + data.append(sentinel) + next_page_index[cid] += 1 + if next_page_index[cid] < len(channel_to_pages[cid]): + more_remaining = True + # ``_handle_pagination`` loops until it sees ``page_token == ""``. + return data, ("next" if more_remaining else "") + + original_impl = client._get_data_impl + client._get_data_impl = fake_impl # type: ignore[method-assign] + try: + with _patch_deserializer(sentinel_to_frames): + yield call_log + finally: + client._get_data_impl = original_impl # type: ignore[method-assign] + + +# ---------- tests ----------- + + +class TestChannelDataCache: + """The typed adapter over the shared :class:`DiskCache`. + + Three invariants get pinned: + + 1. Every operation routes through the namespaced key + (``channel:``), so two adapters sharing one store don't + collide on bare resource ids. + 2. :meth:`ChannelDataCache.get` returns ``None`` on a type-mismatch + hit (e.g. a row another adapter wrote) instead of handing + arbitrary objects to downstream pandas code. + 3. :meth:`ChannelDataCache.clear` wipes only the adapter's namespace + — entries belonging to other adapters survive. + + Store-level behaviour (oversized guards, cross-session reload, + marker-checked clear_disk) is exercised in ``test_disk_cache.py``. + """ + + def test_get_miss_returns_none(self, tmp_path): + adapter = ChannelDataCache(DiskCache(disk_path=tmp_path / "miss")) + try: + assert "c1" not in adapter + assert adapter.get("c1") is None + finally: + adapter.store.close() + + def test_round_trip(self, tmp_path): + """Put then get returns an equivalent entry.""" + adapter = ChannelDataCache(DiskCache(disk_path=tmp_path / "rt")) + try: + entry = _entry(rows=8) + adapter.put("c1", entry) + assert "c1" in adapter + got = adapter.get("c1") + assert got is not None + pd.testing.assert_frame_equal(got.data, entry.data) + assert got.start_time == entry.start_time + assert got.end_time == entry.end_time + finally: + adapter.store.close() + + def test_writes_use_namespaced_key(self, tmp_path): + """The raw store sees ``channel:``, not the bare id. + + Pins the key-shape contract two adapters share. Without it, a + second adapter that happens to share an id with the channel + adapter would clobber the channel row. + """ + store = DiskCache(disk_path=tmp_path / "ns") + adapter = ChannelDataCache(store) + try: + adapter.put("c1", _entry(rows=4)) + assert "channel:c1" in store + assert "c1" not in store + finally: + store.close() + + def test_get_isinstance_check_filters_foreign_rows(self, tmp_path): + """A row whose value isn't a ChannelCacheEntry reads as a miss. + + Models a corrupt entry or a key collision from another writer. + ``ChannelDataCache.get`` must isinstance-check the raw value so + callers downstream never receive the wrong shape. + """ + store = DiskCache(disk_path=tmp_path / "foreign") + adapter = ChannelDataCache(store) + try: + store.put("channel:c1", {"not": "an entry"}, size_bytes=64) + assert adapter.get("c1") is None + finally: + store.close() + + def test_invalidate_removes_entry(self, tmp_path): + adapter = ChannelDataCache(DiskCache(disk_path=tmp_path / "inval")) + try: + adapter.invalidate("never_added") # safe before any puts + adapter.put("c1", _entry(rows=4)) + adapter.invalidate("c1") + assert "c1" not in adapter + assert adapter.get("c1") is None + finally: + adapter.store.close() + + def test_clear_is_prefix_scoped(self, tmp_path): + """``clear`` drops channel rows but leaves other adapters' rows alone. + + Simulates a second resource writing to the same store with a + different prefix; the channel adapter's clear must not be a + whole-store wipe. + """ + store = DiskCache(disk_path=tmp_path / "scoped") + adapter = ChannelDataCache(store) + try: + adapter.put("c1", _entry(rows=4)) + adapter.put("c2", _entry(rows=4)) + # Simulate a row written by a different adapter. + store.put("other:1", "foreign-value", size_bytes=64) + adapter.clear() + assert "c1" not in adapter + assert "c2" not in adapter + assert "other:1" in store + finally: + store.close() + + def test_size_bytes_propagates_to_store(self, tmp_path): + """The adapter forwards the entry's ``size_bytes`` to the store guard. + + Sized below the entry's actual ``size_bytes`` so the store's + oversize guard kicks in. The adapter never measures size itself; + it relies on ``_new_cache_entry`` having stamped the value. + """ + entry = _entry(rows=10_000) + store = DiskCache(disk_path=tmp_path / "size", disk_max_bytes=entry.size_bytes // 2) + adapter = ChannelDataCache(store) + try: + adapter.put("c1", entry) + assert "c1" not in adapter # oversize skipped by the store + finally: + store.close() + + def test_no_op_store_keeps_adapter_silent(self): + """An adapter on a disabled store behaves like a cold cache. + + Disabling the store is the path ``client.cache.disable_disk()`` + exercises; resources can keep their adapter reference and every + operation just no-ops. + """ + adapter = ChannelDataCache(DiskCache()) + assert not adapter.store.disk_enabled + adapter.put("c1", _entry(rows=4)) + assert "c1" not in adapter + assert adapter.get("c1") is None + adapter.invalidate("c1") + adapter.clear() + + +class TestMergePages: + """Behaviour of :meth:`DataLowLevelClient._merge_pages`. + + The helper replaces a previously inline O(N²) per-page concat loop with + a single batched concat per channel. These tests pin the merge + semantics so a future refactor can't silently drift: + + * Single-frame channels skip the concat entirely (cheap identity path). + * Multi-frame channels concat in collected order; ``groupby.last`` + makes the latest frame win on overlapping timestamps. + * Cached slices from ``_check_cache`` are folded in as the *first* + frame so fresh pages still win on overlap. + """ + + @pytest.mark.parametrize("pages", [[], [[]]], ids=["no_tasks_queued", "task_returned_empty"]) + def test_no_fresh_data_returns_initial(self, pages: list) -> None: + """No fresh pages → initial dict passes through by identity.""" + client = DataLowLevelClient(MagicMock()) + initial_df = _frame("chan", rows=5) + with _patch_deserializer({}): + result = client._merge_pages(pages=pages, initial={"chan": initial_df}) + assert result["chan"] is initial_df + + def test_single_frame_skips_concat(self) -> None: + """One frame for a channel → returned by identity, no concat call.""" + only_df = _frame("chan", rows=5) + client = DataLowLevelClient(MagicMock()) + with _patch_deserializer({"p1": {"chan": only_df}}): + result = client._merge_pages(pages=[["p1"]], initial={}) + assert result["chan"] is only_df + + def test_disjoint_pages_concat_in_order(self) -> None: + """Multiple disjoint pages for one channel → single concat result.""" + df1 = _frame("chan", rows=10, start=_NOW, offset=0, freq="s") + df2 = _frame("chan", rows=10, start=_NOW + timedelta(minutes=1), offset=10, freq="s") + df3 = _frame("chan", rows=10, start=_NOW + timedelta(minutes=2), offset=20, freq="s") + client = DataLowLevelClient(MagicMock()) + sentinels = {"p1": {"chan": df1}, "p2": {"chan": df2}, "p3": {"chan": df3}} + with _patch_deserializer(sentinels): + result = client._merge_pages(pages=[["p1", "p2"], ["p3"]], initial={}) + expected = pd.concat([df1, df2, df3]).groupby(level=0).last() + pd.testing.assert_frame_equal(result["chan"].sort_index(), expected.sort_index()) + assert len(result["chan"]) == 30 + + def test_overlapping_timestamps_later_page_wins(self) -> None: + """On overlap, the later page's value survives ``groupby.last``. + + Pins the old inline ``concat([acc, new]).groupby(level=0).last()`` + semantic: latest concat position wins on conflict. + """ + index = pd.date_range(_NOW, periods=5, freq="ms", tz=timezone.utc) + df_first = pd.DataFrame({"chan": [0] * 5}, index=index) + df_second = pd.DataFrame({"chan": [99] * 5}, index=index) + client = DataLowLevelClient(MagicMock()) + with _patch_deserializer({"p1": {"chan": df_first}, "p2": {"chan": df_second}}): + result = client._merge_pages(pages=[["p1", "p2"]], initial={}) + assert (result["chan"]["chan"] == 99).all() + + def test_cached_slice_folded_in_first_and_loses_on_overlap(self) -> None: + """Cached slice from ``_check_cache`` is the first frame in the merge. + + Fresh pages must overwrite cached values on duplicate timestamps, + matching the pre-existing "latest fetch wins" semantic. + """ + index = pd.date_range(_NOW, periods=5, freq="ms", tz=timezone.utc) + cached = pd.DataFrame({"chan": [-1] * 5}, index=index) + fresh = pd.DataFrame({"chan": [42] * 5}, index=index) + client = DataLowLevelClient(MagicMock()) + with _patch_deserializer({"p1": {"chan": fresh}}): + result = client._merge_pages(pages=[["p1"]], initial={"chan": cached}) + assert (result["chan"]["chan"] == 42).all() + + def test_multiple_channels_independent(self) -> None: + """Per-channel grouping is independent: one channel's pages don't bleed.""" + a1 = _frame("a", rows=5, start=_NOW, offset=0, freq="s") + a2 = _frame("a", rows=5, start=_NOW + timedelta(minutes=1), offset=5, freq="s") + b1 = _frame("b", rows=5, start=_NOW, offset=100, freq="s") + client = DataLowLevelClient(MagicMock()) + sentinels = {"p_a1": {"a": a1}, "p_a2": {"a": a2}, "p_b1": {"b": b1}} + with _patch_deserializer(sentinels): + result = client._merge_pages(pages=[["p_a1", "p_b1"], ["p_a2"]], initial={}) + assert len(result["a"]) == 10 + assert len(result["b"]) == 5 + assert (result["b"]["b"] >= 100).all() + + def test_does_not_mutate_initial(self) -> None: + """``initial`` is a defensive copy; caller's dict isn't mutated.""" + cached = _frame("chan", rows=5) + initial = {"chan": cached} + fresh = _frame("chan", rows=5, start=_NOW + timedelta(seconds=1), offset=10) + client = DataLowLevelClient(MagicMock()) + with _patch_deserializer({"p1": {"chan": fresh}}): + client._merge_pages(pages=[["p1"]], initial=initial) + assert initial["chan"] is cached + + +class TestDataLowLevelClient: + """Constructor wiring and per-instance isolation. + + Per-call behaviour (cache hits, ``ignore_cache``, pagination) lives in + :class:`TestGetChannelData`. + """ + + def test_default_construction_uses_no_op_store(self) -> None: + """Default construction leaves the adapter wrapping a disabled store. + + Resources wire the shared store in via the keyword arg; the + ``MagicMock()``-only path here keeps unit tests free of disk I/O. + """ + client = DataLowLevelClient(MagicMock()) + assert isinstance(client.channel_cache, ChannelDataCache) + assert not client.channel_cache.store.disk_enabled + + def test_per_instance_isolation(self, tmp_path) -> None: + """Two clients with distinct stores must not share cache state. + + Regression test for the original OOM bug: ``channel_cache`` was a + class attribute, so every ``SiftClient`` in the process appended + to the same dict. Two fresh adapters over independent stores must + stay independent — even now that store ownership has moved to the + client, the contract is the same. + """ + client_a = _client_with_cache(tmp_path, "a") + client_b = _client_with_cache(tmp_path, "b") + try: + client_a.channel_cache.put("c1", _entry(rows=10)) + assert "c1" in client_a.channel_cache + assert "c1" not in client_b.channel_cache + finally: + client_a.channel_cache.store.close() + client_b.channel_cache.store.close() + + def test_adapter_kwarg_propagates(self, tmp_path) -> None: + """The constructor honours an externally-constructed adapter. + + Mirrors the production wiring where ``ChannelsAPIAsync`` builds + the adapter from ``client._get_disk_cache()`` and hands it in. + """ + store = DiskCache(disk_path=tmp_path / "external", disk_max_bytes=8_192) + adapter = ChannelDataCache(store) + client = DataLowLevelClient(MagicMock(), channel_cache=adapter) + try: + assert client.channel_cache is adapter + assert client.channel_cache.store is store + assert client.channel_cache.store.disk_max_bytes == 8_192 + finally: + store.close() + + +class TestGetChannelData: + """End-to-end assertions on the public ``get_channel_data`` return shape.""" + + @pytest.mark.asyncio + async def test_single_page_per_channel(self) -> None: + """Result is keyed by channel name; single-page frames pass through unchanged.""" + client = DataLowLevelClient(MagicMock()) + c1_df, c2_df = _frame("c1"), _frame("c2", offset=100) + with _fake_grpc(client, {"c1": [c1_df], "c2": [c2_df]}): + result = await client.get_channel_data( + channels=[_channel("c1"), _channel("c2")], + start_time=_NOW, + end_time=_WINDOW_END, + ignore_cache=True, + ) + assert set(result.keys()) == {"c1", "c2"} + pd.testing.assert_frame_equal(result["c1"], c1_df) + pd.testing.assert_frame_equal(result["c2"], c2_df) + + @pytest.mark.asyncio + async def test_multi_page_response_concatenated_per_channel(self) -> None: + """Three disjoint pages for one channel → single merged frame. + + Catches regressions in the ``_handle_pagination`` + ``_merge_pages`` + interaction (the perf fix's batched concat must still produce the + full 30-row contiguous result). + """ + client = DataLowLevelClient(MagicMock()) + p1 = _frame("c1", rows=10, start=_NOW, offset=0) + p2 = _frame("c1", rows=10, start=_NOW + timedelta(seconds=1), offset=10) + p3 = _frame("c1", rows=10, start=_NOW + timedelta(seconds=2), offset=20) + with _fake_grpc(client, {"c1": [p1, p2, p3]}): + result = await client.get_channel_data( + channels=[_channel("c1")], + start_time=_NOW, + end_time=_WINDOW_END, + ignore_cache=True, + ) + assert set(result.keys()) == {"c1"} + assert len(result["c1"]) == 30 + expected = pd.concat([p1, p2, p3]).groupby(level=0).last() + pd.testing.assert_frame_equal(result["c1"].sort_index(), expected.sort_index()) + + @pytest.mark.asyncio + async def test_cache_hit_short_circuits_grpc(self, tmp_path) -> None: + """Second request for the same channel + window skips ``_get_data_impl``. + + Stages two pages-worth of data so a faulty cache that falls through + wouldn't silently pass by hitting EOF — any second-call invocation + would consume the second page and bump ``len(call_log)``. + """ + client = _client_with_cache(tmp_path) + df = _frame("c1") + try: + with _fake_grpc(client, {"c1": [df, df]}) as call_log: + first = await client.get_channel_data( + channels=[_channel("c1")], + start_time=_NOW, + end_time=_WINDOW_END, + ) + calls_after_first = len(call_log) + assert calls_after_first >= 1 + + second = await client.get_channel_data( + channels=[_channel("c1")], + start_time=_NOW, + end_time=_WINDOW_END, + ) + assert len(call_log) == calls_after_first, ( + "second call should be served from cache without invoking _get_data_impl" + ) + pd.testing.assert_frame_equal(first["c1"].sort_index(), second["c1"].sort_index()) + finally: + client.channel_cache.store.close() + + @pytest.mark.asyncio + async def test_partial_cache_hit_merges_cached_and_fresh(self, tmp_path) -> None: + """Cached + uncached channels resolved together in one return dict. + + Only the uncached channel triggers ``_get_data_impl``. + """ + client = _client_with_cache(tmp_path) + c1_df, c2_df = _frame("c1"), _frame("c2", offset=100) + try: + with _fake_grpc(client, {"c1": [c1_df], "c2": [c2_df]}) as call_log: + await client.get_channel_data( + channels=[_channel("c1")], + start_time=_NOW, + end_time=_WINDOW_END, + ) + calls_after_warmup = len(call_log) + + result = await client.get_channel_data( + channels=[_channel("c1"), _channel("c2")], + start_time=_NOW, + end_time=_WINDOW_END, + ) + new_calls = call_log[calls_after_warmup:] + + assert new_calls, "c2 should hit the wire on the second call" + for call in new_calls: + assert call["channel_ids"] == ["c2"], f"only c2 should hit the wire, saw {call!r}" + assert set(result.keys()) == {"c1", "c2"} + pd.testing.assert_frame_equal(result["c1"].sort_index(), c1_df.sort_index()) + pd.testing.assert_frame_equal(result["c2"].sort_index(), c2_df.sort_index()) + finally: + client.channel_cache.store.close() + + @pytest.mark.asyncio + async def test_ignore_cache_true_returns_fresh_and_skips_write(self, tmp_path) -> None: + """``ignore_cache=True`` returns mock data and leaves the cache empty. + + End-to-end version of the latent bug that compounded the customer's + OOM: pre-fix, ``_update_cache`` ran even when the caller had asked + the cache to be ignored. + """ + client = _client_with_cache(tmp_path) + df = _frame("c1") + try: + with _fake_grpc(client, {"c1": [df]}): + result = await client.get_channel_data( + channels=[_channel("c1")], + start_time=_NOW, + end_time=_WINDOW_END, + ignore_cache=True, + ) + pd.testing.assert_frame_equal(result["c1"], df) + assert "c1" not in client.channel_cache + finally: + client.channel_cache.store.close() diff --git a/python/lib/sift_client/_tests/_internal/test_disk_cache.py b/python/lib/sift_client/_tests/_internal/test_disk_cache.py new file mode 100644 index 000000000..5711580a6 --- /dev/null +++ b/python/lib/sift_client/_tests/_internal/test_disk_cache.py @@ -0,0 +1,341 @@ +"""Tests for :mod:`sift_client._internal.disk_cache`. + +Two classes, narrowest scope first: + +* :class:`TestDiskCache` — direct unit tests on :class:`DiskCache`: + the disabled-when-no-path no-op, fresh writes/reads, cross-session + reload, oversize guard + dedup keyed on the full namespaced key, and + the marker-guarded :meth:`DiskCache.clear_disk` classmethod. +* :class:`TestClearDisk` — the classmethod's defensive guards. + +The store is intentionally key/value-agnostic — every test treats it as +a plain ``str``-keyed dict that happens to persist across handles, with +``size_bytes`` supplied by the caller. The channel-specific adapter +(:class:`ChannelDataCache`) is exercised separately in ``test_data.py``. +""" + +from __future__ import annotations + +import logging +from contextlib import contextmanager +from typing import Iterator + +import pytest + +from sift_client._internal.disk_cache import DiskCache + +# Snapshot of the production constant captured at import time. The autouse +# ``_isolate_default_disk_cache_path`` fixture in ``conftest.py`` overrides +# the class attribute per test; the constant-shape test still needs the +# real value to assert against. +_PRODUCTION_DEFAULT_DISK_PATH = DiskCache.DEFAULT_DISK_PATH + + +@contextmanager +def _capture_disk_cache_warnings() -> Iterator[list[logging.LogRecord]]: + """Capture warnings emitted by the disk-cache logger directly. + + Pytest's ``caplog`` reads from the root logger, but the Sift pytest + plugin sets ``propagate=False`` on the ``sift_client`` logger when + audit logging is active, so records emitted from any descendant don't + reach the root. Attaching a list-backed handler at the leaf logger + bypasses that. + """ + target = logging.getLogger("sift_client._internal.disk_cache") + records: list[logging.LogRecord] = [] + + class _ListHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + records.append(record) + + handler = _ListHandler(level=logging.WARNING) + target.addHandler(handler) + try: + yield records + finally: + target.removeHandler(handler) + + +class TestDiskCache: + """End-to-end behaviour of the shared on-disk store.""" + + def test_disabled_when_no_path(self) -> None: + """``DiskCache()`` with no ``disk_path`` is a silent no-op.""" + cache = DiskCache() + assert cache.disk_enabled is False + assert cache.disk_path is None + assert cache.disk_max_bytes is None + # Every operation no-ops; no AttributeError, no warning. + cache.put("k", "v", size_bytes=4) + assert "k" not in cache + assert cache.get("k") is None + assert list(iter(cache)) == [] + cache.invalidate("k") + cache.clear() + cache.close() + + def test_fresh_cache_writes_and_reads(self, tmp_path) -> None: + """A fresh disk directory accepts writes and serves them back.""" + cache = DiskCache(disk_path=tmp_path / "fresh") + try: + assert cache.disk_enabled + assert cache.disk_path == str(tmp_path / "fresh") + assert cache.disk_max_bytes == DiskCache.DEFAULT_DISK_MAX_BYTES + cache.put("k", {"hello": "world"}, size_bytes=64) + assert "k" in cache + assert cache.get("k") == {"hello": "world"} + finally: + cache.close() + + def test_reopen_existing_dir_sees_prior_session_entries(self, tmp_path) -> None: + """Closing then reopening at the same path surfaces prior entries. + + This is the cold-start reuse guarantee: a fresh process pointing + at a populated directory reads back what an earlier process wrote. + """ + path = tmp_path / "prev-session" + session1 = DiskCache(disk_path=path) + session1.put("k", [1, 2, 3], size_bytes=24) + session1.close() + + session2 = DiskCache(disk_path=path) + try: + assert "k" in session2 + assert session2.get("k") == [1, 2, 3] + finally: + session2.close() + + def test_repeated_put_overwrites(self, tmp_path) -> None: + cache = DiskCache(disk_path=tmp_path / "overwrite") + try: + cache.put("k", "first", size_bytes=8) + cache.put("k", "second", size_bytes=8) + assert cache.get("k") == "second" + finally: + cache.close() + + def test_invalidate_removes_entry(self, tmp_path) -> None: + cache = DiskCache(disk_path=tmp_path / "inval") + try: + cache.invalidate("never_added") # safe before any puts + cache.put("k", "v", size_bytes=4) + cache.invalidate("k") + assert "k" not in cache + assert cache.get("k") is None + finally: + cache.close() + + def test_clear_wipes_store(self, tmp_path) -> None: + cache = DiskCache(disk_path=tmp_path / "clear") + try: + cache.put("a", 1, size_bytes=8) + cache.put("b", 2, size_bytes=8) + cache.clear() + assert "a" not in cache + assert "b" not in cache + finally: + cache.close() + + def test_iter_yields_keys(self, tmp_path) -> None: + """``__iter__`` exposes the keyspace so adapters can prefix-clear.""" + cache = DiskCache(disk_path=tmp_path / "iter") + try: + cache.put("alpha:1", 1, size_bytes=8) + cache.put("beta:1", 2, size_bytes=8) + cache.put("alpha:2", 3, size_bytes=8) + assert set(cache) == {"alpha:1", "alpha:2", "beta:1"} + finally: + cache.close() + + def test_disable_disk_closes_handle(self, tmp_path) -> None: + """Turning off disk closes the handle and silences subsequent ops.""" + cache = DiskCache(disk_path=tmp_path / "disable") + try: + cache.put("k", "v", size_bytes=4) + cache.disable_disk() + assert not cache.disk_enabled + assert cache.disk_path is None + assert "k" not in cache + assert cache.get("k") is None + cache.put("new", "x", size_bytes=4) # silently dropped + assert "new" not in cache + finally: + cache.close() + + def test_enable_disk_reconfigures_path(self, tmp_path) -> None: + """Reconfiguring to a different path closes the old handle. + + The new directory starts empty: ``k`` lived in the old directory + so the lookup at the new path misses. + """ + cache = DiskCache(disk_path=tmp_path / "a") + try: + cache.put("k", "v", size_bytes=4) + cache.enable_disk(path=tmp_path / "b") + assert cache.disk_path == str(tmp_path / "b") + assert "k" not in cache + finally: + cache.close() + + def test_enable_disk_noop_when_same_settings(self, tmp_path) -> None: + """Re-enabling with identical settings doesn't churn the disk handle.""" + cache = DiskCache(disk_path=tmp_path / "noop") + try: + handle_before = cache._disk + cache.enable_disk(path=tmp_path / "noop", max_bytes=DiskCache.DEFAULT_DISK_MAX_BYTES) + assert cache._disk is handle_before + finally: + cache.close() + + def test_oversized_entry_skipped_and_preserves_neighbours(self, tmp_path) -> None: + """An entry larger than the cap is skipped without evicting peers. + + Without this guard, ``diskcache``'s cull would evict every other + row trying to fit an unfittable entry, then drop the entry itself + — the wipe-everything failure mode the cache work originally fixed. + + Cap is sized to leave plenty of room for diskcache's pickle + envelope around the small entries while still being small enough + that the declared oversized ``size_bytes`` (10 MB) trips the + guard. ``size_bytes`` is the caller's contract — the store + compares that, not the actual on-disk size. + """ + cap = 1 * 1024 * 1024 # 1 MiB + cache = DiskCache(disk_path=tmp_path / "oversize", disk_max_bytes=cap) + try: + cache.put("small-1", "value", size_bytes=64) + cache.put("small-2", "value", size_bytes=64) + with _capture_disk_cache_warnings() as records: + cache.put("huge", "value", size_bytes=10 * 1024 * 1024) + assert "small-1" in cache + assert "small-2" in cache + assert "huge" not in cache + assert any("larger than the disk cache cap" in r.getMessage() for r in records) + finally: + cache.close() + + def test_oversized_put_drops_prior_entry(self, tmp_path) -> None: + """An oversized re-insert must drop the prior value, not silently keep it.""" + cap = 1 * 1024 * 1024 + cache = DiskCache(disk_path=tmp_path / "drop-prior", disk_max_bytes=cap) + try: + cache.put("k", "small", size_bytes=64) + assert "k" in cache + cache.put("k", "big", size_bytes=10 * 1024 * 1024) + assert "k" not in cache + finally: + cache.close() + + def test_oversized_put_warns_once_per_key(self, tmp_path) -> None: + """Repeated oversized puts for the same key log once, not every call.""" + cap = 1 * 1024 * 1024 + cache = DiskCache(disk_path=tmp_path / "dedup", disk_max_bytes=cap) + try: + with _capture_disk_cache_warnings() as records: + for _ in range(5): + cache.put("k", "v", size_bytes=10 * 1024 * 1024) + warnings = [r for r in records if "larger than the disk cache cap" in r.getMessage()] + assert len(warnings) == 1 + finally: + cache.close() + + def test_oversized_warning_resets_after_normal_put(self, tmp_path) -> None: + """A successful normal-sized put clears the dedup bit for that key.""" + cap = 1 * 1024 * 1024 + cache = DiskCache(disk_path=tmp_path / "reset-normal", disk_max_bytes=cap) + try: + with _capture_disk_cache_warnings() as records: + cache.put("k", "v", size_bytes=10 * 1024 * 1024) # 1st warning + cache.put("k", "v", size_bytes=64) # resets state + cache.put("k", "v", size_bytes=10 * 1024 * 1024) # 2nd warning + warnings = [r for r in records if "larger than the disk cache cap" in r.getMessage()] + assert len(warnings) == 2 + finally: + cache.close() + + def test_dedup_keys_on_full_namespaced_key(self, tmp_path) -> None: + """Dedup is per-key, so two adapters' colliding bare ids don't share state. + + Pins the design choice that the oversize warning dedup tracks the + full namespaced key handed to ``put`` (e.g. ``channel:foo`` vs + ``calc:foo``) rather than collapsing on the bare id. Two different + prefixes for the same suffix each get their own one-shot warning. + """ + cap = 1 * 1024 * 1024 + cache = DiskCache(disk_path=tmp_path / "two-prefixes", disk_max_bytes=cap) + try: + with _capture_disk_cache_warnings() as records: + cache.put("alpha:foo", "v", size_bytes=10 * 1024 * 1024) + cache.put("beta:foo", "v", size_bytes=10 * 1024 * 1024) + warnings = [r for r in records if "larger than the disk cache cap" in r.getMessage()] + assert len(warnings) == 2 + messages = [r.getMessage() for r in warnings] + assert any("alpha:foo" in m for m in messages) + assert any("beta:foo" in m for m in messages) + finally: + cache.close() + + def test_invalidate_resets_oversized_warning(self, tmp_path) -> None: + cap = 1 * 1024 * 1024 + cache = DiskCache(disk_path=tmp_path / "reset-inval", disk_max_bytes=cap) + try: + with _capture_disk_cache_warnings() as records: + cache.put("k", "v", size_bytes=10 * 1024 * 1024) + cache.invalidate("k") + cache.put("k", "v", size_bytes=10 * 1024 * 1024) + warnings = [r for r in records if "larger than the disk cache cap" in r.getMessage()] + assert len(warnings) == 2 + finally: + cache.close() + + def test_clear_resets_oversized_warning(self, tmp_path) -> None: + cap = 1 * 1024 * 1024 + cache = DiskCache(disk_path=tmp_path / "reset-clear", disk_max_bytes=cap) + try: + with _capture_disk_cache_warnings() as records: + cache.put("a", "v", size_bytes=10 * 1024 * 1024) + cache.put("b", "v", size_bytes=10 * 1024 * 1024) + cache.clear() + cache.put("a", "v", size_bytes=10 * 1024 * 1024) + cache.put("b", "v", size_bytes=10 * 1024 * 1024) + warnings = [r for r in records if "larger than the disk cache cap" in r.getMessage()] + assert len(warnings) == 4 + finally: + cache.close() + + +class TestClearDisk: + """:meth:`DiskCache.clear_disk` removes a cache dir, refuses other dirs.""" + + def test_clear_removes_directory(self, tmp_path) -> None: + path = tmp_path / "victim" + cache = DiskCache(disk_path=path) + cache.put("k", "v", size_bytes=4) + cache.close() + assert path.exists() + DiskCache.clear_disk(path) + assert not path.exists() + + def test_clear_missing_path_is_noop(self, tmp_path) -> None: + DiskCache.clear_disk(tmp_path / "never-existed") # no raise + + def test_clear_refuses_non_diskcache_directory(self, tmp_path) -> None: + """A typo'd path with unrelated contents must not be wiped.""" + target = tmp_path / "user-stuff" + target.mkdir() + (target / "important.txt").write_text("don't delete me") + with pytest.raises(ValueError, match="does not look like a sift data cache"): + DiskCache.clear_disk(target) + assert (target / "important.txt").read_text() == "don't delete me" + + def test_default_path_constant_under_tmp(self) -> None: + """Default lives under the OS tmp dir, not a user directory. + + Reads the module-level snapshot rather than ``DEFAULT_DISK_PATH`` + directly because the autouse fixture monkeypatches that attribute + for every test. + """ + import tempfile + + assert _PRODUCTION_DEFAULT_DISK_PATH.startswith(tempfile.gettempdir()) + assert _PRODUCTION_DEFAULT_DISK_PATH.endswith("sift-data-cache") diff --git a/python/lib/sift_client/_tests/_internal/test_disk_cache_config.py b/python/lib/sift_client/_tests/_internal/test_disk_cache_config.py new file mode 100644 index 000000000..bce8a4ab9 --- /dev/null +++ b/python/lib/sift_client/_tests/_internal/test_disk_cache_config.py @@ -0,0 +1,112 @@ +"""Tests for :class:`sift_client._internal.disk_cache_config.DiskCacheConfig`. + +The class is a small intent holder; the tests pin three things that +resource lazy-init code relies on: + +* Enable / disable round-trips preserve the right state and clear overrides. +* ``using_default_path`` reflects "enabled AND no user override", which + drives the silent-fallback-vs-loud-raise distinction in resources. +* ``enable`` accepts ``os.PathLike`` and stringifies it eagerly so consumers + never need to handle ``pathlib.Path`` vs ``str``. +""" + +from __future__ import annotations + +import pathlib + +import pytest + +from sift_client._internal.disk_cache_config import DiskCacheConfig + + +class TestDiskCacheConfig: + def test_opt_out_initial_state_enabled_no_overrides(self) -> None: + """``enabled=True`` (opt-out) starts on with no overrides.""" + config = DiskCacheConfig(enabled=True) + assert config.enabled + assert config.path is None + assert config.max_bytes is None + assert config.using_default_path + + def test_opt_in_initial_state_disabled(self) -> None: + """``enabled=False`` (opt-in) starts off; ``using_default_path`` is False.""" + config = DiskCacheConfig(enabled=False) + assert not config.enabled + assert config.path is None + assert config.max_bytes is None + assert not config.using_default_path + + def test_enable_with_no_args_keeps_defaults(self) -> None: + """``enable()`` with no args turns on and clears any prior overrides.""" + config = DiskCacheConfig(enabled=False) + config.enable() + assert config.enabled + assert config.path is None + assert config.max_bytes is None + assert config.using_default_path + + def test_enable_with_path_marks_non_default(self) -> None: + """A user-supplied path flips ``using_default_path`` off.""" + config = DiskCacheConfig(enabled=True) + config.enable(path="/custom/path") + assert config.enabled + assert config.path == "/custom/path" + assert not config.using_default_path + + def test_enable_with_max_bytes_keeps_default_path(self) -> None: + """Setting ``max_bytes`` alone doesn't make the path non-default.""" + config = DiskCacheConfig(enabled=True) + config.enable(max_bytes=1024) + assert config.enabled + assert config.path is None + assert config.max_bytes == 1024 + assert config.using_default_path + + def test_enable_stringifies_pathlike(self) -> None: + """``os.PathLike`` inputs are stored as strings so consumers can be dumb.""" + config = DiskCacheConfig(enabled=True) + config.enable(path=pathlib.Path("/some/path")) + assert isinstance(config.path, str) + assert config.path == "/some/path" + + def test_disable_clears_overrides(self) -> None: + """``disable()`` zeroes path and max_bytes so a future re-enable starts clean.""" + config = DiskCacheConfig(enabled=True) + config.enable(path="/custom", max_bytes=4096) + config.disable() + assert not config.enabled + assert config.path is None + assert config.max_bytes is None + assert not config.using_default_path + + def test_reenable_after_disable_returns_to_defaults(self) -> None: + """``disable`` then ``enable()`` (no args) restores the opt-out starting state.""" + config = DiskCacheConfig(enabled=True) + config.enable(path="/custom", max_bytes=4096) + config.disable() + config.enable() + assert config.enabled + assert config.path is None + assert config.max_bytes is None + assert config.using_default_path + + @pytest.mark.parametrize( + ("enabled", "path", "expected"), + [ + (True, None, True), + (True, "/custom", False), + (False, None, False), + (False, "/custom", False), # disabled wins even with a stashed path + ], + ids=["enabled+default", "enabled+custom", "disabled+default", "disabled+custom"], + ) + def test_using_default_path_matrix( + self, enabled: bool, path: str | None, expected: bool + ) -> None: + """``using_default_path`` is the AND of ``enabled`` and ``path is None``.""" + config = DiskCacheConfig(enabled=enabled) + if path is not None: + # Bypass enable() so we can exercise the disabled+custom combo + # without enable() flipping enabled back on. + config._path = path + assert config.using_default_path is expected diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index 41469dac5..5790a2f7a 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -9,6 +9,30 @@ from sift_client.util.util import AsyncAPIs +@pytest.fixture(autouse=True) +def _isolate_default_disk_cache_path(monkeypatch, tmp_path): + """Redirect ``DiskCache.DEFAULT_DISK_PATH`` to a per-test tmp dir. + + On-disk caching is **opt-out** — any test that triggers the lazy + ``DiskCache`` init through ``SiftClient._get_disk_cache`` would + otherwise create the real ``/tmp/sift-data-cache`` directory and leak + state across runs. Redirecting the default to ``tmp_path`` keeps every + test self-contained without each test having to know the cache is on + by default. + + The override preserves the ``sift-data-cache`` suffix so + ``TestClearDisk::test_default_path_constant_under_tmp`` keeps + validating the real shape of the constant. + """ + from sift_client._internal.disk_cache import DiskCache + + monkeypatch.setattr( + DiskCache, + "DEFAULT_DISK_PATH", + str(tmp_path / "sift-data-cache"), + ) + + @pytest.fixture(scope="session") def sift_client() -> SiftClient: """Create a SiftClient instance for testing. diff --git a/python/lib/sift_client/_tests/test_client_cache.py b/python/lib/sift_client/_tests/test_client_cache.py new file mode 100644 index 000000000..bb7e85279 --- /dev/null +++ b/python/lib/sift_client/_tests/test_client_cache.py @@ -0,0 +1,261 @@ +"""Tests for :mod:`sift_client._internal.cache_namespace`. + +The namespace is the user-facing surface for the shared on-disk store +that lives on the :class:`SiftClient`. Three concerns get pinned here: + +1. Default policy (opt-out: caching on at the default path) lands on + the live store on first use. +2. Pre-init configuration (``client.cache.disable_disk()`` / + ``enable_disk(path=..., max_bytes=...)`` before any resource has + touched the cache) takes effect on the lazy build. +3. Post-init reconfiguration mutates the live :class:`DiskCache` in + place rather than swapping it out — every resource adapter holds a + reference to the same store. + +The single-instance-shared-across-resources invariant is the architectural +linchpin: a future second adapter must see the *same* handle as the channel +adapter so a global byte budget and LRU still apply. +""" + +from __future__ import annotations + +import pytest + +from sift_client._internal.disk_cache import DiskCache + + +def _make_client(): + """Build a SiftClient-like object with the bits the namespace needs. + + Reaching into ``sift_client.SiftClient.__init__`` requires a live gRPC + config; the namespace only touches ``_disk_cache_config`` and + ``_disk_cache``, so a tiny stand-in keeps these tests independent of + transport setup. + """ + from sift_client._internal.cache_namespace import CacheNamespace + from sift_client._internal.disk_cache_config import DiskCacheConfig + + class _StandinClient: + def __init__(self) -> None: + self._disk_cache_config = DiskCacheConfig(enabled=True) + self._disk_cache: DiskCache | None = None + self.cache = CacheNamespace(self) # type: ignore[arg-type] + + return _StandinClient() + + +# Pull the same lazy-init helper the real client uses so we exercise the +# default-path-fallback path against the live code rather than a mock. +def _get_disk_cache(client) -> DiskCache: + if client._disk_cache is None: + config = client._disk_cache_config + if not config.enabled: + client._disk_cache = DiskCache() + return client._disk_cache + target_path = config.path or DiskCache.DEFAULT_DISK_PATH + try: + client._disk_cache = DiskCache( + disk_path=target_path, + disk_max_bytes=config.max_bytes, + ) + except Exception: + if not config.using_default_path: + raise + client._disk_cache = DiskCache() + return client._disk_cache + + +class TestCacheNamespaceDefaults: + """Opt-out default: the namespace is on, default path, fresh start.""" + + def test_enabled_by_default(self): + """First lazy access lands at ``DiskCache.DEFAULT_DISK_PATH``.""" + client = _make_client() + store = _get_disk_cache(client) + try: + assert store.disk_enabled + assert store.disk_path == DiskCache.DEFAULT_DISK_PATH + finally: + store.close() + + def test_one_store_shared_across_lazy_calls(self): + """Re-entering ``_get_disk_cache`` returns the same handle.""" + client = _make_client() + first = _get_disk_cache(client) + second = _get_disk_cache(client) + try: + assert first is second + finally: + first.close() + + +class TestEnableDisk: + """``client.cache.enable_disk`` configures the store, pre- and post-init.""" + + def test_pre_init_path_lands_on_store(self, tmp_path): + client = _make_client() + client.cache.enable_disk(path=str(tmp_path / "pre"), max_bytes=4096) + store = _get_disk_cache(client) + try: + assert store.disk_enabled + assert store.disk_path == str(tmp_path / "pre") + assert store.disk_max_bytes == 4096 + finally: + store.close() + + def test_post_init_swap_uses_same_store_instance(self, tmp_path): + """Reconfiguring after first use mutates in place rather than re-creating. + + Every resource adapter holds a reference to ``client._disk_cache``; + if a reconfig replaced the handle, those adapters would still see + the stale one. ``DiskCache.enable_disk`` swaps the *contents* on + the same instance. + """ + client = _make_client() + client.cache.disable_disk() # start from off so this is a real on transition + store = _get_disk_cache(client) + try: + assert not store.disk_enabled + client.cache.enable_disk(path=str(tmp_path / "post")) + assert client._disk_cache is store # same instance + assert store.disk_enabled + assert store.disk_path == str(tmp_path / "post") + finally: + store.close() + + def test_enable_with_default_path_lands_on_default(self, monkeypatch, tmp_path): + """``enable_disk()`` with no args uses :attr:`DEFAULT_DISK_PATH`. + + Redirects the constant so the test doesn't create the real + ``/tmp/sift-data-cache`` directory. + """ + fake_default = str(tmp_path / "fake-default") + monkeypatch.setattr(DiskCache, "DEFAULT_DISK_PATH", fake_default) + + client = _make_client() + client.cache.enable_disk() + store = _get_disk_cache(client) + try: + assert store.disk_path == fake_default + finally: + store.close() + + +class TestDisableDisk: + """``client.cache.disable_disk`` turns the live cache off.""" + + def test_disable_closes_live_handle(self, tmp_path): + client = _make_client() + client.cache.enable_disk(path=str(tmp_path / "to-close")) + store = _get_disk_cache(client) + try: + assert store.disk_enabled + client.cache.disable_disk() + assert not store.disk_enabled + assert store.disk_path is None + finally: + store.close() + + def test_disable_before_lazy_init_keeps_store_off(self, tmp_path): + """Calling disable before first use means the lazy build skips the open.""" + client = _make_client() + client.cache.disable_disk() + store = _get_disk_cache(client) + try: + assert not store.disk_enabled + finally: + store.close() + + +class TestClearDiskProxy: + """``client.cache.clear_disk`` proxies through to :meth:`DiskCache.clear_disk`.""" + + def test_clear_removes_directory(self, tmp_path): + path = tmp_path / "to-clear" + # Populate a real cache directory so the marker check passes. + cache = DiskCache(disk_path=path) + cache.close() + assert path.exists() + + client = _make_client() + client.cache.clear_disk(path) + assert not path.exists() + + +class TestLazyInitFallback: + """The default-path-failure fallback used by ``SiftClient._get_disk_cache``.""" + + def test_default_path_failure_falls_back_to_no_cache(self, monkeypatch, tmp_path): + """If the default cache path can't be opened, the lazy init produces + a disabled :class:`DiskCache` rather than raising. + + Simulated by pointing ``DEFAULT_DISK_PATH`` at a path that already + exists as a regular file — ``os.makedirs(..., exist_ok=True)`` + raises ``FileExistsError`` for non-directory targets. + """ + blocker = tmp_path / "not-a-dir" + blocker.write_text("i am a file, not a directory") + monkeypatch.setattr(DiskCache, "DEFAULT_DISK_PATH", str(blocker)) + + client = _make_client() + store = _get_disk_cache(client) # must not raise + try: + assert not store.disk_enabled + finally: + store.close() + + def test_explicit_path_failure_propagates(self, tmp_path): + """An explicit path that can't be opened propagates the OSError. + + Silent fallback would hide a user mistake. + """ + blocker = tmp_path / "not-a-dir" + blocker.write_text("i am a file, not a directory") + + client = _make_client() + client.cache.enable_disk(path=str(blocker)) + with pytest.raises(FileExistsError): + _get_disk_cache(client) + + +class TestSiftClientIntegration: + """End-to-end through the real :class:`SiftClient.__init__` entry point. + + Asserts the wire-up: the namespace really lives at ``client.cache``, + the config is mutable through it, and the lazy ``_get_disk_cache`` + returns the configured store. + """ + + def _make_real_client(self): + from sift_client import SiftClient, SiftConnectionConfig + + return SiftClient( + connection_config=SiftConnectionConfig( + api_key="x", + grpc_url="disabled.invalid:0", + rest_url="https://disabled.invalid", + use_ssl=False, + ) + ) + + def test_attribute_present_and_uses_real_lazy_init(self, monkeypatch, tmp_path): + fake_default = str(tmp_path / "real-client-default") + monkeypatch.setattr(DiskCache, "DEFAULT_DISK_PATH", fake_default) + + client = self._make_real_client() + store = client._get_disk_cache() + try: + assert client.cache is not None + assert store.disk_enabled + assert store.disk_path == fake_default + finally: + store.close() + + def test_disable_before_first_get_data_keeps_store_off(self): + client = self._make_real_client() + client.cache.disable_disk() + store = client._get_disk_cache() + try: + assert not store.disk_enabled + finally: + store.close() diff --git a/python/lib/sift_client/client.py b/python/lib/sift_client/client.py index 2e2b64ffd..2cda463f1 100644 --- a/python/lib/sift_client/client.py +++ b/python/lib/sift_client/client.py @@ -1,5 +1,10 @@ from __future__ import annotations +import logging +from typing import TYPE_CHECKING + +from sift_client._internal.cache_namespace import CacheNamespace +from sift_client._internal.disk_cache_config import DiskCacheConfig from sift_client._internal.urls import frontend_origin_for_api from sift_client.resources import ( AssetsAPI, @@ -45,6 +50,11 @@ ) from sift_client.util.util import AsyncAPIs +if TYPE_CHECKING: + from sift_client._internal.disk_cache import DiskCache + +logger = logging.getLogger(__name__) + class SiftClient( WithGrpcClient, @@ -126,6 +136,9 @@ class SiftClient( data_import: DataImportAPI """Instance of the Data Import API for making synchronous requests.""" + cache: CacheNamespace + """Surface for the shared on-disk cache used by every cache-aware resource.""" + async_: AsyncAPIs """Accessor for the asynchronous APIs. All asynchronous APIs are available as attributes on this accessor.""" @@ -148,6 +161,7 @@ def __init__( Set this for on-prem or custom deployments whose API host can't be mapped to a frontend automatically; see the ``app_url`` property. A value here takes precedence over ``connection_config.app_url``. + """ if not (api_key and grpc_url and rest_url) and not connection_config: raise ValueError( @@ -179,6 +193,14 @@ def __init__( # pytest plugin's ``--sift-disabled`` mode. self._simulate: bool = False + # Shared on-disk cache: user intent in ``_disk_cache_config`` (opt-out + # default), live handle in ``_disk_cache`` (lazy so importing this + # module doesn't pay the diskcache cost up front). The + # ``client.cache`` namespace mutates both. + self._disk_cache_config = DiskCacheConfig(enabled=True) + self._disk_cache: DiskCache | None = None + self.cache = CacheNamespace(self) + self.ping = PingAPI(self) self.assets = AssetsAPI(self) self.calculated_channels = CalculatedChannelsAPI(self) @@ -230,6 +252,52 @@ def rest_client(self) -> RestClient: """The REST client used by the SiftClient for making REST API calls.""" return self._rest_client + def _get_disk_cache(self) -> DiskCache: + """Lazy accessor for the shared on-disk cache. Internal to resources. + + The cache is built on first use so that importing ``sift_client`` + doesn't pay the ``diskcache``/``sqlite`` cost up front. The opt-out + default ("disk caching on at the temp-dir path") is applied here, + along with the silent-fallback-on-default-path failure: if the + user left :class:`DiskCacheConfig` at its defaults and opening + fails (read-only ``/tmp``, restricted container, ...), we log a + warning and return a disabled :class:`DiskCache` so resources can + still serve requests by going to the wire. An explicit user- + supplied path that can't be opened propagates so the caller knows + their request didn't take. + + After the first call this just returns the memoized handle. + Subsequent ``client.cache.enable_disk(...)`` calls mutate the + existing handle in place; this method is not re-entered. + """ + if self._disk_cache is None: + from sift_client._internal.disk_cache import DiskCache + + config = self._disk_cache_config + if not config.enabled: + self._disk_cache = DiskCache() + return self._disk_cache + target_path = config.path or DiskCache.DEFAULT_DISK_PATH + try: + self._disk_cache = DiskCache( + disk_path=target_path, + disk_max_bytes=config.max_bytes, + ) + except Exception: + if not config.using_default_path: + raise + logger.warning( + "Could not open the default sift data cache at %r; " + "falling back to no caching. Call " + "``client.cache.disable_disk()`` to silence this " + "warning, or pass an explicit path via " + "``client.cache.enable_disk(path=...)``.", + target_path, + exc_info=True, + ) + self._disk_cache = DiskCache() + return self._disk_cache + @property def app_url(self) -> str | None: """The Sift web-app origin for this client, or None if it can't be determined. diff --git a/python/lib/sift_client/resources/channels.py b/python/lib/sift_client/resources/channels.py index aa5cdf96e..df5d218a9 100644 --- a/python/lib/sift_client/resources/channels.py +++ b/python/lib/sift_client/resources/channels.py @@ -242,9 +242,20 @@ async def unarchive(self, channels: list[str | Channel]) -> None: def _ensure_data_low_level_client(self): """Ensure that the data low level client is initialized. Separated out like this to not require large dependencies (pandas/pyarrow) for the client if not fetching data.""" if self._data_low_level_client is None: - from sift_client._internal.low_level_wrappers.data import DataLowLevelClient - - self._data_low_level_client = DataLowLevelClient(grpc_client=self.client.grpc_client) + from sift_client._internal.low_level_wrappers.data import ( + ChannelDataCache, + DataLowLevelClient, + ) + + # The shared on-disk store lives on the client; we just wrap it + # in the channel-side adapter. Cache configuration (enable / + # disable / clear / path / max_bytes) is owned by + # ``client.cache`` — there's no resource-level knob anymore. + store = self.client._get_disk_cache() + self._data_low_level_client = DataLowLevelClient( + grpc_client=self.client.grpc_client, + channel_cache=ChannelDataCache(store), + ) async def get_data( self, diff --git a/python/pyproject.toml b/python/pyproject.toml index b435022e7..dfe94c043 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ "requests~=2.25", "requests-toolbelt~=1.0", "alive-progress~=3.0", + "diskcache~=5.6", # May move these to optional dependencies in the future. "pandas-stubs>=2.0,<4.0", "types-PyYAML~=6.0", @@ -251,6 +252,13 @@ dev-all = ["development", "all", "build"] docs-build = ["dev-all", "docs"] # Note python 3.9+ [tool.uv] +# Pin uv to a version that writes lockfile revision 3 (introduced in uv 0.8.4 +# by astral-sh/uv#14489, which added ``exclude-newer-package`` to the lock +# schema). Older uv silently rolls the lockfile back to revision 2 on the +# next ``uv lock`` / ``uv sync`` (a no-op-looking change), then a teammate +# on a newer uv re-bumps it — churning the revision field in PRs. +# ``required-version`` blocks the older uv up front with a clear error. +required-version = ">=0.8.4" # Fork resolution per Python minor in the support range. Each fork resolves # independently, which lets 3.8 pick numpy 1.24.x + rosbags 0.9.23 without # being constrained by the 3.9+ universe (numpy 2.0 drops 3.8). @@ -352,6 +360,13 @@ module = "nptdms" ignore_missing_imports = true ignore_errors = true +# diskcache ships without inline type hints or PEP 561 marker. Used by the +# channel data cache's optional on-disk tier. +[[tool.mypy.overrides]] +module = "diskcache" +ignore_missing_imports = true +ignore_errors = true + # alive-progress 3.3.0 ships py.typed but its `alive_it` signature is too # tight (declares `Collection[Never]`), which breaks unpacking generators. [[tool.mypy.overrides]] diff --git a/python/uv.lock b/python/uv.lock index d152551a9..7a0c68645 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -638,6 +638,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/c3/253a89ee03fc9b9682f1541728eb66db7db22148cd94f89ab22528cd1e1b/deprecation-2.1.0-py2.py3-none-any.whl", hash = "sha256:a10811591210e1fb0e768a8c25517cabeabcba6f0bf96564f8ff45189f90b14a", size = 11178, upload-time = "2020-04-20T14:23:36.581Z" }, ] +[[package]] +name = "diskcache" +version = "5.6.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/3f/21/1c1ffc1a039ddcc459db43cc108658f32c57d271d7289a2794e401d0fdb6/diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc", size = 67916, upload-time = "2023-08-31T06:12:00.316Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3f/27/4570e78fc0bf5ea0ca45eb1de3818a23787af9b390c0b0a0033a1b8236f9/diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19", size = 45550, upload-time = "2023-08-31T06:11:58.822Z" }, +] + [[package]] name = "eval-type-backport" version = "0.3.1" @@ -4334,6 +4343,7 @@ source = { editable = "." } dependencies = [ { name = "alive-progress", version = "3.1.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, { name = "alive-progress", version = "3.3.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, + { name = "diskcache" }, { name = "eval-type-backport" }, { name = "filelock", version = "3.16.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, { name = "filelock", version = "3.19.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version == '3.9.*'" }, @@ -4562,6 +4572,7 @@ requires-dist = [ { name = "cffi", marker = "extra == 'dev-all'", specifier = "~=1.14" }, { name = "cffi", marker = "extra == 'docs-build'", specifier = "~=1.14" }, { name = "cffi", marker = "extra == 'openssl'", specifier = "~=1.14" }, + { name = "diskcache", specifier = "~=5.6" }, { name = "eval-type-backport", specifier = "~=0.2" }, { name = "filelock", specifier = "~=3.15" }, { name = "googleapis-common-protos", specifier = ">=1.60" },