Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import time
import warnings
from base64 import urlsafe_b64decode
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin

Expand All @@ -34,6 +35,7 @@

from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
from airflow.api_fastapi.auth.managers.models.resource_details import DagDetails
from airflow.exceptions import AirflowProviderDeprecationWarning

try:
Expand Down Expand Up @@ -68,7 +70,6 @@
ConfigurationDetails,
ConnectionDetails,
DagAccessEntity,
DagDetails,
PoolDetails,
TeamDetails,
VariableDetails,
Expand Down Expand Up @@ -457,10 +458,27 @@ def filter_authorized_dag_ids(
cache_key = (user.get_id(), method, team_name, frozenset(dag_ids))

def query_keycloak() -> set[str]:
kwargs: dict = dict(dag_ids=dag_ids, user=user, method=method)
if team_name is not None:
kwargs["team_name"] = team_name
return super(KeycloakAuthManager, self).filter_authorized_dag_ids(**kwargs)
if not dag_ids:
return set()

max_workers = min(
len(dag_ids), conf.getint(CONF_SECTION_NAME, CONF_REQUESTS_POOL_SIZE_KEY, fallback=10)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the number of DAGs as the upper bound for the number of threads makes sense, but could you clarify the rationale behind using CONF_REQUESTS_POOL_SIZE_KEY? Correct me if I am wrong, but I believe you are using it because it is the limit on the size of the HTTP connection pool for Keycloak requests. It would not hurt to add a comment here explaining why these two values are used together to determine max_workers.

)

def check(dag_id: str) -> tuple[str, bool]:
details_kwargs: dict[str, Any] = {"id": dag_id}
if team_name is not None:
details_kwargs["team_name"] = team_name
return dag_id, self.is_authorized_dag(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you looked at whether is_authorized_dag() or the underlying HTTP client becomes the limiting factor here? It would be good to understand whether increasing the thread count beyond the HTTP connection pool size actually improves throughput.

method=method,
user=user,
details=DagDetails(**details_kwargs),
)

with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = executor.map(check, dag_ids)

return {dag_id for dag_id, authorized in results if authorized}

return single_flight(cache_key, query_keycloak)

Expand Down
Loading