User guide

This is a brief user guide to Eventuate. We recommend reading the Overview and Architecture sections first. Based on simple examples, you’ll see how to

  • implement an event-sourced actor
  • replicate actor state with event sourcing
  • detect concurrent updates to replicated state
  • track conflicts from concurrent updates
  • resolve conflicts automatically and interactively
  • make concurrent updates conflict-free with operation-based CRDTs
  • implement an event-sourced view over many event-sourced actors
  • achieve causal read consistency across event-sourced actors and views and
  • implement event-driven communication between event-sourced actors.

The user guide only scratches the surface of Eventuate. You can find further details in the Reference.

Event-sourced actors

An event-sourced actor is an actor that captures changes to its internal state as a sequence of events. It persists these events to an event log and replays them to recover internal state after a crash or a planned re-start. This is the basic idea behind event sourcing: instead of storing current application state, the full history of changes is stored as immutable facts and current state is derived from these facts.

Event-sourced actors distinguish between commands and events. During command processing they usually validate external commands against internal state and, if validation succeeds, write one or more events to their event log. During event processing they consume events they have written and update internal state by handling these events.
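The split between command processing and event processing can be illustrated without any Eventuate API. The following is a minimal plain-Java sketch under stated assumptions: AppendSketch, the in-memory list that stands in for the event log, and the rule that rejects empty entries are all hypothetical and only mirror the structure described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory illustration of event sourcing (not Eventuate API).
class AppendSketch {
    private final List<String> eventLog;                   // stands in for a persistent event log
    private final List<String> state = new ArrayList<>();  // internal state, derived from events only

    AppendSketch(List<String> eventLog) {
        this.eventLog = eventLog;
        // Recovery: replay all previously written events through the event handler.
        for (String event : eventLog) {
            onEvent(event);
        }
    }

    // Command processing: validate the command, then write an event.
    boolean append(String entry) {
        if (entry.isEmpty()) {
            return false;          // validation failed, nothing is written
        }
        eventLog.add(entry);       // "persist" the event (assumed to succeed here)
        onEvent(entry);            // handle the written event to update state
        return true;
    }

    // Event processing: the only place where internal state changes.
    private void onEvent(String entry) {
        state.add(entry);
    }

    List<String> state() {
        return new ArrayList<>(state);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        AppendSketch first = new AppendSketch(log);
        first.append("a");
        first.append("b");

        // A second instance over the same log recovers the same state by replay.
        AppendSketch recovered = new AppendSketch(log);
        System.out.println(recovered.state()); // prints "[a, b]"
    }
}
```

Because state is changed only in onEvent, recovery and normal operation go through the same code path, which is the property the EventsourcedActor examples below rely on.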

Hint

Event-sourced actors can also write new events during event processing. This is covered in section Event-driven communication.

Concrete event-sourced actors must implement the EventsourcedActor trait. The following ExampleActor maintains state of type Vector[String] to which entries can be appended:

import scala.util._
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor

// Commands
case object Print
case class Append(entry: String)

// Command replies
case class AppendSuccess(entry: String)
case class AppendFailure(cause: Throwable)

// Event
case class Appended(entry: String)

class ExampleActor(override val id: String,
                   override val aggregateId: Option[String],
                   override val eventLog: ActorRef) extends EventsourcedActor {

  private var currentState: Vector[String] = Vector.empty

  override def onCommand = {
    case Print =>
      println(s"[id = $id, aggregate id = ${aggregateId.getOrElse("<undefined>")}] ${currentState.mkString(",")}")
    case Append(entry) => persist(Appended(entry)) {
      case Success(evt) => sender() ! AppendSuccess(entry)
      case Failure(err) => sender() ! AppendFailure(err)
    }
  }

  override def onEvent = {
    case Appended(entry) => currentState = currentState :+ entry
  }
}

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.rbmhtechnology.eventuate.AbstractEventsourcedActor;
import com.rbmhtechnology.eventuate.ReplicationConnection;
import com.rbmhtechnology.eventuate.ResultHandler;
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

import static akka.actor.ActorRef.noSender;
import static java.lang.System.out;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.concat;
import static java.util.stream.Stream.of;

class ExampleActor extends AbstractEventsourcedActor {

  private final Optional<String> aggregateId;

  private Collection<String> currentState = Collections.emptyList();

  public ExampleActor(String id, Optional<String> aggregateId, ActorRef eventLog) {
    super(id, eventLog);
    this.aggregateId = aggregateId;

    setOnCommand(ReceiveBuilder
      .match(Print.class, cmd -> printState(id, currentState))
      .match(Append.class, cmd -> persist(new Appended(cmd.entry), ResultHandler.on(
        evt -> sender().tell(new AppendSuccess(evt.entry), self()),
        err -> sender().tell(new AppendFailure(err), self())
      )))
      .build());

    setOnEvent(ReceiveBuilder
      .match(Appended.class, evt -> currentState = append(currentState, evt.entry))
      .build());
  }

  @Override
  public Optional<String> getAggregateId() {
    return aggregateId;
  }

  private void printState(String id, Collection<String> currentState) {
    out.println(String.format("[id = %s, aggregate id = %s] %s", id, getAggregateId().orElse("<undefined>"),
      String.join(",", currentState)));
  }

  private <T> Collection<T> append(Collection<T> collection, T el) {
    return concat(collection.stream(), of(el)).collect(toList());
  }
}

// Commands
class Print {
}

class Append {
  public final String entry;

  public Append(String entry) {
    this.entry = entry;
  }
}

// Command replies
class AppendSuccess {
  public final String entry;

  public AppendSuccess(String entry) {
    this.entry = entry;
  }
}

class AppendFailure {
  public final Throwable cause;

  public AppendFailure(Throwable cause) {
    this.cause = cause;
  }
}

// Events
class Appended {
  public final String entry;

  public Appended(String entry) {
    this.entry = entry;
  }
}

To modify currentState, applications send Append commands, which are handled by the onCommand handler. From an Append command, the handler derives an Appended event and persists it to the given eventLog. If persistence succeeds, the command sender is informed about successful processing. If persistence fails, the command sender is informed about the failure so it can retry, if needed.

The onEvent handler updates currentState from persisted events and is automatically called after a successful persist. If the actor is re-started, either after a crash or during normal application start, persisted events are replayed to onEvent which recovers internal state before new commands are processed.

EventsourcedActor implementations must define a globally unique id and require an eventLog actor reference for writing and replaying events. An event-sourced actor may also define an optional aggregateId, which affects how events are routed between event-sourced actors.

Hint

Section Event log explains how to create eventLog actor references.

Creating a single instance

In the following, a single instance of ExampleActor is created and two Append commands are sent to it:

val system: ActorSystem = // ...
val eventLog: ActorRef = // ...

val ea1 = system.actorOf(Props(new ExampleActor("1", Some("a"), eventLog)))

ea1 ! Append("a")
ea1 ! Append("b")

final ActorSystem system = // ...
final ActorRef eventLog = // ...

final ActorRef ea1 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("1", Optional.of("a"), eventLog)));

ea1.tell(new Append("a"), noSender());
ea1.tell(new Append("b"), noSender());

Sending a Print command

ea1 ! Print

ea1.tell(new Print(), noSender());

should print:

[id = 1, aggregate id = a] a,b

When the application is re-started, persisted events are replayed to onEvent which recovers currentState. Sending another Print command should print again:

[id = 1, aggregate id = a] a,b

Note

In the following sections, several instances of ExampleActor are created. It is assumed that they share a Replicated event log and are running at different locations.

A shared event log is a pre-requisite for event-sourced actors to consume each other’s events. However, sharing an event log doesn’t necessarily mean broadcast communication between all actors on the same log. It is the aggregateId that determines which actors consume each other’s events.

Creating two isolated instances

When creating two instances of ExampleActor with different aggregateIds, they are isolated from each other, by default, and do not consume each other’s events:

val b2 = system.actorOf(Props(new ExampleActor("2", Some("b"), eventLog)))
val c3 = system.actorOf(Props(new ExampleActor("3", Some("c"), eventLog)))

b2 ! Append("a")
b2 ! Append("b")

c3 ! Append("x")
c3 ! Append("y")

final ActorRef b2 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("2", Optional.of("b"), eventLog)));
final ActorRef c3 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("3", Optional.of("c"), eventLog)));

b2.tell(new Append("a"), noSender());
b2.tell(new Append("b"), noSender());

c3.tell(new Append("x"), noSender());
c3.tell(new Append("y"), noSender());

Sending two Print commands

b2 ! Print
c3 ! Print

b2.tell(new Print(), noSender());
c3.tell(new Print(), noSender());

should print:

[id = 2, aggregate id = b] a,b
[id = 3, aggregate id = c] x,y

Creating two replica instances

When creating two ExampleActor instances with the same aggregateId, they consume each other’s events [1].

// created at location 1
val d4 = system.actorOf(Props(new ExampleActor("4", Some("d"), eventLog)))

// created at location 2
val d5 = system.actorOf(Props(new ExampleActor("5", Some("d"), eventLog)))

d4 ! Append("a")

// created at location 1
final ActorRef d4 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("4", Optional.of("d"), eventLog)));

// created at location 2
final ActorRef d5 = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("5", Optional.of("d"), eventLog)));

d4.tell(new Append("a"), noSender());

Here, d4 processes an Append command and persists an Appended event. Both d4 and d5 consume that event and update their internal state. After waiting a bit for convergence, sending a Print command to both actors should print:

[id = 4, aggregate id = d] a
[id = 5, aggregate id = d] a

After both replicas have converged, another Append is sent to d5.

d5 ! Append("b")

d5.tell(new Append("b"), noSender());

Again, both actors consume the event, and sending another Print command to both actors should print:

[id = 4, aggregate id = d] a,b
[id = 5, aggregate id = d] a,b

Warning

As you have probably noticed, replica convergence in this example can only be achieved if the second Append command is sent after both actors have processed the Appended event from the first Append command.

In other words, the first Appended event must happen before the second one; only then are the two events causally related. Because events are guaranteed to be delivered to all replicas in potential causal order, the replicas then converge to the same state.

When concurrent updates are made to both replicas, the corresponding Appended events are not causally related and can be delivered in any order to both replicas. This may cause replicas to diverge because append operations do not commute. The following sections show how to detect and handle concurrent updates.
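The divergence described in this warning can be reproduced in a few lines of plain Java. This is only a sketch: DivergenceSketch and replay are hypothetical names, and the two argument lists simulate the order in which two concurrent Appended events might reach each replica.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: append operations do not commute.
class DivergenceSketch {
    // Applies append events to an empty list in the given delivery order.
    static List<String> replay(String... deliveryOrder) {
        List<String> state = new ArrayList<>();
        for (String entry : deliveryOrder) {
            state.add(entry);
        }
        return state;
    }

    public static void main(String[] args) {
        // Replica 1 handles its own event "a" first, then the concurrent "b".
        List<String> replica1 = replay("a", "b");
        // Replica 2 handles its own event "b" first, then the concurrent "a".
        List<String> replica2 = replay("b", "a");
        System.out.println(replica1.equals(replica2)); // prints "false"
    }
}
```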

Detecting concurrent updates

Eventuate tracks happened-before relationships (= potential causality) of events with Vector clocks. Why is that needed at all? Let’s assume that an event-sourced actor emits an event e1 for changing internal state and later receives an event e2 from a replica instance. If the replica instance emits e2 after having processed e1, the actor can apply e2 as a regular update. If the replica instance emits e2 before having received e1, the actor receives a concurrent, potentially conflicting event.

How can the actor determine whether e2 is a regular (i.e. causally related) or a concurrent update? It can do so by comparing the vector timestamps t1 of e1 and t2 of e2. If e1 and e2 are concurrent, t1 conc t2 evaluates to true. Otherwise, they are causally related and t1 < t2 evaluates to true (because e1 happened before e2).
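To make the two comparisons concrete, here is a minimal vector timestamp in plain Java. It is a sketch of the general vector clock technique, not Eventuate’s VectorTime: the class VectorTimeSketch and its methods lteq, lt and conc are hypothetical, and missing entries are assumed to count as 0.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical minimal vector timestamp (not Eventuate's VectorTime).
final class VectorTimeSketch {
    private final Map<String, Long> entries; // process id -> logical time

    VectorTimeSketch(Map<String, Long> entries) {
        this.entries = new HashMap<>(entries);
    }

    private long at(String processId) {
        return entries.getOrDefault(processId, 0L);
    }

    // true if every entry of this timestamp is <= the corresponding entry of that.
    boolean lteq(VectorTimeSketch that) {
        Set<String> ids = new HashSet<>(entries.keySet());
        ids.addAll(that.entries.keySet());
        for (String id : ids) {
            if (at(id) > that.at(id)) {
                return false;
            }
        }
        return true;
    }

    // Happened-before: this <= that, but not the other way round.
    boolean lt(VectorTimeSketch that) {
        return lteq(that) && !that.lteq(this);
    }

    // Concurrent: neither timestamp is <= the other.
    boolean conc(VectorTimeSketch that) {
        return !lteq(that) && !that.lteq(this);
    }

    public static void main(String[] args) {
        Map<String, Long> m1 = new HashMap<>();
        m1.put("A", 1L);                          // t1 = {A -> 1}
        Map<String, Long> m2 = new HashMap<>(m1);
        m2.put("B", 1L);                          // t2 = {A -> 1, B -> 1}
        Map<String, Long> m3 = new HashMap<>();
        m3.put("B", 1L);                          // t3 = {B -> 1}

        VectorTimeSketch t1 = new VectorTimeSketch(m1);
        VectorTimeSketch t2 = new VectorTimeSketch(m2);
        VectorTimeSketch t3 = new VectorTimeSketch(m3);
        System.out.println(t1.lt(t2));   // prints "true"
        System.out.println(t1.conc(t3)); // prints "true"
    }
}
```

In this sketch t2 was produced by a process that had already seen t1, so t1 < t2 holds, whereas t1 and t3 were produced independently and are therefore concurrent.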

The vector timestamp of an event can be obtained with lastVectorTimestamp during event processing. Vector timestamps can be attached as update timestamp to current state and compared with the vector timestamp of a new event in order to determine whether the new event is causally related to the previous state update or not[2]:

import com.rbmhtechnology.eventuate.EventsourcedActor
import com.rbmhtechnology.eventuate.VectorTime

class ExampleActor(override val id: String,
                   override val aggregateId: Option[String],
                   override val eventLog: ActorRef) extends EventsourcedActor {

  private var currentState: Vector[String] = Vector.empty
  private var updateTimestamp: VectorTime = VectorTime()

  override def onCommand = {
    // ...
  }

  override def onEvent = {
    case Appended(entry) =>
      if (updateTimestamp < lastVectorTimestamp) {
        // regular update
        currentState = currentState :+ entry
        updateTimestamp = lastVectorTimestamp
      } else if (updateTimestamp conc lastVectorTimestamp) {
        // concurrent update
        // TODO: track conflicting versions
      }
  }
}

import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import com.rbmhtechnology.eventuate.AbstractEventsourcedActor;
import com.rbmhtechnology.eventuate.VectorTime;

import java.util.Collection;
import java.util.Collections;

class ExampleActor extends AbstractEventsourcedActor {

  private Collection<String> currentState = Collections.emptyList();
  private VectorTime updateTimestamp = VectorTime.Zero();

  public ExampleActor(String id, ActorRef eventLog) {
    super(id, eventLog);

    setOnEvent(ReceiveBuilder
      .match(Appended.class, evt -> {
        if (updateTimestamp.lt(lastVectorTimestamp())) {
          // regular update
          currentState = append(currentState, evt.entry);
          updateTimestamp = lastVectorTimestamp();
        } else if (updateTimestamp.conc(lastVectorTimestamp())) {
          // concurrent update
          // TODO: track conflicting versions
        }
      })
      .build());
  }
}

Attaching update timestamps to current state and comparing them with vector timestamps of new events can be easily abstracted over so that applications don’t have to deal with these low level details, as shown in the next section.

Tracking conflicting versions

If state update operations from concurrent events do not commute, conflicting versions of actor state arise that must be tracked and resolved. This can be done with Eventuate’s ConcurrentVersions[S, A] abstraction and an application-defined update function of type (S, A) => S where S is the type of actor state and A the update type. In our example, the ConcurrentVersions type is ConcurrentVersions[Vector[String], String] and the update function (s, a) => s :+ a:

import scala.collection.immutable.Seq
import com.rbmhtechnology.eventuate.{ConcurrentVersions, Versioned}
import com.rbmhtechnology.eventuate.EventsourcedActor

class ExampleActor(override val id: String,
                   override val aggregateId: Option[String],
                   override val eventLog: ActorRef) extends EventsourcedActor {

  private var versionedState: ConcurrentVersions[Vector[String], String] =
    ConcurrentVersions(Vector.empty, (s, a) => s :+ a)

  override def onCommand = {
    // ...
  }

  override def onEvent = {
    case Appended(entry) =>
      versionedState = versionedState.update(entry, lastVectorTimestamp)
      if (versionedState.conflict) {
        val conflictingVersions: Seq[Versioned[Vector[String]]] = versionedState.all
        // TODO: resolve conflicting versions
      } else {
        val currentState: Vector[String] = versionedState.all.head.value
        // ...
      }
  }
}

import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import com.rbmhtechnology.eventuate.*;

import java.util.Collection;
import java.util.Collections;

class ExampleActor extends AbstractEventsourcedActor {

  private ConcurrentVersions<Collection<String>, String> versionedState =
    ConcurrentVersionsTree.create(Collections.emptyList(), (s, a) -> append(s, a));

  public ExampleActor(String id, ActorRef eventLog) {
    super(id, eventLog);

    setOnEvent(ReceiveBuilder
      .match(Appended.class, evt -> {
        versionedState = versionedState.update(evt.entry, lastVectorTimestamp(), lastSystemTimestamp(), lastEmitterId());

        if (versionedState.conflict()) {
          final Collection<Versioned<Collection<String>>> all = versionedState.getAll();
          // TODO: resolve conflicting versions
        } else {
          final Collection<String> currentState = versionedState.getAll().get(0).value();
          // ...
        }
      })
      .build());
  }
}

Internally, ConcurrentVersions maintains versions of actor state in a tree structure where each concurrent update creates a new branch. The shape of the tree is determined solely by the vector timestamps of the corresponding update events.

An event’s vector timestamp is passed as lastVectorTimestamp argument to update. The update method internally creates a new version by applying the update function (s, a) => s :+ a to the closest predecessor version and the actual update value (entry). The lastVectorTimestamp is attached as update timestamp to the newly created version.

Concurrent versions of actor state and their update timestamps can be obtained with all, which is a sequence of type Seq[Versioned[Vector[String]]] in our example. The Versioned data type represents a particular version of actor state and its update timestamp (= vectorTimestamp field).

If all contains only a single element, there is no conflict and the element represents the current, conflict-free actor state. If the sequence contains two or more elements, there is a conflict and the elements represent conflicting versions of actor state. They can be resolved either automatically or interactively.
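The tree structure described above can be approximated in plain Java. The following is a simplified sketch, not Eventuate’s implementation: VersionsTreeSketch, Node and the helpers time and lteq are hypothetical, vector timestamps are plain maps, resolve is omitted, and the predecessor search simply follows the first child whose timestamp is not greater than the new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified versions tree (not Eventuate's ConcurrentVersionsTree).
class VersionsTreeSketch<S, A> {
    static final class Node<T> {
        final T value;
        final Map<String, Long> timestamp;
        final List<Node<T>> children = new ArrayList<>();

        Node(T value, Map<String, Long> timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Node<S> root;
    private final BiFunction<S, A, S> updateFn;

    VersionsTreeSketch(S initial, BiFunction<S, A, S> updateFn) {
        this.root = new Node<>(initial, new HashMap<>());
        this.updateFn = updateFn;
    }

    // Builds a timestamp from alternating process ids and logical times.
    static Map<String, Long> time(Object... idsAndTimes) {
        Map<String, Long> m = new HashMap<>();
        for (int i = 0; i < idsAndTimes.length; i += 2) {
            m.put((String) idsAndTimes[i], ((Number) idsAndTimes[i + 1]).longValue());
        }
        return m;
    }

    // a <= b for vector timestamps; entries missing in b count as 0.
    static boolean lteq(Map<String, Long> a, Map<String, Long> b) {
        for (Map.Entry<String, Long> e : a.entrySet()) {
            if (e.getValue() > b.getOrDefault(e.getKey(), 0L)) return false;
        }
        return true;
    }

    // Applies the update function to the closest predecessor and adds the result as a new branch.
    void update(A a, Map<String, Long> timestamp) {
        Node<S> pred = root;
        boolean descended = true;
        while (descended) {
            descended = false;
            for (Node<S> child : pred.children) {
                if (lteq(child.timestamp, timestamp)) {
                    pred = child;
                    descended = true;
                    break;
                }
            }
        }
        pred.children.add(new Node<>(updateFn.apply(pred.value, a), timestamp));
    }

    // The leaves of the tree are the current (possibly conflicting) versions.
    List<S> all() {
        List<S> leaves = new ArrayList<>();
        collect(root, leaves);
        return leaves;
    }

    private void collect(Node<S> node, List<S> out) {
        if (node.children.isEmpty()) out.add(node.value);
        else for (Node<S> child : node.children) collect(child, out);
    }

    boolean conflict() {
        return all().size() > 1;
    }

    public static void main(String[] args) {
        VersionsTreeSketch<String, String> tree = new VersionsTreeSketch<>("", (s, a) -> s + a);
        tree.update("a", time("A", 1));          // regular update
        tree.update("b", time("A", 2));          // follows "a"
        tree.update("c", time("A", 1, "B", 1));  // also follows "a", concurrent to "b"
        System.out.println(tree.all() + " " + tree.conflict()); // prints "[ab, ac] true"
    }
}
```

The third update is concurrent to the second, so it branches off the version that contains only "a" and two leaves remain.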

Note

Only concurrent updates to replicas with the same aggregateId may conflict. Concurrent updates to actors with different aggregateIds do not conflict (unless an application does custom Event routing).

Also, if the data type of actor state is designed in a way that update operations commute, concurrent updates can be made conflict-free. This is discussed in section Operation-based CRDTs.

Resolving conflicting versions

Automated conflict resolution

The following is a simple example of automated conflict resolution: if a conflict has been detected, the version with the higher wall clock timestamp is selected to be the winner. In case of equal wall clock timestamps, the version with the lower emitter id is selected. The wall clock timestamp can be obtained with lastSystemTimestamp during event handling, the emitter id with lastEmitterId. The emitter id is the id of the EventsourcedActor that emitted the event.

class ExampleActor(override val id: String,
                   override val aggregateId: Option[String],
                   override val eventLog: ActorRef) extends EventsourcedActor {

  private var versionedState: ConcurrentVersions[Vector[String], String] =
    ConcurrentVersions(Vector.empty, (s, a) => s :+ a)

  override def onCommand = {
    // ...
  }

  override def onEvent = {
    case Appended(entry) =>
      versionedState = versionedState
        .update(entry, lastVectorTimestamp, lastSystemTimestamp, lastEmitterId)
      if (versionedState.conflict) {
        val conflictingVersions = versionedState.all.sortWith { (v1, v2) =>
          if (v1.systemTimestamp == v2.systemTimestamp) v1.creator < v2.creator
          else v1.systemTimestamp > v2.systemTimestamp
        }
        val winnerTimestamp: VectorTime = conflictingVersions.head.vectorTimestamp
        versionedState = versionedState.resolve(winnerTimestamp)
      }
  }
}

import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import com.rbmhtechnology.eventuate.*;

import java.util.Collection;
import java.util.Collections;
import java.util.stream.Stream;

import static userguide.japi.DocUtils.append;


class ExampleActor extends AbstractEventsourcedActor {

  private ConcurrentVersions<Collection<String>, String> versionedState =
    ConcurrentVersionsTree.create(Collections.emptyList(), (s, a) -> append(s, a));

  public ExampleActor(String id, ActorRef eventLog) {
    super(id, eventLog);

    setOnEvent(ReceiveBuilder
      .match(Appended.class, evt -> {
        versionedState = versionedState.update(evt.entry, lastVectorTimestamp(), lastSystemTimestamp(), lastEmitterId());

        if (versionedState.conflict()) {
          final Stream<Versioned<Collection<String>>> conflictingVersions = versionedState.getAll().stream()
            .sorted((v1, v2) -> {
              if (v1.systemTimestamp() == v2.systemTimestamp()) {
                return v1.creator().compareTo(v2.creator());
              }
              return v1.systemTimestamp() > v2.systemTimestamp() ? -1 : 1;
            });

          final VectorTime winnerTimestamp = conflictingVersions.findFirst().get().vectorTimestamp();
          versionedState = versionedState.resolve(winnerTimestamp);
        }
      })
      .build());
  }
}

Here, conflicting versions are sorted by descending wall clock timestamp and ascending emitter id where the latter is tracked as creator of the version. The first version is selected to be the winner. Its vector timestamp is passed as argument to resolve which selects this version and discards all other versions.

More advanced conflict resolution could select a winner depending on the actual value of concurrent versions. After selection, an application could even update the winner with the merged value of all conflicting versions[3].

Note

For replicas to converge, it is important that winner selection does not depend on the order of conflicting events. In our example, this is the case because wall clock timestamp and emitter id comparison is transitive.
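The order independence mentioned in this note can be checked with a small plain-Java sketch. The names WinnerSelectionSketch, Version and WINNER_FIRST are hypothetical; Version only carries the fields that the selection rule above looks at.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of order-independent winner selection.
class WinnerSelectionSketch {
    // Stand-in for the Versioned fields used during winner selection.
    static final class Version {
        final String value;
        final long systemTimestamp;
        final String creator;

        Version(String value, long systemTimestamp, String creator) {
            this.value = value;
            this.systemTimestamp = systemTimestamp;
            this.creator = creator;
        }
    }

    // Orders versions by descending wall clock timestamp, then ascending creator.
    static final Comparator<Version> WINNER_FIRST =
        Comparator.comparingLong((Version v) -> v.systemTimestamp)
            .reversed()
            .thenComparing(v -> v.creator);

    // The winner is the first version in that order.
    static Version winner(List<Version> conflictingVersions) {
        return Collections.min(conflictingVersions, WINNER_FIRST);
    }

    public static void main(String[] args) {
        Version x = new Version("x", 10L, "B");
        Version y = new Version("y", 20L, "C");
        Version z = new Version("z", 20L, "A");
        // The same winner is selected regardless of the order of conflicting versions.
        System.out.println(winner(Arrays.asList(x, y, z)).value); // prints "z"
        System.out.println(winner(Arrays.asList(z, y, x)).value); // prints "z"
    }
}
```

Versions y and z share the highest wall clock timestamp, so the lower creator id decides and z wins on every replica.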

Interactive conflict resolution

Interactive conflict resolution does not resolve conflicts immediately but requests the user to inspect and resolve a conflict. The following is a very simple example of interactive conflict resolution: a user selects a winner version if conflicting versions of application state exist.

case class Append(entry: String)
case class AppendRejected(entry: String, conflictingVersions: Seq[Versioned[Vector[String]]])

case class Resolve(selectedTimestamp: VectorTime)
case class Resolved(selectedTimestamp: VectorTime)

class ExampleActor(override val id: String,
                   override val aggregateId: Option[String],
                   override val eventLog: ActorRef) extends EventsourcedActor {

  private var versionedState: ConcurrentVersions[Vector[String], String] =
    ConcurrentVersions(Vector.empty, (s, a) => s :+ a)

  override def onCommand = {
    case Append(entry) if versionedState.conflict =>
      sender() ! AppendRejected(entry, versionedState.all)
    case Append(entry) =>
      // ...
    case Resolve(selectedTimestamp) => persist(Resolved(selectedTimestamp)) {
      case Success(evt) => // reply to sender omitted ...
      case Failure(err) => // reply to sender omitted ...
    }
  }

  override def onEvent = {
    case Appended(entry) =>
      versionedState = versionedState
        .update(entry, lastVectorTimestamp, lastSystemTimestamp, lastEmitterId)
    case Resolved(selectedTimestamp) =>
      versionedState = versionedState.resolve(selectedTimestamp, lastVectorTimestamp)
  }
}

class ExampleActor extends AbstractEventsourcedActor {

  private ConcurrentVersions<Collection<String>, String> versionedState =
    ConcurrentVersionsTree.create(Collections.emptyList(), (s, a) -> append(s, a));

  public ExampleActor(String id, ActorRef eventLog) {
    super(id, eventLog);

    setOnCommand(ReceiveBuilder
      .match(Append.class, cmd -> versionedState.conflict(),
        cmd -> sender().tell(new AppendRejected(cmd.entry, versionedState.getAll()), self())
      )
      .match(Append.class, cmd -> {
        // ...
      })
      .match(Resolve.class, cmd -> persist(new Resolved(cmd.selectedTimestamp), ResultHandler.on(
        evt -> { /* reply to sender omitted ... */ },
        err -> { /* reply to sender omitted ... */ }
      )))
      .build());

    setOnEvent(ReceiveBuilder
      .match(Appended.class, evt ->
        versionedState = versionedState.update(evt.entry, lastVectorTimestamp(), lastSystemTimestamp(), lastEmitterId())
      )
      .match(Resolved.class, evt ->
        versionedState = versionedState.resolve(evt.selectedTimestamp, lastVectorTimestamp(), lastSystemTimestamp())
      )
      .build());
  }
}

// Command
class Append {
  public final String entry;

  public Append(String entry) {
    this.entry = entry;
  }
}

// Command reply
class AppendRejected {
  public final String entry;
  public final Collection<Versioned<Collection<String>>> conflictingVersions;

  public AppendRejected(String entry, Collection<Versioned<Collection<String>>> conflictingVersions) {
    this.entry = entry;
    this.conflictingVersions = conflictingVersions;
  }
}

// Command
class Resolve {
  public final VectorTime selectedTimestamp;

  public Resolve(VectorTime selectedTimestamp) {
    this.selectedTimestamp = selectedTimestamp;
  }
}

// Event
class Resolved {
  public final VectorTime selectedTimestamp;

  public Resolved(VectorTime selectedTimestamp) {
    this.selectedTimestamp = selectedTimestamp;
  }
}

When a user tries to Append in the presence of a conflict, the ExampleActor rejects the update and requests the user to select a winner version from a sequence of conflicting versions. The user then sends the update timestamp of the winner version as selectedTimestamp with a Resolve command from which a Resolved event is derived and persisted. Handling of Resolved at all replicas finally resolves the conflict.

In addition to just selecting a winner, an application could also update the winner version in a second step, for example, with a value derived from the merge result of conflicting versions. Support for atomic, interactive conflict resolution with an application-defined merge function is planned for later Eventuate releases.

Note

Interactive conflict resolution requires agreement among replicas that are affected by a given conflict: only one of them may emit the Resolved event. This does not necessarily mean distributed lock acquisition or leader (= resolver) election; it can also rely on static rules, for example that only the initial creator location of an aggregate is allowed to resolve the conflict[4]. This rule is implemented in the Example application.

Operation-based CRDTs

If state update operations commute, there’s no need to use Eventuate’s ConcurrentVersions utility. A simple example is a replicated counter, which converges because its increment and decrement operations commute.
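A minimal plain-Java sketch of the counter case follows. CounterCommuteSketch and apply are hypothetical names; the two calls simulate two replicas that receive the same increment and decrement in different orders.

```java
// Hypothetical illustration: increment and decrement operations commute.
class CounterCommuteSketch {
    // Applies increments (positive) and decrements (negative) in delivery order.
    static int apply(int initial, int... deltas) {
        int value = initial;
        for (int delta : deltas) {
            value += delta;
        }
        return value;
    }

    public static void main(String[] args) {
        int replica1 = apply(0, +3, -1); // sees the increment first
        int replica2 = apply(0, -1, +3); // sees the decrement first
        System.out.println(replica1 + " " + replica2); // prints "2 2"
    }
}
```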

A formal approach to commutative replicated data types (CmRDTs) or operation-based CRDTs is given in the paper A comprehensive study of Convergent and Commutative Replicated Data Types by Marc Shapiro et al. Eventuate is a good basis for implementing operation-based CRDTs:

  • Update operations can be modeled as events and reliably broadcast to all replicas by a Replicated event log.
  • The command and event handler of an event-sourced actor can be used to implement the two update phases mentioned in the paper: atSource and downstream, respectively.
  • All downstream preconditions mentioned in the paper are satisfied by causal delivery of update operations, which is guaranteed for actors consuming from a replicated event log.
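The two update phases can be sketched for an OR-Set in plain Java. This is an illustration of the general operation-based OR-Set technique under stated assumptions, not Eventuate’s ORSet: ORSetSketch and its methods are hypothetical, and string tags passed in by the caller stand in for the unique timestamps that identify add operations.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical operation-based OR-Set sketch (not Eventuate's ORSet).
class ORSetSketch<A> {
    static final class AddOp<E> {
        final E entry;
        final String tag;
        AddOp(E entry, String tag) { this.entry = entry; this.tag = tag; }
    }

    static final class RemoveOp<E> {
        final E entry;
        final Set<String> tags;
        RemoveOp(E entry, Set<String> tags) { this.entry = entry; this.tags = tags; }
    }

    private final Map<A, Set<String>> tagsByEntry = new HashMap<>();

    // atSource phase of add: attach a unique tag to the entry.
    AddOp<A> prepareAdd(A entry, String uniqueTag) {
        return new AddOp<>(entry, uniqueTag);
    }

    // atSource phase of remove: capture the tags observed locally for the entry.
    RemoveOp<A> prepareRemove(A entry) {
        Set<String> observed = tagsByEntry.getOrDefault(entry, new HashSet<>());
        return new RemoveOp<>(entry, new HashSet<>(observed));
    }

    // downstream phase of add: record the tagged entry.
    void applyAdd(AddOp<A> op) {
        tagsByEntry.computeIfAbsent(op.entry, k -> new HashSet<>()).add(op.tag);
    }

    // downstream phase of remove: drop only the tags the remover had observed.
    void applyRemove(RemoveOp<A> op) {
        Set<String> tags = tagsByEntry.get(op.entry);
        if (tags == null) return;
        tags.removeAll(op.tags);
        if (tags.isEmpty()) tagsByEntry.remove(op.entry);
    }

    Set<A> value() {
        return new HashSet<>(tagsByEntry.keySet());
    }

    public static void main(String[] args) {
        ORSetSketch<String> r1 = new ORSetSketch<>();
        ORSetSketch<String> r2 = new ORSetSketch<>();

        AddOp<String> add1 = r1.prepareAdd("x", "t1");
        r1.applyAdd(add1);
        r2.applyAdd(add1);

        // Concurrent operations: r1 removes "x" while r2 adds it again.
        RemoveOp<String> remove = r1.prepareRemove("x");
        AddOp<String> add2 = r2.prepareAdd("x", "t2");

        // The replicas apply the two concurrent operations in different orders.
        r1.applyRemove(remove);
        r1.applyAdd(add2);
        r2.applyAdd(add2);
        r2.applyRemove(remove);

        System.out.println(r1.value() + " " + r2.value()); // prints "[x] [x]"
    }
}
```

The remove only affects the add it had observed, so the concurrent add survives on both replicas regardless of delivery order. The sketch relies on an add being applied before any remove that observed it, which is what causal delivery provides.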

Eventuate currently implements 5 out of 12 operation-based CRDTs specified in the paper. These are Counter, MV-Register, LWW-Register, OR-Set and OR-Cart (a shopping cart CRDT). They can be instantiated and used via their corresponding CRDT services. CRDT operations are asynchronous methods on the service interfaces. CRDT services free applications from dealing with low-level details like event-sourced actors or command messages directly. The following is the definition of ORSetService:

/**
 * Replicated [[ORSet]] CRDT service.
 *
 * @param serviceId Unique id of this service.
 * @param log Event log.
 * @tparam A [[ORSet]] entry type.
 */
class ORSetService[A](val serviceId: String, val log: ActorRef)(implicit val system: ActorSystem, val ops: CRDTServiceOps[ORSet[A], Set[A]])
  extends CRDTService[ORSet[A], Set[A]] {

  /**
   * Adds `entry` to the OR-Set identified by `id` and returns the updated entry set.
   */
  def add(id: String, entry: A): Future[Set[A]] =
    op(id, AddOp(entry))

  /**
   * Removes `entry` from the OR-Set identified by `id` and returns the updated entry set.
   */
  def remove(id: String, entry: A): Future[Set[A]] =
    op(id, RemoveOp(entry))

  start()
}

/**
 * Persistent add operation used for [[ORSet]] and [[ORCart]].
 */
case class AddOp(entry: Any) extends CRDTFormat

/**
 * Persistent remove operation used for [[ORSet]] and [[ORCart]].
 */
case class RemoveOp(entry: Any, timestamps: Set[VectorTime] = Set.empty) extends CRDTFormat

/**
 * Java API of a replicated [[ORSet]] CRDT service.
 *
 * @param serviceId Unique id of this service.
 * @param log Event log.
 * @param system Actor system.
 * @tparam A [[ORSet]] entry type.
 */
class ORSetService<A> extends CRDTService<ORSet<A>, Set<A>> {

  ORSetService(String serviceId, ActorRef log, ActorSystem system) {
    super(serviceId, log, system);

    start();
  }

  public CompletionStage<Set<A>> add(String id, A entry) {
    return op(id, new AddOp(entry));
  }

  public CompletionStage<Set<A>> remove(String id, A entry) {
    return op(id, new RemoveOp(entry));
  }
}

The ORSetService is a CRDT service that manages ORSet instances. It implements the asynchronous add and remove methods and inherits the value(id: String): Future[Set[A]] method from CRDTService[ORSet[A], Set[A]] for reading the current value. The id parameter of these methods identifies an ORSet instance. Instances are automatically created by the service on demand. A usage example is the ReplicatedOrSetSpec that is based on Akka’s multi node testkit.

A CRDT service also implements a save(id: String): Future[SnapshotMetadata] method for saving CRDT snapshots. Snapshots may reduce recovery times of CRDTs with a long update history but are not required for CRDT persistence.

New operation-based CRDTs and their corresponding services can be developed with the CRDT development framework, by defining an instance of the CRDTServiceOps type class and implementing the CRDTService trait. Take a look at the CRDT sources for examples.

Hint

Eventuate’s CRDT approach is also described in this article.

Event-sourced views

Event-sourced views are a functional subset of event-sourced actors. They can only consume events from an event log but cannot produce new events. Concrete event-sourced views must implement the EventsourcedView trait. In the following example, the view counts all Appended and Resolved events emitted by all event-sourced actors to the same eventLog:

import akka.actor.ActorRef
import com.rbmhtechnology.eventuate.EventsourcedView
import com.rbmhtechnology.eventuate.VectorTime

case class Appended(entry: String)
case class Resolved(selectedTimestamp: VectorTime)

case object GetAppendCount
case class GetAppendCountReply(count: Long)

case object GetResolveCount
case class GetResolveCountReply(count: Long)

class ExampleView(override val id: String, override val eventLog: ActorRef) extends EventsourcedView {
  private var appendCount: Long = 0L
  private var resolveCount: Long = 0L

  override def onCommand = {
    case GetAppendCount => sender() ! GetAppendCountReply(appendCount)
    case GetResolveCount => sender() ! GetResolveCountReply(resolveCount)
  }

  override def onEvent = {
    case Appended(_) => appendCount += 1L
    case Resolved(_) => resolveCount += 1L
  }
}

import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import com.rbmhtechnology.eventuate.AbstractEventsourcedView;
import com.rbmhtechnology.eventuate.VectorTime;

class ExampleView extends AbstractEventsourcedView {

  private Long appendCount = 0L;
  private Long resolveCount = 0L;

  public ExampleView(String id, ActorRef eventLog) {
    super(id, eventLog);

    setOnCommand(ReceiveBuilder
      .match(GetAppendCount.class, cmd -> sender().tell(new GetAppendCountReply(appendCount), self()))
      .match(GetResolveCount.class, cmd -> sender().tell(new GetResolveCountReply(resolveCount), self()))
      .build());

    setOnEvent(ReceiveBuilder
      .match(Appended.class, evt -> appendCount += 1)
      .match(Resolved.class, evt -> resolveCount += 1)
      .build());
  }
}

// Commands
class GetAppendCount {
}

class GetResolveCount {
}

// Command replies
class GetAppendCountReply {
  public final Long count;

  public GetAppendCountReply(Long count) {
    this.count = count;
  }
}

class GetResolveCountReply {
  public final Long count;

  public GetResolveCountReply(Long count) {
    this.count = count;
  }
}

// Events
class Appended {
  public final String entry;

  public Appended(String entry) {
    this.entry = entry;
  }
}

class Resolved {
  public final VectorTime selectedTimestamp;

  public Resolved(VectorTime selectedTimestamp) {
    this.selectedTimestamp = selectedTimestamp;
  }
}

Event-sourced views handle events in the same way as event-sourced actors by implementing an onEvent handler. The onCommand handler in the example processes the queries GetAppendCount and GetResolveCount.

ExampleView implements the mandatory, globally unique id but doesn’t define an aggregateId. A view that doesn’t define an aggregateId can consume events from all event-sourced actors on the same event log. If it defines an aggregateId, it can only consume events from event-sourced actors with the same aggregateId (assuming the default Event routing rules).
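
The routing rule stated above can be sketched without any Eventuate dependency. The class AggregateRoutingSketch, its Event type and the routes method are hypothetical names, and the sketch models only the two cases this paragraph describes, not the complete default routing rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the rule described above, not Eventuate's implementation.
final class AggregateRoutingSketch {

  // An event together with the aggregateId of the actor that emitted it.
  static final class Event {
    final String payload;
    final Optional<String> emitterAggregateId;

    Event(String payload, Optional<String> emitterAggregateId) {
      this.payload = payload;
      this.emitterAggregateId = emitterAggregateId;
    }
  }

  // A view without aggregateId consumes every event; a view with an
  // aggregateId consumes only events from emitters with the same aggregateId.
  static boolean routes(Event event, Optional<String> viewAggregateId) {
    return !viewAggregateId.isPresent()
        || viewAggregateId.equals(event.emitterAggregateId);
  }

  // Payloads of all events in the log that the given view would consume.
  static List<String> consumedBy(Optional<String> viewAggregateId, List<Event> log) {
    List<String> result = new ArrayList<>();
    for (Event event : log) {
      if (routes(event, viewAggregateId)) {
        result.add(event.payload);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Event> log = Arrays.asList(
        new Event("a", Optional.of("ea1")),
        new Event("b", Optional.of("ea2")),
        new Event("c", Optional.<String>empty()));
    System.out.println(consumedBy(Optional.<String>empty(), log));
    System.out.println(consumedBy(Optional.of("ea1"), log));
  }
}
```

In this model, the view without an aggregateId sees all three events, whereas the view with aggregateId ea1 sees only the event emitted by the actor with that aggregateId.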

Hint

While event-sourced views maintain view state in memory, Event-sourced writers can be used to persist view state to external databases. Event-sourced processors are a specialization of event-sourced writers whose external database is an event log.

Conditional requests

Causal read consistency is the default when reading state from a single event-sourced actor or view. The event stream received by that actor or view is always causally ordered, so it never sees an effect before having seen its cause.

The situation is different when a client reads from multiple actors. Imagine two event-sourced actor replicas where a client updates one replica and observes the updated state with the reply. A subsequent read from the other replica, made by the same client, may return the old state, which violates causal consistency.

Similar considerations can be made for reading from an event-sourced view after having made an update to an event-sourced actor. For example, an application that successfully appended an entry to ExampleActor may not immediately see that update in the appendCount of ExampleView. To achieve causal read consistency, the view should delay command processing until the emitted event has been consumed by the view. This can be achieved with a ConditionalRequest.

import scala.concurrent.duration._
import scala.util._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import com.rbmhtechnology.eventuate._

case class Append(entry: String)
case class AppendSuccess(entry: String, updateTimestamp: VectorTime)

class ExampleActor(override val id: String,
                   override val eventLog: ActorRef) extends EventsourcedActor {

  private var currentState: Vector[String] = Vector.empty
  override val aggregateId = Some(id)

  override def onCommand = {
    case Append(entry) => persist(Appended(entry)) {
      case Success(evt) =>
        sender() ! AppendSuccess(entry, lastVectorTimestamp)
      // ...
    }
    // ...
  }

  override def onEvent = {
    case Appended(entry) => currentState = currentState :+ entry
  }
}

class ExampleView(override val id: String, override val eventLog: ActorRef)
  extends EventsourcedView with ConditionalRequests {
  // ...
}

val ea = system.actorOf(Props(new ExampleActor("ea", eventLog)))
val ev = system.actorOf(Props(new ExampleView("ev", eventLog)))

import system.dispatcher
implicit val timeout = Timeout(5.seconds)

for {
  AppendSuccess(_, timestamp) <- ea ? Append("a")
  GetAppendCountReply(count)  <- ev ? ConditionalRequest(timestamp, GetAppendCount)
} println(s"append count = $count")

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.rbmhtechnology.eventuate.*;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static scala.compat.java8.JFunction.func;
import static scala.compat.java8.JFunction.proc;

class ExampleActor extends AbstractEventsourcedActor {

  private final String id;
  private Collection<String> currentState = Collections.emptyList();

  public ExampleActor(String id, ActorRef eventLog) {
    super(id, eventLog);
    this.id = id;

    setOnCommand(ReceiveBuilder
      .match(Append.class, cmd -> persist(new Appended(cmd.entry), ResultHandler.onSuccess(
        evt -> sender().tell(new AppendSuccess(evt.entry, lastVectorTimestamp()), self())
      )))
      // ...
      .build());

    setOnEvent(ReceiveBuilder
      .match(Appended.class, evt -> currentState = append(currentState, evt.entry))
      .build());
  }

  @Override
  public Optional<String> getAggregateId() {
    return Optional.of(id);
  }
}

// Command
public class Append {
  public final String entry;

  public Append(String entry) {
    this.entry = entry;
  }
}

// Command reply
public class AppendSuccess {
  public final String entry;
  public final VectorTime updateTimestamp;

  public AppendSuccess(String entry, VectorTime updateTimestamp) {
    this.entry = entry;
    this.updateTimestamp = updateTimestamp;
  }
}

// Eventsourced-View
class ExampleView extends AbstractEventsourcedView {

  // AbstractEventsourcedView has ConditionalRequests mixed-in by default

  public ExampleView(String id, ActorRef eventLog) {
    super(id, eventLog);

    // ...
  }
}

final ActorRef ea = system.actorOf(Props.create(ExampleActor.class, () -> new ExampleActor("ea", eventLog)));
final ActorRef ev = system.actorOf(Props.create(ExampleView.class, () -> new ExampleView("ev", eventLog)));

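// Execution context for the future callbacks below
final ExecutionContextExecutor dispatcher = system.dispatcher();
final Timeout timeout = Timeout.apply(5, TimeUnit.SECONDS);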

Patterns.ask(ea, new Append("a"), timeout)
  .flatMap(func(m -> Patterns.ask(ev, new ConditionalRequest(((AppendSuccess) m).updateTimestamp, new GetAppendCount()), timeout)), dispatcher)
  .onComplete(proc(result -> {
    if (result.isSuccess()) {
      System.out.println("append count = " + ((GetAppendCountReply) result.get()).count);
    }
  }), dispatcher);

Here, the ExampleActor includes the event’s vector timestamp in its AppendSuccess reply. Together with the actual GetAppendCount command, the timestamp is included as a condition in a ConditionalRequest and sent to the view. To process a ConditionalRequest, an event-sourced view must extend the ConditionalRequests trait. ConditionalRequests internally delays the command, if needed, and only dispatches GetAppendCount to the view’s onCommand handler once the condition timestamp is in the causal past of the view, which is the case at the earliest when the view has consumed the update event. When running the example with an empty event log, it should print:

append count = 1

Note

Not only event-sourced views but also event-sourced actors, stateful event-sourced writers and processors can extend ConditionalRequests. Delaying conditional requests may re-order them relative to other conditional and non-conditional requests.
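
The delaying behaviour and the re-ordering mentioned in the note can be illustrated with a simplified, dependency-free model. It is a sketch under stated assumptions rather than the ConditionalRequests implementation: vector timestamps are modelled as plain maps from an emitter id to a logical time, and ConditionalRequestSketch, inCausalPast, request and onAppended are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of a view that delays requests until their condition
// timestamp is in its causal past. Not Eventuate's implementation.
final class ConditionalRequestSketch {

  static final class Pending {
    final Map<String, Long> condition;
    final String label;

    Pending(Map<String, Long> condition, String label) {
      this.condition = condition;
      this.label = label;
    }
  }

  private final Map<String, Long> version = new HashMap<>(); // view's vector time
  private final List<Pending> delayed = new ArrayList<>();
  private long appendCount = 0L;
  final List<String> replies = new ArrayList<>();

  // condition <= current: every entry of condition is covered by current.
  static boolean inCausalPast(Map<String, Long> condition, Map<String, Long> current) {
    for (Map.Entry<String, Long> e : condition.entrySet()) {
      if (current.getOrDefault(e.getKey(), 0L) < e.getValue()) {
        return false;
      }
    }
    return true;
  }

  // Answers immediately if the condition holds, otherwise delays the request.
  void request(Map<String, Long> condition, String label) {
    if (inCausalPast(condition, version)) {
      reply(label);
    } else {
      delayed.add(new Pending(condition, label));
    }
  }

  // Consumes an event, then answers delayed requests whose condition now holds.
  void onAppended(String emitter, long time) {
    version.merge(emitter, time, Long::max);
    appendCount++;
    Iterator<Pending> it = delayed.iterator();
    while (it.hasNext()) {
      Pending p = it.next();
      if (inCausalPast(p.condition, version)) {
        it.remove();
        reply(p.label);
      }
    }
  }

  private void reply(String label) {
    replies.add(label + ": append count = " + appendCount);
  }

  public static void main(String[] args) {
    ConditionalRequestSketch view = new ConditionalRequestSketch();
    view.request(Collections.singletonMap("ea", 1L), "conditional"); // delayed
    view.request(Collections.<String, Long>emptyMap(), "plain");     // answered now
    view.onAppended("ea", 1L);                                       // releases the delayed request
    System.out.println(view.replies);
  }
}
```

In this model, the plain request overtakes the conditional request that was sent before it: it is answered with the old count, while the conditional request is answered only after the update event has been consumed.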

Event-driven communication

Earlier sections have already shown one form of event collaboration: state replication. For that purpose, event-sourced actors of the same type exchange their events to re-construct actor state at different locations.

In more general cases, event-sourced actors of different types exchange events to achieve a common goal. They react to received events by updating internal state and producing new events. This form of event collaboration is called event-driven communication. In the following example, two event-sourced actors collaborate in a ping-pong game where

  • a PingActor emits a Ping event on receiving a Pong event and
  • a PongActor emits a Pong event on receiving a Ping event.

// some imports omitted ...
import com.rbmhtechnology.eventuate.EventsourcedView.Handler
import com.rbmhtechnology.eventuate.EventsourcedActor
import com.rbmhtechnology.eventuate.PersistOnEvent

case class Ping(num: Int)
case class Pong(num: Int)

class PingActor(val id: String, val eventLog: ActorRef, completion: ActorRef)
  extends EventsourcedActor with PersistOnEvent {

  override def onCommand = {
    case "serve" => persist(Ping(1))(Handler.empty)
  }

  override def onEvent = {
    case Pong(10) if !recovering => completion ! "done"
    case Pong(i)  => persistOnEvent(Ping(i + 1))
  }
}

class PongActor(val id: String, val eventLog: ActorRef)
  extends EventsourcedActor with PersistOnEvent {

  override def onCommand = {
    case _ =>
  }
  override def onEvent = {
    case Ping(i) => persistOnEvent(Pong(i))
  }
}

val pingActor = system.actorOf(Props(new PingActor("ping", eventLog, system.deadLetters)))
val pongActor = system.actorOf(Props(new PongActor("pong", eventLog)))

pingActor ! "serve"

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.rbmhtechnology.eventuate.AbstractEventsourcedActor;
import com.rbmhtechnology.eventuate.ResultHandler;
import static akka.actor.ActorRef.noSender;

class PingActor extends AbstractEventsourcedActor {

  public PingActor(String id, ActorRef eventLog, ActorRef completion) {
    super(id, eventLog);

    setOnCommand(ReceiveBuilder
      .matchEquals("serve", cmd -> persist(new Ping(1), ResultHandler.none()))
      .build());

    setOnEvent(ReceiveBuilder
      .match(Pong.class, evt -> evt.num == 10 && !recovering(), evt -> completion.tell("done", self()))
      .match(Pong.class, evt -> persistOnEvent(new Ping(evt.num + 1)))
      .build());
  }
}

class PongActor extends AbstractEventsourcedActor {

  public PongActor(String id, ActorRef eventLog) {
    super(id, eventLog);

    setOnEvent(ReceiveBuilder
      .match(Ping.class, evt -> persistOnEvent(new Pong(evt.num)))
      .build());
  }
}

class Ping {
  public final Integer num;

  public Ping(Integer num) {
    this.num = num;
  }
}

class Pong {
  public final Integer num;

  public Pong(Integer num) {
    this.num = num;
  }
}

final ActorRef pingActor = system.actorOf(Props.create(PingActor.class, () -> new PingActor("ping", eventLog, system.deadLetters())));
final ActorRef pongActor = system.actorOf(Props.create(PongActor.class, () -> new PongActor("pong", eventLog)));

pingActor.tell("serve", noSender());

The ping-pong game is started by sending the PingActor a “serve” command which persists the first Ping event. This event, however, is not consumed by the emitter but rather by the PongActor. The PongActor reacts to the Ping event by emitting a Pong event. Unlike in previous examples, the event is not emitted in the actor’s onCommand handler but in the onEvent handler. For that purpose, the actor has to mix in the PersistOnEvent trait and use the persistOnEvent method. The emitted Pong isn’t consumed by its emitter either but rather by the PingActor, which emits another Ping, and so on. The game ends when the PingActor has received the 10th Pong.

Note

The ping-pong game is reliable. When an actor crashes and is re-started, the game is reliably resumed from where it was interrupted. The persistOnEvent method is idempotent, i.e. no duplicates are written under failure conditions and later event replay. When deployed at different locations, the ping-pong actors are also partition-tolerant. When their game is interrupted by a network partition, it is automatically resumed when the partition heals.

Furthermore, the actors don’t need to care about idempotency in their business logic, i.e. they can assume that they receive a de-duplicated and causally ordered event stream in their onEvent handler. This is a significant advantage over communication based on at-least-once delivery, with ConfirmedDelivery for example, which can lead to duplicates and message re-ordering.
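
The effect of an idempotent persistOnEvent during replay can be illustrated with a simplified in-memory model of the game. The sketch makes an assumption that is not taken from this guide: each written event records the sequence number of the event that caused it, and a write is skipped if the log already contains an event from the same emitter with the same cause. PingPongLogSketch and all of its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the ping-pong game, not Eventuate's
// implementation of PersistOnEvent.
final class PingPongLogSketch {

  static final class Entry {
    final String name;     // "Ping" or "Pong"
    final int num;
    final String emitter;  // id of the emitting actor
    final int causeSeqNr;  // sequence number of the causing event, 0 if none

    Entry(String name, int num, String emitter, int causeSeqNr) {
      this.name = name;
      this.num = num;
      this.emitter = emitter;
      this.causeSeqNr = causeSeqNr;
    }
  }

  final List<Entry> log = new ArrayList<>();

  // Command-driven write that starts the game.
  void serve() {
    log.add(new Entry("Ping", 1, "ping", 0));
  }

  // Event-driven write: skipped if this emitter already wrote an event for
  // the same causing event (assumed de-duplication rule of this sketch).
  boolean persistOnEvent(String name, int num, String emitter, int causeSeqNr) {
    for (Entry e : log) {
      if (e.emitter.equals(emitter) && e.causeSeqNr == causeSeqNr) {
        return false;
      }
    }
    log.add(new Entry(name, num, emitter, causeSeqNr));
    return true;
  }

  // Delivers all events from the given 1-based sequence number to both
  // actors, including events that are written during delivery.
  void deliver(int fromSeqNr) {
    for (int seqNr = fromSeqNr; seqNr <= log.size(); seqNr++) {
      Entry e = log.get(seqNr - 1);
      if (e.name.equals("Ping")) {
        persistOnEvent("Pong", e.num, "pong", seqNr);     // PongActor
      } else if (e.num < 10) {
        persistOnEvent("Ping", e.num + 1, "ping", seqNr); // PingActor
      }
    }
  }

  public static void main(String[] args) {
    PingPongLogSketch game = new PingPongLogSketch();
    game.serve();
    game.deliver(1);
    System.out.println("events after game: " + game.log.size());
    game.deliver(1); // replay from the start, as after a restart
    System.out.println("events after replay: " + game.log.size());
  }
}
```

Replaying the whole log through both handlers leaves the log unchanged in this model, because every event-driven write finds that its result has already been written.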

In a more realistic example, there would be several actors of different types collaborating to achieve a common goal, for example, in a distributed business process. These actors can be considered as event-driven and event-sourced microservices, collaborating on a causally ordered event stream in a reliable and partition-tolerant way. Furthermore, when partitioned, they remain available for local writes and automatically catch up with their collaborators when the partition heals.

Hint

Further persistOnEvent details are described in the PersistOnEvent API docs.

[1] EventsourcedActors and EventsourcedViews that have an undefined aggregateId can consume events from all other actors on the same event log.
[2] Attached update timestamps are not version vectors because Eventuate uses vector clock update rules instead of version vector update rules. Consequently, update timestamp equivalence cannot be used as a criterion for replica convergence.
[3] Convergent replicated data types (CvRDTs), or state-based CRDTs, are a formal approach to automatically merging concurrent versions of application state.
[4] Distributed lock acquisition or leader election requires an external coordination service like ZooKeeper, for example, whereas static rules do not.