Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,11 @@ def submit_event(cls, trigger_id, event: TriggerEvent, *, session: Session = NEW
handle_event_submit(event, task_instance=task_instance, session=session)

# Send an event to assets
trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one_or_none()
trigger = session.scalars(
select(cls)
.where(cls.id == trigger_id)
.options(selectinload(cls.asset_watchers).selectinload(AssetWatcherModel.asset))
).one_or_none()
if trigger is None:
# Already deleted for some reason
return
Expand Down
33 changes: 33 additions & 0 deletions airflow-core/tests/unit/models/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from airflow.utils.session import create_session
from airflow.utils.state import State

from tests_common.test_utils.asserts import count_queries
from tests_common.test_utils.config import conf_vars

if TYPE_CHECKING:
Expand Down Expand Up @@ -234,6 +235,38 @@ def test_submit_event(mock_callback_handle_event, session, create_task_instance)
mock_callback_handle_event.assert_called_once_with(event, session)


@patch("airflow.models.trigger.AssetManager.register_asset_change")
def test_submit_event_no_n_plus_one_for_assets(_, session):
"""Ensure asset notifications do not trigger per-asset lazy-load queries."""

def _create_trigger_with_assets(asset_count: int) -> int:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we could collapse the these utility function as the test function itself.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, making it a nested function gives us limited gain here.

trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
session.add(trigger)
session.flush()

for i in range(asset_count):
asset = AssetModel(name=f"asset_{asset_count}_{i}")
asset.add_trigger(trigger, f"watcher_{i}")
session.add(asset)

session.commit()
session.expire_all()
return trigger.id

def _submit_event_query_count(trigger_id: int) -> int:
with count_queries(session=session) as query_result:
Trigger.submit_event(trigger_id, TriggerEvent("payload"), session=session)
return sum(query_result.values())

small_query_count = _submit_event_query_count(_create_trigger_with_assets(1))
larger_query_count = _submit_event_query_count(_create_trigger_with_assets(5))

assert larger_query_count - small_query_count < 3, (
f"Added 4 assets but query count increased by {larger_query_count - small_query_count} "
f"({small_query_count} -> {larger_query_count}), suggesting n+1 queries for trigger assets"
)
Comment on lines +256 to +267

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we should parameterize the asset_count at test function level and get the exact same select count.



def test_submit_failure(session, create_task_instance):
"""
Tests that failures submitted to a trigger fail their dependent
Expand Down
Loading