User guide¶
This is a brief user guide to Eventuate. It is recommended to read sections Overview and Architecture first. Based on simple examples, you’ll see how to
- implement an event-sourced actor
- replicate actor state with event sourcing
- detect concurrent updates to replicated state
- track conflicts from concurrent updates
- resolve conflicts automatically and interactively
- make concurrent updates conflict-free with operation-based CRDTs
- implement an event-sourced view over many event-sourced actors
- achieve causal read consistency across event-sourced actors and views and
- implement event-driven communication between event-sourced actors.
The user guide only scratches the surface of Eventuate. You can find further details in the Reference.
Event-sourced actors¶
An event-sourced actor is an actor that captures changes to its internal state as a sequence of events. It persists these events to an event log and replays them to recover internal state after a crash or a planned re-start. This is the basic idea behind event sourcing: instead of storing current application state, the full history of changes is stored as immutable facts and current state is derived from these facts.
Event-sourced actors distinguish between commands and events. During command processing they usually validate external commands against internal state and, if validation succeeds, write one or more events to their event log. During event processing they consume events they have written and update internal state by handling these events.
Hint
Event-sourced actors can also write new events during event processing. This is covered in section Event-driven communication.
Concrete event-sourced actors must implement the EventsourcedActor
trait. The following ExampleActor
maintains state of type Vector[String]
to which entries can be appended:
import scala.util._
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor
// Commands
case object Print
case class Append(entry: String)
// Command replies
case class AppendSuccess(entry: String)
case class AppendFailure(cause: Throwable)
// Event
case class Appended(entry: String)
class ExampleActor(override val id: String,
override val aggregateId: Option[String],
override val eventLog: ActorRef) extends EventsourcedActor {
private var currentState: Vector[String] = Vector.empty
override def onCommand = {
case Print =>
println(s"[id = $id, aggregate id = ${aggregateId.getOrElse("<undefined>")}] ${currentState.mkString(",")}")
case Append(entry) => persist(Appended(entry)) {
case Success(evt) => sender() ! AppendSuccess(entry)
case Failure(err) => sender() ! AppendFailure(err)
}
}
override def onEvent = {
case Appended(entry) => currentState = currentState :+ entry
}
}
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.rbmhtechnology.eventuate.AbstractEventsourcedActor;
import com.rbmhtechnology.eventuate.ReplicationConnection;
import com.rbmhtechnology.eventuate.ResultHandler;
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import static akka.actor.ActorRef.noSender;
import static java.lang.System.out;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.concat;
import static java.util.stream.Stream.of;
class ExampleActor extends AbstractEventsourcedActor {
private final Optional<String> aggregateId;
private Collection<String> currentState = Collections.emptyList();
public ExampleActor(String id, Optional<String> aggregateId, ActorRef eventLog) {
super(id, eventLog);
this.aggregateId = aggregateId;
}
@Override
public Optional<String> getAggregateId() {
return aggregateId;
}
@Override
public AbstractActor.Receive createOnCommand() {
return receiveBuilder()
.match(Print.class, cmd -> printState(id(), currentState))
.match(Append.class, cmd -> persist(new Appended(cmd.entry), ResultHandler.on(
evt -> getSender().tell(new AppendSuccess(evt.entry), getSelf()),
err -> getSender().tell(new AppendFailure(err), getSelf())
)))
.build();
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Appended.class, evt -> currentState = append(currentState, evt.entry))
.build();
}
private void printState(String id, Collection<String> currentState) {
out.println(String.format("[id = %s, aggregate id = %s] %s", id, getAggregateId().orElseGet(() -> "undefined"),
String.join(",", currentState)));
}
private <T> Collection<T> append(Collection<T> collection, T el) {
return concat(collection.stream(), of(el)).collect(toList());
}
}
// Commands
class Print {
}
class Append {
public final String entry;
public Append(String entry) {
this.entry = entry;
}
}
// Command replies
class AppendSuccess {
public final String entry;
public AppendSuccess(String entry) {
this.entry = entry;
}
}
class AppendFailure {
public final Throwable cause;
public AppendFailure(Throwable cause) {
this.cause = cause;
}
}
// Events
class Appended {
public final String entry;
public Appended(String entry) {
this.entry = entry;
}
}
For modifying currentState
, applications send Append
commands which are handled by the onCommand
handler. From an Append
command, the handler derives an Appended
event and persist
s it to the given eventLog
. If persistence succeeds, the command sender is informed about successful processing. If persistence fails, the command sender is informed about the failure so it can retry, if needed.
The onEvent
handler updates currentState
from persisted events and is automatically called after a successful persist
. If the actor is re-started, either after a crash or during normal application start, persisted events are replayed to onEvent
which recovers internal state before new commands are processed.
EventsourcedActor
implementations must define a global unique id
and require an eventLog
actor reference for writing and replaying events. An event-sourced actor may also define an optional aggregateId
which has an impact how events are routed between event-sourced actors.
Hint
Section Event log explains how to create eventLog
actor references.
Creating a single instance¶
In the following, a single instance of ExampleActor
is created and two Append
commands are sent to it:
val system: ActorSystem = // ...
val eventLog: ActorRef = // ...
val ea1 = system.actorOf(Props(new ExampleActor("1", Some("a"), eventLog)))
ea1 ! Append("a")
ea1 ! Append("b")
final ActorSystem system = // ...
final ActorRef eventLog = // ...
final ActorRef ea1 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("1", Optional.of("a"), eventLog)));
ea1.tell(new Append("a"), noSender());
ea1.tell(new Append("b"), noSender());
Sending a Print
command
ea1 ! Print
ea1.tell(new Print(), noSender());
should print:
[id = 1, aggregate id = a] a,b
When the application is re-started, persisted events are replayed to onEvent
which recovers currentState
. Sending another Print
command should print again:
[id = 1, aggregate id = a] a,b
Note
In the following sections, several instances of ExampleActor
are created. It is assumed that they share a Replicated event log and are running at different locations.
A shared event log is a pre-requisite for event-sourced actors to consume each other’s events. However, sharing an event log doesn’t necessarily mean broadcast communication between all actors on the same log. It is the aggreagteId
that determines which actors consume each other’s events.
Creating two isolated instances¶
When creating two instances of ExampleActor
with different aggregateId
s, they are isolated from each other, by default, and do not consume each other’s events:
val b2 = system.actorOf(Props(new ExampleActor("2", Some("b"), eventLog)))
val c3 = system.actorOf(Props(new ExampleActor("3", Some("c"), eventLog)))
b2 ! Append("a")
b2 ! Append("b")
c3 ! Append("x")
c3 ! Append("y")
final ActorRef b2 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("2", Optional.of("b"), eventLog)));
final ActorRef c3 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("3", Optional.of("c"), eventLog)));
b2.tell(new Append("a"), noSender());
b2.tell(new Append("b"), noSender());
c3.tell(new Append("x"), noSender());
c3.tell(new Append("y"), noSender());
Sending two Print
commands
b2 ! Print
c3 ! Print
b2.tell(new Print(), noSender());
c3.tell(new Print(), noSender());
should print:
[id = 2, aggregate id = b] a,b
[id = 3, aggregate id = c] x,y
Creating two replica instances¶
When creating two ExampleActor
instances with the same aggregateId
, they consume each other’s events [1].
// created at location 1
val d4 = system.actorOf(Props(new ExampleActor("4", Some("d"), eventLog)))
// created at location 2
val d5 = system.actorOf(Props(new ExampleActor("5", Some("d"), eventLog)))
d4 ! Append("a")
// created at location 1
final ActorRef d4 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("4", Optional.of("d"), eventLog)));
// created at location 2
final ActorRef d5 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("5", Optional.of("d"), eventLog)));
d4.tell(new Append("a"), noSender());
Here, d4
processes an Append
command and persists an Appended
event. Both, d4
and d5
, consume that event and update their internal state. After waiting a bit for convergence, sending a Print
command to both actors should print:
[id = 4, aggregate id = d] a
[id = 5, aggregate id = d] a
After both replicas have converged, another Append
is sent to d5
.
d5 ! Append("b")
d5.tell(new Append("b"), noSender());
Again both actors consume the event and sending another Print
command should print:
[id = 4, aggregate id = d] a,b
[id = 5, aggregate id = d] a,b
Warning
As you have probably recognized, replica convergence in this example can only be achieved if the second Append
command is sent after both actors have processed the Appended
event from the first Append
command.
In other words, the first Appended
event must happen before the second one. Only in this case, these two events can have a causal relationship. Since events are guaranteed to be delivered in potential causal order to all replicas, they can converge to the same state.
When concurrent updates are made to both replicas, the corresponding Appended
events are not causally related and can be delivered in any order to both replicas. This may cause replicas to diverge because append operations do not commute. The following sections give examples how to detect and handle concurrent updates.
Detecting concurrent updates¶
Eventuate tracks happened-before relationships (= potential causality) of events with Vector clocks. Why is that needed at all? Let’s assume that an event-sourced actor emits an event e1
for changing internal state and later receives an event e2
from a replica instance. If the replica instance emits e2
after having processed e1
, the actor can apply e2
as regular update. If the replica instance emits e2
before having received e1
, the actor receives a concurrent, potentially conflicting event.
How can the actor determine if e2
is a regular i.e. causally related or concurrent update? It can do so by comparing the vector timestamps of e1
and e2
, where t1
is the vector timestamp of e1
and t2
the vector timestamp of e2
. If events e1
and e2
are concurrent then t1 conc t2
evaluates to true
. Otherwise, they are causally related and t1 < t2
evaluates to true
(because e1
happened-before e2
).
The vector timestamp of an event can be obtained with lastVectorTimestamp
during event processing. Vector timestamps can be attached as update timestamp to current state and compared with the vector timestamp of a new event in order to determine whether the new event is causally related to the previous state update or not[2]:
import com.rbmhtechnology.eventuate.EventsourcedActor
import com.rbmhtechnology.eventuate.VectorTime
class ExampleActor(override val id: String,
override val aggregateId: Option[String],
override val eventLog: ActorRef) extends EventsourcedActor {
private var currentState: Vector[String] = Vector.empty
private var updateTimestamp: VectorTime = VectorTime()
override def onCommand = {
// ...
}
override def onEvent = {
case Appended(entry) =>
if (updateTimestamp < lastVectorTimestamp) {
// regular update
currentState = currentState :+ entry
updateTimestamp = lastVectorTimestamp
} else if (updateTimestamp conc lastVectorTimestamp) {
// concurrent update
// TODO: track conflicting versions
}
}
}
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.rbmhtechnology.eventuate.AbstractEventsourcedActor;
import com.rbmhtechnology.eventuate.VectorTime;
import java.util.Collection;
import java.util.Collections;
class ExampleActor extends AbstractEventsourcedActor {
private Collection<String> currentState = Collections.emptyList();
private VectorTime updateTimestamp = VectorTime.Zero();
public ExampleActor(String id, ActorRef eventLog) {
super(id, eventLog);
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Appended.class, evt -> {
if (updateTimestamp.lt(getLastVectorTimestamp())) {
// regular update
currentState = append(currentState, evt.entry);
updateTimestamp = getLastVectorTimestamp();
} else if (updateTimestamp.conc(getLastVectorTimestamp())) {
// concurrent update
// TODO: track conflicting versions
}
})
.build();
}
}
Attaching update timestamps to current state and comparing them with vector timestamps of new events can be easily abstracted over so that applications don’t have to deal with these low level details, as shown in the next section.
Tracking conflicting versions¶
If state update operations from concurrent events do not commute, conflicting versions of actor state arise that must be tracked and resolved. This can be done with Eventuate’s ConcurrentVersions[S, A]
abstraction and an application-defined update function of type (S, A) => S
where S
is the type of actor state and A
the update type. In our example, the ConcurrentVersions
type is ConcurrentVersions[Vector[String], String]
and the update function (s, a) => s :+ a
:
import scala.collection.immutable.Seq
import com.rbmhtechnology.eventuate.{ConcurrentVersions, Versioned}
import com.rbmhtechnology.eventuate.EventsourcedActor
class ExampleActor(override val id: String,
override val aggregateId: Option[String],
override val eventLog: ActorRef) extends EventsourcedActor {
private var versionedState: ConcurrentVersions[Vector[String], String] =
ConcurrentVersions(Vector.empty, (s, a) => s :+ a)
override def onCommand = {
// ...
}
override def onEvent = {
case Appended(entry) =>
versionedState = versionedState.update(entry, lastVectorTimestamp)
if (versionedState.conflict) {
val conflictingVersions: Seq[Versioned[Vector[String]]] = versionedState.all
// TODO: resolve conflicting versions
} else {
val currentState: Vector[String] = versionedState.all.head.value
// ...
}
}
}
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.rbmhtechnology.eventuate.*;
import java.util.Collection;
import java.util.Collections;
class ExampleActor extends AbstractEventsourcedActor {
private ConcurrentVersions<Collection<String>, String> versionedState =
ConcurrentVersionsTree.create(Collections.emptyList(), (s, a) -> append(s, a));
public ExampleActor(String id, ActorRef eventLog) {
super(id, eventLog);
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Appended.class, evt -> {
versionedState = versionedState.update(evt.entry, getLastVectorTimestamp(), getLastSystemTimestamp(), getLastEmitterId());
if (versionedState.conflict()) {
final Collection<Versioned<Collection<String>>> all = versionedState.getAll();
// TODO: resolve conflicting versions
} else {
final Collection<String> currentState = versionedState.getAll().get(0).value();
// ...
}
})
.build();
}
}
Internally, ConcurrentVersions
maintains versions of actor state in a tree structure where each concurrent update
creates a new branch. The shape of the tree is determined solely by the vector timestamps of the corresponding update events.
An event’s vector timestamp is passed as lastVectorTimestamp
argument to update
. The update
method internally creates a new version by applying the update function (s, a) => s :+ a
to the closest predecessor version and the actual update value (entry
). The lastVectorTimestamp
is attached as update timestamp to the newly created version.
Concurrent versions of actor state and their update timestamp can be obtained with all
which is a sequence of type Seq[Versioned[Vector[String]]]
in our example. The Versioned data type represents a particular version of actor state and its update timestamp (= vectorTimestamp
field).
If all
contains only a single element, there is no conflict and the element represents the current, conflict-free actor state. If the sequence contains two or more elements, there is a conflict where the elements represent conflicting versions of actor states. They can be resolved either automatically or interactively.
Note
Only concurrent updates to replicas with the same aggregateId
may conflict. Concurrent updates to actors with different aggregateId
do not conflict (unless an application does custom Event routing).
Also, if the data type of actor state is designed in a way that update operations commute, concurrent updates can be made conflict-free. This is discussed in section Operation-based CRDTs.
Resolving conflicting versions¶
Automated conflict resolution¶
The following is a simple example of automated conflict resolution: if a conflict has been detected, the version with the higher wall clock timestamp is selected to be the winner. In case of equal wall clock timestamps, the version with the lower emitter id is selected. The wall clock timestamp can be obtained with lastSystemTimestamp
during event handling, the emitter id with lastEmitterId
. The emitter id is the id
of the EventsourcedActor
that emitted the event.
class ExampleActor(override val id: String,
override val aggregateId: Option[String],
override val eventLog: ActorRef) extends EventsourcedActor {
private var versionedState: ConcurrentVersions[Vector[String], String] =
ConcurrentVersions(Vector.empty, (s, a) => s :+ a)
override def onCommand = {
// ...
}
override def onEvent = {
case Appended(entry) =>
versionedState = versionedState
.update(entry, lastVectorTimestamp, lastSystemTimestamp, lastEmitterId)
if (versionedState.conflict) {
val conflictingVersions = versionedState.all.sortWith { (v1, v2) =>
if (v1.systemTimestamp == v2.systemTimestamp) v1.creator < v2.creator
else v1.systemTimestamp > v2.systemTimestamp
}
val winnerTimestamp: VectorTime = conflictingVersions.head.vectorTimestamp
versionedState = versionedState.resolve(winnerTimestamp)
}
}
}
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.rbmhtechnology.eventuate.*;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Stream;
import static userguide.japi.DocUtils.append;
class ExampleActor extends AbstractEventsourcedActor {
private ConcurrentVersions<Collection<String>, String> versionedState =
ConcurrentVersionsTree.create(Collections.emptyList(), (s, a) -> append(s, a));
public ExampleActor(String id, ActorRef eventLog) {
super(id, eventLog);
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Appended.class, evt -> {
versionedState = versionedState.update(evt.entry, getLastVectorTimestamp(), getLastSystemTimestamp(), getLastEmitterId());
if (versionedState.conflict()) {
final Stream<Versioned<Collection<String>>> conflictingVersions = versionedState.getAll().stream()
.sorted((v1, v2) -> {
if (v1.systemTimestamp() == v2.systemTimestamp()) {
return v1.creator().compareTo(v2.creator());
}
return v1.systemTimestamp() > v2.systemTimestamp() ? -1 : 1;
});
final VectorTime winnerTimestamp = conflictingVersions.findFirst().get().vectorTimestamp();
versionedState = versionedState.resolve(winnerTimestamp);
}
})
.build();
}
}
Here, conflicting versions are sorted by descending wall clock timestamp and ascending emitter id where the latter is tracked as creator
of the version. The first version is selected to be the winner. Its vector timestamp is passed as argument to resolve
which selects this version and discards all other versions.
More advanced conflict resolution could select a winner depending on the actual value of concurrent versions. After selection, an application could even update the winner with the merged value of all conflicting versions[3].
Note
For replicas to converge, it is important that winner selection does not depend on the order of conflicting events. In our example, this is the case because wall clock timestamp and emitter id comparison is transitive.
Interactive conflict resolution¶
Interactive conflict resolution does not resolve conflicts immediately but requests the user to inspect and resolve a conflict. The following is a very simple example of interactive conflict resolution: a user selects a winner version if conflicting versions of application state exist.
case class Append(entry: String)
case class AppendRejected(entry: String, conflictingVersions: Seq[Versioned[Vector[String]]])
case class Resolve(selectedTimestamp: VectorTime)
case class Resolved(selectedTimestamp: VectorTime)
class ExampleActor(override val id: String,
override val aggregateId: Option[String],
override val eventLog: ActorRef) extends EventsourcedActor {
private var versionedState: ConcurrentVersions[Vector[String], String] =
ConcurrentVersions(Vector.empty, (s, a) => s :+ a)
override def onCommand = {
case Append(entry) if versionedState.conflict =>
sender() ! AppendRejected(entry, versionedState.all)
case Append(entry) =>
// ...
case Resolve(selectedTimestamp) => persist(Resolved(selectedTimestamp)) {
case Success(evt) => // reply to sender omitted ...
case Failure(err) => // reply to sender omitted ...
}
}
override def onEvent = {
case Appended(entry) =>
versionedState = versionedState
.update(entry, lastVectorTimestamp, lastSystemTimestamp, lastEmitterId)
case Resolved(selectedTimestamp) =>
versionedState = versionedState.resolve(selectedTimestamp, lastVectorTimestamp)
}
}
class ExampleActor extends AbstractEventsourcedActor {
private ConcurrentVersions<Collection<String>, String> versionedState =
ConcurrentVersionsTree.create(Collections.emptyList(), (s, a) -> append(s, a));
public ExampleActor(String id, ActorRef eventLog) {
super(id, eventLog);
}
@Override
public AbstractActor.Receive createOnCommand() {
return receiveBuilder()
.match(Append.class, cmd -> versionedState.conflict(),
cmd -> getSender().tell(new AppendRejected(cmd.entry, versionedState.getAll()), getSelf())
)
.match(Append.class, cmd -> {
// ....
})
.match(Resolve.class, cmd -> persist(new Resolved(cmd.selectedTimestamp), ResultHandler.on(
evt -> { /* reply to sender omitted ... */ },
err -> { /* reply to sender omitted ... */ }
)))
.build();
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Appended.class, evt ->
versionedState = versionedState.update(evt.entry, getLastVectorTimestamp(), getLastSystemTimestamp(), getLastEmitterId())
)
.match(Resolved.class, evt ->
versionedState = versionedState.resolve(evt.selectedTimestamp, getLastVectorTimestamp(), getLastSystemTimestamp())
)
.build();
}
}
// Command
class Append {
public final String entry;
public Append(String entry) {
this.entry = entry;
}
}
// Command reply
class AppendRejected {
public final String entry;
public final Collection<Versioned<Collection<String>>> conflictingVersions;
public AppendRejected(String entry, Collection<Versioned<Collection<String>>> conflictingVersions) {
this.entry = entry;
this.conflictingVersions = conflictingVersions;
}
}
// Command
class Resolve {
public final VectorTime selectedTimestamp;
public Resolve(VectorTime selectedTimestamp) {
this.selectedTimestamp = selectedTimestamp;
}
}
// Command reply
class Resolved {
public final VectorTime selectedTimestamp;
public Resolved(VectorTime selectedTimestamp) {
this.selectedTimestamp = selectedTimestamp;
}
}
When a user tries to Append
in presence of a conflict, the ExampleActor
rejects the update and requests the user to select a winner version from a sequence of conflicting versions. The user then sends the update timestamp of the winner version as selectedTimestamp
with a Resolve
command from which a Resolved
event is derived and persisted. Handling of Resolved
at all replicas finally resolves the conflict.
In addition to just selecting a winner, an application could also update the winner version in a second step, for example, with a value derived from the merge result of conflicting versions. Support for atomic, interactive conflict resolution with an application-defined merge function is planned for later Eventuate releases.
Note
Interactive conflict resolution requires agreement among replicas that are affected by a given conflict: only one of them may emit the Resolved
event. This does not necessarily mean distributed lock acquisition or leader (= resolver) election but can also rely on static rules such as only the initial creator location of an aggregate is allowed to resolve the conflict[4]. This rule is implemented in the Example application.
Operation-based CRDTs¶
If state update operations commute, there’s no need to use Eventuate’s ConcurrentVersions
utility. A simple example is a replicated counter, which converges because its increment and decrement operations commute.
A formal to approach to commutative replicated data types (CmRDTs) or operation-based CRDTs is given in the paper A comprehensive study of Convergent and Commutative Replicated Data Types by Marc Shapiro et al. Eventuate is a good basis for implementing operation-based CRDTs:
- Update operations can be modeled as events and reliably broadcasted to all replicas by a Replicated event log.
- The command and event handler of an event-sourced actor can be used to implement the two update phases mentioned in the paper: atSource and downstream, respectively.
- All downstream preconditions mentioned in the paper are satisfied in case of causal delivery of update operations which is guaranteed for actors consuming from a replicated event log.
Eventuate currently implements 5 out of 12 operation-based CRDTs specified in the paper. These are Counter, MV-Register, LWW-Register, OR-Set and OR-Cart (a shopping cart CRDT). They can be instantiated and used via their corresponding CRDT services. CRDT operations are asynchronous methods on the service interfaces. CRDT services free applications from dealing with low-level details like event-sourced actors or command messages directly. The following is the definition of ORSetService:
/**
* Replicated [[ORSet]] CRDT service.
*
* @param serviceId Unique id of this service.
* @param log Event log.
* @tparam A [[ORSet]] entry type.
*/
class ORSetService[A](val serviceId: String, val log: ActorRef)(implicit val system: ActorSystem, val ops: CRDTServiceOps[ORSet[A], Set[A]])
extends CRDTService[ORSet[A], Set[A]] {
/**
* Adds `entry` to the OR-Set identified by `id` and returns the updated entry set.
*/
def add(id: String, entry: A): Future[Set[A]] =
op(id, AddOp(entry))
/**
* Removes `entry` from the OR-Set identified by `id` and returns the updated entry set.
*/
def remove(id: String, entry: A): Future[Set[A]] =
op(id, RemoveOp(entry))
start()
}
/**
* Persistent add operation used for [[ORSet]] and [[ORCart]].
*/
case class AddOp(entry: Any) extends CRDTFormat
/**
* Persistent remove operation used for [[ORSet]] and [[ORCart]].
*/
case class RemoveOp(entry: Any, timestamps: Set[VectorTime] = Set.empty) extends CRDTFormat
/**
* Java API of a replicated [[ORSet]] CRDT service.
*
* @param serviceId Unique id of this service.
* @param log Event log.
* @param system Actor system.
* @tparam A [[ORSet]] entry type.
*/
class ORSetService<A> extends CRDTService<ORSet<A>, Set<A>> {
ORSetService(String serviceId, ActorRef log, ActorSystem system) {
super(serviceId, log, system);
start();
}
public CompletionStage<Set<A>> add(String id, A entry) {
return op(id, new AddOp(entry));
}
public CompletionStage<Set<A>> remove(String id, A entry) {
return op(id, new RemoveOp(entry));
}
}
The ORSetService is a CRDT service that manages ORSet instances. It implements the asynchronous add
and remove
methods and inherits the value(id: String): Future[Set[A]]
method from CRDTService[ORSet[A], Set[A]]
for reading the current value. Their id
parameter identifies an ORSet
instance. Instances are automatically created by the service on demand. A usage example is the ReplicatedOrSetSpec that is based on Akka’s multi node testkit.
A CRDT service also implements a save(id: String): Future[SnapshotMetadata]
method for saving CRDT snapshots. Snapshots may reduce recovery times of CRDTs with a long update history but are not required for CRDT persistence.
New operation-based CRDTs and their corresponding services can be developed with the CRDT development framework, by defining an instance of the CRDTServiceOps type class and implementing the CRDTService trait. Take a look at the CRDT sources for examples.
Hint
Eventuate’s CRDT approach is also described in this article.
Event-sourced views¶
Event-sourced views are a functional subset of event-sourced actors. They can only consume events from an event log but cannot produce new events. Concrete event-sourced views must implement the EventsourcedView
trait. In the following example, the view counts all Appended
and Resolved
events emitted by all event-sourced actors to the same eventLog
:
import akka.actor.ActorRef
import com.rbmhtechnology.eventuate.EventsourcedView
import com.rbmhtechnology.eventuate.VectorTime
case class Appended(entry: String)
case class Resolved(selectedTimestamp: VectorTime)
case object GetAppendCount
case class GetAppendCountReply(count: Long)
case object GetResolveCount
case class GetResolveCountReply(count: Long)
class ExampleView(override val id: String, override val eventLog: ActorRef) extends EventsourcedView {
private var appendCount: Long = 0L
private var resolveCount: Long = 0L
override def onCommand = {
case GetAppendCount => sender() ! GetAppendCountReply(appendCount)
case GetResolveCount => sender() ! GetResolveCountReply(resolveCount)
}
override def onEvent = {
case Appended(_) => appendCount += 1L
case Resolved(_) => resolveCount += 1L
}
}
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.rbmhtechnology.eventuate.AbstractEventsourcedView;
import com.rbmhtechnology.eventuate.VectorTime;
class ExampleView extends AbstractEventsourcedView {
private Long appendCount = 0L;
private Long resolveCount = 0L;
public ExampleView(String id, ActorRef eventLog) {
super(id, eventLog);
}
@Override
public AbstractActor.Receive createOnCommand() {
return receiveBuilder()
.match(GetAppendCount.class, cmd -> sender().tell(new GetAppendCountReply(appendCount), getSelf()))
.match(GetResolveCount.class, cmd -> sender().tell(new GetResolveCountReply(resolveCount), getSelf()))
.build();
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Appended.class, evt -> appendCount += 1)
.match(Resolved.class, evt -> resolveCount += 1)
.build();
}
}
// Commands
class GetAppendCount {
}
class GetResolveCount {
}
// Command replies
class GetAppendCountReply {
public final Long count;
public GetAppendCountReply(Long count) {
this.count = count;
}
}
class GetResolveCountReply {
public final Long count;
public GetResolveCountReply(Long count) {
this.count = count;
}
}
// Events
class Appended {
public final String entry;
public Appended(String entry) {
this.entry = entry;
}
}
class Resolved {
public final VectorTime selectedTimestamp;
public Resolved(VectorTime selectedTimestamp) {
this.selectedTimestamp = selectedTimestamp;
}
}
Event-sourced views handle events in the same way as event-sourced actors by implementing an onEvent
handler. The onCommand
handler in the example processes the queries GetAppendCount
and GetResolveCount
.
ExampleView
implements the mandatory global unique id
but doesn’t define an aggregateId
. A view that doesn’t define an aggregateId
can consume events from all event-sourced actors on the same event log. If it defines an aggregateId
it can only consume events from event-sourced actors with the same aggregateId
(assuming the default Event routing rules).
Hint
While event-sourced views maintain view state in-memory, Event-sourced writers can be used to persist view state to external databases. A specialization of event-sourced writers are Event-sourced processors whose external database is an event log.
Conditional requests¶
Causal read consistency is the default when reading state from a single event-sourced actor or view. The event stream received by that actor is always causally ordered, hence, it will never see an effect before having seen its cause.
The situation is different when a client reads from multiple actors. Imagine two event-sourced actor replicas where a client updates one replica and observes the updated state with the reply. A subsequent from the other replica, made by the same client, may return the old state which violates causal consistency.
Similar considerations can be made for reading from an event-sourced view after having made an update to an event-sourced actor. For example, an application that successfully appended an entry to ExampleActor
may not immediately see that update in the appendCount
of ExampleView
. To achieve causal read consistency, the view should delay command processing until the emitted event has been consumed by the view. This can be achieved with a ConditionalRequest
.
import scala.concurrent.duration._
import scala.util._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import com.rbmhtechnology.eventuate._
case class Append(entry: String)
case class AppendSuccess(entry: String, updateTimestamp: VectorTime)
class ExampleActor(override val id: String,
override val eventLog: ActorRef) extends EventsourcedActor {
private var currentState: Vector[String] = Vector.empty
override val aggregateId = Some(id)
override def onCommand = {
case Append(entry) => persist(Appended(entry)) {
case Success(evt) =>
sender() ! AppendSuccess(entry, lastVectorTimestamp)
// ...
}
// ...
}
override def onEvent = {
case Appended(entry) => currentState = currentState :+ entry
}
}
class ExampleView(override val id: String, override val eventLog: ActorRef)
extends EventsourcedView with ConditionalRequests {
// ...
}
val ea = system.actorOf(Props(new ExampleActor("ea", eventLog)))
val ev = system.actorOf(Props(new ExampleView("ev", eventLog)))
import system.dispatcher
implicit val timeout = Timeout(5.seconds)
for {
AppendSuccess(_, timestamp) <- ea ? Append("a")
GetAppendCountReply(count) <- ev ? ConditionalRequest(timestamp, GetAppendCount)
} println(s"append count = $count")
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.rbmhtechnology.eventuate.*;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static scala.compat.java8.JFunction.func;
import static scala.compat.java8.JFunction.proc;
class ExampleActor extends AbstractEventsourcedActor {
private final String id;
private Collection<String> currentState = Collections.emptyList();
public ExampleActor(String id, ActorRef eventLog) {
super(id, eventLog);
this.id = id;
}
@Override
public Optional<String> getAggregateId() {
return Optional.of(id);
}
@Override
public AbstractActor.Receive createOnCommand() {
return receiveBuilder()
.match(Append.class, cmd -> persist(new Appended(cmd.entry), ResultHandler.onSuccess(
evt -> getSender().tell(new AppendSuccess(evt.entry, getLastVectorTimestamp()), getSelf())
)))
// ...
.build();
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Appended.class, evt -> currentState = append(currentState, evt.entry))
.build();
}
}
// Command
public class Append {
public final String entry;
public Append(String entry) {
this.entry = entry;
}
}
// Command reply
public class AppendSuccess {
public final String entry;
public final VectorTime updateTimestamp;
public AppendSuccess(String entry, VectorTime updateTimestamp) {
this.entry = entry;
this.updateTimestamp = updateTimestamp;
}
}
// Eventsourced-View
class ExampleView extends AbstractEventsourcedView {
// AbstractEventsourcedView has ConditionalRequests mixed-in by default
public ExampleView(String id, ActorRef eventLog) {
super(id, eventLog);
// ...
}
}
final ActorRef ea = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("ea", eventLog)));
final ActorRef ev = system.actorOf(Props.create(ExampleView.class, () -> new ExampleView("ev", eventLog)));
final Timeout timeout = Timeout.apply(5, TimeUnit.SECONDS);
Patterns.ask(ea, new Append("a"), timeout)
.flatMap(func(m -> Patterns.ask(ev, new ConditionalRequest(((AppendSuccess) m).updateTimestamp, new GetAppendCount()), timeout)), dispatcher)
.onComplete(proc(result -> {
if (result.isSuccess()) {
System.out.println("append count = " + ((GetAppendCountReply) result.get()).count);
}
}), dispatcher);
Here, the ExampleActor
includes the event’s vector timestamp in its AppendSuccess
reply. Together with the actual GetAppendCount
command, the timestamp is included as condition in a ConditionalRequest
and sent to the view. For ConditionalRequest
processing, an event-sourced view must extend the ConditionalRequests
trait. ConditionalRequests
internally delays the command, if needed, and only dispatches GetAppendCount
to the view’s onCommand
handler if the condition timestamp is in the causal past of the view (which is earliest the case when the view consumed the update event). When running the example with an empty event log, it should print:
append count = 1
Note
Not only event-sourced views but also event-sourced actors, stateful event-sourced writers and processors can extend ConditionalRequests
. Delaying conditional requests may re-order them relative to other conditional and non-conditional requests.
Event-driven communication¶
Earlier sections have already shown one form of event collaboration: state replication. For that purpose, event-sourced actors of the same type exchange their events to re-construct actor state at different locations.
In more general cases, event-sourced actors of different type exchange events to achieve a common goal. They react on received events by updating internal state and producing new events. This form of event collaboration is called event-driven communication. In the following example, two event-actors collaborate in a ping-pong game where
- a
PingActor
emits aPing
event on receiving aPong
event and - a
PongActor
emits aPong
event on receiving aPing
event
// some imports omitted ...
import com.rbmhtechnology.eventuate.EventsourcedView.Handler
import com.rbmhtechnology.eventuate.EventsourcedActor
import com.rbmhtechnology.eventuate.PersistOnEvent
case class Ping(num: Int)
case class Pong(num: Int)
class PingActor(val id: String, val eventLog: ActorRef, completion: ActorRef)
extends EventsourcedActor with PersistOnEvent {
override def onCommand = {
case "serve" => persist(Ping(1))(Handler.empty)
}
override def onEvent = {
case Pong(10) if !recovering => completion ! "done"
case Pong(i) => persistOnEvent(Ping(i + 1))
}
}
class PongActor(val id: String, val eventLog: ActorRef)
extends EventsourcedActor with PersistOnEvent {
override def onCommand = {
case _ =>
}
override def onEvent = {
case Ping(i) => persistOnEvent(Pong(i))
}
}
val pingActor = system.actorOf(Props(new PingActor("ping", eventLog, system.deadLetters)))
val pongActor = system.actorOf(Props(new PongActor("pong", eventLog)))
pingActor ! "serve"
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.rbmhtechnology.eventuate.AbstractEventsourcedActor;
import com.rbmhtechnology.eventuate.ResultHandler;
import static akka.actor.ActorRef.noSender;
class PingActor extends AbstractEventsourcedActor {
private final ActorRef completion;
public PingActor(String id, ActorRef eventLog, ActorRef completion) {
super(id, eventLog);
this.completion = completion;
}
@Override
public AbstractActor.Receive createOnCommand() {
return receiveBuilder()
.matchEquals("serve", cmd -> persist(new Ping(1), ResultHandler.none()))
.build();
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Pong.class, evt -> evt.num == 10 && !isRecovering(), evt -> completion.tell("done", getSelf()))
.match(Pong.class, evt -> persistOnEvent(new Ping(evt.num + 1)))
.build();
}
}
class PongActor extends AbstractEventsourcedActor {
public PongActor(String id, ActorRef eventLog) {
super(id, eventLog);
}
@Override
public AbstractActor.Receive createOnEvent() {
return receiveBuilder()
.match(Ping.class, evt -> persistOnEvent(new Pong(evt.num)))
.build();
}
}
class Ping {
public final Integer num;
public Ping(Integer num) {
this.num = num;
}
}
class Pong {
public final Integer num;
public Pong(Integer num) {
this.num = num;
}
}
final ActorRef pingActor = system.actorOf(Props.create(PingActor.class, () -> new PingActor("ping", eventLog, system.deadLetters())));
final ActorRef pongActor = system.actorOf(Props.create(PongActor.class, () -> new PongActor("pong", eventLog)));
pingActor.tell("serve", noSender());
The ping-pong game is started by sending the PingActor
a ”serve”
command which persist
s the first Ping
event. This event however is not consumed by the emitter but rather by the PongActor
. The PongActor
reacts on the Ping
event by emitting a Pong
event. Other than in previous examples, the event is not emitted in the actor’s onCommand
handler but in the onEvent
handler. For that purpose, the actor has to mixin the PersistOnEvent
trait and use the persistOnEvent
method. The emitted Pong
too isn’t consumed by its emitter but rather by the PingActor
, emitting another Ping
, and so on. The game ends when the PingActor
received the 10th Pong
.
Note
The ping-pong game is reliable. When an actor crashes and is re-started, the game is reliably resumed from where it was interrupted. The persistOnEvent
method is idempotent i.e. no duplicates are written under failure conditions and later event replay. When deployed at different location, the ping-pong actors are also partition-tolerant. When their game is interrupted by a network partition, it is automatically resumed when the partition heals.
Furthermore, the actors don’t need to care about idempotency in their business logic i.e. they can assume to receive a de-duplicated and causally-ordered event stream in their onEvent
handler. This is a significant advantage over at-least-once delivery based communication with ConfirmedDelivery, for example, which can lead to duplicates and message re-ordering.
In a more real-world example, there would be several actors of different type collaborating to achieve a common goal, for example, in a distributed business process. These actors can be considered as event-driven and event-sourced microservices, collaborating on a causally ordered event stream in a reliable and partition-tolerant way. Furthermore, when partitioned, they remain available for local writes and automatically catch up with their collaborators when the partition heals.
Hint
Further persistOnEvent
details are described in the PersistOnEvent API docs.
[1] | EventsourcedActor s and EventsourcedView s that have an undefined aggregateId can consume events from all other actors on the same event log. |
[2] | Attached update timestamps are not version vectors because Eventuate uses vector clock update rules instead of version vector update rules. Consequently, update timestamp equivalence cannot be used as criterion for replica convergence. |
[3] | A formal approach to automatically merge concurrent versions of application state are convergent replicated data types (CvRDTs) or state-based CRDTs. |
[4] | Distributed lock acquisition or leader election require an external coordination service like ZooKeeper, for example, whereas static rules do not. |