Plugin to integrate EventStoreDB into Ktor-Server!

Overview

Ktor Plugin EventStoreDB

EventStoreDB is an open-source database technology that stores your critical data in streams of immutable events. It was built from the ground up for Event Sourcing. Check out the official website for more information.

This plugin is a seamless integration into the world of Ktor Server, which is a lightweight server application framework for the JVM to handle HTTP requests and more.

Installation

You can use JitPack to install the plugin in your Ktor project. Just add the following lines to your Gradle build script (the snippet below uses the Kotlin DSL) or to your Maven pom.xml.

Gradle

repositories {
    // ...
    maven("https://jitpack.io")
}
dependencies {
    // ...
    implementation("com.github.tracksterz:ktor-plugin-event-store-db:VERSION")
}

Maven

<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>
<dependency>
    <groupId>com.github.tracksterz</groupId>
    <artifactId>ktor-plugin-event-store-db</artifactId>
    <version>VERSION</version>
</dependency>

Ktor

Version I - Standard

fun main() {
    embeddedServer(CIO, port = 8080, host = "localhost") {
        install(EventStoreDB) {// this:EventStoreDB.Configuration
            connectionString = "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false"
        }

    }.start(wait = true)
}

Version II - Convenient

fun main() {
    embeddedServer(CIO, port = 8080, host = "localhost") {
        EventStoreDB {// this:EventStoreDB.Configuration
            connectionString = "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false"
        }
    }.start(wait = true)
}

What not to expect

This plugin is based on the official EventStoreDB gRPC client. Therefore, we are limited to the functionality of the client in the version set here.

Not yet implemented are:

  • Subscription groups
  • Projection management

What to expect

You can expect a lightweight wrapper around the official EventStoreDB Java client with some convenience functions that port the SDK into the Kotlin world and into Ktor Server.

If you don't know EventStoreDB, please read the documentation first.

Documentation

Accessing the client

val Application.eventStoreDb
    get() = featureOrNull(EventStoreDB) ?: install(EventStoreDB)

The plugin provides this extension property, which fetches the client or, if it is absent, installs it. Therefore, the client is accessible almost "everywhere" if you follow Ktor's extension function pattern. Once you have installed the plugin as described above, you can fetch it in your code through this property.
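To make the fetch-or-install idea concrete, here is a minimal stand-alone sketch. `FakeApplication`, `FakeClient`, the string key, and the connection string are hypothetical stand-ins, not Ktor's or the plugin's API. The sketch only models the "look it up, install it if missing" behaviour of the extension property above.

```kotlin
// Hypothetical stand-in for the plugin's client.
class FakeClient(val connectionString: String)

// Hypothetical stand-in for Ktor's Application: a tiny plugin registry.
class FakeApplication {
    private val plugins = mutableMapOf<String, Any>()

    // Counts how often a plugin was actually installed.
    var installCount = 0
        private set

    @Suppress("UNCHECKED_CAST")
    fun <T : Any> pluginOrNull(key: String): T? = plugins[key] as T?

    fun <T : Any> install(key: String, factory: () -> T): T {
        installCount++
        return factory().also { plugins[key] = it }
    }
}

// Same shape as the extension property above: fetch the client, or install it if absent.
val FakeApplication.client: FakeClient
    get() = pluginOrNull<FakeClient>("EventStoreDB")
        ?: install("EventStoreDB") { FakeClient("esdb://localhost:2113?tls=false") }

fun main() {
    val app = FakeApplication()
    val first = app.client   // not installed yet, so this installs it
    val second = app.client  // already installed, so this reuses it
    println(first === second) // true
    println(app.installCount) // 1
}
```

Note that with this pattern the first access decides the configuration: if nothing was installed explicitly, the client is created with defaults.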

In case you use Koin for Ktor, adding the client to Koin is as simple as this:

val client = install(EventStoreDB)
modules(module { single { client } })

And get the instance wherever you need it:

val eventStoreDBClient = get<EventStoreDB>()
// or lazy
val eventStoreDBClient by inject<EventStoreDB>()

Appending events

The plugin provides these methods for appending events:

suspend fun appendToStream(streamName: String, eventType: String, message: String, options: AppendToStreamOptions): WriteResult

suspend fun appendToStream(streamName: String, eventData: EventData, options: AppendToStreamOptions): WriteResult

These functions accept either an EventData object, which is borrowed from the underlying Java client, or, for your convenience, the minimal information needed to append an event to a stream. There is no need to pass options, since they default to the client's default options. Further information on handling EventData and AppendToStreamOptions can be found in the docs.

Example

suspend fun Application.saveEvent(streamName: String, eventType: String, event: CustomEvent) {
    // builderAsJson returns a builder; build() produces the EventData to append
    val eventData = EventData.builderAsJson(eventType, event).build()
    eventStoreDb.appendToStream(streamName, eventData)
}

Reading events

The plugin provides these methods for reading events:

suspend fun readStream(streamName: String): ReadResult

suspend fun readStream(streamName: String, maxCount: Long): ReadResult

suspend fun readStream(streamName: String, options: ReadStreamOptions): ReadResult

suspend fun readStream(streamName: String, maxCount: Long, options: ReadStreamOptions): ReadResult

suspend fun readAll(): ReadResult

suspend fun readAll(maxCount: Long): ReadResult

suspend fun readAll(options: ReadAllOptions): ReadResult

suspend fun readAll(maxCount: Long, options: ReadAllOptions): ReadResult

Similar to appending events to a stream, you can read them with several options. As you might assume, we borrowed parts from the underlying client here as well, so we simply refer to the Java client documentation for further information. Basically, what we added is the suspending nature, which fits into Ktor's concurrency model.

Example

suspend fun Application.readAllUserEvents(): List<UserEvent> {
    val readResult: ReadResult = eventStoreDb.readStream("user", maxCount = 10)
    val events: List<ResolvedEvent> = readResult.events
    println(events.size) // at most 10; fewer if the stream holds fewer events
    return events.map { UserEvent.fromResolvedEvent(it) }
}

Subscribing to a single stream

Subscriptions allow you to subscribe to a stream and receive notifications about new events added to the stream. You provide an event listener and an optional starting point to the subscription. The listener is called for each event from the starting point onward. If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event appears.

The plugin provides these methods for subscribing to a single stream:

suspend fun subscribeToStream(streamName: String, listener: ResolvedEventListener): Subscription

suspend fun subscribeToStream(streamName: String, options: SubscribeToStreamOptions, listener: ResolvedEventListener): Subscription

Further information, surprise, surprise, can be found here.

Every subscription needs to be provided with an event listener which has the following signature:

typealias EventListener = suspend ResolvedEvent.() -> Unit

As you can see, the listener gets the resolved event with all the event information as a receiver attached. See the implementation further down below.
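If function types with a receiver are new to you, the following stand-alone sketch may help. `FakeResolvedEvent`, `FakeEventListener`, `describe`, and `deliver` are hypothetical stand-ins that mirror the shape of the typealias above; they are not the plugin's or the Java client's types.

```kotlin
// Hypothetical stand-in for the client's ResolvedEvent; only the fields used below.
data class FakeResolvedEvent(val streamId: String, val eventType: String, val revision: Long)

// Same shape as the EventListener typealias: a suspending function with the event as receiver.
typealias FakeEventListener = suspend FakeResolvedEvent.() -> Unit

// Small helper so the listener body has something to call on the receiver.
fun FakeResolvedEvent.describe(): String = "$streamId/$eventType@$revision"

// Calls the listener once per event, in order.
suspend fun deliver(events: List<FakeResolvedEvent>, listener: FakeEventListener) {
    for (event in events) event.listener()
}

suspend fun main() {
    val events = listOf(
        FakeResolvedEvent("customer-1", "CustomerCreated", 0L),
        FakeResolvedEvent("customer-1", "CustomersMailAddressChanged", 1L)
    )
    // Inside the lambda, `this` is the event, so describe() needs no qualifier.
    deliver(events) { println(describe()) }
    // prints:
    // customer-1/CustomerCreated@0
    // customer-1/CustomersMailAddressChanged@1
}
```

Because the listener is a suspending function, its body can call other suspending functions directly, without blocking a thread.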

Furthermore, there is a global error event listener that gets executed whenever a subscription fails. This is the default error listener.

typealias ErrorEventListener = suspend (subscription: Subscription?, throwable: Throwable) -> Unit

var errorListener: ErrorEventListener =
    { subscription, throwable -> logger.error("Subscription[ ${subscription?.subscriptionId} ] failed due to ${throwable.message}") }

You can customize this in the plugin configuration section:

EventStoreDB {
    errorListener = { subscription, throwable ->
        // your custom code goes here
    }
}

When a subscription drops, you rarely want to reprocess all the events again. So you need to store the current position of the subscription somewhere and then use it to restore the subscription from the point where it dropped off.

Doing this manually for every subscription would be verbose, so we implemented it as the default behaviour of every subscription function. Check out the implementation for details:

object : SubscriptionListener() {
    override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
        streamRevisionBySubscriptionId[subscription.subscriptionId] = event.originalEvent.streamRevision
        launch { listener(event) }
    }

    override fun onError(subscription: Subscription?, throwable: Throwable) {
        launch {
            if (config.reSubscribeOnDrop && subscription != null)
                subscribeToStream(
                    streamName,
                    options.fromRevision(streamRevisionBySubscriptionId[subscription.subscriptionId]),
                    listener
                )
            config.errorListener(subscription, throwable)
        }
    }
}
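The checkpoint-and-resubscribe idea can be tried out without a database. The following is a minimal sketch under stated assumptions: `FakeStream` and `consumeWithResubscribe` are hypothetical stand-ins, the drop is simulated with an exception, and resuming is modelled as "deliver everything after the stored revision". It is not the plugin's implementation, only the same bookkeeping in isolation.

```kotlin
// Hypothetical stand-in for a stream: the list index plays the role of the stream revision.
class FakeStream(private val events: List<String>) {
    // Delivers all events after fromRevision (null means "from the start").
    // Throws once it reaches failAt to simulate a dropped subscription.
    fun subscribe(fromRevision: Long?, failAt: Long?, onEvent: (Long, String) -> Unit) {
        val start = if (fromRevision == null) 0 else (fromRevision + 1).toInt()
        for (i in start until events.size) {
            if (failAt != null && i.toLong() == failAt) {
                throw IllegalStateException("subscription dropped")
            }
            onEvent(i.toLong(), events[i])
        }
    }
}

// Processes the whole stream, resubscribing from the last seen revision after a drop.
fun consumeWithResubscribe(stream: FakeStream, dropAt: Long): List<String> {
    val processed = mutableListOf<String>()
    var lastRevision: Long? = null // the checkpoint
    var failAt: Long? = dropAt
    while (true) {
        try {
            stream.subscribe(lastRevision, failAt) { revision, event ->
                processed.add(event)
                lastRevision = revision // remember how far we got
            }
            return processed
        } catch (e: IllegalStateException) {
            failAt = null // the simulated drop happens only once
        }
    }
}

fun main() {
    val stream = FakeStream(listOf("a", "b", "c", "d"))
    // The subscription drops before revision 2 is delivered, then resumes after revision 1.
    println(consumeWithResubscribe(stream, dropAt = 2L)) // [a, b, c, d]
}
```

Without the stored revision, the second subscription would start from the beginning and "a" and "b" would be processed twice.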

Subscribing to all streams

Subscribing to $all is much the same as subscribing to a single stream. The handler will be called for every event appended after the starting position. Check out the docs for further information.

The plugin provides these methods for subscribing to all streams:

suspend fun subscribeToAll(listener: EventListener): Subscription

suspend fun subscribeToAll(options: SubscribeToAllOptions, listener: EventListener): Subscription

In most cases you do not want to receive all events from all streams. Therefore, there is a handy server-side filtering option we'd like to point out here. See the full documentation for all details. For the most common filters we provide dedicated functions:

suspend fun subscribeByStreamNameFiltered(prefix: Prefix, listener: EventListener): Subscription

suspend fun subscribeByStreamNameFiltered(regex: Regex, listener: EventListener): Subscription

suspend fun subscribeByEventTypeFiltered(prefix: Prefix, listener: EventListener): Subscription

suspend fun subscribeByEventTypeFiltered(regex: Regex, listener: EventListener): Subscription

Example:

Prefix
fun Application.subscribeToCustomerStreams() = launch {
    eventStoreDb.subscribeByStreamNameFiltered("customer-".prefix) {//this:ResolvedEvent
        val event = when (event.eventType) {
            "CustomersMailAddressChanged" -> event.getEventDataAs<CustomersMailAddressChanged>()
            else -> log.error("Received unknown event type: [ ${event.eventType} ]")
        }
        customerAggegration.applyEvent(event)
    }
}
Regex
fun Application.subscribeToAllNonSystemEvents() = launch {
    eventStoreDb.subscribeByEventTypeFiltered("/^[^\\$].*/".regex) {//this:ResolvedEvent
        logService.logEvent(event.eventType, event.eventData)
    }
}
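To get a feel for which names a prefix filter and a regex filter let through, here is a small local sketch. `prefixFilter` and `regexFilter` are hypothetical helpers built on the Kotlin standard library; in the real system the matching happens on the server. The sketch also assumes the regular expression is written without surrounding slashes, because the JVM regex engine would treat them as literal characters; how the plugin's own regex extension handles them is not covered here.

```kotlin
// Hypothetical helper: keeps names that start with the given prefix.
fun prefixFilter(prefix: String): (String) -> Boolean = { it.startsWith(prefix) }

// Hypothetical helper: keeps names in which the pattern finds a match.
fun regexFilter(pattern: String): (String) -> Boolean {
    val regex = Regex(pattern)
    return { regex.containsMatchIn(it) }
}

fun main() {
    val streamNames = listOf("customer-1", "customer-2", "order-7")
    // System event types start with a dollar sign; "\$" is an escaped dollar in Kotlin strings.
    val eventTypes = listOf("CustomersMailAddressChanged", "\$metadata", "OrderReceived")

    println(streamNames.filter(prefixFilter("customer-")))
    // [customer-1, customer-2]

    // The regex ^[^\$].* matches every name whose first character is not a dollar sign.
    println(eventTypes.filter(regexFilter("^[^\\\$].*")))
    // [CustomersMailAddressChanged, OrderReceived]
}
```

A prefix filter is usually enough when your stream names follow a category-id convention such as customer-1; the regex variant is handy for exclusions like "everything except system events".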

Kotlin DSL - WIP

For those of you who are fans of Kotlin DSLs, we provide an experimental Kotlin DSL for EventStoreDB.

Streams API

fun Application.configureStreamsSubscriptions() = launch {
    routing {
        streams {
            subscribe {
                authenticated("admin", "password") {
                    filter {
                        eventType {
                            prefixed("customer-") {
                                val event = when (event.eventType) {
                                    "CustomersMailAddressChanged" -> event.getEventDataAs<CustomersMailAddressChanged>()
                                    else -> log.error("Received unknown event type: [ ${event.eventType} ]")
                                }
                                customerAggegration.applyEvent(event)
                            }

                            regex("/^[^\\$].*/".regex) {
                                logService.logEvent(event.eventType, event.eventData)
                            }
                        }
                    }
                }
                filter {
                    eventType {
                        prefixed("order-") {
                            val event = when (event.eventType) {
                                "OrderReceived" -> event.getEventDataAs<OrderReceived>()
                                else -> log.error("Received unknown event type: [ ${event.eventType} ]")
                            }
                            orderAggegration.applyEvent(event)
                        }

                        regex("/^[^\\$].*/".regex) {
                            logService.logEvent(event.eventType, event.eventData)
                        }
                    }
                }

                start { /** replay all events **/ }
                end { /** receive only new events **/ }
            }
        }
    }
}

Stream API

fun Application.configureOrderStreamSubscriptions() = launch {
    routing {
        streams { 
            subscribe("order") {
                authenticated("admin", "password") {
                    start { /** replay all events **/ }
                    end { /** receive only new events **/ }
                    revision(1013L) { /** receive only specific events **/ }
                }

                start { /** replay all events **/ }
                end { /** receive only new events **/ }
                revision(1013L) { /** receive only specific events **/ }
            }
        }
    }
}

If you find any bugs or want to contribute, feel free to contact us!

Releases(v1.2.1)