Skip to content

Use NOT EXISTS anti-join in Trigger.clean_unused and added skip locked#68244

Open
AntonioBergonzi wants to merge 4 commits into
apache:mainfrom
AntonioBergonzi:ab/improve_slow_delete
Open

Use NOT EXISTS anti-join in Trigger.clean_unused and added skip locked#68244
AntonioBergonzi wants to merge 4 commits into
apache:mainfrom
AntonioBergonzi:ab/improve_slow_delete

Conversation

@AntonioBergonzi

@AntonioBergonzi AntonioBergonzi commented Jun 8, 2026

Copy link
Copy Markdown

Description

Changed the query that checks for triggers that have no more callbacks, assets or task instances associated to them from count + aggregate to anti join on task_instance.
The result is the same (we are checking for non existence) but the query is more efficient. Also added skip locked since we run multiple triggerers and the different queries interfere with each other.

I did not add tests since I'm not introducing a new functionality, so the current one should cover the code change. If you think additional tests are needed, please let me know.

Testing

Plans of the queries OLD QUERY:
EXPLAIN
DELETE
FROM trigger
WHERE trigger.id IN (SELECT trigger.id
                     FROM trigger
                              LEFT OUTER JOIN task_instance ON trigger.id = task_instance.trigger_id
                     WHERE NOT (EXISTS (SELECT 1
                                        FROM asset_watcher
                                        WHERE trigger.id = asset_watcher.trigger_id
                                          AND (EXISTS (SELECT 1
                                                       FROM asset
                                                       WHERE asset.id = asset_watcher.asset_id)))
                         )
                       AND NOT (EXISTS (SELECT 1
                                        FROM callback
                                        WHERE trigger.id = callback.trigger_id))
                     GROUP BY trigger.id
                     HAVING count(task_instance.trigger_id) = 0);

yields

