This release sees one big new feature, Publishing, and a lot of minor improvements and fixes towards 1.0 in the receiver.
It adds a `PublishScope`, which can `offer` a message (doesn't await the ack) or `publish`, which is offer + await ack.
The block waits for all offers inside it to complete, similar to `coroutineScope`, and re-throws any failed offer.
It also comes with a `transaction` block, which wraps its body in the correct transaction semantics and has the same awaiting behavior for `offer`. Transaction blocks cannot be nested, thanks to `@PublisherDSL`.
```kotlin
publisher.publishScope {
  offer((1..10).map {
    ProducerRecord(topic.name(), "$it", "msg-$it")
  })
  publish((11..20).map {
    ProducerRecord(topic.name(), "$it", "msg-$it")
  })
  transaction {
    // transaction { } is illegal to call here, thanks to DslMarker magic
    offer((21..30).map {
      ProducerRecord(topic.name(), "$it", "msg-$it")
    })
    publish((31..40).map {
      ProducerRecord(topic.name(), "$it", "msg-$it")
    })
  } // Waits until all offers inside the transaction finish, fails if any failed

  // looping
  (0..100).forEach {
    delay(100.milliseconds)
    val record = ProducerRecord(topic.name(), "$it", "msg-$it")
    offer(record)
  }

  // streaming
  (1..100).asFlow()
    .onEach { delay(100.milliseconds) }
    .map { ProducerRecord(topic.name(), "$it", "msg-$it") }
    .collect { offer(it) }
}
```
See KafkaPublisherSpec for more examples.
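The "DslMarker magic" that forbids nested `transaction` blocks can be illustrated with a minimal, self-contained sketch. This is not the library's actual code: the scope classes and `fakeSend`-style bodies below are simplified stand-ins showing only how a `@DslMarker` annotation hides the outer scope's members, making `transaction { transaction { } }` a compile-time error.

```kotlin
// Minimal sketch of the @DslMarker technique (not kotlin-kafka's real classes).
@DslMarker
annotation class PublisherDSL

@PublisherDSL
class PublishScope {
    val sent = mutableListOf<String>()

    // Fire-and-forget in the real API; here we just record the message.
    fun offer(msg: String) { sent += msg }

    fun transaction(block: TransactionScope.() -> Unit) {
        // The real API begins/commits a Kafka transaction around this block.
        TransactionScope(this).block()
    }
}

@PublisherDSL
class TransactionScope(private val outer: PublishScope) {
    fun offer(msg: String) { outer.sent += msg }
    // No `transaction` member here: because both scopes carry the same
    // @DslMarker, the outer PublishScope.transaction is not implicitly
    // available inside this block, so nesting does not compile.
}

fun publishScope(block: PublishScope.() -> Unit): List<String> =
    PublishScope().apply(block).sent

fun main() {
    val sent = publishScope {
        offer("a")
        transaction { offer("b") }
    }
    println(sent) // [a, b]
}
```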
Often we need to receive/consume events from Kafka and, as a result, publish new events to Kafka. That typically requires a streaming solution to produce records into Kafka, and keeping track of all published records, their lifecycle, and wiring that back into a stream is tricky. So kotlin-kafka now offers `Flow.produce`, built in the same style as `PublisherScope`!

`produce` sends messages to Kafka and streams `Result`s of `RecordMetadata` back to the user. It will not stop sending messages if an error occurs; you can throw inside the collector if you want the stream to stop. Otherwise, use `produceOrThrow`.

Any encountered errors will be sent to the collector as `Result.failure`, and they will also be rethrown if the `Flow` completes without handling them (e.g. using `Flow.catch`). Check Kafka's `Callback` documentation for more information on when and which errors are thrown, and which are recoverable.
```kotlin
suspend fun publishMessages(bootStrapServers: String, topic: String) {
  val publisherSettings = PublisherSettings(
    bootstrapServers = bootStrapServers,
    keySerializer = StringSerializer(),
    valueSerializer = StringSerializer()
  )
  (0..10_000).asFlow()
    .onEach { delay(10.milliseconds) }
    .map { index ->
      ProducerRecord<String, String>(topic, index % 4, "$index", "Message $index")
    }.produce(publisherSettings)
    .collect { metadata: Result<RecordMetadata> ->
      metadata
        .onSuccess { println("partition: ${it.partition()}, offset: ${it.offset()}") }
        .onFailure { println("Failed to send: $it") }
    }
}
```
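The `Result`-streaming pattern described above can be shown without any Kafka dependency. In this pure-Kotlin sketch, `fakeSend` is a hypothetical stand-in for the real producer: each send yields a `Result`, and failures flow to the collector as values instead of cancelling the stream, mirroring how `produce` keeps sending after an error.

```kotlin
// fakeSend is a stand-in for a Kafka send: record 3 "fails", others
// succeed with a pretend offset.
fun fakeSend(i: Int): Result<Int> =
    if (i == 3) Result.failure(RuntimeException("broker rejected record $i"))
    else Result.success(i * 100)

fun main() {
    // Failures arrive as Result.failure values; the stream keeps going.
    (1..5).asSequence().map(::fakeSend).forEach { r ->
        r.onSuccess { println("offset: $it") }
            .onFailure { println("Failed to send: ${it.message}") }
    }
}
```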
Any and all feedback welcome! ☺️
Full Changelog: https://github.com/nomisRev/kotlin-kafka/compare/0.3.1...0.4.0
Published by nomisRev over 1 year ago
Fixes a breaking change introduced by KotlinX Coroutines 1.7.x, reported in https://github.com/nomisRev/kotlin-kafka/issues/127.
This release contains no actual changes except for dependency updates.
Full Changelog: https://github.com/nomisRev/kotlin-kafka/compare/0.3.0...0.3.1
Published by nomisRev about 2 years ago
This release implements a custom `ConsumerLoop` to make strong guarantees about committing offsets, and to hide the complexity of the low-level Java SDK.
This API is inspired by Alpakka Kafka and Reactor Kafka.
More details can be found in the PR: https://github.com/nomisRev/kotlin-kafka/pull/58
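The committing guarantee such a consumer loop aims for can be sketched in pure Kotlin, with no Kafka dependency. `processAndCommit` below is a hypothetical simplification, not the library's code: it shows the at-least-once ordering where an offset is committed only after its record is processed, so a crash before the commit re-delivers the record rather than losing it.

```kotlin
// Simulates the process-then-commit ordering of an at-least-once consumer loop.
// Returns the last committed offset and the records processed so far.
fun processAndCommit(records: List<String>): Pair<Int, List<String>> {
    var committed = -1                 // last committed offset, -1 = nothing yet
    val processed = mutableListOf<String>()
    for ((offset, record) in records.withIndex()) {
        processed += record            // process the record first...
        committed = offset             // ...only then commit its offset
    }
    return committed to processed
}

fun main() {
    println(processAndCommit(listOf("r0", "r1", "r2")))
}
```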