This repository has been archived by the owner on Jan 27, 2024. It is now read-only.

A purely functional EventStoreDB client built using CE3 on top of the EventStoreDB Java Driver





Dolphin



EventStoreDB is an open-source state-transition database, designed for businesses that are ready to harness the true power of event-driven architecture. It is a purpose-built database for event-driven applications, with a focus on high performance, scalability, and reliability.


⚠️ Dolphin is a Scala wrapper for the Java client of EventStoreDB. It is a work in progress and is neither ready nor recommended for production use.


Add the following to your build.sbt file:

libraryDependencies ++= Seq(
  "io.github.lapsushq" %% "dolphin-core"  % "0.0-`Latest Commit Hash`-SNAPSHOT",
  "io.github.lapsushq" %% "dolphin-circe" % "0.0-`Latest Commit Hash`-SNAPSHOT"
)
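
Because these are SNAPSHOT versions, sbt may not find them with its default resolvers. A minimal sketch of the extra setting, assuming the snapshots are published to the Sonatype OSS snapshot repositories (this README does not say where they are hosted) and that you use sbt 1.7 or later:

```scala
// build.sbt
// Assumption: Dolphin snapshots are hosted on Sonatype OSS; adjust if they live elsewhere.
resolvers ++= Resolver.sonatypeOssRepos("snapshots")
```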


EventStoreDB distinguishes between a normal session and a persistent session. A normal session is volatile: it reads from disk with no way to acknowledge what was read. A persistent session also reads from disk but adds a mechanism to acknowledge each read; in exchange, you cannot write with this type of session. As a result, a persistent session may perform slower than a normal one.

Append to a stream

import dolphin.*
import cats.effect.{IO, IOApp}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import fs2.Stream

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

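  override def run: IO[Unit] =
    (for {
      // The session constructor was lost from this listing; `VolatileSession.stream` is assumed
      // by analogy with the `VolatileSession.resource` call shown later in this README.
      session <- VolatileSession.stream[IO](Config.Default)
      _ <- Stream.eval(
        // The call wrapping the payload was also lost; `appendToStream`, the empty metadata,
        // and the "Counter" event type are assumptions.
        session.appendToStream(
          "ShoppingCart",
          """{"id": "9b188885-04a8-4ae0-b8a4-74a82c17d2ec", "value": 1}""".getBytes,
          Array.emptyByteArray,
          "Counter"
        )
      )
    } yield ()).compile.drain
}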

Read from a stream

import dolphin.*
import dolphin.setting.ReadFromStreamSettings

import cats.effect.{IO, IOApp}

import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import fs2.Stream

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

  override def run: IO[Unit] =
    (for {
      // The session constructor was lost from this listing; `VolatileSession.stream` is assumed.
      session <- VolatileSession.stream[IO](Config.Default)
      read <- Stream.eval(session.readStream("ShoppingCart", ReadFromStreamSettings.Default))
      data <- read.getEventData
      _ <- Stream.eval(IO.println(new String(data))) // {"id": "9b188885-04a8-4ae0-b8a4-74a82c17d2ec", "value": 1}
    } yield ()).compile.drain
}
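
Both examples move the event payload around as raw bytes: the append example calls `getBytes` and the read example calls `new String(data)`. Neither names a charset, so the JVM default is used. A minimal standard-library sketch, independent of Dolphin, that pins UTF-8 on both sides (`PayloadCodec`, `encode`, and `decode` are hypothetical names introduced for illustration):

```scala
import java.nio.charset.StandardCharsets

object PayloadCodec {
  // Hypothetical helper: turn a JSON string into the bytes stored in the event.
  def encode(json: String): Array[Byte] =
    json.getBytes(StandardCharsets.UTF_8)

  // Hypothetical helper: turn the bytes read back from the stream into a string.
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)
}
```

With an explicit charset, `PayloadCodec.decode(PayloadCodec.encode(json))` returns the original string regardless of the platform's default encoding.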

EventStoreDB provides a mechanism to subscribe to a stream. This means that the client can subscribe to a stream and receive all the events that are appended to the stream. The client can also acknowledge the events that are received (if created with a persistent session).

Subscribe to a stream

There are two ways to subscribe to a stream, both through the subscribeToStream method on the session. Called with only the stream name, it returns a Stream of Message objects. Called with a MessageHandler as well, it returns a Resource of Unit.

  • With subscribeToStream of Stream:
import dolphin.*
import cats.effect.{IO, IOApp}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import fs2.Stream

import java.util.UUID
import scala.concurrent.duration.*

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

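  private def program: Stream[IO, Unit] =
    for {
      // The session constructor was lost from this listing; `VolatileSession.stream` is assumed.
      session <- VolatileSession.stream[IO](Config.Default)
      _ <- Stream
        .iterateEval(UUID.randomUUID())(_ => IO(UUID.randomUUID()))
        .evalMap { uuid =>
          // The append call was lost; `appendToStream` and its last two arguments are assumptions.
          session.appendToStream(
            "ShoppingCart",
            s"""{"id": "${uuid}", "value": 1}""".getBytes,
            Array.emptyByteArray,
            "Counter"
          )
        }
        .concurrently {
          session.subscribeToStream("ShoppingCart").evalMap {
            // The calls around these messages were lost; logging through `logger` is assumed.
            case Message.Event(_, event, _)  => logger.info(new String(event.getEventData))
            case Message.Cancelled(_, error) => logger.info(s"Received cancellation error: ${error}")
            case Message.Confirmation(_)     => logger.info("Received confirmation")
          }
        }
    } yield ()

  override def run: IO[Unit] = program.compile.drain
}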

  • With subscribeToStream of MessageHandler (i.e. Message[F, VolatileConsumer[F]] => F[Unit]):
import dolphin.*

import cats.effect.{IO, IOApp}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

  // The calls around these messages were lost from this listing; logging through `logger` is assumed.
  private val handlers: Message[IO, VolatileConsumer[IO]] => IO[Unit] = {
    case Message.Event(consumer, event, retryCount) => logger.info(s"Received event: $event")
    case Message.Cancelled(consumer, error)         => logger.info("Received cancellation")
    case Message.Confirmation(consumer)             => logger.info("Received confirmation")
  }

  override def run: IO[Unit] =
    (for {
      session <- VolatileSession.resource[IO](Config.Default)
      _ <- session.subscribeToStream("ShoppingCart", handlers)
    } yield ()).useForever
}
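
Both variants dispatch on the same three cases: Event, Cancelled, and Confirmation. The sketch below is a simplified, library-free model of that pattern match, not Dolphin's actual types. The `Message` hierarchy, its fields, and the `describe` function are stand-ins invented for illustration; the real `Message` is also parameterised by an effect and a consumer.

```scala
import java.nio.charset.StandardCharsets

// Simplified stand-in for Dolphin's Message; the field types are assumptions.
sealed trait Message
object Message {
  final case class Event(data: Array[Byte], retryCount: Int) extends Message
  final case class Cancelled(error: Throwable) extends Message
  case object Confirmation extends Message
}

object MessageDemo {
  // Hypothetical handler: an exhaustive match with one branch per message kind.
  def describe(message: Message): String = message match {
    case Message.Event(data, retryCount) =>
      val text = new String(data, StandardCharsets.UTF_8)
      s"Received event: $text (retry $retryCount)"
    case Message.Cancelled(error) =>
      s"Received cancellation error: ${error.getMessage}"
    case Message.Confirmation =>
      "Received confirmation"
  }
}
```

Because `Message` is sealed, the compiler can warn when a handler forgets one of the cases, which is the main reason to model the three outcomes as one type.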



Projections are a way to transform the data in a stream. EventStoreDB provides a mechanism to create and manage projections.

import dolphin.*

import cats.effect.{IO, IOApp, Resource}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import com.fasterxml.jackson.annotation.JsonProperty

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

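  private val GET_TOTAL_ON_A_SHOPPING_BASKET = """fromStream('Something').
      when({
        "$init": function() {
          return {
            state: {
              id: "",
              value: 0
            }
          }
        },
        Counter: function(s, e) {
          // Two `if` conditions that guarded this update were lost from this listing
          // and are not reconstructed here; `e.data.value` as the addend is an assumption.
          s.state.value = s.state.value + e.data.value;
        }
      })"""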

  // JsonProperty is super important as it is how EventStore will deserialize internally
  final case class ShoppingBasket(@JsonProperty("id") id: String, @JsonProperty("value") value: Int)

  // You need to provide an empty instance of the state
  object ShoppingBasket {
    val Empty = ShoppingBasket("", 0)
  }

  /* EventStoreDB will use `getState` and `setState` to manipulate the state. There is no need to implement them
   * yourself. If you do need to, implement the `Stateful` trait and override the `getState` method. To override
   * `setState`, implement the `WithSetter` trait and override `setState`.
   * There are two kinds of state:
   * - `ServerState` is the state that EventStoreDB provides to you, in this case the one given by the projection.
   * - `ClientState` is the state that you receive, which you can manipulate by overriding the `setState` method.
   */
  final case class Counter() extends Stateful[ShoppingBasket] {
    def init = ShoppingBasket.Empty
  }

  def program: Resource[IO, Unit] =
    for {
      session <- ProjectionManager.resource[IO](Config.Default)
      _       <- Resource.eval(session.create("Cart", GET_TOTAL_ON_A_SHOPPING_BASKET)).attempt
      state   <- Resource.eval(session.getState("Cart", classOf[Counter]))
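      // The logged expression was lost from this listing; logging the state is an assumption.
      _       <- Resource.eval(logger.info(s"Projection state: $state"))
    } yield ()

  override def run: IO[Unit] = program.use(_ => IO.never)
}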
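
Conceptually, a projection like the one above is a left fold: it starts from an initial state and applies a handler to each event in order. A minimal library-free sketch of that idea, assuming each event carries an id and a value as in the JSON payloads used earlier. `CounterEvent`, `BasketState`, and `ProjectionSketch` are hypothetical names, the id handling is a simplification, and the code runs in memory rather than inside EventStoreDB:

```scala
// Hypothetical event shape, mirroring the {"id": ..., "value": ...} payloads.
final case class CounterEvent(id: String, value: Int)

// Hypothetical state shape, mirroring the projection's {id, value} state.
final case class BasketState(id: String, value: Int)

object ProjectionSketch {
  // Plays the role of "$init": the state before any event is seen.
  val initial: BasketState = BasketState("", 0)

  // Plays the role of the per-event handler: add the event's value to the
  // running total and remember the id of the most recent event.
  def step(state: BasketState, event: CounterEvent): BasketState =
    BasketState(event.id, state.value + event.value)

  // Folding the handler over the events in order yields the final state.
  def run(events: List[CounterEvent]): BasketState =
    events.foldLeft(initial)(step)
}
```

Keeping `step` a pure function makes the fold easy to test locally before the equivalent logic is written as a server-side projection.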



Go to Roadmap for further information.


  • This project is not affiliated with EventStoreDB. For further information about EventStoreDB, please visit EventStoreDB.
  • For further information about the Java client, please visit EventStoreDB Java Client.
  • There's a lot to change and improve; please feel free to open an issue if you have any questions or suggestions, or if you find any bugs.
  • For further information on usage and examples, please visit Dolphin Integration Tests.


