Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2889,6 +2889,17 @@ triggerer:
type: integer
example: ~
default: "50"
unreferenced_triggers_cleanup_batch_size:
description: |
Maximum number of unreferenced ``trigger`` rows the triggerer removes per transaction
when running trigger cleanup. Batching avoids holding row locks on the ``trigger`` table
for the duration of a single large delete, which would otherwise stall the triggerer loop
while many rows are removed. Set to ``0`` to disable batching and delete all matching
rows in a single transaction.
version_added: 3.3.0
type: integer
example: ~
default: "500"
on_kill_timeout:
description: |
Maximum number of seconds the triggerer will wait for ``BaseTrigger.on_kill()`` to complete
Expand Down
24 changes: 15 additions & 9 deletions airflow-core/src/airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from airflow.triggers.base import BaseTaskEndEvent
from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name, with_row_locks
from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
Expand Down Expand Up @@ -250,23 +250,29 @@ def clean_unused(cls, *, session: Session = NEW_SESSION) -> None:
)

# Get all triggers that have no task instances, assets, or callbacks depending on them and delete them
ids = (
ids_query = (
select(cls.id)
.where(~cls.assets.any(), ~cls.callback.has())
.join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True)
.group_by(cls.id)
.having(func.count(TaskInstance.trigger_id) == 0)
)
if get_dialect_name(session) == "mysql":
# MySQL doesn't support DELETE with JOIN, so we need to do it in two steps
ids_list = list(session.scalars(ids).all())
session.execute(
delete(Trigger).where(Trigger.id.in_(ids_list)).execution_options(synchronize_session=False)
)
else:
batch_size = conf.getint("triggerer", "unreferenced_triggers_cleanup_batch_size", fallback=500)

while True:
limited_ids_query = ids_query.limit(batch_size) if batch_size > 0 else ids_query
ids = list(session.scalars(limited_ids_query).all())

if not ids:
break

session.execute(
delete(Trigger).where(Trigger.id.in_(ids)).execution_options(synchronize_session=False)
)
session.commit()

if batch_size <= 0 or len(ids) < batch_size:
break

@classmethod
@provide_session
Expand Down
22 changes: 22 additions & 0 deletions airflow-core/tests/unit/models/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,28 @@ def test_clean_unused(session, dag_maker):
assert {result.id for result in results} == {trigger1.id, trigger4.id, trigger5.id, trigger6.id}


@conf_vars({("triggerer", "unreferenced_triggers_cleanup_batch_size"): "2"})
def test_clean_unused_batches_deletes(session, monkeypatch):
for index in range(5):
session.add(Trigger(classpath=f"airflow.triggers.testing.SuccessTrigger{index}", kwargs={}))
session.flush()

commit_calls = 0
original_commit = session.commit

def _counting_commit():
nonlocal commit_calls
commit_calls += 1
original_commit()

monkeypatch.setattr(session, "commit", _counting_commit)

Trigger.clean_unused(session=session)

assert session.scalar(select(func.count()).select_from(Trigger)) == 0
assert commit_calls == 3


@patch.object(TriggererCallback, "handle_event")
def test_submit_event(mock_callback_handle_event, session, create_task_instance):
"""
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,7 @@ unpause
unpaused
unpausing
unpredicted
unreferenced
unsanitized
unscoped
untestable
Expand Down