diff --git a/DESIGN_ISSUES.md b/DESIGN_ISSUES.md index 7221fda08..73c74565d 100644 --- a/DESIGN_ISSUES.md +++ b/DESIGN_ISSUES.md @@ -1086,6 +1086,37 @@ element type. See PLT-1732 for full design. --- +## `src/orcapod/databases/connector_arrow_database.py` + +### CA1 — SQL connectors silently lose Arrow extension-type field metadata on round-trip +**Status:** in progress +**Severity:** high +**Issue:** PLT-1795 + +`SQLiteConnector` (and any `DBConnectorProtocol` implementation that maps Arrow → SQL types) +does not preserve `ARROW:extension:name` / `ARROW:extension:metadata` field metadata. When a +column whose Arrow type is a `pa.ExtensionType` (e.g. `orcapod.path`, `orcapod.uuid`, or any +dataclass extension type) is written via `ConnectorArrowDatabase.add_records()` and then read +back, the column is returned as the raw storage type (e.g. `large_string`, `large_binary`, +`struct`) with no extension marker. This makes SQL connector round-trips impossible and causes silent data-type loss. + +**Interim fix (PLT-1659):** `ConnectorArrowDatabase.add_records()` now raises `ValueError` +immediately when any column is extension-typed, surfacing the issue at write +time rather than on a confusing read. Two representations are rejected: +- In-memory extension types: `isinstance(field.type, pa.ExtensionType)`. +- Metadata-only columns: plain storage type whose field metadata contains + `b"ARROW:extension:name"` (the representation produced when reading a Parquet/IPC file + with an unregistered extension type). + +**Full fix (PLT-1795, target v0.2):** Preserve extension-type metadata in the SQL schema via +a companion metadata table (one row per column: `table_name`, `column_name`, +`extension_name`, `extension_metadata`). On `create_table_if_not_exists`, write rows for any +extension-typed columns; on `iter_batches`, join the metadata table and reconstruct the +`pa.ExtensionType` for affected columns before returning the batch. Once implemented, the +`ValueError` guard in `add_records()` can be lifted. + +--- + ## `src/orcapod/semantic_types/universal_converter.py` ### UC1 — `python_type_to_arrow_type` raised on `typing.Any` from empty-container inference diff --git a/src/orcapod/databases/connector_arrow_database.py b/src/orcapod/databases/connector_arrow_database.py index ab6928ed2..6e289c5a1 100644 --- a/src/orcapod/databases/connector_arrow_database.py +++ b/src/orcapod/databases/connector_arrow_database.py @@ -244,6 +244,34 @@ def add_records( f"got {rid_type}. Encode the column to bytes before calling add_records()." ) + # Reject Arrow extension-typed columns: SQL connectors do not preserve + # ARROW:extension:* field metadata, so extension types would be silently + # dropped on read, making round-trips impossible. Use DeltaTableDatabase + # or write directly to Parquet instead. See PLT-1795 for the planned fix. + # + # Two representations are checked: + # 1. In-memory extension types: isinstance(field.type, pa.ExtensionType). + # 2. Metadata-only extension columns: a plain Arrow type whose field metadata + # contains the b"ARROW:extension:name" key. This arises when reading a + # Parquet/IPC file with an unregistered extension type — the array is + # decoded as its storage type but the metadata is preserved on the field. + _EXT_NAME_KEY = b"ARROW:extension:name" + ext_fields: list[tuple[str, str]] = [] + for field in records.schema: + if isinstance(field.type, pa.ExtensionType): + ext_fields.append((field.name, field.type.extension_name)) + elif field.metadata and _EXT_NAME_KEY in field.metadata: + ext_fields.append((field.name, field.metadata[_EXT_NAME_KEY].decode("utf-8", errors="replace"))) + if ext_fields: + ext_info = ", ".join(f"{name!r}: {ext_name!r}" for name, ext_name in ext_fields) + raise ValueError( + f"ConnectorArrowDatabase does not support Arrow extension-typed columns " + f"({ext_info}). SQL connectors do not preserve ARROW:extension:* field " + f"metadata, so extension types would be silently dropped on read. " + f"Use DeltaTableDatabase or write directly to Parquet instead. " + f"See PLT-1795 for the planned fix." + ) + records = self._deduplicate_within_table(records) record_key = self._get_record_key(record_path) input_ids = set(cast(list[bytes], records[self.RECORD_ID_COLUMN].to_pylist())) diff --git a/superpowers/plans/2026-06-23-plt-1659-extension-type-roundtrip-integration-tests.md b/superpowers/plans/2026-06-23-plt-1659-extension-type-roundtrip-integration-tests.md new file mode 100644 index 000000000..0028fb2c5 --- /dev/null +++ b/superpowers/plans/2026-06-23-plt-1659-extension-type-roundtrip-integration-tests.md @@ -0,0 +1,827 @@ +# PLT-1659: Extension Type Round-Trip Integration Tests — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use sensei:subagent-driven-development (recommended) or sensei:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add three new integration test files covering end-to-end extension type round-trips through Parquet, Delta Lake, schema compatibility, and per-process cache behaviour. + +**Architecture:** Three focused test files plus one source change and one docs update. Test files: `test_roundtrips.py` (write/read through Parquet and Delta backends), `test_schema_compatibility.py` (Arrow-level identity + Python-type-level compatibility), `test_cache_behavior.py` (registry cache populated and skipped on second read). SQLite backend is excluded from value round-trip tests because `SQLiteConnector` does not preserve `ARROW:extension:*` field metadata; that pattern is already covered by `test_extension_aware_database.py`. Source change: `ConnectorArrowDatabase.add_records()` gets a `ValueError` guard that rejects extension-typed columns (both in-memory `pa.ExtensionType` and metadata-only fields) as an interim safety measure while PLT-1795 is pending. + +**Tech Stack:** pytest, pyarrow, pyarrow.parquet, deltalake, polars, orcapod extension type APIs (`create_registry`, `UniversalTypeConverter`, `DataclassLogicalTypeFactory`), `unittest.mock.patch.object`. + +--- + +## File Map + +| Action | Path | +|---|---| +| Create | `tests/test_extension_types/test_schema_compatibility.py` | +| Create | `tests/test_extension_types/test_cache_behavior.py` | +| Create | `tests/test_extension_types/test_roundtrips.py` | +| Modify | `src/orcapod/databases/connector_arrow_database.py` — add `ValueError` guard in `add_records()` | +| Modify | `DESIGN_ISSUES.md` — add CA1 entry documenting SQL metadata loss and interim guard | + +--- + +## Task 1: Create and check out the feature branch + +**Files:** none (git only) + +- [ ] **Step 1: Verify you are on `extension-type-system`** + +```bash +git branch --show-current +``` + +Expected output: `extension-type-system` + +- [ ] **Step 2: Create and check out the feature branch** + +```bash +git checkout -b eywalker/plt-1659-integration-tests-end-to-end-semantic-type-round-trips +git branch --show-current +``` + +Expected output: `eywalker/plt-1659-integration-tests-end-to-end-semantic-type-round-trips` + +--- + +## Task 2: `test_schema_compatibility.py` + +**Files:** +- Create: `tests/test_extension_types/test_schema_compatibility.py` + +This file has no backend dependencies — it only needs a fresh `UniversalTypeConverter` and `check_schema_compatibility`. + +- [ ] **Step 1: Write the test file** + +Create `tests/test_extension_types/test_schema_compatibility.py` with this exact content: + +```python +"""Integration tests for extension-type-backed schema compatibility. + +Two complementary angles: + +Arrow-level identity + ``converter.python_schema_to_arrow_schema`` assigns each dataclass a unique + Arrow extension name derived from its fully-qualified class name. Two + dataclasses with identical struct shapes but different class names therefore + produce *different* extension names — the core identity guarantee of the + extension type system. + +Python-type-level compatibility + ``check_schema_compatibility`` from ``schema_utils`` uses beartype + ``is_subhint`` to compare Python type annotations. Same class → compatible; + different class with the same struct shape → incompatible. This is the + property that prevents silent data corruption when two unrelated dataclasses + happen to share the same fields. +""" +from __future__ import annotations + +import dataclasses + +import pyarrow as pa + +from orcapod.contexts import create_registry +from orcapod.types import Schema +from orcapod.utils.schema_utils import check_schema_compatibility + + +# Module-level dataclasses — DataclassLogicalTypeFactory rejects local classes +# because they have no stable fully-qualified class name for reconstruction. + +@dataclasses.dataclass +class _PointA: + x: int + y: int + + +@dataclasses.dataclass +class _PointB: + """Same struct shape as _PointA but a different class name.""" + x: int + y: int + + +# ── Arrow-level identity tests ──────────────────────────────────────────────── + + +def test_arrow_schema_distinct_extension_names_for_same_shape(): + """_PointA and _PointB produce different Arrow extension names despite identical shapes. + + This is the core identity guarantee: struct shape alone does not determine + type identity in the extension type system. + """ + converter_a = create_registry().get_context().type_converter + converter_b = create_registry().get_context().type_converter + + type_a = converter_a.register_python_class(_PointA) + type_b = converter_b.register_python_class(_PointB) + + assert isinstance(type_a, pa.ExtensionType) + assert isinstance(type_b, pa.ExtensionType) + + fqcn_a = f"{_PointA.__module__}.{_PointA.__qualname__}" + fqcn_b = f"{_PointB.__module__}.{_PointB.__qualname__}" + assert type_a.extension_name == fqcn_a + assert type_b.extension_name == fqcn_b + assert type_a.extension_name != type_b.extension_name + + +def test_arrow_schema_same_extension_name_idempotent(): + """Registering _PointA twice returns the same extension name both times.""" + converter = create_registry().get_context().type_converter + + type_first = converter.register_python_class(_PointA) + type_second = converter.register_python_class(_PointA) + + assert isinstance(type_first, pa.ExtensionType) + assert isinstance(type_second, pa.ExtensionType) + assert type_first.extension_name == type_second.extension_name + + +# ── Python-type-level compatibility tests ───────────────────────────────────── + + +def test_python_schema_compatibility_passes_same_type(): + """Incoming _PointA is compatible with receiving _PointA.""" + result = check_schema_compatibility( + {"value": _PointA}, + Schema({"value": _PointA}), + ) + assert result is True + + +def test_python_schema_compatibility_rejects_different_type_same_shape(): + """Incoming _PointA is NOT compatible with receiving _PointB. + + Both dataclasses share the same struct shape {x: int, y: int}, but they + are different Python types. The old shape-based system would have accepted + this silently; the extension type system correctly rejects it. + """ + result = check_schema_compatibility( + {"value": _PointA}, + Schema({"value": _PointB}), + ) + assert result is False +``` + +- [ ] **Step 2: Run the tests and verify they pass** + +```bash +uv run pytest tests/test_extension_types/test_schema_compatibility.py -v +``` + +Expected: all 4 tests pass. + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_extension_types/test_schema_compatibility.py +git commit -m "test(extension-types): add schema compatibility integration tests (PLT-1659)" +``` + +--- + +## Task 3: `test_cache_behavior.py` + +**Files:** +- Create: `tests/test_extension_types/test_cache_behavior.py` + +Uses Parquet as the storage backend (simplest — no database wrapper needed). The second test patches `DataclassLogicalTypeFactory.reconstruct_from_arrow` at the class level to count calls; `wraps=` preserves the original behaviour so the test still exercises the real code path. + +- [ ] **Step 1: Write the test file** + +Create `tests/test_extension_types/test_cache_behavior.py` with this exact content: + +```python +"""Integration tests for per-process extension type cache behaviour. + +The ``LogicalTypeRegistry`` stores registered types in an in-memory dict keyed +by Arrow extension name. ``register_discovered_extensions`` skips the factory +call (``reconstruct_from_arrow``) when the extension name is already present in +the registry — this is the "cache hit" path. + +Two tests: + +1. ``test_cache_populated_after_first_read`` — verifies the type is absent from + a fresh converter's registry before reading a Parquet file, and present after. + +2. ``test_factory_not_called_on_second_read`` — verifies that ``reconstruct_from_arrow`` + is called exactly once (first read) and zero additional times on the second + read of the same file. +""" +from __future__ import annotations + +import dataclasses +from unittest.mock import patch + +import pyarrow as pa +import pyarrow.parquet as pq + +from orcapod.contexts import create_registry +from orcapod.extension_types.dataclass_logical_type_factory import DataclassLogicalTypeFactory + + +# Module-level dataclass — local classes cannot be reconstructed from FQCN. + +@dataclasses.dataclass +class _CachePoint: + x: int + y: int + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def _fresh_converter(): + """Return a fresh UniversalTypeConverter from a new registry instance. + + Uses ``create_registry()`` instead of ``get_default_context()`` to avoid + cross-test contamination through the global singleton cache. + """ + return create_registry().get_context().type_converter + + +def _write_parquet(tmp_path, converter) -> str: + """Write a _CachePoint column to Parquet and return the file path as str.""" + converter.register_python_class(_CachePoint) + arrow_schema = converter.python_schema_to_arrow_schema({"point": _CachePoint}) + rows = [{"point": _CachePoint(x=1, y=2)}] + table = converter.python_dicts_to_arrow_table(rows, arrow_schema=arrow_schema) + parquet_path = tmp_path / "cache_test.parquet" + pq.write_table(table, str(parquet_path)) + return str(parquet_path) + + +# ── Tests ───────────────────────────────────────────────────────────────────── + + +def test_cache_populated_after_first_read(tmp_path): + """Registry has _CachePoint after load_extension_types on a fresh converter. + + Before reading: the fresh converter's registry does not know about _CachePoint. + After reading: register_discovered_extensions triggers reconstruct_from_arrow + which registers _CachePoint, populating the cache. + """ + write_converter = _fresh_converter() + parquet_path = _write_parquet(tmp_path, write_converter) + + read_converter = _fresh_converter() + fqcn = f"{_CachePoint.__module__}.{_CachePoint.__qualname__}" + + # Before read: not registered + assert read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn) is None + + read_converter.load_extension_types(pq.read_table(parquet_path)) + + # After read: registered (cache populated) + assert read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn) is not None + + +def test_factory_not_called_on_second_read(tmp_path): + """reconstruct_from_arrow called once on first read, zero times on second read. + + On first read, register_discovered_extensions finds _CachePoint's extension + name in the schema, dispatches to the factory (call count = 1), and stores + the result in the registry. + + On second read, register_discovered_extensions finds the extension name already + in the registry and short-circuits — the factory is not called again + (call count remains 1). + """ + write_converter = _fresh_converter() + parquet_path = _write_parquet(tmp_path, write_converter) + + read_converter = _fresh_converter() + + with patch.object( + DataclassLogicalTypeFactory, + "reconstruct_from_arrow", + autospec=True, + wraps=DataclassLogicalTypeFactory.reconstruct_from_arrow, + ) as spy: + # First read: factory is called once + read_converter.load_extension_types(pq.read_table(parquet_path)) + assert spy.call_count == 1, f"Expected 1 factory call, got {spy.call_count}" + + # Second read on the same file: registry hit — factory not called again + read_converter.load_extension_types(pq.read_table(parquet_path)) + assert spy.call_count == 1, ( + f"Expected still 1 factory call after second read, got {spy.call_count}" + ) +``` + +- [ ] **Step 2: Run the tests and verify they pass** + +```bash +uv run pytest tests/test_extension_types/test_cache_behavior.py -v +``` + +Expected: both tests pass. + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_extension_types/test_cache_behavior.py +git commit -m "test(extension-types): add per-process cache behaviour integration tests (PLT-1659)" +``` + +--- + +## Task 4: `test_roundtrips.py` — backend fixture + all parametrised tests + +**Files:** +- Create: `tests/test_extension_types/test_roundtrips.py` + +**Important note on SQLite:** `SQLiteConnector` maps Arrow types to SQL column types and does not preserve `ARROW:extension:*` field metadata. `ExtensionAwareDatabase` relies on that metadata to auto-register and re-wrap extension types on read. Without it, `apply_extension_types` is a no-op and values are returned as plain storage scalars (string, bytes, dict). SQLite backend round-trip tests are therefore omitted from this file; the `ExtensionAwareDatabase` wrapper behaviour is already covered by `tests/test_databases/test_extension_aware_database.py`. + +The Parquet and Delta backends both preserve field metadata (through the Arrow → Parquet encoding) and fully support the peek-register-read pattern. + +- [ ] **Step 1: Write the test file** + +Create `tests/test_extension_types/test_roundtrips.py` with this exact content: + +```python +"""End-to-end integration tests for extension type round-trips. + +Tests the complete pipeline: + + Python object → write → storage → peek-schema → register → read → Python object + +Each round-trip test is parameterised over two storage backends: + +- ``parquet``: direct ``pyarrow.parquet`` write/read. +- ``delta``: ``deltalake.write_deltalake`` / ``DeltaTable.to_pyarrow_dataset(as_large_types=True).to_table()``. + +SQLite (``ConnectorArrowDatabase`` + ``SQLiteConnector``) is excluded because +``SQLiteConnector`` maps Arrow types to SQL column types and discards +``ARROW:extension:*`` field metadata. Without that metadata, the +peek-register-read pattern cannot auto-register extension types on the read +path. The ``ExtensionAwareDatabase`` wrapper behaviour over SQLite is already +tested in ``tests/test_databases/test_extension_aware_database.py``. +""" +from __future__ import annotations + +import dataclasses +import pathlib +import uuid as uuid_module +from pathlib import Path +from typing import Callable + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest +from upath import UPath + +from orcapod.contexts import create_registry +from orcapod.semantic_types.universal_converter import UniversalTypeConverter + + +# ── Module-level dataclasses ────────────────────────────────────────────────── +# DataclassLogicalTypeFactory rejects local (in-function) classes because they +# have no stable fully-qualified class name for reconstruction from Arrow schema. + +@dataclasses.dataclass +class _PointA: + x: int + y: int + + +@dataclasses.dataclass +class _PointB: + """Same struct shape as _PointA, different class name.""" + x: int + y: int + + +@dataclasses.dataclass +class _Inner: + value: int + + +@dataclasses.dataclass +class _Outer: + inner: _Inner + label: str + + +# ── Storage backend abstraction ─────────────────────────────────────────────── + + +@dataclasses.dataclass +class _StorageBackend: + """Encapsulates backend-specific write and read logic for parameterised tests. + + Args: + name: Short identifier used in pytest test IDs (e.g. ``"parquet"``). + write: Callable that writes an Arrow table to a directory. + read: Callable that reads from that directory and returns an Arrow table + with extension types registered and applied. Must return only the + original user data columns (no ``__record_id`` or similar). + """ + name: str + write: Callable[[pa.Table, Path], None] + read: Callable[[Path, UniversalTypeConverter], pa.Table] + + +def _parquet_write(table: pa.Table, base_path: Path) -> None: + pq.write_table(table, str(base_path / "data.parquet")) + + +def _parquet_read(base_path: Path, converter: UniversalTypeConverter) -> pa.Table: + return converter.load_extension_types(pq.read_table(str(base_path / "data.parquet"))) + + +def _delta_write(table: pa.Table, base_path: Path) -> None: + import deltalake + deltalake.write_deltalake(str(base_path / "delta"), table) + + +def _delta_read(base_path: Path, converter: UniversalTypeConverter) -> pa.Table: + import deltalake + dt = deltalake.DeltaTable(str(base_path / "delta")) + # as_large_types=True preserves large_string / large_binary rather than + # normalising them to string / binary (Delta Lake's default behaviour). + raw = dt.to_pyarrow_dataset(as_large_types=True).to_table() + return converter.load_extension_types(raw) + + +_BACKENDS = [ + _StorageBackend(name="parquet", write=_parquet_write, read=_parquet_read), + _StorageBackend(name="delta", write=_delta_write, read=_delta_read), +] + + +@pytest.fixture(params=_BACKENDS, ids=lambda b: b.name) +def storage_backend(request: pytest.FixtureRequest) -> _StorageBackend: + """Yield one storage backend per parametrised run.""" + return request.param + + +# ── Internal helpers ────────────────────────────────────────────────────────── + + +def _fresh_converter() -> UniversalTypeConverter: + """Return a fresh converter from a new registry instance. + + Uses ``create_registry()`` instead of ``get_default_context()`` to avoid + cross-test contamination through the global singleton cache. + """ + return create_registry().get_context().type_converter + + +def _write_and_read( + schema_dict: dict, + rows: list[dict], + backend: _StorageBackend, + tmp_path: Path, +) -> tuple[pa.Table, UniversalTypeConverter]: + """Write rows with a fresh write converter and read back with a fresh read converter. + + Returns the resulting Arrow table (with extension types applied) and the + read-side converter (needed for ``arrow_table_to_python_dicts``). + """ + write_converter = _fresh_converter() + arrow_schema = write_converter.python_schema_to_arrow_schema(schema_dict) + table = write_converter.python_dicts_to_arrow_table(rows, arrow_schema=arrow_schema) + backend.write(table, tmp_path) + + read_converter = _fresh_converter() + result = backend.read(tmp_path, read_converter) + return result, read_converter + + +# ── Built-in type round-trip tests ─────────────────────────────────────────── + + +def test_builtin_path_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """pathlib.Path round-trips through storage with extension name ``orcapod.path``. + + Built-in types (Path, UPath, UUID) are pre-registered in the default context + so the read-side converter already knows about them. The test verifies that: + + 1. The Arrow field carries the ``orcapod.path`` extension type after read. + 2. The Python value is reconstructed as a ``pathlib.Path`` instance. + """ + p = pathlib.Path("/tmp/orcapod/integration/test.txt") + result, read_converter = _write_and_read( + {"col": pathlib.Path}, + [{"col": p}], + storage_backend, + tmp_path, + ) + + field = result.schema.field("col") + assert hasattr(field.type, "extension_name"), ( + f"Expected extension type on field 'col', got plain type {field.type!r}" + ) + assert field.type.extension_name == "orcapod.path" + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + assert isinstance(rows[0]["col"], pathlib.Path) + assert rows[0]["col"] == p + + +def test_builtin_upath_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """UPath round-trips through storage with extension name ``orcapod.upath``.""" + u = UPath("s3://my-bucket/data/file.parquet") + result, read_converter = _write_and_read( + {"col": UPath}, + [{"col": u}], + storage_backend, + tmp_path, + ) + + field = result.schema.field("col") + assert hasattr(field.type, "extension_name"), ( + f"Expected extension type on field 'col', got plain type {field.type!r}" + ) + assert field.type.extension_name == "orcapod.upath" + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + assert isinstance(rows[0]["col"], UPath) + assert str(rows[0]["col"]) == str(u) + + +def test_builtin_uuid_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """uuid.UUID round-trips through storage with extension name ``orcapod.uuid``.""" + u = uuid_module.UUID("12345678-1234-5678-1234-567812345678") + result, read_converter = _write_and_read( + {"col": uuid_module.UUID}, + [{"col": u}], + storage_backend, + tmp_path, + ) + + field = result.schema.field("col") + assert hasattr(field.type, "extension_name"), ( + f"Expected extension type on field 'col', got plain type {field.type!r}" + ) + assert field.type.extension_name == "orcapod.uuid" + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + assert isinstance(rows[0]["col"], uuid_module.UUID) + assert rows[0]["col"] == u + + +# ── Dataclass round-trip tests ──────────────────────────────────────────────── + + +def test_simple_dataclass_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """Simple dataclass round-trips with correct FQCN as the Arrow extension name. + + The read-side converter starts with no knowledge of _PointA. After read, + register_discovered_extensions triggers DataclassLogicalTypeFactory which + imports _PointA from its fully-qualified class name and registers it. + """ + point = _PointA(x=3, y=7) + result, read_converter = _write_and_read( + {"point": _PointA}, + [{"point": point}], + storage_backend, + tmp_path, + ) + + fqcn = f"{_PointA.__module__}.{_PointA.__qualname__}" + field = result.schema.field("point") + assert hasattr(field.type, "extension_name"), ( + f"Expected extension type on field 'point', got {field.type!r}" + ) + assert field.type.extension_name == fqcn + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + reconstructed = rows[0]["point"] + assert isinstance(reconstructed, _PointA) + assert reconstructed.x == 3 + assert reconstructed.y == 7 + + +def test_two_dataclasses_same_shape_distinct_extension_names( + storage_backend: _StorageBackend, tmp_path: Path +) -> None: + """_PointA and _PointB have the same struct shape but different extension names. + + Writing _PointA and reading it back must NOT reconstruct a _PointB, even + though their on-disk struct shapes (x: int, y: int) are identical. The + extension name (FQCN) is the sole identity signal. + """ + point_a = _PointA(x=1, y=2) + result, read_converter = _write_and_read( + {"point": _PointA}, + [{"point": point_a}], + storage_backend, + tmp_path, + ) + + fqcn_a = f"{_PointA.__module__}.{_PointA.__qualname__}" + fqcn_b = f"{_PointB.__module__}.{_PointB.__qualname__}" + + field = result.schema.field("point") + assert hasattr(field.type, "extension_name") + assert field.type.extension_name == fqcn_a + assert field.type.extension_name != fqcn_b # distinct from _PointB + + rows = read_converter.arrow_table_to_python_dicts(result) + reconstructed = rows[0]["point"] + assert isinstance(reconstructed, _PointA) + assert not isinstance(reconstructed, _PointB) + + +def test_nested_dataclass_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """Nested dataclass: _Outer and _Inner both registered; full object reconstructed. + + register_discovered_extensions triggers DataclassLogicalTypeFactory for _Outer. + That factory's reconstruct_from_arrow calls converter.register_python_class(_Inner) + as a side-effect, so _Inner is also registered without an explicit peek step. + """ + outer = _Outer(inner=_Inner(value=42), label="hello") + result, read_converter = _write_and_read( + {"item": _Outer}, + [{"item": outer}], + storage_backend, + tmp_path, + ) + + fqcn_outer = f"{_Outer.__module__}.{_Outer.__qualname__}" + fqcn_inner = f"{_Inner.__module__}.{_Inner.__qualname__}" + + assert read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn_outer) is not None, ( + "_Outer should be registered after read" + ) + assert read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn_inner) is not None, ( + "_Inner should be registered transitively after read" + ) + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + reconstructed = rows[0]["item"] + assert isinstance(reconstructed, _Outer) + assert isinstance(reconstructed.inner, _Inner) + assert reconstructed.inner.value == 42 + assert reconstructed.label == "hello" +``` + +- [ ] **Step 2: Run the tests and verify they pass** + +```bash +uv run pytest tests/test_extension_types/test_roundtrips.py -v +``` + +Expected: all 12 parametrised tests pass (6 test functions × 2 backends). + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_extension_types/test_roundtrips.py +git commit -m "test(extension-types): add Parquet/Delta round-trip integration tests (PLT-1659)" +``` + +--- + +## Task 5: Add the Delta Polars native-read test to `test_roundtrips.py` + +**Files:** +- Modify: `tests/test_extension_types/test_roundtrips.py` (append one function) + +This test reads a Delta table back via `pl.read_delta` (Polars' native Delta reader) rather than `DeltaTable.to_pyarrow_table()`, verifying that extension type metadata survives the Polars path. + +When the write-side converter calls `register_python_class(_PointA)`, it registers `_PointA` in both PyArrow's and Polars' **global** registries (as a side-effect of `registry.register_logical_type`). That global registration persists for the duration of the test process, so `pl.read_delta` can resolve `_PointA`'s extension type when reading the underlying Parquet files. + +- [ ] **Step 1: Append the Delta Polars test to `test_roundtrips.py`** + +Append the following block at the end of `tests/test_extension_types/test_roundtrips.py`: + +```python +# ── Delta Lake: Polars native read ─────────────────────────────────────────── + + +def test_delta_polars_read_delta(tmp_path: Path) -> None: + """Write a dataclass column to Delta; read back via pl.read_delta; extension type survives. + + The write-side converter registers _PointA in both PyArrow's and Polars' + global registries (``register_python_class`` calls ``make_polars_extension_type`` + which registers with Polars). ``pl.read_delta`` can therefore decode the column + as the correct Polars extension type, not a plain ``Struct``. + + Note: ``pl.DataFrame.to_arrow()`` exports Polars extension types as PyArrow + extension arrays but with empty serialized bytes (Polars does not forward + ``__arrow_ext_metadata__`` through its Arrow export). Python-object + reconstruction via the Polars-to-Arrow path is therefore not possible; that + path is tested by the separate ``parquet`` / ``delta`` parametrised tests + which read underlying Parquet files directly. + """ + import deltalake + import polars as pl + + delta_path = str(tmp_path / "polars_delta") + fqcn = f"{_PointA.__module__}.{_PointA.__qualname__}" + + # Write — registers _PointA in PyArrow + Polars global registries. + write_converter = _fresh_converter() + write_converter.register_python_class(_PointA) + arrow_schema = write_converter.python_schema_to_arrow_schema({"point": _PointA}) + rows = [{"point": _PointA(x=5, y=9)}] + table = write_converter.python_dicts_to_arrow_table(rows, arrow_schema=arrow_schema) + deltalake.write_deltalake(delta_path, table) + + # Read via Polars native Delta reader. + # _PointA is already in the Polars global registry from the write step above. + df = pl.read_delta(delta_path) + + # Assert the column carries the correct Polars extension type — not a plain Struct. + col_dtype = df.dtypes[0] + assert col_dtype.is_extension(), ( + f"Expected a Polars extension type on column 'point', got {col_dtype!r}" + ) + assert col_dtype.ext_name() == fqcn, ( + f"Expected extension name {fqcn!r}, got {col_dtype.ext_name()!r}" + ) +``` + +- [ ] **Step 2: Run the new test to verify it passes** + +```bash +uv run pytest tests/test_extension_types/test_roundtrips.py::test_delta_polars_read_delta -v +``` + +Expected: 1 test passes. + +- [ ] **Step 3: Run the full roundtrips file to confirm no regressions** + +```bash +uv run pytest tests/test_extension_types/test_roundtrips.py -v +``` + +Expected: 13 tests pass (12 from Task 4 + 1 new). + +- [ ] **Step 4: Commit** + +```bash +git add tests/test_extension_types/test_roundtrips.py +git commit -m "test(extension-types): add Delta Polars native-read round-trip test (PLT-1659)" +``` + +--- + +## Task 6: Full test run and PR + +**Files:** none + +- [ ] **Step 1: Run the full extension-types test suite** + +```bash +uv run pytest tests/test_extension_types/ -v +``` + +Expected: all tests pass. The three new files contribute 17 tests: +- `test_schema_compatibility.py`: 4 tests +- `test_cache_behavior.py`: 2 tests +- `test_roundtrips.py`: 13 tests + +- [ ] **Step 2: Run the broader test suite to check for regressions** + +```bash +uv run pytest tests/ -x -q --ignore=tests/test_semantic_types +``` + +Expected: no new failures. (`test_semantic_types/` tests the old shape-based system and is excluded per the PLT-1659 spec.) + +- [ ] **Step 3: Push the branch** + +```bash +git push -u origin eywalker/plt-1659-integration-tests-end-to-end-semantic-type-round-trips +``` + +- [ ] **Step 4: Open the PR** + +```bash +gh pr create \ + --base extension-type-system \ + --title "test(extension-types): end-to-end round-trip integration tests (PLT-1659)" \ + --body "$(cat <<'EOF' +## Summary + +Adds three integration test files covering the full extension type round-trip pipeline: + +- **`test_roundtrips.py`** — write/read round-trips for built-in types (Path, UPath, UUID), simple dataclass, two same-shaped dataclasses with distinct extension names, nested dataclass, and Polars native Delta read. Parameterised over Parquet and Delta backends. +- **`test_schema_compatibility.py`** — Arrow-level extension name identity checks and Python-type-level `check_schema_compatibility` pass/reject tests. +- **`test_cache_behavior.py`** — verifies the per-process registry cache is populated on first read and that `reconstruct_from_arrow` is not called on subsequent reads of the same file. + +## Deferred (noted in corresponding issues) + +- `list[MyDataclass]` round-trip → PLT-1732 (requires `ListLogicalType`) +- Picklable type tests → PLT-1658 (handler not yet implemented) +- SQLite value round-trips → excluded because `SQLiteConnector` does not preserve `ARROW:extension:*` field metadata; `ExtensionAwareDatabase` wrapper already tested in `test_extension_aware_database.py` + +Closes PLT-1659 +EOF +)" +``` + +- [ ] **Step 5: Confirm the PR URL is printed and note it** + +The `gh pr create` command prints the PR URL. Record it for tracking. diff --git a/superpowers/specs/2026-06-23-plt-1659-extension-type-roundtrip-integration-tests-design.md b/superpowers/specs/2026-06-23-plt-1659-extension-type-roundtrip-integration-tests-design.md new file mode 100644 index 000000000..d44a3ff90 --- /dev/null +++ b/superpowers/specs/2026-06-23-plt-1659-extension-type-roundtrip-integration-tests-design.md @@ -0,0 +1,228 @@ +# PLT-1659: End-to-End Extension Type Round-Trip Integration Tests — Design Spec + +**Date:** 2026-06-23 +**Linear issue:** PLT-1659 +**Branch:** `eywalker/plt-1659-integration-tests-end-to-end-semantic-type-round-trips` +**PR target:** `extension-type-system` + +--- + +## Overview + +This spec covers the design of end-to-end integration tests for the Arrow/Polars extension type +system introduced in the `extension-type-system` branch. The tests validate the complete pipeline: + +``` +Python object → write → storage → peek-schema → register → read → Python object +``` + +These are *integration* tests only. Existing unit tests in `tests/test_extension_types/` (registry, +schema walker, database hooks, built-in logical types, protocols) are not duplicated. + +--- + +## What Is Tested + +### Built-in types: `Path`, `UPath`, `UUID` + +Round-trip through two storage backends (Parquet and Delta — SQLite excluded, see +`test_roundtrips.py` note). Assertions: +- Python object is faithfully reconstructed after read. +- Arrow extension names are in the `orcapod.*` namespace (`orcapod.path`, `orcapod.upath`, + `orcapod.uuid`). + +### Dataclass types + +- **Simple dataclass** (scalar fields only): write → read → verify field values. +- **Two dataclasses with identical struct shape, different class names** (`_PointA` vs `_PointB`): + verify they are stored and recovered as distinct extension types (distinct Arrow extension names). +- **Nested dataclass** (outer contains inner as a field): write → read → verify recursive + reconstruction; assert both inner and outer types are registered after the read. + +### Delta Lake direct read + +Write a dataclass column to Delta Lake. Read back via `pl.read_delta` (Polars native Delta +reader). Assert the column dtype carries the correct extension type. + +### Schema compatibility + +Two sub-areas: + +- **Arrow-level identity**: `converter.python_schema_to_arrow_schema` for `_PointA` and `_PointB` + produces distinct Arrow extension names, even though the underlying struct shapes are identical. +- **Python-type-level compatibility**: `check_schema_compatibility` from `schema_utils` correctly + passes when types match and rejects when the same-shaped-but-different-named types are used. + +### Per-process cache behavior + +- **Cache populated on read**: fresh converter + Parquet file containing a registered dataclass → + after `converter.load_extension_types(...)`, the type is present in the registry. +- **Factory skipped on second read**: patching `factory.reconstruct_from_arrow` confirms it is + called exactly once on first read and zero times on second read (registry hit short-circuits + factory dispatch). + +--- + +## What Is Explicitly Out of Scope + +| Excluded | Reason | Tracked in | +|---|---|---| +| `list[MyDataclass]` round-trip | Known limitation (ET2); requires `ListLogicalType` infrastructure | PLT-1732 | +| Picklable types | `PicklableLogicalTypeFactory` (PLT-1658) not yet implemented | PLT-1658 | +| Pydantic round-trips | Already covered in `test_default_context_factories.py` | — | +| Duplicate unit tests | Existing unit tests in `test_extension_types/` are not repeated | — | + +--- + +## File Organisation + +Three new files, all in `tests/test_extension_types/`: + +``` +tests/test_extension_types/ +├── test_roundtrips.py # Write/read round-trips across backends +├── test_schema_compatibility.py # Arrow-level + Python-type-level compatibility +└── test_cache_behavior.py # Per-process cache: populated / skipped on second read +``` + +--- + +## Backend Parameterisation + +`test_roundtrips.py` parameterises over **two** storage backends via a `_StorageBackend` +dataclass with two callables. SQLite (`ConnectorArrowDatabase` + `SQLiteConnector`) is +excluded because `SQLiteConnector` discards `ARROW:extension:*` field metadata during type +mapping — see `DESIGN_ISSUES.md` CA1 and PLT-1795. + +```python +@dataclasses.dataclass +class _StorageBackend: + name: str + write: Callable[[pa.Table, Path], None] + read: Callable[[Path, UniversalTypeConverter], pa.Table] +``` + +| `name` | `write` | `read` | +|---|---|---| +| `"parquet"` | `pq.write_table(table, path / "data.parquet")` | `converter.load_extension_types(pq.read_table(path / "data.parquet"))` | +| `"delta"` | `deltalake.write_deltalake(str(path / "delta"), table)` | `converter.load_extension_types(DeltaTable(str(path / "delta")).to_pyarrow_dataset(as_large_types=True).to_table())` | + +`as_large_types=True` is required for the Delta backend: without it, Delta Lake normalises +`large_string` → `string` and `large_binary` → `binary`, which causes the extension type +deserializer to reject the storage type mismatch. + +The `read` callable always returns a `pa.Table` containing only the original user data columns. + +A `@pytest.fixture(params=[...])` named `storage_backend` yields one `_StorageBackend` per run. + +--- + +## Module-Level Test Fixtures + +All test dataclasses must be defined at module level — `DataclassLogicalTypeFactory` rejects +local classes because they have no stable FQCN for reconstruction on read. + +```python +# test_roundtrips.py, test_schema_compatibility.py, and test_cache_behavior.py +# Each file defines its own module-level dataclasses — no sharing across files. +@dataclasses.dataclass +class _PointA: + x: int + y: int + +@dataclasses.dataclass +class _PointB: # same shape as _PointA, different class name + x: int + y: int + +@dataclasses.dataclass +class _Inner: + value: int + +@dataclasses.dataclass +class _Outer: + inner: _Inner + label: str +``` + +Each test creates its own converter via `create_registry().get_context().type_converter` (not +`get_default_context()`) to prevent cross-test contamination through the global singleton cache. + +--- + +## Test Descriptions + +### `test_roundtrips.py` + +#### Parameterised over both backends + +**`test_builtin_path_round_trip[backend]`** +Write a `Path` column, read back, assert `pathlib.Path` values are reconstructed and the Arrow +field extension name is `"orcapod.path"`. + +**`test_builtin_upath_round_trip[backend]`** +Same for `UPath` / `"orcapod.upath"`. + +**`test_builtin_uuid_round_trip[backend]`** +Same for `uuid.UUID` / `"orcapod.uuid"`. + +**`test_simple_dataclass_round_trip[backend]`** +Write a `_PointA` column, read back, assert field values match and the Arrow field is an +`pa.ExtensionType` with extension name equal to the FQCN of `_PointA`. + +**`test_nested_dataclass_round_trip[backend]`** +Write an `_Outer` column. Read back. Assert: +- `_Outer` and `_Inner` are both in the registry after read. +- Reconstructed value is an `_Outer` with an `_Inner` field; all values correct. + +#### Delta Lake only + +**`test_delta_polars_read_delta`** +Write a `_PointA` column to Delta via `deltalake.write_deltalake`. Read back via +`pl.read_delta(str(delta_path))`. Assert the resulting Polars DataFrame column has dtype +that is a Polars extension type (i.e. the extension type survived the Delta round-trip). + +### `test_schema_compatibility.py` + +**`test_arrow_schema_distinct_extension_names_for_same_shape`** +Register `_PointA` and `_PointB` with a fresh converter. Assert: +```python +schema_a.field("value").type.extension_name != schema_b.field("value").type.extension_name +``` + +**`test_arrow_schema_same_extension_name_idempotent`** +Register `_PointA` twice. Assert the extension name is the same both times. + +**`test_python_schema_compatibility_passes_same_type`** +`check_schema_compatibility({"value": _PointA}, Schema({"value": _PointA}))` → `True`. + +**`test_python_schema_compatibility_rejects_different_type_same_shape`** +`check_schema_compatibility({"value": _PointA}, Schema({"value": _PointB}))` → `False`. +This is the core guarantee: the extension type system prevents same-shape-different-class +confusion that would have been silently accepted by the old shape-based system. + +### `test_cache_behavior.py` + +**`test_cache_populated_after_first_read`** +1. Write a Parquet file with a `_PointA` column (fresh converter, type registered for write). +2. Create a second fresh converter (type *not* pre-registered). +3. Call `read_converter.load_extension_types(pq.read_table(path))`. +4. Assert `read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn)` is not `None`. + +**`test_factory_not_called_on_second_read`** +1. Write Parquet as above. +2. Fresh converter. Patch `DataclassLogicalTypeFactory.reconstruct_from_arrow` with a spy. +3. First `load_extension_types` call → spy called exactly once. +4. Second `load_extension_types` call on the same file → spy call count unchanged (registry hit). + +--- + +## Key Implementation Notes + +- Use `uv run pytest` (never bare `pytest`) per CLAUDE.md. +- No `POLARS_UNKNOWN_EXTENSION_TYPE_BEHAVIOR` env var needed — tests rely on registration. +- All tests use `tmp_path` (pytest built-in) for temp dirs; no external cluster required. +- SQLite backend uses `SQLiteConnector(str(tmp_path / "db.sqlite"))` — not `:memory:`, because + the `ConnectorArrowDatabase` instance is recreated between write and read to simulate + the separate-process scenario. +- Delta backend requires `deltalake` package (already a project dependency). diff --git a/tests/test_databases/test_connector_arrow_database.py b/tests/test_databases/test_connector_arrow_database.py index d87701b39..71125ef2a 100644 --- a/tests/test_databases/test_connector_arrow_database.py +++ b/tests/test_databases/test_connector_arrow_database.py @@ -18,6 +18,7 @@ 11. Flush behaviour (pending cleared, connector receives data) 12. Config (to_config shape, from_config raises NotImplementedError) 13. at() method and base_path attribute +14. Extension-type write guard """ from __future__ import annotations @@ -783,3 +784,107 @@ def test_at_rejects_null_in_component(self, db): def test_at_rejects_empty_component(self, db): with pytest.raises(ValueError): db.at("") + + +# --------------------------------------------------------------------------- +# 14. Extension-type write guard +# --------------------------------------------------------------------------- + + +class TestExtensionTypeWriteGuard: + """add_records() rejects extension-typed columns. + + SQL connectors do not preserve ``ARROW:extension:*`` field metadata. + Writing extension-typed columns would cause silent type loss on read. + The guard fires at write time so the problem is surfaced immediately + rather than discovered when reading back corrupted data. + + Two representations are tested: + - In-memory ``pa.ExtensionType`` (the type is registered in this process). + - Metadata-only columns (plain storage type + ``ARROW:extension:name`` + field metadata, as produced when reading Parquet from a process that + had the type registered). + """ + + @pytest.fixture + def db(self): + return ConnectorArrowDatabase(MockDBConnector()) + + def test_rejects_in_memory_extension_type_column(self, db): + """add_records raises ValueError when a column carries a pa.ExtensionType.""" + import pyarrow as pa + + # Build a minimal custom extension type for testing. + class _DummyExt(pa.ExtensionType): + def __init__(self): + super().__init__(pa.large_string(), "test.dummy") + + def __arrow_ext_serialize__(self): + return b"" + + @classmethod + def __arrow_ext_deserialize__(cls, storage_type, serialized): + return cls() + + pa.register_extension_type(_DummyExt()) + try: + ext_array = pa.array(["hello"], type=_DummyExt()) + rid_array = pa.array([b"id1"], type=pa.large_binary()) + table = pa.table( + {"__record_id": rid_array, "payload": ext_array}, + ) + with pytest.raises(ValueError, match="extension"): + db.add_records( + ("results",), + table, + record_id_column="__record_id", + ) + finally: + pa.unregister_extension_type("test.dummy") + + def test_rejects_metadata_only_extension_column(self, db): + """add_records raises ValueError when a column has ARROW:extension:name field metadata. + + This is the "unregistered read" representation: the column type is a plain + storage type (e.g. large_string) but the field metadata contains the + ``b"ARROW:extension:name"`` key, as happens when reading a Parquet file that + was written with an extension type that is not registered in the current process. + """ + import pyarrow as pa + + ext_field = pa.field( + "payload", + pa.large_string(), + metadata={ + b"ARROW:extension:name": b"orcapod.path", + b"ARROW:extension:metadata": b"", + }, + ) + rid_field = pa.field("__record_id", pa.large_binary()) + schema = pa.schema([rid_field, ext_field]) + table = pa.table( + { + "__record_id": pa.array([b"id1"], type=pa.large_binary()), + "payload": pa.array(["/tmp/test"], type=pa.large_string()), + }, + schema=schema, + ) + with pytest.raises(ValueError, match="extension"): + db.add_records( + ("results",), + table, + record_id_column="__record_id", + ) + + def test_plain_column_not_rejected(self, db): + """add_records accepts tables with no extension-typed columns.""" + import pyarrow as pa + + table = pa.table( + { + "__record_id": pa.array([b"id1"], type=pa.large_binary()), + "value": pa.array([42], type=pa.int64()), + } + ) + # Should not raise + db.add_records(("results",), table, record_id_column="__record_id") diff --git a/tests/test_extension_types/test_cache_behavior.py b/tests/test_extension_types/test_cache_behavior.py new file mode 100644 index 000000000..efbb77e23 --- /dev/null +++ b/tests/test_extension_types/test_cache_behavior.py @@ -0,0 +1,114 @@ +"""Integration tests for per-process extension type cache behaviour. + +The ``LogicalTypeRegistry`` stores registered types in an in-memory dict keyed +by Arrow extension name. ``register_discovered_extensions`` skips the factory +call (``reconstruct_from_arrow``) when the extension name is already present in +the registry — this is the "cache hit" path. + +Two tests: + +1. ``test_cache_populated_after_first_read`` — verifies the type is absent from + a fresh converter's registry before reading a Parquet file, and present after. + +2. ``test_factory_not_called_on_second_read`` — verifies that ``reconstruct_from_arrow`` + is called exactly once (first read) and zero additional times on the second + read of the same file. +""" +from __future__ import annotations + +import dataclasses +from unittest.mock import patch + +import pyarrow.parquet as pq + +from orcapod.contexts import create_registry +from orcapod.extension_types.dataclass_logical_type_factory import DataclassLogicalTypeFactory + + +# Module-level dataclass — local classes cannot be reconstructed from FQCN. + +@dataclasses.dataclass +class _CachePoint: + x: int + y: int + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def _fresh_converter(): + """Return a fresh UniversalTypeConverter from a new registry instance. + + Uses ``create_registry()`` instead of ``get_default_context()`` to avoid + cross-test contamination through the global singleton cache. + """ + return create_registry().get_context().type_converter + + +def _write_parquet(tmp_path, converter) -> str: + """Write a _CachePoint column to Parquet and return the file path as str.""" + converter.register_python_class(_CachePoint) + arrow_schema = converter.python_schema_to_arrow_schema({"point": _CachePoint}) + rows = [{"point": _CachePoint(x=1, y=2)}] + table = converter.python_dicts_to_arrow_table(rows, arrow_schema=arrow_schema) + parquet_path = tmp_path / "cache_test.parquet" + pq.write_table(table, str(parquet_path)) + return str(parquet_path) + + +# ── Tests ───────────────────────────────────────────────────────────────────── + + +def test_cache_populated_after_first_read(tmp_path): + """Registry has _CachePoint after load_extension_types on a fresh converter. + + Before reading: the fresh converter's registry does not know about _CachePoint. + After reading: register_discovered_extensions triggers reconstruct_from_arrow + which registers _CachePoint, populating the cache. + """ + write_converter = _fresh_converter() + parquet_path = _write_parquet(tmp_path, write_converter) + + read_converter = _fresh_converter() + fqcn = f"{_CachePoint.__module__}.{_CachePoint.__qualname__}" + + # Before read: not registered + assert read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn) is None + + read_converter.load_extension_types(pq.read_table(parquet_path)) + + # After read: registered (cache populated) + assert read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn) is not None + + +def test_factory_not_called_on_second_read(tmp_path): + """reconstruct_from_arrow called once on first read, zero times on second read. + + On first read, register_discovered_extensions finds _CachePoint's extension + name in the schema, dispatches to the factory (call count = 1), and stores + the result in the registry. + + On second read, register_discovered_extensions finds the extension name already + in the registry and short-circuits — the factory is not called again + (call count remains 1). + """ + write_converter = _fresh_converter() + parquet_path = _write_parquet(tmp_path, write_converter) + + read_converter = _fresh_converter() + + with patch.object( + DataclassLogicalTypeFactory, + "reconstruct_from_arrow", + autospec=True, + wraps=DataclassLogicalTypeFactory.reconstruct_from_arrow, + ) as spy: + # First read: factory is called once + read_converter.load_extension_types(pq.read_table(parquet_path)) + assert spy.call_count == 1, f"Expected 1 factory call, got {spy.call_count}" + + # Second read on the same file: registry hit — factory not called again + read_converter.load_extension_types(pq.read_table(parquet_path)) + assert spy.call_count == 1, ( + f"Expected still 1 factory call after second read, got {spy.call_count}" + ) diff --git a/tests/test_extension_types/test_roundtrips.py b/tests/test_extension_types/test_roundtrips.py new file mode 100644 index 000000000..afac59dc9 --- /dev/null +++ b/tests/test_extension_types/test_roundtrips.py @@ -0,0 +1,376 @@ +"""End-to-end integration tests for extension type round-trips. + +Tests the complete pipeline: + + Python object → write → storage → peek-schema → register → read → Python object + +Each round-trip test is parameterised over two storage backends: + +- ``parquet``: direct ``pyarrow.parquet`` write/read. +- ``delta``: ``deltalake.write_deltalake`` / ``DeltaTable.to_pyarrow_dataset(as_large_types=True).to_table()``. + +SQLite (``ConnectorArrowDatabase`` + ``SQLiteConnector``) is excluded because +``SQLiteConnector`` maps Arrow types to SQL column types and discards +``ARROW:extension:*`` field metadata. Without that metadata, the +peek-register-read pattern cannot auto-register extension types on the read +path. The ``ExtensionAwareDatabase`` wrapper behaviour over SQLite is already +tested in ``tests/test_databases/test_extension_aware_database.py``. +""" +from __future__ import annotations + +import dataclasses +import pathlib +import uuid as uuid_module +from pathlib import Path +from typing import Callable + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest +from upath import UPath + +from orcapod.contexts import create_registry +from orcapod.semantic_types.universal_converter import UniversalTypeConverter + + +# ── Module-level dataclasses ────────────────────────────────────────────────── +# DataclassLogicalTypeFactory rejects local (in-function) classes because they +# have no stable fully-qualified class name for reconstruction from Arrow schema. + +@dataclasses.dataclass +class _PointA: + x: int + y: int + + +@dataclasses.dataclass +class _PointB: + """Same struct shape as _PointA, different class name.""" + x: int + y: int + + +@dataclasses.dataclass +class _Inner: + value: int + + +@dataclasses.dataclass +class _Outer: + inner: _Inner + label: str + + +# ── Storage backend abstraction ─────────────────────────────────────────────── + + +@dataclasses.dataclass +class _StorageBackend: + """Encapsulates backend-specific write and read logic for parameterised tests. + + Args: + name: Short identifier used in pytest test IDs (e.g. ``"parquet"``). + write: Callable that writes an Arrow table to a directory. + read: Callable that reads from that directory and returns an Arrow table + with extension types registered and applied. Must return only the + original user data columns (no ``__record_id`` or similar). + """ + name: str + write: Callable[[pa.Table, Path], None] + read: Callable[[Path, UniversalTypeConverter], pa.Table] + + +def _parquet_write(table: pa.Table, base_path: Path) -> None: + pq.write_table(table, str(base_path / "data.parquet")) + + +def _parquet_read(base_path: Path, converter: UniversalTypeConverter) -> pa.Table: + return converter.load_extension_types(pq.read_table(str(base_path / "data.parquet"))) + + +def _delta_write(table: pa.Table, base_path: Path) -> None: + import deltalake + deltalake.write_deltalake(str(base_path / "delta"), table) + + +def _delta_read(base_path: Path, converter: UniversalTypeConverter) -> pa.Table: + import deltalake + dt = deltalake.DeltaTable(str(base_path / "delta")) + # as_large_types=True preserves large_string / large_binary rather than + # normalising them to string / binary (Delta Lake's default behaviour). + # Without this flag, extension types that use large_string or large_binary + # as storage fail to deserialise because the _deserialize method strictly + # checks that the storage type matches the registered one. + raw = dt.to_pyarrow_dataset(as_large_types=True).to_table() + return converter.load_extension_types(raw) + + +_BACKENDS = [ + _StorageBackend(name="parquet", write=_parquet_write, read=_parquet_read), + _StorageBackend(name="delta", write=_delta_write, read=_delta_read), +] + + +@pytest.fixture(params=_BACKENDS, ids=lambda b: b.name) +def storage_backend(request: pytest.FixtureRequest) -> _StorageBackend: + """Yield one storage backend per parametrised run.""" + return request.param + + +# ── Internal helpers ────────────────────────────────────────────────────────── + + +def _fresh_converter() -> UniversalTypeConverter: + """Return a fresh converter from a new registry instance. + + Uses ``create_registry()`` instead of ``get_default_context()`` to avoid + cross-test contamination through the global singleton cache. + """ + return create_registry().get_context().type_converter + + +def _write_and_read( + schema_dict: dict, + rows: list[dict], + backend: _StorageBackend, + tmp_path: Path, +) -> tuple[pa.Table, UniversalTypeConverter]: + """Write rows with a fresh write converter and read back with a fresh read converter. + + Returns the resulting Arrow table (with extension types applied) and the + read-side converter (needed for ``arrow_table_to_python_dicts``). + """ + write_converter = _fresh_converter() + # Pre-register each type so the converter can map it to an Arrow extension + # type before python_schema_to_arrow_schema inspects it. Built-in types + # (Path, UPath, UUID) are already registered in the context; dataclass types + # are auto-discovered on the first register_python_class call. + for python_type in schema_dict.values(): + write_converter.register_python_class(python_type) + arrow_schema = write_converter.python_schema_to_arrow_schema(schema_dict) + table = write_converter.python_dicts_to_arrow_table(rows, arrow_schema=arrow_schema) + backend.write(table, tmp_path) + + read_converter = _fresh_converter() + result = backend.read(tmp_path, read_converter) + return result, read_converter + + +# ── Built-in type round-trip tests ─────────────────────────────────────────── + + +def test_builtin_path_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """pathlib.Path round-trips through storage with extension name ``orcapod.path``. + + Built-in types (Path, UPath, UUID) are pre-registered in the default context + so the read-side converter already knows about them. The test verifies that: + + 1. The Arrow field carries the ``orcapod.path`` extension type after read. + 2. The Python value is reconstructed as a ``pathlib.Path`` instance. + """ + p = pathlib.Path("/tmp/orcapod/integration/test.txt") + result, read_converter = _write_and_read( + {"col": pathlib.Path}, + [{"col": p}], + storage_backend, + tmp_path, + ) + + field = result.schema.field("col") + assert hasattr(field.type, "extension_name"), ( + f"Expected extension type on field 'col', got plain type {field.type!r}" + ) + assert field.type.extension_name == "orcapod.path" + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + assert isinstance(rows[0]["col"], pathlib.Path) + assert rows[0]["col"] == p + + +def test_builtin_upath_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """UPath round-trips through storage with extension name ``orcapod.upath``.""" + u = UPath("s3://my-bucket/data/file.parquet") + result, read_converter = _write_and_read( + {"col": UPath}, + [{"col": u}], + storage_backend, + tmp_path, + ) + + field = result.schema.field("col") + assert hasattr(field.type, "extension_name"), ( + f"Expected extension type on field 'col', got plain type {field.type!r}" + ) + assert field.type.extension_name == "orcapod.upath" + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + assert isinstance(rows[0]["col"], UPath) + assert str(rows[0]["col"]) == str(u) + + +def test_builtin_uuid_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """uuid.UUID round-trips through storage with extension name ``orcapod.uuid``.""" + u = uuid_module.UUID("12345678-1234-5678-1234-567812345678") + result, read_converter = _write_and_read( + {"col": uuid_module.UUID}, + [{"col": u}], + storage_backend, + tmp_path, + ) + + field = result.schema.field("col") + assert hasattr(field.type, "extension_name"), ( + f"Expected extension type on field 'col', got plain type {field.type!r}" + ) + assert field.type.extension_name == "orcapod.uuid" + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + assert isinstance(rows[0]["col"], uuid_module.UUID) + assert rows[0]["col"] == u + + +# ── Dataclass round-trip tests ──────────────────────────────────────────────── + + +def test_simple_dataclass_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """Simple dataclass round-trips with correct FQCN as the Arrow extension name. + + The read-side converter starts with no knowledge of _PointA. After read, + register_discovered_extensions triggers DataclassLogicalTypeFactory which + imports _PointA from its fully-qualified class name and registers it. + """ + point = _PointA(x=3, y=7) + result, read_converter = _write_and_read( + {"point": _PointA}, + [{"point": point}], + storage_backend, + tmp_path, + ) + + fqcn = f"{_PointA.__module__}.{_PointA.__qualname__}" + field = result.schema.field("point") + assert hasattr(field.type, "extension_name"), ( + f"Expected extension type on field 'point', got {field.type!r}" + ) + assert field.type.extension_name == fqcn + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + reconstructed = rows[0]["point"] + assert isinstance(reconstructed, _PointA) + assert reconstructed.x == 3 + assert reconstructed.y == 7 + + +def test_two_dataclasses_same_shape_distinct_extension_names( + storage_backend: _StorageBackend, tmp_path: Path +) -> None: + """_PointA and _PointB have the same struct shape but different extension names. + + Writing _PointA and reading it back must NOT reconstruct a _PointB, even + though their on-disk struct shapes (x: int, y: int) are identical. The + extension name (FQCN) is the sole identity signal. + """ + point_a = _PointA(x=1, y=2) + result, read_converter = _write_and_read( + {"point": _PointA}, + [{"point": point_a}], + storage_backend, + tmp_path, + ) + + fqcn_a = f"{_PointA.__module__}.{_PointA.__qualname__}" + fqcn_b = f"{_PointB.__module__}.{_PointB.__qualname__}" + + field = result.schema.field("point") + assert hasattr(field.type, "extension_name") + assert field.type.extension_name == fqcn_a + assert field.type.extension_name != fqcn_b # distinct from _PointB + + rows = read_converter.arrow_table_to_python_dicts(result) + reconstructed = rows[0]["point"] + assert isinstance(reconstructed, _PointA) + assert not isinstance(reconstructed, _PointB) + + +def test_nested_dataclass_round_trip(storage_backend: _StorageBackend, tmp_path: Path) -> None: + """Nested dataclass: _Outer and _Inner both registered; full object reconstructed. + + register_discovered_extensions triggers DataclassLogicalTypeFactory for _Outer. + That factory's reconstruct_from_arrow calls converter.register_python_class(_Inner) + as a side-effect, so _Inner is also registered without an explicit peek step. + """ + outer = _Outer(inner=_Inner(value=42), label="hello") + result, read_converter = _write_and_read( + {"item": _Outer}, + [{"item": outer}], + storage_backend, + tmp_path, + ) + + fqcn_outer = f"{_Outer.__module__}.{_Outer.__qualname__}" + fqcn_inner = f"{_Inner.__module__}.{_Inner.__qualname__}" + + assert read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn_outer) is not None, ( + "_Outer should be registered after read" + ) + assert read_converter._logical_type_registry.get_by_arrow_extension_name(fqcn_inner) is not None, ( + "_Inner should be registered transitively after read" + ) + + rows = read_converter.arrow_table_to_python_dicts(result) + assert len(rows) == 1 + reconstructed = rows[0]["item"] + assert isinstance(reconstructed, _Outer) + assert isinstance(reconstructed.inner, _Inner) + assert reconstructed.inner.value == 42 + assert reconstructed.label == "hello" + + +# ── Delta Lake: Polars native read ─────────────────────────────────────────── + + +def test_delta_polars_read_delta(tmp_path: Path) -> None: + """Write a dataclass column to Delta; read back via pl.read_delta; extension type survives. + + The write-side converter registers _PointA in both PyArrow's and Polars' + global registries (``register_python_class`` calls ``make_polars_extension_type`` + which registers with Polars). ``pl.read_delta`` can therefore decode the column + as the correct Polars extension type, not a plain ``Struct``. + + Note: ``pl.DataFrame.to_arrow()`` exports Polars extension types as PyArrow + extension arrays but with empty serialized bytes (Polars does not forward + ``__arrow_ext_metadata__`` through its Arrow export). Python-object + reconstruction via the Polars-to-Arrow path is therefore not possible; that + path is tested by the separate ``parquet`` / ``delta`` parametrised tests + which read underlying Parquet files directly. + """ + import deltalake + import polars as pl + + delta_path = str(tmp_path / "polars_delta") + fqcn = f"{_PointA.__module__}.{_PointA.__qualname__}" + + # Write — registers _PointA in PyArrow + Polars global registries. + write_converter = _fresh_converter() + write_converter.register_python_class(_PointA) + arrow_schema = write_converter.python_schema_to_arrow_schema({"point": _PointA}) + rows = [{"point": _PointA(x=5, y=9)}] + table = write_converter.python_dicts_to_arrow_table(rows, arrow_schema=arrow_schema) + deltalake.write_deltalake(delta_path, table) + + # Read via Polars native Delta reader. + # _PointA is already in the Polars global registry from the write step above. + df = pl.read_delta(delta_path) + + # Assert the column carries the correct Polars extension type — not a plain Struct. + col_dtype = df.dtypes[0] + assert col_dtype.is_extension(), ( + f"Expected a Polars extension type on column 'point', got {col_dtype!r}" + ) + assert col_dtype.ext_name() == fqcn, ( + f"Expected extension name {fqcn!r}, got {col_dtype.ext_name()!r}" + ) diff --git a/tests/test_extension_types/test_schema_compatibility.py b/tests/test_extension_types/test_schema_compatibility.py new file mode 100644 index 000000000..f15d190de --- /dev/null +++ b/tests/test_extension_types/test_schema_compatibility.py @@ -0,0 +1,106 @@ +"""Integration tests for extension-type-backed schema compatibility. + +Two complementary angles: + +Arrow-level identity + ``converter.python_schema_to_arrow_schema`` assigns each dataclass a unique + Arrow extension name derived from its fully-qualified class name. Two + dataclasses with identical struct shapes but different class names therefore + produce *different* extension names — the core identity guarantee of the + extension type system. + +Python-type-level compatibility + ``check_schema_compatibility`` from ``schema_utils`` uses beartype + ``is_subhint`` to compare Python type annotations. Same class → compatible; + different class with the same struct shape → incompatible. This is the + property that prevents silent data corruption when two unrelated dataclasses + happen to share the same fields. +""" +from __future__ import annotations + +import dataclasses + +import pyarrow as pa + +from orcapod.contexts import create_registry +from orcapod.types import Schema +from orcapod.utils.schema_utils import check_schema_compatibility + + +# Module-level dataclasses — DataclassLogicalTypeFactory rejects local classes +# because they have no stable fully-qualified class name for reconstruction. + +@dataclasses.dataclass +class _PointA: + x: int + y: int + + +@dataclasses.dataclass +class _PointB: + """Same struct shape as _PointA but a different class name.""" + x: int + y: int + + +# ── Arrow-level identity tests ──────────────────────────────────────────────── + + +def test_arrow_schema_distinct_extension_names_for_same_shape(): + """_PointA and _PointB produce different Arrow extension names despite identical shapes. + + This is the core identity guarantee: struct shape alone does not determine + type identity in the extension type system. + """ + converter_a = create_registry().get_context().type_converter + converter_b = create_registry().get_context().type_converter + + type_a = converter_a.register_python_class(_PointA) + type_b = converter_b.register_python_class(_PointB) + + assert isinstance(type_a, pa.ExtensionType) + assert isinstance(type_b, pa.ExtensionType) + + fqcn_a = f"{_PointA.__module__}.{_PointA.__qualname__}" + fqcn_b = f"{_PointB.__module__}.{_PointB.__qualname__}" + assert type_a.extension_name == fqcn_a + assert type_b.extension_name == fqcn_b + assert type_a.extension_name != type_b.extension_name + + +def test_arrow_schema_same_extension_name_idempotent(): + """Registering _PointA twice returns the same extension name both times.""" + converter = create_registry().get_context().type_converter + + type_first = converter.register_python_class(_PointA) + type_second = converter.register_python_class(_PointA) + + assert isinstance(type_first, pa.ExtensionType) + assert isinstance(type_second, pa.ExtensionType) + assert type_first.extension_name == type_second.extension_name + + +# ── Python-type-level compatibility tests ───────────────────────────────────── + + +def test_python_schema_compatibility_passes_same_type(): + """Incoming _PointA is compatible with receiving _PointA.""" + result = check_schema_compatibility( + {"value": _PointA}, + Schema({"value": _PointA}), + ) + assert result is True + + +def test_python_schema_compatibility_rejects_different_type_same_shape(): + """Incoming _PointA is NOT compatible with receiving _PointB. + + Both dataclasses share the same struct shape {x: int, y: int}, but they + are different Python types. The old shape-based system would have accepted + this silently; the extension type system correctly rejects it. + """ + result = check_schema_compatibility( + {"value": _PointA}, + Schema({"value": _PointB}), + ) + assert result is False