Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion hamilton/plugins/polars_lazyframe_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

from hamilton import registry
from hamilton.io import utils
from hamilton.io.data_adapters import DataLoader
from hamilton.io.data_adapters import DataLoader, DataSaver

DATAFRAME_TYPE = pl.LazyFrame
COLUMN_TYPE = pl.Expr
Expand Down Expand Up @@ -288,13 +288,96 @@ def load_data(self, type_: type) -> tuple[DATAFRAME_TYPE, dict[str, Any]]:
def name(cls) -> str:
return "feather"

@dataclasses.dataclass
class PolarsLazyFrameSinkParquet(DataSaver):
    """Write a Polars LazyFrame to a Parquet file via ``LazyFrame.sink_parquet``.

    :param path: Destination file for the Parquet output.
    :param sink_kwargs: Optional keyword arguments forwarded unchanged to
        ``polars.LazyFrame.sink_parquet``. ``None`` (the default) forwards
        nothing, so Polars' own defaults apply.
    """

    # NOTE(review): a reviewer proposed renaming this class to
    # ``PolarsSinkParquetWriter``; the current name is kept so existing
    # imports and the adapter registration keep working.

    path: str | Path
    # Defaults to None rather than {} to avoid a shared mutable default.
    sink_kwargs: dict[str, Any] | None = None

    @classmethod
    def applicable_types(cls) -> Collection[type]:
        """Return the types this saver accepts: the module's ``DATAFRAME_TYPE``."""
        return [DATAFRAME_TYPE]

    def save_data(self, data: pl.LazyFrame) -> dict[str, Any]:
        """Sink ``data`` to ``self.path`` and return metadata for the written file.

        :param data: LazyFrame to write.
        :return: The result of ``utils.get_file_metadata`` for ``self.path``.
        """
        extra_kwargs = self.sink_kwargs or {}
        data.sink_parquet(self.path, **extra_kwargs)
        return utils.get_file_metadata(self.path)

    @classmethod
    def name(cls) -> str:
        """Return the short name identifying this adapter (``"parquet"``)."""
        return "parquet"


@dataclasses.dataclass
class PolarsLazyFrameSinkCSV(DataSaver):
    """Write a Polars LazyFrame to a CSV file via ``LazyFrame.sink_csv``.

    :param path: Destination file for the CSV output.
    :param sink_kwargs: Optional keyword arguments forwarded unchanged to
        ``polars.LazyFrame.sink_csv``. ``None`` (the default) forwards
        nothing, so Polars' own defaults apply.
    """

    # NOTE(review): a reviewer proposed renaming this class to
    # ``PolarsSinkCSVWriter``; the current name is kept so existing
    # imports and the adapter registration keep working.

    path: str | Path
    # Defaults to None rather than {} to avoid a shared mutable default.
    sink_kwargs: dict[str, Any] | None = None

    @classmethod
    def applicable_types(cls) -> Collection[type]:
        """Return the types this saver accepts: the module's ``DATAFRAME_TYPE``."""
        return [DATAFRAME_TYPE]

    def save_data(self, data: pl.LazyFrame) -> dict[str, Any]:
        """Sink ``data`` to ``self.path`` and return metadata for the written file.

        :param data: LazyFrame to write.
        :return: The result of ``utils.get_file_metadata`` for ``self.path``.
        """
        extra_kwargs = self.sink_kwargs or {}
        data.sink_csv(self.path, **extra_kwargs)
        return utils.get_file_metadata(self.path)

    @classmethod
    def name(cls) -> str:
        """Return the short name identifying this adapter (``"csv"``)."""
        return "csv"


@dataclasses.dataclass
class PolarsLazyFrameSinkIPC(DataSaver):
    """Write a Polars LazyFrame to an IPC/Feather file via ``LazyFrame.sink_ipc``.

    :param path: Destination file for the IPC output.
    :param sink_kwargs: Optional keyword arguments forwarded unchanged to
        ``polars.LazyFrame.sink_ipc``. ``None`` (the default) forwards
        nothing, so Polars' own defaults apply.
    """

    # NOTE(review): a reviewer proposed renaming this class to
    # ``PolarsSinkFeatherWriter``; the current name is kept so existing
    # imports and the adapter registration keep working.

    path: str | Path
    # Defaults to None rather than {} to avoid a shared mutable default.
    sink_kwargs: dict[str, Any] | None = None

    @classmethod
    def applicable_types(cls) -> Collection[type]:
        """Return the types this saver accepts: the module's ``DATAFRAME_TYPE``."""
        return [DATAFRAME_TYPE]

    def save_data(self, data: pl.LazyFrame) -> dict[str, Any]:
        """Sink ``data`` to ``self.path`` and return metadata for the written file.

        :param data: LazyFrame to write.
        :return: The result of ``utils.get_file_metadata`` for ``self.path``.
        """
        extra_kwargs = self.sink_kwargs or {}
        data.sink_ipc(self.path, **extra_kwargs)
        return utils.get_file_metadata(self.path)

    @classmethod
    def name(cls) -> str:
        """Return the short name identifying this adapter (``"ipc"``)."""
        return "ipc"


