1 change: 1 addition & 0 deletions generated/provider_dependencies.json.sha256sum
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
636713a4ca3c9ec99e28346cb42db798399d8e9e9dbbf2d981d600d9196739f5
1 change: 1 addition & 0 deletions providers/common/io/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
Transferring a file <transfer>
Operators <operators>
Object Storage XCom Backend <xcom_backend>
Object Storage State Store Backend <state_store_backend>

.. toctree::
:hidden:
Expand Down
80 changes: 80 additions & 0 deletions providers/common/io/docs/state_store_backend.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Object Storage State Store Backend
===================================

The default state store backend is :class:`~airflow.state.metastore.MetastoreStateBackend`, which persists
task and asset state in the Airflow metadata database via the API Server's Execution API. For larger values, you may want to store state on object storage directly from the task instead.

To enable object storage for the task and asset state store, set ``backend`` in the ``[state_store]`` section to
``airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend``, and set
``state_store_objectstorage_path`` in the ``[common.io]`` section to the desired base location. The connection ID is
taken from the user part of the URL, e.g. ``state_store_objectstorage_path = s3://conn_id@mybucket/task-state/``.
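
The anatomy of such a URL can be illustrated with the standard library. This is only a sketch of which part
of the URL plays which role; ``describe_base_path`` is a hypothetical helper and not necessarily how the
backend itself parses the setting.

```python
from urllib.parse import urlsplit


def describe_base_path(url: str) -> dict:
    """Split a base-path URL into its parts (hypothetical helper, illustration only)."""
    parts = urlsplit(url)
    return {
        "protocol": parts.scheme,    # e.g. "s3" or "file"
        "conn_id": parts.username,   # user part of the URL, None if absent
        "bucket": parts.hostname,    # host part of the URL, None if absent
        "prefix": parts.path,        # base location under which state is written
    }


info = describe_base_path("s3://conn_id@mybucket/task-state/")
print(info)
```

A ``file://`` URL has no user part, so ``conn_id`` is ``None`` in that case.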

Task state is stored under ``<dag_id>/<run_id>/<task_id>/<map_index>/<key>`` and asset state under
``assets/<asset_identifier>/<key>`` beneath the configured base path.
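
As a rough illustration of the layout described above, the sketch below assembles object locations from
their components. ``task_state_key`` and ``asset_state_key`` are hypothetical helpers written for this
example, and the identifier values are made up; the backend's actual path construction may differ in detail.

```python
def task_state_key(base: str, dag_id: str, run_id: str, task_id: str, map_index: int, key: str) -> str:
    """Build <base>/<dag_id>/<run_id>/<task_id>/<map_index>/<key> (hypothetical helper)."""
    return "/".join([base.rstrip("/"), dag_id, run_id, task_id, str(map_index), key])


def asset_state_key(base: str, asset_identifier: str, key: str) -> str:
    """Build <base>/assets/<asset_identifier>/<key> (hypothetical helper)."""
    return "/".join([base.rstrip("/"), "assets", asset_identifier, key])


base = "s3://conn_id@mybucket/task-state/"
print(task_state_key(base, "my_dag", "run_1", "my_task", 0, "job_id"))
print(asset_state_key(base, "my_asset", "cursor"))
```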

By default (``state_store_objectstorage_threshold = 0``) all serialized values are offloaded to object storage.
Set ``state_store_objectstorage_threshold`` to a positive number of bytes to only offload values whose
serialized size meets or exceeds the threshold; anything smaller is stored in the Airflow metadata database.
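
The threshold rule can be sketched as a single comparison. ``should_offload`` is a hypothetical function
that mirrors the documented behaviour, not the backend's actual code.

```python
def should_offload(serialized: bytes, threshold: int = 0) -> bool:
    """Return True when a serialized value would go to object storage (hypothetical helper)."""
    if threshold < 0:
        raise ValueError("threshold must be non-negative")
    # With threshold 0 every value qualifies, because len(...) >= 0 always holds.
    return len(serialized) >= threshold


print(should_offload(b"small value"))                # threshold 0: always offloaded
print(should_offload(b"small value", 1048576))       # below the threshold: stays in the metadata database
print(should_offload(b"x" * 1048576, 1048576))       # exactly at the threshold: offloaded
```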

Optionally set ``state_store_objectstorage_compression`` to an fsspec-supported compression algorithm such as
``gzip`` or ``snappy`` to compress values before writing.

The following example stores all task and asset state in S3, compressed with gzip::

    [state_store]
    backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend

    [common.io]
    state_store_objectstorage_path = s3://conn_id@mybucket/task-state/
    state_store_objectstorage_compression = gzip

To offload only values whose serialized size is at least 1 MiB (1048576 bytes)::

    [state_store]
    backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend

    [common.io]
    state_store_objectstorage_path = s3://conn_id@mybucket/task-state/
    state_store_objectstorage_threshold = 1048576

Using the local filesystem (useful for development)::

    [state_store]
    backend = airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend

    [common.io]
    state_store_objectstorage_path = file:///var/airflow/task-state/

.. note::

    Compression requires the relevant library to be installed in your Python environment.
    For example, ``snappy`` requires ``python-snappy``. Gzip and bz2 work out of the box.
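
One way to check a worker environment before enabling compression is to test whether the module behind an
algorithm can be imported. The mapping and the ``compression_available`` helper below are assumptions made
for this sketch (``python-snappy`` installs a module named ``snappy``); they are not part of the provider.

```python
import importlib.util

# Hypothetical mapping from algorithm name to the Python module that provides it.
REQUIRED_MODULE = {"gzip": "gzip", "bz2": "bz2", "snappy": "snappy"}


def compression_available(name: str) -> bool:
    """Return True if the module backing the algorithm can be found (hypothetical helper)."""
    module = REQUIRED_MODULE.get(name)
    return module is not None and importlib.util.find_spec(module) is not None


for algorithm in REQUIRED_MODULE:
    print(algorithm, compression_available(algorithm))
```

Running this on every worker is a cheap way to confirm that the same algorithm is usable everywhere.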

.. note::

    ``expires_at`` is not enforced by this backend. Values written to object storage persist
    indefinitely until explicitly deleted. Use your object storage provider's lifecycle policies
    (e.g. S3 lifecycle rules, GCS object lifecycle) to automatically expire old state.
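
As a sketch of such a policy, the snippet below builds an S3 lifecycle configuration that expires objects
under a prefix after a number of days. The dictionary follows the shape S3 accepts for lifecycle
configurations (for example as the ``LifecycleConfiguration`` argument of boto3's
``put_bucket_lifecycle_configuration``). The ``expiry_rule`` helper, the ``task-state/`` prefix, and the
30-day retention are assumptions for illustration only.

```python
import json


def expiry_rule(prefix: str, days: int) -> dict:
    """Build an S3 lifecycle configuration expiring objects under a prefix (hypothetical helper)."""
    return {
        "Rules": [
            {
                "ID": f"expire-{prefix.strip('/')}",
                "Filter": {"Prefix": prefix},       # only objects under this prefix are affected
                "Status": "Enabled",
                "Expiration": {"Days": days},       # delete objects this many days after creation
            }
        ]
    }


config = expiry_rule("task-state/", 30)
print(json.dumps(config, indent=2))
```

Choose a retention period longer than the longest interval over which state still needs to be read, since
an age-based rule removes objects regardless of whether they are still in use.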

.. note::

    Task state paths are keyed on ``(dag_id, run_id, task_id, map_index)`` and are stable across
    task retries. This makes this backend suitable for operators that use
    :class:`~airflow.sdk.ResumableJobMixin` to reconnect to external jobs after a retry.
28 changes: 28 additions & 0 deletions providers/common/io/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,31 @@ config:
type: string
example: "gz"
default: ""
state_store_objectstorage_path:
description: |
Base path on object storage for the task/asset state store backend, in URL format.
When set, StateStoreObjectStorageBackend will persist task and asset state under this
prefix, organised as <dag_id>/<run_id>/<task_id>/<map_index>/<key> for tasks and
assets/<asset_identifier>/<key> for assets.
version_added: 1.8.0
type: string
example: "s3://conn_id@bucket/task-state/"
default: ""
state_store_objectstorage_threshold:
description: |
Threshold in bytes for offloading serialized state store values to object storage. 0 means
always offload to object storage. Any positive number means values will be offloaded
only when their serialized size is at least that many bytes. Must be non-negative.
version_added: 1.8.0
type: integer
example: "1000000"
default: "0"
state_store_objectstorage_compression:
description: |
Compression algorithm to use when writing task/asset state store values to object storage.
Supported algorithms include gzip, bz2, lzma, and xz. If not specified,
no compression will be used. The same algorithm must be available on all workers.
version_added: 1.8.0
type: string
example: "gzip"
default: ""
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,27 @@ def get_provider_info():
"example": "gz",
"default": "",
},
"state_store_objectstorage_path": {
"description": "Base path on object storage for the task/asset state store backend, in URL format.\nWhen set, StateStoreObjectStorageBackend will persist task and asset state under this\nprefix, organised as <dag_id>/<run_id>/<task_id>/<map_index>/<key> for tasks and\nassets/<asset_identifier>/<key> for assets.\n",
"version_added": "1.8.0",
"type": "string",
"example": "s3://conn_id@bucket/task-state/",
"default": "",
},
"state_store_objectstorage_threshold": {
"description": "Threshold in bytes for offloading serialized state store values to object storage. 0 means\nalways offload to object storage. Any positive number means values will be offloaded\nonly when their serialized size is at least that many bytes. Must be non-negative.\n",
"version_added": "1.8.0",
"type": "integer",
"example": "1000000",
"default": "0",
},
"state_store_objectstorage_compression": {
"description": "Compression algorithm to use when writing task/asset state store values to object storage.\nSupported algorithms include gzip, bz2, lzma, and xz. If not specified,\nno compression will be used. The same algorithm must be available on all workers.\n",
"version_added": "1.8.0",
"type": "string",
"example": "gzip",
"default": "",
},
},
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.