diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 6fecd4d825f09..23b60306ea431 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -250,13 +250,12 @@ def clean_unused(cls, *, session: Session = NEW_SESSION) -> None: ) # Get all triggers that have no task instances, assets, or callbacks depending on them and delete them - ids = ( - select(cls.id) - .where(~cls.assets.any(), ~cls.callback.has()) - .join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True) - .group_by(cls.id) - .having(func.count(TaskInstance.trigger_id) == 0) + ids = select(cls.id).where( + ~cls.assets.any(), + ~cls.callback.has(), + ~cls.task_instance.has(), ) + ids = with_row_locks(ids, session, of=cls, skip_locked=True, key_share=False) if get_dialect_name(session) == "mysql": # MySQL doesn't support DELETE with JOIN, so we need to do it in two steps ids_list = list(session.scalars(ids).all())