
Sunday, March 15, 2009

Real world event based solutions using MINA and Scala Actors

In the QCon London track Architectures in Financial Applications, Oracle presented an interesting solution that seeks to transform reconciliation from an inherently batch process into an incremental, event driven, asynchronous one. The solution is based on Coherence, their data grid, which addresses the capacity challenge by spreading trades across servers and the availability challenge through the continuous replication that Coherence supports. The presentation also touched upon one important paradigm that can be adopted even outside grid based architectures to improve system performance and throughput.

Asynchronous processing ..


As the Oracle presentation rightly pointed out, one of the recurring problems in today's investment management solutions is the time pressure that the back-office faces in running regular overnight batch programs (reconciliations, confirmations etc.) and struggling to complete them before the next day's trading starts. Reconciliation is a typical example: it needs to be executed at various levels between entities like traders, brokers, banks and custodians, and with varying periodicities. The main bottleneck is that these processes are executed as monolithic batch programs that operate on monstrous end-of-day databases containing millions of records that need to be processed, joined, validated and finally aggregated for reconciliation.

Enter Event based Reconciliation ..


This solution is not about grids. It is about poor man's asynchronous event based processing that transforms the end-of-day batch job into incremental MapReduce style progressions that move forward with every event, do real time processing, and raise alerts and events that investment managers can respond to and use to make more informed decisions. Since one of the main goals of reconciliation is risk mitigation, I suppose such real time progressions will result in better risk handling and management. You also avoid the time pressure that mandatory completion of today's batch processes implies, and you get a more predictable and uniform load distribution across your array of servers.

This proposed solution uses commodity tools and frameworks and good old proven techniques of asynchronous event processing, and it never claims to be as scalable as the Coherence based one that Oracle proposed.

Think asynchronous, think events, think abstraction of transport, think selectors, think message queues, think managed thread-pools and thousands of open sockets and connections. After all you need to have a managed infrastructure that will be able to process your jobs incrementally based on events.

Asynchronous IO abstractions, MINA and Scala actors ..


Instead of one batch process for every reconciliation, we can think of handling reconciliation events. For example, loading STREET trades is an event that can trigger reconciliation with HOUSE trades. Or receipt of Trade Confirmations is an event for reconciliation with our placed Orders. We can set this up nicely using generic socket based end-points that listen for events .. and using event dispatch callbacks that do all the necessary processing. MINA provides nice abstractions for registering sockets, managing IoSessions, handling IO events that happen on your registered sockets and provides all the necessary glue to handle protocol encoding and decoding.
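To make the idea of reconciliation events a little more concrete, here is a minimal sketch in plain Scala, with no MINA involved. All the names (ReconEvent, HouseTrade, StreetTrade, IncrementalReconciler) are hypothetical, and matching on a trade reference and quantity alone is an assumption made for illustration. Real reconciliation rules would be much richer.

```scala
import scala.collection.mutable

// Hypothetical event model: none of these names come from MINA or from the code in this post.
sealed trait ReconEvent
final case class HouseTrade(ref: String, quantity: Long) extends ReconEvent
final case class StreetTrade(ref: String, quantity: Long) extends ReconEvent

sealed trait ReconResult
final case class Pending(ref: String) extends ReconResult
final case class Matched(ref: String) extends ReconResult
final case class Break(ref: String, houseQty: Long, streetQty: Long) extends ReconResult

// Keeps the unmatched trades of each side and reconciles a trade as soon as
// its counterpart arrives, instead of waiting for an end-of-day batch.
final class IncrementalReconciler {
  private val house = mutable.Map.empty[String, Long]
  private val street = mutable.Map.empty[String, Long]

  def onEvent(event: ReconEvent): ReconResult = event match {
    case HouseTrade(ref, qty) =>
      street.remove(ref) match {
        case Some(streetQty) => compare(ref, qty, streetQty)
        case None =>
          house.update(ref, qty)
          Pending(ref)
      }
    case StreetTrade(ref, qty) =>
      house.remove(ref) match {
        case Some(houseQty) => compare(ref, houseQty, qty)
        case None =>
          street.update(ref, qty)
          Pending(ref)
      }
  }

  // A quantity mismatch is reported as a break, which is where an alert could be raised.
  private def compare(ref: String, houseQty: Long, streetQty: Long): ReconResult =
    if (houseQty == streetQty) Matched(ref) else Break(ref, houseQty, streetQty)

  // References still waiting for their counterpart.
  def unmatched: Set[String] = (house.keySet ++ street.keySet).toSet
}
```

Every call to onEvent either parks the trade until its counterpart shows up or reports a match or a break immediately, so breaks surface during the day rather than after the batch run.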

Here is a brief thought sequence ..



We can use MINA's asynchronous I/O service that abstracts the underlying transport's connection.

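import java.util.concurrent.Executors
import org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}

// set up the thread pool
val executor = Executors.newCachedThreadPool()

// set up the socket acceptor with the thread pool
val acceptor = new NioSocketAcceptor(executor, new NioProcessor(executor))

//.. set up other MINA stuff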


Using MINA we can decouple protocol encoding / decoding from the I/O, and have separate abstractions for codec construction. Note how the responsibilities are nicely separated between the I/O session handling, protocol filters and event handling.

// add protocol encoder / decoder
acceptor.getFilterChain.addLast("codec", 
  new ProtocolCodecFilter(/* .. */, /* .. */))
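To see what keeping the codec separate from the I/O buys us, here is a small sketch of an encoder and decoder that know nothing about sockets. It is plain Scala, not MINA's ProtocolEncoder / ProtocolDecoder interfaces, and everything in it is an assumption made for illustration: the ReconRequest shape (the code in this post only uses its header), the names ReconCodec and ReconDecoder, and the newline terminated "header|body" wire format, which also assumes the body contains no newline.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical message shape for illustration.
final case class ReconRequest(header: String, body: String)

// Hypothetical wire format: "<header>|<body>\n".
object ReconCodec {
  def encode(request: ReconRequest): Array[Byte] =
    s"${request.header}|${request.body}\n".getBytes(UTF_8)
}

// Stateful decoder: bytes may arrive in arbitrary chunks, so an incomplete
// frame is buffered until its terminating newline shows up.
final class ReconDecoder {
  private var pending: Array[Byte] = Array.emptyByteArray

  // Feed one chunk of bytes, get back every request completed by it.
  def feed(chunk: Array[Byte]): List[ReconRequest] = {
    pending = pending ++ chunk
    val decoded = List.newBuilder[ReconRequest]
    var newline = pending.indexOf('\n'.toByte)
    while (newline >= 0) {
      val line = new String(pending, 0, newline, UTF_8)
      pending = pending.drop(newline + 1)
      // Split on the first '|' only, so the body may itself contain '|'.
      line.split("\\|", 2) match {
        case Array(header, body) => decoded += ReconRequest(header, body)
        case _ => throw new IllegalArgumentException(s"Malformed frame: $line")
      }
      newline = pending.indexOf('\n'.toByte)
    }
    decoded.result()
  }
}
```

The decoder can be exercised with byte arrays alone, which is the point of the separation: the handler further down the chain only ever sees complete ReconRequest instances.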


Now we need to register an IoHandler, which will handle the events and call the various callbacks like messageReceived(), sessionClosed() etc. Here we would like to be more abstract so that we do not have to handle all complexities of thread and lock management ourselves. We can delegate the event handling to Scala actors, which again can optimize on thread usage and help make the model scale.

// set the IoHandler that delegates event handling to the underlying actor
acceptor.setHandler(
  new IoHandlerActorAdapter(session => new ReconHandler(session, ...)))

// bind and listen
acceptor.bind(new InetSocketAddress(address, port))


So, we have a socket endpoint where clients can push messages which result in MINA events that get routed through the IoHandler implementation, IoHandlerActorAdapter and translated to messages which our Scala actor ReconHandler can react to.

The class IoHandlerActorAdapter is adapted from naggati, the DSL for protocol decoding from Twitter ..


class IoHandlerActorAdapter(val actorFactory: (IoSession) => Actor) 
  extends IoHandler {
  //..

  //.. callback
  def messageReceived(session: IoSession, message: AnyRef) = 
    send(session, MinaMessage.MessageReceived(message))

  // send a message to the actor associated with this session
  def send(session: IoSession, message: MinaMessage) = {
    val info = IoHandlerActorAdapter.sessionInfo(session)
    for (actor <- info.actor; if info.filter contains MinaMessage.classOfObj(message)) {
      actor ! message
    }
  }
  //..
}




and the class ReconHandler is the actor that handles reconciliation messages ..

class ReconHandler(val session: IoSession, ...) 
  extends Actor {

  //..
  //..

  def act = {
    loop {
      react {
        case MinaMessage.MessageReceived(msg) =>
          // note we have registered a ProtocolCodecFilter
          // hence we get msg as an instance of specific
          // reconciliation message
          doRecon(msg.asInstanceOf[ReconRequest])

        case MinaMessage.ExceptionCaught(cause) => 
          //.. handle

        case MinaMessage.SessionClosed =>
          //.. handle

        case MinaMessage.SessionIdle(status) =>
          //.. handle
      }
    }
  }

  private def doRecon(request: ReconRequest) = {
    request.header match {
      case "TRADE_CONF_ORDER" => 
        //.. handle reconciliation of confirmation with order

      case "TRADE_STREET_HOME" =>
        //.. handle reconciliation of street side with home trades

      //..
    }
  }
  //..
}
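The routing idea behind the adapter and the actor can also be tried out without MINA or the actors library. The sketch below is only a stand-in under stated assumptions: a single threaded executor plays the role of the actor's mailbox, sessions are identified by a plain String id, and the names SessionMessage, MailboxHandler, HandlerAdapter and RecordingHandler are all hypothetical. It is not how naggati or Scala actors are implemented.

```scala
import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList, ExecutorService, Executors, TimeUnit}

// Hypothetical stand-ins for the MinaMessage cases used above.
sealed trait SessionMessage
final case class MessageReceived(message: AnyRef) extends SessionMessage
case object SessionClosed extends SessionMessage

// A minimal mailbox: a single threaded executor processes messages one at a
// time, in order, so the handler body needs no locking of its own.
abstract class MailboxHandler {
  private val mailbox: ExecutorService = Executors.newSingleThreadExecutor()

  protected def receive(message: SessionMessage): Unit

  def !(message: SessionMessage): Unit =
    mailbox.execute(new Runnable { def run(): Unit = receive(message) })

  // Stop accepting new messages; messages already queued are still processed.
  def stop(): Unit = mailbox.shutdown()

  def awaitStopped(timeoutSeconds: Long): Boolean =
    mailbox.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)
}

// Translates I/O style callbacks into messages for the handler that owns the session.
final class HandlerAdapter(factory: String => MailboxHandler) {
  private val handlers = new ConcurrentHashMap[String, MailboxHandler]()

  def sessionOpened(sessionId: String): Unit =
    handlers.put(sessionId, factory(sessionId))

  def messageReceived(sessionId: String, message: AnyRef): Unit =
    Option(handlers.get(sessionId)).foreach(_ ! MessageReceived(message))

  def sessionClosed(sessionId: String): Unit =
    Option(handlers.remove(sessionId)).foreach { handler =>
      handler ! SessionClosed
      handler.stop()
    }
}

// Example handler: records what it sees, in arrival order.
final class RecordingHandler(sessionId: String) extends MailboxHandler {
  val seen = new CopyOnWriteArrayList[String]()

  protected def receive(message: SessionMessage): Unit = message match {
    case MessageReceived(msg) => seen.add(s"$sessionId:$msg")
    case SessionClosed        => seen.add(s"$sessionId:closed")
  }
}
```

Messages for a session that has no registered handler are silently dropped here. Whether that is acceptable depends on the protocol, so treat it as a simplification.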


Note that the above strategy relies on incremental progress. The entire process may not get done upfront: we may have to wait until the close of trading hours to receive the last batch of trades or position information from upstream components. The difference from the big bang batch process is that by that time we have already progressed quite a bit and have possibly raised some alerts as well, which would not have been possible with the earlier strategy of execution. Another way to view it is as an implementation of MapReduce that gets processed incrementally throughout the day on a real time basis and comes up with the eventual result much earlier than a scheduled batch process.
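The MapReduce analogy can be illustrated with a toy aggregation. This is a sketch under assumptions: the Trade record, the IncrementalTotals object and the choice of summing quantities per instrument are hypothetical and picked only because they are easy to verify. The point is that folding events in one at a time arrives at the same totals as the end-of-day batch, while every intermediate state is available along the way.

```scala
// Hypothetical trade record used only for this illustration.
final case class Trade(instrument: String, quantity: Long)

object IncrementalTotals {
  // "Map" step: project a trade to a (key, value) pair.
  def toPosition(trade: Trade): (String, Long) = trade.instrument -> trade.quantity

  // "Reduce" step applied per event: fold one trade into the running totals.
  def step(totals: Map[String, Long], trade: Trade): Map[String, Long] = {
    val (key, qty) = toPosition(trade)
    totals.updated(key, totals.getOrElse(key, 0L) + qty)
  }

  // The end-of-day batch view: group everything at once and sum.
  def batch(trades: Seq[Trade]): Map[String, Long] =
    trades.groupBy(_.instrument).map { case (key, ts) => key -> ts.map(_.quantity).sum }
}
```

For example, trades.scanLeft(Map.empty[String, Long])(IncrementalTotals.step) yields the totals after each event, and its last element equals IncrementalTotals.batch(trades).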