Module kotlin-kafka
This project is still under development, andd started as a playground where I was playing around with Kafka in Kotlin and the Kafka SDK whilst reading the Kafka book Definite Guide from Confluent. https://www.confluent.io/resources/kafka-the-definitive-guide-v2/
Rationale
At the time of starting this repository I didn't find any bindings between Kafka SDK and Kotlin suspension. These operators should be implemented low-level, so they can guarantee correct cancellation support, and high optimised runtimes.
Goals
- Lean Core library built on top of Kotlin Std & KotlinX Coroutines (possible extensions with Arrow in additional module)
- Extensions to easily operate over the Kafka SDK with KotlinX Coroutines and
suspend
. - Flow based operators, so you can easily compose KotlinX Flow based Kafka programs
- example for testing Kafka with Test Containers in Kotlin.
Example
@JvmInline
value class Key(val index: Int)
@JvmInline
value class Message(val content: String)
fun main(): Unit =
runBlocking(Default) {
val topicName = "test-topic"
val msgCount = 10
val kafka = Kafka.container
Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
client.createTopic(NewTopic(topicName, 1, 1))
}
coroutineScope { // Run produces and consumer in a single scope
launch { // Send 20 messages, and then close the producer
val settings: ProducerSettings<Key, Message> =
ProducerSettings(
kafka.bootstrapServers,
IntegerSerializer().imap { key: Key -> key.index },
StringSerializer().imap { msg: Message -> msg.content },
Acks.All
)
(1..msgCount)
.map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
.asFlow()
.produce(settings)
.collect(::println)
}
launch { // Consume 20 messages as a stream, and then close the consumer
val settings: ConsumerSettings<Key, Message> =
ConsumerSettings(
kafka.bootstrapServers,
IntegerDeserializer().map(::Key),
StringDeserializer().map(::Message),
groupId = UUID.randomUUID().toString(),
autoOffsetReset = AutoOffsetReset.Earliest
)
kafkaConsumer(settings)
.subscribeTo(topicName)
.take(msgCount)
.map { "${it.key()} -> ${it.value()}" }
.collect(::println)
}
}
}
You can get the full code here.
test-topic-0@0
test-topic-0@1
test-topic-0@2
test-topic-0@3
test-topic-0@4
test-topic-0@5
test-topic-0@6
test-topic-0@7
test-topic-0@8
test-topic-0@9
Key(index=1) -> Message(content=msg: 1)
Key(index=2) -> Message(content=msg: 2)
Key(index=3) -> Message(content=msg: 3)
Key(index=4) -> Message(content=msg: 4)
Key(index=5) -> Message(content=msg: 5)
Key(index=6) -> Message(content=msg: 6)
Key(index=7) -> Message(content=msg: 7)
Key(index=8) -> Message(content=msg: 8)
Key(index=9) -> Message(content=msg: 9)
Key(index=10) -> Message(content=msg: 10)