| title | SQS Batch Processing |
|---|---|
| description | Utility |
import Note from "../../src/components/Note"
The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
Key Features
- Prevent successfully processed messages from being returned to SQS
- A simple interface for individually processing messages from a batch
Background
When using SQS as a Lambda event source mapping, Lambda functions can be triggered with a batch of messages from SQS.
If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function will be triggered with the same batch again.
With this utility, messages within a batch will be handled individually - only messages that were not successfully processed are returned to the queue.
While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible.More details on how Lambda works with SQS can be found in the AWS documentation
To install this utility, add the following dependency to your project.
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-sqs</artifactId>
<version>1.1.0</version>
</dependency>And configure the aspectj-maven-plugin to compile-time weave (CTW) the
aws-lambda-powertools-java aspects into your project. You may already have this
plugin in your pom. In that case add the dependency to the aspectLibraries
section.
<build>
<plugins>
...
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>1.11</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<complianceLevel>1.8</complianceLevel>
<aspectLibraries>
<!-- highlight-start -->
<aspectLibrary>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-sqs</artifactId>
</aspectLibrary>
<!-- highlight-end -->
</aspectLibraries>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
...
</plugins>
</build>IAM Permissions
This utility requires additional permissions to work as expected. Lambda functions using this utility require the sqs:GetQueueUrl and sqs:DeleteMessageBatch permission.
You can use either SqsBatch annotation, or SqsUtils Utility API as a fluent API.
Both have nearly the same behaviour when it comes to processing messages from the batch:
- Entire batch has been successfully processed, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
- Entire Batch has been partially processed successfully, where exceptions were raised within your
SqsMessageHandlerinterface implementation, we will:- 1) Delete successfully processed messages from the queue by directly calling
sqs:DeleteMessageBatch - 2) Raise
SQSBatchProcessingExceptionto ensure failed messages return to your SQS queue
- 1) Delete successfully processed messages from the queue by directly calling
The only difference is that SqsUtils Utility API will give you access to return from the processed messages if you need. Exception SQSBatchProcessingException thrown from the
utility will have access to both successful and failed messaged along with failure exceptions.
Both annotation and SqsUtils Utility API requires an implementation of functional interface SqsMessageHandler.
This implementation is responsible for processing each individual message from the batch, and to raise an exception if unable to process any of the messages sent.
Any non-exception/successful return from your record handler function will instruct utility to queue up each individual message for deletion.
When using this annotation, you need provide a class implementation of SqsMessageHandler that will process individual messages from the batch - It should raise an exception if it is unable to process the record.
All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:
- Any successfully processed messages, we will delete them from the queue via
sqs:DeleteMessageBatch - Any unprocessed messages detected, we will raise
SQSBatchProcessingExceptionto ensure failed messages return to your SQS queue
SqsMessageHandler#process() function.
public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
@Override
@SqsBatch(SampleMessageHandler.class) // highlight-line
public String handleRequest(SQSEvent input, Context context) {
return "{\"statusCode\": 200}";
}
public class SampleMessageHandler implements SqsMessageHandler<Object> {
@Override
public String process(SQSMessage message) {
// This will be called for each individual message from a batch
// It should raise an exception if the message was not processed successfully
String returnVal = doSomething(message.getBody());
return returnVal;
}
}
}If you require access to the result of processed messages, you can use this utility.
The result from calling SqsUtils#batchProcessor() on the context manager will be a list of all the return values from your SqsMessageHandler#process() function.
public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {
@Override
public List<String> handleRequest(SQSEvent input, Context context) {
List<String> returnValues = SqsUtils.batchProcessor(input, SampleMessageHandler.class); // highlight-line
return returnValues;
}
public class SampleMessageHandler implements SqsMessageHandler<String> {
@Override
public String process(SQSMessage message) {
// This will be called for each individual message from a batch
// It should raise an exception if the message was not processed successfully
String returnVal = doSomething(message.getBody());
return returnVal;
}
}
}You can also use the utility in a more functional way` by providing inline implementation of functional interface SqsMessageHandler#process()
public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {
@Override
public List<String> handleRequest(SQSEvent input, Context context) {
// highlight-start
List<String> returnValues = SqsUtils.batchProcessor(input, (message) -> {
// This will be called for each individual message from a batch
// It should raise an exception if the message was not processed successfully
String returnVal = doSomething(message.getBody());
return returnVal;
});
// highlight-end
return returnValues;
}
}If you need to pass custom SqsClient such as region to the SDK, you can pass your own SqsClient to be used by utility either for
SqsBatch annotation, or SqsUtils Utility API.
public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {
// highlight-start
static {
SqsUtils.overrideSqsClient(SqsClient.builder()
.build());
}
// highlight-end
@Override
public List<String> handleRequest(SQSEvent input, Context context) {
List<String> returnValues = SqsUtils.batchProcessor(input, SampleMessageHandler.class);
return returnValues;
}
public class SampleMessageHandler implements SqsMessageHandler<String> {
@Override
public String process(SQSMessage message) {
// This will be called for each individual message from a batch
// It should raise an exception if the message was not processed successfully
String returnVal = doSomething(message.getBody());
return returnVal;
}
}
}If you want to disable the default behavior where SQSBatchProcessingException is raised if there are any exception, you can pass the suppressException boolean argument.
Within SqsBatch annotation
...
@Override
@SqsBatch(value = SampleMessageHandler.class, suppressException = true) // highlight-line
public String handleRequest(SQSEvent input, Context context) {
return "{\"statusCode\": 200}";
}Within SqsUtils Utility API
@Override
public List<String> handleRequest(SQSEvent input, Context context) {
List<String> returnValues = SqsUtils.batchProcessor(input, true, SampleMessageHandler.class); // highlight-line
return returnValues;
}