Showing posts with label actor. Show all posts

Sunday, August 02, 2009

MongoDB for Akka Persistence

Actors and message passing have proven to be great allies in implementing some specific use cases of concurrent applications. Message-passing concurrency promotes loosely coupled application components, which in turn makes it much easier to scale an application out. But as Jonas Boner discusses in his JavaOne 2009 presentation, many real-world applications still have to deal with shared state, transactions and atomicity of operations. Software Transactional Memory, as implemented in Clojure and Haskell, is a viable option for these use cases.

Akka, designed by Jonas Boner, offers Transactors, which combine the benefits of actors and STM, along with a pluggable storage model. It provides a unified set of data structures managed by the STM and backed by a variety of storage engines. It currently supports Cassandra as the storage engine out of the box.

Over the weekend I was trying out MongoDB as yet another out of the box persistence option for Akka transactors. MongoDB is a high performance, schema-free, document-oriented database that stores documents in the form of BSON, a binary-encoded serialization of JSON-like documents. The main storage abstraction is a Collection, which can loosely be equated to a table in a relational database. Besides its support for replication, fault tolerance and sharding, the aspect that makes MongoDB much easier to use is its rich querying facilities. It supports lots of built-in query capabilities with conditional operators, regular expressions and powerful variants of SQL where clauses on the document model. Here are some examples of query filters ..


db.myCollection.find( { $where: "this.a > 3" });
db.myCollection.find( { "field" : { $gt: value1, $lt: value2 } } );  // value1 < field < value2



and useful convenience functions ..


db.students.find().limit(10).forEach( ... )  // limit the fetch count
db.students.find().skip(..) // skip some records
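
To make the semantics of these shell calls concrete, here is a minimal in-memory sketch in Scala. It does not talk to MongoDB at all; `QuerySketch`, `findBetween` and `page` are hypothetical names, and documents are simplified to maps from field names to integers.

```scala
// Hypothetical in-memory model; documents are simplified to Map[String, Int].
object QuerySketch {
  type Doc = Map[String, Int]

  // Mirrors { field: { $gt: lower, $lt: upper } }: lower < field < upper.
  // Documents that lack the field never match.
  def findBetween(docs: List[Doc], field: String, lower: Int, upper: Int): List[Doc] =
    docs.filter(d => d.get(field).exists(v => v > lower && v < upper))

  // Mirrors find().skip(n).limit(m): drop the first n results,
  // then keep at most m of the remainder.
  def page(docs: List[Doc], skip: Int, limit: Int): List[Doc] =
    docs.drop(skip).take(limit)
}
```

Under these assumptions, `QuerySketch.findBetween(docs, "a", 3, 7)` keeps only the documents whose `a` lies strictly between 3 and 7, which is what the `$gt`/`$lt` filter above expresses.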



In Akka we can have a collection in MongoDB that can be used to store all transacted data keyed on a transaction id. The set of data can be stored in a HashMap as key-value pairs. Have a look at the following diagram for the scheme of data storage using MongoDB Collections ..
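
The scheme can also be sketched in code. The following is a minimal in-memory model, not the actual Akka or MongoDB code: `TxDocument` and `TxCollection` are hypothetical names, and the collection is just a buffer held in memory. Each document pairs a transaction id with the map of data transacted under it.

```scala
import scala.collection.mutable

// Hypothetical model of one document in the collection:
// { "key": <transaction id>, "val": { k1: v1, k2: v2, ... } }
final case class TxDocument(key: String, value: Map[String, AnyRef])

// Hypothetical stand-in for a MongoDB collection, held in memory.
final class TxCollection {
  private val docs = mutable.ArrayBuffer.empty[TxDocument]

  // Store the whole transacted data set under its transaction id.
  def insert(txId: String, entries: Map[String, AnyRef]): Unit =
    docs.append(TxDocument(txId, entries))

  // Find the first document stored for a transaction id, if any.
  def findOne(txId: String): Option[TxDocument] =
    docs.find(_.key == txId)

  // Look up a single key inside the data set of one transaction.
  def lookup(txId: String, k: String): Option[AnyRef] =
    findOne(txId).flatMap(_.value.get(k))
}
```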



Akka TransactionalState offers APIs that create transactional data structures backed by the appropriate storage engine, depending on the configuration ..


class TransactionalState {
  def newPersistentMap(
    config: PersistentStorageConfig): TransactionalMap[String, AnyRef] =
    config match {
      case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
      case MongoStorageConfig() => new MongoPersistentTransactionalMap
    }

  def newPersistentVector(
    config: PersistentStorageConfig): TransactionalVector[AnyRef] =
    config match {
      //..
    }

  def newPersistentRef(
    config: PersistentStorageConfig): TransactionalRef[AnyRef] =
    config match {
      //..
    }
  //..
}
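
The dispatch idea can be tried out in isolation. Below is a minimal, self-contained sketch of it; every type here (`StorageConfig`, `BackedMap`, `StateFactory` and friends) is a hypothetical stand-in, not the Akka API. Making the configuration type sealed is a design choice of the sketch: the compiler can then warn when a new backend is added but not handled in the match.

```scala
// Hypothetical stand-ins for the configuration types in the post.
sealed trait StorageConfig
case object CassandraConfig extends StorageConfig
case object MongoConfig extends StorageConfig

// Hypothetical minimal map abstraction; the real TransactionalMap is richer.
trait BackedMap {
  def backend: String
}
final class CassandraBackedMap extends BackedMap { val backend = "cassandra" }
final class MongoBackedMap extends BackedMap { val backend = "mongodb" }

object StateFactory {
  // Pick the implementation that matches the configured storage engine.
  def newPersistentMap(config: StorageConfig): BackedMap = config match {
    case CassandraConfig => new CassandraBackedMap
    case MongoConfig     => new MongoBackedMap
  }
}
```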



and each transactional data structure defines the transaction semantics for the underlying structure that it encapsulates. For example, for a PersistentTransactionalMap we have the following APIs ..


abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {

  protected[kernel] val changeSet = new HashMap[K, V]

  // explicit result type, so that concrete implementations can return the entries
  def getRange(start: Int, count: Int): List[(K, V)]

  // ---- For Transactional ----
  override def begin = {}
  override def rollback = changeSet.clear

  //.. additional map semantics .. get, put etc.
}
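
The role of the change set is easier to see in a stripped-down form. The sketch below is a simplification under stated assumptions: `BufferedMap` is a hypothetical class, the `committed` map stands in for the storage engine, and the read-your-own-writes behaviour of `get` is an assumption rather than something the snippet above shows.

```scala
import scala.collection.mutable

// Hypothetical simplification: `committed` stands in for the storage engine.
final class BufferedMap[K, V] {
  private val committed = mutable.HashMap.empty[K, V]
  private val changeSet = mutable.HashMap.empty[K, V]

  // Writes only touch the change set until the transaction commits.
  def put(key: K, value: V): Unit = changeSet.update(key, value)

  // Reads see pending writes first, then committed data (an assumption).
  def get(key: K): Option[V] = changeSet.get(key).orElse(committed.get(key))

  // Commit flushes the pending writes and clears the buffer.
  def commit(): Unit = {
    committed ++= changeSet
    changeSet.clear()
  }

  // Rollback simply discards the pending writes.
  def rollback(): Unit = changeSet.clear()
}
```

Because nothing reaches `committed` before `commit()`, rolling back never has to undo anything in the store; it only has to forget the buffer.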



A concrete implementation defines the rest of the semantics used to handle transactional data. The concrete implementation is parameterized with the actual storage engine that can be plugged in for specific implementations.


trait ConcretePersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] {
  val storage: Storage
  
  override def getRange(start: Int, count: Int) = {
    verifyTransaction
    try {
      storage.getMapStorageRangeFor(uuid, start, count)
    } catch {
      case e: Exception => Nil
    }
  }

  // ---- For Transactional ----
  override def commit = {
    storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
    changeSet.clear
  }

  override def contains(key: String): Boolean = {
    try {
      verifyTransaction
      storage.getMapStorageEntryFor(uuid, key).isDefined
    } catch {
      case e: Exception => false
    }
  }

  //.. others 
}



Note the abstract val storage in the above trait; it gets a concrete value when we define a Mongo map ..


class MongoPersistentTransactionalMap 
  extends ConcretePersistentTransactionalMap {
  val storage = MongoStorage
}
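
The abstract val technique works the same way outside Akka. Here is a minimal sketch with hypothetical names (`KeyValueStore`, `StoreBackedMap`, `FakeMongoStore` and so on are made up for illustration). One caveat that applies to the pattern in general: an abstract val is not yet initialized while the body of the trait itself runs, so the trait should only touch it from methods, as `describe` does here.

```scala
// Hypothetical storage abstraction with a single member.
trait KeyValueStore {
  def name: String
}
object InMemoryStore extends KeyValueStore { val name = "in-memory" }
object FakeMongoStore extends KeyValueStore { val name = "mongo" }

// The trait codes against an abstract member and never names a backend.
trait StoreBackedMap {
  val store: KeyValueStore
  // Safe: evaluated on call, after the concrete val has been initialized.
  def describe: String = "map backed by " + store.name
}

// Each concrete class fixes the backend by defining the val.
class InMemoryMap extends StoreBackedMap { val store = InMemoryStore }
class FakeMongoMap extends StoreBackedMap { val store = FakeMongoStore }
```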



For the Storage part, we have another trait which abstracts the storage specific APIs ..


trait Storage extends Logging {
  def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]])
  def removeMapStorageFor(name: String)
  def getMapStorageEntryFor(name: String, key: String): Option[AnyRef]
  def getMapStorageSizeFor(name: String): Int
  def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]]
  def getMapStorageRangeFor(name: String, start: Int, 
    count: Int): List[Tuple2[String, AnyRef]]
}
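
For experimenting without a database, the same operations can be backed by plain collections. The following is a sketch only: `InMemoryStorage` is a hypothetical object that reuses the method names above but does not extend the real trait (Logging is left out), and the range semantics (skip `start` entries, return at most `count`) are an assumption.

```scala
import scala.collection.mutable

// Hypothetical in-memory backend; entries keep insertion order per map name.
object InMemoryStorage {
  private val maps =
    mutable.Map.empty[String, mutable.LinkedHashMap[String, AnyRef]]

  def insertMapStorageEntriesFor(name: String, entries: List[(String, AnyRef)]): Unit = {
    val target = maps.getOrElseUpdate(name, mutable.LinkedHashMap.empty[String, AnyRef])
    entries.foreach { case (k, v) => target.update(k, v) }
  }

  def removeMapStorageFor(name: String): Unit = {
    maps.remove(name)
    ()
  }

  def getMapStorageEntryFor(name: String, key: String): Option[AnyRef] =
    maps.get(name).flatMap(_.get(key))

  def getMapStorageSizeFor(name: String): Int =
    maps.get(name).map(_.size).getOrElse(0)

  def getMapStorageFor(name: String): List[(String, AnyRef)] =
    maps.get(name).map(_.toList).getOrElse(Nil)

  // Assumed semantics: skip `start` entries, then return at most `count`.
  def getMapStorageRangeFor(name: String, start: Int, count: Int): List[(String, AnyRef)] =
    getMapStorageFor(name).slice(start, start + count)
}
```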



I am in the process of writing a concrete implementation of Storage using MongoDB, which will look like the following ..


object MongoStorage extends Storage {
  val KEY = "key"
  val VALUE = "val"
  val db = new Mongo(..);  // needs to come from configuration
  val COLLECTION = "akka_coll"
  val coll = db.getCollection(COLLECTION)
  
  private[this] val serializer: Serializer = ScalaJSON
  
  override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) {
    import java.util.{Map, HashMap}
    val m: Map[String, AnyRef] = new HashMap
    for ((k, v) <- entries) {
      m.put(k, serializer.out(v))
    }
    coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
  }
  
  override def removeMapStorageFor(name: String) = {
    val q = new BasicDBObject
    q.put(KEY, name)
    coll.remove(q)
  }
  //.. others
}



As the diagram above illustrates, every transaction will have its own DBObject in the Mongo Collection, which will store a HashMap that contains the transacted data set. Using MongoDB's powerful query APIs we can always get to a specific key/value pair for a particular transaction as ..


import java.util.{Map => JMap}

// form the query object with the transaction id
val q = new BasicDBObject
q.put(KEY, name)

// 1. use the query object to get the DBObject (findOne)
// 2. extract the VALUE which has the HashMap of transacted data set
// 3. query on the HashMap on the passed in key to get the value
// 4. use the scala-json serializer to get back the Scala object
serializer.in(
  coll.findOne(q)
      .get(VALUE).asInstanceOf[JMap[String, AnyRef]]
      .get(key).asInstanceOf[Array[Byte]], None)
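
One thing to watch in a chain like this: in the MongoDB Java driver, findOne returns null when nothing matches, and a Map's get returns null for a missing key, so each step can fail with a NullPointerException. Below is a minimal sketch of the same four steps with every step wrapped in Option. It runs against plain java.util maps instead of a real collection, and `ToySerializer` and `SafeLookup` are hypothetical stand-ins (the toy serializer only handles strings, unlike the scala-json one).

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{Map => JMap}

// Hypothetical stand-in for the serializer: strings to UTF-8 bytes and back.
object ToySerializer {
  def out(value: String): Array[Byte] = value.getBytes(UTF_8)
  def in(bytes: Array[Byte]): String = new String(bytes, UTF_8)
}

object SafeLookup {
  val VALUE = "val"

  // `doc` plays the role of the findOne result and may be null.
  // A miss at any step yields None instead of an exception.
  def lookup(doc: JMap[String, AnyRef], key: String): Option[String] =
    for {
      d     <- Option(doc)                                                  // step 1
      inner <- Option(d.get(VALUE)).collect { case m: JMap[_, _] => m }     // step 2
      bytes <- Option(inner.get(key)).collect { case b: Array[Byte] => b }  // step 3
    } yield ToySerializer.in(bytes)                                         // step 4
}
```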



MongoDB looks like a cool storage engine and has already been used in production as a performant key/value store. It looks promising as the backing storage engine for persistent transactional actors as well. Akka transactors look poised to evolve into a platform that can deliver the goods for stateful STM-based as well as stateless message-passing-based concurrent applications. I plan to complete the implementation in the near future and, if Jonas agrees, will be more than willing to contribute it to the Akka master.

Open source is as much about contributing, as it is about using ..

Monday, April 13, 2009

Objects as Actors ?

Tony Arcieri, creator of Reia, recently brought up an interesting topic on unifying actors and objects. Talking about Scala and his dislike of its implementation of actors as an additional entity on top of objects, he says it would have been a more useful abstraction to model all objects as actors. Doing it that way would eliminate many of the overlapping functions that the object and actor semantics both implement today. In Reia, which is supposed to run on top of BEAM (the Erlang VM), he has decided to make all objects actors.

The way I look at it, this is mostly a decision of the philosophy of the language design. Scala is targeted to be a general purpose programming language, where concurrency and distribution are not the central concerns of the core language design. The entire actor model has hence been implemented as a library that integrates seamlessly with the rest of Scala's core object/functional engineering. This is a design decision that the language designers took upfront - hence objects in Scala, by default, bind to local invocation semantics, which lets them take advantage of all the optimizations and efficiencies of being collocated in the same process.

The actor model was designed primarily to address the concerns of distributed programming. As Jonas Boner recently said on Twitter - "The main benefit of the Actor model is not simpler concurrency but fault-tolerance and reliability". And for fault tolerance you need to have at least two machines running your programs. We all know the awesome capabilities of fault tolerance that the Erlang actor model offers through supervisors, linked actors and transparent restarts. Hence languages like Erlang, which address the concerns of concurrency and distribution as part of the core, have decided to implement actors as their basic building block of abstractions. This was done with the vision that the Erlang programming style will be based on simple primitives of process spawning and message passing, both of which are implemented as low overhead primitives in the virtual machine. The philosophy of Scala is, however, a bit different. Even so, it is not that difficult to implement the Active Object pattern on top of the Scala actors platform.
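
To give a feel for what an active object looks like, here is a minimal sketch. Note the swap: it does not use the Scala actors library at all, but a single-threaded executor and scala.concurrent.Future from the standard library, which plays the same role of a private mailbox drained by one thread. `ActiveCounter` is a hypothetical example class.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical active object: callers use ordinary method calls, while every
// call is queued and executed one at a time on the object's own thread.
final class ActiveCounter {
  private val executor = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(executor)

  // Only ever touched from the single worker thread, so no locking is needed.
  private var count = 0

  // Each method returns immediately with a Future for the eventual result.
  def increment(): Future[Int] = Future { count += 1; count }
  def current(): Future[Int] = Future { count }

  // The worker thread is not a daemon; shut it down to let the JVM exit.
  def shutdown(): Unit = executor.shutdown()
}
```

The caller sees an ordinary object with methods, yet the state is confined to one thread and requests are processed in the order they were submitted, which is the essence of the pattern.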

Erlang allows you to write programs that will run without any change in a regular non-distributed Erlang session, on two different Erlang nodes running on the same computer, and on Erlang nodes running on two physically separated computers, whether on the same LAN or over the internet. It can do this because the language designers decided to map the concurrency model naturally to distributed deployments, extending the actor model beyond VM boundaries.

Clojure, another language with strong concurrency support, decided to go the Scala way in addressing distribution concerns. Distribution is not something that Rich Hickey decided to hardwire into the core of the language. Here is what he says about it ..

"In Erlang the concurrency model is (always) a distributed one and in Clojure it is not. I have some reservations about unifying the distributed and non-distributed models [..], and have decided not to do so in Clojure, but I think Erlang, in doing so, does the right thing in forcing programmers to work as if the processes are distributed even when they are not, in order to allow the possibility of transparent distribution later, e.g. in the failure modes, the messaging system etc. However, issues related to latency, bandwidth, timeouts, chattiness, and costs of certain data structures etc remain."

And finally, on the JVM, there are a host of options that enable distribution of your programs, which is yet another reason not to go for language-specific solutions. If you are implementing your language on top of the Erlang VM, it's only natural to leverage the awesome power of the cross-virtual-machine distribution capabilities that it offers. On the JVM, distribution is better left to specialized frameworks.