Java SQS Listener library built to be customisable and dynamic during runtime
MIT License
The Java Dynamic SQS Listener is a library that simplifies listening to messages on an AWS SQS queue. It has been built from the ground up to be framework agnostic, easily customisable, and able to support dynamic changes to the configuration at runtime.
The following provides some examples using the library with different languages or frameworks.
Include the dependency:
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-spring-starter</artifactId>
<version>${sqs.listener.version}</version>
</dependency>
or
dependencies {
implementation("com.jashmore:java-dynamic-sqs-listener-spring-starter:${sqs.listener.version}")
}
In one of your beans, attach a @QueueListener annotation to a method indicating that it should process messages from a queue.
@Service
public class MyMessageListener {
// The queue here can point to your SQS server, e.g. a
// local SQS server or one on AWS
@QueueListener("${insert.queue.url.here}")
public void processMessage(@Payload final String payload) {
// process the message payload here
}
}
This will use any SqsAsyncClient configured in the application context for connecting to the queue. If none is configured, a default client will be provided that looks for AWS credentials and region in multiple places, such as the environment variables.
See Spring Starter Minimal Example for a minimal example of configuring in a Spring Boot application with a local ElasticMQ SQS Server.
Include the dependency:
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-core</artifactId>
<version>${sqs.listener.version}</version>
</dependency>
or
dependencies {
implementation("com.jashmore:java-dynamic-sqs-listener-core:${sqs.listener.version}")
}
Create a MessageListenerContainer for a specific queue.
public class MyClass {
public static void main(String[] args) throws InterruptedException {
final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); // or your own custom client
final QueueProperties queueProperties = QueueProperties.builder().queueUrl("${insert.queue.url.here}").build();
final MessageListenerContainer container = new BatchingMessageListenerContainer(
"listener-identifier",
queueProperties,
sqsAsyncClient,
() ->
new LambdaMessageProcessor(
sqsAsyncClient,
queueProperties,
message -> {
// process message here
}
),
ImmutableBatchingMessageListenerContainerProperties
.builder()
.concurrencyLevel(10)
.batchSize(5)
.batchingPeriod(Duration.ofSeconds(20))
.build()
);
container.start();
Runtime.getRuntime().addShutdownHook(new Thread(container::stop));
Thread.currentThread().join();
}
}
See the Core Example for a more complicated example that uses a local ElasticMQ server and dynamically changes the concurrency of message processing while the app is running.
Include the dependency:
<dependencies>
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-core</artifactId>
<version>${sqs.listener.version}</version>
</dependency>
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>core-kotlin-dsl</artifactId>
<version>${sqs.listener.version}</version>
</dependency>
</dependencies>
or
dependencies {
implementation("com.jashmore:java-dynamic-sqs-listener-core:${sqs.listener.version}")
implementation("com.jashmore:core-kotlin-dsl:${sqs.listener.version}")
}
Create a MessageListenerContainer for a specific queue.
fun main() {
val sqsAsyncClient = SqsAsyncClient.create()
val container = batchingMessageListener("identifier", sqsAsyncClient, "url") {
concurrencyLevel = { 10 }
batchSize = { 5 }
batchingPeriod = { Duration.ofSeconds(20) }
processor = lambdaProcessor {
method { message ->
log.info("Message: {}", message.body())
}
}
}
container.start()
Runtime.getRuntime().addShutdownHook(
Thread {
container.stop()
}
)
Thread.currentThread().join()
}
See the Core Kotlin Example for a full example running a Kotlin App that listens to a local ElasticMQ SQS Server.
Include the dependency:
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-ktor-core</artifactId>
<version>${sqs.listener.version}</version>
</dependency>
or
dependencies {
implementation("com.jashmore:java-dynamic-sqs-listener-ktor-core:${sqs.listener.version}")
}
Add a message listener to the server
fun main() {
val sqsAsyncClient = SqsAsyncClient.create() // replace with however you want to configure this client
val queueUrl = "replaceWithQueueUrl"
val server = embeddedServer(Netty, 8080) {
batchingMessageListener("listener-identifier", sqsAsyncClient, queueUrl) {
concurrencyLevel = { 10 }
batchSize = { 5 }
batchingPeriod = { Duration.ofSeconds(20) }
processor = lambdaProcessor {
method { message ->
log.info("Message: {}", message.body())
}
}
}
}
server.start()
Runtime.getRuntime().addShutdownHook(
Thread {
server.stop(1, 30_000)
}
)
Thread.currentThread().join()
}
See the Ktor Core Example for a full example running a Ktor framework that listens to a local ElasticMQ SQS Server.
This library has been divided into isolated components each with distinct responsibilities. The following is a diagram describing a simple flow of a single SQS message flowing through each of the components to eventually be executed by some code.
Details about each component are:
For more information about the core implementations provided by this library, see the Core Implementations Overview.
The framework relies on the following dependencies, so it is recommended to upgrade the application's dependencies to versions close to these for compatibility.
See the gradle.properties for the specific versions of these dependencies.
More in-depth guides on how to configure this library:
MessageProcessingDecorators into your Spring Queue Listeners.
When you are using the reflection-based CoreMessageProcessor (which is the default for Spring Boot applications), the payload of the message is de-serialised by default using Jackson, so any Jackson-compatible POJO class can be used with the @Payload annotation.
@Service
public class MyMessageListener {
@QueueListener(value = "${insert.queue.url.here}")
public void processMessage(@Payload final MyPojo payload) {
// process the message payload here
}
public static class MyPojo {
private String name;
public MyPojo() {
this.name = null;
}
public MyPojo(String name) {
this.name = name;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
}
There are some core ArgumentResolvers provided in the application but custom ones can be defined if they don't cover your use case. As an example, the following is how we can populate the message listener argument with the payload in uppercase.
We will use an annotation on the method parameter to indicate how the message should be resolved.
@Retention(value = RUNTIME)
@Target(ElementType.PARAMETER)
public @interface UppercasePayload {
}
Implement the ArgumentResolver interface where it will do the logic for converting the message payload to uppercase.
public class UppercasePayloadArgumentResolver implements ArgumentResolver<String> {
@Override
public boolean canResolveParameter(MethodParameter methodParameter) {
return (
methodParameter.getParameter().getType().isAssignableFrom(String.class) &&
AnnotationUtils.findParameterAnnotation(methodParameter, UppercasePayload.class).isPresent()
);
}
@Override
public String resolveArgumentForParameter(QueueProperties queueProperties, Parameter parameter, Message message)
throws ArgumentResolutionException {
return message.body().toUpperCase();
}
}
You may be curious why we use a custom AnnotationUtils.findParameterAnnotation function instead of getting the annotation directly from the parameter. The reason is that beans in the application may be proxied, for example when Aspects are applied around your code via CGLIB. Because libraries like CGLIB do not copy the annotations to the proxied classes, the resolver needs to look through the class hierarchy to find the original class and read the annotations from it. For more information about this, take a look at the JavaDoc provided in AnnotationUtils. You can also see an example of testing this problem in PayloadArgumentResolver_ProxyClassTest.java.
Also, as this library is not Spring specific, the Spring Annotation classes cannot be used.
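The proxying problem described above can be reproduced with plain reflection. The following is a minimal sketch, not the library's AnnotationUtils implementation: the class ListenerProxy stands in for a generated proxy, and the names Marker, findDirectly and findInHierarchy are hypothetical and exist only for this illustration.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Optional;

class ProxyAnnotationSketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface Marker {}

    static class Listener {
        public void process(@Marker String payload) {}
    }

    // Stands in for a generated proxy: the method is overridden but the
    // parameter annotation is not repeated on the override.
    static class ListenerProxy extends Listener {
        @Override
        public void process(String payload) {}
    }

    // Looks only at the annotations declared on the given method.
    static <A extends Annotation> Optional<A> findDirectly(Method method, int index, Class<A> type) {
        for (Annotation annotation : method.getParameterAnnotations()[index]) {
            if (type.isInstance(annotation)) {
                return Optional.of(type.cast(annotation));
            }
        }
        return Optional.empty();
    }

    // Walks up the superclasses until a declaration of the method carries the annotation.
    static <A extends Annotation> Optional<A> findInHierarchy(Method method, int index, Class<A> type) {
        Class<?> current = method.getDeclaringClass();
        while (current != null) {
            try {
                Method candidate = current.getDeclaredMethod(method.getName(), method.getParameterTypes());
                Optional<A> found = findDirectly(candidate, index, type);
                if (found.isPresent()) {
                    return found;
                }
            } catch (NoSuchMethodException e) {
                // the method is not declared on this class, keep walking up
            }
            current = current.getSuperclass();
        }
        return Optional.empty();
    }

    // Convenience lookup of the overriding method on the stand-in proxy.
    static Method proxyMethod() {
        try {
            return ListenerProxy.class.getDeclaredMethod("process", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Looking up Marker directly on the proxy method finds nothing, while the hierarchy walk finds it on the original Listener declaration.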
Include the custom ArgumentResolver in the application context for automatic injection into the ArgumentResolverService.
@Configuration
public class MyCustomConfiguration {
@Bean
public UppercasePayloadArgumentResolver uppercasePayloadArgumentResolver() {
return new UppercasePayloadArgumentResolver();
}
}
Use the new annotation in your message listener
@Component
public class MyService {
@QueueListener("${insert.queue.url.here}") // The queue here can point to your SQS server, e.g. a local SQS server or one on AWS
public void processMessage(@UppercasePayload final String uppercasePayload) {
// process the message payload here
}
}
For a more extensive guide for doing this, take a look at Spring - How to add a custom Argument Resolver. If you are using the core or Ktor library, you can look at the Core - How to Implement a Custom Argument Resolver for a guide on creating a new argument resolver.
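To make the idea of argument resolution more concrete, here is a small, self-contained sketch of how a list of resolvers could be matched against the parameters of a listener method. It uses only the JDK and is not the library's ArgumentResolverService: MiniResolver, UpperResolver, RawBodyResolver, Upper and dispatch are hypothetical names, and the message is reduced to a plain String body.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class ResolverDispatchSketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface Upper {}

    // Hypothetical, simplified stand-in for an argument resolver.
    interface MiniResolver {
        boolean canResolve(Parameter parameter);
        Object resolve(Parameter parameter, String messageBody);
    }

    static class UpperResolver implements MiniResolver {
        public boolean canResolve(Parameter parameter) {
            return parameter.getType() == String.class && parameter.isAnnotationPresent(Upper.class);
        }
        public Object resolve(Parameter parameter, String messageBody) {
            return messageBody.toUpperCase(Locale.ROOT);
        }
    }

    // Fallback resolver: passes the body through unchanged.
    static class RawBodyResolver implements MiniResolver {
        public boolean canResolve(Parameter parameter) {
            return parameter.getType() == String.class;
        }
        public Object resolve(Parameter parameter, String messageBody) {
            return messageBody;
        }
    }

    static class Listener {
        String lastSeen;
        public void onMessage(@Upper String payload) {
            lastSeen = payload;
        }
    }

    // Resolves every parameter with the first matching resolver, then invokes the listener.
    static String dispatch(String body) {
        try {
            Listener listener = new Listener();
            Method method = Listener.class.getDeclaredMethod("onMessage", String.class);
            List<MiniResolver> resolvers = Arrays.asList(new UpperResolver(), new RawBodyResolver());
            Parameter[] parameters = method.getParameters();
            Object[] arguments = new Object[parameters.length];
            for (int i = 0; i < parameters.length; i++) {
                final Parameter parameter = parameters[i];
                arguments[i] = resolvers.stream()
                    .filter(resolver -> resolver.canResolve(parameter))
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException("No resolver for " + parameter))
                    .resolve(parameter, body);
            }
            method.invoke(listener, arguments);
            return listener.lastSeen;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Ordering the more specific resolver first is a design choice of this sketch; how the library itself prioritises resolvers is not covered here.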
The library does not limit the number of messages that can be processed concurrently, so the only limit is the number of threads that the application can handle. If you are fine with spinning up as many threads as there are concurrent messages, you can increase the concurrency to as high a value as you wish.
public class SomeClass {
public MessageListenerContainer container() {
return new BatchingMessageListenerContainer(
// other configuration
ImmutableBatchingMessageListenerContainerProperties
.builder()
.concurrencyLevel(100)
.batchSize(5)
.batchingPeriod(Duration.ofSeconds(20))
.build()
);
}
}
@Service
public class MyMessageListener {
@QueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 100)
public void processMessage(@Payload final String payload) {
// process message here
}
}
coreMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
broker = concurrentBroker {
concurrencyLevel = { 100 }
}
// other configuration
}
or
batchingMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
concurrencyLevel = { 100 }
// other configuration
}
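As a rough illustration of what a concurrency level means in terms of threads, the sketch below processes a number of fake messages while never allowing more than concurrencyLevel of them to be in flight. It is a JDK-only approximation and not the library's broker; the class and method names are hypothetical, and the 5 millisecond sleep simply stands in for message processing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrencyLimitSketch {
    // Processes messageCount fake messages and returns the highest number seen in flight at once.
    static int peakConcurrency(int messageCount, int concurrencyLevel) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(concurrencyLevel);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messageCount);
        try {
            for (int i = 0; i < messageCount; i++) {
                permits.acquire(); // blocks until a processing slot frees up
                executor.execute(() -> {
                    try {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        Thread.sleep(5); // pretend to process the message
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                        done.countDown();
                    }
                });
            }
            done.await();
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Each in-flight message occupies one thread here, which is why raising the concurrency level is only sensible up to the number of threads the application can comfortably run.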
When the method executing the message finishes without throwing an exception, the MessageProcessor will acknowledge the message as a success, therefore removing it from the queue. When the method throws an exception, the message will not be acknowledged and if there is a re-drive policy the queue will perform another attempt of processing the message.
public class MyMessageListener {
@QueueListener(value = "queue-name")
public void processMessage(@Payload final String payload) {
// if this does not throw an exception it will be considered successfully processed
}
}
or
lambdaProcessor {
method { message ->
// if this does not throw an exception it will be considered successfully processed
}
}
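The acknowledgement rule can be modelled without SQS at all. The following sketch assumes an in-memory list in place of the queue and a hypothetical processAll helper: a message is dropped when the listener returns normally and kept when the listener throws, which mirrors the behaviour described above but is not the library's MessageProcessor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class AutoAcknowledgeSketch {
    // Runs the listener on each message and returns the messages that would remain on the queue.
    static List<String> processAll(List<String> queue, Consumer<String> listener) {
        List<String> remaining = new ArrayList<>();
        for (String message : queue) {
            try {
                listener.accept(message);
                // returned normally: treated as acknowledged, so the message is not kept
            } catch (RuntimeException e) {
                // threw an exception: not acknowledged, so the message stays for another attempt
                remaining.add(message);
            }
        }
        return remaining;
    }
}
```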
If the method contains an Acknowledge argument it is now up to the method to manually acknowledge the message as a success. The MessageProcessor has handed off control to the message listener and will not acknowledge the message automatically when the method executes without throwing an exception.
public class MyMessageListener {
@QueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10, maxPeriodBetweenBatchesInMs = 2000)
public void processMessage(@Payload final String payload, final Acknowledge acknowledge) throws InterruptedException, ExecutionException {
if (someCondition()) {
CompletableFuture<?> future = acknowledge.acknowledgeSuccessful();
future.get();
}
}
}
or
lambdaProcessor {
method { message, acknowledge ->
if (someCondition()) {
acknowledge.acknowledgeSuccessful().get()
}
}
}
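The hand-off of control can also be sketched with the JDK alone. In the illustration below, Ack is a hypothetical stand-in for the Acknowledge argument and acknowledged is a hypothetical helper: nothing is acknowledged just because the listener returned, only the messages for which the listener explicitly called acknowledgeSuccessful.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class ManualAcknowledgeSketch {
    // Hypothetical stand-in for an acknowledge handle passed to the listener.
    interface Ack {
        CompletableFuture<Void> acknowledgeSuccessful();
    }

    // Returns the messages that the listener chose to acknowledge.
    static Set<String> acknowledged(List<String> messages, BiConsumer<String, Ack> listener) {
        Set<String> deleted = new LinkedHashSet<>();
        for (String message : messages) {
            Ack ack = () -> {
                deleted.add(message);
                return CompletableFuture.<Void>completedFuture(null);
            };
            listener.accept(message, ack);
            // deliberately no automatic acknowledgement here, even though the listener returned normally
        }
        return deleted;
    }
}
```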
The Spring Cloud AWS Messaging @SqsListener works by requesting a batch of messages from SQS and only requesting more once the whole batch has been processed. The disadvantage of this approach is that if nine out of ten messages finish in 10 milliseconds but one takes 10 seconds, no other messages will be picked up until that last message is complete. The @QueueListener provides the same basic functionality, but it also provides a timeout after which it will request more messages whenever there are threads ready for another message.
public class MyMessageListener {
@QueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10, maxPeriodBetweenBatchesInMs = 2000)
public void processMessage(@Payload final String payload) {
// process the message payload here
}
}
batchingMessageListener("listener-identifier", sqsAsyncClient, "${insert.queue.url.here}") {
concurrencyLevel = { 10 }
batchSize = { 10 }
batchingPeriod = { Duration.ofSeconds(2) }
processor = lambdaProcessor {
method { message ->
// process the message payload here
}
}
}
In the example above, up to 10 messages are processed at once. When there are threads wanting more messages, the listener will wait for a maximum of 2 seconds before requesting messages for the threads that are waiting.
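The batching behaviour can be approximated with a blocking queue of "I am ready for a message" requests. This is a sketch under stated assumptions rather than the library's retriever: collectBatch and requests are hypothetical helpers, and each queued object represents one thread waiting for work. The batch closes as soon as batchSize requests have arrived or the batching period has elapsed, whichever comes first.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchingRequestSketch {
    // Returns how many messages to request: stops at batchSize or once batchingPeriod has elapsed.
    static int collectBatch(BlockingQueue<Object> threadRequests, int batchSize, Duration batchingPeriod) {
        long deadline = System.nanoTime() + batchingPeriod.toNanos();
        int collected = 0;
        try {
            while (collected < batchSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // the batching period is over, go with what we have
                }
                Object request = threadRequests.poll(remaining, TimeUnit.NANOSECONDS);
                if (request == null) {
                    break; // timed out while waiting for another thread to become ready
                }
                collected++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return collected;
    }

    // Builds a queue that already contains the given number of waiting requests.
    static BlockingQueue<Object> requests(int count) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            queue.add(new Object());
        }
        return queue;
    }
}
```

With 12 waiting threads and a batch size of 10 the batch fills immediately; with only 3 waiting threads the call returns 3 once the period runs out, so the slow tenth message no longer holds everything else up.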
When the number of messages for a service is extremely high, prefetching messages may be a way to optimise the throughput of the application. In this example, if the number of prefetched messages is below the desired number of prefetched messages, it will try to get as many messages as possible up to the maximum specified.
Note: because SQS returns at most 10 messages per request, setting maxPrefetchedMessages more than 10 above desiredMinPrefetchedMessages will not provide much value, as once more than the desired number of messages has been prefetched no more will be prefetched.
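The arithmetic behind the note can be written as a single function. This is only a sketch of the rule as described in the text, with hypothetical names (messagesToRequest, SQS_MAX_BATCH), and it assumes that "below the desired amount" means strictly less than.

```java
class PrefetchSketch {
    // SQS returns at most 10 messages per receive request.
    static final int SQS_MAX_BATCH = 10;

    // How many messages to ask for, given how many are currently buffered.
    static int messagesToRequest(int buffered, int desiredMin, int max) {
        if (buffered >= desiredMin) {
            return 0; // enough messages are already prefetched
        }
        return Math.min(SQS_MAX_BATCH, max - buffered);
    }
}
```

For example, with desiredMin = 5 and max = 30, the buffer is topped up from 4 to at most 14 and then left alone until it drops below 5 again, so the capacity between 15 and 30 is never used.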
The @PrefetchingQueueListener annotation can be used to prefetch messages in a background thread while processing the existing messages. The usage is something like this:
@Service
public class MyMessageListener {
@PrefetchingQueueListener(
value = "${insert.queue.url.here}",
concurrencyLevel = 10,
desiredMinPrefetchedMessages = 5,
maxPrefetchedMessages = 10
)
public void processMessage(@Payload final String payload) {
// process the message payload here
}
}
prefetchingMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
concurrencyLevel = { 10 }
desiredPrefetchedMessages = 5
maxPrefetchedMessages = 10
processor = lambdaProcessor {
methodWithVisibilityExtender { message, _ ->
// process the message payload here
}
}
}
FIFO SQS queues can be used when the order of the SQS messages is important. The FIFO message listener guarantees that messages in the same message group run in the order they are generated and that two messages in the same group are never executed concurrently. For more information about the configuration options for the message listener, take a look at the FifoMessageListenerContainerProperties or the specific annotation or DSL builder for the framework implementation.
public class Main {
public static void main(String[] args) throws InterruptedException {
final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); // or your own custom client
final QueueProperties queueProperties = QueueProperties.builder().queueUrl("${insert.queue.url.here}").build();
final MessageListenerContainer container = new FifoMessageListenerContainer(
queueProperties,
sqsAsyncClient,
() ->
new LambdaMessageProcessor(
sqsAsyncClient,
queueProperties,
message -> {
// process the message here
}
),
ImmutableFifoMessageListenerContainerProperties.builder().identifier("listener-identifier").concurrencyLevel(10).build()
);
container.start();
Runtime.getRuntime().addShutdownHook(new Thread(container::stop));
Thread.currentThread().join();
}
}
@Component
class MessageListeners {
@FifoQueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10)
public void fifoListener(@Payload final String body) {
// process message here
}
}
fifoMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
concurrencyLevel = { 10 }
processor = lambdaProcessor {
method { message ->
// process the message payload here
}
}
}
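One way to picture the FIFO guarantee is to chain the work for each message group while letting different groups share a thread pool. The sketch below is JDK-only and is not how FifoMessageListenerContainer is implemented; FifoGroupSketch and process are hypothetical names, and each message is modelled as a two-element array of group ID and body.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FifoGroupSketch {
    // Each message is {groupId, body}. Returns the bodies per group in the order they were processed.
    static Map<String, List<String>> process(List<String[]> messages, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        // the last scheduled task of every group; new work is chained onto it
        Map<String, CompletableFuture<Void>> tails = new HashMap<>();
        Map<String, List<String>> processed = new ConcurrentHashMap<>();
        try {
            for (String[] message : messages) {
                String group = message[0];
                String body = message[1];
                CompletableFuture<Void> previous =
                    tails.getOrDefault(group, CompletableFuture.<Void>completedFuture(null));
                // runs only after the previous message of the same group has finished
                CompletableFuture<Void> next = previous.thenRunAsync(
                    () -> processed
                        .computeIfAbsent(group, g -> Collections.synchronizedList(new ArrayList<>()))
                        .add(body),
                    executor
                );
                tails.put(group, next);
            }
            CompletableFuture.allOf(tails.values().toArray(new CompletableFuture[0])).join();
            return processed;
        } finally {
            executor.shutdown();
        }
    }
}
```

Messages from different groups may interleave freely across the pool, but within one group the chain forces strictly sequential, in-order execution.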
See examples for all the available examples.
The easiest way to see the framework working is to run one of the examples locally. These use an in-memory ElasticMQ SQS Server to simplify getting started. For example, to run a sample Spring Application you can use the Spring Starter Example.
gradle build -x test -x integrationTest
(cd examples/spring-starter-example && gradle bootRun)
This shows an example of running the SQS Listener in a Java application that will dynamically change the concurrency level while it is executing.
This example works by having a thread constantly placing new messages on the queue while the SQS Listener randomly changes the concurrency level every 10 seconds.
gradle build -x test -x integrationTest
(cd examples/core-example && gradle runApp)
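The idea of changing the concurrency while the application is running can be sketched by reading the limit from a supplier each time a message is about to be dispatched, instead of fixing it at start-up. This is a JDK-only illustration and not the code of the Core Example; DynamicConcurrencySketch and peakConcurrency are hypothetical names, and the 5 millisecond sleep stands in for message processing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

class DynamicConcurrencySketch {
    // Processes messageCount fake messages and returns the highest number in flight at once.
    static int peakConcurrency(int messageCount, IntSupplier concurrencyLevel) {
        ExecutorService executor = Executors.newCachedThreadPool();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Object lock = new Object();
        try {
            for (int i = 0; i < messageCount; i++) {
                synchronized (lock) {
                    // the limit is re-read on every pass, so a new value applies to the next message
                    while (inFlight.get() >= concurrencyLevel.getAsInt()) {
                        lock.wait(10);
                    }
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                }
                executor.execute(() -> {
                    try {
                        Thread.sleep(5); // pretend to process the message
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        synchronized (lock) {
                            inFlight.decrementAndGet();
                            lock.notifyAll();
                        }
                    }
                });
            }
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Passing something like an AtomicInteger's get method as the supplier lets another thread raise or lower the level at any time, which is the same shape as the lambda-valued concurrencyLevel settings shown in the Kotlin DSL examples.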
For bugs, questions and discussions please use GitHub Issues.
See CONTRIBUTING.md for more details.
MIT License
Copyright (c) 2018 Jaiden Ashmore
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.