Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1222,8 +1222,11 @@ def test_logging_propogated_by_default(self, caplog):
BaseOperator(task_id="test").log.warning("test")
# This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing
# the other case (that when we have set_context it goes to the file is harder to achieve without
# leaking a lot of state)
assert caplog.messages == ["test"]
# leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace
# export errors in CI) do not affect the test.
operator_logger_prefix = "airflow.task.operators"
operator_messages = [r.message for r in caplog.records if r.name.startswith(operator_logger_prefix)]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This OTLP-noise filter is unrelated to the SUPERVISOR_COMMS error-handling scope of the PR. It's a legitimate CI-flake fix (other loggers polluting caplog.messages), but bundling it here makes the change harder to bisect later if it ever causes regressions.

Either split it into its own PR or call it out explicitly in the description so reviewers know it's intentional scope creep.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, the scope creep is real. Kept it in this PR since it surfaced while debugging the CI flake that was blocking the merge, but called it out explicitly under "Out-of-scope change included" in the description, referencing commit f2f1f3c so it stays discoverable if it ever needs to be bisected.

assert operator_messages == ["test"]

def test_resume_execution(self):
from airflow.models.trigger import TriggerFailureReason
Expand Down
77 changes: 68 additions & 9 deletions task-sdk/src/airflow/sdk/execution_time/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,26 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
)

# If no backend found the variable, raise a not found error (mirrors _get_connection)
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.comms import ErrorResponse

if not hasattr(task_runner, "SUPERVISOR_COMMS"):
raise AirflowRuntimeError(
ErrorResponse(
error=ErrorType.VARIABLE_NOT_FOUND,
detail={
"message": (
f"Variable '{key}' not found. Note: SUPERVISOR_COMMS is not available, "
"which means this code is running outside a task execution context "
"(e.g., at the top level of a DAG file). "
"Consider using environment variables (AIRFLOW_VAR_<key>), "
"Jinja templates ({{ var.value.<key> }}), "
"or move the Variable.get() call inside a task function."
)
},
)
)

raise AirflowRuntimeError(
ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"})
)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related: _get_variable_keys below (line 329) still has the bare from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS at the top of the function. Since SUPERVISOR_COMMS is now a bare annotation in task_runner.py (SUPERVISOR_COMMS: CommsDecoder[...] at line 930), this import raises ImportError at function entry whenever the function is called outside a task context -- exactly the opaque failure mode this PR is trying to wrap with a helpful message.

Variable.keys() from a DAG top-level (or any non-task caller) will hit the original ImportError, not the friendly message added to _get_variable here. Worth applying the same hasattr(task_runner, "SUPERVISOR_COMMS") guard there for symmetry, or switching to from airflow.sdk.execution_time import task_runner + task_runner.SUPERVISOR_COMMS like _set_variable / _delete_variable now do.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, that one slipped through. Fixed.

Expand All @@ -345,17 +362,31 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:

def _get_variable_keys(prefix: str | None = None) -> list[str]:
from airflow.sdk.exceptions import AirflowRuntimeError
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.comms import (
ErrorResponse,
GetVariableKeys,
VariableKeysResult,
)
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

if not hasattr(task_runner, "SUPERVISOR_COMMS"):
raise AirflowRuntimeError(
ErrorResponse(
error=ErrorType.GENERIC_ERROR,
detail={
"message": (
"Variable.keys() requires a task execution context (SUPERVISOR_COMMS is not available). "
"This typically happens when calling Variable.keys() at the top level of a DAG file "
"or outside of a running task. Variable.keys() can only be used inside a task."
)
},
)
)

all_keys: list[str] = []
offset = 0
while True:
msg = SUPERVISOR_COMMS.send(
msg = task_runner.SUPERVISOR_COMMS.send(
GetVariableKeys(prefix=prefix, limit=_VARIABLE_KEYS_PAGE_SIZE, offset=offset)
)
if isinstance(msg, ErrorResponse):
Expand All @@ -377,11 +408,25 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ
# keep Task SDK as a separate package than execution time mods.
import json

from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.cache import SecretCache
from airflow.sdk.execution_time.comms import PutVariable
from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable
from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend
from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

if not hasattr(task_runner, "SUPERVISOR_COMMS"):
raise AirflowRuntimeError(
ErrorResponse(
error=ErrorType.GENERIC_ERROR,
detail={
"message": (
"Variable.set() requires a task execution context (SUPERVISOR_COMMS is not available). "
"This typically happens when calling Variable.set() at the top level of a DAG file "
"or outside of a running task. Variable.set() can only be used inside a task."
)
},
)
)

# check for write conflicts on the worker
for secrets_backend in ensure_secrets_backend_loaded():
Expand Down Expand Up @@ -412,7 +457,7 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ
except Exception as e:
log.exception(e)

SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description))
task_runner.SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description))

