Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
# any possible import cycles with settings downstream.
from airflow import configuration
from airflow.cli import cli_parser
from airflow.cli.utils import redirect_stdout_log_handlers_to_stderr


def main():
Expand All @@ -45,6 +46,11 @@ def main():
parser = cli_parser.get_parser()
argcomplete.autocomplete(parser)
args = parser.parse_args()
# Commands that accept ``-o`` produce structured output on stdout; route any
# console log handler currently writing to stdout to stderr so log lines do
# not corrupt that output (e.g. ``airflow ... -o json | jq``).
if hasattr(args, "output"):
redirect_stdout_log_handlers_to_stderr()
if args.subcommand not in ["lazy_loaded", "version"]:
# Here we ensure that the default configuration is written if needed before running any command
# that might need it. This used to be done during configuration initialization but having it
Expand Down
15 changes: 15 additions & 0 deletions airflow-core/src/airflow/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

import logging
import sys
from collections.abc import Callable
from typing import TYPE_CHECKING, TypeVar
Expand Down Expand Up @@ -78,6 +79,20 @@ def is_stdout(fileio: IOBase) -> bool:
return fileio is sys.stdout


def redirect_stdout_log_handlers_to_stderr() -> None:
"""
Redirect any root-logger ``StreamHandler`` writing to stdout so it writes to stderr.

Called from the CLI entrypoint for commands that emit structured output on
stdout (``-o json|yaml|plain|table``), so log lines do not corrupt that
output. ``FileHandler`` is a ``StreamHandler`` subclass; the identity check
against ``sys.stdout`` correctly skips it.
"""
for handler in logging.getLogger().handlers:
if isinstance(handler, logging.StreamHandler) and handler.stream is sys.stdout:
handler.setStream(sys.stderr)


def print_export_output(command_type: str, exported_items: Collection, file: TextIOWrapper):
if is_stdout(file):
print(f"\n{len(exported_items)} {command_type} successfully exported.", file=sys.stderr)
Expand Down
51 changes: 50 additions & 1 deletion airflow-core/tests/unit/cli/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
# under the License.
from __future__ import annotations

import logging
import sys
import warnings

from airflow.cli.utils import deprecated_for_airflowctl
import pytest

from airflow.cli.utils import deprecated_for_airflowctl, redirect_stdout_log_handlers_to_stderr


class TestDeprecatedForAirflowctl:
Expand Down Expand Up @@ -48,3 +52,48 @@ def command(a, b, *, c):
assert command.__name__ == "command"
assert command.__doc__ == "Original docstring."
assert command._migrated_to_airflowctl == "airflowctl pools create"


class TestRedirectStdoutLogHandlersToStderr:
"""Tests for the CLI helper that keeps logs off stdout for ``-o``-style commands."""

@pytest.fixture
def isolated_root_logger(self):
"""Snapshot and restore root logger handlers so tests don't leak state."""
root = logging.getLogger()
original_handlers = root.handlers[:]
root.handlers = []
try:
yield root
finally:
root.handlers = original_handlers

@pytest.mark.parametrize(
("make_handler", "expected_stream"),
[
pytest.param(
lambda _tmp_path: logging.StreamHandler(stream=sys.stdout),
lambda _original: sys.stderr,
id="stdout-stream-handler-redirected",
),
pytest.param(
lambda _tmp_path: logging.StreamHandler(stream=sys.stderr),
lambda _original: sys.stderr,
id="stderr-stream-handler-untouched",
),
pytest.param(
lambda tmp_path: logging.FileHandler(tmp_path / "airflow.log"),
lambda original: original,
id="file-handler-untouched",
),
],
)
def test_redirect(self, isolated_root_logger, tmp_path, make_handler, expected_stream):
handler = make_handler(tmp_path)
isolated_root_logger.addHandler(handler)
original_stream = handler.stream
try:
redirect_stdout_log_handlers_to_stderr()
assert handler.stream is expected_stream(original_stream)
finally:
handler.close()
Loading