Event-sourced actors

An introduction to event-sourced actors is already given in sections Overview, Architecture and the User guide. Applications use event-sourced actors for writing events to an event log and for maintaining in-memory write models on the command side (C) of a CQRS application. Event-sourced actors distinguish command processing from event processing. They must extend the EventsourcedActor trait and implement a Command handler and an Event handler.

Command handler

A command handler is a partial function of type PartialFunction[Any, Unit] for which a type alias Receive exists. It can be defined by implementing onCommand:

import scala.util._
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor

case class ExampleEvent(data: String)
case class ExampleCommand(data: String)
case class ExampleCommandSuccess(data: String)
case class ExampleCommandFailure(cause: Throwable)

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  /** Command handler */
  override def onCommand = {
    case ExampleCommand(data) =>
      // validate command
      // ...

      // derive event
      val event = ExampleEvent(data)
      // persist event (asynchronously)
      persist(event) {
        case Success(evt) =>
          // success reply
          sender() ! ExampleCommandSuccess(data)
        case Failure(cause) =>
          // failure reply
          sender() ! ExampleCommandFailure(cause)
      }
  }

  override def onEvent = {
    case ExampleEvent(data) => // ...
  }
}

Messages sent by an application to an event-sourced actor are received by its command handler. Usually, a command handler first validates a command, then derives one or more events from it, persists these events with persist and replies with the persistence result. The persist method has the following signature[1]:

def persist[A](event: A, customDestinationAggregateIds: Set[String] = Set())(handler: Try[A] => Unit): Unit

The persist method can be called one or more times per received command. Calling persist does not immediately write events to the event log. Instead, events from persist calls are collected in memory and written to the event log when onCommand returns.

Events are written asynchronously to the event-sourced actor’s eventLog. After writing, the eventLog actor internally replies to the event-sourced actor with a success or failure message which is passed as argument to the persist handler. Before calling the persist handler, the event-sourced actor internally calls the onEvent handler with the written event if writing was successful and onEvent is defined at that event.

Both the event handler and the persist handler are called on a dispatcher thread of the actor. They can therefore safely access internal actor state. The sender() reference of the original command sender is also preserved, so that a persist handler can reply to the initial command sender.

Hint

The EventsourcedActor trait also defines a persistN method. Refer to the EventsourcedActor API documentation for details.

Note

A command handler should not modify persistent actor state, i.e. state that is derived from events.

State synchronization

As explained in section Command handler, events are persisted asynchronously. What happens if another command is sent to an event-sourced actor while persistence is in progress? This depends on the value of stateSync, a member of EventsourcedActor that can be overridden.

def stateSync: Boolean = true

If stateSync is true (default), new commands are stashed while persistence is in progress. Consequently, new commands see actor state that is in sync with the events in the event log. A consequence is limited write throughput, because Batching of write requests is not possible in this case[2]. This setting is recommended for event-sourced actors that must validate commands against current state.

If stateSync is false, new commands are dispatched to onCommand immediately. Consequently, new commands may see stale actor state. The advantage is significantly higher write throughput as Batching of write requests is possible. This setting is recommended for event-sourced actors that don’t need to validate commands against current state.

If a sender sends several (update) commands followed by a query to an event-sourced actor that has stateSync set to false, the query will probably not see the state change from the preceding commands. To achieve read-your-write consistency, the command sender should wait for a reply from the last command before sending the query. The reply must of course be sent from within a persist handler.

Note

State synchronization settings only apply to a single actor instance. Events that are emitted concurrently by other actors and handled by that instance can arrive at any time and modify actor state. However, concurrent events are not relevant for achieving read-your-write consistency and should be handled as described in the User guide.

Event handler

An event handler is a partial function of type PartialFunction[Any, Unit] for which a type alias Receive exists. It can be defined by implementing onEvent. An event handler handles persisted events by updating actor state from event details.

/** Event handler */
override def onEvent = {
  case ExampleEvent(details) =>
    val eventSequenceNr = lastSequenceNr
    val eventVectorTimestamp = lastVectorTimestamp
    // ...

    // update actor state
    // ...
}

Event metadata of the last handled event can be obtained with the last* methods defined by EventsourcedActor. For example, lastSequenceNr returns the event’s local sequence number, lastVectorTimestamp returns the event’s vector timestamp. A complete reference is given by the EventsourcedActor API documentation.

Note

An event handler should only update internal actor state without having further side-effects. Exceptions are Reliable delivery of messages and Event-driven communication with PersistOnEvent.

Event-sourced views

An introduction to event-sourced views is already given in sections Overview, Architecture and the User guide. Applications use event-sourced views for maintaining in-memory read models on the query side (Q) of a CQRS application.

Like event-sourced actors, event-sourced views distinguish command processing from event processing. They must implement the EventsourcedView trait. EventsourcedView is a functional subset of EventsourcedActor that cannot persist events.

Event-sourced writers

An introduction to event-sourced writers is already given in sections Overview and Architecture. Applications use event-sourced writers for maintaining persistent read models on the query side (Q) of a CQRS application.

Like event-sourced views, event-sourced writers can only consume events from an event log but can make incremental batch updates to external, application-defined query databases. A query database can be a relational database, a graph database or whatever is needed by an application. Concrete writers must implement the EventsourcedWriter trait.

This section outlines how to update a persistent read model in Cassandra from events consumed by an event-sourced writer. The relevant events are:

case class CustomerCreated(cid: Long, first: String, last: String, address: String)
case class AddressUpdated(cid: Long, address: String)

The persistent read model is a CUSTOMER table with the following structure:

 id | first  | last    | address
----+--------+---------+-------------
  1 | Martin | Krasser | Somewhere 1
  2 | Volker | Stampa  | Somewhere 3
  3 | ...    | ...     | ...

The read model update progress is written to a separate PROGRESS table with a single sequence_nr column:

 id | sequence_nr
----+-------------
  0 |           3

The stored sequence number is that of the last successfully processed event. An event is considered as successfully processed if its data have been written to the CUSTOMER table. Only a single row is needed in the PROGRESS table to track the update progress for the whole CUSTOMER table.

The event-sourced Writer in the following example implements EventsourcedWriter[Long, Unit] (where Long is the type of the initial read result and Unit the type of write results). It is initialized with an eventLog from which it consumes events and a Cassandra Session for writing event processing results.

import java.lang.{ Long => JLong }

import akka.actor.ActorRef

import com.datastax.driver.core._
import com.rbmhtechnology.eventuate.EventsourcedWriter

import scala.concurrent.Future

/**
 * Processes `CustomerCreated` and `AddressUpdated` events and updates
 * a `CUSTOMER` table in Cassandra with incremental batches.
 */
class Writer(val id: String, val eventLog: ActorRef, session: Session)
  extends EventsourcedWriter[Long, Unit] {

  import Writer._
  import context.dispatcher

  val insertCustomerStmt = session.prepare(
    "INSERT INTO CUSTOMER (id, first, last, address) VALUES (?, ?, ?, ?)")

  val updateCustomerStmt = session.prepare(
    "UPDATE CUSTOMER SET address = ? WHERE id = ?")

  val updateProgressStmt = session.prepare(
    "UPDATE PROGRESS SET sequence_nr = ? WHERE id = 0")

  /**
   * Batch of Cassandra update statements collected during event processing.
   */
  var batch: Vector[BoundStatement] = Vector.empty

  /**
   * Suspends replay after 16 events, triggers a `write` and then continues
   * with the next 16 events. This implements event replay backpressure,
   * needed if writing to the database is slower than replaying from the
   * `eventLog` (which is usually the case).
   */
  override def replayBatchSize: Int =
    16

  override def onCommand = {
    case _ =>
  }

  /**
   * Prepares an update `batch` from handled events that is written to the
   * database when `write` is called. An event handler never writes to the
   * database directly.
   */
  override def onEvent = {
    case c @ CustomerCreated(cid, first, last, address) =>
      batch = batch :+ insertCustomerStmt.bind(cid: JLong, first, last, address)
    case u @ AddressUpdated(cid, address) =>
      batch = batch :+ updateCustomerStmt.bind(address, cid: JLong)
  }

  /**
   * Asynchronously writes the prepared update `batch` to the database
   * together with the sequence number of the last processed event. After
   * having submitted the batch, it is cleared so that further events can
   * be processed while the write is in progress.
   */
  override def write(): Future[Unit] = {
    val snr = lastSequenceNr
    val res = for {
      _ <- Future.sequence(batch.map(stmt => session.executeAsync(stmt).toFuture))
      _ <- session.executeAsync(updateProgressStmt.bind(snr: JLong)).toFuture
    } yield ()
    batch = Vector.empty // clear batch
    res
  }

  /**
   * Reads the sequence number of the last update. This method is called only
   * once during writer initialization (after start or restart).
   */
  override def read(): Future[Long] = {
    session.executeAsync("SELECT sequence_nr FROM PROGRESS WHERE id = 0").toFuture
      .map(rs => if (rs.isExhausted) 0L else rs.one().getLong(0))
  }

  /**
   * Handles the `read` result by returning the read value + 1, indicating the
   * start position for further reads from the event log.
   */
  override def readSuccess(result: Long): Option[Long] =
    Some(result + 1L)
}

object Writer {
  import java.util.concurrent.Executor

  import com.google.common.util.concurrent.ListenableFuture

  import scala.concurrent.{ ExecutionContext, Promise }
  import scala.language.implicitConversions
  import scala.util.Try

  implicit class ListenableFutureConverter[A](lf: ListenableFuture[A])(implicit executionContext: ExecutionContext) {

    def toFuture: Future[A] = {
      val promise = Promise[A]
      lf.addListener(new Runnable {
        def run() = promise.complete(Try(lf.get()))
      }, executionContext.asInstanceOf[Executor])
      promise.future
    }
  }
}

Hint

The full example source code is available here.

On a high level, the example Writer implements the following behavior:

  • During initialization (after start or restart) it asynchronously reads the stored update progress from the PROGRESS table. The read result is passed as argument to readSuccess and incremented by 1 before returning it to the caller. This causes the Writer to resume event processing from that position in the event log.
  • Events are processed in onEvent by translating them to Cassandra update statements which are added to an in-memory batch of type Vector[BoundStatement]. The batch is written to Cassandra when Eventuate calls the write method.
  • The write method asynchronously updates the CUSTOMER table with the statements contained in batch and then updates the PROGRESS table with the sequence number of the last processed event. After having submitted the statements to Cassandra, the batch is cleared for further event processing. Event processing can run concurrently to write operations.
  • A batch that has been updated while a write operation is in progress is written directly after the current write operation successfully completes. If no write operation is in progress, a change to batch is written immediately. This keeps read model update delays at a minimum and increases batch sizes under increasing load. Batch sizes can be limited with replayBatchSize.
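The scheduling described in the last bullet can be illustrated with a small, library-independent model. This is only a sketch under simplifying assumptions: WriteSchedulingSketch, State, eventHandled and writeCompleted are hypothetical names that are not part of Eventuate, statements are plain strings, and only the case of successful writes is modeled.

```scala
object WriteSchedulingSketch {
  // Simplified writer state: pending statements, a flag for a write in
  // progress and the batches submitted so far (kept for inspection only).
  final case class State(
    batch: Vector[String] = Vector.empty,
    writing: Boolean = false,
    submitted: Vector[Vector[String]] = Vector.empty)

  // Submits the pending batch if no write is in progress and there is
  // something to write. The batch is cleared right after submission.
  private def submitIfIdle(s: State): State =
    if (!s.writing && s.batch.nonEmpty)
      s.copy(batch = Vector.empty, writing = true, submitted = s.submitted :+ s.batch)
    else s

  // An event handler added a statement to the batch.
  def eventHandled(s: State, statement: String): State =
    submitIfIdle(s.copy(batch = s.batch :+ statement))

  // The write operation in progress completed successfully.
  def writeCompleted(s: State): State =
    submitIfIdle(s.copy(writing = false))
}
```

In this model, a first statement is submitted immediately as a batch of size 1. Statements that arrive while that write is in progress accumulate and are submitted together once writeCompleted is called, which is why batch sizes grow under load.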

If a write (or read) operation fails, the writer is restarted, by default, and resumes event processing from the last stored sequence number + 1. This behavior can be changed by overriding writeFailure (or readFailure) from EventsourcedWriter.

Note

The example does not use Cassandra BatchStatements for reasons explained in this article. Atomic writes are not needed because database updates in this example are idempotent and can be re-tried in failure cases. Failure cases where idempotency is relevant are partial updates to the CUSTOMER table or a failed write to the PROGRESS table. BatchStatements should only be used when database updates are not idempotent and atomicity is required on database level.
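Why re-trying is safe can be sketched with an in-memory stand-in for the CUSTOMER table. The model below is an illustration only: IdempotentUpdateSketch and its members are hypothetical names, the table is a plain Map, and both statement kinds are modeled as upserts, which is how Cassandra INSERT and UPDATE behave.

```scala
object IdempotentUpdateSketch {
  // A row is a map from column name to value, a table maps ids to rows.
  type Row = Map[String, String]
  type Table = Map[Long, Row]

  sealed trait Update
  final case class Insert(cid: Long, first: String, last: String, address: String) extends Update
  final case class SetAddress(cid: Long, address: String) extends Update

  // Both statement kinds overwrite column values for a given id (upsert).
  def applyUpdate(table: Table, update: Update): Table = update match {
    case Insert(cid, first, last, address) =>
      table.updated(cid, Map("first" -> first, "last" -> last, "address" -> address))
    case SetAddress(cid, address) =>
      val row = table.getOrElse(cid, Map.empty[String, String])
      table.updated(cid, row + ("address" -> address))
  }

  // Applies a batch of updates in order.
  def applyBatch(table: Table, batch: Seq[Update]): Table =
    batch.foldLeft(table)(applyUpdate)
}
```

If only a prefix of a batch has been applied before a failure, applying the whole batch again in order yields the same table as a single complete application. The same holds if the batch was fully applied and only the progress update failed.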

Stateful writers

The above Writer implements a stateless writer. Although it accumulates batches while a write operation is in progress, it cannot recover permanent in-memory state from the event log, because event processing only starts from the last stored sequence number. If a writer needs to be stateful, it must return None from readSuccess. In this case, event replay either starts from scratch or from a previously stored snapshot. A stateful writer should still write the update progress to the PROGRESS table but exclude events with a sequence number less than or equal to the stored sequence number from contributing to the update batch.
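The filtering rule for stateful writers can be sketched independently of Eventuate as follows. StatefulWriterSketch, Replayed, WriterState and handle are hypothetical names used for illustration only, and the in-memory state is reduced to a simple counter.

```scala
object StatefulWriterSketch {
  // A replayed event with its sequence number.
  final case class Replayed(sequenceNr: Long, payload: String)

  // In-memory state (here only a counter) plus the pending update batch.
  final case class WriterState(handled: Int = 0, batch: Vector[String] = Vector.empty)

  // Every replayed event updates in-memory state, but only events beyond
  // the stored progress contribute to the update batch.
  def handle(state: WriterState, event: Replayed, storedSequenceNr: Long): WriterState = {
    val batch =
      if (event.sequenceNr > storedSequenceNr) state.batch :+ event.payload
      else state.batch
    WriterState(state.handled + 1, batch)
  }
}
```

With a stored sequence number of 3 and a replay of events 1 to 5, all five events update the in-memory state but only events 4 and 5 end up in the batch.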

Event-sourced processors

An introduction to event-sourced processors is already given in sections Overview and Architecture. Applications use event-sourced processors to consume events from a source event log, process these events and write the processed events to a target event log. With processors, event logs can be connected to event stream processing pipelines and graphs.

Event-sourced processors are a specialization of Event-sourced writers where the external database is a target event log. Concrete stateless processors must implement the EventsourcedProcessor trait, stateful processors the StatefulProcessor trait (see also Stateful writers).

The following example Processor is an implementation of EventsourcedProcessor. In addition to providing a source eventLog, a concrete processor must also provide a targetEventLog:

class Processor(
    val id: String,
    val eventLog: ActorRef,
    val targetEventLog: ActorRef)
  extends EventsourcedProcessor {
  // ...

  override val processEvent: Process = {
    // exclude event
    case "my-event-1" => Seq()
    // transform event
    case "my-event-2" => Seq("my-event-2a")
    // transform and split event
    case "my-event-3" => Seq("my-event-3a", "my-event-3b")
  }
}

The event handler implemented by a processor is processEvent. The type of the handler is defined as:

type Process = PartialFunction[Any, Seq[Any]]

Processed events, to be written to the target event log, are returned by the handler as Seq[Any]. With this handler signature, events from the source log can be

  • excluded from being written to the target log by returning an empty Seq
  • transformed one-to-one by returning a Seq of size 1 or even
  • transformed and split by returning a Seq of size greater than 1
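The effect of such a handler on a sequence of source events can be tried out without an event log. The following sketch re-declares the Process type locally and uses a hypothetical helper processAll that is not part of Eventuate. It assumes that events at which the handler is not defined are simply skipped.

```scala
object ProcessSketch {
  // Same shape as the Process type alias shown above.
  type Process = PartialFunction[Any, Seq[Any]]

  val processEvent: Process = {
    // exclude event
    case "my-event-1" => Seq()
    // transform event
    case "my-event-2" => Seq("my-event-2a")
    // transform and split event
    case "my-event-3" => Seq("my-event-3a", "my-event-3b")
  }

  // Hypothetical helper: applies the handler to each source event and
  // concatenates the results. Events the handler is not defined at
  // contribute nothing (an assumption of this sketch).
  def processAll(source: Seq[Any], process: Process): Seq[Any] =
    source.flatMap(event => process.applyOrElse(event, (_: Any) => Seq.empty[Any]))
}
```

Applied to the source events "my-event-1", "my-event-2" and "my-event-3", processAll returns "my-event-2a", "my-event-3a" and "my-event-3b" in that order.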

Note

EventsourcedProcessor and StatefulProcessor internally ensure that writing to the target event log is idempotent. Applications don’t need to take extra care about idempotency.

State recovery

When an event-sourced actor or view is started or re-started, events are replayed to its onEvent handler so that internal state can be recovered[3]. This is also the case for stateful event-sourced writers and processors. During event replay the recovering method returns true. Applications can also define a recovery completion handler by overriding onRecovery:

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {
// ...

  override def onRecovery = {
    case Success(_) => // ...
    case Failure(_) => // ...
  }
}

If replay fails, the completion handler is called with a Failure and the actor will be stopped, regardless of the action taken by the handler. The default recovery completion handler does nothing. Internally, each replay request towards the event log is retried a couple of times in order to cope with a temporarily unresponsive event log or its underlying storage backend. The maximum number of retries for a replay request can be configured with:

eventuate.log.replay-retry-max = 10

Moreover the configuration value replay-retry-delay is used to determine the delay between consecutive replay attempts:

eventuate.log.replay-retry-delay = 10s

At the beginning of event replay, the initiating actor is registered at its event log so that newly written events can be routed to that actor. During replay, the actor internally stashes these newly written events and dispatches them to onEvent after successful replay. In a similar way, the actor also stashes new commands and dispatches them to onCommand afterwards. This ensures that new commands never see partially recovered state. When the actor is stopped it is automatically de-registered from its event log.

Backpressure

Events are replayed in batches. A given batch must have been handled by an event handler before the next batch is replayed. This allows slow event handlers to put backpressure on event replay. The default replay batch size can be configured with:

eventuate.log.replay-batch-size = 4096

Event-sourced components can override the configured default value by overriding replayBatchSize:

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  override def replayBatchSize: Int = 64

  // ...
}
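The batching rule itself can be modeled in a few lines of plain Scala. ReplayBatchSketch and replay are hypothetical names for illustration. The sketch is synchronous and only shows how a replay is split into batches, not how Eventuate requests them from the event log.

```scala
object ReplayBatchSketch {
  // Splits a replay into batches of at most `replayBatchSize` events and
  // hands each batch to `handleBatch` before the next batch is taken.
  // Returns the number of batches that were handled.
  def replay[A](events: Seq[A], replayBatchSize: Int)(handleBatch: Seq[A] => Unit): Int = {
    require(replayBatchSize > 0, "replayBatchSize must be positive")
    var batches = 0
    events.grouped(replayBatchSize).foreach { batch =>
      handleBatch(batch)
      batches += 1
    }
    batches
  }
}
```

Replaying 10 events with a batch size of 4 results in three batches of sizes 4, 4 and 2. A slow handleBatch delays the next batch, which is the backpressure effect described above.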

Recovery using an application-defined log sequence number

In order to keep recovery times small it is almost always sensible to recover using snapshots. However, in some very rare cases an event-sourced actor or view can recover quickly using an application-defined log sequence number. If defined, only events with a sequence number equal to or larger than the given sequence number are replayed.

class CustomRecoveryExampleActor(snr: Long) extends EventsourcedActor {

  override def replayFromSequenceNr: Option[Long] = Some(snr)

  // ...
}

Snapshots

Recovery times increase with the number of events that are replayed to event-sourced components. They can be decreased by starting event replay from a previously saved snapshot of internal state rather than replaying events from scratch. Event-sourced components can save snapshots by calling save within their command handler:

import scala.util._
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor
import com.rbmhtechnology.eventuate.SnapshotMetadata

case object Save
case class SaveSuccess(metadata: SnapshotMetadata)
case class SaveFailure(cause: Throwable)
case class ExampleState(components: Vector[String] = Vector.empty)

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  var state: ExampleState = ExampleState()

  override def onCommand = {
    case Save =>
      // save snapshot of internal state (asynchronously)
      save(state) {
        case Success(metadata) =>
          // success reply
          sender() ! SaveSuccess(metadata)
        case Failure(cause) =>
          // failure reply
          sender() ! SaveFailure(cause)
      }
    case cmd => // ...
  }

  override def onEvent = {
    case evt => // update state ...
  }
}

Snapshots are saved asynchronously. On completion, a user-defined handler of type Try[SnapshotMetadata] => Unit is called. Like a persist handler, a save handler may also close over actor state and can reply to the command sender using the sender() reference.

An event-sourced actor that is Tracking conflicting versions of application state can also save ConcurrentVersions[A, B] instances directly. One can even configure custom serializers for type parameter A as explained in section Custom snapshot serialization.

During recovery, the latest snapshot saved by an event-sourced component is loaded and can be handled with the onSnapshot handler. This handler should initialize internal actor state from the loaded snapshot:

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  var state: ExampleState = ExampleState()

  override def onCommand = {
    case Save => // ...
    case cmd => // ...
  }

  override def onEvent = {
    case evt => // update state ...
  }

  /** Snapshot handler */
  override def onSnapshot = {
    case s: ExampleState =>
      // initialize internal state from loaded snapshot
      state = s
  }
}

If onSnapshot is not defined at the loaded snapshot or not overridden at all, event replay starts from scratch. If onSnapshot is defined at the loaded snapshot, only events that are not covered by that snapshot will be replayed.
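The relation between the two recovery paths can be sketched with a plain in-memory log. SnapshotRecoverySketch, Logged, Snapshot and recover are hypothetical names. The sketch assumes, as a simplification, that a snapshot covers all events up to a single sequence number.

```scala
object SnapshotRecoverySketch {
  final case class ExampleState(components: Vector[String] = Vector.empty)
  final case class Logged(sequenceNr: Long, data: String)

  // A snapshot together with the sequence number of the last event it covers
  // (a simplification made for this sketch).
  final case class Snapshot(state: ExampleState, sequenceNr: Long)

  // Event handler: appends event data to the state.
  def update(state: ExampleState, event: Logged): ExampleState =
    state.copy(components = state.components :+ event.data)

  // Recovers state either from scratch or from a snapshot plus the events
  // that are not covered by that snapshot.
  def recover(log: Seq[Logged], snapshot: Option[Snapshot]): ExampleState =
    snapshot match {
      case Some(Snapshot(state, snr)) =>
        log.filter(_.sequenceNr > snr).foldLeft(state)(update)
      case None =>
        log.foldLeft(ExampleState())(update)
    }
}
```

Both paths yield the same state as long as the snapshot was taken from state derived from the covered events. Recovery from the snapshot only handles the remaining events.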

Event-sourced actors that implement ConfirmedDelivery for Reliable delivery automatically include unconfirmed messages in state snapshots. These are restored on recovery and re-delivered on recovery completion.

Note

State objects passed as argument to save should be immutable objects. If this is not the case, the caller is responsible for creating a defensive copy before passing it as argument to save.

Storage locations

Snapshots are currently stored in a directory that can be configured with

eventuate.snapshot.filesystem.dir = /my/storage/location

in application.conf. The maximum number of stored snapshots per event-sourced component can be configured with

eventuate.snapshot.filesystem.snapshots-per-emitter-max = 3

If this number is exceeded, older snapshots are automatically deleted.

Event routing

An event that is emitted by an event-sourced actor or processor can be routed to other event-sourced components if they share an Event log[4]. The default event routing rules are:

  • If an event-sourced component has an undefined aggregateId, all events are routed to it. It may choose to handle only a subset of them though.
  • If an event-sourced component has a defined aggregateId, only events emitted by event-sourced actors or processors with the same aggregateId are routed to it.

Routing destinations are defined during emission of an event and are persisted together with the event[5]. This makes routing decisions repeatable during event replay and allows for routing rule changes without affecting past routing decisions. Applications can define additional routing destinations with the customDestinationAggregateIds parameter of persist:

import scala.util._
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor

case class ExampleEvent(data: String)
case class ExampleCommand(data: String)

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  override def aggregateId: Option[String] = Some("a1")

  override def onCommand = {
    case ExampleCommand(data) =>
      persist(ExampleEvent(data), customDestinationAggregateIds = Set("a2", "a3")) {
        case Success(evt)   => // ...
        case Failure(cause) => // ...
      }
  }

  // ...
}

Here, ExampleEvent is routed to destinations with aggregateIds Some("a2") and Some("a3") in addition to the default routing destinations with aggregateIds Some("a1") and None.
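The routing rules can be expressed as a small predicate. The following is a sketch of the rules as described in this section, not of Eventuate's internal implementation. RoutingSketch, destinations and isRoutedTo are hypothetical names.

```scala
object RoutingSketch {
  // Destination aggregate ids stored with an event: the emitter's own
  // aggregate id (if defined) plus any custom destination aggregate ids.
  def destinations(
    emitterAggregateId: Option[String],
    customDestinationAggregateIds: Set[String]): Set[String] =
    customDestinationAggregateIds ++ emitterAggregateId.toList

  // A receiver with an undefined aggregate id gets all events, a receiver
  // with a defined aggregate id only those events that list it.
  def isRoutedTo(destinationAggregateIds: Set[String], receiverAggregateId: Option[String]): Boolean =
    receiverAggregateId match {
      case None     => true
      case Some(id) => destinationAggregateIds.contains(id)
    }
}
```

For an emitter with aggregate id "a1" and custom destinations "a2" and "a3", isRoutedTo is true for receivers with aggregateId None, Some("a1"), Some("a2") and Some("a3"), and false for a receiver with any other defined aggregate id.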

Event-driven communication

Event-driven communication is one form of Event collaboration and covered in the Event-driven communication section of the User guide.

Reliable delivery

Reliable, event-based remote communication between event-sourced actors should be done via a Replicated event log. For reliable communication with other services that cannot connect to a replicated event log, event-sourced actors should use the ConfirmedDelivery trait:

import scala.concurrent.duration._
import scala.util._
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor
import com.rbmhtechnology.eventuate.ConfirmedDelivery

case class DeliverCommand(message: String)
case class DeliverEvent(message: String)

case class Confirmation(deliveryId: String)
case class ConfirmationEvent()

case class ReliableMessage(deliveryId: String, message: String)
case object Redeliver

class ExampleActor(destination: ActorPath,
                   override val id: String,
                   override val eventLog: ActorRef)
  extends EventsourcedActor with ConfirmedDelivery {

  import context.dispatcher

  context.system.scheduler.schedule(
    initialDelay = 10.seconds,
    interval = 5.seconds,
    receiver = self,
    message = Redeliver)

  override def onCommand = {
    case DeliverCommand(message) =>
      persist(DeliverEvent(message)) {
        case Success(evt) => // ...
        case Failure(err) => // ...
      }
    case Confirmation(deliveryId) if unconfirmed.contains(deliveryId) =>
      persistConfirmation(ConfirmationEvent(), deliveryId) {
        case Success(evt) => // ...
        case Failure(err) => // ...
      }
    case Redeliver =>
      redeliverUnconfirmed()
  }

  override def onEvent = {
      case DeliverEvent(message) =>
        val deliveryId = lastSequenceNr.toString
        deliver(deliveryId, ReliableMessage(deliveryId, message), destination)
      case ConfirmationEvent() =>
        // handling of confirmation event is optional
  }
}

ConfirmedDelivery supports the reliable delivery of messages to destinations by enabling applications to re-deliver messages until delivery is confirmed by destinations. In the example above, the reliable delivery of a message is initiated by sending a DeliverCommand to ExampleActor.

The handler of the generated DeliverEvent calls deliver to deliver a ReliableMessage to destination. The deliveryId is an identifier to correlate ReliableMessage with a Confirmation message. The deliveryId can be any application-defined id. Here, the event’s sequence number is used which can be obtained with lastSequenceNr.

The destination confirms the delivery of the message by sending a Confirmation reply to the event-sourced actor, which generates a ConfirmationEvent from it. The actor uses the persistConfirmation method to persist the confirmation event together with the delivery id. After successful persistence of the confirmation event, the corresponding reliable message is removed from the internal buffer of unconfirmed messages.

When the actor is re-started, unconfirmed reliable messages are automatically re-delivered to their destinations. The example actor additionally schedules redeliverUnconfirmed calls to periodically re-deliver unconfirmed messages. This is done within the actor’s command handler.
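The bookkeeping behind this protocol can be modeled without actors. The sketch below is a simplified stand-in for the buffer of unconfirmed messages, not the ConfirmedDelivery implementation. ConfirmedDeliverySketch, Unconfirmed and its methods are hypothetical names.

```scala
object ConfirmedDeliverySketch {
  final case class ReliableMessage(deliveryId: String, message: String)

  // Simplified stand-in for the buffer of unconfirmed messages.
  final case class Unconfirmed(messages: Map[String, ReliableMessage] = Map.empty) {
    // Delivering a message also records it as unconfirmed.
    def deliver(msg: ReliableMessage): Unconfirmed =
      copy(messages = messages + (msg.deliveryId -> msg))

    // Confirming removes the message. Confirming an unknown or already
    // confirmed delivery id leaves the buffer unchanged.
    def confirm(deliveryId: String): Unconfirmed =
      copy(messages = messages - deliveryId)

    // Messages that would be sent again on re-delivery, ordered by id.
    def redeliver: Vector[ReliableMessage] =
      messages.values.toVector.sortBy(_.deliveryId)
  }
}
```

A message stays in the buffer, and is therefore a candidate for re-delivery, until its delivery id is confirmed. Destinations can thus receive a message more than once and should handle duplicates.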

Note

In the above example a pattern guard is used for idempotent confirmation processing by ensuring that the deliveryId of the Confirmation message is still unconfirmed. This pattern may only be applied if the stateSync member of the EventsourcedActor is set to true. For further details on stateSync see section State synchronization.

Note

If a snapshot is taken, unconfirmed messages are stored in the snapshot along with the destination ActorPath. The actual ActorPath of the destination must therefore not change between restarts of the actor, for example, if the destination actor is within the same application and the application is restarted. For this reason, the destination actor must be named explicitly instead of having a name generated by the ActorSystem.

Conditional requests

Conditional requests are covered in the Conditional requests section of the User guide.

Command stashing

EventsourcedView and EventsourcedActor override stash() and unstashAll() of akka.actor.Stash so that application-specific subclasses can safely stash and unstash commands. Stashing of events is not allowed. Hence, stash() must only be used in a command handler; using it in an event handler will throw StashError. On the other hand, unstashAll() can be used anywhere, i.e. in a command handler, persist handler or event handler. The following is a trivial usage example which calls stash() in the command handler and unstashAll() in the persist handler:

import scala.util._
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor

case class CreateUser(userId: String, name: String)
case class UpdateUser(userId: String, name: String)

sealed trait UserEvent

case class UserCreated(userId: String, name: String) extends UserEvent
case class UserUpdated(userId: String, name: String) extends UserEvent

case class User(userId: String, name: String)

class UserManager(val eventLog: ActorRef) extends EventsourcedActor {
  private var users: Map[String, User] = Map.empty

  override val id = "example"

  override def onCommand = {
    case CreateUser(userId, name) =>
      persistUserEvent(UserCreated(userId, name), unstashAll())
    case UpdateUser(userId, name) if users.contains(userId) =>
      // UpdateUser received after CreateUser
      persistUserEvent(UserUpdated(userId, name))
    case UpdateUser(userId, name) =>
      // UpdateUser received before CreateUser
      stash()
    // ...
  }

  override def onEvent = {
    case UserCreated(userId, name) =>
      users = users.updated(userId, User(userId, name))
    case UserUpdated(userId, name) =>
      users = users.updated(userId, User(userId, name))
  }

  private def persistUserEvent(event: UserEvent, onSuccess: => Unit = ()) =
    persist(event) {
      case Success(evt) =>
        sender() ! evt
        onSuccess
      case Failure(err) =>
        sender() ! err
    }
}

The UserManager maintains a persistent users map. Users can be added to the map by sending a CreateUser command and updated by sending an UpdateUser command. Should these commands arrive in the wrong order, i.e. UpdateUser before a corresponding CreateUser, the UserManager stashes UpdateUser and unstashes it after having successfully processed another CreateUser command.

In the above implementation, an UpdateUser command might be repeatedly stashed and unstashed if the corresponding CreateUser command is preceded by other unrelated CreateUser commands. Assuming that out-of-order user commands are rare, the performance impact is limited. Alternatively, one could record stashed user ids in transient actor state and conditionally call unstashAll() by checking that state.

Behavior changes

Event-sourced components distinguish command processing from event processing. Consequently, applications should be able to change the behavior of command handlers and event handlers independent of each other, at runtime. Command handling behavior can be changed with commandContext.become() and commandContext.unbecome(), event handling behavior with eventContext.become() and eventContext.unbecome() (for details, see the BehaviorContext API docs):

trait ExampleActor extends EventsourcedActor {
  // default command handler
  override def onCommand: Receive = {
    case "a" => commandContext.become(newCommandHandler)
  }

  // default event handler
  override def onEvent: Receive = {
    case "x" => eventContext.become(newEventHandler)
  }

  def newCommandHandler: Receive = {
    case "b" =>
      // restores default command handler
      commandContext.unbecome()
  }

  def newEventHandler: Receive = {
    case "y" =>
      // restores default event handler
      eventContext.unbecome()
  }
}

This works for all event-sourcing abstractions except for EventsourcedProcessor. Its eventContext does not allow behavior changes because EventsourcedProcessor implements default onEvent behavior that should not be changed by applications. An attempt to change that behavior will throw an UnsupportedOperationException. Changing an EventsourcedProcessor’s processEvent behavior is not supported yet.

Note

Command and event handling behaviors are managed by internal behavior stacks. Eventuate does not include these behavior stacks in Snapshots when applications save actor state. Although the state of an event handling behavior stack can be recovered by replaying events from scratch, that stack is not automatically recovered when a snapshot is loaded. Applications are therefore responsible for restoring the required command and event handling behavior from application-specific snapshot details in the onSnapshot handler. Of course, this is only necessary if the required behavior differs from the default onEvent and onCommand behavior.
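
The following sketch illustrates the idea of restoring behavior from application-specific snapshot details. It is a simplified plain-Scala model: BehaviorStack, ExampleSnapshot and LockableModel are hypothetical names, and the stack shown here is a stand-in, not Eventuate's implementation.

```scala
object BehaviorRestoreSketch {
  type Receive = PartialFunction[Any, Unit]

  /** Minimal stand-in for a behavior stack (not Eventuate's implementation). */
  final class BehaviorStack(initial: Receive) {
    private var stack: List[Receive] = Nil
    def current: Receive = stack.headOption.getOrElse(initial)
    def become(behavior: Receive): Unit = stack = behavior :: stack
    def unbecome(): Unit = stack = stack.drop(1)
  }

  /** Application-specific snapshot: state plus a marker for the active behavior. */
  final case class ExampleSnapshot(items: Vector[String], locked: Boolean)

  final class LockableModel {
    private var items: Vector[String] = Vector.empty
    private var locked: Boolean = false

    // default behavior: collect items until "lock" is received
    private val defaultHandler: Receive = {
      case "lock" =>
        locked = true
        commandContext.become(lockedHandler)
      case item: String =>
        items = items :+ item
    }

    // locked behavior: everything except "unlock" is ignored
    private val lockedHandler: Receive = {
      case "unlock" =>
        locked = false
        commandContext.unbecome()
    }

    private val commandContext = new BehaviorStack(defaultHandler)

    def receive(msg: Any): Unit =
      if (commandContext.current.isDefinedAt(msg)) commandContext.current(msg)

    def snapshot: ExampleSnapshot = ExampleSnapshot(items, locked)

    /** Restores state and, if needed, the non-default behavior on a fresh instance. */
    def onSnapshot(s: ExampleSnapshot): Unit = {
      items = s.items
      locked = s.locked
      if (s.locked) commandContext.become(lockedHandler)
    }
  }
}
```

In an event-sourced actor, the corresponding logic would live in the onSnapshot handler and call commandContext.become() or eventContext.become() depending on the marker stored in the snapshot.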

Failure handling

Event-sourced components register themselves at an EventLog actor in order to be notified about changes in the event log. Directly after registration, during recovery, they read from the event log in order to recover internal state from past events. After recovery has completed, the event log actor pushes newly written events to registered actors so that they can update application state with minimal latency. If a registered actor is restarted, it recovers again from the event log and continues to process push-updates after recovery has completed.

An EventLog actor processes write requests from Event-sourced actors, Event-sourced processors and Replication endpoints. If a write succeeds it pushes the written events to registered actors (under consideration of Event routing rules) and handles the next write request. Writing to a storage backend may also fail for several reasons. In the following, it is assumed that writes are made to a remote storage backend such as the Cassandra storage backend.

A write failure reported from a storage backend driver does not necessarily mean that the events have not been written to the storage backend. For example, a write could have been actually applied to the remote storage backend but the ACK message got lost. This usually causes the driver to report a timeout. If an event log actor simply continued with the next write request after having informed the event emitter about the failure, the emitter and other registered actors would erroneously assume that the emitted events do not exist in the event log. However, these events may become visible to newly registered actors that are about to recover or to replication endpoints that read events for replication.

This would violate the event ordering and consistency guarantees made by Eventuate because some registered actors would see an event stream with missing events. The following describes two options to deal with that situation:

  1. After a failed write, the event log actor notifies all registered actors to restart themselves so that another recovery phase would find out whether the events have been actually written or not. This is fine if the write failure was actually a lost ACK and the storage backend is immediately available for subsequent reads (neglecting a potentially high read load). If the write failure was because of a longer-lasting problem, such as a longer network partition that disconnects the application from the storage backend, registered actors would fail to recover and would therefore be unavailable for in-memory reads.
  2. The event log actor itself tries to find out whether the write was successful or not, either by reading from the storage backend or by retrying the write until it succeeds, before continuing with the next write request. In this case, the log actor would inform the event emitter either about a failed write if it can guarantee that the write has not been applied to the storage backend or about a successful write if retrying the write finally succeeded. Retrying writes can only be made to storage backends that support idempotent writes. With this strategy, registered actors don’t need to be restarted and remain available for in-memory reads.

In Eventuate, the second approach is taken. Should there be a longer-lasting problem with the storage backend, it may take a longer time for an event log actor to make a decision about the success or failure of a write. During that time, it will reject further writes in order to avoid being overloaded with pending write requests. This is an application of the circuit breaker design pattern.

Consequently, a write failure reported by an event log actor means that the write was actually not applied to the storage backend. This additional guarantee comes at the cost of potentially long write reply delays but allows registered actors to remain available for in-memory reads during storage backend unavailability. It also provides clearer semantics of write failures.

Circuit breaker

The strategy described above can be implemented by wrapping a CassandraEventLog in a CircuitBreaker actor. This is the default when creating the log actor for a Cassandra storage backend. Should the event log actor need to retry a write eventuate.log.circuit-breaker.open-after-retries times or more, the circuit breaker opens. If open, it rejects all requests by replying with a failure message that contains an EventLogUnavailableException. If retrying the write finally succeeds, the circuit breaker closes again. The maximum number of write retries can be configured with eventuate.log.cassandra.write-retry-max and the delay between write retries with eventuate.log.write-timeout. If the maximum number of retries is reached, the event log actor gives up and stops itself which also stops all registered actors.
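
The state transitions described above can be modeled in a few lines of plain Scala. This is an illustrative model only, not the CircuitBreaker actor: the names BreakerState and CircuitBreakerModel are hypothetical, and the exact way retries are counted (one retry scheduled per failed attempt) is an assumption.

```scala
object BreakerState {
  sealed trait State
  case object Closed extends State   // requests are accepted
  case object Open extends State     // requests are rejected while retries continue
  case object Stopped extends State  // retry limit reached, log actor gave up
}

/** openAfterRetries models eventuate.log.circuit-breaker.open-after-retries,
  * writeRetryMax models eventuate.log.cassandra.write-retry-max. */
final class CircuitBreakerModel(openAfterRetries: Int, writeRetryMax: Int) {
  import BreakerState._

  private var retries = 0
  private var current: State = Closed

  def state: State = current
  def acceptsRequests: Boolean = current == Closed

  /** A write attempt failed: schedule another retry or give up. */
  def onWriteFailure(): State = {
    if (current != Stopped) {
      if (retries >= writeRetryMax) current = Stopped
      else {
        retries += 1
        if (retries >= openAfterRetries) current = Open
      }
    }
    current
  }

  /** A write attempt (initial or retried) succeeded: close the breaker. */
  def onWriteSuccess(): State = {
    if (current != Stopped) {
      retries = 0
      current = Closed
    }
    current
  }
}
```

With openAfterRetries = 2, a single failed attempt leaves the breaker closed, the second consecutive failure opens it, and a later successful retry closes it again.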

persist failure handling

Asynchronous persist operations send write requests to an EventLog actor. The write reply is passed as argument to the persist handler (see section Command handler). If the persist handler is called with a Failure one can safely assume that the events have not been written to the storage backend. As already explained above, a consequence of this additional guarantee is that persist handler callbacks may be delayed indefinitely.

For an EventsourcedActor with stateSync set to true, this means that further commands sent to that actor will be stashed until the current write completes. In this case, it is the responsibility of the application not to overload that actor with further commands. For example, an application could use timeouts for command replies and prevent sending further commands to that actor if a timeout occurred. After an application-defined delay, command sending can be resumed. This is comparable to using an application-level circuit breaker. Alternatively, an application could restart an event-sourced actor on command timeout and continue sending new commands to that actor after recovery succeeded. This however may take a while depending on the unavailability duration of the storage backend.
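
A minimal sketch of such an application-level circuit breaker is shown below. CommandGate is a hypothetical helper, not part of Eventuate, and timestamps are passed in by the caller so that the logic stays independent of any clock or actor API.

```scala
/** Hypothetical application-level gate: after a command reply timeout,
  * no further commands are sent until resumeDelayMillis have passed. */
final class CommandGate(resumeDelayMillis: Long) {
  private var blockedUntil: Long = Long.MinValue

  /** True if the application may send a command at time `now` (milliseconds). */
  def maySend(now: Long): Boolean = now >= blockedUntil

  /** Record that a command reply timed out at time `now` (milliseconds). */
  def onReplyTimeout(now: Long): Unit =
    blockedUntil = now + resumeDelayMillis
}
```

The reply timeout itself would typically come from the mechanism the application uses to await command replies, for example Akka's ask pattern with a Timeout.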

EventsourcedActors with stateSync set to false do not stash commands but rather send write requests immediately to the event log actor. If the log actor is busy retrying a write and the Circuit breaker opens, later persist operations will be completed immediately with an EventLogUnavailableException failure, regardless of whether the event-sourced actor has persist operations in progress or not. A persist operation of an EventsourcedActor with stateSync set to true will only be completed with an EventLogUnavailableException failure if that actor had no persist operation in progress at the time the circuit breaker opened.

persistOnEvent failure handling

EventsourcedActors can also persist events in the Event handler if they additionally extend PersistOnEvent. An asynchronous persistOnEvent operation may also fail for reasons explained in persist failure handling. If a persistOnEvent operation fails, the actor is automatically restarted by throwing a PersistOnEventException.

Recovery failure handling

As explained in section State recovery, event-sourced components are stopped if their recovery fails. Applications should either define a custom onRecovery completion handler to obtain information about recovery failure details or just watch these actors if recovery failure details are not relevant.

Batch write failure handling

Events are written in batches. When using the Cassandra storage backend, there’s a warn threshold and a fail threshold for batch sizes. The default settings in cassandra.yaml are:

# Caution should be taken on increasing the size of this threshold as it can
# lead to node instability.
batch_size_warn_threshold_in_kb: 5

# Fail any batch exceeding this value. 50kb (10x warn threshold) by default.
batch_size_fail_threshold_in_kb: 50

When the size of an event batch exceeds the fail threshold, the batch write fails with:

com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large

The corresponding entry in the Cassandra system log is:

ERROR <timestamp> Batch of prepared statements for [eventuate.log_<id>] is of size 103800, exceeding specified threshold of 51200 by 52600. (see batch_size_fail_threshold_in_kb)

Note

If Eventuate is the only writer to the Cassandra cluster then it is safe to increase these thresholds to higher values as Eventuate only makes single-partition batch writes (see also CASSANDRA-8825).

If other applications additionally make multi-partition batch writes to the same Cassandra cluster then it is recommended to reduce

eventuate.log.write-batch-size = 64

and

eventuate.log.cassandra.index-update-limit = 64

to a smaller value such as 32, or even lower. Failed replication writes or index writes are retried automatically by Eventuate. Failed persist operations must be retried by the application.
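
Following this recommendation, the reduced settings in application.conf could look as shown below. The value 32 is only the example value mentioned above, not a tuned recommendation for a specific cluster.

```
eventuate.log.write-batch-size = 32
eventuate.log.cassandra.index-update-limit = 32
```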

Batch replication failure handling

During replication, events are batch-transferred over the network. The maximum number of events per batch can be configured with:

eventuate.log.write-batch-size = 64

The maximum batch size in bytes the transport will accept is limited. If this limit is exceeded, batch transfer will fail. In this case, applications should either increase

akka.remote.netty.tcp.maximum-frame-size = 128000b

or decrease the event batch size.

Note

Batch sizes in Eventuate are currently defined in units of events whereas maximum-frame-size is defined in bytes. This mismatch will be removed in a later release (see also ticket 166).
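
Until then, a rough estimate can help to choose consistent values for both settings. The following sketch is plain arithmetic under stated assumptions: BatchSizing is a hypothetical helper, maxEventBytes must be measured by the application on serialized events (including event metadata), and any additional overhead of the transport message is ignored.

```scala
object BatchSizing {
  /** Largest number of events per batch that fits into a frame of frameBytes
    * if no serialized event is larger than maxEventBytes. */
  def maxEventsPerFrame(frameBytes: Long, maxEventBytes: Long): Long = {
    require(frameBytes > 0 && maxEventBytes > 0, "sizes must be positive")
    frameBytes / maxEventBytes
  }

  /** True if a full batch of batchSize events cannot exceed frameBytes. */
  def fits(batchSize: Int, maxEventBytes: Long, frameBytes: Long): Boolean =
    batchSize.toLong * maxEventBytes <= frameBytes
}
```

For the values shown above, a batch of 64 events fits into a 128000 byte frame only if serialized events stay at or below 2000 bytes each. With events of up to 4096 bytes, at most 31 events fit.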

Custom serialization

Custom event serialization

Custom serializers for application-defined events can be configured with Akka’s serialization extension. For example, an application that wants to use a custom MyDomainEventSerializer for events of type MyDomainEvent (both defined in package com.example) should add the following configuration to application.conf:

akka.actor {
  serializers {
    domain-event-serializer = "com.example.MyDomainEventSerializer"
  }

  serialization-bindings {
    "com.example.MyDomainEvent" = domain-event-serializer
  }
}

MyDomainEventSerializer must extend Akka’s Serializer trait. Please refer to Akka’s serialization extension documentation for further details.
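
A possible shape of such a serializer is sketched below. The fields of MyDomainEvent are an assumption (the text above only names the type), MyDomainEventCodec is a hypothetical helper, and the package declaration is omitted. The encoding logic is runnable on its own. The wiring into Akka's Serializer trait needs akka-actor on the classpath and is therefore shown as a comment.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Assumed event shape, for illustration only.
final case class MyDomainEvent(data: String)

/** Encoding logic that a MyDomainEventSerializer could delegate to. */
object MyDomainEventCodec {
  def toBinary(event: MyDomainEvent): Array[Byte] =
    event.data.getBytes(UTF_8)

  def fromBinary(bytes: Array[Byte]): MyDomainEvent =
    MyDomainEvent(new String(bytes, UTF_8))
}

// Wiring into Akka (sketch):
//
// class MyDomainEventSerializer extends akka.serialization.Serializer {
//   // application-unique id (example value); low values are reserved by Akka
//   override def identifier: Int = 12345
//   override def includeManifest: Boolean = false
//
//   override def toBinary(o: AnyRef): Array[Byte] = o match {
//     case e: MyDomainEvent => MyDomainEventCodec.toBinary(e)
//     case other => throw new IllegalArgumentException(s"unsupported: $other")
//   }
//
//   override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
//     MyDomainEventCodec.fromBinary(bytes)
// }
```

Keeping the byte-level encoding separate from the Serializer implementation makes it easy to test round trips without starting an actor system.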

Eventuate stores application-defined events as payload of DurableEvents. DurableEvent itself is serialized with DurableEventSerializer, a Protocol Buffers based serializer that delegates payload serialization to a custom serializer. If no custom serializer is configured, Akka’s default serializer is used.

Custom snapshot serialization

Applications can also configure custom serializers for snapshots in the same way as for application-defined events and replication filters (see sections Custom event serialization and replication-filter-serialization).

Custom snapshot serialization also works for state managed with ConcurrentVersions[A, B]. A custom serializer configured for type parameter A is used whenever a snapshot of type ConcurrentVersions[A, B] is saved (see also Tracking conflicting versions).

Event-sourced actors that extend ConfirmedDelivery for Reliable delivery of messages to destinations will also include unconfirmed messages as deliveryAttempts in a Snapshot. The message field of a DeliveryAttempt can also be custom-serialized by configuring a serializer.

Custom CRDT serialization

Custom serializers can also be configured for the type parameter A of MVRegister[A], LWWRegister[A] and ORSet[A] Operation-based CRDTs. These serializers are used for both persistent CRDT operations and CRDT snapshots.

Resolution of serializers when deserializing

When Eventuate serializes application-defined events, Replication filters or snapshots, it includes the identifier of the Akka serializer and the class-based or string-based manifest when available. When deserializing these application-defined payloads, a serializer is selected as follows:

  • If a class-based manifest is included, the serializer that is configured in the Akka configuration for this class is selected
  • In case of a string-based manifest or no manifest the serializer is selected by the included identifier
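
The two rules above can be expressed as a small selection function. This is a simplified model, not Eventuate's code: all names are hypothetical, serializers are represented by their names, and the two maps stand in for the configured class bindings and the serializers registered by identifier.

```scala
object SerializerResolution {
  sealed trait PayloadManifest
  final case class ClassBased(className: String) extends PayloadManifest
  final case class StringBased(value: String) extends PayloadManifest
  case object Absent extends PayloadManifest

  /** Selects a serializer name for a stored payload.
    *
    * @param bindings class name -> serializer name (models serialization-bindings)
    * @param byId     serializer identifier -> serializer name
    */
  def select(manifest: PayloadManifest,
             serializerId: Int,
             bindings: Map[String, String],
             byId: Map[Int, String]): Option[String] =
    manifest match {
      // class-based manifest: the currently configured binding decides
      case ClassBased(className) => bindings.get(className)
      // string-based or missing manifest: the stored identifier decides
      case StringBased(_) | Absent => byId.get(serializerId)
    }
}
```

One consequence of the first rule is that a payload written with a class-based manifest is read with whatever serializer is currently bound to that class, even if the stored identifier refers to a different serializer.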

[1] The customDestinationAggregateIds parameter is described in section Event routing.
[2] Writes from different event-sourced actors that have stateSync set to true are still batched, but not the writes from a single event-sourced actor.
[3] Event replay can optionally start from Snapshots of actor state.
[4] Event-sourced processors can additionally route events between event logs.
[5] The routing destinations of a DurableEvent can be obtained with its destinationAggregateIds method.