From 89f3c6282cf50ddf4477fd30833d2f043adf21bc Mon Sep 17 00:00:00 2001 From: Dheeraj Turaga Date: Mon, 15 Jun 2026 17:24:48 -0500 Subject: [PATCH] Send Airflow CLI logs to stderr for -o commands so structured output stays parseable CLI commands that accept -o (table|json|yaml|plain) write structured data to stdout. Airflow's default logging config routes the root console handler to sys.stdout, so any log line emitted while such a command runs is mixed into that data and breaks downstream parsers (most visibly: the Celery default visibility_timeout warning corrupts `airflow celery list-workers -o json` output). After parse_args() in the CLI entrypoint, redirect any root-logger StreamHandler whose stream is sys.stdout to sys.stderr, gated on hasattr(args, 'output'). Daemon commands (scheduler, api-server, celery worker, ...) do not register -o and so keep their existing stdout-logging behavior. --- airflow-core/src/airflow/__main__.py | 6 +++ airflow-core/src/airflow/cli/utils.py | 15 +++++++ airflow-core/tests/unit/cli/test_utils.py | 51 ++++++++++++++++++++++- 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/__main__.py b/airflow-core/src/airflow/__main__.py index c11d3b5afe923..c49146b629875 100644 --- a/airflow-core/src/airflow/__main__.py +++ b/airflow-core/src/airflow/__main__.py @@ -35,6 +35,7 @@ # any possible import cycles with settings downstream. from airflow import configuration from airflow.cli import cli_parser +from airflow.cli.utils import redirect_stdout_log_handlers_to_stderr def main(): @@ -45,6 +46,11 @@ def main(): parser = cli_parser.get_parser() argcomplete.autocomplete(parser) args = parser.parse_args() + # Commands that accept ``-o`` produce structured output on stdout; route any + # console log handler currently writing to stdout to stderr so log lines do + # not corrupt that output (e.g. ``airflow ... -o json | jq``). + if hasattr(args, "output"): + redirect_stdout_log_handlers_to_stderr() if args.subcommand not in ["lazy_loaded", "version"]: # Here we ensure that the default configuration is written if needed before running any command # that might need it. This used to be done during configuration initialization but having it diff --git a/airflow-core/src/airflow/cli/utils.py b/airflow-core/src/airflow/cli/utils.py index 35a62c9df9135..aef66a8804622 100644 --- a/airflow-core/src/airflow/cli/utils.py +++ b/airflow-core/src/airflow/cli/utils.py @@ -17,6 +17,7 @@ from __future__ import annotations +import logging import sys from collections.abc import Callable from typing import TYPE_CHECKING, TypeVar @@ -78,6 +79,20 @@ def is_stdout(fileio: IOBase) -> bool: return fileio is sys.stdout +def redirect_stdout_log_handlers_to_stderr() -> None: + """ + Redirect any root-logger ``StreamHandler`` writing to stdout so it writes to stderr. + + Called from the CLI entrypoint for commands that emit structured output on + stdout (``-o json|yaml|plain|table``), so log lines do not corrupt that + output. ``FileHandler`` is a ``StreamHandler`` subclass; the identity check + against ``sys.stdout`` correctly skips it. + """ + for handler in logging.getLogger().handlers: + if isinstance(handler, logging.StreamHandler) and handler.stream is sys.stdout: + handler.setStream(sys.stderr) + + def print_export_output(command_type: str, exported_items: Collection, file: TextIOWrapper): if is_stdout(file): print(f"\n{len(exported_items)} {command_type} successfully exported.", file=sys.stderr) diff --git a/airflow-core/tests/unit/cli/test_utils.py b/airflow-core/tests/unit/cli/test_utils.py index f98a9ef6b193d..bd1556dd31038 100644 --- a/airflow-core/tests/unit/cli/test_utils.py +++ b/airflow-core/tests/unit/cli/test_utils.py @@ -17,9 +17,13 @@ # under the License. from __future__ import annotations +import logging +import sys import warnings -from airflow.cli.utils import deprecated_for_airflowctl +import pytest + +from airflow.cli.utils import deprecated_for_airflowctl, redirect_stdout_log_handlers_to_stderr class TestDeprecatedForAirflowctl: @@ -48,3 +52,48 @@ def command(a, b, *, c): assert command.__name__ == "command" assert command.__doc__ == "Original docstring." assert command._migrated_to_airflowctl == "airflowctl pools create" + + +class TestRedirectStdoutLogHandlersToStderr: + """Tests for the CLI helper that keeps logs off stdout for ``-o``-style commands.""" + + @pytest.fixture + def isolated_root_logger(self): + """Snapshot and restore root logger handlers so tests don't leak state.""" + root = logging.getLogger() + original_handlers = root.handlers[:] + root.handlers = [] + try: + yield root + finally: + root.handlers = original_handlers + + @pytest.mark.parametrize( + ("make_handler", "expected_stream"), + [ + pytest.param( + lambda _tmp_path: logging.StreamHandler(stream=sys.stdout), + lambda _original: sys.stderr, + id="stdout-stream-handler-redirected", + ), + pytest.param( + lambda _tmp_path: logging.StreamHandler(stream=sys.stderr), + lambda _original: sys.stderr, + id="stderr-stream-handler-untouched", + ), + pytest.param( + lambda tmp_path: logging.FileHandler(tmp_path / "airflow.log"), + lambda original: original, + id="file-handler-untouched", + ), + ], + ) + def test_redirect(self, isolated_root_logger, tmp_path, make_handler, expected_stream): + handler = make_handler(tmp_path) + isolated_root_logger.addHandler(handler) + original_stream = handler.stream + try: + redirect_stdout_log_handlers_to_stderr() + assert handler.stream is expected_stream(original_stream) + finally: + handler.close()