Showing posts with label erlang. Show all posts

Monday, April 13, 2009

Objects as Actors ?

Tony Arcieri, creator of Reia, recently brought up an interesting topic on unifying actors and objects. Talking about Scala and his dislike of its implementation of actors as an additional entity on top of objects, he says it would have been a more useful abstraction to model all objects as actors. Doing it that way would eliminate much of the overlapping functionality that the object and actor semantics implement separately today. In Reia, which is designed to run on top of BEAM (the Erlang VM), he has decided to make all objects actors.

The way I look at it, this is mostly a decision about the philosophy of the language design. Scala is targeted to be a general purpose programming language, where concurrency and distribution are not the central concerns to address as part of the core language design. The entire actor model has hence been implemented as a library that integrates seamlessly with the rest of Scala's core object/functional engineering. This is a design decision that the language designers took upfront - hence objects in Scala, by default, bind to local invocation semantics, which enables them to take advantage of all the optimizations and efficiencies of being collocated in the same process.

The actor model was designed primarily to address the concerns of distributed programming. As Jonas Boner recently said on Twitter - "The main benefit of the Actor model is not simpler concurrency but fault-tolerance and reliability". And for fault tolerance you need to have at least two machines running your programs. We all know the awesome capabilities of fault tolerance that the Erlang actor model offers through supervisors, linked actors and transparent restarts. Hence languages like Erlang, which address the concerns of concurrency and distribution as part of the core, have decided to implement actors as their basic building block of abstraction. This was done with the vision that the Erlang programming style will be based on simple primitives of process spawning and message passing, both of which are implemented as low overhead primitives in the virtual machine. The philosophy of Scala is, however, a bit different. Still, it is not that difficult to implement the Active Object pattern on top of the Scala actors platform.

Erlang allows you to write programs that will run without any change in a regular non-distributed Erlang session, on two different Erlang nodes running on the same computer, and on Erlang nodes running on two physically separated computers, either in the same LAN or over the internet. It can do this because the language designers decided to map the concurrency model naturally to distributed deployments, extending the actor model beyond VM boundaries.

Clojure, another language with strong concurrency support, decided to go the Scala way in addressing distribution concerns. Distribution is not something that Rich Hickey decided to hardwire into the core of the language. Here is what he says about it ..

"In Erlang the concurrency model is (always) a distributed one and in Clojure it is not. I have some reservations about unifying the distributed and non-distributed models [..], and have decided not to do so in Clojure, but I think Erlang, in doing so, does the right thing in forcing programmers to work as if the processes are distributed even when they are not, in order to allow the possibility of transparent distribution later, e.g. in the failure modes, the messaging system etc. However, issues related to latency, bandwidth, timeouts, chattiness, and costs of certain data structures etc remain."

And finally, on the JVM, there are a host of options that enable distribution of your programs, which is yet another reason not to go for language specific solutions. If you are implementing your language on top of the Erlang VM, it's only natural to leverage the awesome power of the cross virtual machine distribution capabilities that it offers. On the JVM, distribution is better left to specialized frameworks.

Monday, October 27, 2008

Data 2.0 - Is your Application ready ?

Brian Aker talking about assumptions on Drizzle and future of database technologies ..

"Map/Reduce will kill every traditional data warehousing vendor in the market. Those who adapt to it as a design/deployment pattern will survive, the rest won't. Database systems that have no concept of being multiple node are pretty much dead. If there is no scale out story, then there is not future going forward."

Later on in the same post he goes on to mention the forces that he hopes will drive the performance of tomorrow's database technologies - map/reduce and asynchronous queues. Drizzle is a fork of MySQL, and it is revolutionary in many respects compared to other forks of the same product or similar products of the same genre. Drizzle does away with the erstwhile dodos of database technology, viz. stored procedures, triggers etc., and is being planned exclusively for scaling out in the cloud. The development model is also ultra open source, aka organic open source, and is being driven by the community as a whole.

Drizzle is clearly a step towards N > 1 and quite a forceful step too. Erlang was the silent roadmaker towards true N > 1 with the entire ecosystem of OTP, mnesia and the supervisor hierarchies .. Erlang offers the platform and the correct set of primitives for delivering fault tolerance in applications along with replicated mnesia storage. Erlang was not built for N = 1 .. it had N > 1 right into it and right from day 0 ..

CouchDB is another instance that may hit the sweet spot of getting the combination right - asynchronous map/reduce along with replicated, distributed, loosely coupled document storage. It implements REST, BASE, MVCC .. what else .. all of which leads to eventual consistency of the system.

All the buzz about future database technologies has been somehow related to the cloud, or at least to designs planned with the scale out factor in mind. Erlang started it all, and is now being actively driven in multiple dimensions by all similar complementary/competing technologies and platforms. No one talks about normalization or ACID or multi-phase commit these days. Call it Web 2.0 or Enterprise 2.0 or social networking, everything boils down to how enterprise IT can make data access easier, better, and more resilient to datacenter or network failures without compromising on the quality of service.

Middleware matters, data storage in the cloud matters, data processing in the cloud matters, and we are looking at lots of vendors fighting for space there. Applications of Enterprise 2.0 need to be smart and malleable enough to participate in this ecosystem. Make sure they are implemented as loosely coupled components that talk to middleware services asynchronously, can consume the atom feeds that other enterprise applications generate, and do not depend on ACID properties or rely on synchronous 2-phase commit protocols from the data layer. Are we there yet ?

Thursday, October 09, 2008

To Tail Recurse or Not

Today I am going to talk about maps, not the java.util.Map, but map as in map-reduce or map as in scala.List.map. Of course all of us know what map is and what it does, and how this powerful concept is used in all the functional languages that we use on a regular basis. I will talk about map in the context of its implementation, as we find it in these languages, which brings out some of the important principles of using tail recursion.

A couple of months back, there was a thread in the Erlang discussion list, where someone wondered why the implementation of map in the Erlang stdlib module lists.erl is NOT tail recursive. Here it is, faithfully copied from the Erlang stdlib ..


map(F, [H|T]) ->
    [F(H)|map(F, T)];
map(F, []) when is_function(F, 1) -> [].



Clearly not a tail recursive one ..

The thread of discussion explains the rationale behind such implementation. And it has a lot to do with the compiler optimizations that have been done in R12B. Here is a quote from the Erlang efficiency guide, which explains the myth that tail-recursive functions are ALWAYS much faster than body-recursive ones ..

"In R12B, there is a new optimization that will in many cases reduces the number of words used on the stack in body-recursive calls, so that a body-recursive list function and tail-recursive function that calls lists:reverse/1 at the end will use exactly the same amount of memory. lists:map/2, lists:filter/2, list comprehensions, and many other recursive functions now use the same amount of space as their tail-recursive equivalents."

Since a tail recursive map needs to do a reverse ..

  • the incremental space that it needs to keep both the lists makes it as space consuming as the body-recursive version

  • it puts pressure on the garbage collector, since the space used by the temporary list cannot be reclaimed immediately



The general advice is that you need to measure the timings of your use case and then decide whether to tail recurse or not.

I was curious enough to check what the Scala library does for map implementation. Here is the snippet from scala.List ..


final override def map[B](f: A => B): List[B] = {
  val b = new ListBuffer[B]
  var these = this
  while (!these.isEmpty) {
    b += f(these.head)
    these = these.tail
  }
  b.toList
}



It is not a functional implementation at all. In fact it adopts a clever use of localized mutation to achieve performance. Using mutation locally is a perfectly valid technique and a definite area where hybrid non-pure languages score over the purer ones. The contract for map is purely functional, does not have any side-effect, yet it uses localized side-effects for performance. This would not have been possible in Erlang. Neat!

Just for fun, I cooked up a tail recursive version of map in Scala, as a standalone function ..


def tr_map[A, B](f: A => B, l: List[A]): List[B] = {
  def iter_map(curr: List[B], l: List[A]): List[B] = l match {
    case Nil => curr.reverse
    case _ => iter_map(f(l.head) :: curr, l.tail)
  }
  iter_map(Nil, l)
}



It performed much slower than the native library version.

Scala also offers a reverseMap function which does not need the additional reverse, which the tail-recursive map would require. And not surprisingly, the implementation is based on tail recursion and pattern matching ..


def reverseMap[B](f: A => B): List[B] = {
  def loop(l: List[A], res: List[B]): List[B] = l match {
    case Nil => res
    case head :: tail => loop(tail, f(head) :: res)
  }
  loop(this, Nil)
}



So, how does the story end ? Well, as usual, benchmark hard, and then decide ..
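To make "benchmark hard" a little more concrete, here is a minimal sketch of how the two approaches could be compared. The names MapTimingSketch, trMap and timed are mine and purely illustrative, and the wall-clock timing is deliberately naive (no warm-up, no repeated runs), so treat any numbers it prints as a rough hint rather than a measurement ..

```scala
object MapTimingSketch {
  // Tail-recursive map: accumulate results in reverse, then reverse once.
  def trMap[A, B](f: A => B, l: List[A]): List[B] = {
    @annotation.tailrec
    def iter(acc: List[B], rest: List[A]): List[B] = rest match {
      case Nil    => acc.reverse
      case h :: t => iter(f(h) :: acc, t)
    }
    iter(Nil, l)
  }

  // Naive wall-clock timing: returns the result and the elapsed nanoseconds.
  def timed[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, System.nanoTime() - start)
  }

  def main(args: Array[String]): Unit = {
    val xs = List.range(0, 1000000)
    val (a, tLib) = timed(xs.map(_ + 1))
    val (b, tTr)  = timed(trMap((x: Int) => x + 1, xs))
    println(s"same result: ${a == b}, library: $tLib ns, tail-recursive: $tTr ns")
  }
}
```

For anything you intend to act on, a proper harness with warm-up iterations would be the safer choice, since JIT compilation and garbage collection can easily dominate a single run.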

Friday, October 03, 2008

Erlang VM : now hosting multiple languages

In an earlier post, I had wondered why the Erlang virtual machine does not host a more diversified set of languages ..

"BEAM provides an ecosystem that offers phenomenal scalability with concurrent processes and distributed programming, which is really difficult (if not impossible) to replicate in any other virtual machine being worked upon today. After all, it is much easier to dress up Erlang with a nicer syntax than implementing the guarantees of reliability and extreme concurrency in any of your favorite languages' runtime."

Then I had blogged about Reia, the Python/Ruby like scripting language on BEAM.

A few days back, Robert Virding released a stable version of LFE - Lisp Flavored Erlang, a concurrent Lisp based on the features and limitations of the Erlang VM. Unlike Lisp it doesn't have global data or mutating operations. Instead it has the goodness of Lisp macros, sexprs, code-as-data together with the Erlang power of pattern matching and binary comprehensions. And the best part is that LFE coexists seamlessly with vanilla Erlang/OTP.

Along with Erlang being used to develop middleware applications, we are seeing increased use of the Erlang VM, hosting more and more language variants. This is a clear indication that the Erlang ecosystem is growing. As Ted Leung has rightly observed in his post on VMs for everybody, we are going to see not only flourishing new virtual machines, but also lots of languages atop existing virtual machines.

Real good time to be a hacker .. a pity though only a few lucky ones get paid for hacking ..

Friday, September 26, 2008

Infinite Possibilities : Classes and Objects on top of Erlang Processes

From Tony Arcieri's blog on Reia ..

"Objects are able to communicate with only with messages. They hold their own state, which can only be accessed by other objects via messages."

"Reia’s objects function as separate Erlang processes. This mean every object is concurrent and runs simultaneously, provided it has something to do."

"More than that, Reia objects are gen_servers. Erlang’s gen_servers are a simultaneously lauded and malaigned construct of Erlang’s OTP framework."

Reia is a Python/Ruby like scripting language targeted at the Erlang Virtual Machine (BEAM). I had blogged on its release some time back. Now it looks more interesting with the object semantics on top of Erlang processes.

Really dying to see how it compares to Scala actors once we have Scala-OTP on board ..

Lean Data Models and Asynchronous Repositories

In an earlier post, I had talked about scaling out the service layer of your application using actors and asynchronous processing. This can buy you some more donuts over and above your current throughput. With extra processing in the form of n actors pounding the cores of your CPU, the database will still be the bottleneck and SPOF. As long as you have a single database, there will be latency and you will have to accept it.

Sanitizing the data layer ..

I am not talking about scaling up, which implies adding more fuel to your already sizable database server. In order to increase the throughput of your application proportionately with your investment, you need to scale out, add redundancy and process asynchronously. As Michael Stonebraker mentioned once, it boils down to one simple thing - latency. It's a "latency arms race" out there, and the arbitrager with the least latency in their system wins. And when we talk about latency, it's not the latency of any isolated component, it's the latency of the entire architecture.

An instance of an RDBMS is the single most dominant source of latency in any architecture today. Traditionally we have been guilty of upsizing the database payload with logic, constraints and responsibilities that do not belong to the data layer. Or possibly, not in the form that today's relational model espouses. With an ultra normalized schema we try to fit in a data model that is not relational in nature, resulting in the complexities of big joins and aggregates while doing simple business queries. Now, the problem is not with the query per se .. the problem is with the impedance mismatch between the business model and the data model. The user wants to view his latest portfolio statement, which has been stored in 10 different tables with complex indexed structures that need to be joined on the fly to generate the document.

One of the ways to reduce the intellectual weight of your relational data model is to take out elements that do not belong there. Use technologies like CouchDB, which offer much lighter-weight solutions, with modeling techniques that suit your non-relational, document oriented storage requirements like a charm.

Dealing with Impedance Mismatch

One of the reasons we need to do complex joins and use referential integrity within the relational data model is to incorporate data sanity, prevent data redundancy, and enforce business domain constraints within the data layer. I have seen many applications that use triggers and stored procedures to implement business logic. Instead of trying to decry this practice, I will simply quote DHH and his "single layer of cleverness" theory on this ..

.. I consider stored procedures and constraints vile and reckless destroyers of coherence. No, Mr. Database, you can not have my business logic. Your procedural ambitions will bear no fruit and you'll have to pry that logic from my dead, cold object-oriented hands.

He goes on to say in the same blog post ..

.. I want a single layer of cleverness: My domain model. Object-orientation is all about encapsulating clever. Letting it sieve half ways through to the database is a terrible violation of those fine intentions. And I want no part of it.

My domain model is object oriented - the more I keep churning out logic on the relational model, the more subverted it becomes. The mapping of my domain model to a relational database has already introduced a significant layer of impedance mismatch, which we are struggling with to this day - I do not want any of your crappy SQL-ish language to further limit the frontiers of my expressibility.

Some time back, I was looking at Mnesia, the commonly used database system for Erlang applications. Mnesia is lightweight, distributed, fault tolerant etc. etc. like all other Erlang applications out there. The design philosophy is extremely simple - it is meant to be a high performance database system for Erlang applications only. They never claimed it to be a language neutral way of accessing data and instead focused on a tighter integration with the native language.

Hence you can do this ..


% create a custom data structure
-record(person, {name, %% atomic, unique key
        data, %% compound unspecified structure
        married_to, %% name of partner or undefined
        children}). %% list of children

% create an instance of it
X = #person{name = klacke,
            data = {male, 36, 971191},
            married_to = eva,
            children = [marten, maja, klara]}.

% persist in mnesia
mnesia:write(X)



and this ..


query [P.name || P <- table(person),
                 length(P.children) > X]
end



It feels so natural when I can persist my complex native Erlang data structure directly into my store and then fetch it using its list comprehension syntax.

Mnesia supports full transaction semantics, when you need it. But for optimum performance it offers lightweight locking and dirty interfaces that promise the same predictable amount of time regardless of the size of the database. And Mnesia is also primarily recommended to be used as an in-memory database where tables and indexes are implemented as linear hash lists. Alternatively all database structures can be persisted to the file system as well using named files. In summary, Mnesia gives me the bare essentials that I need to develop my application layer and integrate it with a persistent data store and with minimum of impedance with my natural Erlang abstraction level.

Let us just assume that we have an Mnesia on the JVM (call it JVMnesia) that gives me access to APIs that enable me to program in the natural collection semantics of the native language. Also I can define abstractions at a level that suits my programming and design paradigm, without having to resort to any specific data manipulation languages. In other words, I can define my Repositories that can transparently interact with a multitude of storage mechanisms asynchronously. My data store can be an in-memory storage that syncs up with a persistent medium using write behind processes, or it can be the file system with a traditional relational database. All my query modules will bootstrap an application context that warms up with an in-memory snapshot of the required data tables. The snapshot needs to be clustered and kept in sync with the disk based persistent store at the backend. We can have multiple options here. Terracotta with its Network Attached Memory offers similar capabilities. David Pollak talks about implementing something similar using the wire level protocol of Memcached.
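To give a feel for what "natural collection semantics" could mean here, the following is a small sketch of the Mnesia query shown earlier, expressed as a Scala for-comprehension over an in-memory table. JVMnesia does not exist, so everything in it is hypothetical: the Person case class, the persons map standing in for a table, and the sample rows, which are made up for illustration ..

```scala
object JvmnesiaSketch {
  // Hypothetical counterpart of the Erlang person record.
  final case class Person(name: String, marriedTo: Option[String], children: List[String])

  // Hypothetical in-memory "table", keyed by the unique name.
  val persons: Map[String, Person] = List(
    Person("klacke", Some("eva"), List("marten", "maja", "klara")),
    Person("eva", Some("klacke"), Nil)
  ).map(p => p.name -> p).toMap

  // Names of all persons having more than x children, mirroring
  // the shape of [P.name || P <- table(person), length(P.children) > X].
  def namesWithMoreChildrenThan(x: Int): List[String] =
    (for (p <- persons.values if p.children.length > x) yield p.name).toList
}
```

The point is only that the query reads like ordinary collection code in the host language, with no separate data manipulation language in between. How such a table would be clustered or synced to disk is left entirely open.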

Now that my JVMnesia offers a fast and scalable data store, how can we make the data processing asynchronous ? Front end it with an actor based Repository implementation ..


import scala.actors.Actor

trait Repository extends Actor

class EmployeeRepository extends Repository {

  def init: Map[Int, Employee] = {
    // initialize repository
    // load employees from backend store
  }

  private def fetchEmployeeFromDatabase(id: Int) = //..

  def act = loop(init)

  def loop(emps: Map[Int, Employee]): Unit = {
    react {
      case GetEmployee(id, client) =>
        client ! emps.getOrElse(id, fetchEmployeeFromDatabase(id))
        loop(emps)
      case AddEmployee(emp: Employee, client) =>
        client ! DbSuccess
        loop(emps + (emp.id -> emp))
    }
  }
}

case class Employee(id: Int, name: String, age: Int)
case class GetEmployee(id: Int, client: Actor)
case class AddEmployee(emp: Employee, client: Actor)

case object DbSuccess
case object DbFailure



Every repository is an actor that serves up requests through asynchronous message passing to its clients.

There is an ongoing effort towards implementing Erlang/OTP like behavior in Scala. We can think of integrating the repository implementation with the process supervisor hierarchies that Scala-OTP will offer. Then we have seamless process management, fault tolerance, distribution etc., making it a robust data layer that can scale out easily.

Sunday, September 07, 2008

More Erlang with Disco

Over the weekend I was having a look at Disco, an open source map-reduce framework built atop Erlang/OTP that allows users to write mapper/reducer jobs in Python. Disco does not mandate any Erlang knowledge on the part of the user, who can use all the expressiveness of Python to implement map/reduce jobs.

One more addition to the stack of using Erlang as middleware.

As a programmer, you can concentrate on composing map/reduce jobs using all the expressiveness of Python. The Disco master receives jobs from the clients, adds them to the job queue, and makes them run on Erlang powered clusters. Each node of the cluster runs the usual supervisor-worker hierarchies of Erlang processes that fire up the concurrent processing of all client jobs. A server crash does not affect job execution, new servers can be added on the fly, and high availability can be ensured through a multiple Disco master configuration.

Disco has quite a bit of functional overlap with CouchDB, one of the earliest adopters of Erlang-at-the-backend with Javascript and optionally a host of your favorite languages for view processing and REST APIs.

As I had mentioned before, Erlang as middleware is catching on ..

Wednesday, August 27, 2008

Actors in the Service Layer - Asynchronous and Concurrent

Exploit the virtues of immutability. Design your application around stateless abstractions interacting with each other through asynchronous message passing. These are some of the mantras that I have been trying to grok recently. In a typical Java EE application, we design the service layer to be maximally stateless. What this means is that each individual service has localized mutability interacting with other services on a shared-nothing basis. Asynchronous message passing offers some interesting avenues towards scaling up the throughput of such service layers in an application.

Imagine the service layer of your domain model has the following API ..


class SettlementService {
  //..
  //..
  public List<Completion> processCompletion(List<Settlement> settlements) {
    List<Completion> completions = new ArrayList<Completion>();
    for(Settlement s : settlements) {
      // completion logic : complex
      completions.add(..);
    }
    return completions;
  }
  //..
  //..
}



The method processCompletion() takes a collection of Settlement objects and for each of them does pretty complex business logic processing before returning the collection of completed settlements. We are talking about settlement of trades as part of back office processing logic of financial services.

The API was fine and worked perfectly during all demo sessions and the prototypes that we did. The client was happy that all of his settlements were being processed correctly and within perfectly acceptable limits of latency.

One day the stock exchange had a great day, the traders were happy and the market saw a phenomenal upsurge of trading volumes. Accordingly our back-office solution also received part of the sunshine and all numbers started moving up. Lots of trades to process, lots of settlements to complete, and this is exactly when latency reared its ugly head. A detailed profiling revealed the bottleneck in the call of processCompletion() trying to complete lots of settlements synchronously, and the entire user experience sucked big time.

Going Asynchronous ..

Asynchronous APIs scale, asynchronous interactions can be parallelized, and an asynchronous communication model encourages loose coupling between the producer and the consumer entities. Elsewhere I have documented usage of actor based modeling for asynchronous processing. Actors provide natural asynchrony, and Erlang is a great testimony to the fact that asynchronous message passing based systems scale easily and offer better approaches towards distribution, reliability and fault tolerance. And if you can parallelize your business process, then you can allocate each isolation unit to a separate actor that can run concurrently over all the cores of your system. This is only feasible, though, for languages or runtimes that can create processes on the cheap. Erlang does it with lightweight processes scheduled by its own VM (green threads, in effect), Scala does it through some form of continuations on the JVM.

With Scala Actors ..

In the above example, processing of every settlement completion for a unique combination of client account and security can be considered as an isolated task that can be safely allocated to an actor. The following snippet is a transformation of the above code that incorporates asynchrony using Scala actors. For brevity and simplicity of code, I have ignored the business constraint of uniqueness depending on client account and security, and allocated every completion processing to a separate actor.


class SettlementService {

  case class Settlement()
  case class Completion(s: Settlement)

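  def processCompletion(settlements: List[Settlement]) = {
    val buffer = new Array[Completion](settlements.length)
    val cs =
      for(idx <- (0 until settlements.length).toList) yield {
        scala.actors.Futures.future {
          buffer(idx) = doComplete(settlements(idx))
        }
      }
    // applying a future blocks until its body has run, so wait for
    // every completion before handing the buffer back to the caller
    cs.foreach(f => f())
    buffer
  }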

  def doComplete(s: Settlement): Completion = {
    // actual completion logic for a settlement
    new Completion(s)
  }
}



The above code snippet uses Scala's future method that invokes the actor behind the scenes ..


def future[T](body: => T): Future[T] = {
  case object Eval
  val a = Actor.actor {
    Actor.react {
      case Eval => Actor.reply(body)
    }
  }
  a !! (Eval, { case any => any.asInstanceOf[T] })
}



The actors are scheduled asynchronously and in parallel, and occupy an underlying thread only while they are processing a received message that matches a pattern in the partial function forming the react block. Scala actors are not implemented on a thread-per-actor model - hence invoking actors is way cheaper than starting JVM threads. Scala actors are threadless, event based and can be forked in thousands on a commodity machine.
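The same fan-out-and-collect shape can also be sketched with scala.concurrent.Future from the standard library. This is a substitution on my part and not the scala.actors.Futures used above; the Settlement and Completion types carry a hypothetical id field just so that the result can be inspected, and the 30 second timeout is an arbitrary assumption ..

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SettlementFutureSketch {
  // Hypothetical stand-ins for the domain types.
  final case class Settlement(id: Int)
  final case class Completion(s: Settlement)

  // Placeholder for the actual (complex) completion logic.
  def doComplete(s: Settlement): Completion = Completion(s)

  def processCompletion(settlements: List[Settlement]): List[Completion] = {
    // Start one asynchronous task per settlement.
    val pending: List[Future[Completion]] =
      settlements.map(s => Future(doComplete(s)))
    // Future.sequence keeps the input order; block until all tasks finish.
    Await.result(Future.sequence(pending), 30.seconds)
  }
}
```

Collecting the results through the futures themselves avoids writing into a shared array from several tasks. Blocking with Await at the end keeps the synchronous signature of the original API, whereas a fully asynchronous service would rather return the Future to its caller.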

What the actor model offers ..

The actor model is all about immutability and a shared-nothing paradigm and encourages a programming style where you can think of modeling your interactions in terms of immutable messages. Service layers of an application are always meant to be stateless, and the actor model makes you think more deeply on this aspect. And once you have statelessness you can achieve concurrency by distributing the stateless components amongst the actors.

Making your Infrastructure Stack Lighter ..

In many cases, asynchronous messaging libraries also help in getting rid of additional heavyweight infrastructure from the application stack. In one of our applications, we were using JMS to handle priority messages. Scala and Erlang both support prioritizing messages through timeouts in the receive loop. Consider a scenario from the above application domain, where the system receives Trade messages from all over the place, which need to be processed in the back-office solution before they can be forwarded to the Settlement component. And the business rule mandates that trades for FixedIncome type of securities need to have higher priority in processing than those for Equity instruments. We can have this requirement modeled using the following Scala snippet of actor code (simplified for brevity) ..


import scala.actors.Actor._
import scala.actors.TIMEOUT

trait Instrument
case class Equity(id: Int) extends Instrument
case class FixedIncome(id: Int) extends Instrument

case class Trade(security: Instrument)

val sx = actor {
  loop {
    reactWithin(0) {
      case Trade(i: FixedIncome) => //.. process fixed income trade
      case TIMEOUT =>
        react {
          case Trade(i: Equity) => //.. process equity trade
        }
    }
  }
}



With a timeout value of 0 in reactWithin, the function first removes all FixedIncome trade messages from the mailbox before entering the inner react loop. Hence Equity trade messages will be processed only when there are no pending FixedIncome trade messages in the mailbox of the actor.
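The processing order that this nested reactWithin(0) / react arrangement produces can be modeled without any actor library at all. The sketch below is only such a model, working on a static snapshot of a mailbox, and the names PriorityMailboxSketch, next and drain are hypothetical. A real actor mailbox keeps receiving messages while it is being drained, which this deliberately ignores ..

```scala
object PriorityMailboxSketch {
  sealed trait Instrument
  final case class Equity(id: Int) extends Instrument
  final case class FixedIncome(id: Int) extends Instrument
  final case class Trade(security: Instrument)

  // Pick the next trade: the oldest FixedIncome trade if one is queued,
  // otherwise the oldest Equity trade. Returns it with the remaining mailbox.
  def next(mailbox: Vector[Trade]): Option[(Trade, Vector[Trade])] = {
    val fixedIdx = mailbox.indexWhere(_.security.isInstanceOf[FixedIncome])
    val idx =
      if (fixedIdx >= 0) fixedIdx
      else mailbox.indexWhere(_.security.isInstanceOf[Equity])
    if (idx < 0) None
    else Some((mailbox(idx), mailbox.take(idx) ++ mailbox.drop(idx + 1)))
  }

  // Drain a mailbox snapshot, returning the trades in processing order.
  @annotation.tailrec
  def drain(mailbox: Vector[Trade],
            processed: Vector[Trade] = Vector.empty): Vector[Trade] =
    next(mailbox) match {
      case Some((trade, rest)) => drain(rest, processed :+ trade)
      case None                => processed
    }
}
```

Draining a mailbox that holds an Equity trade, a FixedIncome trade, another Equity trade and another FixedIncome trade, in that arrival order, processes both FixedIncome trades first and then the two Equity trades, each group in its arrival order.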

Asynchronous messaging is here ..

People are talking about it, and open source implementations of messaging protocols like AMQP and XMPP are also available now. Erlang has demonstrated how to design and implement fault tolerant, distributed systems using the shared nothing, immutable message based programming model. Scala has started imbibing much of the goodness of the Erlang/OTP platform. And Scala runs on the JVM - it is only natural that I have been thinking of replacing most of my synchronous interfaces at the service layer with Scala actors. Recently I have been experimenting with Erlang based RabbitMQ clusters as the messaging middleware, and got the application layer to scale pretty well with Scala actors.

And Servlet 3.0 ..

The Web layer is also getting async support from the upcoming JSR 315, with the Servlet 3.0 spec about to hit the ground. Thanks to some great stuff from Greg Wilkins of Webtide, async servlets will allow applications to suspend and resume request processing and enable and disable the response - a direct support for writing comet style applications. To date we have been using Jetty continuations and Scala's event based actors for asynchronous processing .. Web applications are definitely going to get a big scaling boost with support for asynchronous servlets.

Monday, August 18, 2008

Concurrency Oriented Programming and Side Effects

In my last post on Scala actors, I had described the actor code as side-effect-free and referentially transparent. James Iry correctly pointed out that Scala react is side-effected, since the partial function that it takes processes a message which is neither a parameter to react nor a value in the lexical scope.

Sure! I should have been more careful to articulate my thoughts. The side-effect that react induces can be a problem if the messages that it processes are not immutable, or if they share mutable state either amongst themselves or with the actor. In fact concurrency oriented programming is all about side-effects; the better models provide more manageable semantics to abstract them away from the client programmers. Abstracting out the concurrency oriented parts of a large software system is one of the biggest challenges that the industry has been trying to solve for years. And this is where the asynchronous message passing model shines, and modules like gen_server of Erlang/OTP provide convenience and correctness. The bottom line is that we can avoid unwanted side-effects and difficult to debug concurrency issues if we keep all messages immutable without any sharing of mutable state. Thanks James for correcting the thought!

In both Scala and Erlang, the underlying actor model has to deal with concurrency explicitly, manage synchronization of actor mailboxes and deal with issues of message ordering and potential dead- or live-lock problems. If we were to write the threaded versions of the actor code ourselves, we would need to manage the stateful mailboxes of individual actors as blocking queues. With Scala's actor model, this pattern is subsumed within the implementation, thereby ensuring race-free communication between concurrent actors.
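
To make the "mailbox as a blocking queue" point concrete, here is a minimal sketch of what the hand-written threaded version might look like. It is not how the Scala actors library is implemented; the names MailboxSketch, CounterActor, Deposit and Stop are made up for illustration, and only java.util.concurrent is assumed.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

object MailboxSketch {
  // Hypothetical immutable messages, for illustration only
  sealed trait Msg
  final case class Deposit(amount: Int) extends Msg
  case object Stop extends Msg

  // A hand-rolled "actor": one worker thread draining one blocking-queue mailbox
  class CounterActor {
    private val mailbox = new LinkedBlockingQueue[Msg]()
    private val done = new CountDownLatch(1)
    private var total = 0 // written only by the worker thread

    private val worker = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          mailbox.take() match { // blocks until a message arrives
            case Deposit(n) => total += n
            case Stop       => running = false
          }
        }
        done.countDown() // publishes the final total to whoever awaits
      }
    })
    worker.setDaemon(true)
    worker.start()

    // Senders only ever touch the thread-safe queue
    def !(msg: Msg): Unit = mailbox.put(msg)

    // Waits for Stop to be processed, then returns the total
    def awaitTotal(): Int = { done.await(); total }
  }

  // Three sender threads, ten deposits of 1 each
  def demo(): Int = {
    val actor = new CounterActor
    val senders = (1 to 3).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to 10).foreach(_ => actor ! Deposit(1))
      })
    }
    senders.foreach(_.start())
    senders.foreach(_.join())
    actor ! Stop
    actor.awaitTotal()
  }
}
```

The state (total) is never shared: senders only enqueue immutable messages, and a single thread applies them one at a time. That is the bookkeeping the actor library takes off our hands.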

As long as we play by the rules of the game, we need not bother about the side-effect that react induces.

Monday, August 11, 2008

Asynchronous, Functional and automatically Concurrent

The following code fragment is from an earlier post on using Scala actors and AMQP. I thought I would bring this snippet up once again to highlight some of the goodness that functional Scala offers in modeling the actor model of concurrent computation.


import scala.actors.Actor

case class Trade(id: Int, security: String, principal: Int, commission: Int)
case class TradeMessage(message: Trade)
case class AddListener(a: Actor)

class TradingService extends Actor {

  def act = loop(Nil)

  def loop(traders: List[Actor]) {
    react {
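    // register a new observer by passing the extended list to the next iteration
    case AddListener(a) => loop(a :: traders)
    // relay every trade message to all registered observers
    case msg@TradeMessage(t) => traders.foreach(_ ! msg); loop(traders)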
    case _ => loop(traders)
    }
  }
}



An implementation of the Observer design pattern using message passing. Interested traders can register as observers and observe every trade that takes place, but without any mutable state for maintaining the list of observers. This is not a very familiar paradigm to programmers of an imperative language. The trick is to have the list of observers as an argument to the loop() function, which is tail called.

Nice .. asynchronous, referentially transparent, side-effect-free functional code. No mutable state, no need of explicit synchronization, no fear of race conditions or deadlocks, since no shared data are being processed concurrently by multiple threads of execution.
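
The same trick can be seen without any actor machinery at all. The following is only a sketch under my own assumptions: the inbox is modeled as a plain List, and the names StateInArgument, Subscribe and Publish are hypothetical. The point is that the subscriber list lives in the argument of a tail-recursive loop instead of a var.

```scala
import scala.annotation.tailrec

object StateInArgument {
  // Hypothetical messages, for illustration only
  sealed trait Msg
  final case class Subscribe(name: String) extends Msg
  final case class Publish(text: String) extends Msg

  // Walks the inbox; the current subscriber list travels as an argument.
  // Returns the (subscriber, text) deliveries in the order they were made.
  @tailrec
  def loop(subscribers: List[String],
           inbox: List[Msg],
           delivered: List[(String, String)]): List[(String, String)] =
    inbox match {
      case Nil =>
        delivered
      case Subscribe(n) :: rest =>
        loop(n :: subscribers, rest, delivered) // "state change" = new argument
      case Publish(t) :: rest =>
        loop(subscribers, rest, delivered ++ subscribers.map(s => (s, t)))
    }
}
```

Calling StateInArgument.loop(Nil, List(Subscribe("a"), Publish("t1"), Subscribe("b"), Publish("t2")), Nil) delivers t1 to a only, and t2 to both b and a, with no assignment anywhere.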

Monday, August 04, 2008

Erlang as middleware

Delicious is also using Erlang. Well, that's yet another addition to the Erlangy list of Facebook, SimpleDB, CouchDB, Twitter and many more. All these applications/services rely on the intrinsic scalability of Erlang as a platform. RabbitMQ provides an implementation of AMQP based on Erlang; ejabberd, an XMPP implementation, is also Erlang based. EngineYard is also betting on Erlang for Vertebrae, its platform for Cloud Computing. It's like Erlang is carving out its own niche as the dominant choice of service based backends.

I can make my application scale using distributed hashmap technologies of memcached or in-process JVM clustering techniques of Terracotta or a host of other techniques that treat distribution and scalability as a concern separate from the core application design. But with Erlang/OTP, I start with shared nothing concurrency oriented process design, which can naturally be distributed across the cores of your deployment server. What is a module in the codebase can be made to map to a process in the runtime, instances of which can be distributed transparently to the nodes of your cluster.

Why Erlang ?

Erlang is naturally concurrent, with ultralightweight processes based on green threads that can be spawned in millions on a cluster of commodity hardware. Erlang being a functional language, its applications are designed as shared nothing architectures whose parts interact with each other through asynchronous message passing primitives - as if the whole code can be mathematically analyzed. This is unlike an imperative language runtime that offers shared state concurrency through threads and mutexes. The Erlang runtime offers dynamic hot swapping of code: you can change code on the fly, converting your application to a non stop system. Finally, Erlang processes can be organized into supervisor hierarchies that manage the lifetimes of their child processes and automatically restart them in case of exceptions and failures. And almost all of these come out of the box through the goodness of platforms like OTP.

But do we find enough Erlang programmers ? Well, the syntax .. umm ..

Isn't your OS mainstream ?

Then why don't you find scores of developers programming with the kernel APIs that your OS publishes ? The OS offers services that developers use every day when they open up a host of windows, manage their filesystems, send out an IM or open up the browser to get the latest quotes on their tickers. And they do all this while being completely oblivious of how the kernel handles scheduling of native threads to serve up their long running requests.

Erlang is becoming mainstream in the same context.

I do not need to know a bit of Erlang to design my trading service that can process millions of messages from my AMQP endpoints in real time. In fact while prototyping for the next version of a trading back-office system, I cooked up all my services using Scala actors, which could happily use RabbitMQ's Erlang based queue and exchange implementation through the well-published Java APIs of the client.

I can still architect scalable websites that need not poll Flickr 3 million times a day to fetch 6000 updates, without an iota of Erlang awareness. The technology is called XMPP application server, which scaffolds all the Erlang machinery underneath while exposing easy-to-use client interfaces in your favorite programming language ..

Erlang is becoming mainstream as a middleware service provider. And one could feel the buzz at OSCON 2008.

Monday, July 14, 2008

Scaling out messaging applications with Scala Actors and AMQP

We have been sandboxing various alternatives towards scaling out Java/JMS based services that we implemented for our clients quite some time back. The services that form the stack comprise dispatchers and routers meant to handle heavy load and perform heavy duty processing on a huge number of trade messages streaming in from the front and middle offices. I have been exploring lots of options including some of the standard ones like grid based distribution of processing and some wildly insane options like using Erlang/OTP. I knew Erlang/OTP would be a difficult sell to a client, though we got some amazing results using OTP and mnesia over a clustered intranet. I was also looking at clustered Scala actors as another option, but I guess the Terracotta implementation is not yet production ready and would be an even more difficult sell to clients.

Over the last week, I happened to have a look at RabbitMQ. It is based on Erlang/OTP and implements AMQP, a protocol, built on open standards, designed by the financial industry to replace their existing proprietary message queues. AMQP based implementations create full functional interoperability between conforming clients and messaging middleware servers (also called "brokers").

People have been talking about messaging as a front-runner in implementing enterprise architectures. And RabbitMQ is built on the best that messaging over a set of clustered nodes has to offer. Erlang's shared-nothing process hierarchies, extreme pattern matching capabilities and wonderful bit comprehension based binary data handling implement high performance reliable messaging with almost obscene scalability. And with RabbitMQ, the wonderful part is that you can hide your Erlang machine completely from your application programmers, who can still write Java classes to blast bits across the wire using typical message queueing programming paradigms.

AMQP (and hence RabbitMQ) defines the set of messaging capabilities as a set of components that route and store messages and a wire level protocol that defines the interaction between the client and the messaging services. The application has to define the producers and consumers for the messaging engines. And in order to ensure linear scalability, these components also need to be scalable enough to feed the Erlang exchange with enough bits to chew on and harness the full power of OTP.
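
To picture the "components that route and store messages", here is a toy in-memory model of a direct exchange, queue and binding. It is a sketch of the concept only, not the RabbitMQ client API or the wire protocol; the Broker class and its methods are my own hypothetical names, and message bodies are plain strings for simplicity.

```scala
object DirectExchangeModel {
  // bindings: (exchange, routing key) -> names of bound queues
  // queues:   queue name -> messages stored so far
  final case class Broker(bindings: Map[(String, String), Set[String]],
                          queues: Map[String, Vector[String]]) {

    def declareQueue(queue: String): Broker =
      if (queues.contains(queue)) this
      else copy(queues = queues + (queue -> Vector.empty[String]))

    def bind(queue: String, exchange: String, key: String): Broker = {
      val k = (exchange, key)
      val bound = bindings.getOrElse(k, Set.empty[String]) + queue
      copy(bindings = bindings + (k -> bound))
    }

    // Direct routing: store the body in every queue bound with exactly this key.
    // In this toy model a message that matches no binding is simply dropped.
    def publish(exchange: String, key: String, body: String): Broker = {
      val targets = bindings.getOrElse((exchange, key), Set.empty[String])
      copy(queues = queues.map { case (q, msgs) =>
        if (targets(q)) q -> (msgs :+ body) else q -> msgs
      })
    }
  }

  val empty: Broker = Broker(Map.empty, Map.empty)
}
```

With a queue "mult_queue" bound to the exchange "mult" under the key "routeroute", a publish with that key lands in the queue, while a publish with any other key does not.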

Linear scalability on the JVM .. enter Scala actors ..

Over the weekend I had great fun with Scala actors churning out messages over AMQP endpoints, serializing trade objects at one end, and dispatching them to the subscribers at the other end for doing necessary trade enrichment calculations. Here are some snippets (simplified for brevity) of the quick and dirty prototype that I cooked up. Incidentally Lift contains actor-style APIs that allow you to send and receive messages from an AMQP broker, and the following prototype uses the same interfaces ..

The class TradeDispatcher is a Scala actor that listens as an AMQP message endpoint. It manages a list of subscribers to the trade message and also sends AMQP messages coming in to the queue/exchange to the list of observers.


// message for adding observers
case class AddListener(a: Actor)

// The trade object that needs to be serialized
@serializable
case class Trade(ref: String, security: String, var value: Int)

case class TradeMessage(message: Trade)

// The dispatcher that listens over the AMQP message endpoint
class TradeDispatcher(cf: ConnectionFactory, host: String, port: Int)
    extends Actor {

  val conn = cf.newConnection(host, port)
  val channel = conn.createChannel()
  val ticket = channel.accessRequest("/data")

  // set up exchange and queue
  channel.exchangeDeclare(ticket, "mult", "direct")
  channel.queueDeclare(ticket, "mult_queue")
  channel.queueBind(ticket, "mult_queue", "mult", "routeroute")

  // register consumer
  channel.basicConsume(ticket, "mult_queue", false, new TradeValueCalculator(channel, this))

  def act = loop(Nil)

  def loop(as: List[Actor]) {
    react {
    // register a new observer
    case AddListener(a) => loop(a :: as)
    // relay the trade message to every registered observer
    case msg@TradeMessage(t) => as.foreach(_ ! msg); loop(as)
    case _ => loop(as)
    }
  }
}



and here is an actor that gets messages from upstream and publishes them to the AMQP exchange ..


class TradeMessageGenerator(cf: ConnectionFactory, host: String,
         port: Int, exchange: String, routingKey: String) extends Actor {

  val conn = cf.newConnection(host, port)
  val channel = conn.createChannel()
  val ticket = channel.accessRequest("/data")

  def send(msg: Trade) {

    val bytes = new ByteArrayOutputStream
    val store = new ObjectOutputStream(bytes)
    store.writeObject(msg)
    store.close

    // publish to exchange
    channel.basicPublish(ticket, exchange, routingKey, null, bytes.toByteArray)
  }

  def act = loop

  def loop {
    react {
      case TradeMessage(msg: Trade) => send(msg); loop
    }
  }
}
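
The serialization step in send() and the matching read on the consumer side can be tried out in isolation. This is a minimal round-trip sketch that assumes plain Java serialization as in the snippet above; TradeCodec, toBytes and fromBytes are hypothetical names, and the Trade here is a stand-in with the same fields as the one in the post.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
                ObjectInputStream, ObjectOutputStream}

object TradeCodec {
  // Stand-in for the post's Trade; Scala case classes are Serializable
  final case class Trade(ref: String, security: String, value: Int)

  // What the publisher does before handing the bytes to the channel
  def toBytes(t: Trade): Array[Byte] = {
    val bytes = new ByteArrayOutputStream
    val store = new ObjectOutputStream(bytes)
    try store.writeObject(t) finally store.close()
    bytes.toByteArray
  }

  // What the consumer does with the delivered message body
  def fromBytes(body: Array[Byte]): Trade = {
    val in = new ObjectInputStream(new ByteArrayInputStream(body))
    try in.readObject().asInstanceOf[Trade] finally in.close()
  }
}
```

A trade pushed through toBytes and then fromBytes should compare equal to the original, since case classes get structural equality.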



The next step is to design the consumer that reads from the exchange and does some business processing. Here the consumer (TradeValueCalculator) does valuation of the trade and has already been registered with the dispatcher above. Then it passes the message back to the dispatcher for relaying to the interested observers. Note that the TradeDispatcher has already passed itself as the actor while registering the object TradeValueCalculator as consumer callback in the snippet above (class TradeDispatcher).


class TradeValueCalculator(channel: Channel, a: Actor)
    extends DefaultConsumer(channel) {

  override def handleDelivery(tag: String, env: Envelope,
               props: AMQP.BasicProperties, body: Array[Byte]) {

    val routingKey = env.getRoutingKey
    val contentType = props.contentType
    val deliveryTag = env.getDeliveryTag
    val in = new ObjectInputStream(new ByteArrayInputStream(body))

    // deserialize
    var t = in.readObject.asInstanceOf[Trade]

    // invoke business processing logic
    t.value = computeTradeValue(...)

    // send back to dispatcher for further relay to
    // interested observers
    a ! TradeMessage(t)

    channel.basicAck(deliveryTag, false)
  }
}
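
The handler above updates the deserialized trade in place through its var field before relaying it. If we would rather keep the message immutable, one possible variant is to build a new trade with copy. This is only a sketch: the valuation rule below is a made-up placeholder, since the real computeTradeValue is not shown here.

```scala
object ImmutableTrade {
  // Same fields as the post's Trade, but with no var
  final case class Trade(ref: String, security: String, value: Int)

  // Hypothetical valuation rule, purely for illustration
  def computeTradeValue(t: Trade): Int = t.value * 2

  // Returns an enriched copy; the original trade is left untouched
  def enrich(t: Trade): Trade = t.copy(value = computeTradeValue(t))
}
```

The enriched copy is what would then be wrapped in a TradeMessage and sent on to the dispatcher, so no two actors ever see the same mutable object.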



I have not yet done any serious benchmarking. But the implementation, on its face, looks wicked cool. Erlang at the backend for high performance reliable messaging, driven at full throttle by Scala actors in the application layer. Every actor opens up a new channel - the channel-per-thread model that AMQP encourages for multi-threaded client applications and that scales so well in RabbitMQ.

Integrating the above classes into a small application prototype is not that difficult. Here is a small service class that uses the above framework classes to get the Scala actors talking to RabbitMQ ..


class SampleTradeListener {
  val params = new ConnectionParameters
  params.setUsername("guest")
  params.setPassword("guest")
  params.setVirtualHost("/")
  params.setRequestedHeartbeat(0)

  val factory = new ConnectionFactory(params)
  val amqp = new TradeDispatcher(factory, "localhost", 5672)
  amqp.start

  class TradeListener extends Actor {
    def act = {
      react {
      case msg@TradeMessage(contents: Trade) =>
        println("received trade: " + msg.message); act
      }
    }
  }
  val tradeListener = new TradeListener()
  tradeListener.start
  amqp ! AddListener(tradeListener)
}



Instantiate the above SampleTradeListener class and write a sample message generator facade that sends trade messages to the TradeMessageGenerator actor designed above.

Meanwhile here are some other related thoughts behind an AMQP based architecture ..

  • One of the areas which always makes me sceptical about linear scalability of applications is the single point of dependency on the relational database store.

  • If the application involves heavy relational database processing, does it make sense to make use of mnesia's easy distribution and fault tolerance capabilities to achieve overall scalability of the application ? Harness the power of durable and reliable transacted message processing of RabbitMQ and integrate RDBMS storage through write-behind log of AMQP activities.

  • Financial services solutions like back office systems typically need to talk to lots of external systems like Clearing Corporations, Stock Exchanges, Custodians etc. I think AMQP is more meaningful when you have the end points under your control and operate over a low latency, high bandwidth wire. When we talk about messaging over the internet (high latency, low bandwidth), possibly XMPP or AtomPub is a better option. RabbitMQ has also released an XMPP gateway, for exposing a RabbitMQ instance to the global XMPP network through an ejabberd extension module. Looks like it's going to be messaging all the way down and up.