Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems. It provides binders to connect with messaging systems such as RabbitMQ and Kafka. AWS SNS/SQS can be connected as well, through the Spring Cloud AWS messaging module used in this post.

The Spring Cloud messaging binders documentation is really good for setting up message channels and subscribers. In my case, I wanted to use SNS/SQS as the messaging system. I ran into a few issues, and the documentation doesn't help much in this regard. In this post, I will share sample code for a producer and a consumer application, along with their configuration.
Producer Application:
Gradle Dependencies:
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-aws:1.2.2.RELEASE')
compile('org.springframework.cloud:spring-cloud-aws-core:1.2.2.RELEASE')
compile('org.springframework.cloud:spring-cloud-aws-messaging:1.2.2.RELEASE')
ProducerApplication.java
This code accepts a TodoEvent POJO in the body of a POST request to /todo, serializes it to JSON, and publishes it to an SNS topic.
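    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.sns.AmazonSNSAsync;
    import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @SpringBootApplication
    public class ProducerApplication {

        private final AmazonSNSAsync amazonSNSAsync;
        private final ObjectMapper objectMapper = new ObjectMapper();

        public ProducerApplication() {
            // Build the SNS client with static credentials and an explicit region
            amazonSNSAsync = AmazonSNSAsyncClientBuilder.standard()
                    .withCredentials(new AWSStaticCredentialsProvider(
                            new BasicAWSCredentials("<<accessKey>>", "<<secretKey>>")))
                    .withRegion("<<region>>")
                    .build();
        }

        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class, args);
        }

        @PostMapping("/todo")
        public void createTODOEvent(@RequestBody TodoEvent event) throws Exception {
            // publish(topicArn, message, subject): the event is sent as a JSON string
            amazonSNSAsync.publish("<<SNS TOPIC ARN>>",
                    objectMapper.writeValueAsString(event),
                    "test message");
        }
    }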
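The post never shows the TodoEvent class itself, so here is a minimal sketch of what such a POJO could look like. The field names (id, description, completed) are hypothetical and chosen only for illustration. The shape is what matters: a public no-argument constructor plus getters and setters is something Jackson's default ObjectMapper can serialize and deserialize, and toString() keeps the consumer's log output readable.

```java
// Hypothetical event POJO; the fields are assumptions, not taken from the original post.
class TodoEvent {
    private String id;
    private String description;
    private boolean completed;

    // A no-argument constructor lets a JSON mapper instantiate the class.
    public TodoEvent() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    public boolean isCompleted() { return completed; }
    public void setCompleted(boolean completed) { this.completed = completed; }

    @Override
    public String toString() {
        return "TodoEvent{id='" + id + "', description='" + description
                + "', completed=" + completed + "}";
    }

    public static void main(String[] args) {
        TodoEvent event = new TodoEvent();
        event.setId("1");
        event.setDescription("buy milk");
        System.out.println(event);
    }
}
```

With a class like this, a POST to /todo would carry a JSON body whose keys match the property names.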
Consumer Application:
Gradle Dependencies:
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-aws:1.2.2.RELEASE')
compile('org.springframework.cloud:spring-cloud-aws-core:1.2.2.RELEASE')
compile('org.springframework.cloud:spring-cloud-aws-messaging:1.2.2.RELEASE')
ConsumerApplication.java
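    import java.util.Arrays;

    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.sqs.AmazonSQSAsync;
    import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
    import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs;
    import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
    import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
    import org.springframework.cloud.aws.messaging.listener.Acknowledgment;
    import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
    import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
    import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.converter.MappingJackson2MessageConverter;

    @SpringBootApplication
    @EnableSqs
    public class ConsumerApplication {

        private QueueMessagingTemplate queueMessagingTemplate;

        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }

        @Bean
        public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
            return new QueueMessagingTemplate(amazonSQSAsync);
        }

        @Bean
        public QueueMessageHandlerFactory queueMessageHandlerFactory(AmazonSQSAsync amazonSQS,
                BeanFactory beanFactory, ObjectMapper objectMapper) {
            QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
            factory.setAmazonSqs(amazonSQS);
            factory.setBeanFactory(beanFactory);

            MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
            messageConverter.setSerializedPayloadClass(String.class);
            messageConverter.setObjectMapper(objectMapper);
            messageConverter.setStrictContentTypeMatch(false);

            // NotificationMessageArgumentResolver is used to deserialize the "Message"
            // data from the SNS notification
            factory.setArgumentResolvers(Arrays.asList(
                    new NotificationMessageArgumentResolver(messageConverter)));
            return factory;
        }

        @Bean(name = "amazonSQS", destroyMethod = "shutdown")
        public AmazonSQSAsync amazonSQSAsyncClient() {
            return AmazonSQSAsyncClientBuilder.standard()
                    .withCredentials(new AWSStaticCredentialsProvider(
                            new BasicAWSCredentials("<<accessKey>>", "<<secretKey>>")))
                    .withRegion("<<region>>")
                    .build();
        }

        // Set the deletion policy to NEVER so that you can acknowledge the incoming
        // message after processing it. With ON_SUCCESS, the message is not deleted when
        // the method throws an exception; otherwise it is deleted. ALWAYS means the
        // message is deleted irrespective of the message processing state.
        // With Acknowledgment you control at which point the message is OK to be
        // deleted from the queue.
        // Note: @RuntimeUse is not a Spring annotation; it is presumably a
        // project-specific marker, so remove it if your project does not define one.
        @RuntimeUse
        @SqsListener(value = "<SQS-QUEUE-NAME>", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
        public void sqsListener(@NotificationMessage TodoEvent message, Acknowledgment acknowledgment) {
            System.out.println("message from SQS " + message);
            // If the statement below is omitted, the message will not be deleted from the queue
            acknowledgment.acknowledge();
        }
    }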
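The deletion policy comments above describe three behaviours, and a tiny model can make the differences easier to compare side by side. The sketch below is plain Java and only models the rules as stated in this post. It is not the library's implementation, and the class, enum, and method names are made up for illustration.

```java
// Illustrative model of the deletion rules described above (not library code).
class DeletionPolicySketch {

    // Only the three policies discussed in this post are modelled.
    enum Policy { ALWAYS, ON_SUCCESS, NEVER }

    /**
     * Returns true if, under the given policy, the message ends up deleted.
     *
     * @param listenerThrew whether the listener method threw an exception
     * @param acknowledged  whether the listener called acknowledge() explicitly
     */
    static boolean isDeleted(Policy policy, boolean listenerThrew, boolean acknowledged) {
        switch (policy) {
            case ALWAYS:
                return true;            // deleted regardless of the processing outcome
            case ON_SUCCESS:
                return !listenerThrew;  // deleted only when processing did not throw
            case NEVER:
                return acknowledged;    // deleted only on explicit acknowledgment
            default:
                throw new IllegalStateException("Unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        for (Policy policy : Policy.values()) {
            System.out.println(policy
                    + " failed/unacknowledged -> deleted=" + isDeleted(policy, true, false)
                    + ", succeeded/acknowledged -> deleted=" + isDeleted(policy, false, true));
        }
    }
}
```

This is why the listener above must call acknowledge() itself: with NEVER, nothing else removes the message from the queue.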
Raw SNS messages can be read from the queue by declaring the listener argument as "String message" instead. The message will then arrive in the format shown below.
SNS Message Format:
    {
      "Type" : "Notification",
      "MessageId" : "<Message UUID>",
      "TopicArn" : "<SNS ARN>",
      "Subject" : "test subject",
      "Message" : "<Actual Message>",
      "Timestamp" : "2018-05-01T00:00:00.287Z",
      "SignatureVersion" : "1",
      "Signature" : "<PEM Signature>",
      "SigningCertURL" : "",
      "UnsubscribeURL" : ""
    }
@NotificationMessage indicates that the listener wants the content of the Message field of the SNS message. NotificationMessageArgumentResolver deserializes that content into the corresponding object (TodoEvent in this example).
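To make the unwrapping step concrete, here is a dependency-free sketch of pulling the Message field out of a raw SNS envelope. It is an illustration only, not how NotificationMessageArgumentResolver works internally. The class and method names are made up, the regex-based extraction handles just simple escapes (no \uXXXX sequences), and in a real application a JSON parser such as Jackson would be the better choice. The point it shows is that the payload sits inside the envelope as an escaped JSON string and has to be unescaped before it can be mapped to an object.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper; not part of Spring Cloud AWS.
class SnsEnvelopeSketch {

    // Matches "Message" : "<escaped string>"; "MessageId" does not match
    // because the closing quote must follow the word Message directly.
    private static final Pattern MESSAGE_FIELD =
            Pattern.compile("\"Message\"\\s*:\\s*\"((?:\\\\.|[^\"\\\\])*)\"");

    static String extractMessage(String envelope) {
        Matcher matcher = MESSAGE_FIELD.matcher(envelope);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No Message field found");
        }
        return unescape(matcher.group(1));
    }

    // Simplified unescaping: handles \n and \t, and strips the backslash
    // from any other escaped character such as \" or \\.
    static String unescape(String escaped) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < escaped.length(); i++) {
            char c = escaped.charAt(i);
            if (c == '\\' && i + 1 < escaped.length()) {
                char next = escaped.charAt(++i);
                switch (next) {
                    case 'n': out.append('\n'); break;
                    case 't': out.append('\t'); break;
                    default: out.append(next);
                }
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String envelope = "{ \"Type\" : \"Notification\", \"MessageId\" : \"abc\", "
                + "\"Message\" : \"{\\\"description\\\":\\\"buy milk\\\"}\" }";
        // The extracted value is the producer's original JSON payload.
        System.out.println(extractMessage(envelope));
    }
}
```

The extracted string is what would then be handed to a JSON mapper to build the TodoEvent.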