Asynchronous processing ..
As the Oracle presentation rightly pointed out, one of the recurring problems in today's investment management solutions is the time pressure that the back office faces in dealing with regular overnight batch programs (like reconciliations, confirmations etc.) and struggling to complete them before the next day's trading starts. Reconciliation is a typical example, which needs to be executed at various levels between entities like traders, brokers, banks and custodians, and with varying periodicities. The main bottleneck is that these processes are executed as monolithic batch programs that operate on monstrous end-of-day databases containing millions of records that need to be processed, joined, validated and finally aggregated for reconciliation.
Enter Event based Reconciliation ..
This solution is not about grids; it is about poor man's asynchronous event based processing that transforms the end-of-day batch job into incremental MapReduce style progressions that move forward with every event, do real time processing, and raise alerts and events for investment managers to respond to and make more informed decisions. Since one of the main goals of reconciliation is risk mitigation, I suppose such real time progressions will result in better risk handling and management. You also avoid the time pressure that mandatory completion of today's batch processes implies, and get a more predictable and uniform load distribution across your array of servers.
And this proposed solution uses commodity tools and frameworks, good old proven techniques of asynchronous event processing, and never claims to be as scalable as the grid based one proposed by Coherence.
Think asynchronous, think events, think abstraction of transport, think selectors, think message queues, think managed thread-pools and thousands of open sockets and connections. After all you need to have a managed infrastructure that will be able to process your jobs incrementally based on events.
Asynchronous IO abstractions, MINA and Scala actors ..
Instead of one batch process for every reconciliation, we can think of handling reconciliation events. For example, loading STREET trades is an event that can trigger reconciliation with HOUSE trades. Or receipt of Trade Confirmations is an event for reconciliation with our placed Orders. We can set this up nicely using generic socket based end-points that listen for events .. and using event dispatch callbacks that do all the necessary processing. MINA provides nice abstractions for registering sockets, managing IoSessions, handling IO events that happen on your registered sockets and provides all the necessary glue to handle protocol encoding and decoding. Here is a brief thought sequence ..
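The idea of treating each incoming feed as an event that triggers a specific reconciliation can be sketched in a few lines of plain Scala. This is only an illustration: the event types `StreetTradesLoaded` and `ConfirmationReceived` and the `ReconDispatcher` object are hypothetical names, not part of MINA or of the code in this post, and the "reconciliation" is reduced to a description string.

```scala
// Hypothetical event types; the names are illustrative assumptions.
sealed trait ReconEvent
final case class StreetTradesLoaded(tradeIds: Seq[String]) extends ReconEvent
final case class ConfirmationReceived(orderId: String) extends ReconEvent

object ReconDispatcher {
  // Map each event to the reconciliation it should trigger.
  // A real handler would do the matching; here we only describe it.
  def dispatch(event: ReconEvent): String = event match {
    case StreetTradesLoaded(ids) =>
      s"reconcile ${ids.size} STREET trades against HOUSE trades"
    case ConfirmationReceived(orderId) =>
      s"reconcile confirmation against order $orderId"
  }
}
```

Because `ReconEvent` is sealed, the compiler can warn when a new event type is added without a matching case in the dispatcher.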
We can use MINA's asynchronous I/O service that abstracts the underlying transport's connection.
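import java.util.concurrent.Executors
import org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}

// set up the thread pool
val executor = Executors.newCachedThreadPool()
// set up the socket acceptor with the thread pool
val acceptor = new NioSocketAcceptor(executor, new NioProcessor(executor))
//.. set up other MINA stuff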
Using MINA we can decouple protocol encoding / decoding from the I/O, and have separate abstractions for codec construction. Note how the responsibilities are nicely separated between the I/O session handling, protocol filters and event handling.
// add protocol encoder / decoder
acceptor.getFilterChain.addLast("codec",
  new ProtocolCodecFilter(/* .. */, /* .. */))
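To make the codec step a little more concrete, here is a minimal sketch of the translation a codec performs, written as plain Scala functions rather than as MINA encoder and decoder classes. Everything specific is an assumption for illustration: the post only shows that `ReconRequest` has a `header`, so the `payload` field, the `ReconCodec` object and the `<header>|<payload>` wire format are all made up here.

```scala
// Hypothetical message shape: only `header` appears in the post,
// the `payload` field is an assumption.
final case class ReconRequest(header: String, payload: String)

object ReconCodec {
  // Assumed wire format: "<header>|<payload>", one message per line.
  def encode(request: ReconRequest): String =
    s"${request.header}|${request.payload}"

  // Decode one line; malformed input is reported instead of thrown.
  def decode(line: String): Either[String, ReconRequest] =
    line.split("\\|", 2) match {
      case Array(header, payload) if header.nonEmpty =>
        Right(ReconRequest(header, payload))
      case _ =>
        Left(s"malformed reconciliation message: $line")
    }
}
```

In a MINA setup this logic would live behind the codec filter, so the handler only ever sees decoded request objects.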
Now we need to register an IoHandler, which will handle the events and call the various callbacks like messageReceived(), sessionClosed() etc. Here we would like to be more abstract so that we do not have to handle all the complexities of thread and lock management ourselves. We can delegate the event handling to Scala actors, which in turn can optimize thread usage and help make the model scale.
// set the IoHandler that delegates event handling to the underlying actor
acceptor.setHandler(
new IoHandlerActorAdapter(session => new ReconHandler(session, ...)))
// bind and listen
acceptor.bind(new InetSocketAddress(address, port))
So, we have a socket endpoint where clients can push messages which result in MINA events that get routed through the IoHandler implementation, IoHandlerActorAdapter, and translated to messages which our Scala actor ReconHandler can react to. The class IoHandlerActorAdapter is adopted from the naggati DSL for protocol decoding from Twitter ..
class IoHandlerActorAdapter(val actorFactory: (IoSession) => Actor)
  extends IoHandler {
  //..
  //.. callback
  def messageReceived(session: IoSession, message: AnyRef) =
    send(session, MinaMessage.MessageReceived(message))
  // send a message to the actor associated with this session
  def send(session: IoSession, message: MinaMessage) = {
    val info = IoHandlerActorAdapter.sessionInfo(session)
    for (actor <- info.actor; if info.filter contains MinaMessage.classOfObj(message)) {
      actor ! message
    }
  }
  //..
}
and the class ReconHandler is the actor that handles reconciliation messages ..
class ReconHandler(val session: IoSession, ...)
  extends Actor {
  //..
  //..
  def act = {
    loop {
      react {
        case MinaMessage.MessageReceived(msg) =>
          // note we have registered a ProtocolCodecFilter
          // hence we get msg as an instance of specific
          // reconciliation message
          doRecon(msg.asInstanceOf[ReconRequest])
        case MinaMessage.ExceptionCaught(cause) =>
          //.. handle
        case MinaMessage.SessionClosed =>
          //.. handle
        case MinaMessage.SessionIdle(status) =>
          //.. handle
      }
    }
  }
  private def doRecon(request: ReconRequest) = {
    request.header match {
      case "TRADE_CONF_ORDER" =>
        //.. handle reconciliation of confirmation with order
      case "TRADE_STREET_HOME" =>
        //.. handle reconciliation of street side with home trades
      //..
    }
    //..
  }
}
Note that the above strategy relies on incremental progress. The entire process may not get done upfront: we may have to wait until the close of trading hours to receive the last batch of trades or position information from upstream components. But the difference from the big bang batch process is that by then we have already progressed quite a bit and possibly raised some alerts as well, which would not have been possible with the earlier execution strategy. Another way to view it is as an implementation of MapReduce that gets processed incrementally throughout the day on a real time basis and comes up with the eventual result much earlier than a scheduled batch process.
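The incremental view can be sketched as a fold over trade events, where each arriving trade either pairs off with its counterpart or waits in a pending set. This is a rough illustration only: `Trade`, `ReconState`, `IncrementalRecon` and the rule "same id, same quantity" are assumptions made up for the example, not the reconciliation logic of any real system.

```scala
// Hypothetical trade record; the fields are assumptions for illustration.
final case class Trade(id: String, quantity: Long)

// Running state: trades still waiting for a counterpart, plus results so far.
final case class ReconState(
    pendingStreet: Map[String, Trade] = Map.empty,
    pendingHouse: Map[String, Trade] = Map.empty,
    paired: Int = 0,
    alerts: Vector[String] = Vector.empty
)

object IncrementalRecon {
  sealed trait Side
  case object Street extends Side
  case object House extends Side

  // One incremental step: fold a single incoming trade into the state.
  def step(state: ReconState, side: Side, trade: Trade): ReconState = {
    val (own, other) = side match {
      case Street => (state.pendingStreet, state.pendingHouse)
      case House  => (state.pendingHouse, state.pendingStreet)
    }
    other.get(trade.id) match {
      case Some(counterpart) =>
        // Counterpart already arrived: pair them, alert on a quantity break.
        val alerts =
          if (counterpart.quantity == trade.quantity) state.alerts
          else state.alerts :+ s"quantity break on trade ${trade.id}"
        val remaining = other - trade.id
        side match {
          case Street =>
            state.copy(pendingHouse = remaining, paired = state.paired + 1, alerts = alerts)
          case House =>
            state.copy(pendingStreet = remaining, paired = state.paired + 1, alerts = alerts)
        }
      case None =>
        // No counterpart yet: park the trade until the other side shows up.
        val updated = own + (trade.id -> trade)
        side match {
          case Street => state.copy(pendingStreet = updated)
          case House  => state.copy(pendingHouse = updated)
        }
    }
  }
}
```

Whatever is still pending at the close of trading is the residue that an end-of-day pass would have to look at, while the alerts have been available since the moment each break was detected.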