Skip to content

Parallelize per-dag auth checks in KeycloakAuthManager#69107

Open
Andrushika wants to merge 2 commits into
apache:mainfrom
Andrushika:keycloak-parallelize-dag-auth
Open

Parallelize per-dag auth checks in KeycloakAuthManager#69107
Andrushika wants to merge 2 commits into
apache:mainfrom
Andrushika:keycloak-parallelize-dag-auth

Conversation

@Andrushika

@Andrushika Andrushika commented Jun 28, 2026

Copy link
Copy Markdown
Contributor

As the issue reported, users who use Keycloak as an auth backend are facing over 10 seconds of UI loading speed.
I took a look and discovered that KeycloakAuthManager.filter_authorized_dag_ids() wraps the result with cache/single-flight, but on cache miss it falls back to the base auth manager implementation. The base implementation checks each Dag ID individually, so Keycloak sends one authorization HTTP request per Dag ID during that filter call.

To address this problem, I use ThreadPoolExecutor to run the per-dag checks concurrently.
I tried to simulate and wrote a script for a local benchmark. Here's the result:
(50ms latency per request, measured with perf_kit.repeat_and_time.timing)

Dags count Before After
25 1.36s 0.16s
100 5.42s 0.55s
250 13.51s 1.37s

The test script is as following:

Details
from __future__ import annotations

import time
from unittest.mock import Mock

from airflow.providers.keycloak.auth_manager.constants import (
    CONF_CLIENT_ID_KEY,
    CONF_CLIENT_SECRET_KEY,
    CONF_REALM_KEY,
    CONF_SECTION_NAME,
    CONF_SERVER_URL_KEY,
)
from airflow.providers.keycloak.auth_manager.keycloak_auth_manager import KeycloakAuthManager

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.perf.perf_kit.repeat_and_time import timing

LATENCY_SECONDS = 0.05
DAG_COUNTS = (25, 100, 250)


def build_slow_session(latency_seconds: float = LATENCY_SECONDS):

    session = Mock()

    def slow_post(*_, **__):
        time.sleep(latency_seconds)
        response = Mock()
        response.status_code = 200
        return response

    session.post = slow_post
    return session


def main() -> None:
    with conf_vars(
        {
            (CONF_SECTION_NAME, CONF_CLIENT_ID_KEY): "client_id",
            (CONF_SECTION_NAME, CONF_CLIENT_SECRET_KEY): "client_secret",
            (CONF_SECTION_NAME, CONF_REALM_KEY): "realm",
            (CONF_SECTION_NAME, CONF_SERVER_URL_KEY): "http://server.invalid",
        }
    ):
        for dag_count in DAG_COUNTS:
            dag_ids = {f"dag_{index}" for index in range(dag_count)}

            manager = KeycloakAuthManager()
            manager._http_session = build_slow_session()

            user = Mock()
            user.get_id.return_value = f"user_for_{dag_count}"
            user.access_token = "token"

            with timing():
                manager.filter_authorized_dag_ids(dag_ids=dag_ids, user=user, method="GET")
            print(f"  ({dag_count} dags @ {int(LATENCY_SECONDS * 1000)}ms latency)\n")


if __name__ == "__main__":
    main()

However, I found it hard to reproduce the production environment as described in #69041... I would really appreciate it if someone could help check if this really solved the problem.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code Opus 4.8 following the guidelines, reviewed by @Andrushika.


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@Andrushika Andrushika marked this pull request as ready for review June 28, 2026 15:40

@SameerMesiah97 SameerMesiah97 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine to me. I have left 2 comments. Also, I believe you should expand test coverage (either by adding new unit tests or modifying exist ones) to include the new concurrency logic. In particular, it would be good to verify that ThreadPoolExecutor is created with the expected max_workers value (covering both len(dag_ids) and the configured pool size), while ensuring the existing authorization behavior is preserved in both the single- and multi-worker cases.

details_kwargs: dict[str, Any] = {"id": dag_id}
if team_name is not None:
details_kwargs["team_name"] = team_name
return dag_id, self.is_authorized_dag(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you looked at whether is_authorized_dag() or the underlying HTTP client becomes the limiting factor here? It would be good to understand whether increasing the thread count beyond the HTTP connection pool size actually improves throughput.

return set()

max_workers = min(
len(dag_ids), conf.getint(CONF_SECTION_NAME, CONF_REQUESTS_POOL_SIZE_KEY, fallback=10)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the number of DAGs as the upper bound for the number of threads makes sense. But could you clarify the rationale behind using CONF_REQUESTS_POOL_SIZE_KEY? Correct me if I am wrong but I believe you are using it because it is the limit for the size of the HTTP connection pool for Keycloak requests. It would not hurt to add a comment here to explain why you are using these 2 variables to determine max_workers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Multi Team: /dags screen is very slow to load with multiple teams

2 participants