1+ /*
2+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
3+ * Licensed under the Apache License, Version 2.0 (the
4+ * "License"); you may not use this file except in compliance
5+ * with the License. You may obtain a copy of the License at
6+ * http://www.apache.org/licenses/LICENSE-2.0
7+ * Unless required by applicable law or agreed to in writing, software
8+ * distributed under the License is distributed on an "AS IS" BASIS,
9+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+ * See the License for the specific language governing permissions and
11+ * limitations under the License.
12+ *
13+ */
14+
15+ package software .amazon .lambda .powertools .e2e ;
16+
17+ import com .amazonaws .services .lambda .runtime .Context ;
18+ import com .amazonaws .services .lambda .runtime .RequestHandler ;
19+ import com .amazonaws .services .lambda .runtime .events .SQSBatchResponse ;
20+ import com .amazonaws .services .lambda .runtime .events .SQSEvent ;
21+ import java .nio .charset .StandardCharsets ;
22+ import java .time .Duration ;
23+ import java .time .Instant ;
24+ import java .time .format .DateTimeFormatter ;
25+ import java .time .temporal .ChronoUnit ;
26+ import java .util .HashMap ;
27+ import java .util .Map ;
28+ import java .util .TimeZone ;
29+ import software .amazon .awssdk .http .urlconnection .UrlConnectionHttpClient ;
30+ import software .amazon .awssdk .regions .Region ;
31+ import software .amazon .awssdk .services .dynamodb .DynamoDbClient ;
32+ import software .amazon .awssdk .services .dynamodb .model .AttributeValue ;
33+ import software .amazon .awssdk .services .dynamodb .model .PutItemRequest ;
34+ import software .amazon .awssdk .utils .BinaryUtils ;
35+ import software .amazon .awssdk .utils .Md5Utils ;
36+ import software .amazon .lambda .powertools .idempotency .Idempotency ;
37+ import software .amazon .lambda .powertools .idempotency .IdempotencyConfig ;
38+ import software .amazon .lambda .powertools .idempotency .IdempotencyKey ;
39+ import software .amazon .lambda .powertools .idempotency .Idempotent ;
40+ import software .amazon .lambda .powertools .idempotency .persistence .DynamoDBPersistenceStore ;
41+ import software .amazon .lambda .powertools .largemessages .LargeMessage ;
42+ import software .amazon .lambda .powertools .logging .Logging ;
43+
44+ public class Function implements RequestHandler <SQSEvent , SQSBatchResponse > {
45+
46+ private static final String TABLE_FOR_ASYNC_TESTS = System .getenv ("TABLE_FOR_ASYNC_TESTS" );
47+ private final DynamoDbClient client ;
48+
49+ public Function () {
50+ this (DynamoDbClient
51+ .builder ()
52+ .httpClient (UrlConnectionHttpClient .builder ().build ())
53+ .region (Region .of (System .getenv ("AWS_REGION" )))
54+ .build ());
55+ }
56+
57+ public Function (DynamoDbClient client ) {
58+ this .client = client ;
59+ Idempotency .config ().withConfig (
60+ IdempotencyConfig .builder ()
61+ .withExpiration (Duration .of (22 , ChronoUnit .SECONDS ))
62+ .withEventKeyJMESPath ("body" ) // get the body of the message
63+ .build ())
64+ .withPersistenceStore (
65+ DynamoDBPersistenceStore .builder ()
66+ .withDynamoDbClient (client )
67+ .withTableName (System .getenv ("IDEMPOTENCY_TABLE" ))
68+ .build ()
69+ ).configure ();
70+ }
71+
72+ @ Logging (logEvent = true )
73+ public SQSBatchResponse handleRequest (SQSEvent event , Context context ) {
74+ for (SQSEvent .SQSMessage message : event .getRecords ()) {
75+ processRawMessage (message , context );
76+ }
77+ return SQSBatchResponse .builder ().build ();
78+ }
79+
80+ @ Idempotent
81+ @ LargeMessage (deleteS3Object = false )
82+ private String processRawMessage (@ IdempotencyKey SQSEvent .SQSMessage sqsMessage , Context context ) {
83+ String bodyMD5 = md5 (sqsMessage .getBody ());
84+ if (!sqsMessage .getMd5OfBody ().equals (bodyMD5 )) {
85+ throw new SecurityException (String .format ("message digest does not match, expected %s, got %s" , sqsMessage .getMd5OfBody (), bodyMD5 ));
86+ }
87+
88+ Instant now = Instant .now ();
89+ Map <String , AttributeValue > item = new HashMap <>();
90+ item .put ("functionName" , AttributeValue .builder ().s (context .getFunctionName ()).build ());
91+ item .put ("id" , AttributeValue .builder ().s (sqsMessage .getMessageId ()).build ());
92+ item .put ("bodyMD5" , AttributeValue .builder ().s (bodyMD5 ).build ());
93+ item .put ("now" , AttributeValue .builder ().n (String .valueOf (now .getEpochSecond ())).build ());
94+ item .put ("bodySize" , AttributeValue .builder ().n (String .valueOf (sqsMessage .getBody ().getBytes (StandardCharsets .UTF_8 ).length )).build ());
95+
96+ client .putItem (PutItemRequest .builder ().tableName (TABLE_FOR_ASYNC_TESTS ).item (item ).build ());
97+
98+ DateTimeFormatter dtf = DateTimeFormatter .ISO_DATE_TIME .withZone (TimeZone .getTimeZone ("UTC" ).toZoneId ());
99+ return dtf .format (now );
100+ }
101+
102+ private String md5 (String message ) {
103+ return BinaryUtils .toHex (Md5Utils .computeMD5Hash (message .getBytes (StandardCharsets .UTF_8 )));
104+ }
105+ }
0 commit comments