From 0fa4b88bbe8f14b7986c34ca19cd48241a1a3a82 Mon Sep 17 00:00:00 2001 From: Henry Chen Date: Thu, 16 Apr 2026 20:44:01 +0800 Subject: [PATCH] Fix N+1 queries in trigger asset event submission --- airflow-core/src/airflow/models/trigger.py | 6 ++++- .../tests/unit/models/test_trigger.py | 22 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index d17af8532e067..eefe1d2e0e8d4 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -274,7 +274,11 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE handle_event_submit(event, task_instance=task_instance, session=session) # Send an event to assets - trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one_or_none() + trigger = session.scalars( + select(cls) + .where(cls.id == trigger_id) + .options(selectinload(cls.asset_watchers).selectinload(AssetWatcherModel.asset)) + ).one_or_none() if trigger is None: # Already deleted for some reason return diff --git a/airflow-core/tests/unit/models/test_trigger.py b/airflow-core/tests/unit/models/test_trigger.py index dfd0f2e99cd0a..298a30568793e 100644 --- a/airflow-core/tests/unit/models/test_trigger.py +++ b/airflow-core/tests/unit/models/test_trigger.py @@ -48,6 +48,7 @@ from airflow.utils.session import create_session from airflow.utils.state import State +from tests_common.test_utils.asserts import assert_queries_count from tests_common.test_utils.config import conf_vars if TYPE_CHECKING: @@ -221,6 +222,27 @@ def test_submit_event(mock_callback_handle_event, session, create_task_instance) mock_callback_handle_event.assert_called_once_with(event, session) +@pytest.mark.parametrize(("asset_count", "expected_query_count"), [(1, 6), (5, 6)]) +@patch("airflow.models.trigger.AssetManager.register_asset_change") +def test_submit_event_no_n_plus_one_for_assets(_, session, asset_count, expected_query_count): + """Ensure asset notifications do not trigger per-asset lazy-load queries.""" + trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + session.add(trigger) + session.flush() + trigger_id = trigger.id + + for i in range(asset_count): + asset = AssetModel(name=f"asset_{asset_count}_{i}") + asset.add_trigger(trigger, f"watcher_{i}") + session.add(asset) + + session.commit() + session.expire_all() + + with assert_queries_count(expected_query_count, session=session): + Trigger.submit_event(trigger_id, TriggerEvent("payload"), session=session) + + def test_submit_failure(session, create_task_instance): """ Tests that failures submitted to a trigger fail their dependent