| title | SQS Large Message Handling |
|---|---|
| description | Utility |
The large message handling utility handles SQS messages which have had their payloads offloaded to S3 due to them being larger than the SQS maximum.
The utility automatically retrieves messages which have been offloaded to S3 using the amazon-sqs-java-extended-client-lib client library. Once the message payloads have been processed successful the utility can delete the message payloads from S3.
This utility is compatible with versions 1.1.0+ of amazon-sqs-java-extended-client-lib.
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-extended-client-lib</artifactId>
<version>1.1.0</version>
</dependency>To install this utility, add the following dependency to your project.
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-sqs</artifactId>
<version>1.1.0</version>
</dependency>And configure the aspectj-maven-plugin to compile-time weave (CTW) the
aws-lambda-powertools-java aspects into your project. You may already have this
plugin in your pom. In that case add the dependency to the aspectLibraries
section.
<build>
<plugins>
...
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>1.11</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<complianceLevel>1.8</complianceLevel>
<aspectLibraries>
<!-- highlight-start -->
<aspectLibrary>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-sqs</artifactId>
</aspectLibrary>
<!-- highlight-end -->
</aspectLibraries>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
...
</plugins>
</build>The annotation @SqsLargeMessage should be used with the handleRequest method of a class
which implements com.amazonaws.services.lambda.runtime.RequestHandler with
com.amazonaws.services.lambda.runtime.events.SQSEvent as the first parameter.
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
@Override
@SqsLargeMessage
public String handleRequest(SQSEvent sqsEvent, Context context) {
// process messages
return "ok";
}
}@SqsLargeMessage creates a default S3 Client AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient().
When the Lambda function is invoked with an event from SQS, each record received
in the SQSEvent will be checked to see if it's body contains a payload which has
been offloaded to S3. If it does then getObject(bucket, key) will be called,
and the payload retrieved. If there is an error during this process then the
function will fail with a FailedProcessingLargePayloadException exception.
If the request handler method returns without error then each payload will be
deleted from S3 using deleteObject(bucket, key)
To disable deletion of payloads setting the following annotation parameter:
@SqsLargeMessage(deletePayloads=false)
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
}If you want to avoid using annotation and have control over error that can happen during payload enrichment.
SqsUtils.enrichedMessageFromS3() provides you access with list of SQSMessage object enriched from S3 payload.
Original SQSEvent object is never mutated. You can also control if the S3 payload should be deleted after successful
processing. You can enrich messages from S3 with below code:
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
@Override
public String handleRequest(SQSEvent sqsEvent, Context context) {
Map<String, String> sqsMessage = SqsUtils.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
// Some business logic
Map<String, String> someBusinessLogic = new HashMap<>();
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
return someBusinessLogic;
});
// Do not delete payload after processing.
Map<String, String> sqsMessage = SqsUtils.enrichedMessageFromS3(sqsEvent, false, sqsMessages -> {
// Some business logic
Map<String, String> someBusinessLogic = new HashMap<>();
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
return someBusinessLogic;
});
// Better control over exception during enrichment
try {
// Do not delete payload after processing.
SqsUtils.enrichedMessageFromS3(sqsEvent, false, sqsMessages -> {
// Some business logic
});
} catch (FailedProcessingLargePayloadException e) {
// handle any exception.
}
return "ok";
}
}