Wednesday, 1 August 2018

Integration with AWS SNS/SQS using Spring Cloud Stream


Spring Cloud Stream is a framework for building highly scalable, event-driven microservices connected through shared messaging systems.
It provides binders to connect to messaging systems such as RabbitMQ and Kafka. It can also be used to connect to AWS SNS/SQS.

The Spring Cloud messaging binder documentation is good for setting up message channels and subscribers. In my case I wanted to use SNS/SQS as the messaging system, where I ran into a few issues that the documentation does not cover well. In this post, I share sample code for a producer and a consumer application, along with their configuration.


Producer Application:

Gradle Dependencies:
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-aws:1.2.2.RELEASE')
compile('org.springframework.cloud:spring-cloud-aws-core:1.2.2.RELEASE')
compile('org.springframework.cloud:spring-cloud-aws-messaging:1.2.2.RELEASE')

ProducerApplication.java

This code accepts a TodoEvent POJO as input, serializes it to JSON, and publishes it to SNS.

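import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SpringBootApplication
public class ProducerApplication {

    private final AmazonSNSAsync amazonSNSAsync;
    // ObjectMapper is thread-safe once configured, so one instance is reused across requests
    private final ObjectMapper objectMapper = new ObjectMapper();

    public ProducerApplication() {
        // Replace the placeholders with your own credentials and region
        amazonSNSAsync = AmazonSNSAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("<<accessKey>>", "<<secretKey>>")))
                .withRegion("<<region>>")
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @PostMapping("/todo")
    public void createTODOEvent(@RequestBody TodoEvent event) throws Exception {
        // publish(topicArn, message, subject): the event is sent as a JSON string in the message body
        amazonSNSAsync.publish("<<SNS TOPIC ARN>>", objectMapper.writeValueAsString(event), "test message");
    }
}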
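
The TodoEvent class itself is not shown in this post. A minimal sketch of what it could look like is below; the fields id, title and completed are my own assumptions for illustration, not the original class. The no-argument constructor and the getter/setter pairs are there because Jackson's default bean mapping relies on them.

```java
/** Hypothetical payload class; the field names are assumptions, not taken from the original project. */
class TodoEvent {

    private String id;
    private String title;
    private boolean completed;

    /** No-argument constructor, used by default bean-style deserialization. */
    TodoEvent() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public boolean isCompleted() {
        return completed;
    }

    public void setCompleted(boolean completed) {
        this.completed = completed;
    }

    @Override
    public String toString() {
        return "TodoEvent{id='" + id + "', title='" + title + "', completed=" + completed + "}";
    }
}
```

With a class shaped like this, a POST to /todo with a JSON body carrying the same property names would be bound to the event argument of the controller method.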

Consumer Application:

Gradle Dependencies:
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-aws:1.2.2.RELEASE')
compile('org.springframework.cloud:spring-cloud-aws-core:1.2.2.RELEASE')
compile('org.springframework.cloud:spring-cloud-aws-messaging:1.2.2.RELEASE')

ConsumerApplication.java

import java.util.Arrays;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.Acknowledgment;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

@SpringBootApplication
@EnableSqs
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }

    @Bean
    public QueueMessageHandlerFactory queueMessageHandlerFactory(AmazonSQSAsync amazonSQS, BeanFactory beanFactory, ObjectMapper objectMapper) {
        QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
        factory.setAmazonSqs(amazonSQS);
        factory.setBeanFactory(beanFactory);

        MappingJackson2MessageConverter mappingJackson2MessageConverter = new MappingJackson2MessageConverter();
        mappingJackson2MessageConverter.setSerializedPayloadClass(String.class);
        mappingJackson2MessageConverter.setObjectMapper(objectMapper);
        mappingJackson2MessageConverter.setStrictContentTypeMatch(false);
        // NotificationMessageArgumentResolver deserializes the "Message" field of the SNS notification
        factory.setArgumentResolvers(Arrays.asList(new NotificationMessageArgumentResolver(mappingJackson2MessageConverter)));

        return factory;
    }

    @Bean(name = "amazonSQS", destroyMethod = "shutdown")
    public AmazonSQSAsync amazonSQSAsyncClient() {
        // Replace the placeholders with your own credentials and region
        return AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("<<accessKey>>", "<<secretKey>>")))
                .withRegion("<<region>>")
                .build();
    }

    // The deletion policy is NEVER so that the message can be acknowledged explicitly after processing.
    // ON_SUCCESS deletes the message unless the method throws an exception.
    // ALWAYS deletes the message regardless of the processing outcome.
    // With Acknowledgment you control the point at which the message may be deleted from the queue.
    // @RuntimeUse is not defined in this post; presumably a project-specific marker annotation.
    @RuntimeUse
    @SqsListener(value = "<SQS-QUEUE-NAME>", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void sqsListener(@NotificationMessage TodoEvent message, Acknowledgment acknowledgment) {
        System.out.println("message from SQS " + message);
        // Without this call the message is not deleted from the queue
        acknowledgment.acknowledge();
    }
}
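
To make the deletion policy comments in the listener easier to reason about, here is a small toy model of the three policies mentioned there. It is only my own sketch of the behaviour described in this post, not the library's implementation; the class DeletionPolicyModel and its method are hypothetical names, and policies not discussed here are left out.

```java
/** Toy model of the deletion policies described in this post; not Spring Cloud AWS code. */
final class DeletionPolicyModel {

    /** The three policies discussed in the listener comments. */
    enum Policy { ALWAYS, ON_SUCCESS, NEVER }

    private DeletionPolicyModel() {
    }

    /**
     * @param policy        the configured deletion policy
     * @param listenerThrew whether the listener method threw an exception
     * @param acknowledged  whether the listener called acknowledge()
     * @return true if the message would be removed from the queue
     */
    static boolean isDeleted(Policy policy, boolean listenerThrew, boolean acknowledged) {
        switch (policy) {
            case ALWAYS:
                // Deleted regardless of the processing outcome
                return true;
            case ON_SUCCESS:
                // Deleted only when the listener returns normally
                return !listenerThrew;
            case NEVER:
                // Deleted only when the listener acknowledges explicitly
                return acknowledged;
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }
}
```

In the listener above the policy is NEVER, so the third branch applies: a message that is processed but never acknowledged stays on the queue and will be delivered again.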

Raw SNS messages can be read from the queue by declaring the listener argument as "String message". The message will then be in the format shown below.

SNS Message Format:
{
  "Type" : "Notification",
  "MessageId" : "<Message UUID>",
  "TopicArn" : "<SNS ARN>",
  "Subject" : "test subject",
  "Message" : "<Actual Message>",
  "Timestamp" : "2018-05-01T00:00:00.287Z",
  "SignatureVersion" : "1",
  "Signature" : "<PEM Signature>",
  "SigningCertURL" : "",
  "UnsubscribeURL" : ""
}

@NotificationMessage tells the listener to read the content of the Message field of the SNS message. NotificationMessageArgumentResolver deserializes that content into the corresponding object.
@NotificationSubject retrieves the content of the Subject field of the SNS message.
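
To illustrate what "reading the Message field" means, here is a rough, dependency-free sketch that pulls the Message value out of a raw SNS envelope string. It is deliberately simplified: it uses a regular expression instead of a real JSON parser and only handles the \" and \\ escapes, so treat it as an illustration of the unwrapping step rather than what the resolver does internally. SnsEnvelope and extractMessage are hypothetical names of my own.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy extractor for the "Message" field of an SNS notification envelope. */
final class SnsEnvelope {

    // Matches "Message" : "<value>", allowing backslash-escaped characters inside the value.
    // The closing quote after Message keeps "MessageId" from matching.
    private static final Pattern MESSAGE_FIELD =
            Pattern.compile("\"Message\"\\s*:\\s*\"((?:\\\\.|[^\"\\\\])*)\"");

    private SnsEnvelope() {
    }

    /** Returns the unescaped content of the Message field, or null if the field is absent. */
    static String extractMessage(String rawEnvelope) {
        Matcher matcher = MESSAGE_FIELD.matcher(rawEnvelope);
        if (!matcher.find()) {
            return null;
        }
        // Undo the two JSON escapes this sketch handles: \" and \\
        return matcher.group(1).replace("\\\"", "\"").replace("\\\\", "\\");
    }
}
```

In a real application I would parse the envelope with a proper JSON library such as Jackson, or simply let @NotificationMessage do the unwrapping as in the listener above.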
