From 26e5dd14ec3498b2e360ce7924b92c5e2259860d Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Wed, 27 May 2026 16:36:07 +0800 Subject: [PATCH 1/5] Reassign Dags from unconfigured bundles at DFP startup The 0082_3_1_0_make_bundle_name_not_nullable migration writes a single hard-coded ``bundle_name='dags-folder'`` on every legacy DagModel row, so deployments whose runtime config uses any other bundle name (or multiple custom bundles) cannot resolve the row at trigger time and fail with ``Requested bundle 'dags-folder' is not configured.`` (see #63323). Fixing this inside the Alembic migration is wrong: migrations must not import application code, and the migration runs before bundles are constructible, so the user's real bundle config is not available there. Instead, at DagFileProcessorManager startup -- after sync_bundles_to_db flushes the latest bundle state -- scan DagModel for legacy-candidate rows (NULL relative_fileloc and no DagVersion for that dag_id) and route each row to the most-specific configured bundle whose absolute path contains the Dag's fileloc, writing relative_fileloc at the same time so fileloc-based stale-detection works later. Rows whose fileloc is not under any configured bundle's path are left untouched: writing bundle_name without a verified relative_fileloc would produce an active row task workers cannot execute. Skipped rows then self-heal via the staleness lifecycle -- no manual ``airflow dags reserialize`` required. Concurrency and edge-case hardening on the repair path: * Fast-skip via ``EXISTS(DagVersion)`` -- DagVersion is written only by the parse path, which overwrites both bundle_name and relative_fileloc on every parse (DagModelOperation.update_dags), so once any 3.x parse has run the parse path is the source of truth and reassign has no work it would not do itself. PK-index probe vs. a sequential scan of ``dag`` (no index on relative_fileloc). 
* Chunked UPDATEs (_REASSIGN_BATCH_SIZE=1000) ordered by dag_id, with one internally-owned transaction per chunk via create_session(), so the row-lock window stays bounded and the repair never commits a caller-provided session. A per-row compare-and-swap WHERE clause re-asserts the legacy-candidate predicate on the UPDATE so a concurrent parser write wins the race. * SELECT and UPDATE chunks run in separate sessions; per-row fileloc matching runs without a DB connection held. * Parent-traversal guard in _best_bundle_for_fileloc lexically normalises both sides with os.path.normpath and rejects any relative result that is still absolute or contains ``..``, so a stored fileloc like ``/dags/foo/../../outside.py`` cannot escape a bundle root. The guard is lexical only -- symlinks are not resolved. * Safe under multi_team because a bundle path belongs to at most one team. * The stale-Dag scan skips the file-based staleness check for rows with NULL relative_fileloc and, whenever any were skipped, emits one INFO line per cycle with the skip count, so legacy rows that the repair could not route stay visible to operators. Tests cover custom bundle names, multiple bundles, overlapping paths (deepest wins), unmatched fileloc (row skipped), missing fileloc, FK-safety when a configured bundle is missing from dag_bundle, the legacy relative_fileloc backfill path, concurrent-DFP startup, chunk-boundary batching, and the full sync -> repair -> stale-scan -> re-parse lifecycle. 
closes: #63323 --- .../airflow/dag_processing/bundles/manager.py | 221 ++- .../src/airflow/dag_processing/manager.py | 20 +- .../bundles/test_dag_bundle_manager.py | 1282 ++++++++++++++++- .../tests/unit/dag_processing/test_manager.py | 100 ++ 4 files changed, 1614 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index 78c54266eda9f..7820f80ca6419 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -20,11 +20,13 @@ import logging import os import warnings -from typing import TYPE_CHECKING +from collections import defaultdict +from pathlib import Path +from typing import TYPE_CHECKING, cast from itsdangerous import URLSafeSerializer from pydantic import BaseModel, ValidationError -from sqlalchemy import delete, select +from sqlalchemy import delete, exists, select, update from airflow._shared.module_loading import import_string from airflow.configuration import conf @@ -34,17 +36,44 @@ from airflow.models.team import Team from airflow.providers_manager import ProvidersManager from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.session import NEW_SESSION, provide_session +from airflow.utils.session import NEW_SESSION, create_session, provide_session if TYPE_CHECKING: from collections.abc import Iterable + from sqlalchemy.engine import CursorResult from sqlalchemy.orm import Session log = logging.getLogger(__name__) _example_dag_bundle_name = "example_dags" +# Chunk size for the one-time startup repair of unconfigured bundles. +_REASSIGN_BATCH_SIZE = 1000 + + +def _best_bundle_for_fileloc( + fileloc: str, descending_bundle_paths: dict[str, Path] +) -> tuple[str, str] | None: + """ + Return ``(bundle_name, relative_fileloc)`` for the first bundle whose path contains ``fileloc``. 
+ + ``descending_bundle_paths`` must be sorted by path length descending so + the deepest bundle wins when paths overlap. + + Returns ``None`` when ``fileloc`` is not under any bundle's path. + """ + file_path = Path(os.path.normpath(fileloc)) + for name, path in descending_bundle_paths.items(): + try: + relative = file_path.relative_to(os.path.normpath(path)) + except ValueError: + continue + if relative.is_absolute() or os.pardir in relative.parts: + continue + return name, str(relative) + return None + class _ExternalBundleConfig(BaseModel): """Schema defining the user-specified configuration for a DAG bundle.""" @@ -366,6 +395,192 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]: session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name)) self.log.info("Deleted import errors for bundle %s which is no longer configured", name) + # Airflow sets autoflush=False, so subsequent reads in the same session + # need an explicit flush to see ORM-side bundle state changes. + session.flush() + + def reassign_dags_with_unconfigured_bundles(self) -> int: + """ + Reassign Dags pointing at unconfigured bundles to a configured one. + + Side effect of the ``0082_3_1_0_make_bundle_name_not_nullable`` + migration (#63323): legacy rows get ``bundle_name='dags-folder'`` + and NULL ``relative_fileloc``, which raises ``Requested bundle + '{name}' is not configured.`` at trigger time when the deployment + uses a custom bundle. + + Each legacy-candidate row (NULL ``relative_fileloc`` and no + ``DagVersion``) is routed to the most-specific configured bundle + whose path contains its ``fileloc``, writing ``relative_fileloc`` + atomically. 
Rows whose ``fileloc`` is under no configured bundle + are left untouched -- writing ``bundle_name`` without a verified + ``relative_fileloc`` would produce a row task workers cannot + execute -- and instead self-heal via the normal staleness + lifecycle: ``sync_bundles_to_db`` deactivates the old bundle, the + stale-scan marks the row stale, and the next successful parse + from any configured bundle resets everything via + ``update_dag_parsing_results_in_db``. No manual ``airflow dags + reserialize`` is required. + + Multi-team-safe because a bundle path belongs to at most one team. + Each chunk runs in its own internally-owned transaction so the + row-lock window stays bounded and no caller-provided session is + committed. + + :return: Number of Dags reassigned. + """ + # Import here to avoid circular import + # (manager -> dag -> dagrun -> taskinstance -> dag_version -> manager) + from airflow.models.dag import DagModel + from airflow.models.dag_version import DagVersion + + # Fast-skip once any 3.x parse cycle has run. DagVersion is written + # only by the parse path, and that path overwrites both bundle_name + # and relative_fileloc on every parse (see + # ``DagModelOperation.update_dags``), so any legacy row whose file + # is under a configured bundle self-heals at the next parse and any + # row whose file is under no configured bundle self-heals via the + # staleness lifecycle -- reassign has no work the parse path will + # not do itself. The probe is an index hit on dag_version's PK + # vs. a sequential scan of dag (no index on relative_fileloc). + with create_session() as session: + if session.scalar(select(DagVersion.id).limit(1)) is not None: + return 0 + + with create_session() as session: + if not (active_bundle_paths := self._resolve_active_bundle_paths(session=session)): + self.log.info( + "No active Dag bundles with resolvable paths; skipping reassignment of Dags " + "with unconfigured bundles." 
+ ) + return 0 + + # Chunked UPDATEs ordered by dag_id, one transaction per chunk; repaired + # rows drop out of the predicate because writing relative_fileloc + # makes the IS NULL clause false. + # + # Legacy-candidate predicate (rows never parsed in 3.x): NULL + # relative_fileloc (the 0082 migration leaves it NULL) AND NOT EXISTS + # DagVersion (the parse path writes DagModel.bundle_name before the + # DagVersion). Equivalent under that invariant; both stated as + # defense in depth and repeated on the UPDATE itself as a CAS guard + # so a concurrent parser write wins the race. + movements: dict[tuple[str | None, str], int] = defaultdict(int) + total_reassigned = 0 + total_backfilled = 0 + total_skipped = 0 + last_dag_id: str | None = None + + while True: + with create_session() as session: + query = ( + select(DagModel.dag_id, DagModel.bundle_name, DagModel.fileloc) + .where( + DagModel.relative_fileloc.is_(None), + ~exists().where(DagVersion.dag_id == DagModel.dag_id), + ) + .order_by(DagModel.dag_id) + .limit(_REASSIGN_BATCH_SIZE) + ) + if last_dag_id is not None: + query = query.where(DagModel.dag_id > last_dag_id) + + if not (chunk := session.execute(query).all()): + break + last_dag_id = chunk[-1].dag_id + + # Route every legacy row by fileloc, not just those on + # unconfigured bundles, so a migration-assigned dags-folder + # row whose file lives under a different configured bundle + # gets relocated instead of stranded. Classify as skip + # (no match), backfill (match == current bundle), or + # reassign (match != current bundle). 
+ chunk_updates: list[tuple[str, str | None, str, str]] = [] + for row in chunk: + match = _best_bundle_for_fileloc(row.fileloc, active_bundle_paths) if row.fileloc else None + if match is None: + total_skipped += 1 + continue + target, relative = match + chunk_updates.append((row.dag_id, row.bundle_name, target, relative)) + + if not chunk_updates: + continue + + with create_session() as session: + # create_session commits on context exit, bounding the + # row-lock window to one chunk. + for dag_id, prev_bundle, target, relative in chunk_updates: + result = cast( + "CursorResult", + session.execute( + update(DagModel) + .where( + DagModel.dag_id == dag_id, + DagModel.relative_fileloc.is_(None), + ~exists().where(DagVersion.dag_id == DagModel.dag_id), + ) + .values(relative_fileloc=relative, bundle_name=target) + .execution_options(synchronize_session=False) + ), + ) + + if result.rowcount: + # Rowcount is the source of truth for whether the CAS actually fired + if target == prev_bundle: + total_backfilled += 1 + else: + movements[(prev_bundle, target)] += 1 + total_reassigned += 1 + else: + self.log.debug("Skipping repair for Dag '%s': lost race to parser.", dag_id) + + for (prev, target), n in sorted(movements.items(), key=lambda item: (str(item[0][0]), item[0][1])): + self.log.info( + "Reassigning %d Dag(s) from unconfigured bundle '%s' to '%s'", + n, + prev, + target, + ) + + if total_backfilled: + self.log.info("Backfilled relative_fileloc for %d legacy Dag(s).", total_backfilled) + + if total_skipped: + self.log.warning( + "Skipped %d legacy Dag(s) whose fileloc is not under any configured bundle; " + "they will be marked stale until a matching bundle is added to " + "dag_bundle_config_list and the next parse restores them.", + total_skipped, + ) + + return total_reassigned + + def _resolve_active_bundle_paths(self, *, session: Session) -> dict[str, Path]: + """ + Return resolved absolute paths for configured-and-active bundles. 
+ + A bundle is "configured-and-active" when it is both in the manager's + config and persisted as ``active=True`` in ``dag_bundle`` -- bundles + missing from ``dag_bundle`` are excluded so they can't trigger an FK + violation if used as a reassignment target. + + The returned dict is ``{name: absolute_path}`` ordered by path length + descending so the most specific bundle wins in + ``_best_bundle_for_fileloc``. + """ + active_db_names = set( + session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(True))) + ) + + active_bundle_paths: dict[str, Path] = {} + for bundle in self.get_all_dag_bundles(): + if bundle.name not in active_db_names: + continue + active_bundle_paths[bundle.name] = bundle.path + + return dict(sorted(active_bundle_paths.items(), key=lambda item: len(str(item[1])), reverse=True)) + @staticmethod def _extract_template_params(bundle_instance: BaseDagBundle) -> dict: """ diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index e43920f2618bc..24fbee652f73e 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -318,7 +318,9 @@ def _exit_gracefully(self, signum, frame): def sync_bundles(self) -> None: """Sync configured DAG bundles to the metadata database.""" - DagBundlesManager().sync_bundles_to_db() + dag_bundle_manager = DagBundlesManager() + dag_bundle_manager.sync_bundles_to_db() + dag_bundle_manager.reassign_dags_with_unconfigured_bundles() def get_all_bundles(self) -> list[BaseDagBundle]: """Return configured DAG bundles filtered by ``bundle_names_to_parse`` if provided.""" @@ -449,6 +451,7 @@ def deactivate_stale_dags( ).where(~DagModel.is_stale) dags_parsed = session.execute(query) + stuck_legacy_rows = 0 for dag in dags_parsed: # Dags whose bundle has been removed from config (bundle no longer active) are stale — # the processor has stopped parsing their files, so the time-based check 
below would never fire. @@ -460,6 +463,12 @@ def deactivate_stale_dags( ) to_deactivate.add(dag.dag_id) continue + # Legacy 0082-migration row the startup repair could not route. + # Path(None) would crash the file-based check below; skip and count + # so it can be surfaced after the loop. + if dag.relative_fileloc is None: + stuck_legacy_rows += 1 + continue # When the Dag's last_parsed_time is more than the stale_dag_threshold older than the # Dag file's last_finish_time, the Dag is considered stale as has apparently been removed from the file, # This is especially relevant for Dag files that generate Dags in a dynamic manner. @@ -496,6 +505,15 @@ def deactivate_stale_dags( else: raise + if stuck_legacy_rows: + # Surface how many legacy rows the startup repair could not route; + # each one keeps raising "Requested bundle is not configured." until + # a matching bundle is added to dag_bundle_config_list. + self.log.info( + "Skipped stale check for %d legacy Dag(s) with NULL relative_fileloc.", + stuck_legacy_rows, + ) + def _run_parsing_loop(self): # initialize cache to mutualize calls to Variable.get in DAGs # needs to be done before this process is forked to create the DAG parsing processes. 
diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py index 0d17069c831dd..aff82be7242c8 100644 --- a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py +++ b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py @@ -17,22 +17,27 @@ from __future__ import annotations +import contextlib import json import os from contextlib import nullcontext +from pathlib import Path +from unittest import mock from unittest.mock import patch import pytest -from sqlalchemy import func, select +from sqlalchemy import func, select, update from airflow.dag_processing.bundles.base import BaseDagBundle -from airflow.dag_processing.bundles.manager import DagBundlesManager +from airflow.dag_processing.bundles.manager import DagBundlesManager, _best_bundle_for_fileloc from airflow.exceptions import AirflowConfigException +from airflow.models.dag import DagModel from airflow.models.dagbundle import DagBundleModel from airflow.models.errors import ParseImportError +from airflow.utils.session import create_session from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.db import clear_db_dag_bundles +from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags @pytest.mark.parametrize( @@ -107,8 +112,9 @@ def refresh(self): def get_current_version(self): pass - def path(self): - pass + @property + def path(self) -> Path: + return Path("/__basic_bundle_unmatched__") BASIC_BUNDLE_CONFIG = [ @@ -118,6 +124,13 @@ def path(self): "kwargs": {"refresh_interval": 1}, } ] +SECOND_BUNDLE_CONFIG = [ + { + "name": "second-bundle", + "classpath": "unit.dag_processing.bundles.test_dag_bundle_manager.BasicBundle", + "kwargs": {"refresh_interval": 1}, + } +] def test_get_bundle(): @@ -476,3 +489,1262 @@ def test_get_all_bundle_names(): # the naming suffix instead of pinning an exact list. 
extra = [n for n in bundle_names if n not in {"dags-folder", "example_dags"}] assert all(n.endswith("-example-dags") for n in extra) + + +@pytest.fixture +def clear_dags_and_bundles(): + clear_db_dags() + clear_db_dag_bundles() + yield + clear_db_dags() + clear_db_dag_bundles() + + +def _add_dag(session, dag_id: str, bundle_name: str) -> DagModel: + dag = DagModel(dag_id=dag_id, bundle_name=bundle_name, fileloc=f"/tmp/{dag_id}.py") + session.add(dag) + session.flush() + return dag + + +class TestBestBundleForFileloc: + """Tests for ``_best_bundle_for_fileloc`` path normalisation and safety.""" + + def test_returns_relative_path_for_match(self) -> None: + assert _best_bundle_for_fileloc("/dags/team_x/dag.py", {"team-x": Path("/dags/team_x")}) == ( + "team-x", + "dag.py", + ) + + def test_returns_none_for_no_match(self) -> None: + assert _best_bundle_for_fileloc("/elsewhere/dag.py", {"team-x": Path("/dags/team_x")}) is None + + def test_returns_none_for_empty_paths(self) -> None: + assert _best_bundle_for_fileloc("/dags/dag.py", {}) is None + + @pytest.mark.parametrize( + "fileloc", + [ + pytest.param("/dags/foo/../../outside.py", id="parent_traversal_escapes_root"), + pytest.param("/dags/../outside.py", id="parent_traversal_at_root"), + pytest.param("/dags/../../etc/passwd", id="multiple_parent_traversal"), + ], + ) + def test_rejects_parent_traversal_filelocs(self, fileloc: str) -> None: + """A fileloc with ``..`` segments that escape the bundle root must not match. + + Without lexical normalisation, ``Path.relative_to`` on an unnormalised + path with ``..`` segments returns a relative path like + ``foo/../../outside.py`` that, when later joined with the bundle root, + addresses files outside it. Normalising both sides collapses the + traversal so the fileloc is no longer under the bundle and the helper + returns ``None``. 
+ """ + assert _best_bundle_for_fileloc(fileloc, {"dags": Path("/dags")}) is None + + def test_normalises_redundant_separators_and_dots(self) -> None: + assert _best_bundle_for_fileloc("/dags//team_x/./dag.py", {"team-x": Path("/dags/team_x")}) == ( + "team-x", + "dag.py", + ) + + +@pytest.mark.db_test +class TestReassignDagsWithUnconfiguredBundles: + """Tests for DagBundlesManager.reassign_dags_with_unconfigured_bundles.""" + + def _manager_with_bundle_names(self, names: list[str]) -> DagBundlesManager: + """Return a manager whose ``_resolve_active_bundle_paths`` reports *names* with non-matching paths. + + The fake paths are absolute roots that cannot contain any + ``/tmp/{dag_id}.py`` test fileloc, so rows are routed as unmatched + unless a test explicitly arranges otherwise. + """ + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(BASIC_BUNDLE_CONFIG)}, + ): + manager = DagBundlesManager() + paths = {name: Path(f"/__unmatched_for_test__/{name}") for name in names} + manager._resolve_active_bundle_paths = lambda *, session: paths # type: ignore[method-assign] + return manager + + def test_no_configured_bundles_is_noop(self, clear_dags_and_bundles, session): + """Return 0 without raising when no bundles are configured.""" + manager = self._manager_with_bundle_names([]) + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + def test_already_configured_is_noop(self, clear_dags_and_bundles, session) -> None: + """No reassignment when every Dag already points at a configured bundle.""" + bundle = DagBundleModel(name="bundle-a") + bundle.active = True + session.add(bundle) + session.flush() + _add_dag(session, "dag-1", "bundle-a") + session.commit() + + manager = self._manager_with_bundle_names(["bundle-a"]) + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + assert session.get(DagModel, "dag-1").bundle_name == "bundle-a" + + def test_unmatched_fileloc_leaves_row_untouched(self, 
clear_dags_and_bundles, session, caplog) -> None: + """Rows whose fileloc has no configured bundle path keep their original bundle. + + The fallback that wrote ``bundle_name`` without a verified + ``relative_fileloc`` produced active-but-un-runnable rows, so the + helper now leaves such rows on their unconfigured bundle and emits a + single warning naming the count. + """ + active = DagBundleModel(name="active") + active.active = True + removed = DagBundleModel(name="removed-bundle") + removed.active = False + session.add(active) + session.add(removed) + session.flush() + _add_dag(session, "dag-1", "removed-bundle") + _add_dag(session, "dag-2", "removed-bundle") + session.commit() + + manager = self._manager_with_bundle_names(["active"]) + with caplog.at_level("WARNING", logger="airflow.dag_processing.bundles.manager.DagBundlesManager"): + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + session.expire_all() + for dag_id in ("dag-1", "dag-2"): + row = session.get(DagModel, dag_id) + assert row.bundle_name == "removed-bundle" + assert row.relative_fileloc is None + assert any("Skipped 2 legacy Dag(s)" in record.message for record in caplog.records) + + def test_row_with_populated_relative_fileloc_is_left_alone(self, clear_dags_and_bundles, session) -> None: + """A 3.x row whose bundle is no longer configured must keep its bundle assignment. + + Only rows the 0082 migration touched (``relative_fileloc IS NULL``) are + candidates for reassignment. Rows that already carry a relative path + were written by 3.x serialization and must be left on their bundle so + the regular stale-Dag deactivation path can handle a removed bundle. 
+ """ + active_bundle = DagBundleModel(name="active") + active_bundle.active = True + removed_bundle = DagBundleModel(name="removed-bundle") + removed_bundle.active = False + session.add(active_bundle) + session.add(removed_bundle) + session.flush() + + dag = DagModel( + dag_id="dag-1", + bundle_name="removed-bundle", + fileloc="/tmp/dag-1.py", + ) + dag.relative_fileloc = "dag-1.py" + dag.bundle_version = "abc123" + session.add(dag) + session.flush() + session.commit() + + manager = self._manager_with_bundle_names(["active"]) + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + session.expire_all() + refreshed = session.get(DagModel, "dag-1") + assert refreshed.bundle_name == "removed-bundle" + assert refreshed.relative_fileloc == "dag-1.py" + assert refreshed.bundle_version == "abc123" + + def test_row_with_existing_dag_version_is_left_alone(self, clear_dags_and_bundles, session) -> None: + """A Dag with any DagVersion row must not be reassigned. + + Defended at two layers: (1) the global DagVersion-existence fast-skip + short-circuits before any work, and (2) the per-row predicate + ``NOT EXISTS DagVersion`` excludes the row even if the fast-skip is + bypassed. The parse path is the source of truth for any Dag with a + DagVersion -- touching only the DagModel would leave the DagVersion + stale, and scheduler/executor paths prefer DagVersion.bundle_name + when building task workloads. + """ + from airflow.models.dag_version import DagVersion + + active = DagBundleModel(name="active") + active.active = True + removed = DagBundleModel(name="removed-bundle") + removed.active = False + session.add(active) + session.add(removed) + session.flush() + + # NULL relative_fileloc would normally make this a repair candidate; + # the DagVersion row should exclude it from the predicate. 
+ dag = DagModel( + dag_id="versioned", + bundle_name="removed-bundle", + fileloc="/tmp/versioned.py", + ) + dag.relative_fileloc = None + session.add(dag) + session.flush() + + version = DagVersion( + dag_id="versioned", + version_number=1, + bundle_name="removed-bundle", + bundle_version="v1", + ) + session.add(version) + session.flush() + session.commit() + + manager = self._manager_with_bundle_names(["active"]) + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + session.expire_all() + refreshed = session.get(DagModel, "versioned") + assert refreshed.bundle_name == "removed-bundle" + assert refreshed.relative_fileloc is None + refreshed_version = session.get(DagVersion, version.id) + assert refreshed_version.bundle_name == "removed-bundle" + assert refreshed_version.bundle_version == "v1" + + @conf_vars({("core", "multi_team"): "True"}) + def test_runs_under_multi_team_mode(self, clear_dags_and_bundles, session, tmp_path) -> None: + """Multi-team mode still repairs legacy rows. + + Each team's bundle owns a distinct on-disk path, so routing a legacy + row to the most-specific bundle whose path contains its ``fileloc`` + cannot cross a team boundary. The repair therefore runs unchanged + under ``core.multi_team`` and must not blanket-skip rows that have a + safe target. 
+ """ + bundle_dir = tmp_path / "team_x" + bundle_dir.mkdir() + legacy_file = bundle_dir / "legacy.py" + legacy_file.write_text("# legacy dag") + + config = [ + { + "name": "team-x-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + active_bundle = DagBundleModel(name="team-x-bundle") + active_bundle.active = True + removed_bundle = DagBundleModel(name="removed-bundle") + removed_bundle.active = False + session.add(active_bundle) + session.add(removed_bundle) + session.flush() + + dag = DagModel(dag_id="legacy", bundle_name="removed-bundle", fileloc=str(legacy_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + + assert count == 1 + session.expire_all() + refreshed = session.get(DagModel, "legacy") + assert refreshed.bundle_name == "team-x-bundle" + assert refreshed.relative_fileloc == "legacy.py" + + +@pytest.mark.db_test +class TestBackfillRelativeFileloc: + """Tests for the legacy ``relative_fileloc`` backfill triggered by reassignment.""" + + @pytest.mark.parametrize( + ("fileloc_under_bundle", "expected_bundle_name", "expected_relative_fileloc"), + [ + pytest.param(True, "my-bundle", "legacy.py", id="fileloc_under_bundle_path_is_backfilled"), + pytest.param(False, "orphan-bundle", None, id="fileloc_outside_bundle_path_is_left_alone"), + ], + ) + def test_backfill_behavior( + self, + clear_dags_and_bundles, + session, + tmp_path, + fileloc_under_bundle: bool, + expected_bundle_name: str, + expected_relative_fileloc: str | None, + ) -> None: + """Reassignment only happens when ``fileloc`` lies under a configured bundle path. + + When the fileloc matches, both ``bundle_name`` and ``relative_fileloc`` + are written atomically. 
When it does not match, the row is left on its + unconfigured bundle so it cannot become an active-but-un-runnable row. + + :param fileloc_under_bundle: Whether the legacy Dag's absolute ``fileloc`` lies under + the configured bundle's path. + :param expected_bundle_name: Expected ``bundle_name`` after reassignment. + :param expected_relative_fileloc: Expected ``relative_fileloc`` after reassignment. + """ + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + legacy_file_under_bundle = bundle_dir / "legacy.py" + legacy_file_under_bundle.write_text("# legacy dag") + legacy_fileloc = str(legacy_file_under_bundle) if fileloc_under_bundle else "/elsewhere/foo.py" + + config = [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + active_bundle = DagBundleModel(name="my-bundle") + active_bundle.active = True + orphan_bundle = DagBundleModel(name="orphan-bundle") + orphan_bundle.active = False + session.add(active_bundle) + session.add(orphan_bundle) + session.flush() + + dag = DagModel(dag_id="legacy", bundle_name="orphan-bundle", fileloc=legacy_fileloc) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.reassign_dags_with_unconfigured_bundles() + + session.expire_all() + refreshed = session.get(DagModel, "legacy") + assert refreshed.bundle_name == expected_bundle_name + assert refreshed.relative_fileloc == expected_relative_fileloc + + def test_post_2x_to_3x_migration_with_renamed_bundle( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """End-to-end: 2.x→3.x upgrade where the operator's bundle is not named ``dags-folder``. 
+ + Reproduces the original incident: the migration sets ``bundle_name='dags-folder'`` on + legacy Dag rows and leaves ``relative_fileloc`` NULL, but the operator has configured a + single LocalDagBundle named ``custom-bundle`` pointing at the same on-disk dags folder. + After ``reassign_dags_with_unconfigured_bundles`` runs at DFP startup, the legacy Dags + should be: + + 1. Reassigned to ``custom-bundle`` so triggering DagRuns no longer fails with + "Requested bundle 'dags-folder' is not configured." + 2. Have ``relative_fileloc`` backfilled from ``fileloc`` so the standard fileloc-based + stale-detection path can later detect real deletions. + """ + dags_folder = tmp_path / "dags" + dags_folder.mkdir() + legacy_file = dags_folder / "my_dag.py" + legacy_file.write_text("# 2.x dag") + + config = [ + { + "name": "custom-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(dags_folder), "refresh_interval": 1}, + } + ] + # State right after the 0082 migration: ``dags-folder`` row exists in dag_bundle (added + # by migration backfill) but is not in the user's config; the user's ``custom-bundle`` + # has not been registered yet (sync_bundles_to_db does that on startup). + legacy_default_bundle = DagBundleModel(name="dags-folder") + legacy_default_bundle.active = True + session.add(legacy_default_bundle) + session.flush() + + legacy_dag = DagModel( + dag_id="legacy_dag", + bundle_name="dags-folder", + fileloc=str(legacy_file), + ) + legacy_dag.relative_fileloc = None + session.add(legacy_dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + # DFP startup order: register configured bundles, then reassign. 
+ manager.sync_bundles_to_db(session=session) + session.commit() + count = manager.reassign_dags_with_unconfigured_bundles() + + assert count == 1 + session.expire_all() + refreshed = session.get(DagModel, "legacy_dag") + assert refreshed.bundle_name == "custom-bundle" + assert refreshed.relative_fileloc == "my_dag.py" + + def test_backfill_commits_between_chunks( + self, clear_dags_and_bundles, session, tmp_path, monkeypatch + ) -> None: + """The legacy backfill chunks and commits like the reassignment loop. + + Without chunked commits, a deployment where every legacy Dag is + already on a configured bundle would still UPDATE the whole set in + one transaction, holding row locks on the dag table for the full + DFP startup. + """ + from airflow.dag_processing.bundles import manager as bundles_manager_mod + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + for i in range(5): + (bundle_dir / f"legacy_{i}.py").write_text(f"# legacy {i}") + + config = [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + active_bundle = DagBundleModel(name="my-bundle") + active_bundle.active = True + session.add(active_bundle) + session.flush() + + for i in range(5): + dag = DagModel( + dag_id=f"legacy_{i}", + bundle_name="my-bundle", + fileloc=str(bundle_dir / f"legacy_{i}.py"), + ) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + monkeypatch.setattr(bundles_manager_mod, "_REASSIGN_BATCH_SIZE", 2) + # Each chunk opens its own ``create_session`` (which commits on exit), + # so counting context-manager entries equals counting batch commits. 
+ session_open_count = [0] + real_create_session = bundles_manager_mod.create_session + + @contextlib.contextmanager + def _counting_create_session(*args, **kwargs): + session_open_count[0] += 1 + with real_create_session(*args, **kwargs) as s: + yield s + + monkeypatch.setattr(bundles_manager_mod, "create_session", _counting_create_session) + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + # No unconfigured rows exist, so the reassignment loop returns + # immediately and the helper drives the chunking we are testing. + manager.reassign_dags_with_unconfigured_bundles() + + # 1 session for the active-bundle read + 5 backfill rows / batch size 2 + # = chunks of 2, 2, 1, then an empty chunk that terminates the loop. + assert session_open_count[0] >= 4 + + session.expire_all() + for i in range(5): + refreshed = session.get(DagModel, f"legacy_{i}") + assert refreshed.relative_fileloc == f"legacy_{i}.py" + + def test_backfill_does_not_overwrite_concurrent_parser_write( + self, clear_dags_and_bundles, session, tmp_path, monkeypatch + ) -> None: + """A concurrent parser write between SELECT and UPDATE keeps its value. + + Mirrors the race-safety regression on the reassignment UPDATE; the + backfill UPDATE re-asserts ``relative_fileloc IS NULL`` so the + parser's write is authoritative. 
+ """ + from airflow.dag_processing.bundles import manager as bundles_manager_mod + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + legacy_file = bundle_dir / "legacy.py" + legacy_file.write_text("# legacy") + + config = [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + active_bundle = DagBundleModel(name="my-bundle") + active_bundle.active = True + session.add(active_bundle) + session.flush() + + dag = DagModel( + dag_id="legacy", + bundle_name="my-bundle", + fileloc=str(legacy_file), + ) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + # Drive the race: the helper invokes ``Path(fileloc)`` and then + # ``.relative_to(bundle_path)``. Wrap the Path constructor so the + # returned object commits a parser write before delegating to the + # real ``relative_to``. + original_path_cls = bundles_manager_mod.Path + + def _racey_path_factory(arg): + real_path = original_path_cls(arg) + + def _racey_relative_to(*args, **kwargs): + with create_session() as racer: + racer.execute( + update(DagModel) + .where(DagModel.dag_id == "legacy") + .values(relative_fileloc="parser_wrote_this.py") + ) + return type(real_path).relative_to(real_path, *args, **kwargs) + + patcher = mock.MagicMock(wraps=real_path) + patcher.relative_to = _racey_relative_to + return patcher + + monkeypatch.setattr(bundles_manager_mod, "Path", _racey_path_factory) + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.reassign_dags_with_unconfigured_bundles() + + session.expire_all() + refreshed = session.get(DagModel, "legacy") + assert refreshed.relative_fileloc == "parser_wrote_this.py" + + +@pytest.mark.db_test +class TestSmarterRouting: + """Tests for per-row best-bundle routing in ``reassign_dags_with_unconfigured_bundles``.""" + + def 
test_routes_to_bundle_whose_path_contains_fileloc( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """When multiple bundles are configured, route each Dag to the bundle whose path matches.""" + bundle_a_dir = tmp_path / "bundle_a" + bundle_b_dir = tmp_path / "bundle_b" + bundle_a_dir.mkdir() + bundle_b_dir.mkdir() + dag_a_file = bundle_a_dir / "dag_a.py" + dag_b_file = bundle_b_dir / "dag_b.py" + dag_a_file.write_text("# a") + dag_b_file.write_text("# b") + + config = [ + { + "name": "bundle-a", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_a_dir), "refresh_interval": 1}, + }, + { + "name": "bundle-b", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_b_dir), "refresh_interval": 1}, + }, + ] + for name, active in [("bundle-a", True), ("bundle-b", True), ("orphan", False)]: + b = DagBundleModel(name=name) + b.active = active + session.add(b) + session.flush() + + for dag_id, fileloc in [("dag-a", str(dag_a_file)), ("dag-b", str(dag_b_file))]: + dag = DagModel(dag_id=dag_id, bundle_name="orphan", fileloc=fileloc) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + count = manager.reassign_dags_with_unconfigured_bundles() + + assert count == 2 + session.expire_all() + # Each Dag goes to the bundle whose path contains its fileloc, not just the first one. 
+ a = session.get(DagModel, "dag-a") + assert a.bundle_name == "bundle-a" + assert a.relative_fileloc == "dag_a.py" + b = session.get(DagModel, "dag-b") + assert b.bundle_name == "bundle-b" + assert b.relative_fileloc == "dag_b.py" + + def test_longest_matching_path_wins_for_overlapping_bundles( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """When bundle paths nest, route to the most-specific bundle.""" + outer_dir = tmp_path / "outer" + inner_dir = outer_dir / "team_x" + inner_dir.mkdir(parents=True) + dag_file = inner_dir / "deep_dag.py" + dag_file.write_text("# deep") + + config = [ + { + "name": "outer-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(outer_dir), "refresh_interval": 1}, + }, + { + "name": "team-x-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(inner_dir), "refresh_interval": 1}, + }, + ] + for name in ("outer-bundle", "team-x-bundle"): + b = DagBundleModel(name=name) + b.active = True + session.add(b) + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(orphan) + session.flush() + + dag = DagModel(dag_id="deep", bundle_name="orphan", fileloc=str(dag_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.reassign_dags_with_unconfigured_bundles() + + session.expire_all() + refreshed = session.get(DagModel, "deep") + # Most-specific (deeper) bundle wins. + assert refreshed.bundle_name == "team-x-bundle" + assert refreshed.relative_fileloc == "deep_dag.py" + + def test_no_path_match_leaves_row_unchanged( + self, clear_dags_and_bundles, session, tmp_path, caplog + ) -> None: + """A Dag whose fileloc is outside every configured bundle is left untouched. 
+ + Writing ``bundle_name`` without a verified ``relative_fileloc`` would + produce an active row that task workloads cannot execute (no + ``dag_rel_path``). The row stays on its unconfigured bundle so the + existing "Requested bundle '{name}' is not configured." error at + trigger time gives the operator an actionable signal. + """ + bundle_dir = tmp_path / "configured" + bundle_dir.mkdir() + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + configured = DagBundleModel(name="configured-bundle") + configured.active = True + session.add(configured) + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(orphan) + session.flush() + + dag = DagModel(dag_id="elsewhere", bundle_name="orphan", fileloc="/somewhere/else/dag.py") + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with ( + patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ), + caplog.at_level("WARNING", logger="airflow.dag_processing.bundles.manager.DagBundlesManager"), + ): + manager = DagBundlesManager() + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + session.expire_all() + refreshed = session.get(DagModel, "elsewhere") + assert refreshed.bundle_name == "orphan" + assert refreshed.relative_fileloc is None + assert any("Skipped 1 legacy Dag(s)" in record.message for record in caplog.records) + + def test_legacy_row_on_active_default_routed_to_better_match( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """Migration-assigned ``dags-folder`` rows are re-routed when another bundle owns the file. + + Migration 0082 assigns every legacy 2.x row to ``dags-folder`` by + default. 
An operator that keeps a configured ``dags-folder`` bundle + alongside another bundle whose path contains the Dag's ``fileloc`` + must see the Dag reassigned to the better-matching bundle -- not + stranded on ``dags-folder`` just because that name is still active. + """ + dags_folder_dir = tmp_path / "dags-folder" + team_x_dir = tmp_path / "team_x" + dags_folder_dir.mkdir() + team_x_dir.mkdir() + team_x_file = team_x_dir / "team_x_dag.py" + team_x_file.write_text("# team-x dag") + + config = [ + { + "name": "dags-folder", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(dags_folder_dir), "refresh_interval": 1}, + }, + { + "name": "team-x-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(team_x_dir), "refresh_interval": 1}, + }, + ] + for name in ("dags-folder", "team-x-bundle"): + bundle = DagBundleModel(name=name) + bundle.active = True + session.add(bundle) + session.flush() + + # Migration state: bundle_name set to ``dags-folder`` (still in the + # active config!) but fileloc actually lives under ``team-x-bundle``. + dag = DagModel(dag_id="team_x_dag", bundle_name="dags-folder", fileloc=str(team_x_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + + assert count == 1 + session.expire_all() + refreshed = session.get(DagModel, "team_x_dag") + assert refreshed.bundle_name == "team-x-bundle" + assert refreshed.relative_fileloc == "team_x_dag.py" + + def test_legacy_row_on_correct_bundle_only_backfills_relative_fileloc( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """A legacy row whose ``fileloc`` matches its current bundle keeps the bundle. 
+ + The repair must not rewrite ``bundle_name`` when the best match is + the same bundle the row already points at; it only needs to fill in + the missing ``relative_fileloc``. + """ + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + legacy_file = bundle_dir / "legacy.py" + legacy_file.write_text("# legacy") + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + configured = DagBundleModel(name="configured-bundle") + configured.active = True + session.add(configured) + session.flush() + + dag = DagModel(dag_id="legacy", bundle_name="configured-bundle", fileloc=str(legacy_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + # The reassignment counter only includes rows whose bundle_name + # changed; a same-bundle backfill is not counted as a reassignment. + count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + + assert count == 0 + session.expire_all() + refreshed = session.get(DagModel, "legacy") + assert refreshed.bundle_name == "configured-bundle" + assert refreshed.relative_fileloc == "legacy.py" + + +@pytest.mark.db_test +class TestBatching: + """Tests for the chunked-commit pattern in ``reassign_dags_with_unconfigured_bundles``.""" + + def test_repair_commits_between_chunks( + self, clear_dags_and_bundles, session, tmp_path, monkeypatch + ) -> None: + """All matched rows are repaired and the loop commits between chunks. + + Each ``session.commit()`` bounds the row-lock window to one chunk, + which is the load-bearing property on a large 2.x-upgraded + deployment. 
+ """ + from airflow.dag_processing.bundles import manager as bundles_manager_mod + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + dag_files = [] + for i in range(5): + dag_file = bundle_dir / f"dag_{i}.py" + dag_file.write_text(f"# dag {i}") + dag_files.append(dag_file) + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + configured = DagBundleModel(name="configured-bundle") + configured.active = True + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(configured) + session.add(orphan) + session.flush() + + for i, dag_file in enumerate(dag_files): + dag = DagModel( + dag_id=f"dag_{i}", + bundle_name="orphan", + fileloc=str(dag_file), + ) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() # baseline state visible to the repair's own commits + + monkeypatch.setattr(bundles_manager_mod, "_REASSIGN_BATCH_SIZE", 2) + # Each chunk opens its own ``create_session`` (which commits on exit), + # so counting context-manager entries equals counting batch commits. + session_open_count = [0] + real_create_session = bundles_manager_mod.create_session + + @contextlib.contextmanager + def _counting_create_session(*args, **kwargs): + session_open_count[0] += 1 + with real_create_session(*args, **kwargs) as s: + yield s + + monkeypatch.setattr(bundles_manager_mod, "create_session", _counting_create_session) + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + count = manager.reassign_dags_with_unconfigured_bundles() + + assert count == 5 + # 1 session for the active-bundle read + 5 rows / batch size 2 + # = chunks of 2, 2, 1, then an empty chunk that terminates the loop. 
+ assert session_open_count[0] >= 4 + + session.expire_all() + for i in range(5): + refreshed = session.get(DagModel, f"dag_{i}") + assert refreshed.bundle_name == "configured-bundle" + assert refreshed.relative_fileloc == f"dag_{i}.py" + + +@pytest.mark.db_test +class TestRaceSafety: + """Tests that the repair UPDATE does not overwrite concurrent parser writes.""" + + def test_concurrent_parse_between_select_and_update_wins( + self, clear_dags_and_bundles, session, tmp_path, monkeypatch + ) -> None: + """A parser that lands a write between our SELECT and UPDATE keeps its values. + + The repair's UPDATE re-asserts ``relative_fileloc IS NULL`` (and the + DagVersion absence) so SQL evaluates against committed state when the + UPDATE runs. A concurrent parse that wrote the real values first + makes our UPDATE match zero rows; we must not overwrite it. + """ + from airflow.dag_processing.bundles import manager as bundles_manager_mod + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + dag_file = bundle_dir / "raced.py" + dag_file.write_text("# raced") + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + configured = DagBundleModel(name="configured-bundle") + configured.active = True + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(configured) + session.add(orphan) + session.flush() + + dag = DagModel(dag_id="raced", bundle_name="orphan", fileloc=str(dag_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + # Drive the race: before _best_bundle_for_fileloc returns (which + # sits between the chunk SELECT and the per-row UPDATE), simulate a + # concurrent parser that has already committed the real values. 
+ original = bundles_manager_mod._best_bundle_for_fileloc + + def _racey_match(fileloc, active_bundle_paths): + with create_session() as racer: + racer.execute( + update(DagModel) + .where(DagModel.dag_id == "raced") + .values( + bundle_name="configured-bundle", + relative_fileloc="parser_wrote_this.py", + ) + ) + return original(fileloc, active_bundle_paths) + + monkeypatch.setattr(bundles_manager_mod, "_best_bundle_for_fileloc", _racey_match) + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + count = manager.reassign_dags_with_unconfigured_bundles() + + # CAS guard prevents the repair from clobbering the parser write. + assert count == 0 + session.expire_all() + refreshed = session.get(DagModel, "raced") + assert refreshed.bundle_name == "configured-bundle" + assert refreshed.relative_fileloc == "parser_wrote_this.py" + + +@pytest.mark.db_test +class TestHighAvailabilityStartup: + """Two DFPs entering the repair path concurrently must converge safely.""" + + def _build_dataset(self, session, bundle_dir): + """Insert one reassignment-eligible row and one backfill-eligible row.""" + reassign_file = bundle_dir / "reassign.py" + backfill_file = bundle_dir / "backfill.py" + reassign_file.write_text("# reassign") + backfill_file.write_text("# backfill") + + configured = DagBundleModel(name="configured-bundle") + configured.active = True + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(configured) + session.add(orphan) + session.flush() + + # Hits the reassignment branch: bundle is unconfigured, fileloc lies + # under a configured bundle's path. + reassign_dag = DagModel(dag_id="reassign", bundle_name="orphan", fileloc=str(reassign_file)) + reassign_dag.relative_fileloc = None + session.add(reassign_dag) + + # Hits the backfill branch: bundle is already configured, but + # relative_fileloc is NULL (legacy row). 
+ backfill_dag = DagModel( + dag_id="backfill", bundle_name="configured-bundle", fileloc=str(backfill_file) + ) + backfill_dag.relative_fileloc = None + session.add(backfill_dag) + session.flush() + session.commit() + + def test_sequential_repeat_is_idempotent(self, clear_dags_and_bundles, session, tmp_path) -> None: + """Running the full repair twice over the same dataset must be a no-op the second pass. + + Simulates two DFPs starting up one after the other (or the same DFP + restarting): the second pass's SELECT must find an empty set because + the first pass either repaired the row (now non-NULL + relative_fileloc) or skipped it (cursor advances past on the first + pass alone, but the second pass's predicate would also exclude + repaired rows directly). + """ + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + self._build_dataset(session, bundle_dir) + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + first_count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + session.expire_all() + after_first = { + row.dag_id: (row.bundle_name, row.relative_fileloc) + for row in session.execute(select(DagModel)).scalars() + } + + second_count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + session.expire_all() + after_second = { + row.dag_id: (row.bundle_name, row.relative_fileloc) + for row in session.execute(select(DagModel)).scalars() + } + + assert first_count == 1 # one reassignment fires on pass 1 + assert second_count == 0 # nothing to do on pass 2 + assert after_first == after_second + assert after_first["reassign"] == ("configured-bundle", "reassign.py") + assert after_first["backfill"] == ("configured-bundle", "backfill.py") + + def 
test_interleaved_dfps_do_not_overwrite_each_other( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """Two DFPs both SELECT the same row; the second to UPDATE must no-op. + + Mirrors a multi-DFP startup where both processes hit the repair at + the same time. The CAS guards on the UPDATE statements ensure that + whichever transaction commits second sees zero rows match its + WHERE clause, leaving the first commit authoritative. + """ + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + self._build_dataset(session, bundle_dir) + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + + # Hand-driven interleaving: simulate DFP A and DFP B both observing + # the unrepaired state, then DFP A commits first, then DFP B's + # UPDATEs run against committed state with the CAS guards in place. + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + # DFP B reads the legacy state first (still NULL relative_fileloc). + with create_session() as session_b: + pre_b = session_b.execute( + select(DagModel.dag_id, DagModel.relative_fileloc).order_by(DagModel.dag_id) + ).all() + assert [r[1] for r in pre_b] == [None, None] + + # DFP A runs the full repair (commits internally per chunk). + DagBundlesManager().reassign_dags_with_unconfigured_bundles() + + # DFP B now runs its repair against the post-A committed state. + # The CAS guards (relative_fileloc IS NULL) make every UPDATE a + # no-op because A has already filled in the values. 
+ b_count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + assert b_count == 0 + + session.expire_all() + reassign_row = session.get(DagModel, "reassign") + backfill_row = session.get(DagModel, "backfill") + assert reassign_row.bundle_name == "configured-bundle" + assert reassign_row.relative_fileloc == "reassign.py" + assert backfill_row.bundle_name == "configured-bundle" + assert backfill_row.relative_fileloc == "backfill.py" + + +@pytest.mark.db_test +class TestSyncAndReassign: + """Tests for sync_bundles_to_db followed by reassign_dags_with_unconfigured_bundles.""" + + def _sync_and_reassign(self, config: list[dict], session) -> None: + """Sync bundles to DB and reassign DAGs with unconfigured bundles. + + :param config: Bundle config list to use for this sync cycle. + :param session: SQLAlchemy session. + """ + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db(session=session) + session.commit() + manager.reassign_dags_with_unconfigured_bundles() + + @conf_vars({("core", "LOAD_EXAMPLES"): "False"}) + @pytest.mark.parametrize( + "second_config", + [ + pytest.param(SECOND_BUNDLE_CONFIG, id="bundle_removed_no_path_match"), + pytest.param(BASIC_BUNDLE_CONFIG, id="bundle_active_dag_unchanged"), + ], + ) + def test_sync_preserves_dag_bundle_without_path_match( + self, + clear_dags_and_bundles, + session, + second_config: list[dict], + ) -> None: + """Sync + reassign keeps the original bundle when no configured path matches the fileloc. + + Both parameter cases insert a Dag whose ``fileloc`` is ``/tmp/dag-1.py``, + which is not under any test bundle's path (``BasicBundle.path`` is a + no-op). When the original bundle is removed from config the helper used + to silently rewrite ``bundle_name`` to the first configured bundle even + with no path match; it now leaves the row alone so the row never + becomes active-but-un-runnable. 
When the bundle is still configured the + row is untouched as before. + + :param second_config: Bundle config for the second sync cycle. + """ + self._sync_and_reassign(BASIC_BUNDLE_CONFIG, session) + + _add_dag(session, "dag-1", "my-test-bundle") + session.commit() + + self._sync_and_reassign(second_config, session) + + dag = session.get(DagModel, "dag-1") + assert dag.bundle_name == "my-test-bundle" + + +@pytest.mark.db_test +class TestSkippedRowLifecycle: + """End-to-end coverage for the sync -> repair skip -> stale-scan -> re-parse path. + + A legacy 2.x row with an unconfigured bundle and a fileloc outside every + configured bundle is intentionally left untouched by the repair. The + inactive-bundle branch of ``deactivate_stale_dags`` then marks it stale, + and a later successful parse from a now-configured bundle must restore it + end-to-end. Each unit-level test exercises one stage of this lifecycle; + this test pins the whole sequence so a future refactor can't silently + break the recovery contract. + """ + + def test_skipped_row_is_recoverable_after_operator_fix( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + from airflow.dag_processing.collection import update_dag_parsing_results_in_db + from airflow.dag_processing.manager import DagFileProcessorManager + from airflow.sdk import DAG + from airflow.serialization.serialized_objects import LazyDeserializedDAG + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + legacy_file = bundle_dir / "legacy.py" + legacy_file.write_text("# legacy dag") + + # Pre-sync state: legacy row points at the orphan bundle, fileloc is + # outside any configured bundle path, relative_fileloc is NULL. 
+ orphan = DagBundleModel(name="orphan") + orphan.active = True + session.add(orphan) + session.flush() + legacy = DagModel( + dag_id="legacy_dag", + bundle_name="orphan", + fileloc="/elsewhere/legacy.py", + last_parsed_time=None, + ) + legacy.relative_fileloc = None + legacy.is_stale = False + session.add(legacy) + session.commit() + + # Operator removes the orphan bundle and configures a different one + # whose path doesn't contain the legacy fileloc. + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db(session=session) + session.commit() + # Orphan bundle is now inactive; legacy row's bundle_name still points at it. + assert session.get(DagBundleModel, "orphan").active is False + + # Repair skips the row -- no fileloc match means no atomic + # bundle_name + relative_fileloc write is possible. + reassigned = manager.reassign_dags_with_unconfigured_bundles() + assert reassigned == 0 + session.expire_all() + refreshed = session.get(DagModel, "legacy_dag") + assert refreshed.bundle_name == "orphan" + assert refreshed.relative_fileloc is None + assert refreshed.is_stale is False + + # Stale-Dag scan deactivates the row via the inactive-bundle + # branch, which runs before the NULL relative_fileloc guard. + dfp_manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60) + dfp_manager.deactivate_stale_dags(last_parsed={}) + session.expire_all() + stale_row = session.get(DagModel, "legacy_dag") + assert stale_row.is_stale is True + + # Operator fix: the legacy file is now inside the configured + # bundle's path. The parser re-parses it and the standard + # collection write resets is_stale, bundle_name, and + # relative_fileloc in a single transaction. 
+ recovered_dag = DAG(dag_id="legacy_dag") + recovered_dag.fileloc = str(legacy_file) + recovered_dag.relative_fileloc = "legacy.py" + update_dag_parsing_results_in_db( + "configured-bundle", + None, + [LazyDeserializedDAG.from_dag(recovered_dag)], + {}, + None, + set(), + session, + ) + session.commit() + + session.expire_all() + restored = session.get(DagModel, "legacy_dag") + assert restored.is_stale is False + assert restored.bundle_name == "configured-bundle" + assert restored.relative_fileloc == "legacy.py" diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 6d10e5c1a0665..53bfb8140c6cf 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1055,6 +1055,68 @@ def test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self, session): ) assert is_stale_by_dag == {"dag_in_inactive_bundle": True, "dag_in_active_bundle": False} + @pytest.mark.usefixtures("testing_dag_bundle") + def test_deactivate_stale_dags_tolerates_null_relative_fileloc(self, session): + """An active Dag with ``relative_fileloc=None`` must not crash the stale scan. + + Upgrade leftover: the 0082 migration writes ``bundle_name`` without + ``relative_fileloc``, and the startup repair leaves a row untouched when its + ``fileloc`` is not under any configured bundle's path. The scanner must skip the + file-based check for such rows rather than calling ``Path(None)``. 
+ """ + session.add( + DagModel( + dag_id="dag_null_relfileloc", + bundle_name="testing", + fileloc="/not/under/any/bundle.py", + relative_fileloc=None, + last_parsed_time=timezone.utcnow(), + is_stale=False, + ) + ) + session.flush() + + manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60) + manager.deactivate_stale_dags(last_parsed={}) + + is_stale = session.scalar(select(DagModel.is_stale).where(DagModel.dag_id == "dag_null_relfileloc")) + assert is_stale is False + + @pytest.mark.usefixtures("testing_dag_bundle") + def test_deactivate_stale_dags_logs_stuck_legacy_row_count(self, session): + """Skipped NULL-``relative_fileloc`` rows are counted and surfaced via a single INFO log. + + Operators need visibility into how many legacy migration rows the + startup repair could not route, because every such row keeps raising + ``Requested bundle is not configured.`` at trigger time until the + operator restores a matching bundle. The log line is the only + operator-facing signal, so this test asserts it is emitted with the + expected count. (Allowed exception to the "don't assert on log + output" convention: the log line *is* the behaviour under test.) 
+ """ + for dag_id in ("legacy_a", "legacy_b", "legacy_c"): + session.add( + DagModel( + dag_id=dag_id, + bundle_name="testing", + fileloc=f"/not/under/any/{dag_id}.py", + relative_fileloc=None, + last_parsed_time=timezone.utcnow(), + is_stale=False, + ) + ) + session.flush() + + manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60) + with mock.patch.object(manager.log, "info") as mock_info: + manager.deactivate_stale_dags(last_parsed={}) + + legacy_log_calls = [ + call for call in mock_info.call_args_list if "legacy Dag" in (call.args[0] if call.args else "") + ] + assert len(legacy_log_calls) == 1 + assert legacy_log_calls[0].args[1] == 3 + @mock.patch("airflow.dag_processing.manager.is_lock_not_available_error") @pytest.mark.usefixtures("testing_dag_bundle") def test_deactivate_stale_dags_handles_lock_timeout(self, mock_is_lock_not_available, session, caplog): @@ -1610,6 +1672,29 @@ def test_deactivate_deleted_dags_return_value( assert session.get(DagModel, "test_dag1").is_stale is expected_dag1_stale assert session.get(DagModel, "test_dag2").is_stale is expected_dag2_stale + def test_deactivate_deleted_dags_marks_null_relative_fileloc_stale(self, dag_maker, session): + """DAGs with NULL ``relative_fileloc`` are also stale-marked when not in the observed file set. + + Legacy 2.x rows that the bundle backfill couldn't recover (``fileloc`` not under any active + bundle path) get treated as deleted on the first parse cycle. Alive rows self-heal when the + parser next succeeds and resets ``is_stale`` via ``update_dags``. 
+ """ + with dag_maker("parsed_dag") as dag1: + dag1.relative_fileloc = "parsed_dag.py" + with dag_maker("legacy_dag") as dag2: + dag2.relative_fileloc = None + dag_maker.sync_dagbag_to_db() + + any_deactivated = DagModel.deactivate_deleted_dags( + bundle_name="dag_maker", + rel_filelocs=set(), + session=session, + ) + + assert any_deactivated is True + assert session.get(DagModel, "parsed_dag").is_stale is True + assert session.get(DagModel, "legacy_dag").is_stale is True + @pytest.mark.parametrize( ("active_files", "should_call_cleanup"), [ @@ -2120,6 +2205,21 @@ def test_fetch_callbacks_delegates_to_private_method(self): assert manager.fetch_callbacks() is expected private.assert_called_once_with() + @mock.patch("airflow.dag_processing.manager.DagBundlesManager") + def test_reassign_called_once_at_startup_not_on_refresh(self, mock_bundle_manager): + """ + reassign_dags_with_unconfigured_bundles is called exactly once by + sync_bundles, not by _refresh_dag_bundles. + """ + manager = DagFileProcessorManager(max_runs=1) + manager._dag_bundles = [] + + manager.sync_bundles() + mock_bundle_manager.return_value.reassign_dags_with_unconfigured_bundles.assert_called_once() + + manager._refresh_dag_bundles(known_files={}) + mock_bundle_manager.return_value.reassign_dags_with_unconfigured_bundles.assert_called_once() + def test_dag_with_assets(self, session, configure_testing_dag_bundle): """'Integration' test to ensure that the assets get parsed and stored correctly for parsed dags.""" test_dag_path = str(TEST_DAG_FOLDER / "test_assets.py") From 8acd35eaaf54d20e3b86461ccc9f028db8c39b17 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Wed, 27 May 2026 19:27:37 +0800 Subject: [PATCH 2/5] Add newsfragment and mention the workaround --- .../newsfragments/63185.significant.rst | 32 +++++++++++++++++++ .../airflow/dag_processing/bundles/manager.py | 7 ++-- 2 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 airflow-core/newsfragments/63185.significant.rst diff 
--git a/airflow-core/newsfragments/63185.significant.rst b/airflow-core/newsfragments/63185.significant.rst new file mode 100644 index 0000000000000..ca1d335fad2e8 --- /dev/null +++ b/airflow-core/newsfragments/63185.significant.rst @@ -0,0 +1,32 @@ +Fix 2.x to 3.0+ upgrade failure when a custom Dag bundle is configured + +The ``0082_3_1_0_make_bundle_name_not_nullable`` migration assigned every legacy row +``bundle_name='dags-folder'`` and left ``relative_fileloc`` NULL, so triggering a DagRun raised +``Requested bundle 'dags-folder' is not configured.`` on any deployment that uses a bundle other +than the default ``dags-folder``. + +A one-shot repair now runs at ``DagFileProcessorManager`` startup, after ``sync_bundles_to_db``: + +- Each legacy-candidate row (NULL ``relative_fileloc`` AND no ``DagVersion``) is routed to the + most-specific configured bundle whose absolute path contains the Dag's ``fileloc``; both + ``bundle_name`` and ``relative_fileloc`` are written atomically. Rows whose ``fileloc`` is not + under any configured bundle are left untouched (writing ``bundle_name`` without a verified + ``relative_fileloc`` would produce a row task workers cannot execute) and self-heal via the + existing staleness lifecycle once the operator adds a matching bundle: ``sync_bundles_to_db`` + deactivates the orphan bundle, ``deactivate_stale_dags`` flips the row stale, and the next + successful parse resets everything through ``update_dag_parsing_results_in_db``. No manual + ``airflow dags reserialize`` is required, though it can be used to force the parse path to + rewrite ``bundle_name`` and ``relative_fileloc`` immediately. +- A global ``DagVersion`` fast-skip short-circuits the repair on any deployment that has already + completed a 3.x parse cycle. 
``DagVersion`` rows are written only by the parse path, which + overwrites both ``bundle_name`` and ``relative_fileloc`` on every parse, so once any row has a + ``DagVersion`` the parse loop is doing the same work the repair would do (and the staleness + lifecycle handles rows whose files no longer match any configured bundle). The probe is a PK + hit on ``dag_version`` vs. a sequential scan of ``dag``. +- High-availability deployments running multiple Dag processors are safe: each chunked UPDATE + re-asserts the legacy-candidate predicate as a compare-and-swap guard, so whichever DFP commits + first becomes authoritative and any later UPDATE (from a second DFP or from a parser that + landed a fresh write between SELECT and UPDATE) matches zero rows and is a no-op. Each chunk + runs in its own internally-owned transaction to bound the row-lock window. +- Multi-team deployments are safe: a bundle path belongs to at most one team, so routing by the + most-specific containing path cannot cross a team boundary. diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index 7820f80ca6419..bd336828967b3 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -549,8 +549,11 @@ def reassign_dags_with_unconfigured_bundles(self) -> int: if total_skipped: self.log.warning( "Skipped %d legacy Dag(s) whose fileloc is not under any configured bundle; " - "they will be marked stale until a matching bundle is added to " - "dag_bundle_config_list and the next parse restores them.", + "triggering them will keep raising \"Requested bundle '{name}' is not configured.\" " + "until a bundle whose path contains the fileloc is added to " + "dag_bundle_config_list. 
The next parse will then restore them automatically, " + "or run `airflow dags reserialize` to force the parse path to rewrite " + "bundle_name and relative_fileloc immediately.", total_skipped, ) From 56e6dfe05c58fa5a817ba53a6099f58d1d0f019b Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Tue, 9 Jun 2026 10:39:37 +0800 Subject: [PATCH 3/5] Clarify bundle path matching and NULL relative_fileloc handling Normalize bundle paths once when building the active-bundle map so the fileloc match uses plain Path.relative_to instead of mixing os.path with pathlib per iteration, and explain why the lexical normpath is required. Expand the cryptic stale-check comment to describe the legacy 2.x NULL relative_fileloc case and link the tracking issue. --- .../airflow/dag_processing/bundles/manager.py | 24 ++++++++++++------- .../src/airflow/dag_processing/manager.py | 10 +++++--- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index bd336828967b3..d0f76c12a629d 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -62,15 +62,20 @@ def _best_bundle_for_fileloc( the deepest bundle wins when paths overlap. Returns ``None`` when ``fileloc`` is not under any bundle's path. + + The bundle paths in ``descending_bundle_paths`` are expected to already be + normalized (see ``_resolve_active_bundle_paths``); only ``fileloc`` is + normalized here. ``os.path.normpath`` is used rather than ``Path.resolve`` + because it collapses ``.``/``..`` purely lexically: ``resolve`` would touch + the filesystem and follow symlinks, which is wrong for legacy rows whose + files may no longer exist on disk. 
""" file_path = Path(os.path.normpath(fileloc)) for name, path in descending_bundle_paths.items(): try: - relative = file_path.relative_to(os.path.normpath(path)) + relative = file_path.relative_to(path) except ValueError: continue - if relative.is_absolute() or os.pardir in relative.parts: - continue return name, str(relative) return None @@ -561,16 +566,19 @@ def reassign_dags_with_unconfigured_bundles(self) -> int: def _resolve_active_bundle_paths(self, *, session: Session) -> dict[str, Path]: """ - Return resolved absolute paths for configured-and-active bundles. + Return normalized paths for configured-and-active bundles. A bundle is "configured-and-active" when it is both in the manager's config and persisted as ``active=True`` in ``dag_bundle`` -- bundles missing from ``dag_bundle`` are excluded so they can't trigger an FK violation if used as a reassignment target. - The returned dict is ``{name: absolute_path}`` ordered by path length - descending so the most specific bundle wins in - ``_best_bundle_for_fileloc``. + Paths are normalized with ``os.path.normpath`` so the prefix check in + ``_best_bundle_for_fileloc`` can use plain ``Path.relative_to`` and so + the length-based ordering below reflects the canonical path. + + The returned dict is ``{name: path}`` ordered by path length descending + so the most specific bundle wins in ``_best_bundle_for_fileloc``. 
""" active_db_names = set( session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(True))) @@ -580,7 +588,7 @@ def _resolve_active_bundle_paths(self, *, session: Session) -> dict[str, Path]: for bundle in self.get_all_dag_bundles(): if bundle.name not in active_db_names: continue - active_bundle_paths[bundle.name] = bundle.path + active_bundle_paths[bundle.name] = Path(os.path.normpath(bundle.path)) return dict(sorted(active_bundle_paths.items(), key=lambda item: len(str(item[1])), reverse=True)) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 24fbee652f73e..1627fc95e92fb 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -463,9 +463,13 @@ def deactivate_stale_dags( ) to_deactivate.add(dag.dag_id) continue - # Legacy 0082-migration row the startup repair could not route. - # Path(None) would crash the file-based check below; skip and count - # so it can be surfaced after the loop. + # A Dag upgraded from Airflow 2.x can still have a NULL relative_fileloc: + # the 0082 migration adds the column as nullable, and the startup repair + # in DagBundlesManager only backfills it when the Dag's fileloc resolves to + # a configured bundle. Rows whose fileloc matches no bundle stay NULL, so + # the time-based stale check below would build Path(None) and crash. Skip + # them here and count them so the total is surfaced after the loop. + # See https://github.com/apache/airflow/issues/63323. if dag.relative_fileloc is None: stuck_legacy_rows += 1 continue From a49b9806ee7f2967c30df375e3c9ae1e87a543f9 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Tue, 9 Jun 2026 12:34:19 +0800 Subject: [PATCH 4/5] Use plain Path.relative_to for bundle fileloc routing Match BaseDagImporter.get_relative_path instead of normalizing with os.path.normpath. 
Filelocs come from the Dag processor parsing admin-controlled bundle files, so they are trusted and need no path-traversal defense, and using the same relative_to check means the startup repair writes the same relative_fileloc the next parse computes. --- .../airflow/dag_processing/bundles/manager.py | 21 +++++++--------- .../bundles/test_dag_bundle_manager.py | 24 +++---------------- 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index d0f76c12a629d..0e939a6c4c204 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -63,14 +63,13 @@ def _best_bundle_for_fileloc( Returns ``None`` when ``fileloc`` is not under any bundle's path. - The bundle paths in ``descending_bundle_paths`` are expected to already be - normalized (see ``_resolve_active_bundle_paths``); only ``fileloc`` is - normalized here. ``os.path.normpath`` is used rather than ``Path.resolve`` - because it collapses ``.``/``..`` purely lexically: ``resolve`` would touch - the filesystem and follow symlinks, which is wrong for legacy rows whose - files may no longer exist on disk. + Uses the same plain ``Path.relative_to`` check as + ``BaseDagImporter.get_relative_path``, so the ``relative_fileloc`` written + here matches what the next parse computes for the same file. Filelocs are + produced by the Dag processor parsing admin-controlled bundle files, so they + are trusted and need no path-traversal normalization. 
""" - file_path = Path(os.path.normpath(fileloc)) + file_path = Path(fileloc) for name, path in descending_bundle_paths.items(): try: relative = file_path.relative_to(path) @@ -566,17 +565,13 @@ def reassign_dags_with_unconfigured_bundles(self) -> int: def _resolve_active_bundle_paths(self, *, session: Session) -> dict[str, Path]: """ - Return normalized paths for configured-and-active bundles. + Return paths for configured-and-active bundles. A bundle is "configured-and-active" when it is both in the manager's config and persisted as ``active=True`` in ``dag_bundle`` -- bundles missing from ``dag_bundle`` are excluded so they can't trigger an FK violation if used as a reassignment target. - Paths are normalized with ``os.path.normpath`` so the prefix check in - ``_best_bundle_for_fileloc`` can use plain ``Path.relative_to`` and so - the length-based ordering below reflects the canonical path. - The returned dict is ``{name: path}`` ordered by path length descending so the most specific bundle wins in ``_best_bundle_for_fileloc``. 
""" @@ -588,7 +583,7 @@ def _resolve_active_bundle_paths(self, *, session: Session) -> dict[str, Path]: for bundle in self.get_all_dag_bundles(): if bundle.name not in active_db_names: continue - active_bundle_paths[bundle.name] = Path(os.path.normpath(bundle.path)) + active_bundle_paths[bundle.name] = bundle.path return dict(sorted(active_bundle_paths.items(), key=lambda item: len(str(item[1])), reverse=True)) diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py index aff82be7242c8..c9ddd07912ce3 100644 --- a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py +++ b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py @@ -508,7 +508,7 @@ def _add_dag(session, dag_id: str, bundle_name: str) -> DagModel: class TestBestBundleForFileloc: - """Tests for ``_best_bundle_for_fileloc`` path normalisation and safety.""" + """Tests for ``_best_bundle_for_fileloc`` path matching.""" def test_returns_relative_path_for_match(self) -> None: assert _best_bundle_for_fileloc("/dags/team_x/dag.py", {"team-x": Path("/dags/team_x")}) == ( @@ -522,27 +522,9 @@ def test_returns_none_for_no_match(self) -> None: def test_returns_none_for_empty_paths(self) -> None: assert _best_bundle_for_fileloc("/dags/dag.py", {}) is None - @pytest.mark.parametrize( - "fileloc", - [ - pytest.param("/dags/foo/../../outside.py", id="parent_traversal_escapes_root"), - pytest.param("/dags/../outside.py", id="parent_traversal_at_root"), - pytest.param("/dags/../../etc/passwd", id="multiple_parent_traversal"), - ], - ) - def test_rejects_parent_traversal_filelocs(self, fileloc: str) -> None: - """A fileloc with ``..`` segments that escape the bundle root must not match. 
- - Without lexical normalisation, ``Path.relative_to`` on an unnormalised - path with ``..`` segments returns a relative path like - ``foo/../../outside.py`` that, when later joined with the bundle root, - addresses files outside it. Normalising both sides collapses the - traversal so the fileloc is no longer under the bundle and the helper - returns ``None``. - """ - assert _best_bundle_for_fileloc(fileloc, {"dags": Path("/dags")}) is None - def test_normalises_redundant_separators_and_dots(self) -> None: + # ``Path`` itself collapses ``//`` and ``.`` segments on construction, + # so the helper matches without any explicit normalization. assert _best_bundle_for_fileloc("/dags//team_x/./dag.py", {"team-x": Path("/dags/team_x")}) == ( "team-x", "dag.py", From 0ac8289eee17bf52d75bbacd16278c49848c02e8 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 22 Jun 2026 15:35:34 +0900 Subject: [PATCH 5/5] Add critical regression guard test case --- .../tests/unit/dag_processing/test_manager.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 53bfb8140c6cf..dad33ac81e2a2 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2220,6 +2220,79 @@ def test_reassign_called_once_at_startup_not_on_refresh(self, mock_bundle_manage manager._refresh_dag_bundles(known_files={}) mock_bundle_manager.return_value.reassign_dags_with_unconfigured_bundles.assert_called_once() + @pytest.mark.parametrize( + "apply_patch", + [False, True], + ids=["without_patch", "with_patch"], + ) + def test_sync_bundles_repairs_legacy_bundle_before_parsing_loop( + self, apply_patch, session, tmp_path, configure_dag_bundles + ): + """DFP initial setup alone re-homes a 2.x->3.x legacy Dag to its configured bundle.
+ + Window right after ``airflow db migrate`` but before the parsing loop: the ``0082`` + migration left the row with ``bundle_name='dags-folder'`` and NULL ``relative_fileloc``, + and ``0047`` emptied ``serialized_dag``/``dag_version`` (so the Dag is unserialized and + ``get_bundle`` -- what the worker calls at run time -- is the probe, not the REST trigger). + ``sync_bundles()`` runs ``sync_bundles_to_db`` and, on the patched build, + ``reassign_dags_with_unconfigured_bundles``; without it (reassign mocked off) the row stays + on the unconfigured ``dags-folder`` and ``get_bundle`` raises. See + https://github.com/apache/airflow/issues/63323. + """ + dags_folder = tmp_path / "dags" + dags_folder.mkdir() + legacy_file = dags_folder / "af2_upgrade_af3_dag.py" + legacy_file.write_text("# 2.x dag") + + # State right after the 0082 migration: the ``dags-folder`` row exists in dag_bundle but the + # operator's bundle is not registered yet (sync_bundles_to_db does that below), and + # serialized_dag/dag_version are empty (0047 wiped them; setup_method clears them too). + legacy_default_bundle = DagBundleModel(name="dags-folder") + legacy_default_bundle.active = True + session.add(legacy_default_bundle) + session.flush() + legacy_dag = DagModel( + dag_id="legacy_dag", + bundle_name="dags-folder", + fileloc=str(legacy_file), + ) + legacy_dag.relative_fileloc = None + session.add(legacy_dag) + session.commit() + + bundle_name = "upgrade_test_dag_bundle" + with configure_dag_bundles({bundle_name: dags_folder}): + manager = DagFileProcessorManager(max_runs=1) + if apply_patch: + manager.sync_bundles() + else: + # Pre-patch sync_bundles ran sync_bundles_to_db only; mocking reassign to a no-op + # reproduces that build without forking the source under test. 
+ with mock.patch.object( + DagBundlesManager, + "reassign_dags_with_unconfigured_bundles", + return_value=0, + ): + manager.sync_bundles() + + session.expire_all() + refreshed = session.get(DagModel, "legacy_dag") + + if apply_patch: + assert refreshed.bundle_name == bundle_name + assert refreshed.relative_fileloc == "af2_upgrade_af3_dag.py" + # The worker can now resolve the bundle: the symptom is gone. + assert DagBundlesManager().get_bundle(bundle_name).name == bundle_name + else: + assert refreshed.bundle_name == "dags-folder" + assert refreshed.relative_fileloc is None + # The worker would hit the original incident at run time. + with pytest.raises( + ValueError, + match=re.escape("Requested bundle 'dags-folder' is not configured."), + ): + DagBundlesManager().get_bundle(refreshed.bundle_name) + def test_dag_with_assets(self, session, configure_testing_dag_bundle): """'Integration' test to ensure that the assets get parsed and stored correctly for parsed dags.""" test_dag_path = str(TEST_DAG_FOLDER / "test_assets.py")