-[ RECORD 1 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN | Delete on trigger  (cost=49.31..93458.14 rows=0 width=0)
-[ RECORD 2 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |   ->  Nested Loop  (cost=49.31..93458.14 rows=1 width=34)
-[ RECORD 3 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |         ->  Subquery Scan on "ANY_subquery"  (cost=49.04..93455.59 rows=1 width=32)
-[ RECORD 4 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |               ->  GroupAggregate  (cost=49.04..93455.58 rows=1 width=4)
-[ RECORD 5 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                     Group Key: trigger_1.id
-[ RECORD 6 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                     Filter: (count(task_instance.trigger_id) = 0)
-[ RECORD 7 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                     ->  Nested Loop Left Join  (cost=49.04..192.44 rows=18652448 width=8)
-[ RECORD 8 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                           ->  Nested Loop Anti Join  (cost=48.61..72.06 rows=72 width=4)
-[ RECORD 9 ]---------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                 ->  Merge Anti Join  (cost=48.46..48.83 rows=72 width=4)
-[ RECORD 10 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                       Merge Cond: (trigger_1.id = asset_watcher.trigger_id)
-[ RECORD 11 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                       ->  Sort  (cost=45.94..46.12 rows=72 width=4)
-[ RECORD 12 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                             Sort Key: trigger_1.id
-[ RECORD 13 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                             ->  Seq Scan on trigger trigger_1  (cost=0.00..43.72 rows=72 width=4)
-[ RECORD 14 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                       ->  Sort  (cost=2.52..2.52 rows=1 width=4)
-[ RECORD 15 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                             Sort Key: asset_watcher.trigger_id
-[ RECORD 16 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                             ->  Nested Loop  (cost=0.29..2.51 rows=1 width=4)
-[ RECORD 17 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                                   ->  Seq Scan on asset_watcher  (cost=0.00..0.00 rows=1 width=8)
-[ RECORD 18 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                                   ->  Index Only Scan using asset_pkey on asset  (cost=0.29..2.51 rows=1 width=4)
-[ RECORD 19 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                                         Index Cond: (id = asset_watcher.asset_id)
-[ RECORD 20 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                 ->  Index Only Scan using idx_callback_trigger_id on callback  (cost=0.15..0.32 rows=1 width=4)
-[ RECORD 21 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                       Index Cond: (trigger_id = trigger_1.id)
-[ RECORD 22 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                           ->  Index Only Scan using ti_trigger_id on task_instance  (cost=0.44..1.66 rows=1 width=4)
-[ RECORD 23 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                 Index Cond: (trigger_id = trigger_1.id)
-[ RECORD 24 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |         ->  Index Scan using trigger_pkey on trigger  (cost=0.27..2.49 rows=1 width=10)
-[ RECORD 25 ]--------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |               Index Cond: (id = "ANY_subquery".id)

NEW QUERY:

EXPLAIN
DELETE
FROM trigger
WHERE trigger.id IN (SELECT trigger.id
                     FROM trigger
                     WHERE NOT (EXISTS (SELECT 1
                                        FROM asset_watcher

                                        WHERE trigger.id = asset_watcher.trigger_id
                                          AND (EXISTS (SELECT 1
                                                       FROM asset
                                                       WHERE asset.id = asset_watcher.asset_id)))
                         )
                       AND NOT (EXISTS (SELECT 1
                                        FROM callback

                                        WHERE trigger.id = callback.trigger_id))
                       AND NOT (EXISTS (SELECT 1
                                        FROM task_instance

                                        WHERE trigger.id = task_instance.trigger_id)) FOR
UPDATE SKIP LOCKED
    );

yields

-[ RECORD 1 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN | Delete on trigger  (cost=328.23..372.97 rows=0 width=0)
-[ RECORD 2 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |   ->  Hash Semi Join  (cost=328.23..372.97 rows=73 width=34)
-[ RECORD 3 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |         Hash Cond: (trigger.id = "ANY_subquery".id)
-[ RECORD 4 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |         ->  Seq Scan on trigger  (cost=0.00..43.73 rows=73 width=10)
-[ RECORD 5 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |         ->  Hash  (cost=327.32..327.32 rows=73 width=32)
-[ RECORD 6 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |               ->  Subquery Scan on "ANY_subquery"  (cost=0.88..327.32 rows=73 width=32)
-[ RECORD 7 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                     ->  LockRows  (cost=0.88..326.59 rows=73 width=34)
-[ RECORD 8 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                           ->  Nested Loop Anti Join  (cost=0.88..325.86 rows=73 width=34)
-[ RECORD 9 ]-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                 ->  Nested Loop Anti Join  (cost=0.44..129.83 rows=73 width=28)
-[ RECORD 10 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                       ->  Nested Loop Anti Join  (cost=0.29..47.33 rows=73 width=22)
-[ RECORD 11 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                             Join Filter: (trigger_1.id = asset_watcher.trigger_id)
-[ RECORD 12 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                             ->  Seq Scan on trigger trigger_1  (cost=0.00..43.73 rows=73 width=10)
-[ RECORD 13 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                             ->  Materialize  (cost=0.29..2.51 rows=1 width=16)
-[ RECORD 14 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                                   ->  Nested Loop  (cost=0.29..2.51 rows=1 width=16)
-[ RECORD 15 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                                         ->  Seq Scan on asset_watcher  (cost=0.00..0.00 rows=1 width=14)
-[ RECORD 16 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                                         ->  Index Scan using asset_pkey on asset  (cost=0.29..2.51 rows=1 width=10)
-[ RECORD 17 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                                               Index Cond: (id = asset_watcher.asset_id)
-[ RECORD 18 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                       ->  Index Scan using idx_callback_trigger_id on callback  (cost=0.15..1.12 rows=1 width=10)
-[ RECORD 19 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                             Index Cond: (trigger_id = trigger_1.id)
-[ RECORD 20 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                 ->  Index Scan using ti_trigger_id on task_instance  (cost=0.44..2.66 rows=1 width=10)
-[ RECORD 21 ]----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN |                                       Index Cond: (trigger_id = trigger_1.id)

Also we stopped having queries that crashed the triggerer image
Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    Claude Sonnet 1M

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@boring-cyborg

boring-cyborg Bot commented Jun 8, 2026

Copy link
Copy Markdown

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example Dag that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@AntonioBergonzi AntonioBergonzi changed the title Use NOT EXISTS anti-join in Trigger.clean_unused instead of aggregate Use NOT EXISTS anti-join in Trigger.clean_unused and add index in callback table Jun 12, 2026
@AntonioBergonzi AntonioBergonzi force-pushed the ab/improve_slow_delete branch from c0f8d0c to 725b143 Compare June 12, 2026 13:56
@AntonioBergonzi AntonioBergonzi changed the title Use NOT EXISTS anti-join in Trigger.clean_unused and add index in callback table Use NOT EXISTS anti-join in Trigger.clean_unused instead of aggregating Jun 12, 2026
@AntonioBergonzi AntonioBergonzi changed the title Use NOT EXISTS anti-join in Trigger.clean_unused instead of aggregating Use NOT EXISTS anti-join in Trigger.clean_unused and added skip locked Jun 12, 2026
@AntonioBergonzi AntonioBergonzi marked this pull request as ready for review June 12, 2026 14:06

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR optimizes Trigger.clean_unused() by switching from an OUTER JOIN + GROUP BY/HAVING count-based approach to a NOT EXISTS-style anti-join when identifying unused triggers, and adds row-level locking with SKIP LOCKED to reduce interference when multiple triggerers are running concurrently.

Changes:

  • Replace JOIN/aggregate “no TaskInstance rows” check with ~cls.task_instance.has() (anti-join / NOT EXISTS semantics).
  • Add SKIP LOCKED row locking to avoid concurrent triggerers contending for the same trigger rows during cleanup.

Comment on lines 255 to 261
.where(
~cls.assets.any(),
~cls.callback.has(),
~cls.task_instance.has(),
)
.with_for_update(skip_locked=True)
)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can cherry pick this AntonioBergonzi@95b2489 in this branch. However I noticed we use with_for_update with skip_locked in other parts of the code (in the scheduler, for example)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hussein-awala I resolved this. let me know if I should unresolve

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure with_for_update here is right -- it is locking the TI table, but you are never updating or touchingthem -- you are instead deleting things in the Trigger table.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If SELECT ... FOR UPDATE is right, then please use the existing with_row_locks existing helper.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review!
Addressed in ab47768, I used the helper and scoped the locking to the trigger table.

@hussein-awala hussein-awala left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I went through this carefully since it rewrites the delete predicate, and I'm satisfied it's a safe, well-motivated optimization.

For SKIP LOCKED: Good call for multi-triggerer, a single clean_unused() pass may now skip rows locked by a concurrent triggerer and leave them for the next run instead of always sweeping everything, which is the right trade-off.

@hussein-awala hussein-awala added the type:improvement Changelog: Improvements label Jun 17, 2026
@hussein-awala hussein-awala added this to the Airflow 3.3.0 milestone Jun 17, 2026
@hussein-awala hussein-awala added the ready for maintainer review Set after triaging when all criteria pass. label Jun 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Triggerer ready for maintainer review Set after triaging when all criteria pass. type:improvement Changelog: Improvements

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants