Skip to content

Commit b76b92e

Browse files
authored
fix(aws) Add batching to avoid threads hanging on s3 and ecr GetDetails (#2423)
### Type of change - [x] Bug fix (non-breaking change that fixes an issue) ### Summary fix(aws) Add batching to avoid threads hanging when getting s3 and ecr details ### Related issues or links With the increase in the scale of our infrastructure, we are seeing threads hang in the `s3` and `ecr` syncs. The root cause for s3 is that there is no maximum limit on the connection pool to the AWS API and all bucket details are requested at once. For ecr, if a repository has a very large number of images (think > 500k), querying all details could also exhaust and hang the running threads. ### Breaking changes No breaking change, but this might slow down AWS syncs. ### How was this tested? - All unit and integration tests pass. - Ran a local sync for each module. Signed-off-by: Eryx Paredes <eryxp@lyft.com>
1 parent a15c308 commit b76b92e

3 files changed

Lines changed: 34 additions & 9 deletions

File tree

cartography/intel/aws/ecr.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,9 @@
2727
}
2828

2929

30+
REPO_BATCH_SIZE = 100
31+
32+
3033
@timeit
3134
@aws_handle_regions
3235
def get_ecr_repositories(
@@ -378,7 +381,9 @@ async def async_get_images(repo: Dict[str, Any]) -> None:
378381

379382
# Sort repositories by name to ensure consistent processing order
380383
sorted_repos = sorted(repositories, key=lambda x: x["repositoryName"])
381-
to_synchronous(*[async_get_images(repo) for repo in sorted_repos])
384+
for i in range(0, len(sorted_repos), REPO_BATCH_SIZE):
385+
batch = sorted_repos[i : i + REPO_BATCH_SIZE]
386+
to_synchronous(*[async_get_images(repo) for repo in batch])
382387

383388
return image_data
384389

cartography/intel/aws/s3.py

Lines changed: 23 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212

1313
import boto3
1414
import botocore
15+
import botocore.config
1516
import neo4j
1617
from botocore.exceptions import ClientError
1718
from botocore.exceptions import EndpointConnectionError
@@ -41,6 +42,8 @@
4142
logger = logging.getLogger(__name__)
4243
stat_handler = get_stats_client(__name__)
4344

45+
BUCKET_BATCH_SIZE = 50
46+
4447

4548
# Sentinel value to indicate a fetch operation failed (vs None for "no configuration")
4649
# When a fetch returns FETCH_FAILED, we skip loading that property group to preserve existing data.
@@ -66,7 +69,10 @@ def __repr__(self):
6669

6770
@timeit
6871
def get_s3_bucket_list(boto3_session: boto3.session.Session) -> List[Dict]:
69-
client = boto3_session.client("s3")
72+
client = boto3_session.client(
73+
"s3",
74+
config=botocore.config.Config(max_pool_connections=50),
75+
)
7076
# NOTE no paginator available for this operation
7177
buckets = client.list_buckets()
7278
for bucket in buckets["Buckets"]:
@@ -135,7 +141,11 @@ async def _get_bucket_detail(bucket: Dict[str, Any]) -> BucketDetail:
135141
# in us-east-1 region
136142
client = s3_regional_clients.get(bucket["Region"])
137143
if not client:
138-
client = boto3_session.client("s3", bucket["Region"])
144+
client = boto3_session.client(
145+
"s3",
146+
bucket["Region"],
147+
config=botocore.config.Config(max_pool_connections=50),
148+
)
139149
s3_regional_clients[bucket["Region"]] = client
140150
(
141151
acl,
@@ -165,10 +175,13 @@ async def _get_bucket_detail(bucket: Dict[str, Any]) -> BucketDetail:
165175
bucket_logging,
166176
)
167177

168-
bucket_details = to_synchronous(
169-
*[_get_bucket_detail(bucket) for bucket in bucket_data["Buckets"]],
170-
)
171-
yield from bucket_details
178+
buckets = bucket_data["Buckets"]
179+
for i in range(0, len(buckets), BUCKET_BATCH_SIZE):
180+
batch = buckets[i : i + BUCKET_BATCH_SIZE]
181+
bucket_details = to_synchronous(
182+
*[_get_bucket_detail(bucket) for bucket in batch],
183+
)
184+
yield from bucket_details
172185

173186

174187
@timeit
@@ -1249,7 +1262,10 @@ def _sync_s3_notifications(
12491262
Sync S3 bucket notification configurations to Neo4j.
12501263
"""
12511264
logger.info("Syncing S3 bucket notifications")
1252-
s3_client = boto3_session.client("s3")
1265+
s3_client = boto3_session.client(
1266+
"s3",
1267+
config=botocore.config.Config(max_pool_connections=BUCKET_BATCH_SIZE),
1268+
)
12531269
notifications = []
12541270

12551271
for bucket in bucket_data["Buckets"]:

cartography/util.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1038,7 +1038,11 @@ def wrapper(*args: Any, **kwargs: Any) -> R:
10381038
raise
10391039

10401040
# don't use @backoff as decorator, to preserve typing
1041-
wrapped = backoff.on_exception(backoff.expo, CartographyThrottlingException)(
1041+
wrapped = backoff.on_exception(
1042+
backoff.expo,
1043+
CartographyThrottlingException,
1044+
max_time=300,
1045+
)(
10421046
wrapper,
10431047
)
10441048
call = partial(wrapped, *args, **kwargs)

0 commit comments

Comments
 (0)