Showing posts with label database. Show all posts

Sunday, October 04, 2009

Pluggable Persistent Transactors with Akka

NoSQL is here. Just as we use multiple programming languages, we are now thinking of applying the same paradigm to storage too. And why not? If we can use an alternate language to be more expressive for a specific problem, why not use an alternate form of storage that is a better fit for the requirement?

More and more projects are using alternate forms of storage to persist the various kinds of data that the application needs to handle. Of course relational databases have their very own place in this stack - the difference is that people today are not being pedantic about their use, and are not using the RDBMS as the universal hammer for every nail that they see in the application.

Consider an application that needs durability for transactional data structures. I want to model a transactional banking system, with basic debit and credit operations, using a message based model. But the operations have to be persistent. The balance needs to be durable and all transactions need to be persisted on disk. It doesn't matter what structures you store underneath - all I need is some key/value interface that allows me to store the transactions and the balances keyed by the transaction id. I don't even need to care what form of storage I use at the backend. It can be any database, any key-value store, Terracotta or anything. Will you give me the flexibility to make the storage pluggable? Well, that's a bonus!

Enter Akka .. and its pluggable persistence layer that you can nicely marry to its message passing actor based interface. Consider the following messages for processing debit/credit operations ..


case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: Actor)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
case class Credit(accountNo: String, amount: BigInt)
case object LogSize



In the above messages, the failer actor is used to report a failure when the debit fails. We also want all of the above operations to be transactional, which we can declare in Akka. Here's the basic actor definition ..

class BankAccountActor extends Actor {
  makeTransactionRequired
  private val accountState = 
    TransactionalState.newPersistentMap(MongoStorageConfig())
  private val txnLog = 
    TransactionalState.newPersistentVector(MongoStorageConfig())
  //..
}



  • makeTransactionRequired makes the actor transactional

  • accountState is a persistent Map that plugs in to a MongoDB based storage, as is evident from the config parameter. In a real life application, this would be read from a configuration file. Earlier I had blogged about the implementation of the MongoDB layer for Akka persistence. accountState offers the key/value interface that will be used by the actor to maintain the durable snapshot of all balances.

  • txnLog is a persistent vector, once again backed by MongoDB storage, which stores every transaction log entry recorded in the system


Let us now look at the actor's receive method, which handles incoming messages and processes the debit/credit operations ..

class BankAccountActor extends Actor {
  makeTransactionRequired
  private val accountState = 
    TransactionalState.newPersistentMap(MongoStorageConfig())
  private val txnLog = 
    TransactionalState.newPersistentVector(MongoStorageConfig())

  def receive: PartialFunction[Any, Unit] = {
    // check balance
    case Balance(accountNo) =>
      txnLog.add("Balance:" + accountNo)
      reply(accountState.get(accountNo).get)

    // debit amount: can fail
    case Debit(accountNo, amount, failer) =>
      txnLog.add("Debit:" + accountNo + " " + amount)
      val m: BigInt =
      accountState.get(accountNo) match {
        case None => 0
        case Some(v) => {
          val JsNumber(n) = v.asInstanceOf[JsValue]
          BigInt(n.toString)
        }
      }
      accountState.put(accountNo, (m - amount))
      if (amount > m)
        failer !! "Failure"
      reply(m - amount)

    //..
  }
}


Here we have the implementation of two messages -

  • Balance reports the current balance and

  • Debit does a debit operation on the balance


Note that the interfaces these implementations use are in no way dependent on MongoDB specific APIs. Akka offers a uniform key/value API set across all supported persistent storage. And each of the above pattern matched message processing fragments offers transaction semantics. This is pluggability!
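
To make the idea concrete, here is a minimal sketch of what a storage-neutral key/value interface could look like. It is plain Scala and not Akka's actual API: KeyValueStore, InMemoryStore and Ledger are hypothetical names, and an in-memory map stands in for MongoDB or any other backend.

```scala
import scala.collection.mutable

// Hypothetical storage-neutral interface; not Akka's actual API.
trait KeyValueStore[K, V] {
  def put(key: K, value: V): Unit
  def get(key: K): Option[V]
}

// One possible backend: a plain in-memory map. A MongoDB or
// Terracotta backed class could implement the same trait.
class InMemoryStore[K, V] extends KeyValueStore[K, V] {
  private val data = mutable.Map.empty[K, V]
  def put(key: K, value: V): Unit = data.update(key, value)
  def get(key: K): Option[V] = data.get(key)
}

// Application code depends only on the trait, so the backend is pluggable.
class Ledger(balances: KeyValueStore[String, BigInt]) {
  def balance(accountNo: String): BigInt =
    balances.get(accountNo).getOrElse(BigInt(0))

  // Adds the amount to the stored balance and returns the new balance.
  def credit(accountNo: String, amount: BigInt): BigInt = {
    val updated = balance(accountNo) + amount
    balances.put(accountNo, updated)
    updated
  }
}
```

Swapping the backend then means passing a different KeyValueStore to Ledger; the Ledger code itself does not change.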

Credit looks very similar to Debit. However, a more interesting use case is the MultiDebit operation that offers a transactional interface. Just like your relational database's ACID semantics, the transactional semantics of Akka offers atomicity over this message. Either the whole MultiDebit will pass or it will be rolled back.


class BankAccountActor extends Actor {

  //..
  def receive: PartialFunction[Any, Unit] = {

    // many debits: can fail
    // demonstrates true rollback even if multiple puts have been done
    case MultiDebit(accountNo, amounts, failer) =>
      txnLog.add("MultiDebit:" + accountNo + " " + amounts.map(_.intValue).foldLeft(0)(_ + _))
      val m: BigInt =
      accountState.get(accountNo) match {
        case None => 0
        case Some(v) => BigInt(v.asInstanceOf[String])
      }
      var bal: BigInt = 0
      amounts.foreach {amount =>
        bal = bal + amount
        accountState.put(accountNo, (m - bal))
      }
      if (bal > m) failer !! "Failure"
      reply(m - bal)
    
    //..
  }
}



Now that we have the implementation in place, let's look at the test cases that exercise them ..

First a successful debit test case. Note how we have a separate failer actor that reports failure of operations to the caller.

@Test
def testSuccessfulDebit = {
  val bactor = new BankAccountActor
  bactor.start
  val failer = new PersistentFailerActor
  failer.start
  bactor !! Credit("a-123", 5000)
  bactor !! Debit("a-123", 3000, failer)
  val b = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n) = b
  assertEquals(BigInt(2000), BigInt(n.toString))

  bactor !! Credit("a-123", 7000)
  val b1 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n1) = b1
  assertEquals(BigInt(9000), BigInt(n1.toString))

  bactor !! Debit("a-123", 8000, failer)
  val b2 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n2) = b2
  assertEquals(BigInt(1000), BigInt(n2.toString))
  assertEquals(7, (bactor !! LogSize).get)
}


And now the interesting MultiDebit that illustrates the transaction rollback semantics ..

@Test
def testUnsuccessfulMultiDebit = {
  val bactor = new BankAccountActor
  bactor.start
  bactor !! Credit("a-123", 5000)
  val b = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n) = b
  assertEquals(BigInt(5000), BigInt(n.toString))

  val failer = new PersistentFailerActor
  failer.start
  try {
    bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
    fail("should throw exception")
  } catch { case e: RuntimeException => {}}

  val b1 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n1) = b1
  assertEquals(BigInt(5000), BigInt(n1.toString))

  // should not count the failed one
  assertEquals(3, (bactor !! LogSize).get)
}


In the snippet above, the balance remains at 5000 when the debit fails while processing the final amount of the list passed in the MultiDebit message.
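
The all-or-nothing behaviour can also be illustrated without Akka. The sketch below is only an analogy and not Akka's transaction machinery: a hypothetical Account class applies every debit to a working copy of its state and publishes the copy only if all the steps succeed, so a failure part-way through leaves both the balance and the log untouched.

```scala
// Analogy only: plain Scala, not Akka's transactional state.
// AccountState, Account and multiDebit are hypothetical names.
final case class AccountState(balance: BigInt, log: Vector[String])

class Account {
  private var state = AccountState(BigInt(0), Vector.empty)

  def balance: BigInt = state.balance
  def logSize: Int = state.log.size

  def credit(amount: BigInt): Unit =
    state = AccountState(state.balance + amount, state.log :+ s"Credit:$amount")

  // Applies every debit to a working copy. The copy replaces the real
  // state only after all debits succeed; an exception discards it.
  def multiDebit(amounts: List[BigInt]): Unit = {
    var working = state.copy(log = state.log :+ s"MultiDebit:${amounts.sum}")
    amounts.foreach { amount =>
      if (amount > working.balance)
        throw new RuntimeException("insufficient funds")
      working = working.copy(balance = working.balance - amount)
    }
    state = working // commit
  }
}
```

With a balance of 5000 and the list 500, 2000, 1000, 3000, the first three debits succeed on the working copy, the fourth throws, and the published state still shows 5000 with no log entry for the failed message.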

Relational databases will always remain for the use case that they serve best - persistence of data that needs a true relational model. NoSQL is gradually making its place in the application stack for the complementary set of use cases that need a much more loosely coupled model, a key/value database or a document oriented database. Apart from easier manageability, another big advantage of using these databases is that they do not need big ceremonious ORM layers between the application model and the data model. This is because what you see is what you store (WYSIWYS), there is no paradigm mismatch that needs to be bridged.

Monday, March 09, 2009

Bloom Filters - optimizing network bandwidth

The Bloom filter is one of the coolest data structures around, elegant as well as incredibly useful. I had a short post on a cool application of bloom filters in front-ending access to disk based data to achieve improved throughput in query processing. I didn't know Oracle uses bloom filters for processing parallel joins and join filter pruning. In the case of parallel joins, the idea is quite simple ..

  1. Each slave process prepares a bloom filter for the join condition that it is processing

  2. It then passes the bloom filter on to the other slave processes, each of which can apply the filter to its own selected set of records before passing the final set on to the join coordinator


Remember each of the above processes may run in a distributed environment - hence the above technique leads to less data being transported across nodes, thereby saving network bandwidth at the cost of some extra CPU cycles. This paper describes all of these in full detail with illustrative examples.
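
The two steps above can be sketched in a few lines of Scala. This is an illustration under my own assumptions, not Oracle's implementation: BloomFilter and BloomJoin are hypothetical names, the filter size and probe count are arbitrary, and the probe positions come from double hashing over two MurmurHash3 seeds.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// A small Bloom filter: `bits` bit positions, `hashes` probes per key.
class BloomFilter(bits: Int, hashes: Int) {
  private val set = new BitSet(bits)

  // Double hashing: probe i is h1 + i * h2, reduced into [0, bits).
  private def positions(key: String): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(key, 0)
    val h2 = MurmurHash3.stringHash(key, 1)
    (0 until hashes).map(i => Math.floorMod(h1 + i * h2, bits))
  }

  def add(key: String): Unit = positions(key).foreach(p => set.set(p))
  def mightContain(key: String): Boolean = positions(key).forall(p => set.get(p))
}

object BloomJoin {
  // Step 1: one process summarises its join keys; only the filter travels.
  def summarise(keys: Iterable[String]): BloomFilter = {
    val filter = new BloomFilter(bits = 1024, hashes = 3)
    keys.foreach(k => filter.add(k))
    filter
  }

  // Step 2: another process drops rows that cannot match before shipping.
  // False positives may let a few extra rows through; no match is lost.
  def prune(rows: Seq[(String, String)], filter: BloomFilter): Seq[(String, String)] =
    rows.filter { case (key, _) => filter.mightContain(key) }
}
```

The join coordinator still has to perform the real join on whatever survives the pruning, since the filter only guarantees that no matching row is dropped.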

This idea of serializing bloom filters instead of the data set has been used quite extensively in load balancing MapReduce operations to minimize intermediate results before sending everything across the network for final aggregation. In the case of processing distributed join operations, we may need to compose multiple bloom filters to get the final dataset. Bloom joins, as they are called, allow cheap serialization of filters over the wire, by employing some clever techniques like linear hash tables and multi-tier bloom filters, as described in this paper in Comonad Reader.

Bloom joins can also be used effectively in MapReduce processing with CouchDB. The map phase can produce the bloom filters, which can be joined in the reduce phase. In a recent application, I needed to store a very large list mainly for set operations. Instead of storing individual elements, I decided to store a bloom filter that nicely fit in a memcached slab. I could pull it out and do all sorts of bit operations easily and it's blindingly fast. Next time you decide to distribute your huge list in Terracotta, think again - there may be a lighter weight option in distributing a bloom filter instead. There are use cases when you will be doing membership checks only and some false positives may not do much harm.

Monday, March 02, 2009

Your data model can speak different languages too

Andrej Koelewijn writes ..

  • REST is about resources. Resource is just another word for object, or record.

  • REST is also about URLs. URLs that identify resources. Just like ids can identify objects or records.

  • And, often overlooked, REST is also about links. Resources use URLs to link to other resources. Just like foreign keys can link records to other records.


and finally concludes ..

REST is a distributed data model

There is no denying the fact that REST is based on the central concept of Resources, which are addressable entities that can be manipulated by all components of the communicating network using a standardized and uniform interface. Resources abstract the underlying data model for the user, based on which the application developer can design nice REST based APIs to the external world. REST abstracts the data model, it is NOT the data model - in fact underlying data representations can change without any impact on the API.

In another related post, talking about the relational data model, he also mentions ..

This is why RDBMSes are so great: it doesn’t bind your data to a single application.

An RDBMS does bind your data to the specific application model. What it abstracts from the application is the underlying physical representation of the data. In fact it is this specific binding, and the constraints that it imposes on the data model, that make it so difficult to work with semi-structured data. You design the schema upfront, define relationships and constraints along with the schema, and organize your data based on them. This is one of the reasons why we need to have separate data models for writes (normalized), queries and reports (denormalized).

In an earlier post, I had drawn upon some of my thoughts on Data 2.0, as I see it in an application today. The fact is that data is no longer viewed as something to be abstracted in a uniform storage and dredged out using a single query language. Gone are the days when we used to think of the ORM as the grand unifier for all database platforms. Both the O and the R are fast losing ubiquity in today's application development context, at least they are not as universal as they used to be half a decade ago.

Things are surely moving towards polyglotism in the data modeling world as well. Not all of your data need to be on the relational database - distribute data to where it belongs. Sometime back Alex Miller had a post that suggested distribution and modeling of data based on lifetimes. Gone are the days when you need to persist your conversational data in the database for scaling out your application. Modern day grid platforms like Terracotta and Gigaspaces offer network attached memory that will store this data for you in the form that is much closer to the application model along with transparent clustering of your application.

Not all data need a relational model. In a typical application, there is data that is document oriented and does not need a fixed schema attached to it. Such data can be stored as key-value pairs, and its primary reason for existence is to support fast, real fast inserts, updates and key based lookups. The semantics of the data inside the value is fairly opaque and does not need to have any constraints of relation with the rest of the data model. Why coerce such a simple model into one that forces you to pay the upfront tax of normalization, constraint enforcement, index rebuilding and joins? Think simple, think key value pairs. And we have lots of them being used in the application space today. Rip such data out of your relational model into lightweight transactional stores that scale easily and dynamically. Long lived persistent data can, however, happily choose to stay around within the confines of your normalized relational model.

One of the significant advantages that you get out of storing data on the key/value stores is that your persistent data is now more closely mapped to the objects and classes of your application. This leads to less of the cognitive dissonance that the relational data model imposes upon the application.

But what about queries that can fetch relevant records from the key/value stores based on user defined criteria?

Obviously primary key based fetch is not always that useful in practical applications. Many distributed key/value stores provide the capability to index based on custom defined filters and, in conjunction with full text search engines like Lucene, return collections of selected entries from the data store. Have a look at this article demonstrating how Sphinx, a full text search engine, can be integrated with MemcacheDB, a distributed key-value store which conforms to the memcached protocol and uses Berkeley DB as its storage back-end.

CouchDB provides an interesting view model that offers the capability to aggregate and query documents stored in the database through the map-reduce paradigm. Users define the computation that models the query using map functions, and the subsequent aggregation using reduce functions, which together make the relevant data available to the user.
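
The shape of such a view can be mimicked with plain Scala collections. This is only an analogy: CouchDB itself defines views as JavaScript functions, and the Trade document, mapFn and reduceFn below are hypothetical names chosen for illustration.

```scala
// Analogy in plain Scala collections; CouchDB views are written in JavaScript.
object TradeView {
  final case class Trade(refNo: String, account: String, amount: BigDecimal)

  // "map": emit zero or more (key, value) pairs per document.
  def mapFn(doc: Trade): Seq[(String, BigDecimal)] =
    Seq(doc.account -> doc.amount)

  // "reduce": fold the values emitted under one key.
  def reduceFn(values: Seq[BigDecimal]): BigDecimal = values.sum

  // The view: run map over every document, group by key, reduce each group.
  def view(docs: Seq[Trade]): Map[String, BigDecimal] =
    docs.flatMap(d => mapFn(d))
      .groupBy { case (key, _) => key }
      .map { case (key, pairs) => key -> reduceFn(pairs.map(_._2)) }
}
```

Here the query "total amount per account" is expressed purely as a map function plus a reduce function, with no schema or join involved.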

So, now that your data model is polyglotic, there can be situations where you may need to synchronize data across multiple storage engines. Here is an example ..

In a real life trading application, a huge volume of trade messages needs to be processed by the back office within a very short time interval. This calls for scaling out to a throughput that is easier to achieve with the light payloads of schemaless, amorphous key/value stores than with traditional relational databases. You just need to validate the message and store it against the trade reference number. Also, these peer based distributed key/value databases offer easy bi-directional replication and updates to shared data in disconnected mode, which can make offline message processing, followed by synchronization later on, quite affordable. Hence it makes every sense to offload this message processing from the database and deploy clusters of key/value stores.

However, there is a second level of processing that needs to be done, which updates all books, accounts and customer balances based on each individual trade. And this information needs to be stored as a system of record for various queries, reports, audit trails and other subsequent downstream processing. One way this can be achieved is by pushing the second level of processing to scheduled queue jobs that asynchronously operate on the key/value store, do all the relevant heavy lifting of processing the data and finally push the balances to the back end relational database. Currently we are doing everything as one synchronous ACID transaction against an RDBMS. Distribute the model based on data lifetime, rely on asynchronous processing, and what you get is eventual consistency with the goodness of better scalability.

A few days back I was listening to this Scaling DIGG episode with Joe Stump. He mentioned repeatedly that it's not the language, the bottleneck is the IO and the latency resulting from the IO. And the database is the single biggest bottleneck in your architecture. More specifically, it is NOT the database per se, but the synchronous communication between the application tier and the persistent data layer. This is the reason why DIGG is architected around Gearman, MemcacheDB and MogileFS. However, not all sites need the scalability of DIGG. But even for applications that need a fraction of DIGG's scalability, architecting away from strictly synchronous ACID transaction oriented data sources is the way to go.

Sunday, March 01, 2009

Preemptive Bloom Filter

Michael Mitzenmacher describes a great trick with a bloom filter and a hash table that has been used in Longest Prefix Matching techniques to improve performance by reducing the number of dependent memory lookups.

Just in case you are not familiar with bloom filters, have a look at the great introduction in Wikipedia ..
The Bloom filter, conceived by Burton H. Bloom in 1970, is a space-efficient probabilistic data structure that is used to test whether an element is a member of a set. False positives are possible, but false negatives are not. Elements can be added to the set, but not removed (though this can be addressed with a counting filter). The more elements that are added to the set, the larger the probability of false positives.


Suppose you are doing lookups of large keys in a slow hash table, and the lookups are primarily negative searches, i.e. in most of the cases, the lookup will fail. In such cases you can front-end the (slow) hash table lookup with a bloom filter for the keys in fast memory. Since most of your lookups are likely to fail, you will potentially save a lot by avoiding access to the hash table in slower memory. Of course the bloom filter can return false positives (remember, it never gives false negatives), in which case you will still have to look up the hash table.
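
Here is a minimal sketch of the trick, under my own assumptions rather than Mitzenmacher's exact construction. GuardedTable is a hypothetical name, an ordinary Map stands in for the slow hash table, and the slowLookups counter exists only to show how often the slow table is actually touched.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Hypothetical wrapper: a Bloom filter in fast memory guards a slow table.
class GuardedTable(slow: Map[String, String], bits: Int = 4096, hashes: Int = 3) {
  private val filter = new BitSet(bits)
  var slowLookups = 0 // how many times the slow table was consulted

  // One probe position per seed, each reduced into [0, bits).
  private def positions(key: String): Seq[Int] =
    (0 until hashes).map(seed => Math.floorMod(MurmurHash3.stringHash(key, seed), bits))

  // Build the filter once from the keys present in the slow table.
  slow.keys.foreach(k => positions(k).foreach(p => filter.set(p)))

  def lookup(key: String): Option[String] =
    if (!positions(key).forall(p => filter.get(p))) None // definite miss, slow table skipped
    else {
      slowLookups += 1 // real hit or false positive: consult the slow table
      slow.get(key)
    }
}
```

For a workload dominated by misses, most calls to lookup return None straight from the filter, and slowLookups grows only with real hits and the occasional false positive.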

I was wondering if the same technique can be generalized for lookups in database tables. For all sets of indexes that will typically be used for searching the table, we can have separate bloom filters in fast memory, which will lead to fewer and fewer accesses to disk based database tables. Of course this will work meaningfully only for tables on which lookup failures outweigh successes, such as the examples Michael gives of URLs on a blacklist or dangerous packet signatures.

A nice trick to have up your sleeve ..

Thursday, December 04, 2008

Data 2.0 - more musings

Martin Fowler writes ..

"If you switch your integration protocol from SQL to HTTP, it now means you can change databases from being IntegrationDatabases to ApplicationDatabases. This change is profound. In the first step it supports a much simpler approach to object-relational mapping - such as the approach taken by Ruby on Rails. But furthermore it breaks the vice-like grip of the relational data model. If you integrate through HTTP it no longer matters how an application stores its own data, which in turn means an application can choose a data model that makes sense for its own needs."

and echoes similar sentiments that I expressed here.

Today's application stack has started taking different views on using the database, particularly relational database, as a persistent store. The driving force is, of course, to reduce the semantic distance between the application model and the data model.

For the case Martin mentions above, the data model for the application need not be relational at all. We are seeing more and more cases where applications need not forcibly bolt the data that they operate on into the clutches of the relational paradigm. In other words, the data remains much closer to the application domain model. Instead of splitting a domain abstraction into multiple relational tables and using the SQL glue to join them for queries and reports, we can directly operate on a semantically richer persistent abstraction. Storage options have evolved, RAM is the new disk, creation of RAM clusters is now easier than creation of disk clusters. And technologies like Map/Reduce have enabled easy parallelization of data processing on commodity hardware.

I have been hacking around with CouchDB for some time now. This is one platform that promises an HTTP based interface to the entire application stack. It's a server and a database, and the best part of it is that the database driver is HTTP. JSON based storage of application documents, REST APIs, map/reduce based queries in Javascript - no schema, no SQL, no database constraints .. your application data is semantically closer to the domain model. Loosely coupled document storage, multi-version concurrency control, easy replication - try replacing the relational database with this stack if it fits your application model.

Another train of thought that positions the database in a new light within an application stack comes from the upcoming grid computing platforms on the JVM. Java as a platform is growing fast and grid vendors like Terracotta and Gigaspaces are catching up to this trend. They offer a coherently clustered in-memory grid that enables a POJO based programming model without any synchronous interaction with the persistent data store. The application layer can program to the POJO based Network Attached Memory of Terracotta, using standard in-memory data structures. Terracotta offers an interface, which, if you implement, will be called to flush your objects to the database asynchronously in a write-behind thread. This is one more way to reduce the impedance mismatch between your domain model and the data model.
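
The write-behind idea can be sketched in a few lines of plain Scala. This is an illustration only and not Terracotta's actual interface: WriteBehindCache and its persist callback are hypothetical, and flush is called directly here where a real system would run it from a background thread or scheduler.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.concurrent.TrieMap

// Hypothetical write-behind cache; not Terracotta's actual interface.
class WriteBehindCache[K, V](persist: (K, V) => Unit) {
  private val memory = TrieMap.empty[K, V]
  private val pending = new ConcurrentLinkedQueue[(K, V)]()

  // Callers only touch memory; the database write is deferred.
  def put(key: K, value: V): Unit = {
    memory.put(key, value)
    pending.add(key -> value)
  }

  def get(key: K): Option[V] = memory.get(key)

  // Meant to be driven by a background thread or scheduler.
  // Drains the queue into the store and returns the number of writes.
  def flush(): Int = {
    var written = 0
    var next = pending.poll()
    while (next != null) {
      persist(next._1, next._2)
      written += 1
      next = pending.poll()
    }
    written
  }
}
```

The application reads and writes ordinary in-memory structures, while the persistent store catches up asynchronously whenever flush runs.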