From afd24cff40a962b0a1c55f2d46e62b98c8d3b719 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sun, 8 Feb 2026 07:36:35 -0300 Subject: [PATCH 1/8] Fix ImportError when importing SUPERVISOR_COMMS outside task context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SUPERVISOR_COMMS was declared as a bare type annotation without assignment, which does not create an actual module attribute in Python. This caused ImportError when Variable.get() was called at the top level of a DAG file (outside task execution context), such as during dag.test(). Initialize SUPERVISOR_COMMS to None so the import always succeeds, add None guards before .send() calls in _set_variable, _delete_variable, and ExecutionAPISecretsBackend methods, and provide helpful error messages suggesting alternatives like environment variables or Jinja templates. Closes: #51816 Signed-off-by: André Ahlert --- .../src/airflow/sdk/execution_time/context.py | 51 +++++++++++++++++-- .../execution_time/secrets/execution_api.py | 12 +++++ .../airflow/sdk/execution_time/task_runner.py | 2 +- .../task_sdk/definitions/test_variables.py | 46 +++++++++++++++++ 4 files changed, 107 insertions(+), 4 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index 6f72667dd90b1..a7235befae117 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -289,8 +289,25 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: ) # If no backend found the variable, raise a not found error (mirrors _get_connection) - from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType from airflow.sdk.execution_time.comms import ErrorResponse + from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + + if SUPERVISOR_COMMS is None: + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.VARIABLE_NOT_FOUND, + detail={ + "message": ( + f"Variable '{key}' not found. Note: SUPERVISOR_COMMS is not available, " + "which means this code is running outside a task execution context " + "(e.g., at the top level of a DAG file). " + "Consider using environment variables (AIRFLOW_VAR_), " + "Jinja templates ({{ var.value. }}), " + "or move the Variable.get() call inside a task function." + ) + }, + ) + ) raise AirflowRuntimeError( ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"}) @@ -335,7 +352,7 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ import json from airflow.sdk.execution_time.cache import SecretCache - from airflow.sdk.execution_time.comms import PutVariable + from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS @@ -369,6 +386,20 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ except Exception as e: log.exception(e) + if SUPERVISOR_COMMS is None: + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.set() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.set() at the top level of a DAG file " + "or outside of a running task. Variable.set() can only be used inside a task." + ) + }, + ) + ) + SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) # Invalidate cache after setting the variable @@ -382,9 +413,23 @@ def _delete_variable(key: str) -> None: # will make that module depend on Task SDK, which is not ideal because we intend to # keep Task SDK as a separate package than execution time mods. from airflow.sdk.execution_time.cache import SecretCache - from airflow.sdk.execution_time.comms import DeleteVariable + from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.delete() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.delete() at the top level of a DAG file " + "or outside of a running task. Variable.delete() can only be used inside a task." + ) + }, + ) + ) + msg = SUPERVISOR_COMMS.send(DeleteVariable(key=key)) if TYPE_CHECKING: assert isinstance(msg, OKResponse) diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py index 57bffd12a1620..9ee0cc0535e88 100644 --- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py +++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py @@ -81,6 +81,9 @@ def get_connection(self, conn_id: str, team_name: str | None = None) -> Connecti from airflow.sdk.execution_time.context import _process_connection_result_conn from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + return None + try: msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id)) @@ -115,6 +118,9 @@ def get_variable(self, key: str, team_name: str | None = None) -> str | None: from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + return None + try: msg = SUPERVISOR_COMMS.send(GetVariable(key=key)) @@ -147,6 +153,9 @@ async def aget_connection(self, conn_id: str) -> Connection | None: # type: ign from airflow.sdk.execution_time.context import _process_connection_result_conn from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + return None + try: msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id)) @@ -176,6 +185,9 @@ async def aget_variable(self, key: str) -> str | None: from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + return None + try: msg = await SUPERVISOR_COMMS.asend(GetVariable(key=key)) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 61e476e60b930..456c5eb330b35 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -874,7 +874,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: # deeply nested execution stack. # - By defining `SUPERVISOR_COMMS` as a global, it ensures that this communication mechanism is readily # accessible wherever needed during task execution without modifying every layer of the call stack. -SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] +SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] | None = None # State machine! diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index 6e94ccf503f8c..b5e5a2feb93a7 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -25,6 +25,7 @@ from airflow.sdk import Variable from airflow.sdk.configuration import initialize_secrets_backends +from airflow.sdk.exceptions import AirflowRuntimeError from airflow.sdk.execution_time.comms import GetVariableKeys, PutVariable, VariableKeysResult, VariableResult from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS @@ -289,3 +290,48 @@ def test_backend_fallback_to_env_var(self, mock_get_variable, mock_env_get, mock # mock_env is only called when LocalFilesystemBackend doesn't have it mock_env_get.assert_called() assert var == "fake_value" + + +class TestVariableOutsideTaskContext: + """Tests for Variable operations when SUPERVISOR_COMMS is None (outside task execution context).""" + + @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable") + def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, monkeypatch): + """Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is None.""" + from airflow.sdk.execution_time import task_runner + + monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + mock_env_get.return_value = "env_value" + + result = Variable.get(key="my_env_var") + assert result == "env_value" + mock_env_get.assert_called_once_with(key="my_env_var") + + def test_get_not_found_without_supervisor_comms(self, monkeypatch): + """Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is None.""" + from airflow.sdk.execution_time import task_runner + + monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + + with pytest.raises(AirflowRuntimeError, match="outside a task execution context"): + Variable.get(key="nonexistent_var") + + def test_set_without_supervisor_comms(self, monkeypatch): + """Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is None.""" + from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.context import _set_variable + + monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + + with pytest.raises(AirflowRuntimeError, match="Variable.set\\(\\) requires a task execution context"): + _set_variable(key="my_key", value="my_value") + + def test_delete_without_supervisor_comms(self, monkeypatch): + """Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is None.""" + from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.context import _delete_variable + + monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + + with pytest.raises(AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context"): + _delete_variable(key="my_key") From 3ee113826854cf771e3c251a1778861c9d56fb7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Tue, 3 Mar 2026 16:22:30 -0300 Subject: [PATCH 2/8] Fix CI failures: MyPy, hasattr routing, reinit_supervisor_comms, and ruff format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use type: ignore[assignment] instead of | None for SUPERVISOR_COMMS to avoid 39 mypy union-attr errors. Replace hasattr with getattr is not None in variable.py and connection.py. Fix reinit_supervisor_comms to check is None. Simplify set_supervisor_comms context manager. Update test assertions. Signed-off-by: André Ahlert --- airflow-core/src/airflow/models/connection.py | 10 +++++++-- airflow-core/src/airflow/models/variable.py | 20 ++++++++++++++---- .../airflow/sdk/execution_time/supervisor.py | 21 ++++++------------- .../airflow/sdk/execution_time/task_runner.py | 2 +- .../task_sdk/definitions/test_variables.py | 4 +++- .../execution_time/test_supervisor.py | 18 +++++++--------- 6 files changed, 41 insertions(+), 34 deletions(-) diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py index 55a61de022ec1..b0fbdb93773ee 100644 --- a/airflow-core/src/airflow/models/connection.py +++ b/airflow-core/src/airflow/models/connection.py @@ -509,7 +509,10 @@ def get_connection_from_secrets(cls, conn_id: str, team_name: str | None = None) # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): from airflow.sdk import Connection as TaskSDKConnection from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType @@ -598,7 +601,10 @@ def to_dict(self, *, prune_empty: bool = False, validate: bool = True) -> dict[s @classmethod def from_json(cls, value, conn_id=None) -> Connection: - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): from airflow.sdk import Connection as TaskSDKConnection warnings.warn( diff --git a/airflow-core/src/airflow/models/variable.py b/airflow-core/src/airflow/models/variable.py index 0d543f334be42..e4bcc0ca378eb 100644 --- a/airflow-core/src/airflow/models/variable.py +++ b/airflow-core/src/airflow/models/variable.py @@ -182,7 +182,10 @@ def get( # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): warnings.warn( "Using Variable.get from `airflow.models` is deprecated." "Please use `get` on Variable from sdk(`airflow.sdk.Variable`) instead", @@ -242,7 +245,10 @@ def set( # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): warnings.warn( "Using Variable.set from `airflow.models` is deprecated." "Please use `set` on Variable from sdk(`airflow.sdk.Variable`) instead", @@ -330,7 +336,10 @@ def update( # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): warnings.warn( "Using Variable.update from `airflow.models` is deprecated." "Please use `set` on Variable from sdk(`airflow.sdk.Variable`) instead as it is an upsert.", @@ -396,7 +405,10 @@ def delete(key: str, team_name: str | None = None, session: Session | None = Non # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): warnings.warn( "Using Variable.delete from `airflow.models` is deprecated." "Please use `delete` on Variable from sdk(`airflow.sdk.Variable`) instead", diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 7c9ecbeab9e6f..661811e521f1d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1998,28 +1998,19 @@ def set_supervisor_comms(temp_comms): by injecting a test Comms implementation (e.g. `InProcessSupervisorComms`) in place of the real inter-process communication layer. - Some parts of the code (e.g. models.Variable.get) check for the presence - of `task_runner.SUPERVISOR_COMMS` to determine if the code is running in a Task SDK execution context. - This override ensures those code paths behave correctly during in-process tests. + Some parts of the code (e.g. models.Variable.get) check that + `task_runner.SUPERVISOR_COMMS` is not None to determine if the code is running in a Task SDK + execution context. This override ensures those code paths behave correctly during in-process tests. """ from airflow.sdk.execution_time import task_runner - sentinel = object() - old = getattr(task_runner, "SUPERVISOR_COMMS", sentinel) - - if temp_comms is not None: - task_runner.SUPERVISOR_COMMS = temp_comms - elif old is not sentinel: - delattr(task_runner, "SUPERVISOR_COMMS") + old = task_runner.SUPERVISOR_COMMS + task_runner.SUPERVISOR_COMMS = temp_comms try: yield finally: - if old is sentinel: - if hasattr(task_runner, "SUPERVISOR_COMMS"): - delattr(task_runner, "SUPERVISOR_COMMS") - else: - task_runner.SUPERVISOR_COMMS = old + task_runner.SUPERVISOR_COMMS = old def run_task_in_process(ti: TaskInstance, task) -> TaskRunResult: diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 456c5eb330b35..9f44fb26186c5 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -874,7 +874,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: # deeply nested execution stack. # - By defining `SUPERVISOR_COMMS` as a global, it ensures that this communication mechanism is readily # accessible wherever needed during task execution without modifying every layer of the call stack. -SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] | None = None +SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] = None # type: ignore[assignment] # State machine! diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index b5e5a2feb93a7..79d2be0b3ab83 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -333,5 +333,7 @@ def test_delete_without_supervisor_comms(self, monkeypatch): monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) - with pytest.raises(AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context"): + with pytest.raises( + AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context" + ): _delete_variable(key="my_key") diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 0b3cd64a21eb7..1c21f675d3dc3 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -3086,12 +3086,9 @@ class DummyComms: @pytest.fixture(autouse=True) def cleanup_supervisor_comms(self): - # Ensure clean state before/after test - if hasattr(task_runner, "SUPERVISOR_COMMS"): - delattr(task_runner, "SUPERVISOR_COMMS") + task_runner.SUPERVISOR_COMMS = None # type: ignore[assignment] yield - if hasattr(task_runner, "SUPERVISOR_COMMS"): - delattr(task_runner, "SUPERVISOR_COMMS") + task_runner.SUPERVISOR_COMMS = None # type: ignore[assignment] def test_set_supervisor_comms_overrides_and_restores(self): task_runner.SUPERVISOR_COMMS = self.DummyComms() @@ -3103,21 +3100,20 @@ def test_set_supervisor_comms_overrides_and_restores(self): assert task_runner.SUPERVISOR_COMMS is original def test_set_supervisor_comms_sets_temporarily_when_not_set(self): - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None replacement = self.DummyComms() with set_supervisor_comms(replacement): assert task_runner.SUPERVISOR_COMMS is replacement - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None def test_set_supervisor_comms_unsets_temporarily_when_not_set(self): - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None - # This will delete an attribute that isn't set, and restore it likewise with set_supervisor_comms(None): - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None class TestInProcessTestSupervisor: From f2f1f3c5c94beb8aae271308cf1a0b1dcc387be2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Thu, 5 Mar 2026 17:29:11 -0300 Subject: [PATCH 3/8] Fix test_logging_propogated_by_default flakiness from OTLP trace logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Assert only on operator logger messages so CI logs (e.g. OTLP connection errors to localhost:4318) do not break the test. Signed-off-by: André Ahlert --- .../tests/unit/serialization/test_serialized_objects.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index e672e93cd2313..13a53afad3c4b 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -961,8 +961,13 @@ def test_logging_propogated_by_default(self, caplog): BaseOperator(task_id="test").log.warning("test") # This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing # the other case (that when we have set_context it goes to the file is harder to achieve without - # leaking a lot of state) - assert caplog.messages == ["test"] + # leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace + # export errors in CI) do not affect the test. + operator_logger_prefix = "airflow.task.operators" + operator_messages = [ + r.message for r in caplog.records if r.name.startswith(operator_logger_prefix) + ] + assert operator_messages == ["test"] def test_resume_execution(self): from airflow.models.trigger import TriggerFailureReason From c5b7eb221f22551c3352f4191b96b170304a6095 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Fri, 6 Mar 2026 08:34:18 -0300 Subject: [PATCH 4/8] Apply ruff format to test_serialized_objects.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: André Ahlert --- .../tests/unit/serialization/test_serialized_objects.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 13a53afad3c4b..5b54a60f20831 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -964,9 +964,7 @@ def test_logging_propogated_by_default(self, caplog): # leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace # export errors in CI) do not affect the test. operator_logger_prefix = "airflow.task.operators" - operator_messages = [ - r.message for r in caplog.records if r.name.startswith(operator_logger_prefix) - ] + operator_messages = [r.message for r in caplog.records if r.name.startswith(operator_logger_prefix)] assert operator_messages == ["test"] def test_resume_execution(self): From b5ac3368adf208b01203fb82f00fb17c3907a4c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Thu, 12 Mar 2026 09:43:55 -0300 Subject: [PATCH 5/8] Revert SUPERVISOR_COMMS=None, use hasattr checks instead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback: keep SUPERVISOR_COMMS as a bare type annotation (not initialized to None) and use hasattr checks in the few places that need to detect task execution context. Signed-off-by: André Ahlert --- airflow-core/src/airflow/models/connection.py | 10 ++------ airflow-core/src/airflow/models/variable.py | 20 ++++------------ .../src/airflow/sdk/execution_time/context.py | 16 ++++++------- .../execution_time/secrets/execution_api.py | 12 ---------- .../airflow/sdk/execution_time/supervisor.py | 23 +++++++++++++------ .../airflow/sdk/execution_time/task_runner.py | 2 +- .../task_sdk/definitions/test_variables.py | 18 +++++++-------- .../execution_time/test_supervisor.py | 17 ++++++++------ 8 files changed, 50 insertions(+), 68 deletions(-) diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py index b0fbdb93773ee..55a61de022ec1 100644 --- a/airflow-core/src/airflow/models/connection.py +++ b/airflow-core/src/airflow/models/connection.py @@ -509,10 +509,7 @@ def get_connection_from_secrets(cls, conn_id: str, team_name: str | None = None) # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): from airflow.sdk import Connection as TaskSDKConnection from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType @@ -601,10 +598,7 @@ def to_dict(self, *, prune_empty: bool = False, validate: bool = True) -> dict[s @classmethod def from_json(cls, value, conn_id=None) -> Connection: - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): from airflow.sdk import Connection as TaskSDKConnection warnings.warn( diff --git a/airflow-core/src/airflow/models/variable.py b/airflow-core/src/airflow/models/variable.py index e4bcc0ca378eb..0d543f334be42 100644 --- a/airflow-core/src/airflow/models/variable.py +++ b/airflow-core/src/airflow/models/variable.py @@ -182,10 +182,7 @@ def get( # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): warnings.warn( "Using Variable.get from `airflow.models` is deprecated." "Please use `get` on Variable from sdk(`airflow.sdk.Variable`) instead", @@ -245,10 +242,7 @@ def set( # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): warnings.warn( "Using Variable.set from `airflow.models` is deprecated." "Please use `set` on Variable from sdk(`airflow.sdk.Variable`) instead", @@ -336,10 +330,7 @@ def update( # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): warnings.warn( "Using Variable.update from `airflow.models` is deprecated." "Please use `set` on Variable from sdk(`airflow.sdk.Variable`) instead as it is an upsert.", @@ -405,10 +396,7 @@ def delete(key: str, team_name: str | None = None, session: Session | None = Non # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): warnings.warn( "Using Variable.delete from `airflow.models` is deprecated." "Please use `delete` on Variable from sdk(`airflow.sdk.Variable`) instead", diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index a7235befae117..42ccb44b3e874 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -289,10 +289,10 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: ) # If no backend found the variable, raise a not found error (mirrors _get_connection) + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ErrorResponse - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: + if not hasattr(task_runner, "SUPERVISOR_COMMS"): raise AirflowRuntimeError( ErrorResponse( error=ErrorType.VARIABLE_NOT_FOUND, @@ -352,10 +352,10 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ import json from airflow.sdk.execution_time.cache import SecretCache + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS # check for write conflicts on the worker for secrets_backend in ensure_secrets_backend_loaded(): @@ -386,7 +386,7 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ except Exception as e: log.exception(e) - if SUPERVISOR_COMMS is None: + if not hasattr(task_runner, "SUPERVISOR_COMMS"): raise AirflowRuntimeError( ErrorResponse( error=ErrorType.GENERIC_ERROR, @@ -400,7 +400,7 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ ) ) - SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) + task_runner.SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) # Invalidate cache after setting the variable SecretCache.invalidate_variable(key) @@ -413,10 +413,10 @@ def _delete_variable(key: str) -> None: # will make that module depend on Task SDK, which is not ideal because we intend to # keep Task SDK as a separate package than execution time mods. from airflow.sdk.execution_time.cache import SecretCache + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: + if not hasattr(task_runner, "SUPERVISOR_COMMS"): raise AirflowRuntimeError( ErrorResponse( error=ErrorType.GENERIC_ERROR, @@ -430,7 +430,7 @@ def _delete_variable(key: str) -> None: ) ) - msg = SUPERVISOR_COMMS.send(DeleteVariable(key=key)) + msg = task_runner.SUPERVISOR_COMMS.send(DeleteVariable(key=key)) if TYPE_CHECKING: assert isinstance(msg, OKResponse) diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py index 9ee0cc0535e88..57bffd12a1620 100644 --- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py +++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py @@ -81,9 +81,6 @@ def get_connection(self, conn_id: str, team_name: str | None = None) -> Connecti from airflow.sdk.execution_time.context import _process_connection_result_conn from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: - return None - try: msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id)) @@ -118,9 +115,6 @@ def get_variable(self, key: str, team_name: str | None = None) -> str | None: from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: - return None - try: msg = SUPERVISOR_COMMS.send(GetVariable(key=key)) @@ -153,9 +147,6 @@ async def aget_connection(self, conn_id: str) -> Connection | None: # type: ign from airflow.sdk.execution_time.context import _process_connection_result_conn from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: - return None - try: msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id)) @@ -185,9 +176,6 @@ async def aget_variable(self, key: str) -> str | None: from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: - return None - try: msg = await SUPERVISOR_COMMS.asend(GetVariable(key=key)) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 661811e521f1d..28429cd916f75 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1998,19 +1998,28 @@ def set_supervisor_comms(temp_comms): by injecting a test Comms implementation (e.g. `InProcessSupervisorComms`) in place of the real inter-process communication layer. - Some parts of the code (e.g. models.Variable.get) check that - `task_runner.SUPERVISOR_COMMS` is not None to determine if the code is running in a Task SDK - execution context. This override ensures those code paths behave correctly during in-process tests. + Some parts of the code (e.g. models.Variable.get) check for the presence + of `task_runner.SUPERVISOR_COMMS` to determine if the code is running in a Task SDK execution context. + This override ensures those code paths behave correctly during in-process tests. """ from airflow.sdk.execution_time import task_runner - old = task_runner.SUPERVISOR_COMMS - task_runner.SUPERVISOR_COMMS = temp_comms + sentinel = object() + old = getattr(task_runner, "SUPERVISOR_COMMS", sentinel) + + if temp_comms is not None: + task_runner.SUPERVISOR_COMMS = temp_comms + elif old is not sentinel: + delattr(task_runner, "SUPERVISOR_COMMS") try: yield finally: - task_runner.SUPERVISOR_COMMS = old + if old is sentinel: + if hasattr(task_runner, "SUPERVISOR_COMMS"): + delattr(task_runner, "SUPERVISOR_COMMS") + else: + task_runner.SUPERVISOR_COMMS = old def run_task_in_process(ti: TaskInstance, task) -> TaskRunResult: @@ -2190,7 +2199,7 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]: try: from airflow.sdk.execution_time import task_runner - if hasattr(task_runner, "SUPERVISOR_COMMS") and task_runner.SUPERVISOR_COMMS is not None: + if hasattr(task_runner, "SUPERVISOR_COMMS"): # Client context: task runner with SUPERVISOR_COMMS return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS) except (ImportError, AttributeError): diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 9f44fb26186c5..61e476e60b930 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -874,7 +874,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: # deeply nested execution stack. # - By defining `SUPERVISOR_COMMS` as a global, it ensures that this communication mechanism is readily # accessible wherever needed during task execution without modifying every layer of the call stack. -SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] = None # type: ignore[assignment] +SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] # State machine! diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index 79d2be0b3ab83..455fd9b8f1a10 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -293,14 +293,14 @@ def test_backend_fallback_to_env_var(self, mock_get_variable, mock_env_get, mock class TestVariableOutsideTaskContext: - """Tests for Variable operations when SUPERVISOR_COMMS is None (outside task execution context).""" + """Tests for Variable operations when SUPERVISOR_COMMS is not set (outside task execution context).""" @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable") def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, monkeypatch): - """Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is None.""" + """Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is not set.""" from airflow.sdk.execution_time import task_runner - monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) mock_env_get.return_value = "env_value" result = Variable.get(key="my_env_var") @@ -308,30 +308,30 @@ def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, mon mock_env_get.assert_called_once_with(key="my_env_var") def test_get_not_found_without_supervisor_comms(self, monkeypatch): - """Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is None.""" + """Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is not set.""" from airflow.sdk.execution_time import task_runner - monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) with pytest.raises(AirflowRuntimeError, match="outside a task execution context"): Variable.get(key="nonexistent_var") def test_set_without_supervisor_comms(self, monkeypatch): - """Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is None.""" + """Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set.""" from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.context import _set_variable - monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) with pytest.raises(AirflowRuntimeError, match="Variable.set\\(\\) requires a task execution context"): _set_variable(key="my_key", value="my_value") def test_delete_without_supervisor_comms(self, monkeypatch): - """Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is None.""" + """Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set.""" from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.context import _delete_variable - monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) with pytest.raises( AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context" diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 1c21f675d3dc3..5e274ab1cb60f 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -3086,9 +3086,12 @@ class DummyComms: @pytest.fixture(autouse=True) def cleanup_supervisor_comms(self): - task_runner.SUPERVISOR_COMMS = None # type: ignore[assignment] + # Ensure clean state before/after test + if hasattr(task_runner, "SUPERVISOR_COMMS"): + delattr(task_runner, "SUPERVISOR_COMMS") yield - task_runner.SUPERVISOR_COMMS = None # type: ignore[assignment] + if hasattr(task_runner, "SUPERVISOR_COMMS"): + delattr(task_runner, "SUPERVISOR_COMMS") def test_set_supervisor_comms_overrides_and_restores(self): task_runner.SUPERVISOR_COMMS = self.DummyComms() @@ -3100,20 +3103,20 @@ def test_set_supervisor_comms_overrides_and_restores(self): assert task_runner.SUPERVISOR_COMMS is original def test_set_supervisor_comms_sets_temporarily_when_not_set(self): - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") replacement = self.DummyComms() with set_supervisor_comms(replacement): assert task_runner.SUPERVISOR_COMMS is replacement - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") def test_set_supervisor_comms_unsets_temporarily_when_not_set(self): - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") with set_supervisor_comms(None): - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") class TestInProcessTestSupervisor: From 5631eed2abc888066e887a5d3cc4910b65fc282b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Mon, 16 Mar 2026 05:40:24 -0300 Subject: [PATCH 6/8] Fix import ordering in context.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: André Ahlert --- task-sdk/src/airflow/sdk/execution_time/context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index 42ccb44b3e874..206e46105c91b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -351,8 +351,8 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ # keep Task SDK as a separate package than execution time mods. import json - from airflow.sdk.execution_time.cache import SecretCache from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.cache import SecretCache from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded @@ -412,8 +412,8 @@ def _delete_variable(key: str) -> None: # A reason to not move it to `airflow.sdk.execution_time.comms` is that it # will make that module depend on Task SDK, which is not ideal because we intend to # keep Task SDK as a separate package than execution time mods. - from airflow.sdk.execution_time.cache import SecretCache from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.cache import SecretCache from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse if not hasattr(task_runner, "SUPERVISOR_COMMS"): From f5aedfda018b62e31e5e9ab277484ddfabfebb13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Tue, 19 May 2026 17:52:12 -0300 Subject: [PATCH 7/8] Regenerate edge worker API spec and uv.lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: André Ahlert --- .../edge3/worker_api/v2-edge-generated.yaml | 4 +- uv.lock | 64 ++++++++++++++++++- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml index 01c8149d1dad8..c777d1f1bec97 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml @@ -1257,8 +1257,8 @@ components: - queue - priority_weight title: TaskInstanceDTO - description: Schema for TaskInstance with minimal required fields needed for - Executors and Task SDK. + description: TaskInstanceDTO with executor-specific ``external_executor_id`` + field and ``key`` property. TaskInstanceState: type: string enum: diff --git a/uv.lock b/uv.lock index c1d9d1293c19c..c45cb52ffe3e0 100644 --- a/uv.lock +++ b/uv.lock @@ -1414,6 +1414,35 @@ zendesk = [ ] [package.dev-dependencies] +ci-image = [ + { name = "apache-airflow", extra = ["all"] }, + { name = "apache-airflow-breeze" }, + { name = "apache-airflow-ctl" }, + { name = "apache-airflow-ctl-tests" }, + { name = "apache-airflow-dev" }, + { name = "apache-airflow-devel-common", extra = ["docs", "docs-gen", "no-doc"] }, + { name = "apache-airflow-docker-tests" }, + { name = "apache-airflow-helm-chart" }, + { name = "apache-airflow-kubernetes-tests" }, + { name = "apache-airflow-scripts" }, + { name = "apache-airflow-shared-configuration" }, + { name = "apache-airflow-shared-dagnode" }, + { name = "apache-airflow-shared-listeners" }, + { name = "apache-airflow-shared-logging" }, + { name = "apache-airflow-shared-module-loading" }, + { name = "apache-airflow-shared-observability" }, + { name = "apache-airflow-shared-plugins-manager" }, + { name = "apache-airflow-shared-providers-discovery" }, + { name = "apache-airflow-shared-secrets-backend" }, + { name = "apache-airflow-shared-secrets-masker" }, + { name = "apache-airflow-shared-serialization" }, + { name = "apache-airflow-shared-state" }, + { name = "apache-airflow-shared-template-rendering" }, + { name = "apache-airflow-shared-timezones" }, + { name = "apache-airflow-task-sdk", extra = ["all"] }, + { name = "apache-airflow-task-sdk-integration-tests" }, + { name = "plyvel" }, +] dev = [ { name = "apache-airflow", extra = ["all"] }, { name = "apache-airflow-breeze" }, @@ -1687,6 +1716,37 @@ requires-dist = [ provides-extras = ["all-core", "async", "graphviz", "gunicorn", "kerberos", "memray", "otel", "statsd", "all-task-sdk", "airbyte", "akeyless", "alibaba", "amazon", "apache-cassandra", "apache-drill", "apache-druid", "apache-flink", "apache-hdfs", "apache-hive", "apache-iceberg", "apache-impala", "apache-kafka", "apache-kylin", "apache-livy", "apache-pig", "apache-pinot", "apache-spark", "apache-tinkerpop", "apprise", "arangodb", "asana", "atlassian-jira", "celery", "cloudant", "cncf-kubernetes", "cohere", "common-ai", "common-compat", "common-io", "common-messaging", "common-sql", "databricks", "datadog", "dbt-cloud", "dingding", "discord", "docker", "edge3", "elasticsearch", "exasol", "fab", "facebook", "ftp", "git", "github", "google", "grpc", "hashicorp", "http", "imap", "influxdb", "informatica", "jdbc", "jenkins", "keycloak", "microsoft-azure", "microsoft-mssql", "microsoft-psrp", "microsoft-winrm", "mongo", "mysql", "neo4j", "odbc", "openai", "openfaas", "openlineage", "opensearch", "opsgenie", "oracle", "pagerduty", "papermill", "pgvector", "pinecone", "postgres", "presto", "qdrant", "redis", "salesforce", "samba", "segment", "sendgrid", "sftp", "singularity", "slack", "smtp", "snowflake", "sqlite", "ssh", "standard", "tableau", "telegram", "teradata", "trino", "vertica", "vespa", "weaviate", "yandex", "ydb", "zendesk", "all", "aiobotocore", "apache-atlas", "apache-webhdfs", "amazon-aws-auth", "cloudpickle", "github-enterprise", "google-auth", "ldap", "pandas", "polars", "rabbitmq", "sentry", "s3fs", "uv"] [package.metadata.requires-dev] +ci-image = [ + { name = "apache-airflow", extras = ["all"], editable = "." }, + { name = "apache-airflow-breeze", editable = "dev/breeze" }, + { name = "apache-airflow-ctl", editable = "airflow-ctl" }, + { name = "apache-airflow-ctl-tests", editable = "airflow-ctl-tests" }, + { name = "apache-airflow-dev", editable = "dev" }, + { name = "apache-airflow-devel-common", extras = ["docs"], editable = "devel-common" }, + { name = "apache-airflow-devel-common", extras = ["docs-gen"], editable = "devel-common" }, + { name = "apache-airflow-devel-common", extras = ["no-doc"], editable = "devel-common" }, + { name = "apache-airflow-docker-tests", editable = "docker-tests" }, + { name = "apache-airflow-helm-chart", editable = "chart" }, + { name = "apache-airflow-kubernetes-tests", editable = "kubernetes-tests" }, + { name = "apache-airflow-scripts", editable = "scripts" }, + { name = "apache-airflow-shared-configuration", editable = "shared/configuration" }, + { name = "apache-airflow-shared-dagnode", editable = "shared/dagnode" }, + { name = "apache-airflow-shared-listeners", editable = "shared/listeners" }, + { name = "apache-airflow-shared-logging", editable = "shared/logging" }, + { name = "apache-airflow-shared-module-loading", editable = "shared/module_loading" }, + { name = "apache-airflow-shared-observability", editable = "shared/observability" }, + { name = "apache-airflow-shared-plugins-manager", editable = "shared/plugins_manager" }, + { name = "apache-airflow-shared-providers-discovery", editable = "shared/providers_discovery" }, + { name = "apache-airflow-shared-secrets-backend", editable = "shared/secrets_backend" }, + { name = "apache-airflow-shared-secrets-masker", editable = "shared/secrets_masker" }, + { name = "apache-airflow-shared-serialization", editable = "shared/serialization" }, + { name = "apache-airflow-shared-state", editable = "shared/state" }, + { name = "apache-airflow-shared-template-rendering", editable = "shared/template_rendering" }, + { name = "apache-airflow-shared-timezones", editable = "shared/timezones" }, + { name = "apache-airflow-task-sdk", extras = ["all"], editable = "task-sdk" }, + { name = "apache-airflow-task-sdk-integration-tests", editable = "task-sdk-integration-tests" }, + { name = "plyvel", specifier = ">=1.5.1" }, +] dev = [ { name = "apache-airflow", extras = ["all"], editable = "." }, { name = "apache-airflow-breeze", editable = "dev/breeze" }, @@ -20333,8 +20393,8 @@ name = "secretstorage" version = "3.5.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cryptography", marker = "python_full_version >= '3.14' or platform_machine != 'arm64' or sys_platform != 'darwin'" }, - { name = "jeepney", marker = "python_full_version >= '3.14' or platform_machine != 'arm64' or sys_platform != 'darwin'" }, + { name = "cryptography", marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform == 'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or (platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform != 'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" }, + { name = "jeepney", marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform == 'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or (platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform != 'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/1c/03/e834bcd866f2f8a49a85eaff47340affa3bfa391ee9912a952a1faa68c7b/secretstorage-3.5.0.tar.gz", hash = "sha256:f04b8e4689cbce351744d5537bf6b1329c6fc68f91fa666f60a380edddcd11be", size = 19884, upload-time = "2025-11-23T19:02:53.191Z" } wheels = [ From e8ba49a1ed939d93021f6df58ca8ae626298f442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sun, 24 May 2026 19:58:05 -0300 Subject: [PATCH 8/8] Address review: fix Variable.keys() ImportError and hoist Variable.set() guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _get_variable_keys: replace `from ... import SUPERVISOR_COMMS` (raises ImportError, bare annotation) with `task_runner` namespace + hasattr guard; previously Variable.keys() outside task context hit the same opaque failure this PR is trying to wrap. - _set_variable: hoist hasattr guard above the secrets-backend conflict-check loop so the misleading "API Server will be updated" warning is not emitted when the write cannot happen. Matches the pattern already applied to _get_variable and _delete_variable. Signed-off-by: André Ahlert --- .../src/airflow/sdk/execution_time/context.py | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index ab2887859a305..d82c2002f43d5 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -321,17 +321,31 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: def _get_variable_keys(prefix: str | None = None) -> list[str]: from airflow.sdk.exceptions import AirflowRuntimeError + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ( ErrorResponse, GetVariableKeys, VariableKeysResult, ) - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.keys() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.keys() at the top level of a DAG file " + "or outside of a running task. Variable.keys() can only be used inside a task." + ) + }, + ) + ) all_keys: list[str] = [] offset = 0 while True: - msg = SUPERVISOR_COMMS.send( + msg = task_runner.SUPERVISOR_COMMS.send( GetVariableKeys(prefix=prefix, limit=_VARIABLE_KEYS_PAGE_SIZE, offset=offset) ) if isinstance(msg, ErrorResponse): @@ -359,6 +373,20 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.set() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.set() at the top level of a DAG file " + "or outside of a running task. Variable.set() can only be used inside a task." + ) + }, + ) + ) + # check for write conflicts on the worker for secrets_backend in ensure_secrets_backend_loaded(): if isinstance(secrets_backend, ExecutionAPISecretsBackend): @@ -388,20 +416,6 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ except Exception as e: log.exception(e) - if not hasattr(task_runner, "SUPERVISOR_COMMS"): - raise AirflowRuntimeError( - ErrorResponse( - error=ErrorType.GENERIC_ERROR, - detail={ - "message": ( - "Variable.set() requires a task execution context (SUPERVISOR_COMMS is not available). " - "This typically happens when calling Variable.set() at the top level of a DAG file " - "or outside of a running task. Variable.set() can only be used inside a task." - ) - }, - ) - ) - task_runner.SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) # Invalidate cache after setting the variable