Java SQS Listener library built to be customisable and dynamic during runtime
MIT License
Version upgrade release! This bumps all the dependencies so that the library works with Spring Boot 3. Upgrading this library to 6.0.0 involves making sure the corresponding dependencies are at a later version.
Published by JaidenAshmore over 3 years ago
This is a small major release that upgrades some of the dependencies, such as Spring Boot, and removes the default message visibility timeout for the Spring Queue Listener annotations.
See #368 for the list of dependency changes.
Changes the Spring Queue Listener annotations to use the Visibility Timeout set on the AWS SQS queue instead of defaulting to 30 seconds. This should prevent unexpected re-processing of messages for users who did not realise that there was a default. The same change was applied to the Kotlin DSL.
If you wish to maintain the existing functionality you will need to change your queue annotations from:
@QueueListener("${insert.queue.url.here}")
to be
@QueueListener(value = "${insert.queue.url.here}", messageVisibilityTimeoutInSeconds = 30)
Published by JaidenAshmore about 4 years ago
Moved the construction of Spring's @PrefetchingQueueListener and @QueueListener into the core library as the PrefetchingMessageListenerContainer and BatchingMessageListenerContainer respectively. This should simplify the usage of the core library, as well as making it easier to provide custom construction of these containers in the wrapping libraries, like the Spring or Ktor frameworks.
Currently it is only possible to attach a MessageProcessingDecorator to a core message listener by including it as a Spring bean. The disadvantage is that it is then applied to all message listeners, whereas you may want to apply it to only a subset of listeners. This can be achieved by implementing a MessageProcessingDecoratorFactory, which has the opportunity to wrap a message listener, e.g. by looking for an annotation.
MyAnnotation
public class MyDecoratorFactory implements MessageProcessingDecoratorFactory<MyDecorator> {
@Override
public Optional<MyDecorator> buildDecorator(
SqsAsyncClient sqsAsyncClient,
QueueProperties queueProperties,
String identifier,
Object bean,
Method method
) {
return AnnotationUtils
.findMethodAnnotation(method, MyAnnotation.class)
.map(annotation -> new MyDecoratorProperties(annotation.value()))
.map(properties -> new MyDecorator(properties));
}
}
@Configuration
class MyConfiguration {
@Bean
MyDecoratorFactory myDecoratorFactory() {
return new MyDecoratorFactory();
}
}
@QueueListener("${insert.queue.url.here}")
@MyAnnotation("someValue")
public void processMessage(@Payload final String payload) {
// process the message payload here
}
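The annotation lookup that such a factory performs can be pictured with plain reflection. This is a minimal sketch that does not use the library: Traced, Decorator and buildDecorator are hypothetical stand-ins for MyAnnotation, MyDecorator and the factory method, and the real AnnotationUtils.findMethodAnnotation may behave differently, for example around proxied beans.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Optional;

// Hypothetical marker annotation; it must be retained at runtime to be visible via reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Traced {
    String value();
}

// Hypothetical decorator that only carries the annotation's value.
final class Decorator {
    final String name;

    Decorator(String name) {
        this.name = name;
    }
}

class AnnotationLookupSketch {
    // Returns a decorator only when the listener method carries @Traced.
    static Optional<Decorator> buildDecorator(Method method) {
        return Optional.ofNullable(method.getAnnotation(Traced.class))
            .map(annotation -> new Decorator(annotation.value()));
    }

    // Looks up a single-String-argument method by name and describes its decorator, if any.
    static String lookup(String methodName) {
        try {
            Method method = AnnotationLookupSketch.class.getMethod(methodName, String.class);
            return buildDecorator(method).map(d -> d.name).orElse("none");
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such listener method: " + methodName, e);
        }
    }

    @Traced("someValue")
    public void decorated(String payload) {}

    public void plain(String payload) {}

    public static void main(String[] args) {
        System.out.println(lookup("decorated")); // someValue
        System.out.println(lookup("plain")); // none
    }
}
```

Returning an empty Optional for unannotated methods is what lets one factory bean be registered globally while only decorating the listeners that opt in.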
Adds the ability to more easily override the values for a message listener annotation by providing a custom annotation parser bean. A use case for this is to provide custom logic for calculating the concurrency level, allowing the message listener to dynamically change the concurrency rate.
Provide a custom annotation parser bean, for example the PrefetchingQueueListenerParser which is used to parse the @PrefetchingQueueListener annotation.
public class CustomPrefetchingQueueListenerParser extends PrefetchingQueueListenerParser {
private static final Random random = new Random();
private final LoadingCache<Boolean, Integer> cachedConcurrencyLevel = CacheBuilder
.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.build(CacheLoader.from(() -> random.nextInt(10)));
public CustomPrefetchingQueueListenerParser(Environment environment) {
super(environment);
}
@Override
protected Supplier<Integer> concurrencySupplier(PrefetchingQueueListener annotation) {
return () -> cachedConcurrencyLevel.getUnchecked(true);
}
}
In the example above we are using a Guava cache to make sure that the concurrency level is cached for a certain time period before it switches values.
Include this parser as a bean. This will replace the existing implementation and will be used for all @PrefetchingQueueListener message listeners.
@Configuration
class MyConfiguration {
@Bean
public PrefetchingQueueListenerParser customParser(final Environment environment) {
return new CustomPrefetchingQueueListenerParser(environment);
}
}
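If you would rather not depend on Guava for the caching, the same idea of a concurrency value that is recomputed only after an expiry period can be sketched with the JDK alone. ExpiringSupplier and its injectable clock are hypothetical names for this illustration and are not part of the library.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Caches the delegate's value and recomputes it once the expiry period has elapsed.
final class ExpiringSupplier<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final long expiryNanos;
    private final LongSupplier nanoClock; // injectable so the sketch can be exercised without sleeping
    private T value;
    private long loadedAt;
    private boolean loaded;

    ExpiringSupplier(Supplier<T> delegate, Duration expiry, LongSupplier nanoClock) {
        this.delegate = delegate;
        this.expiryNanos = expiry.toNanos();
        this.nanoClock = nanoClock;
    }

    @Override
    public synchronized T get() {
        long now = nanoClock.getAsLong();
        if (!loaded || now - loadedAt >= expiryNanos) {
            value = delegate.get();
            loadedAt = now;
            loaded = true;
        }
        return value;
    }
}

class ExpiringSupplierDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        long[] fakeNow = { 0L }; // fake clock in nanoseconds
        Supplier<Integer> concurrency = new ExpiringSupplier<>(
            calls::incrementAndGet, Duration.ofSeconds(10), () -> fakeNow[0]);

        System.out.println(concurrency.get()); // 1, first load
        fakeNow[0] = Duration.ofSeconds(5).toNanos();
        System.out.println(concurrency.get()); // 1, still cached
        fakeNow[0] = Duration.ofSeconds(10).toNanos();
        System.out.println(concurrency.get()); // 2, expired and reloaded
    }
}
```

In a real parser you would pass System::nanoTime as the clock and return the supplier from the overridden concurrencySupplier method.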
This will not break consumers of this library, but if you are building your own aspects of this library manually you may need to modify some of the constructors. For example, the PrefetchingMessageListenerContainerFactory now takes a PrefetchingQueueListenerParser instead of the Spring Environment.
Allows for the processing of long running messages, with the visibility timeout of the message being automatically extended when it is about to reach its limit. If the message takes longer than a certain amount of time without completing, it will be forcibly stopped by interrupting the thread.
Note: this only works with synchronous processing of messages, e.g. ones that do not return a CompletableFuture. Asynchronous message listeners will be supported at a later point.
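As a rough picture of the mechanism described above, the sketch below runs a listener on a worker thread, calls an "extend" callback shortly before each visibility timeout would lapse, and interrupts the listener once a maximum duration has passed. It is not the library's implementation: process and extendVisibility are hypothetical names, and the callback only stands in for the place where the real decorator would ask SQS to extend the visibility.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class VisibilityExtenderSketch {
    // Returns true when the task finished within maxDuration, false when it failed or was interrupted.
    // visibilityTimeout must be larger than buffer so that the extension period is positive.
    static boolean process(Runnable task, Runnable extendVisibility,
                           Duration visibilityTimeout, Duration buffer, Duration maxDuration) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long periodMs = visibilityTimeout.minus(buffer).toMillis();
        try {
            Future<?> processing = worker.submit(task);
            // Extend just before the current visibility timeout would lapse, then repeat.
            scheduler.scheduleAtFixedRate(extendVisibility, periodMs, periodMs, TimeUnit.MILLISECONDS);
            try {
                processing.get(maxDuration.toMillis(), TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                processing.cancel(true); // interrupts the worker thread
                return false;
            } catch (ExecutionException e) {
                return false; // the listener threw an exception
            } catch (InterruptedException e) {
                processing.cancel(true);
                Thread.currentThread().interrupt();
                return false;
            }
        } finally {
            scheduler.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger extensions = new AtomicInteger();
        boolean finished = process(
            () -> {
                try {
                    Thread.sleep(5_000); // pretend to be a long running listener
                } catch (InterruptedException e) {
                    System.out.println("listener interrupted after exceeding the maximum duration");
                }
            },
            extensions::incrementAndGet,
            Duration.ofMillis(100), Duration.ofMillis(50), Duration.ofMillis(300));
        System.out.println("finished=" + finished + ", extensions=" + extensions.get());
    }
}
```

Running the listener on its own thread is what makes the interruption safe here: cancelling the Future interrupts only the worker and never the coordinating thread.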
final MessageProcessingDecorator autoVisibilityExtender = new AutoVisibilityExtenderMessageProcessingDecorator(
sqsAsyncClient,
queueProperties,
new AutoVisibilityExtenderMessageProcessingDecoratorProperties() {
@Override
public Duration visibilityTimeout() {
return Duration.ofMinutes(1);
}
@Override
public Duration maxDuration() {
return Duration.ofMinutes(5);
}
@Override
public Duration bufferDuration() {
return Duration.ofSeconds(30);
}
}
);
MessageProcessor processor = new DecoratingMessageProcessor(
"listener-identifier",
queueProperties,
Collections.singletonList(autoVisibilityExtender),
new LambdaMessageProcessor(
sqsAsyncClient,
queueProperties,
message -> {
try {
someLongFileIOMethod();
} catch (InterruptedException e) {
// the message took too long and it was interrupted
}
}
)
);
See Core - How to extend message visibility during processing for more details.
@QueueListener("${insert.queue.url.here}")
@AutoVisibilityExtender(visibilityTimeoutInSeconds = 60, maximumDurationInSeconds = 300, bufferTimeInSeconds = 10)
public void processMessage(@Payload final String payload) {
// process the message payload here
}
See Spring - How to extend message visibility during processing for more details.
software.amazon.awssdk:sqs 2.14.22 -> 2.15.7
com.fasterxml.jackson.core:jackson-databind 2.11.2 -> 2.11.3
io.ktor:ktor-server-core 1.4.0 -> 1.4.1
Published by JaidenAshmore about 4 years ago
Adds the ability to consume FIFO SQS queues, guaranteeing that messages in the same message group are processed in order and are never processed at the same time. For more details about configuring this message listener, take a look at the FifoMessageListenerContainerProperties.
public class Main {
public static void main(String[] args) throws InterruptedException {
final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); // or your own custom client
final QueueProperties queueProperties = QueueProperties.builder().queueUrl("${insert.queue.url.here}").build();
final MessageListenerContainer container = new FifoMessageListenerContainer(
queueProperties,
sqsAsyncClient,
() ->
new LambdaMessageProcessor(
sqsAsyncClient,
queueProperties,
message -> {
// process the message here
}
),
ImmutableFifoMessageListenerContainerProperties.builder().identifier("listener-identifier").concurrencyLevel(10).build()
);
container.start();
Runtime.getRuntime().addShutdownHook(new Thread(container::stop));
Thread.currentThread().join();
}
}
@Component
class MessageListeners {
@FifoQueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10)
public void fifoListener(@Payload final String body) {
// process message here
}
}
fifoMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
concurrencyLevel = { 10 }
processor = lambdaProcessor {
method { message ->
// process the message payload here
}
}
}
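The ordering guarantee of the FIFO listener (messages of one group run in order and never concurrently, while different groups can run in parallel) can be illustrated with a small JDK-only sketch. GroupOrderedExecutor is a hypothetical class written for this note and says nothing about how FifoMessageListenerContainer is actually implemented; it also keeps going after a failed message, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class GroupOrderedExecutor {
    // Last submitted task per message group; new tasks are chained behind it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    GroupOrderedExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    // Chains the task behind the previous task of the same group; other groups are unaffected.
    CompletableFuture<Void> submit(String groupId, Runnable task) {
        return tails.compute(groupId, (id, tail) -> {
            CompletableFuture<Void> previous = tail == null ? CompletableFuture.completedFuture(null) : tail;
            // handle(...) swallows a failure of the previous task so the chain keeps going (simplification).
            return previous.handle((result, error) -> null).thenRunAsync(task, executor);
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        GroupOrderedExecutor ordered = new GroupOrderedExecutor(pool);
        List<String> processed = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            String suffix = "-" + i;
            pending.add(ordered.submit("groupA", () -> processed.add("A" + suffix)));
            pending.add(ordered.submit("groupB", () -> processed.add("B" + suffix)));
        }
        pending.forEach(CompletableFuture::join);
        pool.shutdown();
        // Groups may interleave, but A-1, A-2, A-3 (and B-1, B-2, B-3) always appear in that order.
        System.out.println(processed);
    }
}
```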
org.immutables:value-annotations +2.8.8
com.fasterxml.jackson.core:jackson-databind 2.11.1 -> 2.11.2
software.amazon.awssdk:sqs 2.13.66 -> 2.14.22
org.springframework.boot:spring-boot-dependencies 2.3.2.RELEASE -> 2.3.4.RELEASE
com.amazonaws:aws-xray-recorder-sdk-core 2.6.1 -> 2.7.1
io.zipkin.brave:brave 5.12.4 -> 5.12.6
org.springframework.cloud:spring-cloud-schema-registry-client 1.0.7.RELEASE -> 1.0.8.RELEASE
io.ktor:ktor-server-core 1.3.2 -> 1.4.0
Published by JaidenAshmore about 4 years ago
Improves the DefaultMessageListenerContainerCoordinator by allowing the auto startup of the containers to be configured. For more information see the Spring - How to prevent containers starting on startup guide.
Adds a getContainers method to the MessageListenerContainerCoordinator to allow for smarter configuration of the containers during runtime.
Updates how the dependencies of the library are managed, resulting in the removal of the <dependencyManagement> / constraints for each module in favour of explicitly setting the versions. The reason for this was that a module like the api had a dependency management section setting values for libraries like Jackson or Avro, which this module does not care about.
Published by JaidenAshmore about 4 years ago
The CoreMessageProcessor required the use of the Java reflection API to indicate the message listener method. This works great for Spring Boot applications, where you want to run a method on a bean in the application, but it doesn't work well for simpler applications like a basic Java app or simpler web application frameworks like Ktor. This adds the ability to use a lambda/the Java functional API for defining the message listener method instead.
new LambdaMessageProcessor(sqsAsyncClient, queueProperties, (message) -> log.info("processing message"));
new AsyncLambdaMessageProcessor(sqsAsyncClient, queueProperties, (message) -> {
log.info("processing message");
return CompletableFuture.runAsync(() -> {
// do asynchronous processing here
});
});
new AsyncLambdaMessageProcessor(sqsAsyncClient, queueProperties, (message, acknowledge) -> {
log.info("processing message");
return CompletableFuture.runAsync(() -> {
// do asynchronous processing here
acknowledge.acknowledgeSuccessful();
});
});
val container = coreMessageListener("identifier", sqsAsyncClient, queueUrl) {
processor = lambdaProcessor {
method { message ->
log.info("Message: {}", message.body())
}
}
// ...other configuration
}
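The difference between the two styles can be seen in a small JDK-only comparison. Processor, reflective and lambda are hypothetical names for this sketch and do not mirror the signatures of CoreMessageProcessor or LambdaMessageProcessor; the point is only that the reflection style needs a bean and a method lookup, while the lambda style needs just a function.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ProcessorStylesSketch {
    // Hypothetical stand-in for a message processor.
    interface Processor {
        void process(String messageBody);
    }

    // Reflection style: the listener is a public method looked up by name on a bean.
    static Processor reflective(Object bean, String methodName) {
        final Method method;
        try {
            method = bean.getClass().getMethod(methodName, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No listener method named " + methodName, e);
        }
        return body -> {
            try {
                method.invoke(bean, body);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("Listener method failed", e);
            }
        };
    }

    // Lambda style: the listener is any function that accepts the message.
    static Processor lambda(Consumer<String> listener) {
        return listener::accept;
    }

    // Example bean used by the reflection style.
    public static class Listener {
        final List<String> received = new ArrayList<>();

        public void listen(String body) {
            received.add(body);
        }
    }

    public static void main(String[] args) {
        Listener bean = new Listener();
        reflective(bean, "listen").process("via reflection");

        List<String> received = new ArrayList<>();
        lambda(received::add).process("via lambda");

        System.out.println(bean.received); // [via reflection]
        System.out.println(received); // [via lambda]
    }
}
```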
The Spring Starter for the library provided some core annotations for listening to messages to simplify the configuration of a message listener by providing defaults for most of the configuration; see the @QueueListener and @PrefetchingQueueListener annotations. This adds these two types of message listeners into the Core Kotlin DSL so that consumers can more easily configure new message listeners.
val container = batchingMessageListener("identifier", sqsAsyncClient, queueUrl) {
concurrencyLevel = { 10 }
batchSize = { 5 }
batchingPeriod = { Duration.ofSeconds(5) }
processor = lambdaProcessor {
method { message ->
log.info("Message: {}", message.body())
}
}
}
or
val container = prefetchingMessageListener("identifier", sqsAsyncClient, queueUrl) {
concurrencyLevel = { 2 }
desiredPrefetchedMessages = 5
maxPrefetchedMessages = 10
processor = lambdaProcessor {
method { message ->
log.info("Message: {}", message.body())
}
}
}
Added a Ktor integration so that you can easily set up message listeners in a Ktor application. This utilises the Core Kotlin DSL to configure the message listeners and will take the containers and hook them into the lifecycle of the Ktor server.
val server = embeddedServer(Netty, 8080) {
prefetchingMessageListener("prefetching-listener", sqsAsyncClient, queueUrl) {
concurrencyLevel = { 2 }
desiredPrefetchedMessages = 5
maxPrefetchedMessages = 10
processor = lambdaProcessor {
method { message -> log.info("Message: {}", message.body()) }
}
}
}
See the Ktor Example for an example application that can be used.
Published by JaidenAshmore about 4 years ago
Added support to wrap the message processing logic via a MessageProcessingDecorator. This allows the addition of functionality like tracing, metrics or any other functionality.
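To give a feel for what wrapping the processing logic means, here is a minimal decorator sketch using only the JDK. The Decorator interface and its hook names are assumptions made for this illustration; the actual MessageProcessingDecorator interface may define different hooks and receive more context.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

class DecoratingSketch {
    // Hypothetical decorator contract: hooks that run around the processing of a message.
    interface Decorator {
        default void onPreProcessing(String message) {}

        default void onSuccess(String message) {}

        default void onFailure(String message, Throwable error) {}
    }

    // Wraps the delegate so that every decorator is notified before and after processing.
    static Consumer<String> decorate(Consumer<String> delegate, List<Decorator> decorators) {
        return message -> {
            decorators.forEach(d -> d.onPreProcessing(message));
            try {
                delegate.accept(message);
            } catch (RuntimeException e) {
                decorators.forEach(d -> d.onFailure(message, e));
                throw e;
            }
            decorators.forEach(d -> d.onSuccess(message));
        };
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Decorator tracing = new Decorator() {
            @Override
            public void onPreProcessing(String message) {
                events.add("start " + message);
            }

            @Override
            public void onSuccess(String message) {
                events.add("end " + message);
            }
        };
        Consumer<String> listener = message -> events.add("processing " + message);

        decorate(listener, Collections.singletonList(tracing)).accept("hello");
        System.out.println(events); // [start hello, processing hello, end hello]
    }
}
```

Tracing and metrics fit this shape well because both only need to know when processing started and how it ended.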
Added Support for Xray Tracing allowing you to trace messages being processed by your message listener. See Core - How to add Xray Tracing or Spring - How to add Xray Tracing for guides on how to integrate this. You can use the AWS Xray Spring Example to see an example of a Spring service with this configured.
Added support for Brave Tracing allowing you to trace messages being processed by your message listener. See Core - How to add Brave Tracing or Spring - How to Add Brave Tracing for guides on how to integrate this. You can use the Spring Sleuth Example to see an example of a Spring service with this configured.
You can now use the core module using a Kotlin DSL reducing the verbosity in configuring a message listener. See Core - Kotlin DSL for guides on how to integrate this. You can use the Core Kotlin Example to see an example of a Java program using the DSL.
val container = coreMessageListener("core-example-container", sqsAsyncClient, queueUrl) {
broker = concurrentBroker {
concurrencyLevel = cached(Duration.ofSeconds(10)) {
Random.nextInt(concurrencyLimit)
}
concurrencyPollingRate = { concurrencyLevelPeriod }
errorBackoffTime = { Duration.ofMillis(500) }
}
retriever = prefetchingMessageRetriever {
desiredPrefetchedMessages = 10
maxPrefetchedMessages = 20
}
processor = coreProcessor {
argumentResolverService = coreArgumentResolverService(objectMapper)
bean = MessageListener()
method = MessageListener::class.java.getMethod("listen", Request::class.java, String::class.java)
decorators {
add(BraveMessageProcessingDecorator(tracing))
}
}
resolver = batchingResolver {
bufferingSizeLimit = { 1 }
bufferingTime = { Duration.ofSeconds(5) }
}
}
Guava was being used for some helper functions and it didn't seem worth the extra dependency for the little gain that it provided. Therefore, Guava has been completely removed from the library and any necessary functions were reproduced in this library.
The component properties now use a Duration type instead of integers/longs. For example, instead of a backoffTimeInMs long property, it would be backoffTime as a Duration.
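For anyone adapting their own properties implementations, the change amounts to something like the sketch below. Only the property names backoffTimeInMs and backoffTime come from these notes; the two interfaces and the migrate helper are hypothetical and exist purely to show the conversion.

```java
import java.time.Duration;

class DurationPropertiesSketch {
    // Hypothetical shape of a property before the change.
    interface LegacyProperties {
        long backoffTimeInMs();
    }

    // Hypothetical shape of the same property after the change.
    interface Properties {
        Duration backoffTime();
    }

    // Adapts an old millisecond based value to the Duration based property.
    static Properties migrate(LegacyProperties legacy) {
        return () -> Duration.ofMillis(legacy.backoffTimeInMs());
    }

    public static void main(String[] args) {
        Properties properties = migrate(() -> 500L);
        System.out.println(properties.backoffTime()); // PT0.5S
    }
}
```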
See build.gradle.kts for the latest versions.
ObjectMapper provided by Spring Boot Web.
java-dynamic-sqs-listener-core to be core
Published by JaidenAshmore over 4 years ago
This patch release fixes bugs and improves logging.
CoreMessageListenerContainer was not shutting down the MessageBroker ExecutorService and therefore you would have an ExecutorService sitting there waiting for tasks to be submitted, which would never happen. Also adds more debug logging for the CoreMessageListenerContainer to help debug shutdown problems. https://github.com/JaidenAshmore/java-dynamic-sqs-listener/commit/1e806aee716059fe6d8975240e4b44be3a8051c3
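For readers hitting a similar problem in their own code, the usual JDK pattern for making sure an ExecutorService does not linger is sketched below. shutdownGracefully is a hypothetical helper written for this note and is not taken from the linked commit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {
    // Returns true when the executor terminated within the timeout.
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks, let submitted ones finish
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow(); // interrupt whatever is still running
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("processing last message"));
        boolean terminated = shutdownGracefully(executor, 5, TimeUnit.SECONDS);
        System.out.println("terminated=" + terminated);
    }
}
```

Without the shutdown call, the executor's non-daemon worker thread would keep waiting for tasks and could prevent the JVM from exiting.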
Published by JaidenAshmore over 4 years ago
This release adds the ability to deserialise messages that have been serialised by a schema tool like Avro. It utilises the Spring Cloud Schema Registry Client to obtain the schema of the published message from the Spring Cloud Schema Registry so that the message can be properly deserialised.
Published by JaidenAshmore about 5 years ago
This milestone contained big changes to the framework's API to follow a non-blocking paradigm. This reduces the number of threads that are blocked while there are no messages to process. For example, if we have 2 listeners with a concurrency of 30 threads each and there are no messages to process, there would be 60 threads blocked. With this approach only the threads that are being used to process messages will be around.
This also changed how the framework is shut down: any messages that have been downloaded and stored locally will have a chance to be processed before the application ends.
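The non-blocking idea can be sketched without the library: instead of a thread blocking until a message arrives, the caller receives a CompletableFuture that is completed later, and anything still buffered is handed back on shutdown. FutureRetrieverSketch and its method names are hypothetical and are not the framework's interfaces.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class FutureRetrieverSketch {
    private final Queue<String> buffered = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> waiting = new ArrayDeque<>();

    // Never blocks: returns a completed future when a message is buffered, otherwise a pending one.
    synchronized CompletableFuture<String> retrieveMessage() {
        String message = buffered.poll();
        if (message != null) {
            return CompletableFuture.completedFuture(message);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        waiting.add(future);
        return future;
    }

    // Called when a message has been downloaded: hand it to a waiting consumer or buffer it.
    void onMessageDownloaded(String message) {
        CompletableFuture<String> consumer;
        synchronized (this) {
            consumer = waiting.poll();
            if (consumer == null) {
                buffered.add(message);
                return;
            }
        }
        consumer.complete(message); // completed outside the lock so callbacks do not run while holding it
    }

    // On shutdown, return the messages that were downloaded but never handed out.
    synchronized List<String> stop() {
        List<String> leftover = new ArrayList<>(buffered);
        buffered.clear();
        return leftover;
    }

    public static void main(String[] args) {
        FutureRetrieverSketch retriever = new FutureRetrieverSketch();
        CompletableFuture<String> request = retriever.retrieveMessage();
        request.thenAccept(message -> System.out.println("processing " + message));
        System.out.println("pending=" + !request.isDone()); // pending=true, no thread is blocked

        retriever.onMessageDownloaded("first"); // prints "processing first"
        retriever.onMessageDownloaded("second"); // nobody is waiting, so it is buffered
        System.out.println("left on shutdown=" + retriever.stop()); // left on shutdown=[second]
    }
}
```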
CompletableFuture for the message instead of blocking and returning the message when it is obtained. It has also been merged with the Async implementation, as a non-async implementation doesn't work well with graceful shutdowns. When the retriever is shut down it will also return any messages that have been stored internally so that they can be processed on shutdown if desired.
CompletableFuture for when the message has finished processing. It now also takes in a Runnable that should be called if the message has been successfully processed and should be resolved, instead of requiring a dependency on the MessageResolver.
ExecutorService. Now it has more responsibility for determining the order of startup and shutdown to allow for more graceful shutdowns. For example, it is now responsible for allowing the processing of any messages that were downloaded but not yet processed.
RetryableMessageProcessor was removed. I don't want to maintain this and it is simple for consumers to implement this themselves if they want.
BlockingMessageRetriever was removed. This was provided to allow for more graceful shutdowns as well as for testing. This isn't really needed anymore.
IndividualMessageRetriever was removed. This was originally used to show that you could just request a message when needed, but I feel it wouldn't be used in production and therefore it is a waste for me to maintain it. You can achieve the same by using a BatchingMessageRetriever with a batch size of 1.
Published by JaidenAshmore over 5 years ago
QueueContainerService was renamed to MessageListenerContainerCoordinator as its only responsibility is coordinating the starting and stopping of containers and "Service" is pretty generic.
QueueWrapper was renamed to MessageListenerContainerFactory as these are building these containers from beans.
QueueResolverService was renamed to QueueResolver because it only resolves queue URLs and doesn't need to be a generic "service".
java-dynamic-sqs-listener from example modules to make it easier to read. https://github.com/JaidenAshmore/java-dynamic-sqs-listener/commit/d4dad4d8e0fe1398b5b62d07de38db634aab982c
Published by JaidenAshmore over 5 years ago
SqsAsyncClients. See Docs - How to Connect to Multiple AWS Accounts for a guide. https://github.com/JaidenAshmore/java-dynamic-sqs-listener/commit/b70b903081b668a730a02ed7411db0bc4494f640
Published by JaidenAshmore over 5 years ago
@EnableSqs which will enable the framework when it is used. https://github.com/JaidenAshmore/java-dynamic-sqs-listener/commit/153d8c9d952e1bb730a6594de7f0b8f091bf1f58
MessageRetrievers was not being followed when the thread was interrupted during the call to get more messages from the SqsAsyncClient but before we wait on the CompletableFuture. https://github.com/JaidenAshmore/java-dynamic-sqs-listener/commit/c5d2f5e36f2a144971c93b3f15b58e653ee7288c
{queueName}-dlq. https://github.com/JaidenAshmore/java-dynamic-sqs-listener/commit/7a789f4c2f822ee619be2659fef831df7f6380bc
Published by JaidenAshmore over 5 years ago
public class MyListener {
@QueueListener("queueName")
public void myMethod(Message message) {
// do something with the raw message
}
}
public class MyListener {
@QueueListener("queueName")
public void myMethod(@MessageAttribute("key") String attribute) {
// do something
}
@QueueListener("otherQueueName")
public void myMethod(@MessageSystemAttribute(SENT_TIMESTAMP) String systemAttribute) {
// do something
}
}
@QueueListener(value = "something", concurrencyLevelString="${something.concurrencyLevel}")
and the application.yml has
something:
concurrencyLevel: 2
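To make the placeholder mechanics concrete, the sketch below resolves a ${...} placeholder against a flat property map and parses the result as an integer. It is a deliberately simplified stand-in for Spring's placeholder resolution (no default values, no nesting); PlaceholderSketch and its methods are hypothetical. The nested YAML keys above correspond to the flattened property name something.concurrencyLevel.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {
    // Matches ${property.name} and captures the property name.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${name} in the text with the matching property value.
    static String resolve(String text, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = properties.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Unknown property: " + matcher.group(1));
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    // Resolves the string form of the concurrency level and parses it as an integer.
    static int concurrencyLevel(String concurrencyLevelString, Map<String, String> properties) {
        return Integer.parseInt(resolve(concurrencyLevelString, properties));
    }

    public static void main(String[] args) {
        Map<String, String> properties = Collections.singletonMap("something.concurrencyLevel", "2");
        System.out.println(concurrencyLevel("${something.concurrencyLevel}", properties)); // 2
    }
}
```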
catch (Throwable throwable) for catching exceptions as this can catch Errors like an OutOfMemoryError. https://github.com/JaidenAshmore/java-dynamic-sqs-listener/commit/117036dde6a4e32dba9c56a6cfb6b995950798e8
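The concern with catching Throwable can be shown in a few lines of plain Java. This sketch assumes the intent of the change was to catch a narrower type; tryProcess is a hypothetical helper and is not code from the linked commit.

```java
class ErrorHandlingSketch {
    // Returns false when the listener failed with an Exception.
    // Errors such as OutOfMemoryError are deliberately not caught and propagate to the caller.
    static boolean tryProcess(Runnable listener) {
        try {
            listener.run();
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryProcess(() -> {})); // true
        System.out.println(tryProcess(() -> {
            throw new IllegalStateException("bad message");
        })); // false

        try {
            tryProcess(() -> {
                throw new OutOfMemoryError("simulated");
            });
        } catch (OutOfMemoryError e) {
            System.out.println("Error propagated: " + e.getMessage()); // Error propagated: simulated
        }
    }
}
```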
Published by JaidenAshmore over 5 years ago