Ktor Plugin EventStoreDB
EventStoreDB is an open-source database technology that stores your critical data in streams of immutable events. It was built from the ground up for Event Sourcing. Checkout the official website for more information.
This Plugin is an seamless integration into the world of Ktor Server, which is a lightweight server application framework to handle http requests and more for the JVM.
Installation
You can use Jitpack to install the plugin in your Ktor project. Just add the following lines to your build.gradle or maven file.
Gradle
repositories {
// ...
maven("https://jitpack.io")
}
dependencies {
// ...
implementation("com.github.tracksterz:ktor-plugin-event-store-db:VERSION")
}
Maven
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
<dependency>
<groupId>com.github.tracksterz</groupId>
<artifactId>ktor-plugin-event-store-db</artifactId>
<version>VERSION</version>
</dependency>
Ktor
Version I - Standard
fun main() {
embeddedServer(CIO, port = 8080, host = "localhost") {
install(EventStoreDB) {// this:EventStoreDB.Configuration
connectionString = "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false"
}
}.start(wait = true)
}
Version II - Convenient
fun main() {
embeddedServer(CIO, port = 8080, host = "localhost") {
EventStoreDB {// this:EventStoreDB.Configuration
connectionString = "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false"
}
}.start(wait = true)
}
What not to expect
This plugin is based on the official event store gRPC client. Therefore, we are limited to the functionality of the client in the version set here.
Not yet implemented are:
- Subscription groups
- Projection management
What to expect
You can expect a lightweight wrapper around the official EventStoreDB Java client with some convenience functions that port the sdk into the Kotlin world and into Ktor Server.
If you don´t know EventStoreDB, pleaser read the documentation first.
Documention
Accessing the client
val Application.eventStoreDb
get() = featureOrNull(EventStoreDB) ?: install(EventStoreDB)
The plugin provide you with this extension val fetching the client or if absent installing it. Therefore, the client is almost "everywhere" accessible if you follow Ktor´s extension function pattern. After you installed Ktor plugin as described above you can fetch it in your code like this.
In case you use Koin for Ktor just add the client to Koin as simple as this:
val client = install(ServerSentEvents)
modules(module { single { client } })
And get the instance where ever you need them:
val eventStoreDBClient = get<EventStoreDB>()
// or lazy
val eventStoreDBClient by instance<EventStoreDB>()
Appending events
The plugin provides these methods for appending events:
suspend fun appendToStream(streamName: String, eventType: String, message: String, options: AppendToStreamOptions): WriteResult
suspend fun appendToStream(streamName: String, eventData: EventData, options: AppendToStreamOptions): WriteResult
These functions, expecting either an EventData object, which is borrowed from the underlying java client, or for your convenience, you can pass minimal needed information to append a stream. There is no need to inject options, since they default to the default options. Further information on handling EventData and AppendToStreamOptions can be found in the docs.
Example
suspend fun Application.saveEvent(streamName: String, eventType: String, event: CustomEvent) {
val eventData = EventData.builderAsJson(eventType, event)
eventStoreDb.appendToStream(streamName, eventData)
}
Reading events
The plugin provides these methods for reading events:
suspend fun readStream(streamName: String): ReadResult
suspend fun readStream(streamName: String, maxCount: Long): ReadResult
suspend fun readStream(streamName: String, options: ReadStreamOptions): ReadResult
suspend fun readStream(streamName: String, maxCount: Long, options: ReadStreamOptions): ReadResult
suspend fun readAll(): ReadResult
suspend fun readAll(maxCount: Long): ReadResult
suspend fun readAll(options: ReadAllOptions): ReadResult
suspend fun readAll(maxCount: Long, options: ReadAllOptions): ReadResult
Similar to appending events to an stream of events, you can read them with several options. As you might assume, we also borrowed parts here from the underlying client why we just refer to the java client documentation for further information. Basically what we added here is the suspending nature which fits into Ktor´s concurrency model.
Example
suspend fun Application.readAllUserEvents(): List<UserEvent> {
val readResult: List<ResolvedEvent> = eventStoreDb.readStream("user", maxCount = 10)
println(readResult.size) // prints 10
return readResult.map { UserEvent.fromResolvedEvent(it) }
}
Subscribing to a single stream
Subscriptions allow you to subscribe to a stream and receive notifications about new events added to the stream. You provide an event listener and an optional starting point to the subscription. The listener is called for each event from the starting point onward. If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event appears.
The plugin provides these methods for subscribing to a single stream:
suspend fun subscribeToStream(streamName: String, listener: ResolvedEventListener): Subscription
suspend fun subscribeToStream(streamName: String, options: SubscribeToStreamOptions, listener: ResolvedEventListener): Subscription
Further information, surprise, surprise, can be found here.
Every subscription needs to be provided with an event listener which has the following signature:
typealias EventListener = suspend ResolvedEvent.() -> Unit
As you can see, the listener gets the resolved event with all the event information as a receiver attached. See the implementation further down below.
Furthermore, there is a global error event listener that gets executed whenever a subscription fails. This is the default error listener.
typealias ErrorEventListener = suspend (subscription: Subscription?, throwable: Throwable) -> Unit
var errorListener: ErrorEventListener =
{ subscription, throwable -> logger.error("Subscription[ ${subscription?.subscriptionId} ] failed due to due to ${throwable.message}") }
You can customize this in the plugin configuration section:
EventStoreDB {
errorHandler = { subscription, throwable ->
// your custom code goes here
}
}
Everytime a subscription drops, you would rarely want to reprocess all the events again. So you'd need to store the current position of the subscription somewhere, and then use it to restore the subscription from the point where it dropped off.
To manually do it on every subscription sounds a bit verbose, so we implemented it as the default behaviour of every subscription function. Check out the implementation for details:
object : SubscriptionListener() {
override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
streamRevisionBySubscriptionId[subscription.subscriptionId] = event.originalEvent.streamRevision
launch { listener(event) }
}
override fun onError(subscription: Subscription?, throwable: Throwable) {
launch {
if (config.reSubscribeOnDrop && subscription != null)
subscribeToStream(
streamName,
options.fromRevision(streamRevisionBySubscriptionId[subscription.subscriptionId]),
listener
)
config.errorListener(subscription, throwable)
}
}
}
Subscribing to all streams
Subscribing to $all
is much the same as subscribing to a single stream. The handler will be called for every event appended after the starting position. Check out the docs for further information.
The plugin provides these methods for subscribing to multiple streams:
suspend fun subscribeToAll(listener: EventListener): Subscription
suspend fun subscribeToAll(options: SubscribeToAllOptions, listener: EventListener): Subscription
In most cases you do not want to receive all events from all streams. Therefore, there is a handy server site filtering option we like to point out here. See the full documentation for all details. For the most common filtering we provided dedicated functions:
suspend fun subscribeByStreamNameFiltered(prefix: Prefix, listener: EventListener): Subscription
suspend fun subscribeByStreamNameFiltered(regex: Regex, listener: EventListener): Subscription
suspend fun subscribeByEventTypeFiltered(prefix: Prefix, listener: EventListener): Subscription
suspend fun subscribeByEventTypeFiltered(regex: Regex, listener: EventListener): Subscription
Example:
Prefix
fun Application.subscribeToCustomerStreams() = launch {
eventStoreDb.subscribeByStreamNameFiltered("customer-".prefix) {//this:ResolvedEvent
val event = when (event.eventType) {
"CustomersMailAddressChanged" -> event.getEventDataAs<CustomersMailAddressChanged>()
else -> log.error("Received unknown event type: [ ${event.eventType} ]")
}
customerAggegration.applyEvent(event)
}
}
Regex
fun Application.subscribeToAllNonSystemEvents() = launch {
eventStoreDb.subscribeByEventTypeFiltered("/^[^\\$].*/".regex) {//this:ResolvedEvent
logService.logEvent(event.eventType, event.eventData)
}
}
Kotlin DSL - WIP
For those of you who are fans of Kotlin DSL´s, we provide an experimental version of the EventStoreDB as a Kotlin DSL.
Streams API
fun Application.configureStreamsSubscriptions() = launch {
routing {
streams {
subscribe {
authenticated("admin", "password") {
filter {
eventType {
prefixed("customer-") {
val event = when (event.eventType) {
"CustomersMailAddressChanged" -> event.getEventDataAs<CustomersMailAddressChanged>()
else -> log.error("Received unknown event type: [ ${event.eventType} ]")
}
customerAggegration.applyEvent(event)
}
regex("/^[^\\$].*/".regex) {
logService.logEvent(event.eventType, event.eventData)
}
}
}
}
filter {
eventType {
prefixed("order-") {
val event = when (event.eventType) {
"OrderReceived" -> event.getEventDataAs<OrderReceived>()
else -> log.error("Received unknown event type: [ ${event.eventType} ]")
}
orderAggegration.applyEvent(event)
}
regex("/^[^\\$].*/".regex) {
logService.logEvent(event.eventType, event.eventData)
}
}
}
start { /** replay all events **/ }
end { /** receive only new events **/ }
}
}
}
}
Stream API
fun Application.configureOrderStreamSubscriptions() = launch {
routing {
streams {
subscribe("order") {
authenticated("admin", "password") {
start { /** replay all events **/ }
end { /** receive only new events **/ }
revision(1013L) { /** receive only specific events **/ }
}
start { /** replay all events **/ }
end { /** receive only new events **/ }
revision(1013L) { /** receive only specific events **/ }
}
}
}
}
If you find any bugs or want to contribute, feel free to contact us!