# Invalidate cache after setting the variable
SecretCache.invalidate_variable(key)
Expand All @@ -424,11 +469,25 @@ def _delete_variable(key: str) -> None:
# A reason to not move it to `airflow.sdk.execution_time.comms` is that it
# will make that module depend on Task SDK, which is not ideal because we intend to
# keep Task SDK as a separate package than execution time mods.
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.cache import SecretCache
from airflow.sdk.execution_time.comms import DeleteVariable
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse

if not hasattr(task_runner, "SUPERVISOR_COMMS"):
raise AirflowRuntimeError(
ErrorResponse(
error=ErrorType.GENERIC_ERROR,
detail={
"message": (
"Variable.delete() requires a task execution context (SUPERVISOR_COMMS is not available). "
"This typically happens when calling Variable.delete() at the top level of a DAG file "
"or outside of a running task. Variable.delete() can only be used inside a task."
)
},
)
)

msg = SUPERVISOR_COMMS.send(DeleteVariable(key=key))
msg = task_runner.SUPERVISOR_COMMS.send(DeleteVariable(key=key))
if TYPE_CHECKING:
assert isinstance(msg, OKResponse)

Expand Down
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2375,7 +2375,7 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
try:
from airflow.sdk.execution_time import task_runner

if hasattr(task_runner, "SUPERVISOR_COMMS") and task_runner.SUPERVISOR_COMMS is not None:
if hasattr(task_runner, "SUPERVISOR_COMMS"):
# Client context: task runner with SUPERVISOR_COMMS
return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
except (ImportError, AttributeError):
Expand Down
47 changes: 47 additions & 0 deletions task-sdk/tests/task_sdk/definitions/test_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,50 @@ def test_backend_fallback_to_env_var(self, mock_get_variable, mock_env_get, mock
# mock_env is only called when LocalFilesystemBackend doesn't have it
mock_env_get.assert_called()
assert var == "fake_value"


class TestVariableOutsideTaskContext:
"""Tests for Variable operations when SUPERVISOR_COMMS is not set (outside task execution context)."""

@mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable")
def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, monkeypatch):
"""Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is not set."""
from airflow.sdk.execution_time import task_runner

monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False)
mock_env_get.return_value = "env_value"

result = Variable.get(key="my_env_var")
assert result == "env_value"
mock_env_get.assert_called_once_with(key="my_env_var")

def test_get_not_found_without_supervisor_comms(self, monkeypatch):
"""Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is not set."""
from airflow.sdk.execution_time import task_runner

monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False)

with pytest.raises(AirflowRuntimeError, match="outside a task execution context"):
Variable.get(key="nonexistent_var")

def test_set_without_supervisor_comms(self, monkeypatch):
"""Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set."""
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.context import _set_variable

monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False)

with pytest.raises(AirflowRuntimeError, match="Variable.set\\(\\) requires a task execution context"):
_set_variable(key="my_key", value="my_value")

def test_delete_without_supervisor_comms(self, monkeypatch):
"""Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set."""
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.context import _delete_variable

monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False)

with pytest.raises(
AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context"
):
_delete_variable(key="my_key")
Original file line number Diff line number Diff line change
Expand Up @@ -3344,7 +3344,6 @@ def test_set_supervisor_comms_sets_temporarily_when_not_set(self):
def test_set_supervisor_comms_unsets_temporarily_when_not_set(self):
assert not hasattr(task_runner, "SUPERVISOR_COMMS")

# This will delete an attribute that isn't set, and restore it likewise
with set_supervisor_comms(None):
assert not hasattr(task_runner, "SUPERVISOR_COMMS")

Expand Down
Loading