Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions src/infraDeploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Configuration } from './configuration.js';
import { AwsCredentials } from './awsCredentials.js';
import { getModuleDirname } from './getDirname.js';
import { Logger } from './logger.js';
import { runWithConcurrency } from './utils/runWithConcurrency.js';
import * as crypto from 'crypto';

let lambdaClient: LambdaClient | undefined;
Expand Down Expand Up @@ -93,6 +94,9 @@ function getLambdaClient(): LambdaClient {
profile: Configuration.config.profile,
role: Configuration.config.role,
}),
// client-side rate limiting and generous retries to avoid TooManyRequestsException
retryMode: 'adaptive',
maxAttempts: 10,
});
}
return lambdaClient;
Expand All @@ -111,6 +115,9 @@ function getIAMClient(): IAMClient {
profile: Configuration.config.profile,
role: Configuration.config.role,
}),
// client-side rate limiting and generous retries to avoid TooManyRequestsException
retryMode: 'adaptive',
maxAttempts: 10,
});
}
return iamClient;
Expand Down Expand Up @@ -453,11 +460,11 @@ async function applyAddingInfra(changes: InfraAddingChanges) {
layerVersionArn = changes.existingLayerVersionArn;
}

const promises: Promise<void>[] = [];
const tasks: (() => Promise<void>)[] = [];

