From d8e3e286eaf5f9c347d4688634466f569f12cd7b Mon Sep 17 00:00:00 2001 From: sonalishintre <42985737+sonalishintre@users.noreply.github.com> Date: Mon, 22 Jun 2026 08:51:16 +0530 Subject: [PATCH 1/2] feat: add LazyFrame sink classes for parquet, csv, ipc, ndjson --- .../plugins/polars_lazyframe_extensions.py | 85 ++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/hamilton/plugins/polars_lazyframe_extensions.py b/hamilton/plugins/polars_lazyframe_extensions.py index b06f2db94..a300195a7 100644 --- a/hamilton/plugins/polars_lazyframe_extensions.py +++ b/hamilton/plugins/polars_lazyframe_extensions.py @@ -47,7 +47,7 @@ from hamilton import registry from hamilton.io import utils -from hamilton.io.data_adapters import DataLoader +from hamilton.io.data_adapters import DataLoader, DataSaver DATAFRAME_TYPE = pl.LazyFrame COLUMN_TYPE = pl.Expr @@ -288,6 +288,85 @@ def load_data(self, type_: type) -> tuple[DATAFRAME_TYPE, dict[str, Any]]: def name(cls) -> str: return "feather" +@dataclasses.dataclass +class PolarsLazyFrameSinkParquet(DataSaver): + """Class to handle sinking a Polars LazyFrame to a Parquet file. + Should map to https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.sink_parquet.html + """ + path: str | Path + + @classmethod + def applicable_types(cls) -> Collection[type]: + return [DATAFRAME_TYPE] + + def save_data(self, data: pl.LazyFrame) -> dict[str, Any]: + data.sink_parquet(self.path) + return utils.get_file_metadata(self.path) + + @classmethod + def name(cls) -> str: + return "parquet" + + +@dataclasses.dataclass +class PolarsLazyFrameSinkCSV(DataSaver): + """Class to handle sinking a Polars LazyFrame to a CSV file. + Should map to https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.sink_csv.html + """ + path: str | Path + + @classmethod + def applicable_types(cls) -> Collection[type]: + return [DATAFRAME_TYPE] + + def save_data(self, data: pl.LazyFrame) -> dict[str, Any]: + data.sink_csv(self.path) + return utils.get_file_metadata(self.path) + + @classmethod + def name(cls) -> str: + return "csv" + + +@dataclasses.dataclass +class PolarsLazyFrameSinkIPC(DataSaver): + """Class to handle sinking a Polars LazyFrame to an IPC/Feather file. + Should map to https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.sink_ipc.html + """ + path: str | Path + + @classmethod + def applicable_types(cls) -> Collection[type]: + return [DATAFRAME_TYPE] + + def save_data(self, data: pl.LazyFrame) -> dict[str, Any]: + data.sink_ipc(self.path) + return utils.get_file_metadata(self.path) + + @classmethod + def name(cls) -> str: + return "ipc" + + +@dataclasses.dataclass +class PolarsLazyFrameSinkNDJSON(DataSaver): + """Class to handle sinking a Polars LazyFrame to an NDJSON file. + Should map to https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.sink_ndjson.html + """ + path: str | Path + + @classmethod + def applicable_types(cls) -> Collection[type]: + return [DATAFRAME_TYPE] + + def save_data(self, data: pl.LazyFrame) -> dict[str, Any]: + data.sink_ndjson(self.path) + return utils.get_file_metadata(self.path) + + @classmethod + def name(cls) -> str: + return "ndjson" + def register_data_loaders(): """Function to register the data loaders for this extension.""" @@ -295,6 +374,10 @@ def register_data_loaders(): PolarsScanCSVReader, PolarsScanParquetReader, PolarsScanFeatherReader, + PolarsLazyFrameSinkParquet, + PolarsLazyFrameSinkCSV, + PolarsLazyFrameSinkIPC, + PolarsLazyFrameSinkNDJSON, ]: registry.register_adapter(loader) From dbb400c3a5dbc3b195527b54e282b95000a50d9c Mon Sep 17 00:00:00 2001 From: sonalishintre <42985737+sonalishintre@users.noreply.github.com> Date: Mon, 29 Jun 2026 05:55:07 +0530 Subject: [PATCH 2/2] test: add tests for LazyFrame sink classes --- .../test_polars_lazyframe_extensions.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/plugins/test_polars_lazyframe_extensions.py b/tests/plugins/test_polars_lazyframe_extensions.py index a35929654..dedb52bbe 100644 --- a/tests/plugins/test_polars_lazyframe_extensions.py +++ b/tests/plugins/test_polars_lazyframe_extensions.py @@ -23,10 +23,15 @@ from polars.testing import assert_frame_equal from sqlalchemy import create_engine + from hamilton.plugins.polars_lazyframe_extensions import ( PolarsScanCSVReader, PolarsScanFeatherReader, PolarsScanParquetReader, + PolarsLazyFrameSinkParquet, + PolarsLazyFrameSinkCSV, + PolarsLazyFrameSinkIPC, + PolarsLazyFrameSinkNDJSON, ) from hamilton.plugins.polars_post_1_0_0_extensions import ( PolarsAvroReader, @@ -211,3 +216,45 @@ def test_polars_spreadsheet(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None: assert write_kwargs["include_header"] is True assert "raise_if_empty" in read_kwargs assert read_kwargs["raise_if_empty"] is True + +##### + +def test_polars_lazyframe_sink_parquet(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None: + file = tmp_path / "test.parquet" + sink = PolarsLazyFrameSinkParquet(path=file) + metadata = sink.save_data(df) + df2 = pl.read_parquet(file) + assert PolarsLazyFrameSinkParquet.applicable_types() == [pl.LazyFrame] + assert file.exists() + assert_frame_equal(df.collect(), df2) + + +def test_polars_lazyframe_sink_csv(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None: + file = tmp_path / "test.csv" + sink = PolarsLazyFrameSinkCSV(path=file) + metadata = sink.save_data(df) + df2 = pl.read_csv(file) + assert PolarsLazyFrameSinkCSV.applicable_types() == [pl.LazyFrame] + assert file.exists() + assert_frame_equal(df.collect(), df2) + + +def test_polars_lazyframe_sink_ipc(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None: + file = tmp_path / "test.ipc" + sink = PolarsLazyFrameSinkIPC(path=file) + metadata = sink.save_data(df) + df2 = pl.read_ipc(file) + assert PolarsLazyFrameSinkIPC.applicable_types() == [pl.LazyFrame] + assert file.exists() + assert_frame_equal(df.collect(), df2) + + +def test_polars_lazyframe_sink_ndjson(df: pl.LazyFrame, tmp_path: pathlib.Path) -> None: + file = tmp_path / "test.ndjson" + sink = PolarsLazyFrameSinkNDJSON(path=file) + metadata = sink.save_data(df) + df2 = pl.read_ndjson(file) + assert PolarsLazyFrameSinkNDJSON.applicable_types() == [pl.LazyFrame] + assert file.exists() + assert_frame_equal(df.collect(), df2) +