Event-sourced actors¶
An introduction to event-sourced actors is already given in sections Overview, Architecture and the User guide. Applications use event-sourced actors for writing events to an event log and for maintaining in-memory write models on the command side (C) of a CQRS application. Event-sourced actors distinguish command processing from event processing. They must extend the EventsourcedActor trait and implement a Command handler and an Event handler.
Command handler¶
A command handler is a partial function of type PartialFunction[Any, Unit] for which a type alias Receive exists. It can be defined by implementing onCommand:
import scala.util._

import akka.actor._

import com.rbmhtechnology.eventuate.EventsourcedActor

case class ExampleEvent(data: String)
case class ExampleCommand(data: String)
case class ExampleCommandSuccess(data: String)
case class ExampleCommandFailure(cause: Throwable)

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  /** Command handler */
  override def onCommand = {
    case ExampleCommand(data) =>
      // validate command
      // ...

      // derive event
      val event = ExampleEvent(data)
      // persist event (asynchronously)
      persist(event) {
        case Success(evt) =>
          // success reply
          sender() ! ExampleCommandSuccess(data)
        case Failure(cause) =>
          // failure reply
          sender() ! ExampleCommandFailure(cause)
      }
  }

  /** Event handler */
  override def onEvent = {
    case ExampleEvent(data) => // ...
  }
}
Messages sent by an application to an event-sourced actor are received by its command handler. Usually, a command handler first validates a command, then derives one or more events from it, persists these events with persist and replies with the persistence result. The persist method has the following signature[1]:
def persist[A](event: A, customDestinationAggregateIds: Set[String] = Set())(handler: Try[A] => Unit): Unit
The persist method can be called one or more times per received command. Calling persist does not immediately write events to the event log. Instead, events from persist calls are collected in memory and written to the event log when onCommand returns.
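For example, a command handler may derive and persist more than one event from a single command. The following is a minimal sketch, assuming hypothetical PlaceOrder, OrderPlaced and PaymentRequested types; both events are written in a single batch when onCommand returns:

import scala.util._

import akka.actor._

import com.rbmhtechnology.eventuate.EventsourcedActor

// hypothetical command and events, for illustration only
case class PlaceOrder(orderId: String)
case class OrderPlaced(orderId: String)
case class PaymentRequested(orderId: String)

class OrderActor(override val id: String,
                 override val eventLog: ActorRef) extends EventsourcedActor {

  override def onCommand = {
    case PlaceOrder(orderId) =>
      // both events are collected in memory and written in a single
      // batch when onCommand returns
      persist(OrderPlaced(orderId)) {
        case Success(_)     => // ...
        case Failure(cause) => sender() ! cause
      }
      persist(PaymentRequested(orderId)) {
        case Success(_)     => sender() ! s"order $orderId placed"
        case Failure(cause) => sender() ! cause
      }
  }

  override def onEvent = {
    case OrderPlaced(_)      => // update state ...
    case PaymentRequested(_) => // ...
  }
}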
Events are written asynchronously to the event-sourced actor's eventLog. After writing, the eventLog actor internally replies to the event-sourced actor with a success or failure message which is passed as argument to the persist handler. Before calling the persist handler, the event-sourced actor internally calls the onEvent handler with the written event, provided writing was successful and onEvent is defined at that event.
Both the event handler and the persist handler are called on a dispatcher thread of the actor. They can therefore safely access internal actor state. The sender() reference of the original command sender is also preserved, so that a persist handler can reply to the initial command sender.
Hint
The EventsourcedActor trait also defines a persistN method. Refer to the EventsourcedActor API documentation for details.
Note
A command handler should not modify persistent actor state, i.e. state that is derived from events.
State synchronization¶
As explained in section Command handler, events are persisted asynchronously. What happens if another command is sent to an event-sourced actor while persistence is in progress? This depends on the value of stateSync, a member of EventsourcedActor that can be overridden.
def stateSync: Boolean = true
If stateSync is true (default), new commands are stashed while persistence is in progress. Consequently, new commands see actor state that is in sync with the events in the event log. A consequence is limited write throughput, because Batching of write requests is not possible in this case[2]. This setting is recommended for event-sourced actors that must validate commands against current state.
If stateSync is false, new commands are dispatched to onCommand immediately. Consequently, new commands may see stale actor state. The advantage is significantly higher write throughput, as Batching of write requests is possible. This setting is recommended for event-sourced actors that don't need to validate commands against current state.
If a sender sends several (update) commands followed by a query to an event-sourced actor that has stateSync set to false, the query will probably not see the state change from the preceding commands. To achieve read-your-write consistency, the command sender should wait for a reply from the last command before sending the query. The reply must of course be sent from within a persist handler.
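For illustration, a minimal sketch of an actor that opts out of state synchronization, reusing the command and event types from the first example; it trades read-your-write consistency for higher write throughput:

class HighThroughputActor(override val id: String,
                          override val eventLog: ActorRef) extends EventsourcedActor {

  // new commands are dispatched immediately and may see stale state
  override def stateSync: Boolean = false

  override def onCommand = {
    case ExampleCommand(data) =>
      persist(ExampleEvent(data)) {
        case Success(_)     => sender() ! ExampleCommandSuccess(data)
        case Failure(cause) => sender() ! ExampleCommandFailure(cause)
      }
  }

  override def onEvent = {
    case ExampleEvent(data) => // update state ...
  }
}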
Note
State synchronization settings only apply to a single actor instance. Events that are emitted concurrently by other actors and handled by that instance can arrive at any time and modify actor state. However, concurrent events are not relevant for achieving read-your-write consistency and should be handled as described in the User guide.
Event handler¶
An event handler is a partial function of type PartialFunction[Any, Unit] for which a type alias Receive exists. It can be defined by implementing onEvent. An event handler handles persisted events by updating actor state from event details.
/** Event handler */
override def onEvent = {
  case ExampleEvent(details) =>
    val eventSequenceNr = lastSequenceNr
    val eventVectorTimestamp = lastVectorTimestamp
    // ...
    // update actor state
    // ...
}
Event metadata of the last handled event can be obtained with the last* methods defined by EventsourcedActor. For example, lastSequenceNr returns the event's local sequence number and lastVectorTimestamp returns the event's vector timestamp. A complete reference is given by the EventsourcedActor API documentation.
Note
An event handler should only update internal actor state without having further side effects. Exceptions are Reliable delivery of messages and Event-driven communication with PersistOnEvent.
Event-sourced views¶
An introduction to event-sourced views is already given in sections Overview, Architecture and the User guide. Applications use event-sourced views for maintaining in-memory read models on the query side (Q) of a CQRS application.
Like event-sourced actors, event-sourced views distinguish command processing from event processing. They must implement the EventsourcedView trait. EventsourcedView is a functional subset of EventsourcedActor that cannot persist events.
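As a minimal sketch, assuming the ExampleEvent type and a hypothetical GetCount query message, a view that simply counts events could look like this:

import akka.actor._

import com.rbmhtechnology.eventuate.EventsourcedView

// hypothetical query message, for illustration only
case object GetCount

class ExampleView(override val id: String,
                  override val eventLog: ActorRef) extends EventsourcedView {

  private var count: Long = 0L

  override def onCommand = {
    case GetCount => sender() ! count
  }

  override def onEvent = {
    case ExampleEvent(_) => count += 1L
  }
}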
Event-sourced writers¶
An introduction to event-sourced writers is already given in sections Overview and Architecture. Applications use event-sourced writers for maintaining persistent read models on the query side (Q) of a CQRS application.
Like event-sourced views, event-sourced writers can only consume events from an event log but can make incremental batch updates to external, application-defined query databases. A query database can be a relational database, a graph database or whatever is needed by an application. Concrete writers must implement the EventsourcedWriter trait.
This section outlines how to update a persistent read model in Cassandra from events consumed by an event-sourced writer. The relevant events are:
case class CustomerCreated(cid: Long, first: String, last: String, address: String)
case class AddressUpdated(cid: Long, address: String)
The persistent read model is a CUSTOMER table with the following structure:
id | first | last | address
----+--------+---------+-------------
1 | Martin | Krasser | Somewhere 1
2 | Volker | Stampa | Somewhere 3
3 | ... | ... | ...
The read model update progress is written to a separate PROGRESS table with a single sequence_nr column:
id | sequence_nr
----+-------------
0 | 3
The stored sequence number is that of the last successfully processed event. An event is considered successfully processed when its data has been written to the CUSTOMER table. Only a single row is needed in the PROGRESS table to track the update progress for the whole CUSTOMER table.
The event-sourced Writer in the following example implements EventsourcedWriter[Long, Unit] (where Long is the type of the initial read result and Unit the type of write results). It is initialized with an eventLog from which it consumes events and a Cassandra Session for writing event processing results.
import java.lang.{ Long => JLong }

import akka.actor.ActorRef

import com.datastax.driver.core._
import com.rbmhtechnology.eventuate.EventsourcedWriter

import scala.concurrent.Future

/**
 * Processes `CustomerCreated` and `AddressUpdated` events and updates
 * a `CUSTOMER` table in Cassandra with incremental batches.
 */
class Writer(val id: String, val eventLog: ActorRef, session: Session)
  extends EventsourcedWriter[Long, Unit] {

  import Writer._
  import context.dispatcher

  val insertCustomerStmt = session.prepare(
    "INSERT INTO CUSTOMER (id, first, last, address) VALUES (?, ?, ?, ?)")

  val updateCustomerStmt = session.prepare(
    "UPDATE CUSTOMER SET address = ? WHERE id = ?")

  val updateProgressStmt = session.prepare(
    "UPDATE PROGRESS SET sequence_nr = ? WHERE id = 0")

  /**
   * Batch of Cassandra update statements collected during event processing.
   */
  var batch: Vector[BoundStatement] = Vector.empty

  /**
   * Suspends replay after 16 events, triggers a `write` and then continues
   * with the next 16 events. This implements event replay backpressure,
   * needed if writing to the database is slower than replaying from the
   * `eventLog` (which is usually the case).
   */
  override def replayBatchSize: Int =
    16

  override def onCommand = {
    case _ =>
  }

  /**
   * Prepares an update `batch` from handled events that is written to the
   * database when `write` is called. An event handler never writes to the
   * database directly.
   */
  override def onEvent = {
    case c @ CustomerCreated(cid, first, last, address) =>
      batch = batch :+ insertCustomerStmt.bind(cid: JLong, first, last, address)
    case u @ AddressUpdated(cid, address) =>
      batch = batch :+ updateCustomerStmt.bind(address, cid: JLong)
  }

  /**
   * Asynchronously writes the prepared update `batch` to the database
   * together with the sequence number of the last processed event. After
   * having submitted the batch, it is cleared so that further events can
   * be processed while the write is in progress.
   */
  override def write(): Future[Unit] = {
    val snr = lastSequenceNr
    val res = for {
      _ <- Future.sequence(batch.map(stmt => session.executeAsync(stmt).toFuture))
      _ <- session.executeAsync(updateProgressStmt.bind(snr: JLong)).toFuture
    } yield ()
    batch = Vector.empty // clear batch
    res
  }

  /**
   * Reads the sequence number of the last update. This method is called only
   * once during writer initialization (after start or restart).
   */
  override def read(): Future[Long] = {
    session.executeAsync("SELECT sequence_nr FROM PROGRESS WHERE id = 0").toFuture
      .map(rs => if (rs.isExhausted) 0L else rs.one().getLong(0))
  }

  /**
   * Handles the `read` result by returning the read value + 1, indicating the
   * start position for further reads from the event log.
   */
  override def readSuccess(result: Long): Option[Long] =
    Some(result + 1L)
}

object Writer {
  import java.util.concurrent.Executor

  import com.google.common.util.concurrent.ListenableFuture

  import scala.concurrent.{ ExecutionContext, Promise }
  import scala.language.implicitConversions
  import scala.util.Try

  implicit class ListenableFutureConverter[A](lf: ListenableFuture[A])(implicit executionContext: ExecutionContext) {
    def toFuture: Future[A] = {
      val promise = Promise[A]
      lf.addListener(new Runnable {
        def run() = promise.complete(Try(lf.get()))
      }, executionContext.asInstanceOf[Executor])
      promise.future
    }
  }
}
Hint
The full example source code is available here.
On a high level, the example Writer implements the following behavior:
- During initialization (after start or restart) it asynchronously reads the stored update progress from the PROGRESS table. The read result is passed as argument to readSuccess and incremented by 1 before returning it to the caller. This causes the Writer to resume event processing from that position in the event log.
- Events are processed in onEvent by translating them to Cassandra update statements which are added to an in-memory batch of type Vector[BoundStatement]. The batch is written to Cassandra when Eventuate calls the write method.
- The write method asynchronously updates the CUSTOMER table with the statements contained in batch and then updates the PROGRESS table with the sequence number of the last processed event. After having submitted the statements to Cassandra, the batch is cleared for further event processing. Event processing can run concurrently to write operations.
- A batch that has been updated while a write operation is in progress is written directly after the current write operation successfully completes. If no write operation is in progress, a change to batch is written immediately. This keeps read model update delays at a minimum and increases batch sizes under increasing load. Batch sizes can be limited with replayBatchSize.
If a write (or read) operation fails, the writer is restarted, by default, and resumes event processing from the last stored sequence number + 1. This behavior can be changed by overriding writeFailure (or readFailure) from EventsourcedWriter.
Note
The example does not use Cassandra BatchStatements for reasons explained in this article. Atomic writes are not needed because database updates in this example are idempotent and can be retried in failure cases. Failure cases where idempotency is relevant are partial updates to the CUSTOMER table or a failed write to the PROGRESS table. BatchStatements should only be used when database updates are not idempotent and atomicity is required at the database level.
Stateful writers¶
The above Writer implements a stateless writer. Although it accumulates batches while a write operation is in progress, it cannot recover permanent in-memory state from the event log, because event processing only starts from the last stored sequence number. If a writer needs to be stateful, it must return None from readSuccess. In this case, event replay either starts from scratch or from a previously stored snapshot. A stateful writer should still write the update progress to the PROGRESS table but exclude events with a sequence number less than or equal to the stored sequence number from contributing to the update batch.
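A minimal sketch of such a stateful writer, reusing the CUSTOMER and PROGRESS schema and the ListenableFutureConverter from the example above; only CustomerCreated events are handled for brevity, and the customers set is an illustrative stand-in for real in-memory state:

import java.lang.{ Long => JLong }

import akka.actor.ActorRef

import com.datastax.driver.core._
import com.rbmhtechnology.eventuate.EventsourcedWriter

import scala.concurrent.Future

class StatefulWriter(val id: String, val eventLog: ActorRef, session: Session)
  extends EventsourcedWriter[Long, Unit] {

  import Writer._ // reuse the ListenableFutureConverter from the example above
  import context.dispatcher

  val insertCustomerStmt = session.prepare(
    "INSERT INTO CUSTOMER (id, first, last, address) VALUES (?, ?, ?, ?)")

  val updateProgressStmt = session.prepare(
    "UPDATE PROGRESS SET sequence_nr = ? WHERE id = 0")

  // sequence number stored in the PROGRESS table, set in readSuccess
  private var progress: Long = 0L

  // permanent in-memory state, rebuilt from all replayed events
  private var customers: Set[Long] = Set.empty

  private var batch: Vector[BoundStatement] = Vector.empty

  override def onCommand = {
    case _ =>
  }

  override def onEvent = {
    case CustomerCreated(cid, first, last, address) =>
      // in-memory state is always updated ...
      customers += cid
      // ... but only events beyond the stored progress contribute to the batch
      if (lastSequenceNr > progress)
        batch = batch :+ insertCustomerStmt.bind(cid: JLong, first, last, address)
  }

  override def write(): Future[Unit] = {
    val snr = lastSequenceNr max progress // never move the stored progress backwards
    val res = for {
      _ <- Future.sequence(batch.map(stmt => session.executeAsync(stmt).toFuture))
      _ <- session.executeAsync(updateProgressStmt.bind(snr: JLong)).toFuture
    } yield ()
    batch = Vector.empty
    res
  }

  override def read(): Future[Long] =
    session.executeAsync("SELECT sequence_nr FROM PROGRESS WHERE id = 0").toFuture
      .map(rs => if (rs.isExhausted) 0L else rs.one().getLong(0))

  override def readSuccess(result: Long): Option[Long] = {
    progress = result
    None // replay from scratch (or from a snapshot) to rebuild in-memory state
  }
}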
Event-sourced processors¶
An introduction to event-sourced processors is already given in sections Overview and Architecture. Applications use event-sourced processors to consume events from a source event log, process these events and write the processed events to a target event log. With processors, event logs can be connected to event stream processing pipelines and graphs.
Event-sourced processors are a specialization of Event-sourced writers where the external database is a target event log. Concrete stateless processors must implement the EventsourcedProcessor trait, stateful processors the StatefulProcessor trait (see also Stateful writers).
The following example Processor is an implementation of EventsourcedProcessor. In addition to providing a source eventLog, a concrete processor must also provide a targetEventLog:
class Processor(
    val id: String,
    val eventLog: ActorRef,
    val targetEventLog: ActorRef)
  extends EventsourcedProcessor {
  // ...

  override val processEvent: Process = {
    // exclude event
    case "my-event-1" => Seq()
    // transform event
    case "my-event-2" => Seq("my-event-2a")
    // transform and split event
    case "my-event-3" => Seq("my-event-3a", "my-event-3b")
  }
}
The event handler implemented by a processor is processEvent. The type of the handler is defined as:
type Process = PartialFunction[Any, Seq[Any]]
Processed events, to be written to the target event log, are returned by the handler as Seq[Any]. With this handler signature, events from the source log can be
- excluded from being written to the target log by returning an empty Seq,
- transformed one-to-one by returning a Seq of size 1, or even
- transformed and split by returning a Seq of size greater than 1.
Note
EventsourcedProcessor and StatefulProcessor internally ensure that writing to the target event log is idempotent. Applications don't need to take extra care about idempotency.
State recovery¶
When an event-sourced actor or view is started or re-started, events are replayed to its onEvent handler so that internal state can be recovered[3]. This is also the case for stateful event-sourced writers and processors. During event replay, the recovering method returns true. Applications can also define a recovery completion handler by overriding onRecovery:
class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {
  // ...

  override def onRecovery = {
    case Success(_) => // ...
    case Failure(_) => // ...
  }
}
If replay fails, the completion handler is called with a Failure and the actor will be stopped, regardless of the action taken by the handler. The default recovery completion handler does nothing. Internally, each replay request towards the event log is retried a couple of times in order to cope with a temporarily unresponsive event log or its underlying storage backend. The maximum number of retries for a replay request can be configured with:
eventuate.log.replay-retry-max = 10
Moreover, the configuration value replay-retry-delay determines the delay between consecutive replay attempts:
eventuate.log.replay-retry-delay = 10s
At the beginning of event replay, the initiating actor is registered at its event log so that newly written events can be routed to that actor. During replay, the actor internally stashes these newly written events and dispatches them to onEvent after successful replay. In a similar way, the actor also stashes new commands and dispatches them to onCommand afterwards. This ensures that new commands never see partially recovered state. When the actor is stopped, it is automatically de-registered from its event log.
Backpressure¶
Events are replayed in batches. A given batch must have been handled by an event handler before the next batch is replayed. This allows slow event handlers to put backpressure on event replay. The default replay batch size can be configured with:
eventuate.log.replay-batch-size = 4096
Event-sourced components can override the configured default value by overriding replayBatchSize:
class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {
  override def replayBatchSize: Int = 64
  // ...
}
Recovery using an application-defined log sequence number¶
In order to keep recovery times small, it is almost always sensible to recover using snapshots. However, in some very rare cases an event-sourced actor or view can recover quickly using an application-defined log sequence number. If defined, only events with a sequence number equal to or larger than the given sequence number are replayed.
class CustomRecoveryExampleActor(snr: Long) extends EventsourcedActor {
  override def replayFromSequenceNr: Option[Long] = Some(snr)
  // ...
}
Snapshots¶
Recovery times increase with the number of events that are replayed to event-sourced components. They can be decreased by starting event replay from a previously saved snapshot of internal state rather than replaying events from scratch. Event-sourced components can save snapshots by calling save within their command handler:
import scala.util._

import akka.actor._

import com.rbmhtechnology.eventuate.EventsourcedActor
import com.rbmhtechnology.eventuate.SnapshotMetadata

case object Save
case class SaveSuccess(metadata: SnapshotMetadata)
case class SaveFailure(cause: Throwable)
case class ExampleState(components: Vector[String] = Vector.empty)

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  var state: ExampleState = ExampleState()

  override def onCommand = {
    case Save =>
      // save snapshot of internal state (asynchronously)
      save(state) {
        case Success(metadata) =>
          // success reply
          sender() ! SaveSuccess(metadata)
        case Failure(cause) =>
          // failure reply
          sender() ! SaveFailure(cause)
      }
    case cmd => // ...
  }

  override def onEvent = {
    case evt => // update state ...
  }
}
Snapshots are saved asynchronously. On completion, a user-defined handler of type Try[SnapshotMetadata] => Unit is called. Like a persist handler, a save handler may also close over actor state and can reply to the command sender using the sender() reference.
An event-sourced actor that is Tracking conflicting versions of application state can also save ConcurrentVersions[A, B] instances directly. One can even configure custom serializers for type parameter A as explained in section Custom snapshot serialization.
During recovery, the latest snapshot saved by an event-sourced component is loaded and can be handled with the onSnapshot handler. This handler should initialize internal actor state from the loaded snapshot:
class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  var state: ExampleState = ExampleState()

  override def onCommand = {
    case Save => // ...
    case cmd  => // ...
  }

  override def onEvent = {
    case evt => // update state ...
  }

  /** Snapshot handler */
  override def onSnapshot = {
    case s: ExampleState =>
      // initialize internal state from loaded snapshot
      state = s
  }
}
If onSnapshot is not defined at the loaded snapshot or not overridden at all, event replay starts from scratch. If onSnapshot is defined at the loaded snapshot, only events that are not covered by that snapshot will be replayed.
Event-sourced actors that implement ConfirmedDelivery for Reliable delivery automatically include unconfirmed messages in state snapshots. These are restored on recovery and re-delivered on recovery completion.
Note
State objects passed as argument to save should be immutable objects. If this is not the case, the caller is responsible for creating a defensive copy before passing it as argument to save.
Storage locations¶
Snapshots are currently stored in a directory that can be configured with
eventuate.snapshot.filesystem.dir = /my/storage/location
in application.conf. The maximum number of stored snapshots per event-sourced component can be configured with
eventuate.snapshot.filesystem.snapshots-per-emitter-max = 3
If this number is exceeded, older snapshots are automatically deleted.
Event routing¶
An event that is emitted by an event-sourced actor or processor can be routed to other event-sourced components if they share an Event log[4]. The default event routing rules are:
- If an event-sourced component has an undefined aggregateId, all events are routed to it. It may choose to handle only a subset of them though.
- If an event-sourced component has a defined aggregateId, only events emitted by event-sourced actors or processors with the same aggregateId are routed to it.
Routing destinations are defined during emission of an event and are persisted together with the event[5]. This makes routing decisions repeatable during event replay and allows for routing rule changes without affecting past routing decisions. Applications can define additional routing destinations with the customDestinationAggregateIds parameter of persist:
import scala.util._

import akka.actor._

import com.rbmhtechnology.eventuate.EventsourcedActor

case class ExampleEvent(data: String)
case class ExampleCommand(data: String)

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  override def aggregateId: Option[String] = Some("a1")

  override def onCommand = {
    case ExampleCommand(data) =>
      persist(ExampleEvent(data), customDestinationAggregateIds = Set("a2", "a3")) {
        case Success(evt)   => // ...
        case Failure(cause) => // ...
      }
  }

  // ...
}
Here, ExampleEvent is routed to destinations with aggregateIds Some("a2") and Some("a3") in addition to the default routing destinations with aggregateIds Some("a1") and None.
Event-driven communication¶
Event-driven communication is one form of Event collaboration and is covered in the Event-driven communication section of the User guide.
Reliable delivery¶
Reliable, event-based remote communication between event-sourced actors should be done via a Replicated event log. For reliable communication with other services that cannot connect to a replicated event log, event-sourced actors should use the ConfirmedDelivery trait:
import scala.concurrent.duration._
import scala.util._

import akka.actor._

import com.rbmhtechnology.eventuate.EventsourcedActor
import com.rbmhtechnology.eventuate.ConfirmedDelivery

case class DeliverCommand(message: String)
case class DeliverEvent(message: String)

case class Confirmation(deliveryId: String)
case class ConfirmationEvent()

case class ReliableMessage(deliveryId: String, message: String)
case object Redeliver

class ExampleActor(destination: ActorPath,
                   override val id: String,
                   override val eventLog: ActorRef)
  extends EventsourcedActor with ConfirmedDelivery {

  import context.dispatcher

  context.system.scheduler.schedule(
    initialDelay = 10.seconds,
    interval = 5.seconds,
    receiver = self,
    message = Redeliver)

  override def onCommand = {
    case DeliverCommand(message) =>
      persist(DeliverEvent(message)) {
        case Success(evt) => // ...
        case Failure(err) => // ...
      }
    case Confirmation(deliveryId) if unconfirmed.contains(deliveryId) =>
      persistConfirmation(ConfirmationEvent(), deliveryId) {
        case Success(evt) => // ...
        case Failure(err) => // ...
      }
    case Redeliver =>
      redeliverUnconfirmed()
  }

  override def onEvent = {
    case DeliverEvent(message) =>
      val deliveryId = lastSequenceNr.toString
      deliver(deliveryId, ReliableMessage(deliveryId, message), destination)
    case ConfirmationEvent() =>
      // handling of confirmation event is optional
  }
}
ConfirmedDelivery supports the reliable delivery of messages to destinations by enabling applications to re-deliver messages until delivery is confirmed by destinations. In the example above, the reliable delivery of a message is initiated by sending a DeliverCommand to the ExampleActor.
The handler of the generated DeliverEvent calls deliver to deliver a ReliableMessage to destination. The deliveryId is an identifier that correlates a ReliableMessage with a Confirmation message. The deliveryId can be any application-defined id. Here, the event's sequence number is used, which can be obtained with lastSequenceNr.
The destination confirms the delivery of the message by sending a Confirmation reply to the event-sourced actor, which generates a ConfirmationEvent from it. The actor uses the persistConfirmation method to persist the confirmation event together with the delivery id. After successful persistence of the confirmation event, the corresponding reliable message is removed from the internal buffer of unconfirmed messages.
When the actor is re-started, unconfirmed reliable messages are automatically re-delivered to their destinations. The example actor additionally schedules redeliverUnconfirmed calls to periodically re-deliver unconfirmed messages. This is done within the actor's command handler.
Note
In the above example, a pattern guard is used for idempotent confirmation processing by ensuring that the deliveryId of the Confirmation message is still unconfirmed. This pattern may only be applied if the stateSync member of the EventsourcedActor is set to true. For further details on stateSync, see section State synchronization.
Note
If a snapshot is taken, unconfirmed messages are stored in the snapshot along with the destination ActorPath. The actual ActorPath of the destination must therefore not change between restarts of the actor, for example when the destination actor is part of the same application and the application is restarted. For this reason, the destination actor must be named explicitly instead of having a name generated by the ActorSystem.
Conditional requests¶
Conditional requests are covered in the Conditional requests section of the User guide.
Command stashing¶
EventsourcedView and EventsourcedActor override stash() and unstashAll() of akka.actor.Stash so that application-specific subclasses can safely stash and unstash commands. Stashing of events is not allowed. Hence, stash() must only be used in a command handler; using it in an event handler will throw StashError. On the other hand, unstashAll() can be used anywhere, i.e. in a command handler, persist handler or event handler. The following is a trivial usage example which calls stash() in the command handler and unstashAll() in the persist handler:
import scala.util._

import akka.actor._

import com.rbmhtechnology.eventuate.EventsourcedActor

case class CreateUser(userId: String, name: String)
case class UpdateUser(userId: String, name: String)

sealed trait UserEvent

case class UserCreated(userId: String, name: String) extends UserEvent
case class UserUpdated(userId: String, name: String) extends UserEvent

case class User(userId: String, name: String)

class UserManager(val eventLog: ActorRef) extends EventsourcedActor {
  private var users: Map[String, User] = Map.empty

  override val id = "example"

  override def onCommand = {
    case CreateUser(userId, name) =>
      persistUserEvent(UserCreated(userId, name), unstashAll())
    case UpdateUser(userId, name) if users.contains(userId) =>
      // UpdateUser received after CreateUser
      persistUserEvent(UserUpdated(userId, name))
    case UpdateUser(userId, name) =>
      // UpdateUser received before CreateUser
      stash()
    // ...
  }

  override def onEvent = {
    case UserCreated(userId, name) =>
      users = users.updated(userId, User(userId, name))
    case UserUpdated(userId, name) =>
      users = users.updated(userId, User(userId, name))
  }

  private def persistUserEvent(event: UserEvent, onSuccess: => Unit = ()) =
    persist(event) {
      case Success(evt) =>
        sender() ! evt
        onSuccess
      case Failure(err) =>
        sender() ! err
    }
}
The UserManager maintains a persistent users map. Users can be added to the map by sending a CreateUser command and updated by sending an UpdateUser command. Should these commands arrive in the wrong order, i.e. UpdateUser before a corresponding CreateUser, the UserManager stashes UpdateUser and unstashes it after having successfully processed another CreateUser command.
In the above implementation, an UpdateUser command might be repeatedly stashed and unstashed if the corresponding CreateUser command is preceded by other, unrelated CreateUser commands. Assuming that out-of-order user commands are rare, the performance impact is limited. Alternatively, one could record stashed user ids in transient actor state and conditionally call unstashAll() by checking that state, as sketched below.
Behavior changes¶
Event-sourced components distinguish command processing from event processing. Consequently, applications should be able to change the behavior of command handlers and event handlers independently of each other, at runtime. Command handling behavior can be changed with commandContext.become() and commandContext.unbecome(), event handling behavior with eventContext.become() and eventContext.unbecome() (for details, see the BehaviorContext API docs):
trait ExampleActor extends EventsourcedActor {
  // default command handler
  override def onCommand: Receive = {
    case "a" => commandContext.become(newCommandHandler)
  }

  // default event handler
  override def onEvent: Receive = {
    case "x" => eventContext.become(newEventHandler)
  }

  def newCommandHandler: Receive = {
    case "b" =>
      // restores default command handler
      commandContext.unbecome()
  }

  def newEventHandler: Receive = {
    case "y" =>
      // restores default event handler
      eventContext.unbecome()
  }
}
This works for all event-sourcing abstractions except EventsourcedProcessor. Its eventContext does not allow behavior changes because EventsourcedProcessor implements default onEvent behavior that should not be changed by applications. An attempt to change that behavior will throw an UnsupportedOperationException. Changing an EventsourcedProcessor's processEvent behavior is not supported yet.
Note
Command and event handling behaviors are managed by internal behavior stacks. Eventuate does not include these behavior stacks in Snapshots when applications save actor state. Although the state of an event handling behavior stack can be recovered by replaying events from scratch, that stack is not automatically recovered when a snapshot is loaded. Applications are therefore responsible for restoring the required command and event handling behavior from application-specific snapshot details in the onSnapshot handler. Of course, this is only necessary if the required behavior differs from the default onEvent and onCommand behavior.
Failure handling¶
Event-sourced components register themselves at an EventLog actor in order to be notified about changes in the event log. Directly after registration, during recovery, they read from the event log in order to recover internal state from past events. After recovery has completed, the event log actor pushes newly written events to registered actors so that they can update application state with minimal latency. If a registered actor is restarted, it recovers again from the event log and continues to process push-updates after recovery has completed.
An EventLog actor processes write requests from Event-sourced actors, Event-sourced processors and Replication endpoints. If a write succeeds, it pushes the written events to registered actors (taking Event routing rules into account) and handles the next write request. Writing to a storage backend may also fail for several reasons. In the following, it is assumed that writes are made to a remote storage backend such as the Cassandra storage backend.
A write failure reported from a storage backend driver does not necessarily mean that the events have not been written to the storage backend. For example, a write could have actually been applied to the remote storage backend but the ACK message got lost. This usually causes the driver to report a timeout. If an event log actor simply continued with the next write request, after having informed the event emitter about the failure, the emitter and other registered actors would erroneously assume that the emitted events do not exist in the event log. However, these events may become visible to newly registered actors that are about to recover or to replication endpoints that read events for replication.
This would violate the event ordering and consistency guarantees made by Eventuate because some registered actors would see an event stream with missing events. The following describes two options to deal with that situation:
- After a failed write, the event log actor notifies all registered actors to restart themselves so that another recovery phase would find out whether the events have actually been written or not. This is fine if the write failure was actually a lost ACK and the storage backend is immediately available for subsequent reads (neglecting a potentially high read load). If the write failure was caused by a longer-lasting problem, such as a longer network partition that disconnects the application from the storage backend, registered actors would fail to recover and would therefore be unavailable for in-memory reads.
- The event log actor itself tries to find out whether the write was successful or not, either by reading from the storage backend or by retrying the write until it succeeds, before continuing with the next write request. In this case, the log actor informs the event emitter either about a failed write, if it can guarantee that the write has not been applied to the storage backend, or about a successful write, if retrying the write finally succeeded. Write retries are only possible with storage backends that support idempotent writes. With this strategy, registered actors don't need to be restarted and remain available for in-memory reads.
In Eventuate, the second approach is taken. Should there be a longer-lasting problem with the storage backend, it may take a longer time for an event log actor to make a decision about the success or failure of a write. During that time, it will reject further writes in order to avoid being overloaded with pending write requests. This is an application of the circuit breaker design pattern.
Consequently, a write failure reported by an event log actor means that the write was actually not applied to the storage backend. This additional guarantee comes at the cost of potentially long write reply delays but allows registered actors to remain available for in-memory reads during storage backend unavailability. It also provides clearer semantics of write failures.
Circuit breaker¶
The strategy described above can be implemented by wrapping a CassandraEventLog in a CircuitBreaker actor. This is the default when creating the log actor for a Cassandra storage backend. Should the event log actor need to retry a write eventuate.log.circuit-breaker.open-after-retries times or more, the circuit breaker opens. If open, it rejects all requests by replying with a failure message that contains an EventLogUnavailableException. If retrying the write finally succeeds, the circuit breaker closes again. The maximum number of write retries can be configured with eventuate.log.cassandra.write-retry-max and the delay between write retries with eventuate.log.write-timeout. If the maximum number of retries is reached, the event log actor gives up and stops itself, which also stops all registered actors.
persist failure handling¶
Asynchronous persist operations send write requests to an EventLog actor. The write reply is passed as argument to the persist handler (see section Command handler). If the persist handler is called with a Failure, one can safely assume that the events have not been written to the storage backend. As already explained above, a consequence of this additional guarantee is that persist handler callbacks may be delayed indefinitely.
For an EventsourcedActor with stateSync set to true, this means that further commands sent to that actor will be stashed until the current write completes. In this case, it is the responsibility of the application not to overload that actor with further commands. For example, an application could use timeouts for command replies and stop sending further commands to that actor if a timeout occurred. After an application-defined delay, command sending can be resumed. This is comparable to using an application-level circuit breaker, as sketched below. Alternatively, an application could restart an event-sourced actor on command timeout and continue sending new commands to that actor after recovery succeeded. This, however, may take a while, depending on the unavailability duration of the storage backend.
EventsourcedActors with stateSync set to false do not stash commands but send write requests immediately to the event log actor. If the log actor is busy retrying a write and the Circuit breaker opens, later persist operations will be completed immediately with an EventLogUnavailableException failure, regardless of whether the event-sourced actor has persist operations in progress or not. A persist operation of an EventsourcedActor with stateSync set to true will only be completed with an EventLogUnavailableException failure if that actor had no persist operation in progress at the time the circuit breaker opened.
persistOnEvent failure handling¶
EventsourcedActors can also persist events in the Event handler if they additionally extend PersistOnEvent. An asynchronous persistOnEvent operation may also fail for the reasons explained in persist failure handling. If a persistOnEvent operation fails, the actor is automatically restarted by throwing a PersistOnEventException.
Recovery failure handling¶
As explained in section State recovery, event-sourced components are stopped if their recovery fails. Applications should either define a custom onRecovery completion handler to obtain information about recovery failure details, or just watch these actors if recovery failure details are not relevant.
Batch write failure handling¶
Events are written in batches. When using the Cassandra storage backend, there's a warn threshold and a fail threshold for batch sizes. The default settings in cassandra.yaml are:
# Caution should be taken on increasing the size of this threshold as it can
# lead to node instability.
batch_size_warn_threshold_in_kb: 5
# Fail any batch exceeding this value. 50kb (10x warn threshold) by default.
batch_size_fail_threshold_in_kb: 50
When the size of an event batch exceeds the fail threshold, the batch write fails with:
com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
The corresponding entry in the Cassandra system log is:
ERROR <timestamp> Batch of prepared statements for [eventuate.log_<id>] is of size 103800, exceeding specified threshold of 51200 by 52600. (see batch_size_fail_threshold_in_kb)
Note
If Eventuate is the only writer to the Cassandra cluster, it is safe to increase these thresholds to higher values, as Eventuate only makes single-partition batch writes (see also CASSANDRA-8825).
If other applications additionally make multi-partition batch writes to the same Cassandra cluster, it is recommended to reduce
eventuate.log.write-batch-size = 64
and
eventuate.log.cassandra.index-update-limit = 64
to a smaller value like 32, for example, or even smaller. Failed replication writes or index writes are re-tried automatically by Eventuate. Failed persist operations must be re-tried by the application.
Batch replication failure handling¶
During replication, events are batch-transferred over the network. The maximum number of events per batch can be configured with:
eventuate.log.write-batch-size = 64
The maximum batch size in bytes that the transport will accept is also limited. If this limit is exceeded, batch transfer will fail. In this case, applications should either increase
akka.remote.netty.tcp.maximum-frame-size = 128000b
or decrease the event batch size.
Note
Batch sizes in Eventuate are currently defined in units of events whereas maximum-frame-size is defined in bytes. This mismatch will be removed in a later release (see also ticket 166).
Custom serialization¶
Custom event serialization¶
Custom serializers for application-defined events can be configured with Akka's serialization extension. For example, an application that wants to use a custom MyDomainEventSerializer for events of type MyDomainEvent (both defined in package com.example) should add the following configuration to application.conf:
akka.actor {
  serializers {
    domain-event-serializer = "com.example.MyDomainEventSerializer"
  }
  serialization-bindings {
    "com.example.MyDomainEvent" = domain-event-serializer
  }
}
MyDomainEventSerializer must extend Akka's Serializer trait. Please refer to Akka's serialization extension documentation for further details.
Eventuate stores application-defined events as the payload of DurableEvents. DurableEvent itself is serialized with DurableEventSerializer, a Protocol Buffers based serializer that delegates payload serialization to a custom serializer. If no custom serializer is configured, Akka's default serializer is used.
Custom snapshot serialization¶
Applications can also configure custom serializers for snapshots in the same way as for application-defined events and replication filters (see sections Custom event serialization and Custom replication filter serialization).
Custom snapshot serialization also works for state managed with ConcurrentVersions[A, B]. A custom serializer configured for type parameter A is used whenever a snapshot of type ConcurrentVersions[A, B] is saved (see also Tracking conflicting versions).
Event-sourced actors that extend ConfirmedDelivery for Reliable delivery of messages to destinations will also include unconfirmed messages as deliveryAttempts in a Snapshot. The message field of a DeliveryAttempt can also be custom-serialized by configuring a serializer.
Custom CRDT serialization¶
Custom serializers can also be configured for the type parameter A of the MVRegister[A], LWWRegister[A] and ORSet[A] Operation-based CRDTs. These serializers are used for both persistent CRDT operations and CRDT snapshots.
Resolution of serializers when deserializing¶
When Eventuate serializes application-defined events, Replication filters or snapshots, it includes the identifier of the Akka serializer as well as the class-based or string-based manifest, when available. When deserializing these application-defined payloads, a serializer is selected as follows:
- If a class-based manifest is included, the serializer that is configured in the Akka configuration for this class is selected.
- In case of a string-based manifest or no manifest, the serializer is selected by the included identifier.
[1] The customDestinationAggregateIds parameter is described in section Event routing.
[2] Writes from different event-sourced actors that have stateSync set to true are still batched, but not the writes from a single event-sourced actor.
[3] Event replay can optionally start from Snapshots of actor state.
[4] Event-sourced processors can additionally route events between event logs.
[5] The routing destinations of a DurableEvent can be obtained with its destinationAggregateIds method.