@dataclasses.dataclass
class PolarsLazyFrameSinkNDJSON(DataSaver):
    """Write a Polars LazyFrame to an NDJSON file via ``LazyFrame.sink_ndjson``.

    :param path: Destination file for the NDJSON output.
    :param sink_kwargs: Optional keyword arguments forwarded unchanged to
        ``polars.LazyFrame.sink_ndjson``. ``None`` (the default) forwards
        nothing, so Polars' own defaults apply.
    """

    # NOTE(review): a reviewer proposed renaming this class to
    # ``PolarsSinkNDJSONWriter``; the current name is kept so existing
    # imports and the adapter registration keep working.

    path: str | Path
    # Defaults to None rather than {} to avoid a shared mutable default.
    sink_kwargs: dict[str, Any] | None = None

    @classmethod
    def applicable_types(cls) -> Collection[type]:
        """Return the types this saver accepts: the module's ``DATAFRAME_TYPE``."""
        return [DATAFRAME_TYPE]

    def save_data(self, data: pl.LazyFrame) -> dict[str, Any]:
        """Sink ``data`` to ``self.path`` and return metadata for the written file.

        :param data: LazyFrame to write.
        :return: The result of ``utils.get_file_metadata`` for ``self.path``.
        """
        extra_kwargs = self.sink_kwargs or {}
        data.sink_ndjson(self.path, **extra_kwargs)
        return utils.get_file_metadata(self.path)

    @classmethod
    def name(cls) -> str:
        """Return the short name identifying this adapter (``"ndjson"``)."""
        return "ndjson"


def register_data_loaders():
    """Register this module's LazyFrame reader and sink adapters with the registry."""
    adapters = (
        PolarsScanCSVReader,
        PolarsScanParquetReader,
        PolarsScanFeatherReader,
        PolarsLazyFrameSinkParquet,
        PolarsLazyFrameSinkCSV,
        PolarsLazyFrameSinkIPC,
        PolarsLazyFrameSinkNDJSON,
    )
    # Registration order follows the listing above: readers first, then sinks.
    for adapter_cls in adapters:
        registry.register_adapter(adapter_cls)

Expand Down
47 changes: 47 additions & 0 deletions tests/plugins/test_polars_lazyframe_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
from polars.testing import assert_frame_equal
from sqlalchemy import create_engine


from hamilton.plugins.polars_lazyframe_extensions import (
PolarsScanCSVReader,
PolarsScanFeatherReader,
PolarsScanParquetReader,
PolarsLazyFrameSinkParquet,
PolarsLazyFrameSinkCSV,
PolarsLazyFrameSinkIPC,
PolarsLazyFrameSinkNDJSON,
)
from hamilton.plugins.polars_post_1_0_0_extensions import (
PolarsAvroReader,
Expand Down Expand Up @@ -211,3 +216,45 @@ def test_polars_spreadsheet(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None:
assert write_kwargs["include_header"] is True
assert "raise_if_empty" in read_kwargs
assert read_kwargs["raise_if_empty"] is True

#####

def test_polars_lazyframe_sink_parquet(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None:
    """Sinking to Parquet writes a file that reads back equal to the source frame."""
    file = tmp_path / "test.parquet"
    sink = PolarsLazyFrameSinkParquet(path=file)

    metadata = sink.save_data(df)

    # Check existence before reading the file back, so a missing file fails
    # on this assertion rather than inside the reader.
    assert file.exists()
    assert isinstance(metadata, dict)
    assert PolarsLazyFrameSinkParquet.applicable_types() == [pl.LazyFrame]
    assert_frame_equal(df.collect(), pl.read_parquet(file))


def test_polars_lazyframe_sink_csv(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None:
    """Sinking to CSV writes a file that reads back equal to the source frame."""
    file = tmp_path / "test.csv"
    sink = PolarsLazyFrameSinkCSV(path=file)

    metadata = sink.save_data(df)

    # Check existence before reading the file back, so a missing file fails
    # on this assertion rather than inside the reader.
    assert file.exists()
    assert isinstance(metadata, dict)
    assert PolarsLazyFrameSinkCSV.applicable_types() == [pl.LazyFrame]
    assert_frame_equal(df.collect(), pl.read_csv(file))


def test_polars_lazyframe_sink_ipc(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None:
    """Sinking to IPC writes a file that reads back equal to the source frame."""
    file = tmp_path / "test.ipc"
    sink = PolarsLazyFrameSinkIPC(path=file)

    metadata = sink.save_data(df)

    # Check existence before reading the file back, so a missing file fails
    # on this assertion rather than inside the reader.
    assert file.exists()
    assert isinstance(metadata, dict)
    assert PolarsLazyFrameSinkIPC.applicable_types() == [pl.LazyFrame]
    assert_frame_equal(df.collect(), pl.read_ipc(file))


def test_polars_lazyframe_sink_ndjson(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None:
    """Sinking to NDJSON writes a file that reads back equal to the source frame."""
    file = tmp_path / "test.ndjson"
    sink = PolarsLazyFrameSinkNDJSON(path=file)

    metadata = sink.save_data(df)

    # Check existence before reading the file back, so a missing file fails
    # on this assertion rather than inside the reader.
    assert file.exists()
    assert isinstance(metadata, dict)
    assert PolarsLazyFrameSinkNDJSON.applicable_types() == [pl.LazyFrame]
    assert_frame_equal(df.collect(), pl.read_ndjson(file))