Akka Streams adapter

This adapter provides an Akka Streams interface for Eventuate Event logs. It allows applications to consume event streams from event logs, write event streams to event logs, build idempotent event stream processing networks and exchange events with other systems that provide a Reactive Streams API.

The examples in the following subsections depend on the event log references logA, logB and logC. Here, they reference isolated Local event logs with a LevelDB storage backend:

import akka.actor.{ ActorRef, ActorSystem }
import akka.stream.{ ActorMaterializer, Materializer }
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog

implicit val system: ActorSystem = ActorSystem("example")
implicit val materializer: Materializer = ActorMaterializer()

val logAId = "A"
val logBId = "B"
val logCId = "C"

val logA: ActorRef = createLog(logAId)
val logB: ActorRef = createLog(logBId)
val logC: ActorRef = createLog(logCId)

def createLog(id: String): ActorRef =
  system.actorOf(LeveldbEventLog.props(id))

We could also have used the local references of Replicated event logs, but this would have no influence on the examples. Obtaining the local references of replicated event logs is explained in section Replication endpoints.

Event source

An event source can be created with DurableEventSource from an event log reference. The result is a Graph[SourceShape[DurableEvent], ActorRef] which can be used with the Akka Streams Scala DSL or Java DSL. Here, the Scala DSL is used to create a Source[DurableEvent, ActorRef] from logA:

import akka.stream.scaladsl.Source
import com.rbmhtechnology.eventuate.adapter.stream.DurableEventSource

val source1 = Source.fromGraph(DurableEventSource(logA))

A DurableEventSource not only emits events that already exist in an event log but also those that are written to the event log after the source has been materialized. A DurableEvent contains the application-specific event (payload field) and its metadata (all other fields).
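For example, the payloads and some of the metadata of the emitted events could be printed to stdout as follows (a minimal sketch; the stream does not complete because the source also emits events written later):

source1
  .map(event => (event.payload, event.emitterId, event.localSequenceNr))
  .runForeach(println)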

To create a source that emits events from a given sequence number and/or for a given aggregate id only, the fromSequenceNr and aggregateId parameters should be used, respectively. If aggregateId is None, events with any aggregate id, defined or not, are emitted (see also Event routing).

val source2 = Source.fromGraph(DurableEventSource(
  logA, fromSequenceNr = 12414, aggregateId = Some("user-17")))

Note

DurableEventSource emits events in local storage order. Local storage order is consistent with the potential causality of events, which is tracked with Vector clocks. More details can be found in section Event logs. Emission in local storage order also means that the order is repeatable across source materializations.

Event writer

An event writer is a stream stage that writes input events to an event log in stream order. It can be created with DurableEventWriter from a unique writer id and an event log reference. The result is a Graph[FlowShape[DurableEvent, DurableEvent], NotUsed] that emits the written events with updated metadata.

The following example converts a stream of Strings to a stream of DurableEvents and writes that stream to logA. It then extracts payload and localSequenceNr from the written events and prints the results to stdout:

import akka.stream.scaladsl.Source
import com.rbmhtechnology.eventuate.DurableEvent
import com.rbmhtechnology.eventuate.adapter.stream.DurableEventWriter

val writerId = "writer-1"

Source(List("a", "b", "c"))
  .map(DurableEvent(_))
  .via(DurableEventWriter(writerId, logA))
  .map(event => (event.payload, event.localSequenceNr))
  .runForeach(println)

// prints (on first run):
// (a,1)
// (b,2)
// (c,3)

The writer sets the emitterId of the input events to writerId. The processId, localLogId, localSequenceNr and systemTimestamp are set by the event log. The event log also updates the local time of vectorTimestamp. All other DurableEvent fields are written to the event log without modification.

Input events are batched if they are produced faster than they can be written. The maximum batch size can be configured with eventuate.log.write-batch-size. On write failure, the writer fails the stream.
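For example, instead of creating the actor system as at the beginning of this section, it could be created with a modified configuration. This is only a sketch; the value 128 is chosen for illustration and is not the default:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// sketch: override the maximum write batch size (illustrative value, not the default)
val config = ConfigFactory.parseString("eventuate.log.write-batch-size = 128")
  .withFallback(ConfigFactory.load())

implicit val system: ActorSystem = ActorSystem("example", config)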

Event processor

An event processor is a stream stage that expects input events from one or more source event logs, processes these events with application-defined processing logic and writes the processed events to a target event log. An event processor can be a stateful processor or a stateless processor and can be created with DurableEventProcessor.

Stateful processors apply processing logic of type (S, DurableEvent) => (S, Seq[O]) to input events where S is the type of the processing state and O is the type of the processed payload. Stateless processors apply processing logic of type DurableEvent => Seq[O] to input events.

Processing logic can filter, transform and/or split input events. To filter an input event from the event stream, an empty sequence should be returned. To transform an input event into one output event, a sequence of length 1 should be returned. To split an input event into multiple output events, a sequence of corresponding length should be returned.

Note

Application-defined processing logic can read the payload and metadata from the input event but can only return updated payloads. This makes metadata updates a processor-internal concern, ensuring that writing processed events to the target log works correctly and idempotently.

The following example is a stateless processor that consumes events from logA and writes the processing results to logB. The processing logic filters out an input event if its payload equals a, appends the source sequence number to the payload if it equals b and duplicates the input event if it equals c. Events with other payloads remain unchanged:

import akka.stream.scaladsl.Source
import com.rbmhtechnology.eventuate.DurableEvent
import com.rbmhtechnology.eventuate.adapter.stream.DurableEventSource
import com.rbmhtechnology.eventuate.adapter.stream.DurableEventProcessor._
import scala.collection.immutable.Seq

val processorId1 = "processor-1"

def statelessProcessingLogic(event: DurableEvent): Seq[Any] = event.payload match {
  case "a" => Seq()
  case "b" => Seq(s"b-${event.localSequenceNr}")
  case "c" => Seq("c", "c")
  case evt => Seq(evt)
}

Source.fromGraph(DurableEventSource(logA))
  .via(statelessProcessor(processorId1, logB)(statelessProcessingLogic))
  .map(event => (event.payload, event.localSequenceNr))
  .runForeach(println)

// prints (on first run):
// (b-2,1)
// (c,2)
// (c,3)

The example assumes that logA still contains the events that have been written by the event writer in the previous section. The next example uses a stateful processor that counts the number of events with a b payload and appends that number to all events. The processor consumes events from logA and writes the processing results to logC:

val processorId2 = "processor-2"

def statefulProcessingLogic(count: Int, event: DurableEvent): (Int, Seq[String]) = {
  event.payload match {
    case "b" =>
      val updated = count + 1
      (updated, Seq(s"b-$updated"))
    case evt =>
      (count, Seq(s"$evt-$count"))
  }
}

Source.fromGraph(DurableEventSource(logA))
  .via(statefulProcessor(processorId2, logC)(0)(statefulProcessingLogic))
  .map(event => (event.payload, event.localSequenceNr))
  .runForeach(println)

// prints (on first run):
// (a-0,1)
// (b-1,2)
// (c-1,3)

Note

When running the examples a second time or more often, no events will be written to logB and logC because the processors detect previously processed events as duplicates and discard them. This makes event processing idempotent, i.e. it can be restarted after failures without generating duplicates in the target event logs.

Input events are batched if they are produced faster than they can be processed. The maximum batch size can be configured with eventuate.log.write-batch-size. On write failure, a processor fails the stream.

Consuming from a shared source

In the above examples, both processors use their own DurableEventSource to read from logA. A better alternative is to use a single source and broadcast the events to both processors, which reduces the read load on logA:

import akka.stream.ClosedShape
import akka.stream.scaladsl._

val source = DurableEventSource(logA)
val sink = Flow[DurableEvent]
  .map(event => (event.payload, event.localSequenceNr))
  .to(Sink.foreach(println))

val processor1 = statelessProcessor(processorId1, logB)(statelessProcessingLogic)
val processor2 = statefulProcessor(processorId2, logC)(0)(statefulProcessingLogic)

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val bcast = b.add(Broadcast[DurableEvent](2))

  source ~> bcast
  bcast ~> processor1 ~> sink
  bcast ~> processor2 ~> sink

  ClosedShape
})

graph.run()

Consuming from multiple sources

An event processor may also consume events from multiple sources. In the following example, the processor consumes the merged stream from logA and logB and writes the processing results to logC:

Source.fromGraph(DurableEventSource(logA))
  .merge(DurableEventSource(logB))
  .via(statelessProcessor(processorId1, logC)(statelessProcessingLogic))
  .map(event => (event.payload, event.localSequenceNr))
  .runForeach(println)

Note

The example assumes that logA and logB are independent, i.e. have no causal relationship. A plain stream merge is sufficient in this case. If these two logs had a causal relationship (e.g. after having processed events from logA into logB), a plain stream merge might generate a stream that is not consistent with potential causality.

Processing such a stream may generate vectorTimestamps that indicate concurrency of otherwise causally related events. This is acceptable for some applications but many others require stream merges that preserve causality. We will therefore soon provide a causal stream merge stage.

Event processing progress tracking

An event processor not only writes processed events to a target event log but also writes the latest source log sequence number to that log in order to track processing progress. When composing an event processing stream, an application should first read the processing progress from the target log in order to initialize DurableEventSources with appropriate fromSequenceNrs. Eventuate provides ProgressSource for reading the processing progress of a given source log from a target log.

In Consuming from multiple sources, for example, the processing progress for logA and logB is stored in logC. The following example creates two sources, sourceA and sourceB, that first read the progress values for logA and logB from logC, respectively, and then create the actual DurableEventSources with an appropriate fromSequenceNr:

import com.rbmhtechnology.eventuate.adapter.stream.ProgressSource

val progressSourceA = Source.fromGraph(ProgressSource(logAId, logC))
val progressSourceB = Source.fromGraph(ProgressSource(logBId, logC))

val sourceA = progressSourceA.flatMapConcat { progress =>
  Source.fromGraph(DurableEventSource(logA, fromSequenceNr = progress))
}

val sourceB = progressSourceB.flatMapConcat { progress =>
  Source.fromGraph(DurableEventSource(logB, fromSequenceNr = progress))
}
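sourceA and sourceB can then replace the plain DurableEventSources from Consuming from multiple sources, for example (a sketch that reuses processorId1 and statelessProcessingLogic from above):

sourceA
  .merge(sourceB)
  .via(statelessProcessor(processorId1, logC)(statelessProcessingLogic))
  .map(event => (event.payload, event.localSequenceNr))
  .runForeach(println)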