Spring Cloud Stream is the solution provided by Spring to build applications connected to shared messaging systems.
It offers an abstraction (the binding) that works the same whatever underlying implementation we use (the binder).
Let's try to set up a simple example step by step and see how it works!
This demo has been created using this spring initializr configuration, adding the Kafka binder dependency spring-cloud-starter-stream-kafka.
Step by step:
You can browse older versions of this repo.
Our final goal is to produce messages to a Kafka topic.
From the point of view of the application we want an interface MyEventProducer to produce events to a generic messaging system. These events will be of type MyEvent, just containing a text field to make it simpler:

data class MyEvent(val text: String)

interface MyEventProducer {
  fun produce(event: MyEvent)
}
Then we follow these steps:
1. We configure the binding my-producer in application.yml:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
      bindings:
        my-producer-out-0:
          destination: "my.topic"
      function:
        definition: "my-producer"
Everything under spring.cloud.stream.kafka.binder is related to the Kafka binder implementation, and we can use all these extra Kafka binder properties. Everything under spring.cloud.stream.bindings is related to the Spring Cloud Stream binding abstraction, and we can use all these extra binding properties. my-producer is the function name, out is for output bindings and 0 is the index we have to use if we have a single function.

2. We implement MyEventProducer as a Kotlin lambda () -> Flux<MyEventPayload>, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting:

class MyStreamEventProducer : () -> Flux<MyEventPayload>, MyEventProducer {
  private val sink = Sinks.many().unicast().onBackpressureBuffer<MyEventPayload>()

  override fun produce(event: MyEvent) {
    sink.emitNext(toPayload(event), FAIL_FAST)
  }

  override fun invoke() = sink.asFlux()

  private fun toPayload(event: MyEvent) = MyEventPayload(event.text, event.text.length)
}

data class MyEventPayload(
  val string: String,
  val number: Int
)
We use a DTO MyEventPayload to specify how we want the payload to be serialized to JSON. In this case we don't need to, but we could use Jackson annotations if we wanted to customize the JSON serialization. We do a simple transformation between MyEvent and MyEventPayload just as an example. Every time we emit a MyEventPayload through the Flux, Spring Cloud Stream will publish it to Kafka.

3. We create the my-producer function definition:

@Configuration
class MyConfiguration {
  @Bean
  fun myStreamEventProducer() = MyStreamEventProducer()

  @Bean("my-producer")
  fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<MyEventPayload> = producer
}
We create a bean of type MyStreamEventProducer that will be injected wherever a MyEventProducer is needed, and a bean named "my-producer" of type () -> Flux<MyEventPayload> that will be bound to the my-producer function. In Kotlin we can use the lambda type () -> Flux<MyEventPayload> instead of Supplier<Flux<MyEventPayload>>.
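If we preferred the Java functional interface, the same binding could be declared as a Supplier. This is just an illustrative sketch of the equivalence, not part of the demo:

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.core.publisher.Flux
import java.util.function.Supplier

@Configuration
class MyAlternativeConfiguration {
  // Hypothetical alternative: expose the producer as a java.util.function.Supplier
  // instead of a Kotlin lambda type; Spring Cloud Stream binds both the same way.
  @Bean("my-producer")
  fun myStreamEventProducerSupplier(producer: MyStreamEventProducer): Supplier<Flux<MyEventPayload>> =
    Supplier { producer.invoke() }
}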
And we can test it like this:

@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {

  @Autowired // We inject MyEventProducer (it should be a MyStreamEventProducer)
  @Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2
  lateinit var eventProducer: MyEventProducer

  @Test
  fun `should produce event`() {
    // We produce an event using MyEventProducer
    val text = "hello ${UUID.randomUUID()}"
    eventProducer.produce(MyEvent(text))

    // We consume from Kafka using a helper
    val records = kafkaConsumerHelper.consumeAtLeast(1, FIVE_SECONDS)

    // We verify the received JSON
    assertThat(records).singleElement().satisfies { record ->
      JSONAssert.assertEquals(
        record.value(),
        "{\"number\":${text.length},\"string\":\"$text\"}",
        true
      )
    }
  }
}
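The kafkaConsumerHelper used above is not shown in this section. A minimal sketch of such a helper, assuming a plain KafkaConsumer with String deserializers (names and details are illustrative; the real helper in the demo may differ):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration

class KafkaConsumerHelper(bootstrapServers: String, topic: String) {

  // Plain Kafka consumer subscribed to the topic under test
  private val consumer = KafkaConsumer<String, String>(
    mapOf(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG to "test-consumer-group",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
    )
  ).apply { subscribe(listOf(topic)) }

  // Poll until we have at least `count` records or the timeout expires
  fun consumeAtLeast(count: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
    val records = mutableListOf<ConsumerRecord<String, String>>()
    val deadline = System.currentTimeMillis() + timeout.toMillis()
    while (records.size < count && System.currentTimeMillis() < deadline) {
      consumer.poll(Duration.ofMillis(100)).forEach { records.add(it) }
    }
    return records
  }
}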
Our final goal is to consume messages from a Kafka topic.
From the point of view of the application we want an interface MyEventConsumer to be called every time an event is consumed from a generic messaging system. These events will be of type MyEvent, like in the producer example:

data class MyEvent(val text: String)

interface MyEventConsumer {
  fun consume(event: MyEvent)
}
Then we follow these steps:
1. We configure the binding my-consumer in application.yml, declaring it as a function:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
      bindings:
        my-consumer-in-0:
          destination: "my.topic"
          group: "${spring.application.name}"
      function:
        definition: "my-consumer"
Everything under spring.cloud.stream.kafka.binder is related to the Kafka binder implementation, and we can use all these extra Kafka binder properties; everything under spring.cloud.stream.bindings is related to the Spring Cloud Stream binding abstraction, and we can use all these extra binding properties. We configure a group because we want the application to consume from Kafka identifying itself as a consumer group, so if there were more than one instance of the application every message would be delivered to only one of the instances. my-consumer is the function name, in is for input bindings and 0 is the index we have to use if we have a single function.

2. We create MyStreamEventConsumer to fulfill the interface required by Spring Cloud Stream:

class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit {
  override fun invoke(payload: MyEventPayload) {
    consumer.consume(fromPayload(payload))
  }

  private fun fromPayload(payload: MyEventPayload) = MyEvent(payload.string)
}
Every time a new message is received from Kafka, its payload will be deserialized to a MyEventPayload and the invoke method will be called. Then we only have to transform the MyEventPayload to a MyEvent and call back the generic MyEventConsumer.

3. We create the my-consumer function definition:

@Configuration
class MyConfiguration {
  @Bean
  fun myEventConsumer() = object : MyEventConsumer {
    override fun consume(event: MyEvent) {
      println("Received ${event.text}")
    }
  }

  @Bean("my-consumer")
  fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
    MyStreamEventConsumer(consumer)
}
We create a bean of type (MyEventPayload) -> Unit named "my-consumer" that will be bound to the my-consumer function; in Kotlin we can use (MyEventPayload) -> Unit instead of Consumer<MyEventPayload>. We also create a simple implementation of MyEventConsumer that just prints the event.

And we can test it like this:

@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {

  @MockBean // We mock MyEventConsumer
  lateinit var eventConsumer: MyEventConsumer

  @Test
  fun `should consume event`() {
    val eventCaptor = argumentCaptor<MyEvent>()
    doNothing().`when`(eventConsumer).consume(eventCaptor.capture())

    // We send a Kafka message using a helper
    val text = "hello ${UUID.randomUUID()}"
    kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

    // We wait at most ten seconds to receive the expected MyEvent in the MyEventConsumer mock
    await().atMost(TEN_SECONDS).untilAsserted {
      assertThat(eventCaptor.allValues.filter { it.text == text }).hasSize(1)
    }
  }
}
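Like the consumer helper in the producer test, kafkaProducerHelper is not shown here. A minimal sketch of what it could look like, assuming a plain KafkaProducer with String serializers (illustrative only):

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

class KafkaProducerHelper(bootstrapServers: String) {

  // Plain Kafka producer used only by the tests
  private val producer = KafkaProducer<String, String>(
    mapOf(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
    )
  )

  // Send a message and wait until the broker acknowledges it
  fun send(topic: String, body: String) {
    producer.send(ProducerRecord(topic, body)).get()
  }
}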
Kafka topics are partitioned to allow horizontal scalability.
When a message is sent to a topic, Kafka chooses the destination partition randomly. If we specify a key for the message, Kafka will use this key to choose the destination partition, so all messages sharing the same key will always be sent to the same partition.
This is important on the consumer side, because the chronological order of messages is only guaranteed within a single partition. If we need to consume some messages in the order they were produced, we should use the same key for all of them (e.g. for messages of a user, we use the user id as the message key).
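Just to illustrate why equal keys end up in the same partition: for keyed messages, Kafka's default partitioner derives the partition from a hash of the key, roughly like this simplified sketch (not the exact implementation):

import org.apache.kafka.common.utils.Utils

// Simplified version of how Kafka's default partitioner handles keyed messages:
// the same key always hashes to the same partition, which gives the ordering guarantee.
fun partitionFor(key: String, numPartitions: Int): Int =
  Utils.toPositive(Utils.murmur2(key.toByteArray())) % numPartitions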
To specify the message key in MyStreamEventProducer we can produce Message<MyEventPayload> instead of MyEventPayload and set the KafkaHeaders.KEY header:
class MyStreamEventProducer : () -> Flux<Message<MyEventPayload>>, MyEventProducer {
  // ...
  override fun produce(event: MyEvent) {
    val message = MessageBuilder
      .withPayload(MyEventPayload(event.text, event.text.length))
      .setHeader(KafkaHeaders.KEY, "key-${event.text.length}")
      .build()
    sink.emitNext(message, FAIL_FAST)
  }
  // ...
}
As we are setting a key of type String we should use a StringSerializer as key.serializer:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
          producer-properties:
            key.serializer: "org.apache.kafka.common.serialization.StringSerializer"
And we can test it like this:
@Test
fun `should produce event`() {
  val text = "hello ${UUID.randomUUID()}"
  eventProducer.produce(MyEvent(text))

  val records = kafkaConsumerHelper.consumeAtLeast(1, TEN_SECONDS)

  assertThat(records).singleElement().satisfies { record ->
    // check the message payload
    JSONAssert.assertEquals(
      record.value(),
      "{\"number\":${text.length},\"string\":\"$text\"}",
      true
    )
    // check the message key
    assertThat(record.key())
      .isEqualTo("key-${text.length}")
  }
}
Alternatively, we could use partitionKeyExpression and other related binding producer properties to achieve the same at the binding abstraction level of Spring Cloud Stream.

If errors are thrown while consuming messages, we can tell Spring Cloud Stream what to do using binding consumer properties such as max-attempts, back-off-initial-interval, default-retryable and retryable-exceptions.
For example we can use this configuration:
spring:
  cloud:
    stream:
      bindings:
        my-consumer-in-0:
          destination: "my.topic"
          group: "${spring.application.name}"
          consumer:
            max-attempts: 5
            back-off-initial-interval: 100
            default-retryable: false
            retryable-exceptions:
              com.rogervinas.stream.domain.MyRetryableException: true
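The retryable-exceptions entry refers to a domain exception thrown by the consumer when processing fails. A minimal sketch of what such a class could look like (the actual one in the demo may differ):

// Domain exception the consumer throws when processing should be retried
class MyRetryableException(message: String) : RuntimeException(message)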
And we can test it like this:
@Test
fun `should retry consume event 5 times`() {
  // we throw a MyRetryableException every time we receive a message
  val eventCaptor = argumentCaptor<MyEvent>()
  doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(eventCaptor.capture())

  // we send a Kafka message using a helper
  val text = "hello ${UUID.randomUUID()}"
  kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

  // we verify the consumer has been called five times with the same message
  await().atMost(TEN_SECONDS).untilAsserted {
    assertThat(eventCaptor.allValues.filter { it.text == text }).hasSize(5)
  }
}
In addition to retries, a DLQ (dead letter queue) is another mechanism we can use to deal with consumer errors.
In the case of Kafka it consists of sending to another topic all the messages that the consumer has rejected.
We can configure the DLQ using these Kafka binder consumer properties: enable-dlq to enable it, dlq-name (if not set, rejected messages go to a topic named error.<destination>.<group>) and dlq-partitions (the number of partitions of the DLQ topic; if greater than 1 we must also provide a DlqPartitionFunction bean).

For example we can use this configuration:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
        bindings:
          my-consumer-in-0:
            consumer:
              enable-dlq: true
              dlq-name: "my.topic.errors"
              dlq-partitions: 1
      bindings:
        my-consumer-in-0:
          destination: "my.topic"
          group: "${spring.application.name}"
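Here dlq-partitions: 1 means every rejected message goes to partition 0 of my.topic.errors. If the DLQ topic had more than one partition we would have to provide a DlqPartitionFunction bean. A hypothetical sketch, not needed in this demo, assuming we simply want to reuse the original record's partition:

import org.springframework.cloud.stream.binder.kafka.DlqPartitionFunction
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class MyDlqConfiguration {
  // Hypothetical: route each rejected record to the same partition number it came from
  @Bean
  fun dlqPartitionFunction() = DlqPartitionFunction { _, record, _ -> record.partition() }
}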
And we can test it like this:
@Test
fun `should send to DLQ rejected messages`() {
  // we throw a MyRetryableException every time we receive a message
  doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())

  // we send a Kafka message using a helper
  val text = "hello ${UUID.randomUUID()}"
  kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

  // we check the message has been sent to the DLQ
  val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
  assertThat(errorRecords).singleElement().satisfies { record ->
    JSONAssert.assertEquals(
      record.value(),
      "{\"number\":${text.length},\"string\":\"$text\"}",
      true
    )
  }
}
We can also check that undeserializable messages are sent to the DLQ:

@ParameterizedTest
@ValueSource(strings = [
  "plain text",
  "{\"unknownField\":\"not expected\"}"
])
fun `send to DLQ undeserializable messages`(body: String) {
  // we send a Kafka message with an invalid body using a helper
  kafkaProducerHelper.send(TOPIC, body)

  // we check the message has been sent to the DLQ
  val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
  assertThat(errorRecords).singleElement().satisfies { record ->
    assertThat(record.value()).isEqualTo(body)
  }
}
That's it! Happy coding! 💙

Run the tests:
./gradlew test

Run with docker-compose:
docker-compose up -d
./gradlew bootRun
docker-compose down
Then you can use kcat to produce/consume to/from Kafka:
# consume
kcat -b localhost:9094 -C -t my.topic
kcat -b localhost:9094 -C -t my.topic.errors
# produce a valid message
echo '{"string":"hello!", "number":37}' | kcat -b localhost:9094 -P -t my.topic
# produce an invalid message
echo 'hello!' | kcat -b localhost:9094 -P -t my.topic