Showing posts with label message-queue. Show all posts

Sunday, March 21, 2010

Thinking Asynchronous - Domain Modeling using Akka Transactors - Part 1

Followers of this blog will know by now that I am a big fan of a clean domain model. Domain driven design, as espoused by Eric Evans, is the way to go when you are modeling a complex domain and would like your model to survive for quite some time. Recently I have been experimenting with domain driven design using asynchronous message passing techniques, particularly in the services and storage layers.

The Repository, as Eric says, is the domain centric abstraction on top of your data storage layer. It gives your model back the feeling that you are dealing with domain concepts instead of marshalling data across your storage layers. Typically you have contracts for repositories at the aggregate root level. The underlying implementation commits to a platform (like JPA) and ensures that the object graph of your aggregate root rests in peace within the relational database. It need not be a relational database - it can be a file system, it can be a NoSQL database. That's the power of abstraction that Repositories add to your model.

Ever since I started playing around with Erlang, I have been toying with thoughts of making repositories asynchronous. I blogged some of my thoughts in this post and even implemented a prototype using Scala actors.

Enter Akka and its lightweight actor model that offers transaction support over an STM. Akka offers seamless integration with a variety of persistence engines like Cassandra, MongoDB and Redis. It has plans of adding to this list many of the relational stores as well. The richness of the Akka stack makes for a strong case in designing a beautiful asynchronous repository abstraction.

Consider a very simple domain model for a Bank Account ..

import java.util.Date

case class Account(no: String,
  name: String,
  dateOfOpening: Date,
  dateOfClose: Option[Date],
  balance: Float)

We can model typical operations on a bank account, like opening a new account, querying the balance of an account or posting an amount to an account, through message dispatch. Typical messages will look like the following in Scala ..

sealed trait AccountEvent
case class Open(from: String, no: String, name: String) extends AccountEvent
case class New(account: Account) extends AccountEvent
case class Balance(from: String, no: String) extends AccountEvent
case class Post(from: String, no: String, amount: Float) extends AccountEvent


Note that all messages are immutable Scala objects. They are dispatched by the client and intercepted by a domain service, which can optionally do some processing and validation before finally forwarding them to the Repository.
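As a rough sketch of that flow, here is what the validation step of a domain service could look like in plain Scala, without any Akka dependency. The validate function, its rules and the translation of Open into New are my assumptions for illustration; the actual domain service is the subject of the next post.

```scala
import java.util.Date

object MessageFlowSketch {
  case class Account(no: String, name: String, dateOfOpening: Date,
    dateOfClose: Option[Date], balance: Float)

  sealed trait AccountEvent
  case class Open(from: String, no: String, name: String) extends AccountEvent
  case class New(account: Account) extends AccountEvent
  case class Balance(from: String, no: String) extends AccountEvent
  case class Post(from: String, no: String, amount: Float) extends AccountEvent

  // Hypothetical domain-service step: validate an incoming message and
  // produce the message that would be forwarded to the repository.
  def validate(event: AccountEvent): Either[String, AccountEvent] = event match {
    case Open(_, no, _) if no.trim.isEmpty =>
      Left("account number is required")
    case Open(_, no, name) =>
      // Assumption: opening an account becomes a New message with zero balance.
      Right(New(Account(no, name, new Date, None, 0f)))
    case Post(_, _, amount) if amount == 0f =>
      Left("cannot post a zero amount")
    case other =>
      Right(other)
  }
}
```

Since the messages are immutable case classes, the service can hand them over to the repository without worrying about anyone changing them in flight.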

In this post we will look at the final stage in the lifecycle of a message, which is how it gets processed by the Repository. In the next post we will integrate the whole along with an abstraction for a domain service. Along the way we will see much of the goodness that Akka transactors offer, including support for fault tolerant processing in the event of system crashes.

trait AccountRepository extends Actor

class RedisAccountRepository extends AccountRepository {
  lifeCycle = Some(LifeCycle(Permanent))    
  val STORAGE_ID = "account.storage"

  // actual persistent store
  private var accounts = atomic { RedisStorage.getMap(STORAGE_ID) }

  def receive = {
    case New(a) => 
      atomic {
        accounts.+=(a.no.getBytes, toByteArray[Account](a))
      }

    case Balance(from, no) =>
      val b = atomic { accounts.get(no.getBytes) }
      b match {
        case None => reply(None)
        case Some(a) => 
          val acc = fromByteArray[Account](a).asInstanceOf[Account]
          reply(Some(acc.balance))
      }

    //.. other message handlers
  }

  override def postRestart(reason: Throwable) = {
    accounts = RedisStorage.getMap(STORAGE_ID)  
  }
}


The above snippet implements a message based Repository abstraction with an underlying implementation in Redis. Redis is an advanced key/value store that offers persistence for a suite of data structures like Lists, Sets, Hashes and more. Akka offers transparent persistence to a Redis storage through a common set of abstractions. In the above code you can change RedisStorage.getMap(STORAGE_ID) to CassandraStorage.getMap(..) and switch your underlying storage to Cassandra.
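To see why swapping the backend is a one line change, here is a minimal sketch of the idea in plain Scala. KeyValueStore, InMemoryStore and BalanceRepository are hypothetical names of mine and not Akka's persistence API; the in-memory map only stands in for Redis or Cassandra so that the sketch runs without a database.

```scala
import scala.collection.mutable

object StorageSwapSketch {
  // Hypothetical stand-in for a persistent map handle; not Akka's actual API.
  trait KeyValueStore {
    def put(key: String, value: Array[Byte]): Unit
    def get(key: String): Option[Array[Byte]]
  }

  // In-memory backend, used here so the sketch runs without a database.
  class InMemoryStore extends KeyValueStore {
    private val data = mutable.Map.empty[String, Array[Byte]]
    def put(key: String, value: Array[Byte]): Unit = data.update(key, value)
    def get(key: String): Option[Array[Byte]] = data.get(key)
  }

  // The repository depends only on the trait, so another backend can be
  // plugged in without touching this class.
  class BalanceRepository(store: KeyValueStore) {
    def save(no: String, balance: Float): Unit =
      store.put(no, balance.toString.getBytes("UTF-8"))

    def balanceOf(no: String): Option[Float] =
      store.get(no).map(bytes => new String(bytes, "UTF-8").toFloat)
  }
}
```

A second implementation of KeyValueStore backed by a real store would be passed to the same BalanceRepository constructor, and nothing else would change.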

The RedisAccountRepository above works through asynchronous message passing modeled with Akka actors. Here are some of the salient points in the implementation ..

  1. Akka is based on the let-it-crash philosophy. You can design supervisor hierarchies that will be responsible for controlling the lifecycles of your actors. In the Actor abstraction you can configure how you would like to handle a crash. LifeCycle(Permanent) means that the actor will always be restarted by the supervisor in the event of a crash. It can also be LifeCycle(Temporary), which means that it will not be restarted and will be shut down using the shutdown hook that you provide. In our case we make the Repository resilient to crashes.

  2. accounts is the handle to a Map that gets persisted in Redis. Here we store all accounts that the clients open hashed by the account number. Have a look at the New message handler in the implementation.

  3. With Akka you can also provide a restart hook for when your repository crashes and gets restarted automatically by the supervisor. postRestart is the hook where we re-initialize the Map structure.

  4. Akka uses Multiverse, a Java based STM implementation, for transaction handling. In the code, mark your transactions using atomic {} and the underlying STM will take care of the rest. Instead of atomic, you can also use monadic for-comprehensions for annotating your transaction blocks. Have a look at the Akka documentation for details.
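The difference between the two lifecycle settings in point 1 and the role of the restart hook in point 3 can be illustrated with a toy supervisor in plain Scala. This is a greatly simplified sketch and not how Akka implements supervision: Worker, supervise and the "boom" message are hypothetical, and the worker simply keeps its state across a restart.

```scala
object RestartSketch {
  sealed trait Policy
  case object Permanent extends Policy
  case object Temporary extends Policy

  // Hypothetical worker with a restart hook, loosely mirroring postRestart.
  class Worker {
    var restarts = 0
    var processed = Vector.empty[String]

    def handle(msg: String): Unit = {
      if (msg == "boom") throw new IllegalStateException("crash")
      processed = processed :+ msg
    }

    def postRestart(reason: Throwable): Unit = restarts += 1
  }

  // Feeds messages to the worker one by one. On a crash, a Permanent worker
  // gets its restart hook called and keeps going; a Temporary worker stops.
  def supervise(worker: Worker, policy: Policy, msgs: List[String]): Unit = {
    val pending = msgs.iterator
    var alive = true
    while (alive && pending.hasNext) {
      val msg = pending.next()
      try worker.handle(msg)
      catch {
        case e: IllegalStateException =>
          policy match {
            case Permanent => worker.postRestart(e)
            case Temporary => alive = false
          }
      }
    }
  }
}
```

With List("a", "boom", "b") as input, the Permanent worker ends up having processed "a" and "b" with one restart, while the Temporary worker stops after "a".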


Asynchronous message based implementations decouple the end points, do not block and offer more manageability in the distribution of your system. Typical implementations of actor based models are very lightweight. Akka actors take around 600 bytes, which means that you can have millions of actors even on a commodity machine. Akka supports various types of message passing semantics which you can use to organize interactions between your collaborating objects.

In the next post we will see how the Repository interacts with Domain Services to implement client request handling. And yeah, you got it right - we will use more of Akka actors.

Thursday, December 24, 2009

A case for hybrid SQL - NoSQL stack

Alex Popescu talks about Drizzle replication in his MyNoSql column. He makes a very interesting observation in his post regarding Drizzle's replication capabilities into a host of NoSQL storage backends ..

"Leaving aside the technical details — which are definitely interesting .., the solution using the Erlang AMQP .. implementation RabbitMQ .. — I think this replication layer could represent a good basis for SQL-NoSQL hybrid solutions".

We are going to see more and more of such hybrid solutions in the days to come. Drizzle does it at the product level. Using RabbitMQ as the transport, Drizzle can replicate data as serialized Java objects to Voldemort, as JSON marshalled objects to Memcached or as a hashmap to column family based Cassandra.

Custom implementation projects have also started using a hybrid stack of persistent stores. When you are dealing with really high volumes and access patterns where you cannot use joins, you need to denormalize anyway, and ORMs cease to be a part of your solution toolset. Data access patterns vary widely across the profile of clients using your system. If you are running an ecommerce suite and your product has just launched, you may see explosive use of the shopping cart module. It makes perfect sense to move your shopping cart out of the single relational data store where it was lying around and serve it through a more appropriate data store that lives up to the scalability requirements. It's not that you need to throw away the relational database that has served you so long. Like Alex mentioned, you can always go along with a hybrid model.

In one of our recent projects, we were using Oracle as the main relational database for a securities trading back office solution implementation. The database load was computed based on all calculations that were done initially. In a very late stage of the project a new requirement came up that needed heavy processing and storage of semi-structured data and meta-data from an external feed. Both the data and the meta-data were extensible which meant that it was difficult to model them with a fixed schema.

We could not afford frequent schema changes since they would entail long downtime of the production database. But there was also the requirement that, after processing, a lot of this semi-structured data would have to be made available in the production database. We could have modeled it following the key/value paradigm in Oracle itself, which we were using anyway as the primary database. But that would again be the age-old story of the hammer and the nail.

We decided to supplement the stack with another data store that fits the bill for this specific use case. We used MongoDB, which gave us phenomenal performance for our requirements. We were getting the feed from external data sources and loaded our MongoDB database with all the semi-structured data and meta-data. All necessary processing was done in MongoDB on that data, and the relevant information was pushed from MongoDB to JMS based queues for consumption by appropriate services that copied the data asynchronously to our Oracle servers.
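The shape of that pipeline can be sketched in a few lines of Scala. This is only an illustration under assumptions: a LinkedBlockingQueue stands in for the JMS queue, a ConcurrentHashMap stands in for the Oracle tables, and Processed, startConsumer and publish are hypothetical names, not code from the project.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

object HybridSyncSketch {
  // Hypothetical record extracted from the document store after processing.
  case class Processed(id: String, payload: String)

  // Consumer service: copies each queued record into the relational stand-in
  // until it sees None, which is used here as an end-of-stream marker.
  def startConsumer(queue: LinkedBlockingQueue[Option[Processed]],
                    relational: ConcurrentHashMap[String, String]): Thread = {
    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case Some(p) => relational.put(p.id, p.payload)
            case None    => running = false
          }
        }
      }
    })
    consumer.start()
    consumer
  }

  // Producer side: publish the processed records, then the end marker.
  // The producer never waits for the relational store.
  def publish(queue: LinkedBlockingQueue[Option[Processed]],
              records: Seq[Processed]): Unit = {
    records.foreach(r => queue.put(Some(r)))
    queue.put(None)
  }
}
```

The point of the queue in the middle is that the document store side and the relational side never call each other directly, so either one can be slow or briefly unavailable without blocking the other.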

What did we achieve with the above architecture ?

  1. Kept Oracle free to do what it does the best.

  2. Took away unnecessary load from production database servers.

  3. Introduced a document database for serving a requirement tailor made for its use - semi structured data, mainly reads, no constraints, no overhead of ORM ceremony. MongoDB supports a very clean programming model, a very decent query interface, simple to use and easy to convince your client.

  4. Used message based mapping to sync up data ASYNCHRONOUSLY between the nosql MongoDB and sql based Oracle. Each of the data stores was doing what it does best, keeping us away from the blame of the hammer-nail paradigm.


With more and more nosql stores coming up, message based replication is going to play a very important role. Even within nosql datastores, we are seeing sql based storage backends being offered as a choice. Voldemort offers MySQL as one of its storage backends - so the hybrid model starts right up there. It's always advisable to use multiple stores that fit your use cases rather than trying to force-fit everything into a single paradigm.

Friday, March 20, 2009

Now serving - Message Queuing in Web Applications

Looks like asynchronous messaging is becoming quite a common paradigm of distribution in Web applications and large Web sites. The general approach is to do the absolutely essential user-facing work as part of the synchronous HTTP request/response cycle and delegate the rest to an asynchronous queuing service to process offline. This leads to faster page loads, better resource utilization and a much better user experience. Gojko Adzic has a great post clarifying the misconception that messaging systems are only useful for big investment banks.

And here are some real life use cases ..

  • Flickr uses offline queuing systems to process notifications to contacts and third party partners when a user uploads a photo. Informing the user is of prime importance which Flickr does as part of the synchronous request/response processing, while the other notifications are processed by job queues running in parallel.

  • Digg uses Gearman to farm out similar processing of jobs in parallel while serving the absolute essential stuff synchronously.

  • And as recently presented at QCon by Evan Weaver, Twitter is becoming more and more centered around messaging middleware. Every incoming tweet triggers the messaging system for processing notifications to all followers, and this is done asynchronously through Kestrel, an implementation of a message queue written in Scala.
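The pattern common to all three is easy to sketch. The snippet below is a minimal illustration in Scala and not how any of these sites implement it: a single-threaded executor stands in for the job queue, and handleUpload, drain and notified are hypothetical names.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, Executors, TimeUnit}

object UploadSketch {
  // A single worker thread standing in for an offline job queue.
  private val jobs = Executors.newSingleThreadExecutor()

  // Records the notifications that the background worker has delivered.
  val notified = new CopyOnWriteArrayList[String]()

  // The essential work (answering the user) happens inline; the notifications
  // are only queued here and run later on the worker thread.
  def handleUpload(user: String, photo: String, contacts: List[String]): String = {
    contacts.foreach { contact =>
      jobs.execute(new Runnable {
        def run(): Unit = { notified.add(s"$contact: $user uploaded $photo"); () }
      })
    }
    s"$photo uploaded"
  }

  // Stops accepting new jobs and waits for the queued ones to finish.
  def drain(): Boolean = {
    jobs.shutdown()
    jobs.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

The response goes back to the user as soon as the jobs are queued, which is exactly where eventual consistency comes in: for a short while the contacts have not yet heard about the upload.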


All these use cases lead to the conclusion that message queuing is becoming an increasingly potent medium of distribution even in Web based applications. The moot point is eventual consistency, that leads to a better scalability model. And, by the way, it looks to be about time that open source messaging platforms start getting more steam, so that every house does not have to reinvent the wheel and develop its own message queuing infrastructure.