From 6343f0654c95018821feb9f55bb62a6781037329 Mon Sep 17 00:00:00 2001 From: Christ Beaubrun Date: Tue, 9 Jun 2026 00:29:46 -0500 Subject: [PATCH 1/2] Batch triggerer unused trigger cleanup --- .../src/airflow/config_templates/config.yml | 11 +++++++++ airflow-core/src/airflow/models/trigger.py | 24 ++++++++++++------- .../tests/unit/models/test_trigger.py | 22 +++++++++++++++++ 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index bba829b3d08d5..159374ecd5de4 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2889,6 +2889,17 @@ triggerer: type: integer example: ~ default: "50" + unreferenced_triggers_cleanup_batch_size: + description: | + Maximum number of unreferenced ``trigger`` rows the triggerer removes per transaction + when running trigger cleanup. Batching avoids holding row locks on the ``trigger`` table + for the duration of a single large delete, which would otherwise stall the triggerer loop + while many rows are removed. Set to ``0`` (or any negative value) to disable batching and + delete all matching rows in a single transaction. 
+ version_added: 3.3.0 + type: integer + example: ~ + default: "500" on_kill_timeout: description: | Maximum number of seconds the triggerer will wait for ``BaseTrigger.on_kill()`` to complete diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 6fecd4d825f09..8995500ae712b 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -39,7 +39,7 @@ from airflow.triggers.base import BaseTaskEndEvent from airflow.utils.retries import run_with_db_retries from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name, with_row_locks +from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks from airflow.utils.state import TaskInstanceState if TYPE_CHECKING: @@ -250,23 +250,29 @@ def clean_unused(cls, *, session: Session = NEW_SESSION) -> None: ) # Get all triggers that have no task instances, assets, or callbacks depending on them and delete them - ids = ( + ids_query = ( select(cls.id) .where(~cls.assets.any(), ~cls.callback.has()) .join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True) .group_by(cls.id) .having(func.count(TaskInstance.trigger_id) == 0) ) - if get_dialect_name(session) == "mysql": - # MySQL doesn't support DELETE with JOIN, so we need to do it in two steps - ids_list = list(session.scalars(ids).all()) - session.execute( - delete(Trigger).where(Trigger.id.in_(ids_list)).execution_options(synchronize_session=False) - ) - else: + batch_size = conf.getint("triggerer", "unreferenced_triggers_cleanup_batch_size", fallback=500) + + while True: + limited_ids_query = ids_query.limit(batch_size) if batch_size > 0 else ids_query + ids = list(session.scalars(limited_ids_query).all()) + + if not ids: + break + session.execute( delete(Trigger).where(Trigger.id.in_(ids)).execution_options(synchronize_session=False) ) + session.commit() + + if batch_size <= 0 or len(ids) < 
batch_size: + break @classmethod @provide_session diff --git a/airflow-core/tests/unit/models/test_trigger.py b/airflow-core/tests/unit/models/test_trigger.py index 1243a9112f97a..f791dc761703f 100644 --- a/airflow-core/tests/unit/models/test_trigger.py +++ b/airflow-core/tests/unit/models/test_trigger.py @@ -177,6 +177,28 @@ def test_clean_unused(session, dag_maker): assert {result.id for result in results} == {trigger1.id, trigger4.id, trigger5.id, trigger6.id} +@conf_vars({("triggerer", "unreferenced_triggers_cleanup_batch_size"): "2"}) +def test_clean_unused_batches_deletes(session, monkeypatch): + for index in range(5): + session.add(Trigger(classpath=f"airflow.triggers.testing.SuccessTrigger{index}", kwargs={})) + session.flush() + + commit_calls = 0 + original_commit = session.commit + + def _counting_commit(): + nonlocal commit_calls + commit_calls += 1 + original_commit() + + monkeypatch.setattr(session, "commit", _counting_commit) + + Trigger.clean_unused(session=session) + + assert session.scalar(select(func.count()).select_from(Trigger)) == 0 + assert commit_calls == 3 + + @patch.object(TriggererCallback, "handle_event") def test_submit_event(mock_callback_handle_event, session, create_task_instance): """ From 7e8da1fc4758e0b9e7c18131e0b6c211becc784e Mon Sep 17 00:00:00 2001 From: Christ Beaubrun Date: Fri, 19 Jun 2026 12:52:31 -0700 Subject: [PATCH 2/2] docs: add unreferenced to spelling wordlist --- docs/spelling_wordlist.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index dac6798952671..de18a19045bd7 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1789,6 +1789,7 @@ unpause unpaused unpausing unpredicted +unreferenced unsanitized unscoped untestable