From 725b1435f08963c61fbc654f2f09b45db35e04d0 Mon Sep 17 00:00:00 2001 From: Antonio Bergonzi Date: Mon, 8 Jun 2026 18:44:53 +0200 Subject: [PATCH 1/3] Use NOT EXISTS anti-join in Trigger.clean_unused instead of LEFT JOIN + aggregate --- airflow-core/src/airflow/models/trigger.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 6fecd4d825f09..99cb5fc559e1b 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -250,12 +250,10 @@ def clean_unused(cls, *, session: Session = NEW_SESSION) -> None: ) # Get all triggers that have no task instances, assets, or callbacks depending on them and delete them - ids = ( - select(cls.id) - .where(~cls.assets.any(), ~cls.callback.has()) - .join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True) - .group_by(cls.id) - .having(func.count(TaskInstance.trigger_id) == 0) + ids = select(cls.id).where( + ~cls.assets.any(), + ~cls.callback.has(), + ~cls.task_instance.has(), ) if get_dialect_name(session) == "mysql": # MySQL doesn't support DELETE with JOIN, so we need to do it in two steps From 05c885c8808a7f9ef922cafa109bdf85c243825f Mon Sep 17 00:00:00 2001 From: Antonio Bergonzi Date: Fri, 12 Jun 2026 16:00:56 +0200 Subject: [PATCH 2/3] add FOR UPDATE SKIP LOCKED to Trigger.clean_unused to avoid deadlocks between concurrent triggerer pods --- airflow-core/src/airflow/models/trigger.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 99cb5fc559e1b..ef18181ad3625 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -250,10 +250,14 @@ def clean_unused(cls, *, session: Session = NEW_SESSION) -> None: ) # Get all triggers that have no task instances, assets, or callbacks depending on them and delete them - ids = select(cls.id).where( - ~cls.assets.any(), - ~cls.callback.has(), - ~cls.task_instance.has(), + ids = ( + select(cls.id) + .where( + ~cls.assets.any(), + ~cls.callback.has(), + ~cls.task_instance.has(), + ) + .with_for_update(skip_locked=True) ) if get_dialect_name(session) == "mysql": # MySQL doesn't support DELETE with JOIN, so we need to do it in two steps From ab4776829abdb8c783ac919fba8f0206bc448259 Mon Sep 17 00:00:00 2001 From: Antonio Bergonzi Date: Fri, 19 Jun 2026 10:53:49 +0200 Subject: [PATCH 3/3] use with_row_locks helper and scope lock to trigger table --- airflow-core/src/airflow/models/trigger.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index ef18181ad3625..23b60306ea431 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -250,15 +250,12 @@ def clean_unused(cls, *, session: Session = NEW_SESSION) -> None: ) # Get all triggers that have no task instances, assets, or callbacks depending on them and delete them - ids = ( - select(cls.id) - .where( - ~cls.assets.any(), - ~cls.callback.has(), - ~cls.task_instance.has(), - ) - .with_for_update(skip_locked=True) + ids = select(cls.id).where( + ~cls.assets.any(), + ~cls.callback.has(), + ~cls.task_instance.has(), ) + ids = with_row_locks(ids, session, of=cls, skip_locked=True, key_share=False) if get_dialect_name(session) == "mysql": # MySQL doesn't support DELETE with JOIN, so we need to do it in two steps ids_list = list(session.scalars(ids).all())