Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .zed/rules
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ Remove any optional sections that don't apply rather than leaving them empty.
When working on a feature, create and checkout a git branch using the gitBranchName
returned by the primary Linear issue (e.g. eywalker/plt-911-add-documentation-for-orcapod-python).

Feature branch PRs always target the "dev" branch. The dev → main PR is used
for versioning/releases only.
Feature branch PRs always target "main". Create a feature branch from "main" and open PRs against "main".

If a feature branch / PR corresponds to multiple Linear issues, list all of them in the
PR description body so that Linear's GitHub integration auto-tracks the PR against each
Expand Down
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Remove any optional sections that don't apply rather than leaving them empty.
When working on a feature, create and checkout a git branch using the `gitBranchName`
returned by the primary Linear issue (e.g. `eywalker/plt-911-add-documentation-for-orcapod-python`).

**Feature branch PRs always target the `extension-type-system` branch.** The `extension-type-system``dev` `main` PRs are used for integration and releases.
**Feature branch PRs always target `main`.** Create a feature branch from `main` and open PRs against `main`.

If a feature branch / PR corresponds to multiple Linear issues, list all of them in the
PR description body so that Linear's GitHub integration auto-tracks the PR against each
Expand Down
21 changes: 21 additions & 0 deletions DESIGN_ISSUES.md
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,27 @@ Should be removed in the next breaking release. Consider adding deprecation warn

---

### H5 — `_process_table_columns` materializes extension columns to Python even with no handler
**Status:** open
**Severity:** medium
**Issue:** ITL-433

`StarfixArrowHasher._process_table_columns` calls `to_pylist()` on every extension-typed
column before running `SemanticHashingVisitor`. For columns whose Python type has no registered
semantic handler (pydantic models, dataclasses, any unhandled extension type), the visitor
returns the value unchanged and the data is immediately re-serialized back to Arrow. The Python
roundtrip serves no purpose in this case and is O(rows) deserialization work wasted.

The fix is to short-circuit at the column level: before calling `to_pylist()`, check whether
`type_handler_registry.has_handler(python_type)` would be True for this column's extension
type. If not, call `normalize_extension_columns()` directly on the Arrow column (uses
`ExtensionArray.storage`, no Python materialization) and skip the visitor loop entirely.
Columns with a registered handler (e.g. `Path`) continue through the existing path unchanged.

The `normalize_extension_columns` utility landed in ITL-432.

---

## `src/orcapod/utils/`

### U1 — Source-info column type hard-coded to `large_string`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def __init__(
# top-level extension type from each field's Arrow type before inserting it into the
# struct. On the read path, ``reconstruct_from_arrow`` receives a ``storage_type``
# already guaranteed storage-safe by ``register_storage_type``.
self._polars_ext_class = make_polars_extension_type(logical_name, storage_type)
self._polars_ext_class = make_polars_extension_type(
logical_name,
storage_type,
metadata=json.dumps({"category": DATACLASS_CATEGORY}),
)
self._polars_ext: pl.BaseExtension | None = None

@property
Expand Down
6 changes: 5 additions & 1 deletion src/orcapod/extension_types/pydantic_logical_type_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def __init__(
# top-level extension type from each field's Arrow type before inserting it into the
# struct. On the read path, ``reconstruct_from_arrow`` receives a ``storage_type``
# already guaranteed storage-safe by ``register_storage_type``.
self._polars_ext_class = make_polars_extension_type(logical_name, storage_type)
self._polars_ext_class = make_polars_extension_type(
logical_name,
storage_type,
metadata=json.dumps({"category": PYDANTIC_CATEGORY}),
)
self._polars_ext: pl.BaseExtension | None = None

@property
Expand Down
23 changes: 20 additions & 3 deletions src/orcapod/hashing/arrow_hashers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from orcapod.hashing.schema_cleaner import clean_schema_for_hashing, has_extension_metadata
from orcapod.hashing.visitors import SemanticHashingVisitor
from orcapod.utils.arrow_utils import normalize_extension_columns
from orcapod.types import ContentHash

if TYPE_CHECKING:
Expand Down Expand Up @@ -57,8 +58,17 @@ def hasher_id(self) -> str:
return self._hasher_id

def _process_table_columns(self, table: "pa.Table | pa.RecordBatch") -> "pa.Table":
"""Replace semantic-typed columns with their content-hash bytes."""
new_columns: list[pa.Array] = []
"""Replace semantic-typed columns with content-hash bytes; normalize extension columns.

For columns whose Python type has a registered semantic handler (e.g. ``Path``),
the extension-typed column is replaced by a ``pa.large_binary()`` column of
content-hash tokens. For all other extension-typed columns (visitor passthrough),
the column is normalized to IPC storage representation via
``normalize_extension_columns`` — storage type for the data, extension identity
in field metadata — so that ``ArrowDigester`` can hash them without encountering
a live ``pa.ExtensionType``, which is unhashable.
"""
new_columns: list[pa.Array | pa.ChunkedArray] = []
new_fields: list[pa.Field] = []

for i, field in enumerate(table.schema):
Expand Down Expand Up @@ -91,6 +101,7 @@ def _process_table_columns(self, table: "pa.Table | pa.RecordBatch") -> "pa.Tabl

if new_type is None:
new_type = field.type

new_columns.append(pa.array(processed_data, type=new_type))
new_fields.append(field.with_type(new_type))

Expand All @@ -99,10 +110,16 @@ def _process_table_columns(self, table: "pa.Table | pa.RecordBatch") -> "pa.Tabl
f"Failed to process column '{field.name}': {exc}"
) from exc

