Showing posts with label akka. Show all posts

Wednesday, July 24, 2013

Scala Redis client goes non blocking : uses Akka IO

scala-redis is getting a new non-blocking version based on a kernel implemented with the new Akka IO. The result is that all APIs are non-blocking and return a Future. We are trying to keep the API as close to the blocking version as possible. But bits rot, and some of the technical debt needs to be repaid. We have cleaned up some of the return types which had unnecessary Option[] wrappers, made some improvements and standardizations to the API type signatures, and are also working on making byte array manipulation faster using akka.util.ByteString at the implementation level. We also plan to use the Akka IO pipeline for abstracting the various stages of handling Redis protocol requests and responses.

As of today we have quite a bit ready for review by the users. The APIs may change a bit here and there, but the core APIs are in place. A few areas, like PubSub and clustering, have not been implemented yet. Stay tuned for more updates on this blog .. Here are a few code snippets that demonstrate the usage of the APIs ..

Non blocking get/set

@volatile var callbackExecuted = false

val ks = (1 to 10).map(i => s"client_key_$i")
val kvs = ks.zip(1 to 10)

val sets: Seq[Future[Boolean]] = kvs map {
  case (k, v) => client.set(k, v)
}

val setResult = Future.sequence(sets) map { r: Seq[Boolean] =>
  callbackExecuted = true
  r
}

callbackExecuted should be (false)
setResult.futureValue should contain only (true)
callbackExecuted should be (true)

callbackExecuted = false
val gets: Seq[Future[Option[Long]]] = ks.map { k => client.get[Long](k) }
val getResult = Future.sequence(gets).map { rs =>
  callbackExecuted = true
  rs.flatten.sum
}

callbackExecuted should be (false)
getResult.futureValue should equal (55)
callbackExecuted should be (true)
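
For readers who want to try the same pattern without a running Redis server, here is a minimal sketch. It assumes a hypothetical in-memory StubClient that only mimics the Future-returning shape of set and get; it is not the scala-redis API. The point it illustrates is how Future.sequence turns many independent futures into one, so the program blocks only at the very edge.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingSketch {
  // Hypothetical stand-in for the Redis client; every call returns a Future
  class StubClient {
    private val store = TrieMap.empty[String, Long]
    def set(key: String, value: Long): Future[Boolean] =
      Future { store.put(key, value); true }
    def get(key: String): Future[Option[Long]] =
      Future(store.get(key))
  }

  def sumOfStoredValues(n: Int): Long = {
    val client = new StubClient
    val keys = (1 to n).map(i => s"client_key_$i")

    // fire all writes, then collapse the collection of futures into one future
    val writes = Future.sequence(keys.zip(1 to n).map { case (k, v) => client.set(k, v.toLong) })

    // reads are issued only after every write has completed
    val total: Future[Long] = writes.flatMap { _ =>
      Future.sequence(keys.map(client.get)).map(_.flatten.sum)
    }

    // block once, at the edge, purely for the sake of the demo
    Await.result(total, 5.seconds)
  }
}
```

Calling NonBlockingSketch.sumOfStoredValues(10) stores the values 1 to 10 and reads them back, which mirrors the sum checked in the test above.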

Composing through sequential combinators

val key = "client_key_seq"
val values = (1 to 100).toList
val pushResult = client.lpush(key, 0, values:_*)
val getResult = client.lrange[Long](key, 0, -1)

val res = for {
  p <- pushResult.mapTo[Long]
  if p > 0
  r <- getResult.mapTo[List[Long]]
} yield (p, r)

val (count, list) = res.futureValue
count should equal (101)
list.reverse should equal (0 to 100)
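
Two things about this for-comprehension are easy to miss. Both futures are created before the comprehension, so both commands are already in flight when it starts. And the guard desugars to withFilter on Future, so a false guard fails the resulting future instead of producing an empty value. The sketch below shows the second point with plain Futures; pushThenRange and run are hypothetical names used only for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object GuardSketch {
  // same shape as the comprehension above, with already-completed futures
  def pushThenRange(pushed: Long, contents: List[Long]): Future[(Long, List[Long])] =
    for {
      p <- Future.successful(pushed)
      if p > 0 // becomes withFilter: a false guard fails the future
      r <- Future.successful(contents)
    } yield (p, r)

  // capture success or failure as a Try so both outcomes can be inspected
  def run(pushed: Long): Try[(Long, List[Long])] =
    Try(Await.result(pushThenRange(pushed, List(3L, 2L, 1L)), 2.seconds))
}
```

With a positive count, run returns a Success holding the pair; with a count of 0 it returns a Failure wrapping a NoSuchElementException, which is what Future's filter produces when the predicate does not hold.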

Error handling using Promise Failure

val key = "client_err"
val v = client.set(key, "value200")
v.futureValue should be (true)

val x = client.lpush(key, 1200)
val thrown = evaluating { x.futureValue } should produce [TestFailedException]
thrown.getCause.getMessage should equal ("ERR Operation against a key holding the wrong kind of value")

Feedback is welcome, especially on the APIs and their usage. All the code is on GitHub, with all tests in the test folder. Jisoo Park (@guersam) has been doing an awesome job contributing a lot to all the goodness that's there in the repo. Thanks a lot for all the help ..

Sunday, January 15, 2012

Event Sourcing, Akka FSMs and functional domain models

I blogged on Event Sourcing and functional domain models earlier. In this post I would like to share more of my thoughts on the same subject and how with a higher level of abstraction you can make your domain aggregate boundary more resilient and decoupled from external references.

When we talk about a domain model, the Aggregate takes center stage. An aggregate is a core abstraction that represents the time invariant part of the domain. It's an embodiment of all states that the aggregate can be in throughout its lifecycle in the system. So, it's extremely important that we take every pain to distil the domain model and protect the aggregate from all unwanted external references. Maybe an example will make it clearer.

Keeping the Aggregate pure

Consider a Trade model as the aggregate. By Trade, I mean a security trade that takes place in the stock exchange where counterparties exchange securities and currencies for settlement. If you're a regular reader of my blog, you must be aware of this, since this is almost exclusively the domain that I talk of in my blog posts.

A trade can be in various states like newly entered, value date added, enriched with tax and fee information, net trade value computed etc. In a trading application, as a trade passes through the processing pipeline, it moves from one state to another. The final state represents the complete Trade object which is ready to be settled between the counterparties.

In the traditional model of processing we have the final snapshot of the aggregate - what we don't have is the audit log of the actual state transitions that happened in response to the events. With event sourcing we record the state transitions as a pipeline of events which can be replayed any time to roll back or roll forward to any state of our choice. Event sourcing is emerging as one of the potent ways to model a system, and lots of blog posts are being written to discuss the various architectural strategies for implementing an event sourced application.

That's ok. But whose responsibility is it to manage these state transitions and record the timeline of changes ? It's definitely not the responsibility of the aggregate. The aggregate is supposed to be a pure abstraction. We must design it as an immutable object that can respond to events and transform itself into the new state. In fact the aggregate implementation should not be aware of whether it's serving an event sourced architecture or not.

There are various ways you can model the states of an aggregate. One option that's frequently used involves algebraic data types. Model the various states as a sum type of products. In Scala we do this as case classes ..

sealed abstract class Trade {
  def account: Account
  def instrument: Instrument
  //..
}

case class NewTrade(..) extends Trade {
  //..
}

case class EnrichedTrade(..) extends Trade {
  //..
}
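
To make the sum-type option concrete, here is a small self-contained sketch. The fields (refNo, principal, taxFees) and the flat tax rate are hypothetical simplifications chosen for illustration; the real model carries accounts, instruments and much more. What it tries to show is that each state is its own type and a transition is a pure function from one state type to the next.

```scala
object TradeStatesSketch {
  // hypothetical, heavily simplified fields
  sealed trait Trade {
    def refNo: String
    def principal: BigDecimal
  }

  final case class NewTrade(refNo: String, principal: BigDecimal) extends Trade

  final case class EnrichedTrade(refNo: String, principal: BigDecimal, taxFees: BigDecimal)
      extends Trade {
    def netAmount: BigDecimal = principal + taxFees
  }

  // a transition: only a NewTrade can be enriched, and the result is a new value
  def enrich(t: NewTrade, taxRate: BigDecimal): EnrichedTrade =
    EnrichedTrade(t.refNo, t.principal, t.principal * taxRate)

  // the sealed hierarchy lets the compiler check that every state is handled
  def describe(t: Trade): String = t match {
    case NewTrade(ref, _) => s"$ref: new"
    case e: EnrichedTrade => s"${e.refNo}: enriched, net ${e.netAmount}"
  }
}
```

Because enrich accepts only a NewTrade, enriching a trade twice is a compile-time error rather than a runtime check.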

Another option may be to have one data type to model the Trade and model states as immutable enumerations with changes being effected on the aggregate as functional updates. No in place mutation, but use functional data structures like zippers or type lenses to create the transformed object in the new state. Here's an example where we create an enriched trade out of a newly created one ..

// closure that enriches a trade
val enrichTrade: Trade => Trade = {trade =>
  val taxes = for {
    taxFeeIds      <- forTrade // get the tax/fee ids for a trade
    taxFeeValues   <- taxFeeCalculate // calculate tax fee values
  }
  yield(taxFeeIds map taxFeeValues)
  val t = taxFeeLens.set(trade, taxes(trade))
  netAmountLens.set(t, t.taxFees.map(_.foldl(principal(t))((a, b) => a + b._2)))
}

But then we come back to the same question - if the aggregate is distilled to model the core domain, who handles the events ? Someone needs to model the event changes, effect the state transitions and take the aggregate from one state to the next.

Enter Finite State Machines

In one of my projects I used the domain service layer to do this. The domain logic for effecting the changes lies with the aggregate, but they are invoked from the domain service in response to events when the aggregate reaches specific states. In other words I model the domain service as a finite state machine that manages the lifecycle of the aggregate.

In our example a Trading Service can be modeled as an FSM that controls the lifecycle of a Trade, as follows ..

import TradeModel._

class TradeLifecycle(trade: Trade, timeout: Duration, log: Option[EventLog]) 
  extends Actor with FSM[TradeState, Trade] {
  import FSM._

  startWith(Created, trade)

  when(Created) {
    case Event(e@AddValueDate, data) =>
      log.foreach(_.appendAsync(data.refNo, Created, Some(data), e))
      val trd = addValueDate(data)
      notifyListeners(trd) 
      goto(ValueDateAdded) using trd forMax(timeout)
  }

  when(ValueDateAdded) {
    case Event(StateTimeout, _) =>
      stay

    case Event(e@EnrichTrade, data) =>
      log.foreach(_.appendAsync(data.refNo, ValueDateAdded, None,  e))
      val trd = enrichTrade(data)
      notifyListeners(trd)
      goto(Enriched) using trd forMax(timeout)
  }

  when(Enriched) {
    case Event(StateTimeout, _) =>
      stay

    case Event(e@SendOutContractNote, data) =>
      log.foreach(_.appendAsync(data.refNo, Enriched, None,  e))
      sender ! data
      stop
  }

  initialize
}

The snippet above contains a lot of other details which I did not have time to prune. It's actually part of the implementation of an event sourced trading application that uses asynchronous messaging (actors) as the backbone for event logging and reaching out to multiple consumers based on the CQRS paradigm.

Note that the FSM model above makes very explicit the states that the Trade model can reach and the events that it handles while in each of these states. We can also use this FSM technique to log events (for event sourcing) and notify listeners about the events (CQRS) in a declarative manner, as implemented above.
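
Stripped of the actor machinery, the state machine reduces to a transition function plus an append-only log. The following is a minimal sketch in plain Scala, not the Akka FSM API; the Done state, the transition function and run are hypothetical names introduced for illustration.

```scala
object TradeFsmSketch {
  sealed trait State
  case object Created extends State
  case object ValueDateAdded extends State
  case object Enriched extends State
  case object Done extends State // hypothetical terminal state, standing in for stop

  sealed trait Event
  case object AddValueDate extends Event
  case object EnrichTrade extends Event
  case object SendOutContractNote extends Event

  // which event is legal in which state; anything else leaves the state unchanged
  def transition(s: State, e: Event): State = (s, e) match {
    case (Created, AddValueDate)         => ValueDateAdded
    case (ValueDateAdded, EnrichTrade)   => Enriched
    case (Enriched, SendOutContractNote) => Done
    case _                               => s
  }

  // run events through the machine, logging only the transitions that were accepted
  def run(events: List[Event]): (State, Vector[(State, Event)]) =
    events.foldLeft((Created: State, Vector.empty[(State, Event)])) {
      case ((state, log), e) =>
        val next = transition(state, e)
        if (next == state) (state, log) else (next, log :+ (state -> e))
    }
}
```

Feeding the three events in order ends in Done with three log entries, while an out-of-order EnrichTrade on a freshly created trade is ignored and leaves the log empty.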

Let me know in the comments what your views are on this FSM approach towards handling state transitions in domain models. I think it helps keep aggregates pure and helps design domain services that focus on serving specific aggregate roots.

I will be talking about similar stuff, Akka actor based event sourcing implementations and functional domain models at PhillyETE 2012. Please drop by if this interests you.

Monday, January 31, 2011

CQRS with Akka actors and functional domain models

Fighting with impedance mismatch has been quite a losing battle so far in the development of software systems. We fight mismatch to handle stateful interactions with a stateless protocol. We fight mismatch of paradigms between the user interface layers, domain layers and data layers. Nothing concrete has emerged to date, though there have been quite a number of efforts to keep the organization of persistent data as close as possible to the way the domain layer uses it.

Command Query separation is nothing new. It's yet another attempt to manage the impedance mismatch between how your application uses data and how the underlying store manages data, so that transactional updates can be served with the same agility as read-only queries. Bertrand Meyer made this distinction long ago when he wrote ..

The features that characterize a class are divided into commands and queries. A command serves to modify objects, a query to return information about objects.

Processing a command involves manipulation of state - hence the underlying data model needs to be organized in a way that makes updates easier. A query needs to return data in the format in which the user wants to view it. Hence it makes sense to organize your storage likewise, so that we don't need expensive joins in order to process queries. This leads to a dichotomy in the way the application, as a whole, processes data. Command Query Responsibility Segregation (CQRS) endorses this separation. Commands update state - hence produce side-effects. Queries are like pure functions and should be designed using applicative, completely side-effect free approaches. So, the underlying principle, as Bertrand Meyer said, is ..

Functions should not produce abstract side-effects.

Greg Young has delivered some great sessions on DDD and CQRS. In 2008 he said "A single model cannot be appropriate for reporting, searching and transactional behavior". We have at least two models - one that processes commands and feeds changes to another model which serves user queries and reports. The transactional behavior of the application gets executed through the rich domain model of aggregates and repositories, while the queries are directly served from a de-normalized data model.

CQRS and Event Sourcing

One other concept that goes alongside CQRS is that of Event Sourcing. I blogged about some of the benefits that it has quite some time back and implemented event sourcing using Scala actors. The point where event sourcing meets CQRS is how we model the transactions of the domain (resulting from commands) as a sequence of events in the underlying persistent store. Modeling persistence of transactions as an event stream helps record updates as append only event snapshots that can be replayed as and when required. All updates in the domain model are now being translated into inserts in the persistence model. And this gives us an explicit view of all state changes in the domain model.

Over the last few days I have been playing around implementing CQRS and Event Sourcing within a domain model using the principles of functional programming and actor based asynchronous messaging. One of the big challenges is to model updates in a functional way and store them as sequences of event streams. In this post, I will share some of the experiences and implementation snippets that I came up with over the last few days. The complete implementation, so far, can be found in my github repository. It's very much a work in progress, which I hope to enrich more and more as I get some time.

A simple domain model

First the domain model and the aggregate root that will be used to publish events .. It's a ridiculously simple model for a security trade, with lots and lots of stuff elided for simplicity ..

// the main domain class
case class Trade(account: Account, instrument: Instrument, refNo: String, 
  market: Market, unitPrice: BigDecimal, quantity: BigDecimal, 
  tradeDate: Date = Calendar.getInstance.getTime, valueDate: Option[Date] = None, 
  taxFees: Option[List[(TaxFeeId, BigDecimal)]] = None, 
  netAmount: Option[BigDecimal] = None) {

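  // compare by reference number only; anything that is not a Trade is never equal
  override def equals(that: Any) = that match {
    case t: Trade => refNo == t.refNo
    case _ => false
  }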
  override def hashCode = refNo.hashCode
}


For simplicity, we make reference number a unique identifier for a trade. So all comparisons and equalities will be based on reference numbers only.

In a typical application, the entry point for users is the service layer that exposes facade methods that render use cases for the business. In a trading application, two of the most common services that need to be performed on a security trade are its value date computation and its enrichment. So when the trade passes through its processing pipeline it gets its value date updated and then gets enriched with the applicable taxes and fees and finally its net cash value.

If you are a client using these services (again, overly elided for simplicity) you may have the following service methods ..

class TradingClient {
  // create a trade : wraps the model method
  def newTrade(account: Account, instrument: Instrument, refNo: String, market: Market,
    unitPrice: BigDecimal, quantity: BigDecimal, tradeDate: Date = Calendar.getInstance.getTime) =
      //..

  // enrich trade
  def doEnrichTrade(trade: Trade) = //..

  // add value date
  def doAddValueDate(trade: Trade) = //..

  // a sample query
  def getAllTrades = //..
}


In a typical implementation these methods will invoke the domain artifacts of repositories that will either query aggregate roots or do updates on them before being persisted in the underlying store. In a CQRS implementation, the domain model will be updated but the persistent store will record these updates as event streams.

So now we have the first problem - how do we represent updates in the functional world so that we can compose them later when we need to snapshot the persistent aggregate root?

Lenses FTW

I used type lenses for representing updates functionally. Lenses solve the problem of representing updates so that they can be composed. A lens between a set of source structures S and a set of target structures T is a pair of functions:
- get from S to T
- putback from T x S to S

For more on lenses, have a look at this presentation by Benjamin Pierce. scalaz contains lenses as part of its distribution and models a lens as a case class containing a pair of get and set functions ..

case class Lens[A,B](get: A => B, set: (A,B) => A) extends Immutable { //..


Here are some examples from my domain model for updating a trade with its value date or enriching it with tax/fee values and net cash value ..

// add tax/fees
val taxFeeLens: Lens[Trade, Option[List[(TaxFeeId, BigDecimal)]]] = 
  Lens((t: Trade) => t.taxFees, 
       (t: Trade, tfs: Option[List[(TaxFeeId, BigDecimal)]]) => t.copy(taxFees = tfs))

// add net amount
val netAmountLens: Lens[Trade, Option[BigDecimal]] = 
  Lens((t: Trade) => t.netAmount, 
       (t: Trade, n: Option[BigDecimal]) => t.copy(netAmount = n))

// add value date
val valueDateLens: Lens[Trade, Option[Date]] = 
  Lens((t: Trade) => t.valueDate, 
       (t: Trade, d: Option[Date]) => t.copy(valueDate = d))
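
What makes lenses attractive here is that they compose. The sketch below defines a minimal Lens with the same get/set shape as the case class quoted above and adds two helpers, mod and andThen; those helper names, and the cut-down Account and Trade classes, are assumptions made for this illustration and are not taken from scalaz or from the domain model in this post.

```scala
object LensSketch {
  // minimal lens with the same get/set shape as above
  final case class Lens[A, B](get: A => B, set: (A, B) => A) {
    // apply a function to the focused field and return the updated whole
    def mod(a: A, f: B => B): A = set(a, f(get(a)))

    // compose with a lens that looks further inside B
    def andThen[C](that: Lens[B, C]): Lens[A, C] =
      Lens[A, C](
        (a: A) => that.get(get(a)),
        (a: A, c: C) => set(a, that.set(get(a), c)))
  }

  // hypothetical cut-down model, for illustration only
  final case class Account(no: String, name: String)
  final case class Trade(refNo: String, account: Account, netAmount: Option[BigDecimal] = None)

  val accountLens: Lens[Trade, Account] =
    Lens((t: Trade) => t.account, (t: Trade, a: Account) => t.copy(account = a))

  val nameLens: Lens[Account, String] =
    Lens((a: Account) => a.name, (a: Account, n: String) => a.copy(name = n))

  val netAmountLens: Lens[Trade, Option[BigDecimal]] =
    Lens((t: Trade) => t.netAmount, (t: Trade, n: Option[BigDecimal]) => t.copy(netAmount = n))

  // reaches through the trade into the account name in a single step
  val accountNameLens: Lens[Trade, String] = accountLens andThen nameLens
}
```

Setting through accountNameLens returns a new Trade with the nested name changed, and the original trade is left untouched. That is what lets updates be stored and replayed as values.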


We will use the above lenses for updating our aggregate root and also wrap them into closures that are subsequently fed into the event stream for persistent storage. In this example I have implemented in-memory persistence for both the command and the query store. Persistence into an on disk database will be available very soon at a github repository near you :)

Combinators that abstract state processing

Let's now define a couple of combinators that encapsulate our transactional service method implementations within the domain model. Note how the lenses have also been abstracted away from the client API as implementation artifacts. For details of these implementations please visit the github repo that contains a working model along with test cases.

// closure that enriches a trade with tax/fee information and net cash value
val enrichTrade: Trade => Trade = {trade =>
  val taxes = for {
    taxFeeIds      <- forTrade // get the tax/fee ids for a trade
    taxFeeValues   <- taxFeeCalculate // calculate tax fee values
  }
  yield(taxFeeIds map taxFeeValues)
  val t = taxFeeLens.set(trade, taxes(trade))
  netAmountLens.set(t, t.taxFees.map(_.foldl(principal(t))((a, b) => a + b._2)))
}

// closure for adding a value date
val addValueDate: Trade => Trade = {trade =>
  val c = Calendar.getInstance
  c.setTime(trade.tradeDate)
  c.add(Calendar.DAY_OF_MONTH, 3)
  valueDateLens.set(trade, Some(c.getTime))
}


We will now use these combinators to implement our transactional services which the TradingClient will invoke. Each of these service methods will do 2 things :-

1. effect the closure on the domain model and
2. as a side-effect stream the event into the command store

Sounds like a kestrel .. doesn't it ? Well here's a kestrel combinator and the above service methods realized in my CQRS implementation ..

// refer To Mock a Mockingbird
private[service] def kestrel[T](trade: T, proc: T => T)(effect: => Unit) = {
  val t = proc(trade)
  effect
  t
}

// enrich trade
def doEnrichTrade(trade: Trade) = 
  kestrel(trade, enrichTrade) { 
    ts ! TradeEnriched(trade, enrichTrade)
  }

// add value date
def doAddValueDate(trade: Trade) = 
  kestrel(trade, addValueDate) { 
    ts ! ValueDateAdded(trade, addValueDate)
  }
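
To see the kestrel in isolation, here is a small runnable sketch with the actor send replaced by an in-memory journal. The journal and doubleAndRecord are hypothetical stand-ins for the command store and a service method.

```scala
import scala.collection.mutable.ListBuffer

object KestrelSketch {
  // same shape as the kestrel above: apply proc, run the side effect, return the result
  def kestrel[T](value: T, proc: T => T)(effect: => Unit): T = {
    val result = proc(value)
    effect
    result
  }

  // hypothetical stand-in for the command store
  val journal: ListBuffer[String] = ListBuffer.empty[String]

  def doubleAndRecord(n: Int): Int =
    kestrel[Int](n, _ * 2) {
      journal += s"Doubled($n)"
      () // the effect is evaluated for its side effect only
    }
}
```

The caller gets back the transformed value exactly as if no effect were attached, and the event lands in the journal as a by-product.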


Back to Akka!

It was only to be expected that I would use Akka for transporting the event down to the command store. And this transport is implemented as an asynchronous side-effect of the service methods - just what the doctor ordered for an actor use case :)

With Event Sourcing and CQRS, one of the things that you would require is the ability to snapshot your persistent versions of the aggregate root. The current implementation is simple and does zero based snapshotting, i.e. every time you ask for a snapshot, it replays the whole stream for that trade and gives you the current state. In typical real world systems, you do interval snapshotting and start replaying from the latest available snapshot when you want to get the current state.

Here's our command store modeled as an Akka actor that processes the various events that it receives from an upstream server ..

// CommandStore modeled as an actor
class CommandStore(qryStore: ActorRef) extends Actor {
  private var events = Map.empty[Trade, List[TradeEvent]]

  def receive = {
    case m@TradeEnriched(trade, closure) => 
      events += ((trade, events.getOrElse(trade, List.empty[TradeEvent]) :+ closure))
      qryStore forward m
    case m@ValueDateAdded(trade, closure) => 
      events += ((trade, events.getOrElse(trade, List.empty[TradeEvent]) :+ closure))
      qryStore forward m
    case Snapshot => 
      self.reply(events.keys.map {trade =>
        events(trade).foldLeft(trade)((t, e) => e(t))
      })
  }
}


Note how the Snapshot message is processed as a fold over all the accumulated closures starting with the base trade. Also the command store adds the event to its repository (which is currently an in memory collection) and forwards the event to the query store. There we can have the trade modeled as per the requirements of the query / reporting client. For simplicity the current example assumes the model as the same as our domain model presented above.
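
The fold is easy to try outside an actor. The sketch below uses a hypothetical cut-down Trade and two illustrative closures. snapshot is the zero based replay described above, and snapshotFrom is one possible shape for interval snapshotting, assuming the caller remembers how many events the saved state already includes.

```scala
object SnapshotSketch {
  // hypothetical cut-down trade; each event is just a Trade => Trade closure
  final case class Trade(refNo: String, valueDateSet: Boolean = false,
                         netAmount: Option[BigDecimal] = None)
  type TradeEvent = Trade => Trade

  val addValueDate: TradeEvent = (t: Trade) => t.copy(valueDateSet = true)
  val enrich: TradeEvent = (t: Trade) => t.copy(netAmount = Some(BigDecimal(100)))

  // zero based snapshot: replay the whole stream starting from the base trade
  def snapshot(base: Trade, events: List[TradeEvent]): Trade =
    events.foldLeft(base)((t, e) => e(t))

  // interval snapshot: resume from a saved state and replay only the newer events
  def snapshotFrom(saved: Trade, seen: Int, events: List[TradeEvent]): Trade =
    snapshot(saved, events.drop(seen))
}
```

Both routes arrive at the same state; the second simply skips the prefix of the stream that the saved snapshot already accounts for.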

Here's the query store, also an actor, that persists the trades on receiving relevant events from the command store. In effect the command store responds to messages that it receives from an upstream TradingServer and asynchronously updates the query store with the latest state of the trade.

// QueryStore modeled as an actor
class QueryStore extends Actor {
  private var trades = new collection.immutable.TreeSet[Trade]()(Ordering.by(_.refNo))

  def receive = {
    case TradeEnriched(trade, closure) => 
      trades += trades.find(_ == trade).map(closure(_)).getOrElse(closure(trade))
    case ValueDateAdded(trade, closure) => 
      trades += trades.find(_ == trade).map(closure(_)).getOrElse(closure(trade))
    case QueryAllTrades =>
      self.reply(trades.toList)
  }
}


And here's a sample sequence diagram that illustrates the interactions that take place for a sample service call by the client in the CQRS implementation ..


The full implementation also contains the complete wiring of the above abstractions along with Akka's fault tolerant supervision capabilities. A complete test case is also included along with the distribution.

Have fun!

Monday, April 19, 2010

PubSub with Redis and Akka Actors

Redis (the version on the trunk) offers publish/subscribe based messaging. This is quite a big feature compared to the typical data structure oriented services that it had been offering so far. This also opens up lots of possibilities to use Redis as a messaging engine of a different kind. The sender and the receiver of the messages are absolutely decoupled from each other in the sense that senders do not send messages to specific receivers. Publishers publish messages on specific channels. Subscribers who subscribe to those channels get them and can take specific actions on them. As Salvatore notes in his weekly updates on Redis, this specific feature has evolved from lots of requests by users who had been asking for a general notification mechanism to trap changes in the key space. Redis already offers BLPOP (blocking list pop operation) for similar use cases. But it's still not sufficient to satisfy the needs of a general notification scheme. Salvatore explains it in more detail in his blog post.

I have been working on a Scala client, which I forked from Alejandro Crosa's repository. I implemented pubsub very recently and have also integrated it with Akka actors. The full implementation of the pubsub client in Scala is in my github repository. And if you'd like to play around with the Akka actor based implementation, have a look at the Akka repository.

You define your publishers and subscribers as actors and exchange messages over channels. You can define your own callbacks as to what you would like to do when you receive a particular message. Let's have a look at a sample implementation at the client level .. I will assume that you want to implement your own pub/sub application on top of the Akka actor based pubsub substrate that uses the redis service underneath.

Implementing the publisher interface is easy .. here is how you can bootstrap your own publishing service ..



The publish method just sends a Publish message to the Publisher. Publisher is an actor defined in Akka as follows:



The subscriber implementation is a bit more complex since you need to register your callback as to what you would like to do when you receive a specific type of message. This depends on your use case and it's your responsibility to provide a proper callback function downstream.

Here is a sample implementation for the subscriber. We need two methods to subscribe and unsubscribe from channels. Remember in Redis the subscriber cannot publish - hence our Sub cannot do a Pub.



I have not yet specified the implementation of the callback. What should it look like ?

The callback will be invoked when the subscriber receives a specific type of message. According to Redis specification, the types of messages which a subscriber can receive are:

a. subscribe
b. unsubscribe
c. message

Refer to the Redis documentation for details of these message formats. In our case, we model them as case classes as part of the core Redis client implementation ..



Our callback needs to take appropriate custom action on receipt of any of these types of messages. The following can be one such implementation .. It is customized for a specific application which treats various formats of messages and gives appropriate application dependent semantics ..



Note in the above implementation we specialize some of the messages to give additional semantics. e.g. if I receive a message as "+t", I will interpret it as subscribing to the channel "t". Similarly "exit" will unsubscribe me from all channels.
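
As a rough illustration of such a callback, here is a self-contained sketch written as a pure function from an incoming message to the action the application wants to take. All type names are hypothetical and are not the scala-redis message classes. The "+t" and "exit" conventions come from the description above, while the "-t" convention for unsubscribing from a single channel is an assumption added for symmetry.

```scala
object PubSubCallbackSketch {
  // hypothetical model of the three message kinds a subscriber can receive
  sealed trait PubSubMessage
  final case class Subscribed(channel: String, count: Int) extends PubSubMessage
  final case class Unsubscribed(channel: String, count: Int) extends PubSubMessage
  final case class Message(channel: String, payload: String) extends PubSubMessage

  // what the application decides to do in response
  sealed trait Action
  final case class Subscribe(channel: String) extends Action
  final case class Unsubscribe(channel: String) extends Action
  case object UnsubscribeAll extends Action
  final case class Log(text: String) extends Action

  def callback(msg: PubSubMessage): Action = msg match {
    case Subscribed(ch, n)   => Log(s"subscribed to $ch ($n active)")
    case Unsubscribed(ch, n) => Log(s"unsubscribed from $ch ($n active)")
    case Message(_, "exit")  => UnsubscribeAll
    case Message(_, p) if p.startsWith("+") && p.length > 1 => Subscribe(p.drop(1))
    // assumption: "-t" mirrors "+t" and leaves a single channel
    case Message(_, p) if p.startsWith("-") && p.length > 1 => Unsubscribe(p.drop(1))
    case Message(ch, p)      => Log(s"[$ch] $p")
  }
}
```

Keeping the interpretation pure makes it easy to test; the actor that owns the connection can then carry out the returned action.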

How to run this application ?

I will assume that you have the Akka master with you. Also you need to have a version of Redis running that implements pubsub. You can start the subscription service using the above implementation and then use any other Redis client to publish messages. Here's a sample recipe for a run ..

Prerequisite: Need Redis Server running (the version that supports pubsub)



For running this sample application :-

Starting the Subscription service



Starting a Publishing service



Another publishing client using redis-cli



Have fun with the message formats



The full implementation of the above is there as a sample project in Akka master. And in case you are not using Akka, I also have a version of the above implemented using Scala actors in the scala-redis distribution.

Have fun!

Sunday, March 28, 2010

Domain Services and Bounded Context using Akka - Part 2

In Part 1 of this series you saw how we can model a domain repository as an actor in Akka. It gives you declarative transaction semantics through Akka's STM and pluggable persistence engine support over a variety of data stores. As a result the domain model becomes cleaner. The repository that you design can take advantage of Akka's fault tolerance capabilities through supervisors that offer configurable lifecycle strategies.

One other important artifact of a domain model is the domain service. A domain service is not necessarily focused on any particular entity and is mostly centered around the verbs of the system. It models some actions or use cases involving multiple entities and is usually implemented as a stateless abstraction.

Using Akka you model a service as yet another actor. Domain services are coarse level abstractions and are the ones to receive requests from the clients. A service can invoke other services or use any other entities to do the job that it's supposed to do. No wonder a busy service gets requests from lots of consumers. Not only does it need to be stable, but it also needs to ensure that all of the other services with which it collaborates stay alive while serving requests.

One of the services that it interacts with is the Domain Repository, which I discussed in the last post.

When you design a domain service using Akka actors, you can ensure that the service can make its collaborating services fault-tolerant through declarative or minimal programming effort. Akka runtime offers all the machinery to make implementation of fault tolerant services quite easy.

Consider the following domain service for management of Accounts, continuing our earlier example from the last post ..

trait AccountServer extends Actor {
  // handle crash
  faultHandler = Some(OneForOneStrategy(5, 5000))
  trapExit = List(classOf[Exception])
  
  // abstract val : the Repository Service
  val storage: AccountRepository

  // message handler
  def receive = {
    case Open(no, name) => 
      storage ! New(Account(no, name, Calendar.getInstance.getTime, None, 100))
    case msg @ Balance(_) => storage forward msg
    case msg @ Post(_, _) => storage forward msg
    case msg @ OpenMulti(as) => storage forward msg
  }
  
  // shutdown hook
  override def shutdown = { 
    unlink(storage)
    storage.stop
  }
}


The message handler is a standard one that forwards client requests to the repository. Note the use of the abstract val storage: AccountRepository that helps you defer committing to the concrete implementation class till instantiation of the service object.

AccountServer plays the role of a supervisor for the repository actor. The first 2 lines of code define the strategy of supervision. OneForOneStrategy says that only the component that has crashed will be restarted. You can also make it AllForOne, in which case the supervising actor restarts all of the actors that it's supervising if one of them crashes. trapExit defines the list of exceptions in the linked actor for which the supervising actor will take an action.

AccountServer is the supervising actor for the repository. When it is shut down it has to be unlinked from the linked actors. This we do in the shutdown hook of the AccountServer.

But how do we link the Repository actor to our domain service actor ? Note that AccountServer is not a concrete object yet. We need to instantiate a concrete implementation of AccountRepository and assign it to storage in AccountServer. And link the two during this instantiation.

We define another trait that resolves the abstract val that we defined in AccountServer and provides a concrete instance of AccountRepository. spawnLink not only starts an instance of Redis based repository implementation, it also links the repository actor with the AccountServer ..

trait RedisAccountRepositoryFactory { this: Actor =>
  val storage: AccountRepository = spawnLink(classOf[RedisAccountRepository]) 
}
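
The same wiring pattern can be tried without any actors. In the sketch below all names are hypothetical. It also varies the pattern slightly: the dependency is declared as an abstract def and supplied as a lazy val, which avoids initialization-order surprises if the service touches the dependency while it is being constructed.

```scala
object AbstractValSketch {
  trait Repository { def name: String }
  final class InMemoryRepository extends Repository { val name = "in-memory" }

  // the service declares what it needs without committing to an implementation
  trait Server {
    def storage: Repository
    def describe: String = s"server backed by ${storage.name}"
  }

  // a factory trait supplies the concrete instance; the self-type ties it to Server
  trait InMemoryRepositoryFactory { this: Server =>
    lazy val storage: Repository = new InMemoryRepository
  }

  // the concrete choice is made only here, at instantiation
  object Service extends Server with InMemoryRepositoryFactory
}
```

Swapping the storage engine then means mixing in a different factory trait, with no change to Server.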


Now we have all the components needed to instantiate a fault tolerant domain service object.

object AccountService extends 
  AccountServer with 
  RedisAccountRepositoryFactory {

  // start the service
  override def start: Actor = {
    super.start
    RemoteNode.start("localhost", 9999)
    RemoteNode.register("account:service", this)
    this
  }
}


Have a look at the start method that starts the service on a remote node. We start a remote node and then register the current service under the id "account:service". Any client that needs to use the service can get hold of the service actor by specifying this id .. as in the following snippet ..

class AccountClient(val client: String) { 
  import Actor.Sender.Self
  val service = RemoteClient.actorFor("account:service", "localhost", 9999)

  def open(no: String, name: String) = service ! Open(no, name)
  def balance(no: String): Option[Int] = 
    (service !! Balance(no)).getOrElse(
      throw new Exception("cannot get balance from server"))
  def post(no: String, amount: Int) = service ! Post(no, amount)
  def openMulti(as: List[(String, String)]) = service !!! OpenMulti(as)
}


Services defined using Akka can be made to run on remote nodes without much of an engineering hack. Akka runtime offers APIs for doing that. However, Akka never tries to hide from you the paradigms of distribution. You need to be aware of your distribution requirements and process and supervising hierarchies. Akka facilitates you to define them at the application level, doing the heavy lifting within its underlying implementation. This principle is inspired from Erlang's philosophy and is in sharp contrast to the RPC way of defining APIs. Along with the benefits of message based computation that decouples the sender and the receiver, Akka also enables you to handle states using its built-in STM and pluggable storage engine.

When you model a complex domain, you need to deal with multiple contexts, which Eric Evans calls Bounded Contexts. Within a specific context you have a cohesive model with a set of domain behaviors and abstractions. The interpretations of the same abstractions may change when you move to a different context within the same application. As a domain modeler you need to define your context boundaries very carefully using context maps and implement appropriate translation maps between multiple contexts.

Messaging is a great way to implement translation between contexts. You process messages within a context using domain services as above and at the end forward the same message or a translated one to the other contexts of the application. It can be in the form of a push, or you can implement a publish/subscribe model between the related contexts. In the latter case, you can use the Akka-Camel integration, which allows actors to send or receive messages through Camel endpoints. In either case Akka provides you a world of options to implement loosely coupled domain contexts that form the components of your domain model.
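
As a rough illustration of translating a message at a context boundary, here is a minimal sketch in plain Scala. The Ledger context, `LedgerEntry`, `toLedger` and `Hub` are all hypothetical names invented for this example. A real setup would use actors or Camel endpoints instead of the in-process hub shown here.

```scala
object ContextTranslationSketch {
  // Message understood inside the Accounts context.
  case class Post(no: String, amount: Int)

  // Message understood inside a hypothetical Ledger context.
  case class LedgerEntry(accountRef: String, debit: Int, credit: Int)

  // The translation map between the two contexts, written as a pure function.
  def toLedger(p: Post): LedgerEntry =
    if (p.amount >= 0) LedgerEntry(s"ACC/${p.no}", debit = 0, credit = p.amount)
    else LedgerEntry(s"ACC/${p.no}", debit = -p.amount, credit = 0)

  // A minimal in-process publish/subscribe hub; each subscriber stands in
  // for another context that is interested in the translated message.
  class Hub[A] {
    private var subscribers = Vector.empty[A => Unit]
    def subscribe(f: A => Unit): Unit = subscribers :+= f
    def publish(a: A): Unit = subscribers.foreach(_(a))
  }
}
```

The Accounts context would finish handling a `Post` and then call `hub.publish(toLedger(post))`, so subscribers only ever see the vocabulary of their own context.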

Sunday, March 21, 2010

Thinking Asynchronous - Domain Modeling using Akka Transactors - Part 1

Followers of this blog must know by now that I am a big fan of a clean domain model. Domain driven design, as espoused by Eric Evans, is the way to go when you are modeling a complex domain and would like your model to survive for quite some time. Recently I have been experimenting a bit with domain driven design using asynchronous message passing techniques, particularly in the services and the storage layers.

The Repository, as Eric says, is the domain centric abstraction on top of your data storage layer. It gives your model back the feeling that you are dealing with domain concepts instead of marshalling data across your storage layers. Typically you have contracts for repositories at the aggregate root level. The underlying implementation commits to a platform (like JPA) and ensures that your object graph of the aggregate root rests in peace within the relational database. It need not be a relational database - it can be a file system or a NoSQL database. That's the power of abstraction that Repositories add to your model.

Ever since I started playing around with Erlang, I have been toying with thoughts of making repositories asynchronous. I blogged some of my thoughts in this post and even implemented a prototype using Scala actors.

Enter Akka and its lightweight actor model that offers transaction support over an STM. Akka offers seamless integration with a variety of persistence engines like Cassandra, MongoDB and Redis. There are plans to add many of the relational stores to this list as well. The richness of the Akka stack makes a strong case for designing a beautiful asynchronous repository abstraction.

Consider a very simple domain model for a Bank Account ..

case class Account(no: String, 
  name: String, 
  dateOfOpening: Date, 
  dateOfClose: Option[Date],
  balance: Float)

We can model typical operations on a bank account, like opening a new account, querying for the balance of an account or posting an amount to an account, through message dispatch. Typical messages will look like the following in Scala ..

sealed trait AccountEvent
case class Open(from: String, no: String, name: String) extends AccountEvent
case class New(account: Account) extends AccountEvent
case class Balance(from: String, no: String) extends AccountEvent
case class Post(from: String, no: String, amount: Float) extends AccountEvent


Note that all messages are immutable Scala objects. They are dispatched by the client and intercepted by a domain service, which can optionally do some processing and validation before finally forwarding them to the Repository.
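
To make the middle step concrete, here is a small sketch of what the validation stage of such a domain service could look like. It reuses the message definitions from above. The `validate` function and its rules are assumptions made up for illustration, not the service that the next post builds.

```scala
import java.util.Date

object MessageFlowSketch {
  case class Account(no: String, name: String, dateOfOpening: Date,
                     dateOfClose: Option[Date], balance: Float)

  sealed trait AccountEvent
  case class Open(from: String, no: String, name: String) extends AccountEvent
  case class New(account: Account) extends AccountEvent
  case class Balance(from: String, no: String) extends AccountEvent
  case class Post(from: String, no: String, amount: Float) extends AccountEvent

  // Hypothetical validation step of a domain service. It returns either a
  // rejection reason or the message that should be forwarded to the Repository.
  def validate(event: AccountEvent): Either[String, AccountEvent] = event match {
    case Open(_, no, _) if no.trim.isEmpty  => Left("account number is empty")
    // A valid Open is translated into the New message the Repository understands.
    case Open(_, no, name)                  => Right(New(Account(no, name, new Date, None, 0f)))
    case Post(_, _, amount) if amount == 0f => Left("amount must be non-zero")
    // Everything else is forwarded unchanged.
    case other                              => Right(other)
  }
}
```

Because the messages are immutable case classes, the service never modifies what the client sent. It either forwards the same instance or builds a new message.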

In this post we will look at the final stage in the lifecycle of a message, which is how it gets processed by the Repository. In the next post we will integrate the whole along with an abstraction for a domain service. Along the way we will see much of the goodness that Akka transactors offer, including support for fault tolerant processing in the event of system crashes.

trait AccountRepository extends Actor

class RedisAccountRepository extends AccountRepository {
  lifeCycle = Some(LifeCycle(Permanent))    
  val STORAGE_ID = "account.storage"

  // actual persistent store
  private var accounts = atomic { RedisStorage.getMap(STORAGE_ID) }

  def receive = {
    case New(a) => 
      atomic {
        accounts.+=(a.no.getBytes, toByteArray[Account](a))
      }

    case Balance(from, no) =>
      val b = atomic { accounts.get(no.getBytes) }
      b match {
        case None => reply(None)
        case Some(a) => 
          val acc = fromByteArray[Account](a).asInstanceOf[Account]
          reply(Some(acc.balance))
      }

      //.. other message handlers
  }

  override def postRestart(reason: Throwable) = {
    accounts = RedisStorage.getMap(STORAGE_ID)  
  }
}


The above snippet implements a message based Repository abstraction with an underlying implementation in Redis. Redis is an advanced key/value store that offers persistence for a suite of data structures like Lists, Sets, Hashes and more. Akka offers transparent persistence to a Redis storage through a common set of abstractions. In the above code you can change RedisStorage.getMap(STORAGE_ID) to CassandraStorage.getMap(..) and switch your underlying storage to Cassandra.
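
The idea of switching backends behind a common abstraction can be sketched without Akka at all. In the following, `ByteMap`, `HashBackend` and `TreeBackend` are hypothetical stand-ins for the storage abstractions, and `toBytes` / `fromBytes` are simple hand-rolled replacements for the serialization helpers used above. None of these names come from Akka.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable

object StorageSwapSketch {
  case class Account(no: String, name: String, balance: Float)

  // Hypothetical common abstraction over key/value backends.
  // Keys are Strings here because Array[Byte] keys would compare by
  // reference in plain Scala collections.
  trait ByteMap {
    def put(key: String, value: Array[Byte]): Unit
    def get(key: String): Option[Array[Byte]]
  }

  // Two interchangeable in-memory backends.
  class HashBackend extends ByteMap {
    private val m = mutable.HashMap.empty[String, Array[Byte]]
    def put(key: String, value: Array[Byte]): Unit = m.update(key, value)
    def get(key: String): Option[Array[Byte]] = m.get(key)
  }
  class TreeBackend extends ByteMap {
    private val m = mutable.TreeMap.empty[String, Array[Byte]]
    def put(key: String, value: Array[Byte]): Unit = m.update(key, value)
    def get(key: String): Option[Array[Byte]] = m.get(key)
  }

  // Field-by-field binary encoding of an Account.
  def toBytes(a: Account): Array[Byte] = {
    val buf = new ByteArrayOutputStream
    val out = new DataOutputStream(buf)
    out.writeUTF(a.no); out.writeUTF(a.name); out.writeFloat(a.balance)
    out.flush()
    buf.toByteArray
  }
  def fromBytes(bytes: Array[Byte]): Account = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    // Arguments are evaluated left to right, matching the write order.
    Account(in.readUTF(), in.readUTF(), in.readFloat())
  }

  // The repository depends only on the abstraction, never on a backend.
  class AccountRepository(store: ByteMap) {
    def save(a: Account): Unit = store.put(a.no, toBytes(a))
    def balance(no: String): Option[Float] = store.get(no).map(fromBytes(_).balance)
  }
}
```

Swapping the backend is then a one-line change at construction time, for example `new AccountRepository(new TreeBackend)` instead of `new AccountRepository(new HashBackend)`.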

The above Repository works through asynchronous message passing modeled with Akka actors. Here are some of the salient points in the implementation ..

  1. Akka is based on the let-it-crash philosophy. You can design supervisor hierarchies that will be responsible for controlling the lifecycles of your actors. In the Actor abstraction you can configure how you would like to handle a crash. LifeCycle(Permanent) means that the actor will always be restarted by the supervisor in the event of a crash. It can also be LifeCycle(Temporary), which means that it will not be restarted and will be shut down using the shutdown hook that you provide. In our case we make the Repository resilient to crashes.

  2. accounts is the handle to a Map that gets persisted in Redis. Here we store all accounts that the clients open hashed by the account number. Have a look at the New message handler in the implementation.

  3. With Akka you can also provide a restart hook for when your repository crashes and gets restarted automatically by the supervisor. postRestart is the hook where we re-initialize the Map structure.

  4. Akka uses Multiverse, a Java based STM implementation, for transaction handling. In the code, mark your transactions using atomic {} and the underlying STM will take care of the rest. Instead of atomic, you can also use monadic for-comprehensions for annotating your transaction blocks. Have a look at the Akka documentation for details.
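
The difference between the two lifecycles in point 1 and the role of the restart hook in point 3 can be shown with a toy model in plain Scala. Everything below (`Worker`, `supervise`, and the re-declared `LifeCycle` values) is a simplified assumption written for illustration. It is not how Akka's supervisors are implemented.

```scala
import scala.util.control.NonFatal

object RestartSketch {
  // Toy re-declarations for this sketch; these are not Akka's own types.
  sealed trait LifeCycle
  case object Permanent extends LifeCycle
  case object Temporary extends LifeCycle

  // Hypothetical worker with the two hooks discussed above.
  trait Worker {
    def lifeCycle: LifeCycle
    def receive(msg: String): Unit
    def postRestart(reason: Throwable): Unit = ()
    def shutdown(): Unit = ()
  }

  // Toy supervisor: feeds messages to the worker and reacts to crashes.
  // Returns the number of messages that were processed successfully.
  def supervise(worker: Worker, messages: List[String]): Int = {
    var processed = 0
    var alive = true
    val it = messages.iterator
    while (alive && it.hasNext) {
      val msg = it.next()
      try {
        worker.receive(msg)
        processed += 1
      } catch {
        case NonFatal(e) =>
          worker.lifeCycle match {
            // Permanent: run the restart hook and keep processing.
            case Permanent => worker.postRestart(e)
            // Temporary: run the shutdown hook and stop for good.
            case Temporary => worker.shutdown(); alive = false
          }
      }
    }
    processed
  }
}
```

With a worker that throws on one message, a Permanent worker loses only that message and continues with the rest, while a Temporary worker stops at the first crash. The message that caused the crash is dropped in both cases in this sketch.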


Asynchronous message based implementations decouple the end points, do not block and offer more manageability in distribution of your system. Typical implementations of actor based models are very lightweight. Akka actors take around 600 bytes each, which means that you can have millions of actors even on a commodity machine. Akka supports various types of message passing semantics which you can use to organize interactions between your collaborating objects.

In the next post we will see how the Repository interacts with Domain Services to implement client request handling. And yeah, you got it right - we will use more of Akka actors.