diff --git a/airflow-core/newsfragments/63185.significant.rst b/airflow-core/newsfragments/63185.significant.rst new file mode 100644 index 0000000000000..ca1d335fad2e8 --- /dev/null +++ b/airflow-core/newsfragments/63185.significant.rst @@ -0,0 +1,32 @@ +Fix 2.x to 3.0+ upgrade failure when a custom Dag bundle is configured + +The ``0082_3_1_0_make_bundle_name_not_nullable`` migration assigned every legacy row +``bundle_name='dags-folder'`` and left ``relative_fileloc`` NULL, so triggering a DagRun raised +``Requested bundle 'dags-folder' is not configured.`` on any deployment that uses a bundle other +than the default ``dags-folder``. + +A one-shot repair now runs at ``DagFileProcessorManager`` startup, after ``sync_bundles_to_db``: + +- Each legacy-candidate row (NULL ``relative_fileloc`` AND no ``DagVersion``) is routed to the + most-specific configured bundle whose absolute path contains the Dag's ``fileloc``; both + ``bundle_name`` and ``relative_fileloc`` are written atomically. Rows whose ``fileloc`` is not + under any configured bundle are left untouched (writing ``bundle_name`` without a verified + ``relative_fileloc`` would produce a row task workers cannot execute) and self-heal via the + existing staleness lifecycle once the operator adds a matching bundle: ``sync_bundles_to_db`` + deactivates the orphan bundle, ``deactivate_stale_dags`` flips the row stale, and the next + successful parse resets everything through ``update_dag_parsing_results_in_db``. No manual + ``airflow dags reserialize`` is required, though it can be used to force the parse path to + rewrite ``bundle_name`` and ``relative_fileloc`` immediately. +- A global ``DagVersion`` fast-skip short-circuits the repair on any deployment that has already + completed a 3.x parse cycle. ``DagVersion`` rows are written only by the parse path, which + overwrites both ``bundle_name`` and ``relative_fileloc`` on every parse, so once any row has a + ``DagVersion`` the parse loop is doing the same work the repair would do (and the staleness + lifecycle handles rows whose files no longer match any configured bundle). The probe is a PK + hit on ``dag_version`` vs. a sequential scan of ``dag``. +- High-availability deployments running multiple Dag processors are safe: each chunked UPDATE + re-asserts the legacy-candidate predicate as a compare-and-swap guard, so whichever DFP commits + first becomes authoritative and any later UPDATE (from a second DFP or from a parser that + landed a fresh write between SELECT and UPDATE) matches zero rows and is a no-op. Each chunk + runs in its own internally-owned transaction to bound the row-lock window. +- Multi-team deployments are safe: a bundle path belongs to at most one team, so routing by the + most-specific containing path cannot cross a team boundary. diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index 78c54266eda9f..0e939a6c4c204 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -20,11 +20,13 @@ import logging import os import warnings -from typing import TYPE_CHECKING +from collections import defaultdict +from pathlib import Path +from typing import TYPE_CHECKING, cast from itsdangerous import URLSafeSerializer from pydantic import BaseModel, ValidationError -from sqlalchemy import delete, select +from sqlalchemy import delete, exists, select, update from airflow._shared.module_loading import import_string from airflow.configuration import conf @@ -34,17 +36,48 @@ from airflow.models.team import Team from airflow.providers_manager import ProvidersManager from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.session import NEW_SESSION, provide_session +from airflow.utils.session import NEW_SESSION, create_session, provide_session if TYPE_CHECKING: from collections.abc import Iterable + from sqlalchemy.engine import CursorResult from sqlalchemy.orm import Session log = logging.getLogger(__name__) _example_dag_bundle_name = "example_dags" +# Chunk size for the one-time startup repair of unconfigured bundles. +_REASSIGN_BATCH_SIZE = 1000 + + +def _best_bundle_for_fileloc( + fileloc: str, descending_bundle_paths: dict[str, Path] +) -> tuple[str, str] | None: + """ + Return ``(bundle_name, relative_fileloc)`` for the first bundle whose path contains ``fileloc``. + + ``descending_bundle_paths`` must be sorted by path length descending so + the deepest bundle wins when paths overlap. + + Returns ``None`` when ``fileloc`` is not under any bundle's path. + + Uses the same plain ``Path.relative_to`` check as + ``BaseDagImporter.get_relative_path``, so the ``relative_fileloc`` written + here matches what the next parse computes for the same file. Filelocs are + produced by the Dag processor parsing admin-controlled bundle files, so they + are trusted and need no path-traversal normalization. + """ + file_path = Path(fileloc) + for name, path in descending_bundle_paths.items(): + try: + relative = file_path.relative_to(path) + except ValueError: + continue + return name, str(relative) + return None + class _ExternalBundleConfig(BaseModel): """Schema defining the user-specified configuration for a DAG bundle.""" @@ -366,6 +399,194 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]: session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name)) self.log.info("Deleted import errors for bundle %s which is no longer configured", name) + # Airflow sets autoflush=False, so subsequent reads in the same session + # need an explicit flush to see ORM-side bundle state changes. + session.flush() + + def reassign_dags_with_unconfigured_bundles(self) -> int: + """ + Reassign Dags pointing at unconfigured bundles to a configured one. + + Side effect of the ``0082_3_1_0_make_bundle_name_not_nullable`` + migration (#63323): legacy rows get ``bundle_name='dags-folder'`` + and NULL ``relative_fileloc``, which raises ``Requested bundle + '{name}' is not configured.`` at trigger time when the deployment + uses a custom bundle. + + Each legacy-candidate row (NULL ``relative_fileloc`` and no + ``DagVersion``) is routed to the most-specific configured bundle + whose path contains its ``fileloc``, writing ``relative_fileloc`` + atomically. Rows whose ``fileloc`` is under no configured bundle + are left untouched -- writing ``bundle_name`` without a verified + ``relative_fileloc`` would produce a row task workers cannot + execute -- and instead self-heal via the normal staleness + lifecycle: ``sync_bundles_to_db`` deactivates the old bundle, the + stale-scan marks the row stale, and the next successful parse + from any configured bundle resets everything via + ``update_dag_parsing_results_in_db``. No manual ``airflow dags + reserialize`` is required. + + Multi-team-safe because a bundle path belongs to at most one team. + Each chunk runs in its own internally-owned transaction so the + row-lock window stays bounded and no caller-provided session is + committed. + + :return: Number of Dags reassigned. + """ + # Import here to avoid circular import + # (manager -> dag -> dagrun -> taskinstance -> dag_version -> manager) + from airflow.models.dag import DagModel + from airflow.models.dag_version import DagVersion + + # Fast-skip once any 3.x parse cycle has run. DagVersion is written + # only by the parse path, and that path overwrites both bundle_name + # and relative_fileloc on every parse (see + # ``DagModelOperation.update_dags``), so any legacy row whose file + # is under a configured bundle self-heals at the next parse and any + # row whose file is under no configured bundle self-heals via the + # staleness lifecycle -- reassign has no work the parse path will + # not do itself. The probe is an index hit on dag_version's PK + # vs. a sequential scan of dag (no index on relative_fileloc). + with create_session() as session: + if session.scalar(select(DagVersion.id).limit(1)) is not None: + return 0 + + with create_session() as session: + if not (active_bundle_paths := self._resolve_active_bundle_paths(session=session)): + self.log.info( + "No active Dag bundles with resolvable paths; skipping reassignment of Dags " + "with unconfigured bundles." + ) + return 0 + + # Chunked UPDATEs ordered by dag_id, one transaction per chunk; repaired + # rows drop out of the predicate because writing relative_fileloc + # makes the IS NULL clause false. + # + # Legacy-candidate predicate (rows never parsed in 3.x): NULL + # relative_fileloc (the 0082 migration leaves it NULL) AND NOT EXISTS + # DagVersion (the parse path writes DagModel.bundle_name before the + # DagVersion). Equivalent under that invariant; both stated as + # defense in depth and repeated on the UPDATE itself as a CAS guard + # so a concurrent parser write wins the race. + movements: dict[tuple[str | None, str], int] = defaultdict(int) + total_reassigned = 0 + total_backfilled = 0 + total_skipped = 0 + last_dag_id: str | None = None + + while True: + with create_session() as session: + query = ( + select(DagModel.dag_id, DagModel.bundle_name, DagModel.fileloc) + .where( + DagModel.relative_fileloc.is_(None), + ~exists().where(DagVersion.dag_id == DagModel.dag_id), + ) + .order_by(DagModel.dag_id) + .limit(_REASSIGN_BATCH_SIZE) + ) + if last_dag_id is not None: + query = query.where(DagModel.dag_id > last_dag_id) + + if not (chunk := session.execute(query).all()): + break + last_dag_id = chunk[-1].dag_id + + # Route every legacy row by fileloc, not just those on + # unconfigured bundles, so a migration-assigned dags-folder + # row whose file lives under a different configured bundle + # gets relocated instead of stranded. Classify as skip + # (no match), backfill (match == current bundle), or + # reassign (match != current bundle). + chunk_updates: list[tuple[str, str | None, str, str]] = [] + for row in chunk: + match = _best_bundle_for_fileloc(row.fileloc, active_bundle_paths) if row.fileloc else None + if match is None: + total_skipped += 1 + continue + target, relative = match + chunk_updates.append((row.dag_id, row.bundle_name, target, relative)) + + if not chunk_updates: + continue + + with create_session() as session: + # create_session commits on context exit, bounding the + # row-lock window to one chunk. + for dag_id, prev_bundle, target, relative in chunk_updates: + result = cast( + "CursorResult", + session.execute( + update(DagModel) + .where( + DagModel.dag_id == dag_id, + DagModel.relative_fileloc.is_(None), + ~exists().where(DagVersion.dag_id == DagModel.dag_id), + ) + .values(relative_fileloc=relative, bundle_name=target) + .execution_options(synchronize_session=False) + ), + ) + + if result.rowcount: + # Rowcount is the source of truth for whether the CAS actually fired + if target == prev_bundle: + total_backfilled += 1 + else: + movements[(prev_bundle, target)] += 1 + total_reassigned += 1 + else: + self.log.debug("Skipping repair for Dag '%s': lost race to parser.", dag_id) + + for (prev, target), n in sorted(movements.items(), key=lambda item: (str(item[0][0]), item[0][1])): + self.log.info( + "Reassigning %d Dag(s) from unconfigured bundle '%s' to '%s'", + n, + prev, + target, + ) + + if total_backfilled: + self.log.info("Backfilled relative_fileloc for %d legacy Dag(s).", total_backfilled) + + if total_skipped: + self.log.warning( + "Skipped %d legacy Dag(s) whose fileloc is not under any configured bundle; " + "triggering them will keep raising \"Requested bundle '{name}' is not configured.\" " + "until a bundle whose path contains the fileloc is added to " + "dag_bundle_config_list. The next parse will then restore them automatically, " + "or run `airflow dags reserialize` to force the parse path to rewrite " + "bundle_name and relative_fileloc immediately.", + total_skipped, + ) + + return total_reassigned + + def _resolve_active_bundle_paths(self, *, session: Session) -> dict[str, Path]: + """ + Return paths for configured-and-active bundles. + + A bundle is "configured-and-active" when it is both in the manager's + config and persisted as ``active=True`` in ``dag_bundle`` -- bundles + missing from ``dag_bundle`` are excluded so they can't trigger an FK + violation if used as a reassignment target. + + The returned dict is ``{name: path}`` ordered by path length descending + so the most specific bundle wins in ``_best_bundle_for_fileloc``. + """ + active_db_names = set( + session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(True))) + ) + + active_bundle_paths: dict[str, Path] = {} + for bundle in self.get_all_dag_bundles(): + if bundle.name not in active_db_names: + continue + active_bundle_paths[bundle.name] = bundle.path + + return dict(sorted(active_bundle_paths.items(), key=lambda item: len(str(item[1])), reverse=True)) + @staticmethod def _extract_template_params(bundle_instance: BaseDagBundle) -> dict: """ diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index e43920f2618bc..1627fc95e92fb 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -318,7 +318,9 @@ def _exit_gracefully(self, signum, frame): def sync_bundles(self) -> None: """Sync configured DAG bundles to the metadata database.""" - DagBundlesManager().sync_bundles_to_db() + dag_bundle_manager = DagBundlesManager() + dag_bundle_manager.sync_bundles_to_db() + dag_bundle_manager.reassign_dags_with_unconfigured_bundles() def get_all_bundles(self) -> list[BaseDagBundle]: """Return configured DAG bundles filtered by ``bundle_names_to_parse`` if provided.""" @@ -449,6 +451,7 @@ def deactivate_stale_dags( ).where(~DagModel.is_stale) dags_parsed = session.execute(query) + stuck_legacy_rows = 0 for dag in dags_parsed: # Dags whose bundle has been removed from config (bundle no longer active) are stale — # the processor has stopped parsing their files, so the time-based check below would never fire. @@ -460,6 +463,16 @@ def deactivate_stale_dags( ) to_deactivate.add(dag.dag_id) continue + # A Dag upgraded from Airflow 2.x can still have a NULL relative_fileloc: + # the 0082 migration adds the column as nullable, and the startup repair + # in DagBundlesManager only backfills it when the Dag's fileloc resolves to + # a configured bundle. Rows whose fileloc matches no bundle stay NULL, so + # the time-based stale check below would build Path(None) and crash. Skip + # them here and count them so the total is surfaced after the loop. + # See https://github.com/apache/airflow/issues/63323. + if dag.relative_fileloc is None: + stuck_legacy_rows += 1 + continue # When the Dag's last_parsed_time is more than the stale_dag_threshold older than the # Dag file's last_finish_time, the Dag is considered stale as has apparently been removed from the file, # This is especially relevant for Dag files that generate Dags in a dynamic manner. @@ -496,6 +509,15 @@ def deactivate_stale_dags( else: raise + if stuck_legacy_rows: + # Surface how many legacy rows the startup repair could not route; + # each one keeps raising "Requested bundle is not configured." until + # a matching bundle is added to dag_bundle_config_list. + self.log.info( + "Skipped stale check for %d legacy Dag(s) with NULL relative_fileloc.", + stuck_legacy_rows, + ) + def _run_parsing_loop(self): # initialize cache to mutualize calls to Variable.get in DAGs # needs to be done before this process is forked to create the DAG parsing processes. diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py index 0d17069c831dd..c9ddd07912ce3 100644 --- a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py +++ b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py @@ -17,22 +17,27 @@ from __future__ import annotations +import contextlib import json import os from contextlib import nullcontext +from pathlib import Path +from unittest import mock from unittest.mock import patch import pytest -from sqlalchemy import func, select +from sqlalchemy import func, select, update from airflow.dag_processing.bundles.base import BaseDagBundle -from airflow.dag_processing.bundles.manager import DagBundlesManager +from airflow.dag_processing.bundles.manager import DagBundlesManager, _best_bundle_for_fileloc from airflow.exceptions import AirflowConfigException +from airflow.models.dag import DagModel from airflow.models.dagbundle import DagBundleModel from airflow.models.errors import ParseImportError +from airflow.utils.session import create_session from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.db import clear_db_dag_bundles +from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags @pytest.mark.parametrize( @@ -107,8 +112,9 @@ def refresh(self): def get_current_version(self): pass - def path(self): - pass + @property + def path(self) -> Path: + return Path("/__basic_bundle_unmatched__") BASIC_BUNDLE_CONFIG = [ @@ -118,6 +124,13 @@ def path(self): "kwargs": {"refresh_interval": 1}, } ] +SECOND_BUNDLE_CONFIG = [ + { + "name": "second-bundle", + "classpath": "unit.dag_processing.bundles.test_dag_bundle_manager.BasicBundle", + "kwargs": {"refresh_interval": 1}, + } +] def test_get_bundle(): @@ -476,3 +489,1244 @@ def test_get_all_bundle_names(): # the naming suffix instead of pinning an exact list. extra = [n for n in bundle_names if n not in {"dags-folder", "example_dags"}] assert all(n.endswith("-example-dags") for n in extra) + + +@pytest.fixture +def clear_dags_and_bundles(): + clear_db_dags() + clear_db_dag_bundles() + yield + clear_db_dags() + clear_db_dag_bundles() + + +def _add_dag(session, dag_id: str, bundle_name: str) -> DagModel: + dag = DagModel(dag_id=dag_id, bundle_name=bundle_name, fileloc=f"/tmp/{dag_id}.py") + session.add(dag) + session.flush() + return dag + + +class TestBestBundleForFileloc: + """Tests for ``_best_bundle_for_fileloc`` path matching.""" + + def test_returns_relative_path_for_match(self) -> None: + assert _best_bundle_for_fileloc("/dags/team_x/dag.py", {"team-x": Path("/dags/team_x")}) == ( + "team-x", + "dag.py", + ) + + def test_returns_none_for_no_match(self) -> None: + assert _best_bundle_for_fileloc("/elsewhere/dag.py", {"team-x": Path("/dags/team_x")}) is None + + def test_returns_none_for_empty_paths(self) -> None: + assert _best_bundle_for_fileloc("/dags/dag.py", {}) is None + + def test_normalises_redundant_separators_and_dots(self) -> None: + # ``Path`` itself collapses ``//`` and ``.`` segments on construction, + # so the helper matches without any explicit normalization. + assert _best_bundle_for_fileloc("/dags//team_x/./dag.py", {"team-x": Path("/dags/team_x")}) == ( + "team-x", + "dag.py", + ) + + +@pytest.mark.db_test +class TestReassignDagsWithUnconfiguredBundles: + """Tests for DagBundlesManager.reassign_dags_with_unconfigured_bundles.""" + + def _manager_with_bundle_names(self, names: list[str]) -> DagBundlesManager: + """Return a manager whose ``_resolve_active_bundle_paths`` reports *names* with non-matching paths. + + The fake paths are absolute roots that cannot contain any + ``/tmp/{dag_id}.py`` test fileloc, so rows are routed as unmatched + unless a test explicitly arranges otherwise. + """ + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(BASIC_BUNDLE_CONFIG)}, + ): + manager = DagBundlesManager() + paths = {name: Path(f"/__unmatched_for_test__/{name}") for name in names} + manager._resolve_active_bundle_paths = lambda *, session: paths # type: ignore[method-assign] + return manager + + def test_no_configured_bundles_is_noop(self, clear_dags_and_bundles, session): + """Return 0 without raising when no bundles are configured.""" + manager = self._manager_with_bundle_names([]) + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + def test_already_configured_is_noop(self, clear_dags_and_bundles, session) -> None: + """No reassignment when every Dag already points at a configured bundle.""" + bundle = DagBundleModel(name="bundle-a") + bundle.active = True + session.add(bundle) + session.flush() + _add_dag(session, "dag-1", "bundle-a") + session.commit() + + manager = self._manager_with_bundle_names(["bundle-a"]) + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + assert session.get(DagModel, "dag-1").bundle_name == "bundle-a" + + def test_unmatched_fileloc_leaves_row_untouched(self, clear_dags_and_bundles, session, caplog) -> None: + """Rows whose fileloc has no configured bundle path keep their original bundle. + + The fallback that wrote ``bundle_name`` without a verified + ``relative_fileloc`` produced active-but-un-runnable rows, so the + helper now leaves such rows on their unconfigured bundle and emits a + single warning naming the count. + """ + active = DagBundleModel(name="active") + active.active = True + removed = DagBundleModel(name="removed-bundle") + removed.active = False + session.add(active) + session.add(removed) + session.flush() + _add_dag(session, "dag-1", "removed-bundle") + _add_dag(session, "dag-2", "removed-bundle") + session.commit() + + manager = self._manager_with_bundle_names(["active"]) + with caplog.at_level("WARNING", logger="airflow.dag_processing.bundles.manager.DagBundlesManager"): + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + session.expire_all() + for dag_id in ("dag-1", "dag-2"): + row = session.get(DagModel, dag_id) + assert row.bundle_name == "removed-bundle" + assert row.relative_fileloc is None + assert any("Skipped 2 legacy Dag(s)" in record.message for record in caplog.records) + + def test_row_with_populated_relative_fileloc_is_left_alone(self, clear_dags_and_bundles, session) -> None: + """A 3.x row whose bundle is no longer configured must keep its bundle assignment. + + Only rows the 0082 migration touched (``relative_fileloc IS NULL``) are + candidates for reassignment. Rows that already carry a relative path + were written by 3.x serialization and must be left on their bundle so + the regular stale-Dag deactivation path can handle a removed bundle. + """ + active_bundle = DagBundleModel(name="active") + active_bundle.active = True + removed_bundle = DagBundleModel(name="removed-bundle") + removed_bundle.active = False + session.add(active_bundle) + session.add(removed_bundle) + session.flush() + + dag = DagModel( + dag_id="dag-1", + bundle_name="removed-bundle", + fileloc="/tmp/dag-1.py", + ) + dag.relative_fileloc = "dag-1.py" + dag.bundle_version = "abc123" + session.add(dag) + session.flush() + session.commit() + + manager = self._manager_with_bundle_names(["active"]) + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + session.expire_all() + refreshed = session.get(DagModel, "dag-1") + assert refreshed.bundle_name == "removed-bundle" + assert refreshed.relative_fileloc == "dag-1.py" + assert refreshed.bundle_version == "abc123" + + def test_row_with_existing_dag_version_is_left_alone(self, clear_dags_and_bundles, session) -> None: + """A Dag with any DagVersion row must not be reassigned. + + Defended at two layers: (1) the global DagVersion-existence fast-skip + short-circuits before any work, and (2) the per-row predicate + ``NOT EXISTS DagVersion`` excludes the row even if the fast-skip is + bypassed. The parse path is the source of truth for any Dag with a + DagVersion -- touching only the DagModel would leave the DagVersion + stale, and scheduler/executor paths prefer DagVersion.bundle_name + when building task workloads. + """ + from airflow.models.dag_version import DagVersion + + active = DagBundleModel(name="active") + active.active = True + removed = DagBundleModel(name="removed-bundle") + removed.active = False + session.add(active) + session.add(removed) + session.flush() + + # NULL relative_fileloc would normally make this a repair candidate; + # the DagVersion row should exclude it from the predicate. + dag = DagModel( + dag_id="versioned", + bundle_name="removed-bundle", + fileloc="/tmp/versioned.py", + ) + dag.relative_fileloc = None + session.add(dag) + session.flush() + + version = DagVersion( + dag_id="versioned", + version_number=1, + bundle_name="removed-bundle", + bundle_version="v1", + ) + session.add(version) + session.flush() + session.commit() + + manager = self._manager_with_bundle_names(["active"]) + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + session.expire_all() + refreshed = session.get(DagModel, "versioned") + assert refreshed.bundle_name == "removed-bundle" + assert refreshed.relative_fileloc is None + refreshed_version = session.get(DagVersion, version.id) + assert refreshed_version.bundle_name == "removed-bundle" + assert refreshed_version.bundle_version == "v1" + + @conf_vars({("core", "multi_team"): "True"}) + def test_runs_under_multi_team_mode(self, clear_dags_and_bundles, session, tmp_path) -> None: + """Multi-team mode still repairs legacy rows. + + Each team's bundle owns a distinct on-disk path, so routing a legacy + row to the most-specific bundle whose path contains its ``fileloc`` + cannot cross a team boundary. The repair therefore runs unchanged + under ``core.multi_team`` and must not blanket-skip rows that have a + safe target. + """ + bundle_dir = tmp_path / "team_x" + bundle_dir.mkdir() + legacy_file = bundle_dir / "legacy.py" + legacy_file.write_text("# legacy dag") + + config = [ + { + "name": "team-x-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + active_bundle = DagBundleModel(name="team-x-bundle") + active_bundle.active = True + removed_bundle = DagBundleModel(name="removed-bundle") + removed_bundle.active = False + session.add(active_bundle) + session.add(removed_bundle) + session.flush() + + dag = DagModel(dag_id="legacy", bundle_name="removed-bundle", fileloc=str(legacy_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + + assert count == 1 + session.expire_all() + refreshed = session.get(DagModel, "legacy") + assert refreshed.bundle_name == "team-x-bundle" + assert refreshed.relative_fileloc == "legacy.py" + + +@pytest.mark.db_test +class TestBackfillRelativeFileloc: + """Tests for the legacy ``relative_fileloc`` backfill triggered by reassignment.""" + + @pytest.mark.parametrize( + ("fileloc_under_bundle", "expected_bundle_name", "expected_relative_fileloc"), + [ + pytest.param(True, "my-bundle", "legacy.py", id="fileloc_under_bundle_path_is_backfilled"), + pytest.param(False, "orphan-bundle", None, id="fileloc_outside_bundle_path_is_left_alone"), + ], + ) + def test_backfill_behavior( + self, + clear_dags_and_bundles, + session, + tmp_path, + fileloc_under_bundle: bool, + expected_bundle_name: str, + expected_relative_fileloc: str | None, + ) -> None: + """Reassignment only happens when ``fileloc`` lies under a configured bundle path. + + When the fileloc matches, both ``bundle_name`` and ``relative_fileloc`` + are written atomically. When it does not match, the row is left on its + unconfigured bundle so it cannot become an active-but-un-runnable row. + + :param fileloc_under_bundle: Whether the legacy Dag's absolute ``fileloc`` lies under + the configured bundle's path. + :param expected_bundle_name: Expected ``bundle_name`` after reassignment. + :param expected_relative_fileloc: Expected ``relative_fileloc`` after reassignment. + """ + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + legacy_file_under_bundle = bundle_dir / "legacy.py" + legacy_file_under_bundle.write_text("# legacy dag") + legacy_fileloc = str(legacy_file_under_bundle) if fileloc_under_bundle else "/elsewhere/foo.py" + + config = [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + active_bundle = DagBundleModel(name="my-bundle") + active_bundle.active = True + orphan_bundle = DagBundleModel(name="orphan-bundle") + orphan_bundle.active = False + session.add(active_bundle) + session.add(orphan_bundle) + session.flush() + + dag = DagModel(dag_id="legacy", bundle_name="orphan-bundle", fileloc=legacy_fileloc) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.reassign_dags_with_unconfigured_bundles() + + session.expire_all() + refreshed = session.get(DagModel, "legacy") + assert refreshed.bundle_name == expected_bundle_name + assert refreshed.relative_fileloc == expected_relative_fileloc + + def test_post_2x_to_3x_migration_with_renamed_bundle( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """End-to-end: 2.x→3.x upgrade where the operator's bundle is not named ``dags-folder``. + + Reproduces the original incident: the migration sets ``bundle_name='dags-folder'`` on + legacy Dag rows and leaves ``relative_fileloc`` NULL, but the operator has configured a + single LocalDagBundle named ``custom-bundle`` pointing at the same on-disk dags folder. + After ``reassign_dags_with_unconfigured_bundles`` runs at DFP startup, the legacy Dags + should be: + + 1. Reassigned to ``custom-bundle`` so triggering DagRuns no longer fails with + "Requested bundle 'dags-folder' is not configured." + 2. Have ``relative_fileloc`` backfilled from ``fileloc`` so the standard fileloc-based + stale-detection path can later detect real deletions. + """ + dags_folder = tmp_path / "dags" + dags_folder.mkdir() + legacy_file = dags_folder / "my_dag.py" + legacy_file.write_text("# 2.x dag") + + config = [ + { + "name": "custom-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(dags_folder), "refresh_interval": 1}, + } + ] + # State right after the 0082 migration: ``dags-folder`` row exists in dag_bundle (added + # by migration backfill) but is not in the user's config; the user's ``custom-bundle`` + # has not been registered yet (sync_bundles_to_db does that on startup). + legacy_default_bundle = DagBundleModel(name="dags-folder") + legacy_default_bundle.active = True + session.add(legacy_default_bundle) + session.flush() + + legacy_dag = DagModel( + dag_id="legacy_dag", + bundle_name="dags-folder", + fileloc=str(legacy_file), + ) + legacy_dag.relative_fileloc = None + session.add(legacy_dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + # DFP startup order: register configured bundles, then reassign. + manager.sync_bundles_to_db(session=session) + session.commit() + count = manager.reassign_dags_with_unconfigured_bundles() + + assert count == 1 + session.expire_all() + refreshed = session.get(DagModel, "legacy_dag") + assert refreshed.bundle_name == "custom-bundle" + assert refreshed.relative_fileloc == "my_dag.py" + + def test_backfill_commits_between_chunks( + self, clear_dags_and_bundles, session, tmp_path, monkeypatch + ) -> None: + """The legacy backfill chunks and commits like the reassignment loop. + + Without chunked commits, a deployment where every legacy Dag is + already on a configured bundle would still UPDATE the whole set in + one transaction, holding row locks on the dag table for the full + DFP startup. + """ + from airflow.dag_processing.bundles import manager as bundles_manager_mod + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + for i in range(5): + (bundle_dir / f"legacy_{i}.py").write_text(f"# legacy {i}") + + config = [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + active_bundle = DagBundleModel(name="my-bundle") + active_bundle.active = True + session.add(active_bundle) + session.flush() + + for i in range(5): + dag = DagModel( + dag_id=f"legacy_{i}", + bundle_name="my-bundle", + fileloc=str(bundle_dir / f"legacy_{i}.py"), + ) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + monkeypatch.setattr(bundles_manager_mod, "_REASSIGN_BATCH_SIZE", 2) + # Each chunk opens its own ``create_session`` (which commits on exit), + # so counting context-manager entries equals counting batch commits. + session_open_count = [0] + real_create_session = bundles_manager_mod.create_session + + @contextlib.contextmanager + def _counting_create_session(*args, **kwargs): + session_open_count[0] += 1 + with real_create_session(*args, **kwargs) as s: + yield s + + monkeypatch.setattr(bundles_manager_mod, "create_session", _counting_create_session) + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + # No unconfigured rows exist, so the reassignment loop returns + # immediately and the helper drives the chunking we are testing. + manager.reassign_dags_with_unconfigured_bundles() + + # 1 session for the active-bundle read + 5 backfill rows / batch size 2 + # = chunks of 2, 2, 1, then an empty chunk that terminates the loop. + assert session_open_count[0] >= 4 + + session.expire_all() + for i in range(5): + refreshed = session.get(DagModel, f"legacy_{i}") + assert refreshed.relative_fileloc == f"legacy_{i}.py" + + def test_backfill_does_not_overwrite_concurrent_parser_write( + self, clear_dags_and_bundles, session, tmp_path, monkeypatch + ) -> None: + """A concurrent parser write between SELECT and UPDATE keeps its value. + + Mirrors the race-safety regression on the reassignment UPDATE; the + backfill UPDATE re-asserts ``relative_fileloc IS NULL`` so the + parser's write is authoritative. + """ + from airflow.dag_processing.bundles import manager as bundles_manager_mod + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + legacy_file = bundle_dir / "legacy.py" + legacy_file.write_text("# legacy") + + config = [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + active_bundle = DagBundleModel(name="my-bundle") + active_bundle.active = True + session.add(active_bundle) + session.flush() + + dag = DagModel( + dag_id="legacy", + bundle_name="my-bundle", + fileloc=str(legacy_file), + ) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + # Drive the race: the helper invokes ``Path(fileloc)`` and then + # ``.relative_to(bundle_path)``. Wrap the Path constructor so the + # returned object commits a parser write before delegating to the + # real ``relative_to``. + original_path_cls = bundles_manager_mod.Path + + def _racey_path_factory(arg): + real_path = original_path_cls(arg) + + def _racey_relative_to(*args, **kwargs): + with create_session() as racer: + racer.execute( + update(DagModel) + .where(DagModel.dag_id == "legacy") + .values(relative_fileloc="parser_wrote_this.py") + ) + return type(real_path).relative_to(real_path, *args, **kwargs) + + patcher = mock.MagicMock(wraps=real_path) + patcher.relative_to = _racey_relative_to + return patcher + + monkeypatch.setattr(bundles_manager_mod, "Path", _racey_path_factory) + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.reassign_dags_with_unconfigured_bundles() + + session.expire_all() + refreshed = session.get(DagModel, "legacy") + assert refreshed.relative_fileloc == "parser_wrote_this.py" + + +@pytest.mark.db_test +class TestSmarterRouting: + """Tests for per-row best-bundle routing in ``reassign_dags_with_unconfigured_bundles``.""" + + def test_routes_to_bundle_whose_path_contains_fileloc( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """When multiple bundles are configured, route each Dag to the bundle whose path matches.""" + bundle_a_dir = tmp_path / "bundle_a" + bundle_b_dir = tmp_path / "bundle_b" + bundle_a_dir.mkdir() + bundle_b_dir.mkdir() + dag_a_file = bundle_a_dir / "dag_a.py" + dag_b_file = bundle_b_dir / "dag_b.py" + dag_a_file.write_text("# a") + dag_b_file.write_text("# b") + + config = [ + { + "name": "bundle-a", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_a_dir), "refresh_interval": 1}, + }, + { + "name": "bundle-b", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_b_dir), "refresh_interval": 1}, + }, + ] + for name, active in [("bundle-a", True), ("bundle-b", True), ("orphan", False)]: + b = DagBundleModel(name=name) + b.active = active + session.add(b) + session.flush() + + for dag_id, fileloc in [("dag-a", str(dag_a_file)), ("dag-b", str(dag_b_file))]: + dag = DagModel(dag_id=dag_id, bundle_name="orphan", fileloc=fileloc) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + count = manager.reassign_dags_with_unconfigured_bundles() + + assert count == 2 + session.expire_all() + # Each Dag goes to the bundle whose path contains its fileloc, not just the first one. + a = session.get(DagModel, "dag-a") + assert a.bundle_name == "bundle-a" + assert a.relative_fileloc == "dag_a.py" + b = session.get(DagModel, "dag-b") + assert b.bundle_name == "bundle-b" + assert b.relative_fileloc == "dag_b.py" + + def test_longest_matching_path_wins_for_overlapping_bundles( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """When bundle paths nest, route to the most-specific bundle.""" + outer_dir = tmp_path / "outer" + inner_dir = outer_dir / "team_x" + inner_dir.mkdir(parents=True) + dag_file = inner_dir / "deep_dag.py" + dag_file.write_text("# deep") + + config = [ + { + "name": "outer-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(outer_dir), "refresh_interval": 1}, + }, + { + "name": "team-x-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(inner_dir), "refresh_interval": 1}, + }, + ] + for name in ("outer-bundle", "team-x-bundle"): + b = DagBundleModel(name=name) + b.active = True + session.add(b) + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(orphan) + session.flush() + + dag = DagModel(dag_id="deep", bundle_name="orphan", fileloc=str(dag_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.reassign_dags_with_unconfigured_bundles() + + session.expire_all() + refreshed = session.get(DagModel, "deep") + # Most-specific (deeper) bundle wins. + assert refreshed.bundle_name == "team-x-bundle" + assert refreshed.relative_fileloc == "deep_dag.py" + + def test_no_path_match_leaves_row_unchanged( + self, clear_dags_and_bundles, session, tmp_path, caplog + ) -> None: + """A Dag whose fileloc is outside every configured bundle is left untouched. + + Writing ``bundle_name`` without a verified ``relative_fileloc`` would + produce an active row that task workloads cannot execute (no + ``dag_rel_path``). The row stays on its unconfigured bundle so the + existing "Requested bundle '{name}' is not configured." error at + trigger time gives the operator an actionable signal. + """ + bundle_dir = tmp_path / "configured" + bundle_dir.mkdir() + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + configured = DagBundleModel(name="configured-bundle") + configured.active = True + session.add(configured) + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(orphan) + session.flush() + + dag = DagModel(dag_id="elsewhere", bundle_name="orphan", fileloc="/somewhere/else/dag.py") + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with ( + patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ), + caplog.at_level("WARNING", logger="airflow.dag_processing.bundles.manager.DagBundlesManager"), + ): + manager = DagBundlesManager() + assert manager.reassign_dags_with_unconfigured_bundles() == 0 + + session.expire_all() + refreshed = session.get(DagModel, "elsewhere") + assert refreshed.bundle_name == "orphan" + assert refreshed.relative_fileloc is None + assert any("Skipped 1 legacy Dag(s)" in record.message for record in caplog.records) + + def test_legacy_row_on_active_default_routed_to_better_match( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """Migration-assigned ``dags-folder`` rows are re-routed when another bundle owns the file. + + Migration 0082 assigns every legacy 2.x row to ``dags-folder`` by + default. An operator that keeps a configured ``dags-folder`` bundle + alongside another bundle whose path contains the Dag's ``fileloc`` + must see the Dag reassigned to the better-matching bundle -- not + stranded on ``dags-folder`` just because that name is still active. + """ + dags_folder_dir = tmp_path / "dags-folder" + team_x_dir = tmp_path / "team_x" + dags_folder_dir.mkdir() + team_x_dir.mkdir() + team_x_file = team_x_dir / "team_x_dag.py" + team_x_file.write_text("# team-x dag") + + config = [ + { + "name": "dags-folder", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(dags_folder_dir), "refresh_interval": 1}, + }, + { + "name": "team-x-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(team_x_dir), "refresh_interval": 1}, + }, + ] + for name in ("dags-folder", "team-x-bundle"): + bundle = DagBundleModel(name=name) + bundle.active = True + session.add(bundle) + session.flush() + + # Migration state: bundle_name set to ``dags-folder`` (still in the + # active config!) but fileloc actually lives under ``team-x-bundle``. + dag = DagModel(dag_id="team_x_dag", bundle_name="dags-folder", fileloc=str(team_x_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + + assert count == 1 + session.expire_all() + refreshed = session.get(DagModel, "team_x_dag") + assert refreshed.bundle_name == "team-x-bundle" + assert refreshed.relative_fileloc == "team_x_dag.py" + + def test_legacy_row_on_correct_bundle_only_backfills_relative_fileloc( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """A legacy row whose ``fileloc`` matches its current bundle keeps the bundle. + + The repair must not rewrite ``bundle_name`` when the best match is + the same bundle the row already points at; it only needs to fill in + the missing ``relative_fileloc``. + """ + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + legacy_file = bundle_dir / "legacy.py" + legacy_file.write_text("# legacy") + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + configured = DagBundleModel(name="configured-bundle") + configured.active = True + session.add(configured) + session.flush() + + dag = DagModel(dag_id="legacy", bundle_name="configured-bundle", fileloc=str(legacy_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + # The reassignment counter only includes rows whose bundle_name + # changed; a same-bundle backfill is not counted as a reassignment. + count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + + assert count == 0 + session.expire_all() + refreshed = session.get(DagModel, "legacy") + assert refreshed.bundle_name == "configured-bundle" + assert refreshed.relative_fileloc == "legacy.py" + + +@pytest.mark.db_test +class TestBatching: + """Tests for the chunked-commit pattern in ``reassign_dags_with_unconfigured_bundles``.""" + + def test_repair_commits_between_chunks( + self, clear_dags_and_bundles, session, tmp_path, monkeypatch + ) -> None: + """All matched rows are repaired and the loop commits between chunks. + + Each ``session.commit()`` bounds the row-lock window to one chunk, + which is the load-bearing property on a large 2.x-upgraded + deployment. + """ + from airflow.dag_processing.bundles import manager as bundles_manager_mod + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + dag_files = [] + for i in range(5): + dag_file = bundle_dir / f"dag_{i}.py" + dag_file.write_text(f"# dag {i}") + dag_files.append(dag_file) + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + configured = DagBundleModel(name="configured-bundle") + configured.active = True + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(configured) + session.add(orphan) + session.flush() + + for i, dag_file in enumerate(dag_files): + dag = DagModel( + dag_id=f"dag_{i}", + bundle_name="orphan", + fileloc=str(dag_file), + ) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() # baseline state visible to the repair's own commits + + monkeypatch.setattr(bundles_manager_mod, "_REASSIGN_BATCH_SIZE", 2) + # Each chunk opens its own ``create_session`` (which commits on exit), + # so counting context-manager entries equals counting batch commits. + session_open_count = [0] + real_create_session = bundles_manager_mod.create_session + + @contextlib.contextmanager + def _counting_create_session(*args, **kwargs): + session_open_count[0] += 1 + with real_create_session(*args, **kwargs) as s: + yield s + + monkeypatch.setattr(bundles_manager_mod, "create_session", _counting_create_session) + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + count = manager.reassign_dags_with_unconfigured_bundles() + + assert count == 5 + # 1 session for the active-bundle read + 5 rows / batch size 2 + # = chunks of 2, 2, 1, then an empty chunk that terminates the loop. + assert session_open_count[0] >= 4 + + session.expire_all() + for i in range(5): + refreshed = session.get(DagModel, f"dag_{i}") + assert refreshed.bundle_name == "configured-bundle" + assert refreshed.relative_fileloc == f"dag_{i}.py" + + +@pytest.mark.db_test +class TestRaceSafety: + """Tests that the repair UPDATE does not overwrite concurrent parser writes.""" + + def test_concurrent_parse_between_select_and_update_wins( + self, clear_dags_and_bundles, session, tmp_path, monkeypatch + ) -> None: + """A parser that lands a write between our SELECT and UPDATE keeps its values. + + The repair's UPDATE re-asserts ``relative_fileloc IS NULL`` (and the + DagVersion absence) so SQL evaluates against committed state when the + UPDATE runs. A concurrent parse that wrote the real values first + makes our UPDATE match zero rows; we must not overwrite it. + """ + from airflow.dag_processing.bundles import manager as bundles_manager_mod + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + dag_file = bundle_dir / "raced.py" + dag_file.write_text("# raced") + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + configured = DagBundleModel(name="configured-bundle") + configured.active = True + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(configured) + session.add(orphan) + session.flush() + + dag = DagModel(dag_id="raced", bundle_name="orphan", fileloc=str(dag_file)) + dag.relative_fileloc = None + session.add(dag) + session.flush() + session.commit() + + # Drive the race: before _best_bundle_for_fileloc returns (which + # sits between the chunk SELECT and the per-row UPDATE), simulate a + # concurrent parser that has already committed the real values. + original = bundles_manager_mod._best_bundle_for_fileloc + + def _racey_match(fileloc, active_bundle_paths): + with create_session() as racer: + racer.execute( + update(DagModel) + .where(DagModel.dag_id == "raced") + .values( + bundle_name="configured-bundle", + relative_fileloc="parser_wrote_this.py", + ) + ) + return original(fileloc, active_bundle_paths) + + monkeypatch.setattr(bundles_manager_mod, "_best_bundle_for_fileloc", _racey_match) + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + count = manager.reassign_dags_with_unconfigured_bundles() + + # CAS guard prevents the repair from clobbering the parser write. + assert count == 0 + session.expire_all() + refreshed = session.get(DagModel, "raced") + assert refreshed.bundle_name == "configured-bundle" + assert refreshed.relative_fileloc == "parser_wrote_this.py" + + +@pytest.mark.db_test +class TestHighAvailabilityStartup: + """Two DFPs entering the repair path concurrently must converge safely.""" + + def _build_dataset(self, session, bundle_dir): + """Insert one reassignment-eligible row and one backfill-eligible row.""" + reassign_file = bundle_dir / "reassign.py" + backfill_file = bundle_dir / "backfill.py" + reassign_file.write_text("# reassign") + backfill_file.write_text("# backfill") + + configured = DagBundleModel(name="configured-bundle") + configured.active = True + orphan = DagBundleModel(name="orphan") + orphan.active = False + session.add(configured) + session.add(orphan) + session.flush() + + # Hits the reassignment branch: bundle is unconfigured, fileloc lies + # under a configured bundle's path. + reassign_dag = DagModel(dag_id="reassign", bundle_name="orphan", fileloc=str(reassign_file)) + reassign_dag.relative_fileloc = None + session.add(reassign_dag) + + # Hits the backfill branch: bundle is already configured, but + # relative_fileloc is NULL (legacy row). + backfill_dag = DagModel( + dag_id="backfill", bundle_name="configured-bundle", fileloc=str(backfill_file) + ) + backfill_dag.relative_fileloc = None + session.add(backfill_dag) + session.flush() + session.commit() + + def test_sequential_repeat_is_idempotent(self, clear_dags_and_bundles, session, tmp_path) -> None: + """Running the full repair twice over the same dataset must be a no-op the second pass. + + Simulates two DFPs starting up one after the other (or the same DFP + restarting): the second pass's SELECT must find an empty set because + the first pass either repaired the row (now non-NULL + relative_fileloc) or skipped it (cursor advances past on the first + pass alone, but the second pass's predicate would also exclude + repaired rows directly). + """ + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + self._build_dataset(session, bundle_dir) + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + first_count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + session.expire_all() + after_first = { + row.dag_id: (row.bundle_name, row.relative_fileloc) + for row in session.execute(select(DagModel)).scalars() + } + + second_count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + session.expire_all() + after_second = { + row.dag_id: (row.bundle_name, row.relative_fileloc) + for row in session.execute(select(DagModel)).scalars() + } + + assert first_count == 1 # one reassignment fires on pass 1 + assert second_count == 0 # nothing to do on pass 2 + assert after_first == after_second + assert after_first["reassign"] == ("configured-bundle", "reassign.py") + assert after_first["backfill"] == ("configured-bundle", "backfill.py") + + def test_interleaved_dfps_do_not_overwrite_each_other( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + """Two DFPs both SELECT the same row; the second to UPDATE must no-op. + + Mirrors a multi-DFP startup where both processes hit the repair at + the same time. The CAS guards on the UPDATE statements ensure that + whichever transaction commits second sees zero rows match its + WHERE clause, leaving the first commit authoritative. + """ + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + self._build_dataset(session, bundle_dir) + + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + + # Hand-driven interleaving: simulate DFP A and DFP B both observing + # the unrepaired state, then DFP A commits first, then DFP B's + # UPDATEs run against committed state with the CAS guards in place. + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + # DFP B reads the legacy state first (still NULL relative_fileloc). + with create_session() as session_b: + pre_b = session_b.execute( + select(DagModel.dag_id, DagModel.relative_fileloc).order_by(DagModel.dag_id) + ).all() + assert [r[1] for r in pre_b] == [None, None] + + # DFP A runs the full repair (commits internally per chunk). + DagBundlesManager().reassign_dags_with_unconfigured_bundles() + + # DFP B now runs its repair against the post-A committed state. + # The CAS guards (relative_fileloc IS NULL) make every UPDATE a + # no-op because A has already filled in the values. + b_count = DagBundlesManager().reassign_dags_with_unconfigured_bundles() + assert b_count == 0 + + session.expire_all() + reassign_row = session.get(DagModel, "reassign") + backfill_row = session.get(DagModel, "backfill") + assert reassign_row.bundle_name == "configured-bundle" + assert reassign_row.relative_fileloc == "reassign.py" + assert backfill_row.bundle_name == "configured-bundle" + assert backfill_row.relative_fileloc == "backfill.py" + + +@pytest.mark.db_test +class TestSyncAndReassign: + """Tests for sync_bundles_to_db followed by reassign_dags_with_unconfigured_bundles.""" + + def _sync_and_reassign(self, config: list[dict], session) -> None: + """Sync bundles to DB and reassign DAGs with unconfigured bundles. + + :param config: Bundle config list to use for this sync cycle. + :param session: SQLAlchemy session. + """ + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db(session=session) + session.commit() + manager.reassign_dags_with_unconfigured_bundles() + + @conf_vars({("core", "LOAD_EXAMPLES"): "False"}) + @pytest.mark.parametrize( + "second_config", + [ + pytest.param(SECOND_BUNDLE_CONFIG, id="bundle_removed_no_path_match"), + pytest.param(BASIC_BUNDLE_CONFIG, id="bundle_active_dag_unchanged"), + ], + ) + def test_sync_preserves_dag_bundle_without_path_match( + self, + clear_dags_and_bundles, + session, + second_config: list[dict], + ) -> None: + """Sync + reassign keeps the original bundle when no configured path matches the fileloc. + + Both parameter cases insert a Dag whose ``fileloc`` is ``/tmp/dag-1.py``, + which is not under any test bundle's path (``BasicBundle.path`` is a + no-op). When the original bundle is removed from config the helper used + to silently rewrite ``bundle_name`` to the first configured bundle even + with no path match; it now leaves the row alone so the row never + becomes active-but-un-runnable. When the bundle is still configured the + row is untouched as before. + + :param second_config: Bundle config for the second sync cycle. + """ + self._sync_and_reassign(BASIC_BUNDLE_CONFIG, session) + + _add_dag(session, "dag-1", "my-test-bundle") + session.commit() + + self._sync_and_reassign(second_config, session) + + dag = session.get(DagModel, "dag-1") + assert dag.bundle_name == "my-test-bundle" + + +@pytest.mark.db_test +class TestSkippedRowLifecycle: + """End-to-end coverage for the sync -> repair skip -> stale-scan -> re-parse path. + + A legacy 2.x row with an unconfigured bundle and a fileloc outside every + configured bundle is intentionally left untouched by the repair. The + inactive-bundle branch of ``deactivate_stale_dags`` then marks it stale, + and a later successful parse from a now-configured bundle must restore it + end-to-end. Each unit-level test exercises one stage of this lifecycle; + this test pins the whole sequence so a future refactor can't silently + break the recovery contract. + """ + + def test_skipped_row_is_recoverable_after_operator_fix( + self, clear_dags_and_bundles, session, tmp_path + ) -> None: + from airflow.dag_processing.collection import update_dag_parsing_results_in_db + from airflow.dag_processing.manager import DagFileProcessorManager + from airflow.sdk import DAG + from airflow.serialization.serialized_objects import LazyDeserializedDAG + + bundle_dir = tmp_path / "dags" + bundle_dir.mkdir() + legacy_file = bundle_dir / "legacy.py" + legacy_file.write_text("# legacy dag") + + # Pre-sync state: legacy row points at the orphan bundle, fileloc is + # outside any configured bundle path, relative_fileloc is NULL. + orphan = DagBundleModel(name="orphan") + orphan.active = True + session.add(orphan) + session.flush() + legacy = DagModel( + dag_id="legacy_dag", + bundle_name="orphan", + fileloc="/elsewhere/legacy.py", + last_parsed_time=None, + ) + legacy.relative_fileloc = None + legacy.is_stale = False + session.add(legacy) + session.commit() + + # Operator removes the orphan bundle and configures a different one + # whose path doesn't contain the legacy fileloc. + config = [ + { + "name": "configured-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": str(bundle_dir), "refresh_interval": 1}, + } + ] + with patch.dict( + os.environ, + {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(config)}, + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db(session=session) + session.commit() + # Orphan bundle is now inactive; legacy row's bundle_name still points at it. + assert session.get(DagBundleModel, "orphan").active is False + + # Repair skips the row -- no fileloc match means no atomic + # bundle_name + relative_fileloc write is possible. + reassigned = manager.reassign_dags_with_unconfigured_bundles() + assert reassigned == 0 + session.expire_all() + refreshed = session.get(DagModel, "legacy_dag") + assert refreshed.bundle_name == "orphan" + assert refreshed.relative_fileloc is None + assert refreshed.is_stale is False + + # Stale-Dag scan deactivates the row via the inactive-bundle + # branch, which runs before the NULL relative_fileloc guard. + dfp_manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60) + dfp_manager.deactivate_stale_dags(last_parsed={}) + session.expire_all() + stale_row = session.get(DagModel, "legacy_dag") + assert stale_row.is_stale is True + + # Operator fix: the legacy file is now inside the configured + # bundle's path. The parser re-parses it and the standard + # collection write resets is_stale, bundle_name, and + # relative_fileloc in a single transaction. + recovered_dag = DAG(dag_id="legacy_dag") + recovered_dag.fileloc = str(legacy_file) + recovered_dag.relative_fileloc = "legacy.py" + update_dag_parsing_results_in_db( + "configured-bundle", + None, + [LazyDeserializedDAG.from_dag(recovered_dag)], + {}, + None, + set(), + session, + ) + session.commit() + + session.expire_all() + restored = session.get(DagModel, "legacy_dag") + assert restored.is_stale is False + assert restored.bundle_name == "configured-bundle" + assert restored.relative_fileloc == "legacy.py" diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 6d10e5c1a0665..dad33ac81e2a2 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1055,6 +1055,68 @@ def test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self, session): ) assert is_stale_by_dag == {"dag_in_inactive_bundle": True, "dag_in_active_bundle": False} + @pytest.mark.usefixtures("testing_dag_bundle") + def test_deactivate_stale_dags_tolerates_null_relative_fileloc(self, session): + """An active Dag with ``relative_fileloc=None`` must not crash the stale scan. + + Upgrade leftover: the 0082 migration writes ``bundle_name`` without + ``relative_fileloc``, and the startup repair leaves a row untouched when its + ``fileloc`` is not under any configured bundle's path. The scanner must skip the + file-based check for such rows rather than calling ``Path(None)``. + """ + session.add( + DagModel( + dag_id="dag_null_relfileloc", + bundle_name="testing", + fileloc="/not/under/any/bundle.py", + relative_fileloc=None, + last_parsed_time=timezone.utcnow(), + is_stale=False, + ) + ) + session.flush() + + manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60) + manager.deactivate_stale_dags(last_parsed={}) + + is_stale = session.scalar(select(DagModel.is_stale).where(DagModel.dag_id == "dag_null_relfileloc")) + assert is_stale is False + + @pytest.mark.usefixtures("testing_dag_bundle") + def test_deactivate_stale_dags_logs_stuck_legacy_row_count(self, session): + """Skipped NULL-``relative_fileloc`` rows are counted and surfaced via a single INFO log. + + Operators need visibility into how many legacy migration rows the + startup repair could not route, because every such row keeps raising + ``Requested bundle is not configured.`` at trigger time until the + operator restores a matching bundle. The log line is the only + operator-facing signal, so this test asserts it is emitted with the + expected count. (Allowed exception to the "don't assert on log + output" convention: the log line *is* the behaviour under test.) + """ + for dag_id in ("legacy_a", "legacy_b", "legacy_c"): + session.add( + DagModel( + dag_id=dag_id, + bundle_name="testing", + fileloc=f"/not/under/any/{dag_id}.py", + relative_fileloc=None, + last_parsed_time=timezone.utcnow(), + is_stale=False, + ) + ) + session.flush() + + manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60) + with mock.patch.object(manager.log, "info") as mock_info: + manager.deactivate_stale_dags(last_parsed={}) + + legacy_log_calls = [ + call for call in mock_info.call_args_list if "legacy Dag" in (call.args[0] if call.args else "") + ] + assert len(legacy_log_calls) == 1 + assert legacy_log_calls[0].args[1] == 3 + @mock.patch("airflow.dag_processing.manager.is_lock_not_available_error") @pytest.mark.usefixtures("testing_dag_bundle") def test_deactivate_stale_dags_handles_lock_timeout(self, mock_is_lock_not_available, session, caplog): @@ -1610,6 +1672,29 @@ def test_deactivate_deleted_dags_return_value( assert session.get(DagModel, "test_dag1").is_stale is expected_dag1_stale assert session.get(DagModel, "test_dag2").is_stale is expected_dag2_stale + def test_deactivate_deleted_dags_marks_null_relative_fileloc_stale(self, dag_maker, session): + """DAGs with NULL ``relative_fileloc`` are also stale-marked when not in the observed file set. + + Legacy 2.x rows that the bundle backfill couldn't recover (``fileloc`` not under any active + bundle path) get treated as deleted on the first parse cycle. Alive rows self-heal when the + parser next succeeds and resets ``is_stale`` via ``update_dags``. + """ + with dag_maker("parsed_dag") as dag1: + dag1.relative_fileloc = "parsed_dag.py" + with dag_maker("legacy_dag") as dag2: + dag2.relative_fileloc = None + dag_maker.sync_dagbag_to_db() + + any_deactivated = DagModel.deactivate_deleted_dags( + bundle_name="dag_maker", + rel_filelocs=set(), + session=session, + ) + + assert any_deactivated is True + assert session.get(DagModel, "parsed_dag").is_stale is True + assert session.get(DagModel, "legacy_dag").is_stale is True + @pytest.mark.parametrize( ("active_files", "should_call_cleanup"), [ @@ -2120,6 +2205,94 @@ def test_fetch_callbacks_delegates_to_private_method(self): assert manager.fetch_callbacks() is expected private.assert_called_once_with() + @mock.patch("airflow.dag_processing.manager.DagBundlesManager") + def test_reassign_called_once_at_startup_not_on_refresh(self, mock_bundle_manager): + """ + reassign_dags_with_unconfigured_bundles is called exactly once by + sync_bundles, not by _refresh_dag_bundles. + """ + manager = DagFileProcessorManager(max_runs=1) + manager._dag_bundles = [] + + manager.sync_bundles() + mock_bundle_manager.return_value.reassign_dags_with_unconfigured_bundles.assert_called_once() + + manager._refresh_dag_bundles(known_files={}) + mock_bundle_manager.return_value.reassign_dags_with_unconfigured_bundles.assert_called_once() + + @pytest.mark.parametrize( + "apply_patch", + [False, True], + ids=["without_patch", "with_patch"], + ) + def test_sync_bundles_repairs_legacy_bundle_before_parsing_loop( + self, apply_patch, session, tmp_path, configure_dag_bundles + ): + """DFP initial setup alone re-homes a 2.x->3.x legacy Dag to its configured bundle. + + Window right after ``airflow db migrate`` but before the parsing loop: the ``0082`` + migration left the row with ``bundle_name='dags-folder'`` and NULL ``relative_fileloc``, + and ``0047`` emptied ``serialized_dag``/``dag_version`` (so the Dag is unserialized and + ``get_bundle`` -- what the worker calls at run time -- is the probe, not the REST trigger). + ``sync_bundles()`` runs ``sync_bundles_to_db`` and, on the patched build, + ``reassign_dags_with_unconfigured_bundles``; without it (reassign mocked off) the row stays + on the unconfigured ``dags-folder`` and ``get_bundle`` raises. See + https://github.com/apache/airflow/issues/63323. + """ + dags_folder = tmp_path / "dags" + dags_folder.mkdir() + legacy_file = dags_folder / "af2_upgrade_af3_dag.py" + legacy_file.write_text("# 2.x dag") + + # State right after the 0082 migration: the ``dags-folder`` row exists in dag_bundle but the + # operator's bundle is not registered yet (sync_bundles_to_db does that below), and + # serialized_dag/dag_version are empty (0047 wiped them; setup_method clears them too). + legacy_default_bundle = DagBundleModel(name="dags-folder") + legacy_default_bundle.active = True + session.add(legacy_default_bundle) + session.flush() + legacy_dag = DagModel( + dag_id="legacy_dag", + bundle_name="dags-folder", + fileloc=str(legacy_file), + ) + legacy_dag.relative_fileloc = None + session.add(legacy_dag) + session.commit() + + bundle_name = "upgrade_test_dag_bundle" + with configure_dag_bundles({bundle_name: dags_folder}): + manager = DagFileProcessorManager(max_runs=1) + if apply_patch: + manager.sync_bundles() + else: + # Pre-patch sync_bundles ran sync_bundles_to_db only; mocking reassign to a no-op + # reproduces that build without forking the source under test. + with mock.patch.object( + DagBundlesManager, + "reassign_dags_with_unconfigured_bundles", + return_value=0, + ): + manager.sync_bundles() + + session.expire_all() + refreshed = session.get(DagModel, "legacy_dag") + + if apply_patch: + assert refreshed.bundle_name == bundle_name + assert refreshed.relative_fileloc == "af2_upgrade_af3_dag.py" + # The worker can now resolve the bundle: the symptom is gone. + assert DagBundlesManager().get_bundle(bundle_name).name == bundle_name + else: + assert refreshed.bundle_name == "dags-folder" + assert refreshed.relative_fileloc is None + # The worker would hit the original incident at run time. + with pytest.raises( + ValueError, + match=re.escape("Requested bundle 'dags-folder' is not configured."), + ): + DagBundlesManager().get_bundle(refreshed.bundle_name) + def test_dag_with_assets(self, session, configure_testing_dag_bundle): """'Integration' test to ensure that the assets get parsed and stored correctly for parsed dags.""" test_dag_path = str(TEST_DAG_FOLDER / "test_assets.py")