return pa.table(
intermediate = pa.table(
new_columns,
schema=pa.schema(new_fields, metadata=table.schema.metadata),
)
# Normalize any remaining extension-typed columns to their IPC storage
# representation (storage type + ARROW:extension:* field metadata).
# This handles the visitor passthrough case — extension types with no
# registered semantic handler — so that ArrowDigester never receives a
# live pa.ExtensionType, which is unhashable and would crash starfix.
return normalize_extension_columns(intermediate)

def hash_schema(self, schema: "pa.Schema") -> ContentHash:
"""Hash an Arrow schema using the starfix canonical algorithm."""
Expand Down
76 changes: 76 additions & 0 deletions src/orcapod/utils/arrow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,82 @@ def normalize_view_types(arrow_type: "pa.DataType") -> "pa.DataType":
return arrow_type


def normalize_extension_columns(table: "pa.Table") -> "pa.Table":

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add thorough unit tests for normalize_extension_columns

"""Return a copy of ``table`` with all extension-typed columns converted to
their IPC/Parquet storage representation.

For each top-level column whose type is a ``pa.ExtensionType``, the column
data is replaced with the underlying storage array (via
``ExtensionArray.storage`` — no Python-level materialization) and the field
gains ``ARROW:extension:name`` and ``ARROW:extension:metadata`` keys in its
metadata, exactly matching the on-disk Arrow IPC/Parquet format.

Non-extension columns are returned unchanged. Schema-level metadata and
existing per-field metadata are preserved; the two ``ARROW:extension:*``
keys are merged in (or added) without touching any other metadata already
on the field.

This is a fast path: for tables with no extension columns the original
table object is returned immediately. For tables that do have extension
columns a new table is constructed; chunking is preserved and the column
data itself is not copied — each chunk's ``ExtensionArray.storage``
property returns a zero-copy view of the underlying buffers.

Note: only **top-level** extension columns are handled. Extension types
nested inside struct fields or list element types are not supported by the
orcapod type system (see ET1 in DESIGN_ISSUES.md) and are left unchanged.

Args:
table: Input Arrow table, may contain extension-typed columns.

Returns:
A ``pa.Table`` where every top-level extension-typed column has been
replaced by its storage-typed equivalent with extension identity
preserved in field metadata.
"""
if not any(isinstance(field.type, pa.ExtensionType) for field in table.schema):
return table

new_columns: list[pa.ChunkedArray] = []
new_fields: list[pa.Field] = []
for i, field in enumerate(table.schema):
if isinstance(field.type, pa.ExtensionType):
ext_type = field.type
# Preserve chunking: convert each ExtensionArray chunk to its
# .storage chunk (zero-copy view of the underlying buffers) and
# rebuild a ChunkedArray. Calling combine_chunks() first would
# allocate new buffers for multi-chunk columns, defeating the
# zero-copy guarantee.
col = table.column(i)
storage_arr = pa.chunked_array(
[chunk.storage for chunk in col.chunks],
type=ext_type.storage_type,
)
serialized = ext_type.__arrow_ext_serialize__()
# Merge extension identity into existing field metadata (if any)
# so that non-extension keys already on the field are preserved.
existing_meta = dict(field.metadata) if field.metadata else {}
existing_meta[b"ARROW:extension:name"] = (
ext_type.extension_name.encode("utf-8")
)
existing_meta[b"ARROW:extension:metadata"] = serialized
new_fields.append(pa.field(
field.name,
ext_type.storage_type,
nullable=field.nullable,
metadata=existing_meta,
))
new_columns.append(storage_arr)
else:
new_columns.append(table.column(i))
new_fields.append(field)

return pa.table(
new_columns,
schema=pa.schema(new_fields, metadata=table.schema.metadata),
)


def normalize_table_view_types(table: "pa.Table") -> "pa.Table":
"""Cast a table's view-typed columns to their large variants.

Expand Down
Loading
Loading