diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 7de4c87148ae0..123fa20b589d7 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -1222,8 +1222,11 @@ def test_logging_propogated_by_default(self, caplog): BaseOperator(task_id="test").log.warning("test") # This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing # the other case (that when we have set_context it goes to the file is harder to achieve without - # leaking a lot of state) - assert caplog.messages == ["test"] + # leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace + # export errors in CI) do not affect the test. + operator_logger_prefix = "airflow.task.operators" + operator_messages = [r.message for r in caplog.records if r.name.startswith(operator_logger_prefix)] + assert operator_messages == ["test"] def test_resume_execution(self): from airflow.models.trigger import TriggerFailureReason diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index d41306009eac7..5876f185cf760 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -332,9 +332,26 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: ) # If no backend found the variable, raise a not found error (mirrors _get_connection) - from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ErrorResponse + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.VARIABLE_NOT_FOUND, + detail={ + "message": ( + f"Variable '{key}' not found. Note: SUPERVISOR_COMMS is not available, " + "which means this code is running outside a task execution context " + "(e.g., at the top level of a DAG file). " + "Consider using environment variables (AIRFLOW_VAR_), " + "Jinja templates ({{ var.value. }}), " + "or move the Variable.get() call inside a task function." + ) + }, + ) + ) + raise AirflowRuntimeError( ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"}) ) @@ -345,17 +362,31 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: def _get_variable_keys(prefix: str | None = None) -> list[str]: from airflow.sdk.exceptions import AirflowRuntimeError + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ( ErrorResponse, GetVariableKeys, VariableKeysResult, ) - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.keys() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.keys() at the top level of a DAG file " + "or outside of a running task. Variable.keys() can only be used inside a task." + ) + }, + ) + ) all_keys: list[str] = [] offset = 0 while True: - msg = SUPERVISOR_COMMS.send( + msg = task_runner.SUPERVISOR_COMMS.send( GetVariableKeys(prefix=prefix, limit=_VARIABLE_KEYS_PAGE_SIZE, offset=offset) ) if isinstance(msg, ErrorResponse): @@ -377,11 +408,25 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ # keep Task SDK as a separate package than execution time mods. import json + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.cache import SecretCache - from airflow.sdk.execution_time.comms import PutVariable + from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.set() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.set() at the top level of a DAG file " + "or outside of a running task. Variable.set() can only be used inside a task." + ) + }, + ) + ) # check for write conflicts on the worker for secrets_backend in ensure_secrets_backend_loaded(): @@ -412,7 +457,7 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ except Exception as e: log.exception(e) - SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) + task_runner.SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) # Invalidate cache after setting the variable SecretCache.invalidate_variable(key) @@ -424,11 +469,25 @@ def _delete_variable(key: str) -> None: # A reason to not move it to `airflow.sdk.execution_time.comms` is that it # will make that module depend on Task SDK, which is not ideal because we intend to # keep Task SDK as a separate package than execution time mods. + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.cache import SecretCache - from airflow.sdk.execution_time.comms import DeleteVariable - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse + + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.delete() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.delete() at the top level of a DAG file " + "or outside of a running task. Variable.delete() can only be used inside a task." + ) + }, + ) + ) - msg = SUPERVISOR_COMMS.send(DeleteVariable(key=key)) + msg = task_runner.SUPERVISOR_COMMS.send(DeleteVariable(key=key)) if TYPE_CHECKING: assert isinstance(msg, OKResponse) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 76dfd2009ac18..98c9283578373 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -2375,7 +2375,7 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]: try: from airflow.sdk.execution_time import task_runner - if hasattr(task_runner, "SUPERVISOR_COMMS") and task_runner.SUPERVISOR_COMMS is not None: + if hasattr(task_runner, "SUPERVISOR_COMMS"): # Client context: task runner with SUPERVISOR_COMMS return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS) except (ImportError, AttributeError): diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index 29b6ac0cb97ca..52b57cb4e8a80 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -300,3 +300,50 @@ def test_backend_fallback_to_env_var(self, mock_get_variable, mock_env_get, mock # mock_env is only called when LocalFilesystemBackend doesn't have it mock_env_get.assert_called() assert var == "fake_value" + + +class TestVariableOutsideTaskContext: + """Tests for Variable operations when SUPERVISOR_COMMS is not set (outside task execution context).""" + + @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable") + def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, monkeypatch): + """Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is not set.""" + from airflow.sdk.execution_time import task_runner + + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) + mock_env_get.return_value = "env_value" + + result = Variable.get(key="my_env_var") + assert result == "env_value" + mock_env_get.assert_called_once_with(key="my_env_var") + + def test_get_not_found_without_supervisor_comms(self, monkeypatch): + """Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is not set.""" + from airflow.sdk.execution_time import task_runner + + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) + + with pytest.raises(AirflowRuntimeError, match="outside a task execution context"): + Variable.get(key="nonexistent_var") + + def test_set_without_supervisor_comms(self, monkeypatch): + """Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set.""" + from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.context import _set_variable + + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) + + with pytest.raises(AirflowRuntimeError, match="Variable.set\\(\\) requires a task execution context"): + _set_variable(key="my_key", value="my_value") + + def test_delete_without_supervisor_comms(self, monkeypatch): + """Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set.""" + from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.context import _delete_variable + + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) + + with pytest.raises( + AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context" + ): + _delete_variable(key="my_key") diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 8dea3d0793f49..6cee9e96c536c 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -3344,7 +3344,6 @@ def test_set_supervisor_comms_sets_temporarily_when_not_set(self): def test_set_supervisor_comms_unsets_temporarily_when_not_set(self): assert not hasattr(task_runner, "SUPERVISOR_COMMS") - # This will delete an attribute that isn't set, and restore it likewise with set_supervisor_comms(None): assert not hasattr(task_runner, "SUPERVISOR_COMMS")