Spark adapter

The Eventuate Spark adapter allows applications to consume events from event logs and to process them in Apache Spark. Writing processed events back to event logs is not possible yet but will be supported in future versions.

Note

The Spark adapter is only available for Scala 2.11 at the moment (see Download).

Warning

The Spark adapter is experimental. Expect major API changes.

Batch processing

SparkBatchAdapter supports event batch processing from event logs with a Cassandra storage backend. The batch adapter internally uses the Spark Cassandra Connector to expose an event log as a Spark RDD of DurableEvents:

import akka.actor.ActorSystem

import com.rbmhtechnology.eventuate.DurableEvent
import com.rbmhtechnology.eventuate.adapter.spark.SparkBatchAdapter

import org.apache.spark.rdd.RDD
import org.apache.spark.{ SparkConf, SparkContext }

implicit val system = ActorSystem("spark-example")

val sparkConfig = new SparkConf(true)
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.connection.port", "9042")
  .set("spark.cassandra.auth.username", "cassandra")
  .set("spark.cassandra.auth.password", "cassandra")
  .setAppName("adapter")
  .setMaster("local[4]")

val logId = "example"
val sparkContext: SparkContext =
  new SparkContext(sparkConfig)

// Create an Eventuate Spark batch adapter
val sparkBatchAdapter: SparkBatchAdapter =
  new SparkBatchAdapter(sparkContext, system.settings.config)

// Expose all events of given event log as Spark RDD
val events: RDD[DurableEvent] =
  sparkBatchAdapter.eventBatch(logId)

// Expose events of given event log as Spark RDD, starting at sequence number 3
val eventsFrom: RDD[DurableEvent] =
  sparkBatchAdapter.eventBatch(logId, fromSequenceNr = 3L)

A SparkBatchAdapter is instantiated with a SparkContext that is configured for connecting to a Cassandra storage backend, and with a Custom event serialization configuration (if any). The eventBatch method exposes the event log with the given logId as an RDD[DurableEvent], optionally starting from a custom sequence number.
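
If an application uses custom payload serializers, the corresponding serializer settings must be part of the Config passed to the adapter. The following is a minimal sketch using standard Akka serialization configuration keys; the serializer class com.example.MyEventSerializer and payload class com.example.MyEvent are hypothetical placeholders for an application's own classes:

import com.typesafe.config.ConfigFactory

// Hypothetical custom serialization configuration (standard Akka serialization
// settings); the class names are placeholders for application-specific classes
val serializerConfig = ConfigFactory.parseString(
  """
    |akka.actor.serializers {
    |  my-event = "com.example.MyEventSerializer"
    |}
    |akka.actor.serialization-bindings {
    |  "com.example.MyEvent" = my-event
    |}
  """.stripMargin)

// Pass the custom serialization configuration together with the system config
val customBatchAdapter: SparkBatchAdapter =
  new SparkBatchAdapter(sparkContext, serializerConfig.withFallback(system.settings.config))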

Event logs can span several partitions in a Cassandra cluster and the batch adapter reads from these partitions concurrently. Hence, events in the resulting RDD are only ordered per partition, not globally. Applications that require a total order by localSequenceNr can sort the resulting RDD:

// By default, events are sorted by sequence number *per partition*.
// Use .sortBy(_.localSequenceNr) to create a totally ordered RDD.
val eventsSorted: RDD[DurableEvent] = events.sortBy(_.localSequenceNr)

Exposing Spark DataFrames directly is not possible yet but will be supported in future versions. In the meantime, applications should convert RDDs to DataFrames or Datasets as shown in the following example:

import org.apache.spark.sql.{ Dataset, DataFrame, SQLContext }

case class DomainEvent(sequenceNr: Long, payload: String)

val sqlContext: SQLContext = new SQLContext(sparkContext)
import sqlContext.implicits._

// Create a DataFrame from RDD[DurableEvent]
val eventsDF: DataFrame = events.map(event =>
  DomainEvent(event.localSequenceNr, event.payload.toString)).toDF()

// Create a Dataset from RDD[DurableEvent]
val eventDS: Dataset[DomainEvent] = events.map(event =>
  DomainEvent(event.localSequenceNr, event.payload.toString)).toDS()
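
Once converted, events can be processed with the usual DataFrame and Dataset operations. For example, a small sketch using the DomainEvent columns defined above:

// Count events per payload value and print the result
eventsDF.groupBy("payload").count().show()

// Filter the Dataset by sequence number
val laterEvents: Dataset[DomainEvent] = eventDS.filter(_.sequenceNr >= 3L)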

Hint

The full example source code is in SparkBatchAdapterExample.scala

Stream processing

SparkStreamAdapter supports event stream processing from event logs with any storage backend. The stream adapter connects to the ReplicationEndpoint[1] of an event log to expose it as a Spark DStream of DurableEvents:

import akka.actor.ActorSystem

import com.rbmhtechnology.eventuate._
import com.rbmhtechnology.eventuate.adapter.spark.SparkStreamAdapter

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

implicit val system = ActorSystem("spark-example")

val sparkConfig = new SparkConf(true)
  .setAppName("adapter")
  .setMaster("local[4]")
val sparkContext = new SparkContext(sparkConfig)
val sparkStreamingContext = new StreamingContext(sparkContext, Seconds(1))

// Create an Eventuate Spark stream adapter
val sparkStreamAdapter = new SparkStreamAdapter(
  sparkStreamingContext, system.settings.config)

// Create a DStream from event log L by connecting to its replication endpoint
val stream: DStream[DurableEvent] = sparkStreamAdapter.eventStream(
  id = "s1", host = "127.0.0.1", port = 2552, logName = "L",
  fromSequenceNr = 1L, storageLevel = StorageLevel.MEMORY_ONLY)

// For processing in strict event storage order, use repartition(1)
stream.repartition(1).foreachRDD(rdd => rdd.foreach(println))

// Start event stream processing
sparkStreamingContext.start()

A SparkStreamAdapter is instantiated with a Spark StreamingContext and a Custom event serialization configuration (if any). The eventStream method exposes the event log with the given logName as a DStream[DurableEvent]. The stream is updated by interacting with the event log’s replication endpoint at the given host and port.

The stream starts from the given fromSequenceNr and is updated with both replayed and newly written events. The storage level of events in Spark can be set with the storageLevel parameter. Applications that need to process events in strict event log storage order should repartition the stream with .repartition(1), as shown in the example.

To persist stream processing progress, an application should store the sequence number of the last processed event in a place of its choice. When the application is restarted, the stored sequence number can be used to resume the stream via the fromSequenceNr argument of the eventStream call, as sketched below. Later versions will additionally support internal storage of event processing progress.
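
A minimal sketch of this approach, assuming a hypothetical, driver-local progressStore; a real application would persist the sequence number externally (database, file, or key-value store) instead of in memory:

// Hypothetical progress store; placeholder for external, durable storage
object progressStore {
  @volatile private var lastSequenceNr: Long = 0L
  def load(): Long = lastSequenceNr
  def save(sequenceNr: Long): Unit = lastSequenceNr = sequenceNr
}

// On restart, resume the stream after the last processed sequence number
val resumedStream: DStream[DurableEvent] = sparkStreamAdapter.eventStream(
  id = "s1", host = "127.0.0.1", port = 2552, logName = "L",
  fromSequenceNr = progressStore.load() + 1L, storageLevel = StorageLevel.MEMORY_ONLY)

// Process events on the driver and store the progress after each event
resumedStream.foreachRDD { rdd =>
  rdd.collect().foreach { event =>
    // ... process event ...
    progressStore.save(event.localSequenceNr)
  }
}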

Hint

The full example source code is in SparkStreamAdapterExample.scala

[1] See also Replication endpoints in the reference documentation.