// Add LLD to functions
for (const lambdaData of changes.lambdasToAdd) {
promises.push(
tasks.push(() =>
addLayerToLambda({
...lambdaData,
layers: [
Expand All @@ -473,20 +480,20 @@ async function applyAddingInfra(changes: InfraAddingChanges) {

// Remove LLD from filtered functions
for (const lambdaData of changes.lambdasToRemove) {
promises.push(removeLayerFromLambda(lambdaData));
tasks.push(() => removeLayerFromLambda(lambdaData));
}

// Add policies to roles
for (const roleName of changes.rolesToAdd) {
promises.push(addPolicyToRole(roleName));
tasks.push(() => addPolicyToRole(roleName));
}

// Remove policies from roles
for (const roleName of changes.rolesToRemove) {
promises.push(removePolicyFromLambdaRole(roleName));
tasks.push(() => removePolicyFromLambdaRole(roleName));
}

await Promise.all(promises);
await runWithConcurrency(tasks);
}

/**
Expand All @@ -508,8 +515,8 @@ async function getInfraChangesForAdding(): Promise<InfraAddingChanges> {
(l) => l.filteredOut === true,
);

const lambdasToUpdatePromise = Promise.all(
configLambdasUpdate.map(async (func) => {
const lambdasToUpdatePromise = runWithConcurrency(
configLambdasUpdate.map((func) => async () => {
const lambdaUpdate = await analyzeLambdaAdd(
func.functionName,
existingLayer?.LayerVersionArn,
Expand All @@ -518,25 +525,25 @@ async function getInfraChangesForAdding(): Promise<InfraAddingChanges> {
}),
);

const lambdasToRemovePromise = Promise.all(
configLambdasRemove.map(async (func) => {
return analyzeLambdaRemove(func.functionName);
}),
const lambdasToRemovePromise = runWithConcurrency(
configLambdasRemove.map(
(func) => () => analyzeLambdaRemove(func.functionName),
),
);

// Get all role names for lambdas to update, ensure uniqueness, then analyze
const roleNamesToAddSet = new Set<string>();
const roleNamesToAddPromise = Promise.all(
configLambdasUpdate.map(async (func) => {
const roleNamesToAddPromise = runWithConcurrency(
configLambdasUpdate.map((func) => async () => {
const roleName = await getRoleNameFromFunction(func.functionName);
roleNamesToAddSet.add(roleName);
}),
);

// Get all role names for lambdas to remove, ensure uniqueness, then analyze
const roleNamesToRemoveSet = new Set<string>();
const roleNamesToRemovePromise = Promise.all(
configLambdasRemove.map(async (func) => {
const roleNamesToRemovePromise = runWithConcurrency(
configLambdasRemove.map((func) => async () => {
const roleName = await getRoleNameFromFunction(func.functionName);
roleNamesToRemoveSet.add(roleName);
}),
Expand All @@ -546,8 +553,8 @@ async function getInfraChangesForAdding(): Promise<InfraAddingChanges> {
await roleNamesToAddPromise;

const roleNamesToAdd = Array.from(roleNamesToAddSet);
const rolesToAddPromise = Promise.all(
roleNamesToAdd.map(async (roleName) => {
const rolesToAddPromise = runWithConcurrency(
roleNamesToAdd.map((roleName) => async () => {
const roleUpdate = await analyzeRoleAdd(roleName);
return roleUpdate.addPolicy ? roleUpdate.roleName : undefined;
}),
Expand All @@ -563,8 +570,8 @@ async function getInfraChangesForAdding(): Promise<InfraAddingChanges> {
(role) => !roleNamesToAdd.includes(role),
);

const rolesToRemovePromise = Promise.all(
roleNamesToRemove.map(async (roleName) => {
const rolesToRemovePromise = runWithConcurrency(
roleNamesToRemove.map((roleName) => async () => {
const roleRemoval = await analyzeRoleRemove(roleName);
return roleRemoval.needToRemovePolicy ? roleRemoval.roleName : undefined;
}),
Expand Down Expand Up @@ -700,25 +707,23 @@ async function getInfraChangesForRemoving(): Promise<InfraRemovalChanges> {

const allLambdas = Configuration.getLambdasAll();

const lambdasToRemovePromise = Promise.all(
allLambdas.map(async (func) => {
return analyzeLambdaRemove(func.functionName);
}),
const lambdasToRemovePromise = runWithConcurrency(
allLambdas.map((func) => () => analyzeLambdaRemove(func.functionName)),
);

// Get all role names for lambdas to remove, ensure uniqueness, then analyze
const roleNamesToRemoveSet = new Set<string>();
await Promise.all(
allLambdas.map(async (func) => {
await runWithConcurrency(
allLambdas.map((func) => async () => {
const roleName = await getRoleNameFromFunction(func.functionName);
roleNamesToRemoveSet.add(roleName);
}),
);

const roleNamesToRemove = Array.from(roleNamesToRemoveSet);

const rolesToRemovePromise = Promise.all(
roleNamesToRemove.map(async (roleName) => {
const rolesToRemovePromise = runWithConcurrency(
roleNamesToRemove.map((roleName) => async () => {
const roleRemoval = await analyzeRoleRemove(roleName);
return roleRemoval.needToRemovePolicy ? roleRemoval.roleName : undefined;
}),
Expand All @@ -744,17 +749,17 @@ async function getInfraChangesForRemoving(): Promise<InfraRemovalChanges> {
async function applyRemoveInfra(changes: InfraRemovalChanges) {
Logger.verbose('Starting infrastructure removal');

const promises: Promise<void>[] = [];
const tasks: (() => Promise<void>)[] = [];

for (const lambdaData of changes.lambdasToRemove) {
promises.push(removeLayerFromLambda(lambdaData));
tasks.push(() => removeLayerFromLambda(lambdaData));
}

for (const roleName of changes.rolesToRemove) {
promises.push(removePolicyFromLambdaRole(roleName));
tasks.push(() => removePolicyFromLambdaRole(roleName));
}

await Promise.all(promises);
await runWithConcurrency(tasks);
}

/**
Expand Down
27 changes: 27 additions & 0 deletions src/utils/runWithConcurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Run async tasks with a limited number of tasks running concurrently,
* for example to avoid hitting AWS API rate limits.
* @param tasks
* @param limit
* @returns results in the same order as the tasks
*/
export async function runWithConcurrency<T>(
tasks: (() => Promise<T>)[],
limit: number = 5,
): Promise<T[]> {
const results = new Array<T>(tasks.length);
let nextTaskIndex = 0;

const workers = Array.from(
{ length: Math.min(limit, tasks.length) },
async () => {
while (nextTaskIndex < tasks.length) {
const taskIndex = nextTaskIndex++;
results[taskIndex] = await tasks[taskIndex]();
}
},
);

await Promise.all(workers);
return results;
}
Loading