From 40939788d5db40d16a316cf88658428c1e98b275 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 9 Jun 2026 12:49:31 +0530 Subject: [PATCH 01/11] Pass task/asset scopes to serialize methods instead of ti_id/asset_ref --- .../task-and-asset-store.rst | 21 +++++----- .../src/airflow_shared/state/__init__.py | 11 ++--- shared/state/tests/state/test_state.py | 40 ++++++++++--------- .../src/airflow/sdk/execution_time/context.py | 7 ++-- .../task_sdk/execution_time/test_context.py | 12 +++--- 5 files changed, 47 insertions(+), 44 deletions(-) diff --git a/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst b/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst index 5cf54115e4628..6621f39eecd8e 100644 --- a/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst +++ b/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst @@ -157,12 +157,12 @@ Override four serialization hooks from :class:`~airflow.sdk.state.BaseStoreBacke * ``serialize_task_store_to_ref``: called by ``TaskStoreAccessor.set()`` before the value is sent to the Execution API; return a compact reference string (e.g. an S3 key) to be stored in the database instead of the raw value. * ``deserialize_task_store_from_ref``: called by ``TaskStoreAccessor.get()`` after retrieving the reference from the backend; return the actual value. -* ``serialize_asset_store_to_ref``: same as the task variant but for asset store; receives the asset name or URI as ``asset_ref``. +* ``serialize_asset_store_to_ref``: same as the task variant but for asset store; receives the asset scope as ``scope`` (an :class:`~airflow.sdk.state.AssetScope` with ``name`` and/or ``uri``). * ``deserialize_asset_store_from_ref``: called by ``AssetStoreAccessor.get()`` to resolve the stored reference back to the actual value. .. 
important:: - **References must be deterministic.** Given the same inputs (``ti_id`` + ``key`` for task store; ``asset_ref`` + ``key`` for asset store), the serialization method must always return the same reference string. Do not embed timestamps, random UUIDs, or any other non-deterministic component in the reference path. + **References must be deterministic.** Given the same inputs (``scope`` + ``key``), the serialization method must always return the same reference string. Do not embed timestamps, random UUIDs, or any other non-deterministic component in the reference path. When a key is deleted or cleared, Airflow clears the database reference *first*, then calls the backend's ``delete()`` or ``clear()`` method. If backend cleanup fails after the DB row is gone, the external object is orphaned. Because the reference is deterministic, a subsequent ``set()`` for the same key will overwrite the orphaned object, making the failure recoverable. A non-deterministic reference would leave the external object permanently orphaned with no way to locate it. 
@@ -178,17 +178,18 @@ Example skeleton: class S3StateBackend(BaseStoreBackend): - def _task_ref(self, ti_id: str, key: str) -> str: - return f"airflow/task-store/{ti_id}/{key}" + def _task_ref(self, scope: TaskScope, key: str) -> str: + return f"airflow/task-store/{scope.dag_id}/{scope.run_id}/{scope.task_id}/{scope.map_index}/{key}" - def _asset_ref(self, asset_ref: str, key: str) -> str: + def _asset_ref(self, scope: AssetScope, key: str) -> str: import hashlib - safe = hashlib.sha256(asset_ref.encode()).hexdigest()[:16] + asset_id = scope.name or scope.uri or "" + safe = hashlib.sha256(asset_id.encode()).hexdigest()[:16] return f"airflow/asset-store/{safe}/{key}" - def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, ti_id: str) -> str: - s3_key = self._task_ref(ti_id, key) + def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, scope: TaskScope) -> str: + s3_key = self._task_ref(scope, key) s3_client.put_object(Bucket=BUCKET, Key=s3_key, Body=json.dumps(value).encode()) return s3_key @@ -196,8 +197,8 @@ Example skeleton: s3_object = s3_client.get_object(Bucket=BUCKET, Key=stored) return json.loads(s3_object["Body"].read().decode()) - def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, asset_ref: str) -> str: - s3_key = self._asset_ref(asset_ref, key) + def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, scope: AssetScope) -> str: + s3_key = self._asset_ref(scope, key) s3_client.put_object(Bucket=BUCKET, Key=s3_key, Body=json.dumps(value).encode()) return s3_key diff --git a/shared/state/src/airflow_shared/state/__init__.py b/shared/state/src/airflow_shared/state/__init__.py index 3c963acd18db4..cd2c4b3172e96 100644 --- a/shared/state/src/airflow_shared/state/__init__.py +++ b/shared/state/src/airflow_shared/state/__init__.py @@ -247,7 +247,7 @@ def cleanup(self) -> None: ``[state_store] default_retention_days``) and deciding what to delete. 
""" - def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, ti_id: str) -> str: + def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, scope: TaskScope) -> str: """ Serialize a task store value before it is sent to the execution API for db persistence. @@ -260,7 +260,7 @@ def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, ti_id: str) that wrapper before passing ``stored`` to ``deserialize_task_store_from_ref()``. Do not wrap the reference yourself. - The returned reference must be deterministic — given the same ``ti_id`` and ``key`` it + The returned reference must be deterministic — given the same ``scope`` and ``key`` it must always return the same string. Do not use timestamps or random UUIDs as part of the reference, otherwise ``delete()``/``clear()`` cannot reconstruct it and the external object will be orphaned. By default, it JSON dumps the value and returns a JSON string. @@ -277,7 +277,7 @@ def deserialize_task_store_from_ref(self, stored: str) -> JsonValue: """ return json.loads(stored) - def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, asset_ref: str) -> str: + def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, scope: AssetScope) -> str: """ Serialize an asset store value before it is sent to the Execution API for db persistence. @@ -290,10 +290,7 @@ def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, asset_ref: that wrapper before passing ``stored`` to ``deserialize_asset_store_from_ref()``. Do not wrap the reference yourself. - ``asset_ref`` is either the asset name or URI, depending on how the accessor was - constructed. It may be a URI string if the task inlet was declared as ``AssetUriRef``. - - The returned reference must be deterministic — given the same ``asset_ref`` and ``key`` it + The returned reference must be deterministic — given the same ``scope`` and ``key`` it must always return the same string. 
Do not use timestamps or random UUIDs as part of the reference, otherwise ``delete()``/``clear()`` cannot reconstruct it and the external object will be orphaned. By default, it JSON dumps the value and returns a JSON string. diff --git a/shared/state/tests/state/test_state.py b/shared/state/tests/state/test_state.py index b527b4fc6a183..08d6c74a55f9e 100644 --- a/shared/state/tests/state/test_state.py +++ b/shared/state/tests/state/test_state.py @@ -18,7 +18,7 @@ import pytest -from airflow_shared.state import AssetScope, BaseStoreBackend, StoreScope +from airflow_shared.state import AssetScope, BaseStoreBackend, StoreScope, TaskScope class TestAssetScope: @@ -88,20 +88,22 @@ def test_abstract_methods_cover_full_interface(self): def test_task_store_serialize_deserialize_round_trip(self, backend): original = "app_1234" - serialized = backend.serialize_task_store_to_ref(value=original, key="job_id", ti_id="abc-123") + scope = TaskScope(dag_id="d", run_id="r", task_id="t", map_index=-1) + serialized = backend.serialize_task_store_to_ref(value=original, key="job_id", scope=scope) deserialized = backend.deserialize_task_store_from_ref(serialized) assert deserialized == original def test_task_store_serialize_deserialize_typed_values(self, backend): """Default backend passes typed values through unchanged (custom backends handle storage).""" + scope = TaskScope(dag_id="d", run_id="r", task_id="t", map_index=-1) assert ( backend.deserialize_task_store_from_ref( - backend.serialize_task_store_to_ref(value=42, key="count", ti_id="abc-123") + backend.serialize_task_store_to_ref(value=42, key="count", scope=scope) ) == 42 ) assert backend.deserialize_task_store_from_ref( - backend.serialize_task_store_to_ref(value={"status": "ok"}, key="result", ti_id="abc-123") + backend.serialize_task_store_to_ref(value={"status": "ok"}, key="result", scope=scope) ) == {"status": "ok"} def test_custom_backend_overrides_task_store_ser_deser(self): @@ -115,38 +117,39 @@ async def aset(self, 
scope, key, value): ... async def adelete(self, scope, key): ... async def aclear(self, scope, *, all_map_indices=False): ... - def serialize_task_store_to_ref(self, *, value, key, ti_id): - return f"s3://bucket/{ti_id}/{key}" + def serialize_task_store_to_ref(self, *, value, key, scope: TaskScope): + return f"s3://bucket/{scope.dag_id}/{scope.task_id}/{key}" def deserialize_task_store_from_ref(self, stored): return f"fetched:{stored}" b = MyBackend() - assert b.serialize_task_store_to_ref(value="app_1234", key="job_id", ti_id="abc-123") == ( - "s3://bucket/abc-123/job_id" + scope = TaskScope(dag_id="my_dag", run_id="r", task_id="my_task", map_index=-1) + assert b.serialize_task_store_to_ref(value="app_1234", key="job_id", scope=scope) == ( + "s3://bucket/my_dag/my_task/job_id" ) assert ( - b.deserialize_task_store_from_ref("s3://bucket/abc-123/job_id") - == "fetched:s3://bucket/abc-123/job_id" + b.deserialize_task_store_from_ref("s3://bucket/my_dag/my_task/job_id") + == "fetched:s3://bucket/my_dag/my_task/job_id" ) def test_asset_store_serialize_deserialize_round_trip(self, backend): original = "2026-05-01" - serialized = backend.serialize_asset_store_to_ref( - value="2026-05-01", key="watermark", asset_ref="my_asset" - ) + scope = AssetScope(name="my_asset") + serialized = backend.serialize_asset_store_to_ref(value="2026-05-01", key="watermark", scope=scope) deserialized = backend.deserialize_asset_store_from_ref(serialized) assert deserialized == original def test_asset_store_serialize_deserialize_typed_values(self, backend): + scope = AssetScope(name="my_asset") assert ( backend.deserialize_asset_store_from_ref( - backend.serialize_asset_store_to_ref(value=5, key="total_runs", asset_ref="my_asset") + backend.serialize_asset_store_to_ref(value=5, key="total_runs", scope=scope) ) == 5 ) assert backend.deserialize_asset_store_from_ref( - backend.serialize_asset_store_to_ref(value={"rows": 1234}, key="last_run", asset_ref="my_asset") + 
backend.serialize_asset_store_to_ref(value={"rows": 1234}, key="last_run", scope=scope) ) == {"rows": 1234} def test_custom_backend_overrides_asset_store_ser_deser(self): @@ -160,14 +163,15 @@ async def aset(self, scope, key, value): ... async def adelete(self, scope, key): ... async def aclear(self, scope, *, all_map_indices=False): ... - def serialize_asset_store_to_ref(self, *, value, key, asset_ref): - return f"s3://bucket/assets/{asset_ref}/{key}" + def serialize_asset_store_to_ref(self, *, value, key, scope: AssetScope): + return f"s3://bucket/assets/{scope.name}/{key}" def deserialize_asset_store_from_ref(self, stored): return f"resolved:{stored}" b = MyBackend() - assert b.serialize_asset_store_to_ref(value="2026-05-01", key="watermark", asset_ref="my_asset") == ( + scope = AssetScope(name="my_asset") + assert b.serialize_asset_store_to_ref(value="2026-05-01", key="watermark", scope=scope) == ( "s3://bucket/assets/my_asset/watermark" ) assert ( diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index fa83ce3a5f5f5..12e1f127ea1cf 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -30,6 +30,7 @@ import attrs import structlog +from airflow.sdk._shared.state import AssetScope from airflow.sdk.configuration import conf from airflow.sdk.definitions._internal.contextmanager import _CURRENT_CONTEXT from airflow.sdk.definitions._internal.types import NOTSET @@ -593,7 +594,7 @@ def set(self, key: str, value: JsonValue, *, retention: timedelta | None = None) backend = _get_worker_state_store_backend() stored: JsonValue = value if backend is not None: - ref: str = backend.serialize_task_store_to_ref(value=value, key=key, ti_id=str(self._ti_id)) + ref: str = backend.serialize_task_store_to_ref(value=value, key=key, scope=self._scope) # wrap the value with a marker to indicate that it's stored externally, and include the ref to the 
external storage stored = _wrap_external_ref(ref) @@ -715,10 +716,10 @@ def set(self, key: str, value: JsonValue) -> None: # if custom backend is configured, store the value on the custom backend, and return the reference # to the stored value to store in the DB backend = _get_worker_state_store_backend() - asset_ref = self._name or self._uri or "" stored: JsonValue = value if backend is not None: - ref = backend.serialize_asset_store_to_ref(value=value, key=key, asset_ref=asset_ref) + scope = AssetScope(name=self._name, uri=self._uri) + ref = backend.serialize_asset_store_to_ref(value=value, key=key, scope=scope) stored = _wrap_external_ref(ref) msg: ToSupervisor diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py b/task-sdk/tests/task_sdk/execution_time/test_context.py index 1e668fb9ffdf5..3a8b4e3ea786f 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_context.py +++ b/task-sdk/tests/task_sdk/execution_time/test_context.py @@ -1619,8 +1619,8 @@ def __init__(self): self._actual_key_value_store: dict[str, str] = {} # key -> actual value self.reference: dict[str, str] = {} # key -> stored ref (mem:// URI) - def serialize_task_store_to_ref(self, *, value, key: str, ti_id: str) -> str: - ref = f"mem://{ti_id}/{key}" + def serialize_task_store_to_ref(self, *, value, key: str, scope) -> str: + ref = f"mem://{scope.dag_id}/{scope.task_id}/{key}" self._actual_key_value_store[key] = value self.reference[key] = ref return ref @@ -1629,8 +1629,8 @@ def deserialize_task_store_from_ref(self, stored: str) -> JsonValue: key = stored.rsplit("/", 1)[-1] return self._actual_key_value_store.get(key, stored) - def serialize_asset_store_to_ref(self, *, value, key: str, asset_ref: str) -> str: - ref = f"mem://{asset_ref}/{key}" + def serialize_asset_store_to_ref(self, *, value, key: str, scope) -> str: + ref = f"mem://{scope.name or scope.uri}/{key}" self._actual_key_value_store[key] = value self.reference[key] = ref return ref @@ -1672,7 +1672,7 @@ def 
backend(self): def test_set_returns_reference_to_storage(self, mock_supervisor_comms, backend, time_machine): """set() stores actual value in backend and sends mem:// reference via comms.""" mock_supervisor_comms.send.return_value = OKResponse(ok=True) - expected_ref = f"mem://{self.TI_ID}/job_id" + expected_ref = f"mem://{self.SCOPE.dag_id}/{self.SCOPE.task_id}/job_id" frozen_dt = datetime(2026, 1, 1, 12, 0, 0, tzinfo=dt_timezone.utc) time_machine.move_to(frozen_dt, tick=False) @@ -1693,7 +1693,7 @@ def test_set_returns_reference_to_storage(self, mock_supervisor_comms, backend, def test_get_resolves_reference_to_actual_value(self, mock_supervisor_comms, backend): """get() fetches mem:// reference from DB, resolves it to actual value via backend.""" - ref = _wrap_external_ref(f"mem://{self.TI_ID}/job_id") + ref = _wrap_external_ref(f"mem://{self.SCOPE.dag_id}/{self.SCOPE.task_id}/job_id") backend._actual_key_value_store["job_id"] = "app_001" mock_supervisor_comms.send.return_value = TaskStoreResult(value=ref) From 58d45d4c86a7aa89b6121271c4245bf323f936c8 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 9 Jun 2026 12:56:04 +0530 Subject: [PATCH 02/11] Pass task/asset scopes to serialize methods instead of ti_id/asset_ref --- task-sdk/tests/task_sdk/execution_time/test_context.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py b/task-sdk/tests/task_sdk/execution_time/test_context.py index 3a8b4e3ea786f..c1c7ad1626b18 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_context.py +++ b/task-sdk/tests/task_sdk/execution_time/test_context.py @@ -1620,7 +1620,7 @@ def __init__(self): self.reference: dict[str, str] = {} # key -> stored ref (mem:// URI) def serialize_task_store_to_ref(self, *, value, key: str, scope) -> str: - ref = f"mem://{scope.dag_id}/{scope.task_id}/{key}" + ref = f"mem://{scope.dag_id}/{scope.run_id}/{scope.task_id}/{scope.map_index}/{key}" 
self._actual_key_value_store[key] = value self.reference[key] = ref return ref @@ -1672,7 +1672,7 @@ def backend(self): def test_set_returns_reference_to_storage(self, mock_supervisor_comms, backend, time_machine): """set() stores actual value in backend and sends mem:// reference via comms.""" mock_supervisor_comms.send.return_value = OKResponse(ok=True) - expected_ref = f"mem://{self.SCOPE.dag_id}/{self.SCOPE.task_id}/job_id" + expected_ref = f"mem://{self.SCOPE.dag_id}/{self.SCOPE.run_id}/{self.SCOPE.task_id}/{self.SCOPE.map_index}/job_id" frozen_dt = datetime(2026, 1, 1, 12, 0, 0, tzinfo=dt_timezone.utc) time_machine.move_to(frozen_dt, tick=False) @@ -1693,7 +1693,9 @@ def test_set_returns_reference_to_storage(self, mock_supervisor_comms, backend, def test_get_resolves_reference_to_actual_value(self, mock_supervisor_comms, backend): """get() fetches mem:// reference from DB, resolves it to actual value via backend.""" - ref = _wrap_external_ref(f"mem://{self.SCOPE.dag_id}/{self.SCOPE.task_id}/job_id") + ref = _wrap_external_ref( + f"mem://{self.SCOPE.dag_id}/{self.SCOPE.run_id}/{self.SCOPE.task_id}/{self.SCOPE.map_index}/job_id" + ) backend._actual_key_value_store["job_id"] = "app_001" mock_supervisor_comms.send.return_value = TaskStoreResult(value=ref) From e6a4d0128586443e4797c82cd23659b74dd5ffa3 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 9 Jun 2026 13:33:53 +0530 Subject: [PATCH 03/11] fixing tests --- .../task_sdk/execution_time/test_task_runner.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 1968de749736d..5bf2434fba4cf 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -62,7 +62,7 @@ timezone, ) from airflow.sdk._shared.observability.metrics.base_stats_logger import StatsLogger -from 
airflow.sdk._shared.state import TaskScope +from airflow.sdk._shared.state import AssetScope, TaskScope from airflow.sdk.api.datamodels._generated import ( AssetProfile, AssetResponse, @@ -5817,7 +5817,7 @@ def execute(self, context): run(runtime_ti, context=runtime_ti.get_template_context(), log=mock.MagicMock()) mock_backend.serialize_asset_store_to_ref.assert_called_once_with( - value="2026-05-01", key="watermark", asset_ref="my_asset" + value="2026-05-01", key="watermark", scope=AssetScope(name="my_asset", uri=None) ) mock_supervisor_comms.send.assert_any_call( SetAssetStoreByName( @@ -5843,7 +5843,13 @@ def execute(self, context): mock_supervisor_comms.send.side_effect = TestTaskInstanceStateOperations._watcher_side_effect mock_backend = mock.MagicMock() - ref = f"mem://{runtime_ti.id}/job_id" + scope = TaskScope( + dag_id=runtime_ti.dag_id, + run_id=runtime_ti.run_id, + task_id=runtime_ti.task_id, + map_index=runtime_ti.map_index, + ) + ref = f"mem://{scope.dag_id}/{scope.run_id}/{scope.task_id}/{scope.map_index}/job_id" mock_backend.serialize_task_store_to_ref.return_value = ref with mock.patch( @@ -5852,7 +5858,7 @@ def execute(self, context): run(runtime_ti, context=runtime_ti.get_template_context(), log=mock.MagicMock()) mock_backend.serialize_task_store_to_ref.assert_called_once_with( - value="app_001", key="job_id", ti_id=str(runtime_ti.id) + value="app_001", key="job_id", scope=scope ) mock_supervisor_comms.send.assert_any_call( SetTaskStore( From 24caae63017b67ca63eda159b67afbcd8bb7d406 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 9 Jun 2026 17:31:34 +0530 Subject: [PATCH 04/11] Introducing object store backend for task and asset store --- .../provider_dependencies.json.sha256sum | 2 +- providers/common/io/docs/index.rst | 1 + providers/common/io/docs/store_backend.rst | 81 ++++++ providers/common/io/provider.yaml | 28 ++ .../providers/common/io/get_provider_info.py | 21 ++ .../providers/common/io/store/__init__.py | 16 ++ 
.../providers/common/io/store/backend.py | 250 +++++++++++++++++ .../providers/common/io/version_compat.py | 1 + .../io/tests/unit/common/io/store/__init__.py | 16 ++ .../unit/common/io/store/test_backend.py | 252 ++++++++++++++++++ 10 files changed, 667 insertions(+), 1 deletion(-) create mode 100644 providers/common/io/docs/store_backend.rst create mode 100644 providers/common/io/src/airflow/providers/common/io/store/__init__.py create mode 100644 providers/common/io/src/airflow/providers/common/io/store/backend.py create mode 100644 providers/common/io/tests/unit/common/io/store/__init__.py create mode 100644 providers/common/io/tests/unit/common/io/store/test_backend.py diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index 49ab0db88c08e..77196492535b5 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -f042436099826662d45d5f59c100a363d5e12facd51a7c8b850ccbce08d8c4ee +e8ce5121920fd66b1d1d70566662ee9147acf573f84008e1e7f65dadb77657f1 diff --git a/providers/common/io/docs/index.rst b/providers/common/io/docs/index.rst index 909d83bd41df5..c7bfd92d533e8 100644 --- a/providers/common/io/docs/index.rst +++ b/providers/common/io/docs/index.rst @@ -38,6 +38,7 @@ Transferring a file Operators Object Storage XCom Backend + Object Storage State Store Backend .. toctree:: :hidden: diff --git a/providers/common/io/docs/store_backend.rst b/providers/common/io/docs/store_backend.rst new file mode 100644 index 0000000000000..d52c11243f89c --- /dev/null +++ b/providers/common/io/docs/store_backend.rst @@ -0,0 +1,81 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Object Storage State Store Backend +=================================== + +The default state store backend is :class:`~airflow.state.metastore.MetastoreStateBackend`, which persists +task and asset state in the Airflow metadata database. For larger values, you may want to store state on +object storage instead. + +To enable object storage for task and asset store, set ``backend`` in the ``[state_store]`` section to +``airflow.providers.common.io.store.backend.StoreObjectStorageBackend``, and set +``store_objectstorage_path`` to the desired base location. The connection id is obtained from the user +part of the URL, e.g. ``store_objectstorage_path = s3://conn_id@mybucket/task-state/``. + +Task state is stored under ``<dag_id>/<run_id>/<task_id>/<map_index>/<key>`` and asset state under +``assets/<asset_ref>/<key>`` beneath the configured base path. + +By default (``store_objectstorage_threshold = 0``) all serialized values are offloaded to object storage. +Set ``store_objectstorage_threshold`` to a positive number of bytes to only offload values whose +serialized size meets or exceeds the threshold; anything smaller is stored in the Airflow metadata database. + +Optionally set ``store_objectstorage_compression`` to an fsspec-supported compression algorithm such as +``gzip`` or ``snappy`` to compress values before writing. 
+ +The following example stores all task and asset state in S3, compressed with gzip:: + + [state_store] + backend = airflow.providers.common.io.store.backend.StoreObjectStorageBackend + + [common.io] + store_objectstorage_path = s3://conn_id@mybucket/task-state/ + store_objectstorage_compression = gzip + +To only offload values larger than 1 MB:: + + [state_store] + backend = airflow.providers.common.io.store.backend.StoreObjectStorageBackend + + [common.io] + store_objectstorage_path = s3://conn_id@mybucket/task-state/ + store_objectstorage_threshold = 1048576 + +Using the local filesystem (useful for development):: + + [state_store] + backend = airflow.providers.common.io.store.backend.StoreObjectStorageBackend + + [common.io] + store_objectstorage_path = file:///var/airflow/task-state/ + +.. note:: + + Compression requires the relevant library to be installed in your Python environment. + For example, ``snappy`` requires ``python-snappy``. Gzip and bz2 work out of the box. + +.. note:: + + ``expires_at`` is not enforced by this backend. Values written to object storage persist + indefinitely until explicitly deleted. Use your object storage provider's lifecycle policies + (e.g. S3 lifecycle rules, GCS object lifecycle) to automatically expire old state. + +.. note:: + + Task state paths are keyed on ``(dag_id, run_id, task_id, map_index)`` and are stable across + task retries. This makes this backend suitable for operators that use + :class:`~airflow.sdk.ResumableJobMixin` to reconnect to external jobs after a retry. diff --git a/providers/common/io/provider.yaml b/providers/common/io/provider.yaml index 7329ccde6602b..3cabca3723786 100644 --- a/providers/common/io/provider.yaml +++ b/providers/common/io/provider.yaml @@ -115,3 +115,31 @@ config: type: string example: "gz" default: "" + store_objectstorage_path: + description: | + Base path on object storage for the task/asset store backend, in URL format. 
+ When set, StoreObjectStorageBackend will persist task and asset state under this + prefix, organised as <dag_id>/<run_id>/<task_id>/<map_index>/<key> for tasks and + assets/<asset_ref>/<key> for assets. + version_added: 1.8.0 + type: string + example: "s3://conn_id@bucket/task-state/" + default: "" + store_objectstorage_threshold: + description: | + Threshold in bytes for offloading serialized store values to object storage. 0 means + always offload to object storage. Any positive number means values will be offloaded + only when their serialized size is at least that many bytes. Must be non-negative. + version_added: 1.8.0 + type: integer + example: "1000000" + default: "0" + store_objectstorage_compression: + description: | + Compression algorithm to use when writing task/asset store values to object storage. + Supported algorithms are a.o.: snappy, zip, gzip, bz2, and lzma. If not specified, + no compression will be used. The same algorithm must be available on all workers. + version_added: 1.8.0 + type: string + example: "gz" + default: "" diff --git a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py index a08546d9ed5fa..3ce51f5209d2c 100644 --- a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py +++ b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py @@ -84,6 +84,27 @@ def get_provider_info(): "example": "gz", "default": "", }, + "store_objectstorage_path": { + "description": "Base path on object storage for the task/asset store backend, in URL format.\nWhen set, StoreObjectStorageBackend will persist task and asset state under this\nprefix, organised as <dag_id>/<run_id>/<task_id>/<map_index>/<key> for tasks and\nassets/<asset_ref>/<key> for assets.\n", + "version_added": "1.8.0", + "type": "string", + "example": "s3://conn_id@bucket/task-state/", + "default": "", + }, + "store_objectstorage_threshold": { + "description": "Threshold in bytes for offloading serialized store values to object storage. 
0 means\nalways offload to object storage. Any positive number means values will be offloaded\nonly when their serialized size is at least that many bytes. Must be non-negative.\n", + "version_added": "1.8.0", + "type": "integer", + "example": "1000000", + "default": "0", + }, + "store_objectstorage_compression": { + "description": "Compression algorithm to use when writing task/asset store values to object storage.\nSupported algorithms are a.o.: snappy, zip, gzip, bz2, and lzma. If not specified,\nno compression will be used. The same algorithm must be available on all workers.\n", + "version_added": "1.8.0", + "type": "string", + "example": "gz", + "default": "", + }, }, } }, diff --git a/providers/common/io/src/airflow/providers/common/io/store/__init__.py b/providers/common/io/src/airflow/providers/common/io/store/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/common/io/src/airflow/providers/common/io/store/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/providers/common/io/src/airflow/providers/common/io/store/backend.py b/providers/common/io/src/airflow/providers/common/io/store/backend.py new file mode 100644 index 0000000000000..921a906a8402b --- /dev/null +++ b/providers/common/io/src/airflow/providers/common/io/store/backend.py @@ -0,0 +1,250 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import contextlib +import json +from functools import cache +from typing import TYPE_CHECKING +from urllib.parse import urlsplit + +import fsspec.utils + +from airflow.providers.common.compat.sdk import conf + +if TYPE_CHECKING: + from datetime import datetime + + from pydantic import JsonValue + from sqlalchemy.ext.asyncio import AsyncSession + from sqlalchemy.orm import Session + +try: + from airflow.sdk._shared.state import AssetScope, BaseStoreBackend, StoreScope, TaskScope +except ImportError: + raise ImportError( + "StoreObjectStorageBackend requires Airflow >= 3.3. Please upgrade your Airflow installation." 
+ ) from None + +from airflow.sdk import ObjectStoragePath + +SECTION = "common.io" + + +@cache +def _get_base_path() -> ObjectStoragePath: + return ObjectStoragePath(conf.get_mandatory_value(SECTION, "store_objectstorage_path")) + + +@cache +def _get_compression() -> str | None: + value = conf.get(SECTION, "store_objectstorage_compression", fallback=None) + return value or None + + +@cache +def _get_threshold() -> int: + return conf.getint(SECTION, "store_objectstorage_threshold", fallback=0) + + +def _compression_suffix() -> str: + compression = _get_compression() + if not compression: + return "" + for suffix, c in fsspec.utils.compressions.items(): + if c == compression: + return f".{suffix}" + raise ValueError(f"Compression {compression!r} is not supported.") + + +def _safe_segment(value: str) -> str: + """ + Sanitise a string for use as a single path segment. + + This is a simple implementation that replaces slashes with underscores. + """ + return value.replace("/", "_").replace("\\", "_") + + +def _task_path(scope: TaskScope, key: str) -> ObjectStoragePath: + suffix = _compression_suffix() + return ( + _get_base_path() + / _safe_segment(scope.dag_id) + / _safe_segment(scope.run_id) + / _safe_segment(scope.task_id) + / str(scope.map_index) + / f"{_safe_segment(key)}{suffix}" + ) + + +def _asset_path(scope: AssetScope, key: str) -> ObjectStoragePath: + suffix = _compression_suffix() + asset_ref = _safe_segment(scope.name or scope.uri or str(scope.asset_id)) + return _get_base_path() / "assets" / asset_ref / f"{_safe_segment(key)}{suffix}" + + +def _write(path: ObjectStoragePath, value: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + compression = _get_compression() + with path.open(mode="wb", compression=compression) as f: + f.write(value.encode("utf-8")) + + +def _read(path: ObjectStoragePath) -> str | None: + try: + with path.open(mode="rb", compression="infer") as f: + return f.read().decode("utf-8") + except FileNotFoundError: + return 
None + + +def _is_storage_ref(value: str) -> bool: + try: + if not urlsplit(value).scheme: + return False + return ObjectStoragePath(value).is_relative_to(_get_base_path()) + except Exception: + return False + + +def _scope_path(scope: StoreScope, key: str) -> ObjectStoragePath: + match scope: + case TaskScope(): + return _task_path(scope, key) + case AssetScope(): + return _asset_path(scope, key) + case _: + raise TypeError(f"Unknown scope type: {type(scope)}") + + +class StoreObjectStorageBackend(BaseStoreBackend): + """ + Object-storage backend for task and asset store. + + Config keys (all under ``[common.io]``): + + - ``store_objectstorage_path``: base path, e.g. ``s3://conn_id@bucket/task-state/`` + - ``store_objectstorage_compression``: optional compression, e.g. ``gzip`` + """ + + def get(self, scope: StoreScope, key: str, *, session: Session | None = None) -> str | None: + return _read(_scope_path(scope, key)) + + def set( + self, + scope: StoreScope, + key: str, + value: str, + *, + expires_at: datetime | None = None, + session: Session | None = None, + ) -> None: + _write(_scope_path(scope, key), value) + + def delete(self, scope: StoreScope, key: str, *, session: Session | None = None) -> None: + path = _scope_path(scope, key) + with contextlib.suppress(FileNotFoundError): + path.unlink(missing_ok=True) + + def clear( + self, scope: StoreScope, *, all_map_indices: bool = False, session: Session | None = None + ) -> None: + match scope: + case TaskScope(): + if all_map_indices: + prefix = ( + _get_base_path() + / _safe_segment(scope.dag_id) + / _safe_segment(scope.run_id) + / _safe_segment(scope.task_id) + ) + for p in prefix.glob("*/*"): + p.unlink(missing_ok=True) + else: + prefix = ( + _get_base_path() + / _safe_segment(scope.dag_id) + / _safe_segment(scope.run_id) + / _safe_segment(scope.task_id) + / str(scope.map_index) + ) + for p in prefix.glob("*"): + p.unlink(missing_ok=True) + case AssetScope(): + asset_ref = _safe_segment(scope.name or 
scope.uri or str(scope.asset_id)) + prefix = _get_base_path() / "assets" / asset_ref + for p in prefix.glob("*"): + p.unlink(missing_ok=True) + + async def aget(self, scope: StoreScope, key: str, *, session: AsyncSession | None = None) -> str | None: + raise NotImplementedError + + async def aset( + self, + scope: StoreScope, + key: str, + value: str, + *, + expires_at: datetime | None = None, + session: AsyncSession | None = None, + ) -> None: + raise NotImplementedError + + async def adelete(self, scope: StoreScope, key: str, *, session: AsyncSession | None = None) -> None: + raise NotImplementedError + + async def aclear( + self, scope: StoreScope, *, all_map_indices: bool = False, session: AsyncSession | None = None + ) -> None: + raise NotImplementedError + + def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, scope: TaskScope) -> str: + serialized = json.dumps(value) + if len(serialized.encode()) < _get_threshold(): + return serialized + path = _task_path(scope, key) + _write(path, serialized) + return str(path) + + def deserialize_task_store_from_ref(self, stored: str) -> JsonValue: + if not stored: + return None + if _is_storage_ref(stored): + data = _read(ObjectStoragePath(stored)) + if data is not None: + return json.loads(data) + return None + return json.loads(stored) + + def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, scope: AssetScope) -> str: + serialized = json.dumps(value) + if len(serialized.encode()) < _get_threshold(): + return serialized + path = _asset_path(scope, key) + _write(path, serialized) + return str(path) + + def deserialize_asset_store_from_ref(self, stored: str) -> JsonValue: + if not stored: + return None + if _is_storage_ref(stored): + data = _read(ObjectStoragePath(stored)) + if data is not None: + return json.loads(data) + return None + return json.loads(stored) diff --git a/providers/common/io/src/airflow/providers/common/io/version_compat.py 
b/providers/common/io/src/airflow/providers/common/io/version_compat.py index 48d122b669696..30468d86f0fa2 100644 --- a/providers/common/io/src/airflow/providers/common/io/version_compat.py +++ b/providers/common/io/src/airflow/providers/common/io/version_compat.py @@ -33,3 +33,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) +AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0) diff --git a/providers/common/io/tests/unit/common/io/store/__init__.py b/providers/common/io/tests/unit/common/io/store/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/common/io/tests/unit/common/io/store/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/common/io/tests/unit/common/io/store/test_backend.py b/providers/common/io/tests/unit/common/io/store/test_backend.py new file mode 100644 index 0000000000000..92e67f063b73e --- /dev/null +++ b/providers/common/io/tests/unit/common/io/store/test_backend.py @@ -0,0 +1,252 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from tests_common.test_utils.version_compat import AIRFLOW_V_3_3_PLUS + +if not AIRFLOW_V_3_3_PLUS: + pytest.skip("Store backend requires Airflow >= 3.3", allow_module_level=True) + +from airflow._shared.state import AssetScope, TaskScope +from airflow.providers.common.io.store import backend +from airflow.providers.common.io.store.backend import ( + StoreObjectStorageBackend, + _asset_path, + _read, + _task_path, + _write, +) +from airflow.sdk import ObjectStoragePath + +from tests_common.test_utils.config import conf_vars + + +@pytest.fixture(autouse=True) +def clear_caches(): + backend._get_base_path.cache_clear() + backend._get_compression.cache_clear() + backend._get_threshold.cache_clear() + yield + backend._get_base_path.cache_clear() + backend._get_compression.cache_clear() + backend._get_threshold.cache_clear() + + +@pytest.fixture +def base_path(tmp_path): + store_path = tmp_path / "store" + store_path.mkdir() + return f"file://{store_path}" + + +@pytest.fixture +def conf_overrides(base_path): + from tests_common.test_utils.config import conf_vars + + with conf_vars( + { + ("common.io", "store_objectstorage_path"): base_path, + ("common.io", "store_objectstorage_compression"): "", + } + ): + yield base_path + + +class 
TestPathBuilders: + def test_task_path_segments(self, conf_overrides): + scope = TaskScope(dag_id="my_dag", run_id="run_1", task_id="my_task", map_index=-1) + path = _task_path(scope, "job_id") + assert str(path).endswith("my_dag/run_1/my_task/-1/job_id") + + def test_task_path_sanitises_slashes(self, conf_overrides): + scope = TaskScope(dag_id="a/b", run_id="r/1", task_id="t/x", map_index=0) + path = _task_path(scope, "key/name") + # a/b is sanitised to a_b + assert "a/b" not in str(path) + assert "a_b" in str(path) + assert "key_name" in str(path) + assert "key/name" not in str(path) + + def test_asset_path_segments(self, conf_overrides): + scope = AssetScope(name="my_asset") + path = _asset_path(scope, "status") + assert "assets/my_asset/status" in str(path) + + def test_asset_path_uses_uri_when_no_name(self, conf_overrides): + scope = AssetScope(uri="s3://bucket/path") + path = _asset_path(scope, "key") + assert "assets/" in str(path) + # slashes in the uri are replaced with underscores to produce a single path segment + segment = str(path).split("assets/")[1].split("/")[0] + assert "/" not in segment + + def test_compression_suffix_appended(self, tmp_path): + store_path = tmp_path / "store" + store_path.mkdir() + + with conf_vars( + { + ("common.io", "store_objectstorage_path"): f"file://{store_path}", + ("common.io", "store_objectstorage_compression"): "gzip", + } + ): + backend._get_base_path.cache_clear() + backend._get_compression.cache_clear() + scope = TaskScope(dag_id="d", run_id="r", task_id="t", map_index=-1) + path = _task_path(scope, "k") + assert str(path).endswith(".gz") + + +class TestIOPrimitives: + def test_write_and_read_roundtrip(self, conf_overrides): + path = ObjectStoragePath(f"{conf_overrides}/test_key") + _write(path, '{"value": 42}') + result = _read(path) + assert result == '{"value": 42}' + + def test_read_missing_returns_none(self, conf_overrides): + path = ObjectStoragePath(f"{conf_overrides}/nonexistent") + assert _read(path) is 
None + + def test_write_creates_parent_dirs(self, conf_overrides): + path = ObjectStoragePath(f"{conf_overrides}/a/b/c/key") + _write(path, "hello") + assert path.exists() + + +class TestStoreObjectStorageBackend: + @pytest.fixture + def store(self, conf_overrides): + return StoreObjectStorageBackend() + + @pytest.fixture + def task_scope(self): + return TaskScope(dag_id="my_dag", run_id="run_1", task_id="my_task", map_index=-1) + + @pytest.fixture + def asset_scope(self): + return AssetScope(name="my_asset") + + def test_set_and_get_task(self, store, task_scope): + store.set(task_scope, "k", "hello") + assert store.get(task_scope, "k") == "hello" + + def test_get_missing_returns_none(self, store, task_scope): + assert store.get(task_scope, "missing") is None + + def test_delete_task(self, store, task_scope): + store.set(task_scope, "k", "v") + store.delete(task_scope, "k") + assert store.get(task_scope, "k") is None + + def test_delete_missing_is_noop(self, store, task_scope): + store.delete(task_scope, "does_not_exist") + + def test_clear_task_single_map_index(self, store, task_scope): + store.set(task_scope, "k1", "v1") + store.set(task_scope, "k2", "v2") + store.clear(task_scope) + assert store.get(task_scope, "k1") is None + assert store.get(task_scope, "k2") is None + + def test_clear_task_all_map_indices(self, store): + scope0 = TaskScope(dag_id="d", run_id="r", task_id="t", map_index=0) + scope1 = TaskScope(dag_id="d", run_id="r", task_id="t", map_index=1) + store.set(scope0, "k", "v0") + store.set(scope1, "k", "v1") + store.clear(scope0, all_map_indices=True) + assert store.get(scope0, "k") is None + assert store.get(scope1, "k") is None + + def test_set_and_get_asset(self, store, asset_scope): + store.set(asset_scope, "status", "ok") + assert store.get(asset_scope, "status") == "ok" + + def test_clear_asset(self, store, asset_scope): + store.set(asset_scope, "k1", "v1") + store.set(asset_scope, "k2", "v2") + store.clear(asset_scope) + assert 
store.get(asset_scope, "k1") is None + assert store.get(asset_scope, "k2") is None + + def test_serialize_and_deserialize_task(self, store, task_scope): + ref = store.serialize_task_store_to_ref(value={"x": 1}, key="job_id", scope=task_scope) + assert ref.startswith("file://") + result = store.deserialize_task_store_from_ref(ref) + assert result == {"x": 1} + + def test_serialize_and_deserialize_asset(self, store, asset_scope): + ref = store.serialize_asset_store_to_ref(value=[1, 2, 3], key="result", scope=asset_scope) + assert ref.startswith("file://") + result = store.deserialize_asset_store_from_ref(ref) + assert result == [1, 2, 3] + + def test_deserialize_missing_ref_returns_none(self, store, conf_overrides): + result = store.deserialize_task_store_from_ref(f"{conf_overrides}/no/such/path") + assert result is None + + def test_task_serialize_offloads_to_storage(self, task_scope, base_path): + with conf_vars( + { + ("common.io", "store_objectstorage_path"): base_path, + ("common.io", "store_objectstorage_threshold"): "0", + } + ): + backend._get_threshold.cache_clear() + store = StoreObjectStorageBackend() + ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) + assert ref.startswith("file://") + + def test_asset_serialize_offloads_to_storage(self, asset_scope, base_path): + with conf_vars( + { + ("common.io", "store_objectstorage_path"): base_path, + ("common.io", "store_objectstorage_threshold"): "0", + } + ): + backend._get_threshold.cache_clear() + store = StoreObjectStorageBackend() + ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) + assert ref.startswith("file://") + + def test_task_serialize_to_db_when_below_threshold(self, task_scope, base_path): + with conf_vars( + { + ("common.io", "store_objectstorage_path"): base_path, + ("common.io", "store_objectstorage_threshold"): "10000", + } + ): + backend._get_threshold.cache_clear() + store = StoreObjectStorageBackend() + ref = 
store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) + assert not ref.startswith("file://") + assert store.deserialize_task_store_from_ref(ref) == {"x": 1} + + def test_asset_serialize_to_db_when_below_threshold(self, asset_scope, base_path): + with conf_vars( + { + ("common.io", "store_objectstorage_path"): base_path, + ("common.io", "store_objectstorage_threshold"): "10000", + } + ): + backend._get_threshold.cache_clear() + store = StoreObjectStorageBackend() + ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) + assert not ref.startswith("file://") + assert store.deserialize_asset_store_from_ref(ref) == {"x": 1} From 5d21dfe8913e7054e10e41ab4af6d54946da1cfc Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 9 Jun 2026 18:19:31 +0530 Subject: [PATCH 05/11] comments from wei --- .../administration-and-deployment/task-and-asset-store.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst b/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst index 6621f39eecd8e..4e71fef9eeffa 100644 --- a/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst +++ b/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst @@ -184,8 +184,8 @@ Example skeleton: def _asset_ref(self, scope: AssetScope, key: str) -> str: import hashlib - asset_id = scope.name or scope.uri or "" - safe = hashlib.sha256(asset_id.encode()).hexdigest()[:16] + asset_ref = scope.name or scope.uri or "" + safe = hashlib.sha256(asset_ref.encode()).hexdigest()[:16] return f"airflow/asset-store/{safe}/{key}" def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, scope: TaskScope) -> str: From f2f748ac9d2032186be5420529e195494013ca4b Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 9 Jun 2026 19:31:36 +0530 Subject: [PATCH 06/11] comments from wei --- 
.../administration-and-deployment/task-and-asset-store.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst b/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst index 4e71fef9eeffa..6e261a266dc2b 100644 --- a/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst +++ b/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst @@ -184,8 +184,8 @@ Example skeleton: def _asset_ref(self, scope: AssetScope, key: str) -> str: import hashlib - asset_ref = scope.name or scope.uri or "" - safe = hashlib.sha256(asset_ref.encode()).hexdigest()[:16] + asset_identifier = scope.name or scope.uri or "" + safe = hashlib.sha256(asset_identifier.encode()).hexdigest()[:16] return f"airflow/asset-store/{safe}/{key}" def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, scope: TaskScope) -> str: From 460569692e009f4998a3ebba7fcf82ad1b0ff373 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 25 Jun 2026 16:04:34 +0530 Subject: [PATCH 07/11] fixing tests --- generated/provider_dependencies.json.sha256sum | 2 +- .../common/io/tests/unit/common/io/store/test_backend.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index 77196492535b5..ff3b05b18829d 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -e8ce5121920fd66b1d1d70566662ee9147acf573f84008e1e7f65dadb77657f1 +1d6cbcc2ee5c0f6be393a56f3202e37b6429731deb385df5d940eadce44e40d3 diff --git a/providers/common/io/tests/unit/common/io/store/test_backend.py b/providers/common/io/tests/unit/common/io/store/test_backend.py index 92e67f063b73e..23ea6a35d1738 100644 --- a/providers/common/io/tests/unit/common/io/store/test_backend.py +++ 
b/providers/common/io/tests/unit/common/io/store/test_backend.py @@ -23,7 +23,6 @@ if not AIRFLOW_V_3_3_PLUS: pytest.skip("Store backend requires Airflow >= 3.3", allow_module_level=True) -from airflow._shared.state import AssetScope, TaskScope from airflow.providers.common.io.store import backend from airflow.providers.common.io.store.backend import ( StoreObjectStorageBackend, @@ -36,6 +35,9 @@ from tests_common.test_utils.config import conf_vars +if AIRFLOW_V_3_3_PLUS: + from airflow.sdk.state import AssetScope, TaskScope + @pytest.fixture(autouse=True) def clear_caches(): @@ -68,6 +70,7 @@ def conf_overrides(base_path): yield base_path +@pytest.mark.skipif(not AIRFLOW_V_3_3_PLUS, reason="task state store requires Airflow >= 3.3") class TestPathBuilders: def test_task_path_segments(self, conf_overrides): scope = TaskScope(dag_id="my_dag", run_id="run_1", task_id="my_task", map_index=-1) From 70717edd5d3e1d003939af2695f6a574a5f6512a Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 25 Jun 2026 17:51:36 +0530 Subject: [PATCH 08/11] that mega rename --- .../provider_dependencies.json.sha256sum | 2 +- providers/common/io/docs/index.rst | 2 +- ...re_backend.rst => state_store_backend.rst} | 32 +++++++------- providers/common/io/provider.yaml | 16 +++---- .../providers/common/io/get_provider_info.py | 12 +++--- .../io/{store => state_store}/__init__.py | 0 .../io/{store => state_store}/backend.py | 17 +++----- .../io/{store => state_store}/__init__.py | 0 .../io/{store => state_store}/test_backend.py | 42 +++++++++---------- 9 files changed, 59 insertions(+), 64 deletions(-) rename providers/common/io/docs/{store_backend.rst => state_store_backend.rst} (63%) rename providers/common/io/src/airflow/providers/common/io/{store => state_store}/__init__.py (100%) rename providers/common/io/src/airflow/providers/common/io/{store => state_store}/backend.py (92%) rename providers/common/io/tests/unit/common/io/{store => state_store}/__init__.py (100%) rename 
providers/common/io/tests/unit/common/io/{store => state_store}/test_backend.py (86%) diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index ff3b05b18829d..220e82373bdb5 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -1d6cbcc2ee5c0f6be393a56f3202e37b6429731deb385df5d940eadce44e40d3 +1d2ec6ba7a96de0cc04b7f68834e7e1420749c96be6654e45dc1f72755823d59 diff --git a/providers/common/io/docs/index.rst b/providers/common/io/docs/index.rst index c7bfd92d533e8..6b3aae5bf744f 100644 --- a/providers/common/io/docs/index.rst +++ b/providers/common/io/docs/index.rst @@ -38,7 +38,7 @@ Transferring a file Operators Object Storage XCom Backend - Object Storage State Store Backend + Object Storage State Store Backend .. toctree:: :hidden: diff --git a/providers/common/io/docs/store_backend.rst b/providers/common/io/docs/state_store_backend.rst similarity index 63% rename from providers/common/io/docs/store_backend.rst rename to providers/common/io/docs/state_store_backend.rst index d52c11243f89c..3810d03b1766c 100644 --- a/providers/common/io/docs/store_backend.rst +++ b/providers/common/io/docs/state_store_backend.rst @@ -22,46 +22,46 @@ The default state store backend is :class:`~airflow.state.metastore.MetastoreSta task and asset state in the Airflow metadata database. For larger values, you may want to store state on object storage instead. -To enable object storage for task and asset store, set ``backend`` in the ``[state_store]`` section to -``airflow.providers.common.io.store.backend.StoreObjectStorageBackend``, and set -``store_objectstorage_path`` to the desired base location. The connection id is obtained from the user -part of the URL, e.g. ``store_objectstorage_path = s3://conn_id@mybucket/task-state/``. 
+To enable object storage for task and asset state store, set ``backend`` in the ``[state_store]`` section to +``airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend``, and set +``state_store_objectstorage_path`` to the desired base location. The connection id is obtained from the user +part of the URL, e.g. ``state_store_objectstorage_path = s3://conn_id@mybucket/task-state/``. Task state is stored under ``////`` and asset state under -``assets//`` beneath the configured base path. +``assets//`` beneath the configured base path. -By default (``store_objectstorage_threshold = 0``) all serialized values are offloaded to object storage. -Set ``store_objectstorage_threshold`` to a positive number of bytes to only offload values whose +By default (``state_store_objectstorage_threshold = 0``) all serialized values are offloaded to object storage. +Set ``state_store_objectstorage_threshold`` to a positive number of bytes to only offload values whose serialized size meets or exceeds the threshold, anything smaller are stored in the Airflow metadata database. -Optionally set ``store_objectstorage_compression`` to an fsspec-supported compression algorithm such as +Optionally set ``state_store_objectstorage_compression`` to an fsspec-supported compression algorithm such as ``gzip`` or ``snappy`` to compress values before writing. 
The following example stores all task and asset state in S3, compressed with gzip:: [state_store] - backend = airflow.providers.common.io.store.backend.StoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend [common.io] - store_objectstorage_path = s3://conn_id@mybucket/task-state/ - store_objectstorage_compression = gzip + state_store_objectstorage_path = s3://conn_id@mybucket/task-state/ + state_store_objectstorage_compression = gzip To only offload values larger than 1 MB:: [state_store] - backend = airflow.providers.common.io.store.backend.StoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend [common.io] - store_objectstorage_path = s3://conn_id@mybucket/task-state/ - store_objectstorage_threshold = 1048576 + state_store_objectstorage_path = s3://conn_id@mybucket/task-state/ + state_store_objectstorage_threshold = 1048576 Using the local filesystem (useful for development):: [state_store] - backend = airflow.providers.common.io.store.backend.StoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend [common.io] - store_objectstorage_path = file:///var/airflow/task-state/ + state_store_objectstorage_path = file:///var/airflow/task-state/ .. note:: diff --git a/providers/common/io/provider.yaml b/providers/common/io/provider.yaml index 3cabca3723786..1101b53a0258e 100644 --- a/providers/common/io/provider.yaml +++ b/providers/common/io/provider.yaml @@ -115,28 +115,28 @@ config: type: string example: "gz" default: "" - store_objectstorage_path: + state_store_objectstorage_path: description: | - Base path on object storage for the task/asset store backend, in URL format. - When set, StoreObjectStorageBackend will persist task and asset state under this + Base path on object storage for the task/asset state store backend, in URL format. 
+ When set, StateStoreObjectStorageBackend will persist task and asset state under this prefix, organised as //// for tasks and - assets// for assets. + assets// for assets. version_added: 1.8.0 type: string example: "s3://conn_id@bucket/task-state/" default: "" - store_objectstorage_threshold: + state_store_objectstorage_threshold: description: | - Threshold in bytes for offloading serialized store values to object storage. 0 means + Threshold in bytes for offloading serialized state store values to object storage. 0 means always offload to object storage. Any positive number means values will be offloaded only when their serialized size is at least that many bytes. Must be non-negative. version_added: 1.8.0 type: integer example: "1000000" default: "0" - store_objectstorage_compression: + state_store_objectstorage_compression: description: | - Compression algorithm to use when writing task/asset store values to object storage. + Compression algorithm to use when writing task/asset state store values to object storage. Supported algorithms are a.o.: snappy, zip, gzip, bz2, and lzma. If not specified, no compression will be used. The same algorithm must be available on all workers. 
version_added: 1.8.0 diff --git a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py index 3ce51f5209d2c..b9b60d69b827d 100644 --- a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py +++ b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py @@ -84,22 +84,22 @@ def get_provider_info(): "example": "gz", "default": "", }, - "store_objectstorage_path": { - "description": "Base path on object storage for the task/asset store backend, in URL format.\nWhen set, StoreObjectStorageBackend will persist task and asset state under this\nprefix, organised as //// for tasks and\nassets// for assets.\n", + "state_store_objectstorage_path": { + "description": "Base path on object storage for the task/asset state store backend, in URL format.\nWhen set, StateStoreObjectStorageBackend will persist task and asset state under this\nprefix, organised as //// for tasks and\nassets// for assets.\n", "version_added": "1.8.0", "type": "string", "example": "s3://conn_id@bucket/task-state/", "default": "", }, - "store_objectstorage_threshold": { - "description": "Threshold in bytes for offloading serialized store values to object storage. 0 means\nalways offload to object storage. Any positive number means values will be offloaded\nonly when their serialized size is at least that many bytes. Must be non-negative.\n", + "state_store_objectstorage_threshold": { + "description": "Threshold in bytes for offloading serialized state store values to object storage. 0 means\nalways offload to object storage. Any positive number means values will be offloaded\nonly when their serialized size is at least that many bytes. 
Must be non-negative.\n", "version_added": "1.8.0", "type": "integer", "example": "1000000", "default": "0", }, - "store_objectstorage_compression": { - "description": "Compression algorithm to use when writing task/asset store values to object storage.\nSupported algorithms are a.o.: snappy, zip, gzip, bz2, and lzma. If not specified,\nno compression will be used. The same algorithm must be available on all workers.\n", + "state_store_objectstorage_compression": { + "description": "Compression algorithm to use when writing task/asset state store values to object storage.\nSupported algorithms are a.o.: snappy, zip, gzip, bz2, and lzma. If not specified,\nno compression will be used. The same algorithm must be available on all workers.\n", "version_added": "1.8.0", "type": "string", "example": "gz", diff --git a/providers/common/io/src/airflow/providers/common/io/store/__init__.py b/providers/common/io/src/airflow/providers/common/io/state_store/__init__.py similarity index 100% rename from providers/common/io/src/airflow/providers/common/io/store/__init__.py rename to providers/common/io/src/airflow/providers/common/io/state_store/__init__.py diff --git a/providers/common/io/src/airflow/providers/common/io/store/backend.py b/providers/common/io/src/airflow/providers/common/io/state_store/backend.py similarity index 92% rename from providers/common/io/src/airflow/providers/common/io/store/backend.py rename to providers/common/io/src/airflow/providers/common/io/state_store/backend.py index 921a906a8402b..86dcb78823146 100644 --- a/providers/common/io/src/airflow/providers/common/io/store/backend.py +++ b/providers/common/io/src/airflow/providers/common/io/state_store/backend.py @@ -33,32 +33,27 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session -try: - from airflow.sdk._shared.state import AssetScope, BaseStoreBackend, StoreScope, TaskScope -except ImportError: - raise ImportError( - "StoreObjectStorageBackend requires Airflow >= 
3.3. Please upgrade your Airflow installation." - ) from None from airflow.sdk import ObjectStoragePath +from airflow.sdk._shared.state import AssetScope, BaseStoreBackend, StoreScope, TaskScope SECTION = "common.io" @cache def _get_base_path() -> ObjectStoragePath: - return ObjectStoragePath(conf.get_mandatory_value(SECTION, "store_objectstorage_path")) + return ObjectStoragePath(conf.get_mandatory_value(SECTION, "state_store_objectstorage_path")) @cache def _get_compression() -> str | None: - value = conf.get(SECTION, "store_objectstorage_compression", fallback=None) + value = conf.get(SECTION, "state_store_objectstorage_compression", fallback=None) return value or None @cache def _get_threshold() -> int: - return conf.getint(SECTION, "store_objectstorage_threshold", fallback=0) + return conf.getint(SECTION, "state_store_objectstorage_threshold", fallback=0) def _compression_suffix() -> str: @@ -138,8 +133,8 @@ class StoreObjectStorageBackend(BaseStoreBackend): Config keys (all under ``[common.io]``): - - ``store_objectstorage_path``: base path, e.g. ``s3://conn_id@bucket/task-state/`` - - ``store_objectstorage_compression``: optional compression, e.g. ``gzip`` + - ``state_store_objectstorage_path``: base path, e.g. ``s3://conn_id@bucket/task-state/`` + - ``state_store_objectstorage_compression``: optional compression, e.g. 
``gzip`` """ def get(self, scope: StoreScope, key: str, *, session: Session | None = None) -> str | None: diff --git a/providers/common/io/tests/unit/common/io/store/__init__.py b/providers/common/io/tests/unit/common/io/state_store/__init__.py similarity index 100% rename from providers/common/io/tests/unit/common/io/store/__init__.py rename to providers/common/io/tests/unit/common/io/state_store/__init__.py diff --git a/providers/common/io/tests/unit/common/io/store/test_backend.py b/providers/common/io/tests/unit/common/io/state_store/test_backend.py similarity index 86% rename from providers/common/io/tests/unit/common/io/store/test_backend.py rename to providers/common/io/tests/unit/common/io/state_store/test_backend.py index 23ea6a35d1738..6295897839f02 100644 --- a/providers/common/io/tests/unit/common/io/store/test_backend.py +++ b/providers/common/io/tests/unit/common/io/state_store/test_backend.py @@ -23,9 +23,9 @@ if not AIRFLOW_V_3_3_PLUS: pytest.skip("Store backend requires Airflow >= 3.3", allow_module_level=True) -from airflow.providers.common.io.store import backend -from airflow.providers.common.io.store.backend import ( - StoreObjectStorageBackend, +from airflow.providers.common.io.state_store import backend +from airflow.providers.common.io.state_store.backend import ( + StateStoreObjectStorageBackend, _asset_path, _read, _task_path, @@ -63,8 +63,8 @@ def conf_overrides(base_path): with conf_vars( { - ("common.io", "store_objectstorage_path"): base_path, - ("common.io", "store_objectstorage_compression"): "", + ("common.io", "state_store_objectstorage_path"): base_path, + ("common.io", "state_store_objectstorage_compression"): "", } ): yield base_path @@ -105,8 +105,8 @@ def test_compression_suffix_appended(self, tmp_path): with conf_vars( { - ("common.io", "store_objectstorage_path"): f"file://{store_path}", - ("common.io", "store_objectstorage_compression"): "gzip", + ("common.io", "state_store_objectstorage_path"): f"file://{store_path}", + 
("common.io", "state_store_objectstorage_compression"): "gzip", } ): backend._get_base_path.cache_clear() @@ -133,10 +133,10 @@ def test_write_creates_parent_dirs(self, conf_overrides): assert path.exists() -class TestStoreObjectStorageBackend: +class TestStateStoreObjectStorageBackend: @pytest.fixture def store(self, conf_overrides): - return StoreObjectStorageBackend() + return StateStoreObjectStorageBackend() @pytest.fixture def task_scope(self): @@ -207,36 +207,36 @@ def test_deserialize_missing_ref_returns_none(self, store, conf_overrides): def test_task_serialize_offloads_to_storage(self, task_scope, base_path): with conf_vars( { - ("common.io", "store_objectstorage_path"): base_path, - ("common.io", "store_objectstorage_threshold"): "0", + ("common.io", "state_store_objectstorage_path"): base_path, + ("common.io", "state_store_objectstorage_threshold"): "0", } ): backend._get_threshold.cache_clear() - store = StoreObjectStorageBackend() + store = StateStoreObjectStorageBackend() ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) assert ref.startswith("file://") def test_asset_serialize_offloads_to_storage(self, asset_scope, base_path): with conf_vars( { - ("common.io", "store_objectstorage_path"): base_path, - ("common.io", "store_objectstorage_threshold"): "0", + ("common.io", "state_store_objectstorage_path"): base_path, + ("common.io", "state_store_objectstorage_threshold"): "0", } ): backend._get_threshold.cache_clear() - store = StoreObjectStorageBackend() + store = StateStoreObjectStorageBackend() ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) assert ref.startswith("file://") def test_task_serialize_to_db_when_below_threshold(self, task_scope, base_path): with conf_vars( { - ("common.io", "store_objectstorage_path"): base_path, - ("common.io", "store_objectstorage_threshold"): "10000", + ("common.io", "state_store_objectstorage_path"): base_path, + ("common.io", 
"state_store_objectstorage_threshold"): "10000", } ): backend._get_threshold.cache_clear() - store = StoreObjectStorageBackend() + store = StateStoreObjectStorageBackend() ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) assert not ref.startswith("file://") assert store.deserialize_task_store_from_ref(ref) == {"x": 1} @@ -244,12 +244,12 @@ def test_task_serialize_to_db_when_below_threshold(self, task_scope, base_path): def test_asset_serialize_to_db_when_below_threshold(self, asset_scope, base_path): with conf_vars( { - ("common.io", "store_objectstorage_path"): base_path, - ("common.io", "store_objectstorage_threshold"): "10000", + ("common.io", "state_store_objectstorage_path"): base_path, + ("common.io", "state_store_objectstorage_threshold"): "10000", } ): backend._get_threshold.cache_clear() - store = StoreObjectStorageBackend() + store = StateStoreObjectStorageBackend() ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) assert not ref.startswith("file://") assert store.deserialize_asset_store_from_ref(ref) == {"x": 1} From 9107ac66f47302ced73848773e3d8f33be8cd9d5 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 26 Jun 2026 15:02:59 +0530 Subject: [PATCH 09/11] mega rename complete --- generated/provider_dependencies.json.sha256sum | 2 +- providers/common/io/docs/state_store_backend.rst | 8 ++++---- providers/common/io/provider.yaml | 2 +- .../providers/common/io/get_provider_info.py | 2 +- .../providers/common/io/state_store/backend.py | 2 +- .../unit/common/io/state_store/test_backend.py | 14 +++++++------- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index 220e82373bdb5..d01dd3dcd1507 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -1d2ec6ba7a96de0cc04b7f68834e7e1420749c96be6654e45dc1f72755823d59 
+1b826d55fe4bb9e8fa331ae2138ec76747aeca877f9e25eb02ea7e0aec00cdfa diff --git a/providers/common/io/docs/state_store_backend.rst b/providers/common/io/docs/state_store_backend.rst index 3810d03b1766c..a95bbd9728f2b 100644 --- a/providers/common/io/docs/state_store_backend.rst +++ b/providers/common/io/docs/state_store_backend.rst @@ -23,7 +23,7 @@ task and asset state in the Airflow metadata database. For larger values, you ma object storage instead. To enable object storage for task and asset state store, set ``backend`` in the ``[state_store]`` section to -``airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend``, and set +``airflow.providers.common.io.state_store.backend.StateStateStoreObjectStorageBackend``, and set ``state_store_objectstorage_path`` to the desired base location. The connection id is obtained from the user part of the URL, e.g. ``state_store_objectstorage_path = s3://conn_id@mybucket/task-state/``. @@ -40,7 +40,7 @@ Optionally set ``state_store_objectstorage_compression`` to an fsspec-supported The following example stores all task and asset state in S3, compressed with gzip:: [state_store] - backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStateStoreObjectStorageBackend [common.io] state_store_objectstorage_path = s3://conn_id@mybucket/task-state/ @@ -49,7 +49,7 @@ The following example stores all task and asset state in S3, compressed with gzi To only offload values larger than 1 MB:: [state_store] - backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStateStoreObjectStorageBackend [common.io] state_store_objectstorage_path = s3://conn_id@mybucket/task-state/ @@ -58,7 +58,7 @@ To only offload values larger than 1 MB:: Using the local filesystem (useful for development):: [state_store] - backend = 
airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStateStoreObjectStorageBackend [common.io] state_store_objectstorage_path = file:///var/airflow/task-state/ diff --git a/providers/common/io/provider.yaml b/providers/common/io/provider.yaml index 1101b53a0258e..664b163bd770d 100644 --- a/providers/common/io/provider.yaml +++ b/providers/common/io/provider.yaml @@ -118,7 +118,7 @@ config: state_store_objectstorage_path: description: | Base path on object storage for the task/asset state store backend, in URL format. - When set, StateStoreObjectStorageBackend will persist task and asset state under this + When set, StateStateStoreObjectStorageBackend will persist task and asset state under this prefix, organised as //// for tasks and assets// for assets. version_added: 1.8.0 diff --git a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py index b9b60d69b827d..e2ca11884691a 100644 --- a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py +++ b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py @@ -85,7 +85,7 @@ def get_provider_info(): "default": "", }, "state_store_objectstorage_path": { - "description": "Base path on object storage for the task/asset state store backend, in URL format.\nWhen set, StateStoreObjectStorageBackend will persist task and asset state under this\nprefix, organised as //// for tasks and\nassets// for assets.\n", + "description": "Base path on object storage for the task/asset state store backend, in URL format.\nWhen set, StateStateStoreObjectStorageBackend will persist task and asset state under this\nprefix, organised as //// for tasks and\nassets// for assets.\n", "version_added": "1.8.0", "type": "string", "example": "s3://conn_id@bucket/task-state/", diff --git 
a/providers/common/io/src/airflow/providers/common/io/state_store/backend.py b/providers/common/io/src/airflow/providers/common/io/state_store/backend.py index 86dcb78823146..8975483226a60 100644 --- a/providers/common/io/src/airflow/providers/common/io/state_store/backend.py +++ b/providers/common/io/src/airflow/providers/common/io/state_store/backend.py @@ -127,7 +127,7 @@ def _scope_path(scope: StoreScope, key: str) -> ObjectStoragePath: raise TypeError(f"Unknown scope type: {type(scope)}") -class StoreObjectStorageBackend(BaseStoreBackend): +class StateStoreObjectStorageBackend(BaseStoreBackend): """ Object-storage backend for task and asset store. diff --git a/providers/common/io/tests/unit/common/io/state_store/test_backend.py b/providers/common/io/tests/unit/common/io/state_store/test_backend.py index 6295897839f02..1c2cc30c4dd1c 100644 --- a/providers/common/io/tests/unit/common/io/state_store/test_backend.py +++ b/providers/common/io/tests/unit/common/io/state_store/test_backend.py @@ -25,7 +25,7 @@ from airflow.providers.common.io.state_store import backend from airflow.providers.common.io.state_store.backend import ( - StateStoreObjectStorageBackend, + StateStateStoreObjectStorageBackend, _asset_path, _read, _task_path, @@ -133,10 +133,10 @@ def test_write_creates_parent_dirs(self, conf_overrides): assert path.exists() -class TestStateStoreObjectStorageBackend: +class TestStateStateStoreObjectStorageBackend: @pytest.fixture def store(self, conf_overrides): - return StateStoreObjectStorageBackend() + return StateStateStoreObjectStorageBackend() @pytest.fixture def task_scope(self): @@ -212,7 +212,7 @@ def test_task_serialize_offloads_to_storage(self, task_scope, base_path): } ): backend._get_threshold.cache_clear() - store = StateStoreObjectStorageBackend() + store = StateStateStoreObjectStorageBackend() ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) assert ref.startswith("file://") @@ -224,7 +224,7 @@ def 
test_asset_serialize_offloads_to_storage(self, asset_scope, base_path): } ): backend._get_threshold.cache_clear() - store = StateStoreObjectStorageBackend() + store = StateStateStoreObjectStorageBackend() ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) assert ref.startswith("file://") @@ -236,7 +236,7 @@ def test_task_serialize_to_db_when_below_threshold(self, task_scope, base_path): } ): backend._get_threshold.cache_clear() - store = StateStoreObjectStorageBackend() + store = StateStateStoreObjectStorageBackend() ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) assert not ref.startswith("file://") assert store.deserialize_task_store_from_ref(ref) == {"x": 1} @@ -249,7 +249,7 @@ def test_asset_serialize_to_db_when_below_threshold(self, asset_scope, base_path } ): backend._get_threshold.cache_clear() - store = StateStoreObjectStorageBackend() + store = StateStateStoreObjectStorageBackend() ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) assert not ref.startswith("file://") assert store.deserialize_asset_store_from_ref(ref) == {"x": 1} From 5a01c9d4e00cb5aba4e7084d33169e9f65776b64 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 26 Jun 2026 15:49:39 +0530 Subject: [PATCH 10/11] fixing CI --- generated/provider_dependencies.json.sha256sum | 2 +- providers/common/io/provider.yaml | 2 +- .../providers/common/io/get_provider_info.py | 2 +- .../unit/common/io/state_store/test_backend.py | 14 +++++++------- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index d01dd3dcd1507..220e82373bdb5 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -1b826d55fe4bb9e8fa331ae2138ec76747aeca877f9e25eb02ea7e0aec00cdfa +1d2ec6ba7a96de0cc04b7f68834e7e1420749c96be6654e45dc1f72755823d59 diff --git 
a/providers/common/io/provider.yaml b/providers/common/io/provider.yaml index 664b163bd770d..1101b53a0258e 100644 --- a/providers/common/io/provider.yaml +++ b/providers/common/io/provider.yaml @@ -118,7 +118,7 @@ config: state_store_objectstorage_path: description: | Base path on object storage for the task/asset state store backend, in URL format. - When set, StateStateStoreObjectStorageBackend will persist task and asset state under this + When set, StateStoreObjectStorageBackend will persist task and asset state under this prefix, organised as //// for tasks and assets// for assets. version_added: 1.8.0 diff --git a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py index e2ca11884691a..b9b60d69b827d 100644 --- a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py +++ b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py @@ -85,7 +85,7 @@ def get_provider_info(): "default": "", }, "state_store_objectstorage_path": { - "description": "Base path on object storage for the task/asset state store backend, in URL format.\nWhen set, StateStateStoreObjectStorageBackend will persist task and asset state under this\nprefix, organised as //// for tasks and\nassets// for assets.\n", + "description": "Base path on object storage for the task/asset state store backend, in URL format.\nWhen set, StateStoreObjectStorageBackend will persist task and asset state under this\nprefix, organised as //// for tasks and\nassets// for assets.\n", "version_added": "1.8.0", "type": "string", "example": "s3://conn_id@bucket/task-state/", diff --git a/providers/common/io/tests/unit/common/io/state_store/test_backend.py b/providers/common/io/tests/unit/common/io/state_store/test_backend.py index 1c2cc30c4dd1c..6295897839f02 100644 --- a/providers/common/io/tests/unit/common/io/state_store/test_backend.py +++ 
b/providers/common/io/tests/unit/common/io/state_store/test_backend.py @@ -25,7 +25,7 @@ from airflow.providers.common.io.state_store import backend from airflow.providers.common.io.state_store.backend import ( - StateStateStoreObjectStorageBackend, + StateStoreObjectStorageBackend, _asset_path, _read, _task_path, @@ -133,10 +133,10 @@ def test_write_creates_parent_dirs(self, conf_overrides): assert path.exists() -class TestStateStateStoreObjectStorageBackend: +class TestStateStoreObjectStorageBackend: @pytest.fixture def store(self, conf_overrides): - return StateStateStoreObjectStorageBackend() + return StateStoreObjectStorageBackend() @pytest.fixture def task_scope(self): @@ -212,7 +212,7 @@ def test_task_serialize_offloads_to_storage(self, task_scope, base_path): } ): backend._get_threshold.cache_clear() - store = StateStateStoreObjectStorageBackend() + store = StateStoreObjectStorageBackend() ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) assert ref.startswith("file://") @@ -224,7 +224,7 @@ def test_asset_serialize_offloads_to_storage(self, asset_scope, base_path): } ): backend._get_threshold.cache_clear() - store = StateStateStoreObjectStorageBackend() + store = StateStoreObjectStorageBackend() ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) assert ref.startswith("file://") @@ -236,7 +236,7 @@ def test_task_serialize_to_db_when_below_threshold(self, task_scope, base_path): } ): backend._get_threshold.cache_clear() - store = StateStateStoreObjectStorageBackend() + store = StateStoreObjectStorageBackend() ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) assert not ref.startswith("file://") assert store.deserialize_task_store_from_ref(ref) == {"x": 1} @@ -249,7 +249,7 @@ def test_asset_serialize_to_db_when_below_threshold(self, asset_scope, base_path } ): backend._get_threshold.cache_clear() - store = StateStateStoreObjectStorageBackend() + store = 
StateStoreObjectStorageBackend() ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) assert not ref.startswith("file://") assert store.deserialize_asset_store_from_ref(ref) == {"x": 1} From 1159c9890c78f49f2ce1015810932a442553b30d Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 29 Jun 2026 17:03:53 +0530 Subject: [PATCH 11/11] comments from ian and kaxil --- .../provider_dependencies.json.sha256sum | 2 +- .../common/io/docs/state_store_backend.rst | 11 +++++----- providers/common/io/provider.yaml | 4 ++-- .../providers/common/io/get_provider_info.py | 4 ++-- .../common/io/state_store/backend.py | 8 +++---- .../common/io/state_store/test_backend.py | 22 +++++++++---------- 6 files changed, 25 insertions(+), 26 deletions(-) diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index 220e82373bdb5..527c7ea73621a 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -1d2ec6ba7a96de0cc04b7f68834e7e1420749c96be6654e45dc1f72755823d59 +636713a4ca3c9ec99e28346cb42db798399d8e9e9dbbf2d981d600d9196739f5 diff --git a/providers/common/io/docs/state_store_backend.rst b/providers/common/io/docs/state_store_backend.rst index a95bbd9728f2b..2c8a59a7a027f 100644 --- a/providers/common/io/docs/state_store_backend.rst +++ b/providers/common/io/docs/state_store_backend.rst @@ -19,11 +19,10 @@ Object Storage State Store Backend =================================== The default state store backend is :class:`~airflow.state.metastore.MetastoreStateBackend`, which persists -task and asset state in the Airflow metadata database. For larger values, you may want to store state on -object storage instead. +task and asset state in the Airflow metadata database via the API Server's Execution API. For larger values, you may want to store state on object storage directly from the task instead. 
To enable object storage for task and asset state store, set ``backend`` in the ``[state_store]`` section to -``airflow.providers.common.io.state_store.backend.StateStateStoreObjectStorageBackend``, and set +``airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend``, and set ``state_store_objectstorage_path`` to the desired base location. The connection id is obtained from the user part of the URL, e.g. ``state_store_objectstorage_path = s3://conn_id@mybucket/task-state/``. @@ -40,7 +39,7 @@ Optionally set ``state_store_objectstorage_compression`` to an fsspec-supported The following example stores all task and asset state in S3, compressed with gzip:: [state_store] - backend = airflow.providers.common.io.state_store.backend.StateStateStoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend [common.io] state_store_objectstorage_path = s3://conn_id@mybucket/task-state/ @@ -49,7 +48,7 @@ The following example stores all task and asset state in S3, compressed with gzi To only offload values larger than 1 MB:: [state_store] - backend = airflow.providers.common.io.state_store.backend.StateStateStoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend [common.io] state_store_objectstorage_path = s3://conn_id@mybucket/task-state/ @@ -58,7 +57,7 @@ To only offload values larger than 1 MB:: Using the local filesystem (useful for development):: [state_store] - backend = airflow.providers.common.io.state_store.backend.StateStateStoreObjectStorageBackend + backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend [common.io] state_store_objectstorage_path = file:///var/airflow/task-state/ diff --git a/providers/common/io/provider.yaml b/providers/common/io/provider.yaml index 1101b53a0258e..8831783d665e0 100644 --- a/providers/common/io/provider.yaml +++ b/providers/common/io/provider.yaml @@ -137,9 +137,9 
@@ config: state_store_objectstorage_compression: description: | Compression algorithm to use when writing task/asset state store values to object storage. - Supported algorithms are a.o.: snappy, zip, gzip, bz2, and lzma. If not specified, + Supported algorithms are a.o.: gzip, bz2, lzma, and xz. If not specified, no compression will be used. The same algorithm must be available on all workers. version_added: 1.8.0 type: string - example: "gz" + example: "gzip" default: "" diff --git a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py index b9b60d69b827d..f63cab0b8c1c2 100644 --- a/providers/common/io/src/airflow/providers/common/io/get_provider_info.py +++ b/providers/common/io/src/airflow/providers/common/io/get_provider_info.py @@ -99,10 +99,10 @@ def get_provider_info(): "default": "0", }, "state_store_objectstorage_compression": { - "description": "Compression algorithm to use when writing task/asset state store values to object storage.\nSupported algorithms are a.o.: snappy, zip, gzip, bz2, and lzma. If not specified,\nno compression will be used. The same algorithm must be available on all workers.\n", + "description": "Compression algorithm to use when writing task/asset state store values to object storage.\nSupported algorithms are a.o.: gzip, bz2, lzma, and xz. If not specified,\nno compression will be used. 
The same algorithm must be available on all workers.\n", "version_added": "1.8.0", "type": "string", - "example": "gz", + "example": "gzip", "default": "", }, }, diff --git a/providers/common/io/src/airflow/providers/common/io/state_store/backend.py b/providers/common/io/src/airflow/providers/common/io/state_store/backend.py index 8975483226a60..63e1804cf00f1 100644 --- a/providers/common/io/src/airflow/providers/common/io/state_store/backend.py +++ b/providers/common/io/src/airflow/providers/common/io/state_store/backend.py @@ -208,7 +208,7 @@ async def aclear( ) -> None: raise NotImplementedError - def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, scope: TaskScope) -> str: + def serialize_task_state_store_to_ref(self, *, value: JsonValue, key: str, scope: TaskScope) -> str: serialized = json.dumps(value) if len(serialized.encode()) < _get_threshold(): return serialized @@ -216,7 +216,7 @@ def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, scope: Task _write(path, serialized) return str(path) - def deserialize_task_store_from_ref(self, stored: str) -> JsonValue: + def deserialize_task_state_store_from_ref(self, stored: str) -> JsonValue: if not stored: return None if _is_storage_ref(stored): @@ -226,7 +226,7 @@ def deserialize_task_store_from_ref(self, stored: str) -> JsonValue: return None return json.loads(stored) - def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, scope: AssetScope) -> str: + def serialize_asset_state_store_to_ref(self, *, value: JsonValue, key: str, scope: AssetScope) -> str: serialized = json.dumps(value) if len(serialized.encode()) < _get_threshold(): return serialized @@ -234,7 +234,7 @@ def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, scope: Ass _write(path, serialized) return str(path) - def deserialize_asset_store_from_ref(self, stored: str) -> JsonValue: + def deserialize_asset_state_store_from_ref(self, stored: str) -> JsonValue: if not stored: return None if 
_is_storage_ref(stored): diff --git a/providers/common/io/tests/unit/common/io/state_store/test_backend.py b/providers/common/io/tests/unit/common/io/state_store/test_backend.py index 6295897839f02..79c21dd6b8753 100644 --- a/providers/common/io/tests/unit/common/io/state_store/test_backend.py +++ b/providers/common/io/tests/unit/common/io/state_store/test_backend.py @@ -189,19 +189,19 @@ def test_clear_asset(self, store, asset_scope): assert store.get(asset_scope, "k2") is None def test_serialize_and_deserialize_task(self, store, task_scope): - ref = store.serialize_task_store_to_ref(value={"x": 1}, key="job_id", scope=task_scope) + ref = store.serialize_task_state_store_to_ref(value={"x": 1}, key="job_id", scope=task_scope) assert ref.startswith("file://") - result = store.deserialize_task_store_from_ref(ref) + result = store.deserialize_task_state_store_from_ref(ref) assert result == {"x": 1} def test_serialize_and_deserialize_asset(self, store, asset_scope): - ref = store.serialize_asset_store_to_ref(value=[1, 2, 3], key="result", scope=asset_scope) + ref = store.serialize_asset_state_store_to_ref(value=[1, 2, 3], key="result", scope=asset_scope) assert ref.startswith("file://") - result = store.deserialize_asset_store_from_ref(ref) + result = store.deserialize_asset_state_store_from_ref(ref) assert result == [1, 2, 3] def test_deserialize_missing_ref_returns_none(self, store, conf_overrides): - result = store.deserialize_task_store_from_ref(f"{conf_overrides}/no/such/path") + result = store.deserialize_task_state_store_from_ref(f"{conf_overrides}/no/such/path") assert result is None def test_task_serialize_offloads_to_storage(self, task_scope, base_path): @@ -213,7 +213,7 @@ def test_task_serialize_offloads_to_storage(self, task_scope, base_path): ): backend._get_threshold.cache_clear() store = StateStoreObjectStorageBackend() - ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) + ref = 
store.serialize_task_state_store_to_ref(value={"x": 1}, key="k", scope=task_scope) assert ref.startswith("file://") def test_asset_serialize_offloads_to_storage(self, asset_scope, base_path): @@ -225,7 +225,7 @@ def test_asset_serialize_offloads_to_storage(self, asset_scope, base_path): ): backend._get_threshold.cache_clear() store = StateStoreObjectStorageBackend() - ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) + ref = store.serialize_asset_state_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) assert ref.startswith("file://") def test_task_serialize_to_db_when_below_threshold(self, task_scope, base_path): @@ -237,9 +237,9 @@ def test_task_serialize_to_db_when_below_threshold(self, task_scope, base_path): ): backend._get_threshold.cache_clear() store = StateStoreObjectStorageBackend() - ref = store.serialize_task_store_to_ref(value={"x": 1}, key="k", scope=task_scope) + ref = store.serialize_task_state_store_to_ref(value={"x": 1}, key="k", scope=task_scope) assert not ref.startswith("file://") - assert store.deserialize_task_store_from_ref(ref) == {"x": 1} + assert store.deserialize_task_state_store_from_ref(ref) == {"x": 1} def test_asset_serialize_to_db_when_below_threshold(self, asset_scope, base_path): with conf_vars( @@ -250,6 +250,6 @@ def test_asset_serialize_to_db_when_below_threshold(self, asset_scope, base_path ): backend._get_threshold.cache_clear() store = StateStoreObjectStorageBackend() - ref = store.serialize_asset_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) + ref = store.serialize_asset_state_store_to_ref(value={"x": 1}, key="k", scope=asset_scope) assert not ref.startswith("file://") - assert store.deserialize_asset_store_from_ref(ref) == {"x": 1} + assert store.deserialize_asset_state_store_from_ref(ref) == {"x": 1}