From 86bf01a40997ab7c4ac3458b4ebe2582f182c4d4 Mon Sep 17 00:00:00 2001 From: Andrew Chang Date: Sun, 28 Jun 2026 22:31:05 +0800 Subject: [PATCH 1/4] Parallelize per-dag auth checks in KeycloakAuthManager --- .../auth_manager/keycloak_auth_manager.py | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index 545f78039942e..72133a2f10dba 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -22,6 +22,7 @@ import time import warnings from base64 import urlsafe_b64decode +from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Any from urllib.parse import urljoin @@ -34,6 +35,7 @@ from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager +from airflow.api_fastapi.auth.managers.models.resource_details import DagDetails from airflow.exceptions import AirflowProviderDeprecationWarning try: @@ -68,7 +70,6 @@ ConfigurationDetails, ConnectionDetails, DagAccessEntity, - DagDetails, PoolDetails, TeamDetails, VariableDetails, @@ -457,10 +458,24 @@ def filter_authorized_dag_ids( cache_key = (user.get_id(), method, team_name, frozenset(dag_ids)) def query_keycloak() -> set[str]: - kwargs: dict = dict(dag_ids=dag_ids, user=user, method=method) - if team_name is not None: - kwargs["team_name"] = team_name - return super(KeycloakAuthManager, self).filter_authorized_dag_ids(**kwargs) + if not dag_ids: + return set() + + max_workers = min( + len(dag_ids), conf.getint(CONF_SECTION_NAME, CONF_REQUESTS_POOL_SIZE_KEY, fallback=10) + ) + + def check(dag_id: str) -> tuple[str, bool]: + return dag_id, self.is_authorized_dag( + method=method, + user=user, + details=DagDetails(id=dag_id, team_name=team_name), + ) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + results = executor.map(check, dag_ids) + + return {dag_id for dag_id, authorized in results if authorized} return single_flight(cache_key, query_keycloak) From d29303beeb7b1d5d5b508d04d54ad6362d00342e Mon Sep 17 00:00:00 2001 From: Andrew Chang Date: Sun, 28 Jun 2026 23:39:21 +0800 Subject: [PATCH 2/4] Fix team_name args not found --- .../providers/keycloak/auth_manager/keycloak_auth_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index 72133a2f10dba..4974718349eb4 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -466,10 +466,13 @@ def query_keycloak() -> set[str]: ) def check(dag_id: str) -> tuple[str, bool]: + details_kwargs: dict[str, Any] = {"id": dag_id} + if team_name is not None: + details_kwargs["team_name"] = team_name return dag_id, self.is_authorized_dag( method=method, user=user, - details=DagDetails(id=dag_id, team_name=team_name), + details=DagDetails(**details_kwargs), ) with ThreadPoolExecutor(max_workers=max_workers) as executor: From df27fbe6f098a05cc1d2c4658fd34cd1e10d411c Mon Sep 17 00:00:00 2001 From: Andrew Chang Date: Mon, 29 Jun 2026 16:27:50 +0800 Subject: [PATCH 3/4] Add comment of thread pool size decision --- .../providers/keycloak/auth_manager/keycloak_auth_manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index 4974718349eb4..bd3e227da5040 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -460,7 +460,9 @@ def filter_authorized_dag_ids( def query_keycloak() -> set[str]: if not dag_ids: return set() - + # Cap workers at the HTTP connection pool size: each is_authorized_dag() call + # goes through the shared requests.Session, so extra threads would just block + # waiting for a free connection in urllib3's pool. max_workers = min( len(dag_ids), conf.getint(CONF_SECTION_NAME, CONF_REQUESTS_POOL_SIZE_KEY, fallback=10) ) From e80b1f76985c503c8a98722e207ea3997dd23d82 Mon Sep 17 00:00:00 2001 From: Andrew Chang Date: Mon, 29 Jun 2026 17:05:39 +0800 Subject: [PATCH 4/4] Add test for checking thread pool size --- .../test_keycloak_auth_manager.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py index 347e9944e5ef9..f7e58a3273809 100644 --- a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py +++ b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py @@ -18,6 +18,7 @@ import base64 import json +from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack from unittest.mock import Mock, patch @@ -56,6 +57,7 @@ CONF_CLIENT_ID_KEY, CONF_CLIENT_SECRET_KEY, CONF_REALM_KEY, + CONF_REQUESTS_POOL_SIZE_KEY, CONF_SECTION_NAME, CONF_SERVER_URL_KEY, ) @@ -1143,3 +1145,33 @@ def test_filter_authorized_dag_ids_cache_hit(self, mock_is_authorized, auth_mana assert result2 == dag_ids # is_authorized_dag should only be called for the first invocation (2 dag_ids × 1 call) assert mock_is_authorized.call_count == 2 + + @pytest.mark.parametrize( + ("dag_count", "pool_size", "expected_max_workers"), + [ + pytest.param(5, 10, 5, id="dag-count-smaller-than-pool"), + pytest.param(50, 10, 10, id="dag-count-larger-than-pool"), + pytest.param(10, 10, 10, id="dag-count-same-as-pool"), + ], + ) + @patch.object(KeycloakAuthManager, "is_authorized_dag", return_value=True) + @patch( + "airflow.providers.keycloak.auth_manager.keycloak_auth_manager.ThreadPoolExecutor", + wraps=ThreadPoolExecutor, + ) + def test_filter_authorized_dag_ids_caps_max_workers( + self, + mock_executor, + _mock_is_authorized, + auth_manager, + user, + dag_count, + pool_size, + expected_max_workers, + ): + dag_ids = {f"dag-{index}" for index in range(dag_count)} + + with conf_vars({(CONF_SECTION_NAME, CONF_REQUESTS_POOL_SIZE_KEY): str(pool_size)}): + auth_manager.filter_authorized_dag_ids(dag_ids=dag_ids, user=user) + + mock_executor.assert_called_once_with(max_workers=expected_max_workers)