Event log

Local event log

A local event log belongs to a given location[1] and a location can have one or more local event logs. Depending on the storage backend, a local event log may optionally be replicated within that location for stronger durability guarantees, but this is an implementation detail of the local event log. From Eventuate’s perspective, event replication occurs between different locations, which is further described in section Replicated event log.

To an application, an event log is represented by an event log actor. Producers and consumers interact with that actor to write events to and read events from the event log. They also register at the event log actor to be notified about newly written events. The messages that can be exchanged with an event log actor are defined in EventsourcingProtocol and ReplicationProtocol.

At the moment, two storage backends are directly supported by Eventuate, LevelDB and Cassandra. Usage of the corresponding event log actors is explained in LevelDB storage backend and Cassandra storage backend, respectively. The integration of custom storage backends into Eventuate is explained in Custom storage backends.

LevelDB storage backend

A local event log actor with a LevelDB storage backend writes events to the local file system. It can be created with:

import akka.actor.ActorSystem
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog

val system: ActorSystem = // ...
val log = system.actorOf(LeveldbEventLog.props(logId = "L1", prefix = "log"))

Applications must provide a unique logId for that log; the prefix is optional and defaults to log. This will create a directory log-L1 in which the LevelDB files are stored. The root directory of all local LevelDB directories can be configured with the eventuate.log.leveldb.dir configuration key in application.conf:

eventuate.log.leveldb.dir = /var/eventuate

With this configuration, the absolute path of the LevelDB directory in the above example is /var/eventuate/log-L1. If not configured, eventuate.log.leveldb.dir defaults to target. Further eventuate.log.leveldb configuration options are given in section Configuration.

Cassandra storage backend

Eventuate also provides an event log actor implementation that writes events to a Cassandra cluster. That actor can be created with:

import akka.actor.ActorSystem
import com.rbmhtechnology.eventuate.log.cassandra.CassandraEventLog

val system: ActorSystem = // ...
val log = system.actorOf(CassandraEventLog.props(logId = "L1"))

With default configuration settings, the event log actor connects to a Cassandra node at 127.0.0.1:9042. This can be customized with

eventuate.log.cassandra.contact-points = [host1[:port1], host2[:port2], ...]

Ports are optional and default to 9042 according to

eventuate.log.cassandra.default-port = 9042

If Cassandra requires authentication, the default username used by Eventuate is cassandra and the default password is cassandra (which corresponds to Cassandra’s default superuser). This can be changed with

eventuate.log.cassandra.username = "my-username"
eventuate.log.cassandra.password = "my-password"

Further details are described in the API docs of the Cassandra extension and the CassandraEventLog actor. A complete reference of eventuate.log.cassandra configuration options is given in section Configuration.

Note

Eventuate requires Cassandra version 2.1 or higher.

Hint

For instructions on how to run a local Cassandra cluster, you may want to read the article Chaos testing with Docker and Cassandra on Mac OS X.

Custom storage backends

A custom storage backend can be integrated into Eventuate by extending the abstract EventLog actor and implementing the EventLogSPI trait. For implementation examples, please take a look at LeveldbEventLog.scala and CassandraEventLog.scala.

Replicated event log

Local event logs from different locations can be connected for event replication. For example, when connecting a local event log L1 at location 1 with a local event log L2 at location 2, the events written to L1 are asynchronously replicated to location 2 and merged into L2. Likewise, events written to L2 are asynchronously replicated to location 1 and merged into L1. Merging preserves the causal ordering of events which is tracked with Vector clocks. Setting up a bi-directional replication connection between local event logs L1 and L2 yields a replicated event log L:

L1 ---- L2

Since events can be written concurrently at different locations, the total order of events in the local event logs at different locations is likely to differ. The causal order of events, however, is consistent across locations: if event e1 causes event e2 (i.e. e1 happened before e2) then the offset of e1 is less than the offset of e2 at all locations. The offset of an event in a local event log is its local sequence number. On the other hand, if e1 and e2 are written concurrently, their relative order in a local event log is not defined: the offset of e1 can be less than that of e2 at one location but greater than that of e2 at another location.

A replicated event log can also be set up for more than two locations. Here event log L is replicated across locations 1 - 6:

L1           L5
  \         /
   L2 --- L4
  /         \
L3           L6

A location may also have several local event logs that can be replicated independently of each other. The following example shows three replicated event logs L, M and N that are replicated across locations 1 and 2:

L1 ---- L2
M1 ---- M2
N1 ---- N2

The distribution of L, M and N across locations may also differ:

L1 ---- L2
M1 ---- M2 --- M3
        N2 --- N3

Note

Event replication is reliable and can recover from network failures. It can also recover from crashes of source and target locations, i.e. event replication automatically resumes when a crashed location recovers. Replicated events are also guaranteed to be written exactly once to a target log. This is possible because replication progress metadata are stored along with the replicated events in the target log, which allows a replication target to reliably detect and ignore duplicates. Event-sourced components can therefore rely on receiving a de-duplicated event stream.

Replication endpoints

Events are replicated over replication connections that are established between replication endpoints. A location may have one or more replication endpoints and a replication endpoint can manage one or more event logs. The following examples assume two locations 1 and 2 and two replicated event logs L and M:

L1 ---- L2
M1 ---- M2

Each location has a ReplicationEndpoint that manages the local event logs. Replication endpoints communicate with each other via Akka Remoting, which must be enabled by all locations in their application.conf:

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.enabled-transports = ["akka.remote.netty.tcp"]

In addition to TCP, Akka Remoting also supports TLS as a transport protocol. See Transport Security for details on how to set this up.

The network address of the replication endpoint at location 1 is:

akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port=2552

At location 2 it is:

akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port=2553

The ReplicationEndpoint at location 1 can be created programmatically with:

import akka.actor._
import com.rbmhtechnology.eventuate._
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog

implicit val system: ActorSystem =
  ActorSystem(ReplicationConnection.DefaultRemoteSystemName)

val endpoint1 = new ReplicationEndpoint(id = "1", logNames = Set("L", "M"),
  logFactory = logId => LeveldbEventLog.props(logId),
  connections = Set(ReplicationConnection("127.0.0.1", 2553)))

endpoint1.activate()

A ReplicationEndpoint must have a globally unique id. Here, the location identifier 1 is used to identify the replication endpoint. Furthermore, the logNames[2] of the replicated event logs (L and M) and a logFactory for creating the local event log actors are passed as constructor arguments. The input parameter of the logFactory is a unique logId that the replication endpoint generates from a combination of the given logNames and the endpoint id.

The last ReplicationEndpoint constructor parameter is a set of ReplicationConnections. Here, it is a single replication connection that connects to the remote replication endpoint at location 2. With this replication connection, events are replicated from location 2 to location 1. For starting event replication, a replication endpoint must be activated by calling the activate() method. For replicating events in the other direction, a corresponding ReplicationEndpoint and ReplicationConnection must be set up at location 2:

val endpoint2 = new ReplicationEndpoint(id = "2", logNames = Set("L", "M"),
  logFactory = logId => LeveldbEventLog.props(logId),
  connections = Set(ReplicationConnection("127.0.0.1", 2552)))

endpoint2.activate()

The event log actors that are created by a ReplicationEndpoint can be obtained from its logs map. Map keys are the event log names, map values the event log ActorRefs:

val l1: ActorRef = endpoint1.logs("L")
val m1: ActorRef = endpoint1.logs("M")
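
As a minimal sketch, such a log reference can then be passed to an event-sourced component, assuming Eventuate’s EventsourcedActor API (ExampleActor and its command handling are hypothetical):

import akka.actor.{ ActorRef, Props }
import com.rbmhtechnology.eventuate.EventsourcedActor

import scala.util._

// hypothetical event-sourced actor that writes to and replays from one of the logs above
class ExampleActor(override val id: String, override val eventLog: ActorRef) extends EventsourcedActor {
  override def onCommand = {
    case cmd: String => persist(cmd) {
      case Success(evt) => sender() ! s"persisted $evt"
      case Failure(err) => sender() ! s"persist failed: ${err.getMessage}"
    }
  }
  override def onEvent = {
    case evt: String => // update internal state from the persisted or replayed event
  }
}

val actor = system.actorOf(Props(new ExampleActor("example", l1)))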

Optionally, an application may also set an applicationName and an applicationVersion for a replication endpoint:

val endpoint3 = new ReplicationEndpoint(id = "3", logNames = Set("L", "M"),
  logFactory = logId => LeveldbEventLog.props(logId),
  connections = Set(ReplicationConnection("127.0.0.1", 2552)),
  applicationName = "example-application",
  applicationVersion = ApplicationVersion("1.2"))

If the applicationNames of two replication endpoints are equal, events are only replicated from the source endpoint to the target endpoint if the applicationVersion of the target endpoint is greater than or equal to that of the source endpoint. This is a simple mechanism to support incremental version upgrades of replicated applications where each replica can be upgraded individually without shutting down other replicas. It avoids permanent state divergence during an upgrade, which may occur if events are replicated from replicas with a higher version to those with a lower version. A replication endpoint whose replication attempts have been rejected due to an incompatible application version logs a warning such as:

Event replication rejected by remote endpoint 3.
Target ApplicationVersion(1,2) not compatible with
source ApplicationVersion(1,3).

If the applicationNames of two replication endpoints are not equal, events are always replicated, regardless of their applicationVersion value.
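
This rule can be summarized with the following illustrative sketch (AppVersion and replicationAccepted are hypothetical names used for illustration; they are not Eventuate API):

// illustrative only: replication from a source endpoint to a target endpoint is accepted
// if the application names differ, or if the target version is >= the source version
case class AppVersion(major: Int, minor: Int)

def replicationAccepted(
    sourceApp: String, sourceVersion: AppVersion,
    targetApp: String, targetVersion: AppVersion): Boolean =
  sourceApp != targetApp ||
    targetVersion.major > sourceVersion.major ||
    (targetVersion.major == sourceVersion.major && targetVersion.minor >= sourceVersion.minor)

// with equal application names, a target at 1.2 rejects events from a source at 1.3,
// which matches the warning shown above
replicationAccepted("example-application", AppVersion(1, 3), "example-application", AppVersion(1, 2)) // false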

Hint

Further ReplicationEndpoint creation options are described in the API documentation of the ReplicationEndpoint and ReplicationConnection companion objects. A complete reference of configuration options is given in section Configuration.

Note

Failure detection reports endpoints with incompatible application versions as unavailable.

Replication filters

By default, all events are replicated. Applications may provide ReplicationFilters to limit replication to a subset of events. A custom replication filter can be defined by extending ReplicationFilter and implementing a filter predicate (method apply). For example, the following replication filter accepts DurableEvents with a matching emitterAggregateId:

import com.rbmhtechnology.eventuate._

case class AggregateIdFilter(aggregateId: String) extends ReplicationFilter {
  override def apply(event: DurableEvent): Boolean =
    event.emitterAggregateId.contains(aggregateId)
}

Replication filters can also be composed. The following composedFilter accepts events with a defined emitterAggregateId of value order-17 or order-19:

val filter1 = AggregateIdFilter("order-17")
val filter2 = AggregateIdFilter("order-19")
val composedFilter = filter1 or filter2

For the definition of filter logic based on application-defined events, replication filters should use the payload field of DurableEvent.
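
For example, a filter based on an application-defined event type could match on the payload (OrderCreated is a hypothetical application event):

import com.rbmhtechnology.eventuate._

// hypothetical application-defined event
case class OrderCreated(orderId: String)

// accepts only events whose payload is an OrderCreated event
case class OrderCreatedFilter() extends ReplicationFilter {
  override def apply(event: DurableEvent): Boolean = event.payload match {
    case OrderCreated(_) => true
    case _               => false
  }
}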

Replication filters are defined per ReplicationEndpoint in the form of an EndpointFilters instance. An EndpointFilters instance allows an application to specify filters for specific target event logs (by target log id) or source logs (by source log name). The filters are applied to incoming replication read requests at the endpoint at which they are defined. The following example configures a replication filter for the log with name L:

val endpoint = new ReplicationEndpoint(id = "2", logNames = Set("L", "M"),
  logFactory = logId => LeveldbEventLog.props(logId),
  connections = Set(ReplicationConnection("127.0.0.1", 2553)),
  endpointFilters = EndpointFilters.sourceFilters(Map("L" -> filter1))
)

Events of log L are only replicated to remote logs when they pass filter filter1. See EndpointFilters for various options to combine source and target log specific filters.
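
For example, both logs can be filtered at the source by passing one filter per log name to the sourceFilters combinator shown above (order-21 is an illustrative aggregate id):

val endpointFilters = EndpointFilters.sourceFilters(Map(
  "L" -> filter1,
  "M" -> AggregateIdFilter("order-21")
))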

Hint

Replication filters are especially useful to prevent location-specific (or location-private) events from being replicated to other locations.

Update notifications

After having replicated a non-empty event batch, a replication endpoint immediately makes another replication attempt. On the other hand, if the replicated event batch is empty, the next replication attempt is delayed by a duration that can be configured with:

eventuate.log.replication.retry-delay = 5s
eventuate.log.replay-retry-delay = 10s

Consequently, event replication latency has an upper bound that is determined by eventuate.log.replication.retry-delay. To minimize event replication latency, replication endpoints by default send event log update notifications to each other. The corresponding configuration parameter is:

eventuate.log.replication.update-notifications = on

The impact of sending update notifications on average event replication latency, however, decreases with increasing event write load. Applications under high event write load may even experience increased event replication throughput if update notifications are turned off.

Failure detection

Replication endpoints can notify applications about the availability and unavailability of remote event logs. Remote event logs can become unavailable during a network partition, during downtime of their hosting application or because of incompatible applicationVersions of their replication endpoints, for example. A local replication endpoint publishes

  • Available(endpointId: String, logName: String) messages to the local ActorSystem’s event stream if the remote replication endpoint is available, and
  • Unavailable(endpointId: String, logName: String, causes: Seq[Throwable]) messages to the local ActorSystem’s event stream if the remote replication endpoint is unavailable.

Both messages are defined in ReplicationEndpoint. Their endpointId parameter is the id of the remote replication endpoint, and the logName parameter is the name of an event log that is managed by the remote endpoint.

Unavailable messages also contain all causes of an unavailability. Only those causes that occurred since the last publication of a previous Available or Unavailable message with the same endpointId and logName are included. Causes are one or more instances of:

A failure detection limit can be configured with:

eventuate.log.replication.failure-detection-limit = 60s

It instructs the failure detector to publish an Unavailable message if there is no successful reply from a remote replication endpoint within 60 seconds. Available and Unavailable messages are published periodically at intervals of eventuate.log.replication.failure-detection-limit.
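
As a minimal sketch, an application could subscribe an actor to the local ActorSystem’s event stream to react to these notifications (AvailabilityMonitor is a hypothetical name):

import akka.actor.{ Actor, Props }
import com.rbmhtechnology.eventuate.ReplicationEndpoint.{ Available, Unavailable }

// logs availability changes of remote event logs published to the local event stream
class AvailabilityMonitor extends Actor {
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self, classOf[Available])
    context.system.eventStream.subscribe(self, classOf[Unavailable])
  }
  def receive = {
    case Available(endpointId, logName) =>
      println(s"remote log $logName at endpoint $endpointId is available")
    case Unavailable(endpointId, logName, causes) =>
      println(s"remote log $logName at endpoint $endpointId is unavailable: ${causes.mkString(", ")}")
  }
}

val monitor = system.actorOf(Props(new AvailabilityMonitor))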

Disaster recovery

Total or partial event loss at a given location is classified as a disaster. Event loss can usually be prevented by using a clustered Cassandra storage backend (at each location), but a catastrophic failure may still lead to event loss. In this case, lost events can be recovered from other locations, optionally starting from an existing storage backup, in a procedure called disaster recovery.

If a storage backup exists, events can be partially recovered from that backup so that only events not covered by the backup must be copied from other locations. Recovery of events at a given location is only possible to the extent that they have been previously replicated to other locations (or written to the backup). Events that have not been replicated to other locations and for which no storage backup exists cannot be recovered.

Disaster recovery is executed per ReplicationEndpoint by calling its asynchronous recover() method. During recovery, the endpoint is activated to replicate lost events. Only after recovery has completed successfully may the application start its event-sourced components; otherwise, recovery must be retried.

import com.rbmhtechnology.eventuate.ReplicationEndpoint

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util._

val endpoint: ReplicationEndpoint = //...
val recovery: Future[Unit] = endpoint.recover()

recovery onComplete {
  case Success(_) => // all events recovered, local writes are allowed
  case Failure(e) => // retry recovery ...
}

During execution of disaster recovery, directly connected endpoints must be available. These are endpoints for which replication connections have been configured at the endpoint to be recovered. Availability is needed because connected endpoints need to update internal metadata before they can resume event replication with the recovered endpoint.

Hint

In theory, a ReplicationEndpoint could always be activated at startup by calling its recover() method. However, as this requires all directly connected endpoints to be available, it is not recommended.

Disaster recovery also deletes invalid snapshots, in case they survived the disaster. Invalid snapshots are those that cover lost events.

A complete reference of eventuate.log.recovery.* configuration options is given in section Configuration. The example application also implements Disaster recovery.

Note

Installing a storage backup is a separate administrative task that is not covered by running recover().

Deleting events

As outlined in the Overview, an event log is a continuously growing store of immutable facts. Depending on the implementation of the application, not all events are necessarily needed to recover application state after an application or actor restart. For example, if the application saves snapshots, only those events that occurred after the snapshot need to be available. But even without snapshots there can be application-specific boundary conditions that allow an application to recover its state from a certain sequence number on. To keep a store from growing indefinitely in these cases, a ReplicationEndpoint allows the deletion of events up to a given sequence number from a local log. Deletion of events differentiates between:

  • Logical deletion of events: Events that are logically deleted are not replayed in case of an actor restart. However, they are still available for replication to event logs of connected ReplicationEndpoints. All storage backends support logical deletion of events.
  • Physical deletion of events: Depending on the storage backend, logically deleted events are eventually physically deleted. Physically deleted events are no longer available for local replay or replication. Physical deletion is currently only supported by the LevelDB backend.

While a location can very well decide whether it needs certain events from a local event log to recover its state, it may be much less clear whether these events might be needed in the future for replication to other locations. Eventuate can defer physical deletion of events until they are replicated to known ReplicationEndpoints. In case a newly added location wants to catch up with the application’s full event history, it has to connect to a location that actually has the full event history.

Considering this, deletion is invoked through the delete method of a ReplicationEndpoint:

// logically delete all events from log L1 with a sequence number <= 100
// defer physical deletion until they are replicated to the given endpoints
val logicallyDeleted: Future[Long] =
  endpoint.delete("L1", 100L, Set("remoteEndpointId1", "remoteEndpointId2"))

logicallyDeleted onComplete {
  case Success(sequenceNr) => // events up to sequenceNr are logically deleted
  case Failure(e) => // deletion failed
}

This method returns the sequence number up to which events are logically deleted. The returned sequence number can differ from the requested one (here 100L), if

  • the log’s current sequence number is smaller than the requested number. In this case the current sequence number is returned.
  • there was a previous successful deletion request with a higher sequence number. In this case that sequence number is returned.

Depending on the storage backend, this call also triggers physical deletion of events in a reliable background process that survives event log restarts. To defer physical deletion of not yet replicated events, the third parameter takes a set of ReplicationEndpoint ids. Events are not physically deleted until they have been replicated to these endpoints. If the set is empty, physical deletion is triggered immediately.
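
For example, to request deletion without deferring physical deletion (assuming the same endpoint and delete signature as in the example above):

// no endpoints to wait for: physical deletion (where supported) starts immediately
val deleteImmediately: Future[Long] = endpoint.delete("L1", 100L, Set.empty)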

[1]A location can be a whole data center, a node within a data center or even a process on a single node, for example.
[2]Log names must be unique per replication endpoint. Replication connections are only established between logs of the same name.