Resilience in AWS Step Functions with Lambdas

My AWS Step Function is exhausting my Lambda pool and keeps falling over!


Audience and Aim

At the time of writing, the maximum number of open Step Function executions per account is 1,000,000. However, the default limit on concurrent Lambda executions per account is 1,000. This means that if you have 1,000,000 Step Function executions each calling a single Lambda, Lambda will start throttling as soon as the 1,000 limit is exceeded, and the invocations fail with a throttling error (TooManyRequestsException).

The purpose of this article is to address this issue, using a tactic inspired by a previous AWS pattern.

Argument

We tried two approaches before settling on a solution:

  • Our Step Function is triggered by a Lambda. One approach was to throttle this Lambda to a minimal number of concurrent invocations, in order to reduce the number of Step Function executions being started at any one time. However, the traffic was consistently high and each execution uses several Lambdas over a number of seconds, so executions backed up and quickly exceeded the limits.
  • The next approach was to have the Lambda wait until the execution it had triggered was finished before starting another one. However, this was not cost effective, as the Lambdas remained up (and billable) while waiting. Additionally, we needed to poll the Step Functions API for completions and found that we exceeded its API rate limits.
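To see why throttling the triggering Lambda was not enough, it helps to estimate steady-state concurrency as the rate at which executions start multiplied by the Lambda time each execution consumes. The sketch below does that arithmetic. Every number in it (trigger concurrency, trigger duration, Lambda seconds per execution) is a made-up assumption for illustration, not a measurement from our system.

```java
class ConcurrencyEstimate {

    /**
     * Rough steady-state estimate: items in flight = arrival rate x time in flight.
     * Both arguments are assumptions supplied by the caller.
     */
    static double estimateConcurrency(double startsPerSecond, double lambdaSecondsPerExecution) {
        return startsPerSecond * lambdaSecondsPerExecution;
    }

    public static void main(String[] args) {
        // Hypothetical figures: the trigger Lambda is throttled to 10 concurrent
        // invocations, and each invocation takes 0.125 s to start one execution.
        int triggerConcurrency = 10;
        double triggerDurationSeconds = 0.125;
        double startsPerSecond = triggerConcurrency / triggerDurationSeconds;

        // Hypothetical: each execution spends 15 s in total inside Lambdas.
        double lambdaSecondsPerExecution = 15.0;

        double concurrent = estimateConcurrency(startsPerSecond, lambdaSecondsPerExecution);
        System.out.printf("starts/s=%.0f, estimated concurrent Lambdas=%.0f%n",
                startsPerSecond, concurrent);
    }
}
```

With these assumed figures, even a trigger limited to 10 concurrent invocations starts 80 executions per second and needs roughly 1,200 concurrent Lambdas, which is above the default limit of 1,000.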

The final approach was to introduce an asynchronous pattern using SQS queues.

Asynchronous pattern for calling lambdas

In the above pattern we do not call the Lambdas directly. Instead we put a queue in front of them: the Step Function pushes a message onto the queue, and the Lambda that consumes the message returns its result directly to the Step Function.
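As a rough in-memory analogy of this pattern (it makes no AWS calls), the sketch below uses a BlockingQueue in place of SQS, a fixed-size thread pool in place of the Lambda pool, and a CompletableFuture keyed by a token in place of the waiting step. The class and method names are hypothetical and exist only for this illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TaskTokenQueueSketch {

    /** Message placed on the queue: the step input plus the token of the waiting step. */
    static final class QueuedMessage {
        final String taskToken;
        final String input;

        QueuedMessage(String taskToken, String input) {
            this.taskToken = taskToken;
            this.input = input;
        }
    }

    private final BlockingQueue<QueuedMessage> queue = new LinkedBlockingQueue<>();
    private final Map<String, CompletableFuture<String>> waitingSteps = new ConcurrentHashMap<>();
    private final ExecutorService workers;

    /** The pool size caps how many messages are processed at once, however many are queued. */
    TaskTokenQueueSketch(int maxConcurrentWorkers) {
        workers = Executors.newFixedThreadPool(maxConcurrentWorkers);
        for (int i = 0; i < maxConcurrentWorkers; i++) {
            workers.submit(this::consume);
        }
    }

    /** Plays the role of the workflow step: enqueue the input, then wait on the token. */
    CompletableFuture<String> sendAndWaitForToken(String input) {
        String token = UUID.randomUUID().toString();
        CompletableFuture<String> result = new CompletableFuture<>();
        waitingSteps.put(token, result);
        queue.add(new QueuedMessage(token, input));
        return result;
    }

    /** Plays the role of the worker: take a message, do the work, report back via the token. */
    private void consume() {
        try {
            while (true) {
                QueuedMessage message = queue.take();
                CompletableFuture<String> step = waitingSteps.remove(message.taskToken);
                try {
                    // Stand-in for real work; completing the future is the "task success" call.
                    step.complete(message.input.toUpperCase());
                } catch (RuntimeException e) {
                    // The "task failure" call.
                    step.completeExceptionally(e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown requested
        }
    }

    void shutdown() {
        workers.shutdownNow();
    }

    public static void main(String[] args) {
        TaskTokenQueueSketch sketch = new TaskTokenQueueSketch(2);
        CompletableFuture<String> first = sketch.sendAndWaitForToken("first");
        CompletableFuture<String> second = sketch.sendAndWaitForToken("second");
        System.out.println(first.orTimeout(5, TimeUnit.SECONDS).join());
        System.out.println(second.orTimeout(5, TimeUnit.SECONDS).join());
        sketch.shutdown();
    }
}
```

The point of the design is that the caller never invokes a worker directly, so a burst of callers lengthens the queue rather than raising the number of workers running at once.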

Previously, we called our Lambda using the following JSON in our workflow definition:

"Step Name": {
  "Type": "Task",
  "InputPath": "$",
  "Resource": "<Lambda ARN>",
  "Next": "Next Step Name",
  "Retry": [
    {
      "ErrorEquals": [
        "Lambda.ServiceException",
        "Lambda.AWSLambdaException",
        "Lambda.SdkClientException"
      ],
      "IntervalSeconds": 60,
      "BackoffRate": 2.0,
      "MaxAttempts": 2
    }
  ],
  "Catch": [
    {
      "ErrorEquals": [
        "States.ALL"
      ],
      "ResultPath": "$.errorInfo",
      "Next": "Error Step Name"
    }
  ]
}

This was then replaced by the following:

"Step Name": {
  "Type": "Task",
  "TimeoutSeconds": 1800,
  "Next": "Next Step Name",
  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
  "Parameters": {
    "QueueUrl": "<Replace with your queue URL>",
    "MessageBody": {
      "CurrentBodyOfWorkflowFunction.$": "$",
      "TaskToken.$": "$$.Task.Token"
    }
  },
  "Catch": [
    {
      "ErrorEquals": [
        "States.ALL"
      ],
      "Next": "Error Step Name",
      "ResultPath": "$.errorInfo"
    }
  ],
  "Retry": [
    {
      "BackoffRate": 2,
      "ErrorEquals": [
        "SQS.SdkClientException",
        "SQS.AmazonSQSException"
      ],
      "IntervalSeconds": 60,
      "MaxAttempts": 2
    }
  ]
}

The key components of the above are:

  • TimeoutSeconds: The Step Function has no way of knowing whether the step succeeded unless the Lambda explicitly tells it. It is therefore important to set a sensible timeout, after which we consider the step failed. This means that if something goes wrong while taking the item from the queue, the step won’t wait forever. Note that by adding a dead-letter queue (DLQ) we can retrieve any failed messages from there.
  • MessageBody: There are two components to the MessageBody. CurrentBodyOfWorkflowFunction passes the entire current input of the Step Function step to the Lambda. TaskToken is what allows us to map the queue message back to the particular workflow execution.
  • The Retry block: This is important because, under high traffic, calls to SQS are not 100% reliable. Occasionally we may see a failure to post a queue message, so a retry block is very useful.
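To make the Retry block concrete, the sketch below works out how long the step waits before each retry, assuming the usual Step Functions behaviour where MaxAttempts counts retries after the first attempt and each wait is the previous one multiplied by BackoffRate. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class RetrySchedule {

    /**
     * Wait before each retry: the first retry waits intervalSeconds, and every
     * later retry waits the previous delay multiplied by backoffRate.
     */
    static List<Double> retryDelaysSeconds(int intervalSeconds, double backoffRate, int maxAttempts) {
        List<Double> delays = new ArrayList<>();
        double delay = intervalSeconds;
        for (int retry = 0; retry < maxAttempts; retry++) {
            delays.add(delay);
            delay *= backoffRate;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from the Retry block above.
        System.out.println(retryDelaysSeconds(60, 2.0, 2)); // prints [60.0, 120.0]
    }
}
```

With the values above, a failed send to SQS is retried after 60 seconds and again after a further 120 seconds. If it still fails, the Catch block routes the execution to the error step.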

Our Lambdas were then altered to return their results to the Step Function themselves. Previously, whatever the Lambda returned was passed directly to the execution. Now that the queue sits in between, the Lambda has to communicate its result to the workflow explicitly.

We are using the AWS SDK for Java 2.x, so our request handlers look similar to the below:

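import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.services.sfn.SfnClient;
import software.amazon.awssdk.services.sfn.model.SendTaskFailureRequest;
import software.amazon.awssdk.services.sfn.model.SendTaskSuccessRequest;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class LambdaHandler implements RequestStreamHandler {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final SfnClient sfnClient = SfnClient.create();

    @Override
    public void handleRequest(
            InputStream inputStream,
            OutputStream outputStream,
            Context context
    ) throws IOException {
        // An SQS event can carry a batch of records; handle each one in turn.
        for (JsonNode sqsJsonNode : OBJECT_MAPPER.readTree(inputStream).get("Records")) {

            // QueuedMessageDto (not shown) mirrors the MessageBody from the workflow definition.
            QueuedMessageDto queuedMessageDto = OBJECT_MAPPER.readValue(
                    sqsJsonNode.get("body").asText().replace("\n", ""),
                    QueuedMessageDto.class
            );
            try {
                // Carry out Lambda functionality.
                String outputString = doWork(queuedMessageDto);

                SendTaskSuccessRequest sendTaskSuccessRequest =
                        SendTaskSuccessRequest.builder()
                                .taskToken(queuedMessageDto.getTaskToken())
                                .output(outputString)
                                .build();

                sfnClient.sendTaskSuccess(sendTaskSuccessRequest);
            } catch (Exception e) {
                // Tell the waiting step that it failed, rather than leaving it to time out.
                SendTaskFailureRequest sendTaskFailureRequest =
                        SendTaskFailureRequest.builder()
                                .taskToken(queuedMessageDto.getTaskToken())
                                .cause(e.toString())
                                .build();
                sfnClient.sendTaskFailure(sendTaskFailureRequest);
            }
        }
    }

    // Hypothetical placeholder for the Lambda's own logic; the output must be a valid JSON string.
    private String doWork(QueuedMessageDto queuedMessageDto) {
        return "{}";
    }
}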

The Lambda now sends its response back to the Step Function manually, using the Task Token provided by the execution.

Conclusion

Senior Software Engineer at the BBC
