Showing posts with label NOSQL. Show all posts

Thursday, July 5, 2012

Couchbase Architecture

After receiving a lot of good feedback and comments on my last blog post on MongoDb, I was encouraged to do a deep dive on another popular document-oriented DB: Couchbase.

I have been a long-time fan of CouchDb and wrote a blog post on it many years ago.  After its merger with Membase, I am very excited to take a deep look into it again.

Couchbase is the merger of two popular NOSQL technologies: 
  • Membase, which adds persistence, replication and sharding to the high-performance memcached technology
  • CouchDB, which pioneered the document-oriented model based on JSON
Like other NOSQL technologies, both Membase and CouchDB are built from the ground up on a highly distributed architecture, with data sharded across machines in a cluster.  Built around the Memcached protocol, Membase provides an easy migration path for existing Memcached users who want to add persistence, sharding and fault resilience to their familiar Memcached model.  CouchDB, on the other hand, provides first-class support for storing JSON documents as well as a simple RESTful API to access them.  Underneath, CouchDB also has a highly tuned storage engine that is optimized for both update transactions and query processing.  Taking the best of both technologies, Couchbase is well-positioned in the NOSQL marketplace.

Programming model

Couchbase provides client libraries for different programming languages such as Java / .NET / PHP / Ruby / C / Python / Node.js

For reads, Couchbase provides a key-based lookup mechanism: the client provides the key, and only the server hosting the data for that key is contacted.

Couchbase also provides a query mechanism where the client provides a query (for example, a range over some secondary key) as well as the view (basically the index) to run it against.  The query is broadcast to all servers in the cluster, and the results are merged and sent back to the client.

For writes, Couchbase provides a key-based update mechanism where the client sends in an updated document with the key (as doc id).  The server acknowledges the write request as soon as the data is stored in RAM on the active server, which offers the lowest latency for write requests.

Following is the core API that Couchbase offers (in an abstract sense).

# Get a document by key

doc = get(key)

# Modify a document; notice that the whole document 
#   needs to be passed in

set(key, doc)

# Modify a document when no one has modified it 
#  since my last read

casVersion = doc.getCas()
cas(key, casVersion, changedDoc)

# Create a new document, with an expiration time 
#   after which the document will be deleted

addIfNotExist(key, doc, timeToLive)

# Delete a document

delete(key)

# When the value is an integer, increment the integer

increment(key)

# When the value is an integer, decrement the integer

decrement(key)

# When the value is an opaque byte array, append more 
#  data into existing value 

append(key, newData)

# Query the data 

results = query(viewName, queryParameters)

In Couchbase, the document is the unit of manipulation.  Currently Couchbase doesn't support server-side execution of custom logic.  The Couchbase server is basically a passive store and, unlike other document-oriented DBs, Couchbase doesn't support field-level modification.  To modify a document, the client needs to retrieve it by its key, make the modification locally and then send the whole (modified) document back to the server.  This design trades network bandwidth (since more data is transferred across the network) for server CPU (the CPU load shifts to the client).

Couchbase currently doesn't support bulk modification based on a matching condition.  Modification happens only on a per-document basis (the client saves the modified documents one at a time).

Transaction Model

Similar to many NOSQL databases, Couchbase’s transaction model is primitive compared to an RDBMS.  Atomicity is guaranteed at the level of a single document, and transactions that span updates to multiple documents are unsupported.  To provide the necessary isolation for concurrent access, Couchbase provides a CAS (compare and swap) mechanism which works as follows:
  • When the client retrieves a document, a CAS ID (equivalent to a revision number) is attached to it.
  • While the client is manipulating the retrieved document locally, another client may modify this document.  When this happens, the CAS ID of the document at the server will be incremented.
  • Now, when the original client submits its modification to the server, it can attach the original CAS ID to its request.  The server will compare this ID with the current ID on the server.  If they differ, the document has been updated in between and the server will not apply the update.
  • The original client will then re-read the document (which now has a newer ID) and re-submit its modification.
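
The read / modify / re-submit loop can be sketched in a few lines of Python.  This is only an illustration against a toy in-memory store: `ToyStore`, `CasMismatch` and `update_with_retry` are hypothetical names made up for this sketch and are not part of any Couchbase client library.

```python
class CasMismatch(Exception):
    """Raised when the supplied CAS ID no longer matches the stored one."""


class ToyStore:
    """In-memory stand-in for a server; NOT the Couchbase client API."""

    def __init__(self):
        self._data = {}  # key -> (cas_id, doc)

    def get(self, key):
        return self._data[key]  # (cas_id, doc)

    def set(self, key, doc):
        # Unconditional write: bumps the CAS ID regardless of who read what.
        cas_id = self._data.get(key, (0, None))[0] + 1
        self._data[key] = (cas_id, doc)

    def cas(self, key, cas_id, doc):
        # Conditional write: only applied if nobody wrote since our read.
        current_cas, _ = self._data[key]
        if current_cas != cas_id:
            raise CasMismatch(key)
        self._data[key] = (current_cas + 1, doc)


def update_with_retry(store, key, modify, max_attempts=5):
    """Read, modify locally, write back; re-read and retry on conflict."""
    for _ in range(max_attempts):
        cas_id, doc = store.get(key)
        try:
            store.cas(key, cas_id, modify(dict(doc)))
            return True
        except CasMismatch:
            continue  # somebody else won the race; start over
    return False
```

Calling `update_with_retry(store, "user:1", bump)` with a function `bump` that returns the changed document keeps retrying until its write lands on an unmodified version, or gives up after `max_attempts` tries.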
Couchbase also provides a locking mechanism for clients to coordinate their access to documents.  A client can request a LOCK on the document it intends to modify, update the document and then release the LOCK.  To prevent deadlock, each LOCK grant has a timeout so it will automatically be released after a period of time.

Deployment Architecture

In a typical setting, a Couchbase DB resides in a server cluster involving multiple machines.  The client library connects to the appropriate servers to access the data.  Each machine runs a number of daemon processes which provide data access as well as management functions.


The data server, written in C/C++, is responsible for handling get/set/delete requests from clients.  The management server, written in Erlang, is responsible for handling query traffic from clients, as well as managing the configuration and communicating with other member nodes in the cluster.

Virtual Buckets

The basic unit of data storage in a Couchbase DB is a JSON document (or a primitive data type such as an int or a byte array) which is associated with a key.  The overall key space is partitioned into 1024 logical storage units called "virtual buckets" (or vBuckets).  vBuckets are distributed across machines within the cluster via a map that is shared among the servers in the cluster as well as the client library.
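
As a rough sketch of how a client could turn a key into a server using such a map: the snippet below assumes a CRC32 hash taken modulo the vBucket count, which is an assumption for illustration (the exact hash Couchbase uses may differ), and the node names and round-robin map are made up.

```python
import zlib

NUM_VBUCKETS = 1024  # the vBucket count given in the text


def vbucket_for_key(key: str) -> int:
    # Assumption: CRC32 modulo the vBucket count; the real hash may differ.
    return zlib.crc32(key.encode("utf-8")) % NUM_VBUCKETS


def server_for_key(key: str, vbucket_map: list) -> str:
    # vbucket_map[i] names the server hosting the active copy of vBucket i.
    return vbucket_map[vbucket_for_key(key)]


# Hypothetical three-node cluster with vBuckets assigned round-robin.
servers = ["node-a", "node-b", "node-c"]
vbucket_map = [servers[i % len(servers)] for i in range(NUM_VBUCKETS)]
```

Because the key-to-vBucket step never changes, moving data around only means editing `vbucket_map`; clients holding the updated map find the new server without rehashing anything.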


High availability is achieved through data replication at the vBucket level.  Currently Couchbase supports one active vBucket and zero or more standby replicas hosted on other machines.  Currently the standby replicas are idle and do not serve any client requests.  In a future version of Couchbase, the standby replicas will be able to serve read requests.

Load balancing in Couchbase is achieved as follows:
  • Keys are uniformly distributed across vBuckets by the hash function.
  • When machines are added to or removed from the cluster, the administrator can request a redistribution of vBuckets so that data is evenly spread across physical machines.

Management Server

The management server performs management functions and coordinates with the other nodes within the cluster.  It includes the following monitoring and administration functions:

Heartbeat: A watchdog process periodically communicates with all member nodes within the same cluster to provide Couchbase Server health updates.

Process monitor: This subsystem monitors execution of the local data manager, restarting failed processes as required and providing status information to the heartbeat module.

Configuration manager: Each Couchbase Server node shares a cluster-wide configuration which contains the member nodes within the cluster and a vBucket map.  The configuration manager pulls this config from other member nodes at bootup time.

Within a cluster, one node’s management server is elected as the leader, which performs the following cluster-wide management functions:
  • Controls the distribution of vBuckets among the other nodes and initiates vBucket migration
  • Orchestrates failover and updates the configuration managers of member nodes
If the leader node crashes, a new leader will be elected from the surviving members of the cluster.

When a machine in the cluster crashes, the leader will detect that and notify the member machines in the cluster that all vBuckets hosted on the crashed machine are dead.  After getting this signal, the machines hosting the corresponding vBucket replicas will set the vBucket status to “active”.  The vBucket/server map is updated and eventually propagated to the client lib.  Notice that at this moment, the replication level of those vBuckets is reduced.  Couchbase doesn’t automatically re-create new replicas, since that would cause data copying traffic.  The administrator can issue a command to explicitly initiate a data rebalance.  The crashed machine can rejoin the cluster after a reboot.  At that moment, all the data it stored previously will be completely discarded and the machine will be treated as a brand new empty machine.
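
The effect of a failover on the vBucket/server map can be pictured with a toy model.  The list-of-chains representation and the `fail_over` function below are assumptions made for this sketch, not Couchbase's internal format.

```python
def fail_over(vbucket_map, dead_node):
    """vbucket_map[i] is [active, replica, ...] for vBucket i.

    Returns a new map with the dead node dropped from every chain, so the
    first surviving replica moves into the active position.  No new replica
    is created, which is why the replication level goes down.
    """
    return [[node for node in chain if node != dead_node] for chain in vbucket_map]
```

For a three-vBucket map `[["a", "b"], ["b", "c"], ["c", "a"]]`, failing over node `a` promotes `b` for the first vBucket and leaves the first and third vBuckets with no replica until a rebalance is requested.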

As more machines are put into the cluster (for scaling out), vBuckets should be redistributed to achieve load balance.  This is currently triggered by an explicit command from the administrator.  Once it receives the “rebalance” command, the leader computes a new provisional map which has a balanced distribution of vBuckets and sends this provisional map to all members of the cluster.

To compute the vBucket map and migration plan, the leader pursues the following objectives:
  • Evenly distribute the number of active vBuckets and replica vBuckets among member nodes.
  • Place the active copy and each of its replicas on physically separate nodes.
  • Spread the replica vBuckets as widely as possible among other member nodes.
  • Minimize the amount of data migration.
  • Orchestrate the steps of replica redistribution so that no node or network link is overwhelmed by the replica migration.
Once the vBucket map is determined, the leader passes the redistribution map to each member in the cluster and coordinates the steps of vBucket migration.  The actual data transfer happens directly between the origin node and the destination node.

Notice that since we generally have more vBuckets than machines, the migration workload is distributed evenly and automatically.  For example, when new machines are added to the cluster, all existing machines migrate some portion of their vBuckets to the new machines.  There is no single bottleneck in the cluster.

Throughout the migration and redistribution of vBuckets among servers, a vBucket on a server will be in one of the following states:
  • “Active”: the server hosting the vBucket is ready to handle both read and write requests.
  • “Replica”: the server is hosting a copy of the vBucket that may be slightly out of date but can take read requests that tolerate some degree of staleness.
  • “Pending”: the server is hosting a copy that is in a critical transitional state.  The server cannot take either read or write requests at this moment.
  • “Dead”: the server is no longer responsible for the vBucket and will not take either read or write requests anymore.
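
The four states and what each one permits can be written down as a small Python enum.  This is a reading aid only: the `VBucketState` type and the `can_read` / `can_write` helpers (including the `tolerate_stale` flag) are hypothetical names for this sketch.

```python
from enum import Enum


class VBucketState(Enum):
    ACTIVE = "active"
    REPLICA = "replica"
    PENDING = "pending"
    DEAD = "dead"


def can_read(state: VBucketState, tolerate_stale: bool = False) -> bool:
    # Active copies always serve reads; replicas only serve readers that
    # accept slightly outdated data.  Pending and dead copies serve nothing.
    if state is VBucketState.ACTIVE:
        return True
    return state is VBucketState.REPLICA and tolerate_stale


def can_write(state: VBucketState) -> bool:
    # Only the active copy accepts writes.
    return state is VBucketState.ACTIVE
```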

Data Server

The data server implements the memcached APIs such as get, set, delete, append, prepend, etc. It contains the following key data structures:
  • One in-memory hashtable (keyed by doc id) for each vBucket hosted.  The hashtable acts both as metadata for all documents and as a cache for the document content.  Maintaining an entry for every document gives a quick way to detect whether the document exists on disk.
  • To support async writes, there is a checkpoint linked list per vBucket holding the doc ids of modified documents that haven't been flushed to disk or replicated to the replica.

To handle a "GET" request:
  • The data server routes the request to the corresponding ep-engine responsible for the vBucket.
  • The ep-engine looks up the document id in the in-memory hashtable.  If the document content is found in cache (stored in the value of the hashtable), it is returned.  Otherwise, a background disk fetch task is created and queued into the RO dispatcher queue.
  • The RO dispatcher then reads the value from the underlying storage engine and populates the corresponding entry in the vbucket hash table.
  • Finally, the notification thread notifies the pending memcached connection that the disk fetch has completed, so that the memcached worker thread can revisit the engine to process the get request.
To handle a "SET" request, a success response is returned to the calling client once the updated document has been put into the in-memory hashtable and a write request has been put into the checkpoint buffer.  Later on, the Flusher thread picks up the outstanding write requests from each checkpoint buffer, looks up the corresponding document content in the hashtable and writes it out to the storage engine.
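
The asynchronous write path can be pictured with a single-threaded toy model.  `ToyVBucket` is a hypothetical class for illustration; the real data server uses separate threads and a far richer checkpoint structure, and the plain dict named `disk` only stands in for the storage engine.

```python
from collections import deque


class ToyVBucket:
    def __init__(self):
        self.hashtable = {}        # doc id -> document content (cache + metadata)
        self.checkpoint = deque()  # doc ids modified but not yet flushed
        self.disk = {}             # stand-in for the storage engine

    def set(self, doc_id, doc):
        self.hashtable[doc_id] = doc
        self.checkpoint.append(doc_id)
        return "OK"  # acknowledged before anything reaches disk

    def flush(self):
        """What a flusher pass would do: drain the queue and persist."""
        # dict.fromkeys keeps first-seen order and collapses repeated ids,
        # so a document updated twice is written only once.
        pending = list(dict.fromkeys(self.checkpoint))
        self.checkpoint.clear()
        for doc_id in pending:
            self.disk[doc_id] = self.hashtable[doc_id]
        return len(pending)
```

The gap between `set()` returning and `flush()` running is exactly the window in which a crash would lose the update.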

Of course, data can be lost if the server crashes before the data has been replicated to another server and/or persisted.  If the client requires high data durability across crashes, it can issue a subsequent observe() call, which blocks until the server has persisted the data to disk or has replicated the data to another server (and received its ACK).  Overall, the client has various options for trading off data integrity against throughput.

Hashtable Management

To synchronize access to a vbucket hash table, each incoming thread needs to acquire a lock before accessing a key region of the hash table. There are multiple locks per vbucket hash table, each of which controls exclusive access to a certain key region of that hash table. The number of regions of a hash table can grow dynamically as more documents are inserted into the hash table.

To control the memory size of the hashtable, the Item pager thread monitors the memory utilization of the hashtable.  Once a high watermark is reached, it initiates an eviction process to remove certain document content from the hashtable.  Only entries that are not referenced by entries in the checkpoint buffer can be evicted, because otherwise the outstanding update (which exists only in the hashtable and is not yet persisted) would be lost.

After eviction, the entry of the document still remains in the hashtable; only the content of the document is removed from memory, while the metadata is still there.  The eviction process stops after reaching the low watermark.  The high and low watermarks are determined by the bucket memory quota. By default, the high watermark is set to 75% of the bucket quota, while the low watermark is set to 60% of the bucket quota. These watermarks are configurable at runtime.
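
A simplified picture of the watermark logic, using the default 75% / 60% figures quoted above.  The `evict` function, its arguments and the choice to walk entries in insertion order are assumptions for this sketch; the real Item pager picks its victims differently.

```python
HIGH_WATERMARK = 0.75  # default fractions of the bucket quota, from the text
LOW_WATERMARK = 0.60


def evict(values, dirty_ids, quota_bytes):
    """values maps doc id -> bytes, or None once the content is ejected.

    Returns the ids whose content was ejected.  Keys always stay in `values`,
    mirroring the metadata that remains in the hashtable.
    """
    used = sum(len(v) for v in values.values() if v is not None)
    if used < HIGH_WATERMARK * quota_bytes:
        return []  # below the high watermark: nothing to do
    ejected = []
    for doc_id in list(values):
        if used <= LOW_WATERMARK * quota_bytes:
            break  # reached the low watermark: stop evicting
        content = values[doc_id]
        if content is None or doc_id in dirty_ids:
            continue  # already ejected, or still waiting to be flushed
        values[doc_id] = None
        used -= len(content)
        ejected.append(doc_id)
    return ejected
```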

In Couchbase, every document is associated with an expiration time and will be deleted once it has expired.  The Expiry pager is responsible for tracking and removing expired documents from both the hashtable and the storage engine (by scheduling a delete operation).

Checkpoint Manager
The checkpoint manager is responsible for recycling the checkpoint buffer, which holds the outstanding update requests consumed by the two downstream processes, the Flusher and the TAP replicator.  When all the requests in a checkpoint buffer have been processed, the checkpoint buffer is deleted and a new one is created.

TAP Replicator
The TAP replicator is responsible for handling vBucket migration as well as vBucket replication from the active server to the replica server.  It does this by propagating the latest modified documents to the corresponding replica server.

At the time a replica vBucket is established, the entire vBucket needs to be copied from the active server to the empty destination replica server as follows:
  • The in-memory hashtable at the active server is transferred to the replica server.  Notice that during this period, some data may be updated, and therefore the data set transferred to the replica can be inconsistent (some documents are the latest and some are outdated).
  • Nevertheless, all updates that happen after the start of the transfer are tracked in the checkpoint buffer.
  • Therefore, after the transfer of the in-memory hashtable is completed, the TAP replicator can pick up those updates from the checkpoint buffer.  This ensures the latest versions of changed documents are sent to the replica, and hence fixes the inconsistency.
  • However, the hashtable cache doesn’t contain all the document content.  Data also needs to be read from the vBucket file and sent to the replica.  Notice that during this period, updates to the vBucket will still happen on the active server.  However, since the file is append-only, subsequent data updates won’t interfere with the vBucket copying process.
After the replica server has caught up, subsequent updates at the active server will be available in its checkpoint buffer, from which they are picked up by the TAP replicator and sent to the replica server.

CouchDB Storage Structure

The data server defines an interface where different storage structures can be plugged in.  Currently it supports both a SQLite DB and CouchDB.  Here we describe the details of CouchDb, which provides a high-performance storage mechanism underneath the Couchbase technology.

Under the CouchDB structure, there is one file per vBucket.  Data is written to this file in an append-only manner, which enables Couchbase to do mostly sequential writes for updates and provides the most optimized access pattern for disk I/O.  This unique storage structure contributes to Couchbase’s fast on-disk performance for write-intensive applications.

The following diagram illustrates the storage model and how it is modified by 3 batch updates (notice that since updates are asynchronous, they are performed by the "Flusher" thread in batches).



The Flusher thread works as follows:

1) Pick up all pending write requests from the dirty queue and de-duplicate multiple update requests to the same document.

2) Sort each request (by key) into the corresponding vBucket and open the corresponding file.

3) Append the following to the vBucket file (in the following contiguous sequence):
  • All document contents in the write request batch.  Each document is written as [length, crc, content], one after another sequentially.
  • The index that stores the mapping from document id to the document’s position on disk (called the by-id BTree)
  • The index that stores the mapping from update sequence number to the document’s position on disk (called the by-seq BTree)
The by-id index plays an important role in looking up a document by its id.  It is organized as a B-Tree where each node contains a key range.  To look up a document by id, we just need to start from the header (which is at the end of the file), move to the root BTree node of the by-id index, and then traverse down to the leaf BTree node that contains the pointer to the actual document position on disk.
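
To make the [length, crc, content] record layout concrete, here is a small sketch that appends records to an in-memory buffer and reads them back by offset.  The field widths (two big-endian 32-bit integers) and the use of CRC32 are assumptions for illustration, and a plain dict stands in for the by-id BTree.

```python
import struct
import zlib

HEADER = struct.Struct(">II")  # assumed layout: 4-byte length, 4-byte CRC32


def append_record(buf: bytearray, content: bytes) -> int:
    """Append [length, crc, content] and return the record's offset."""
    offset = len(buf)
    buf += HEADER.pack(len(content), zlib.crc32(content))
    buf += content
    return offset


def read_record(buf, offset: int) -> bytes:
    """Read the record at `offset`, verifying its checksum."""
    length, crc = HEADER.unpack_from(buf, offset)
    start = offset + HEADER.size
    content = bytes(buf[start:start + length])
    if zlib.crc32(content) != crc:
        raise ValueError("CRC mismatch at offset %d" % offset)
    return content


vbucket_file = bytearray()
by_id = {}  # stand-in for the by-id BTree: doc id -> offset
by_id["a"] = append_record(vbucket_file, b'{"v":1}')
by_id["b"] = append_record(vbucket_file, b'{"v":2}')
by_id["a"] = append_record(vbucket_file, b'{"v":3}')  # update = new append
```

After the third append, `by_id["a"]` points at the newest copy, while the first copy of document `a` is still in the buffer but no longer reachable through the index.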

During a write, a similar mechanism is used to trace back to the BTree node that contains the id of the modified document.  Notice that in the append-only model, an update does not happen in place; instead we locate the existing node and copy it over by appending.  In other words, the modified BTree node needs to be copied, modified and finally written to the end of the file, and then its parent needs to be modified to point to the new location, which in turn causes the parent to be copied and written to the end of the file.  The same happens to the parent's parent and so on, all the way up to the root node of the BTree.  The number of disk seeks is therefore O(log N).

The by-seq index is used to keep track of the update sequence of live documents and is used for asynchronous catch-up purposes.  When a document is created, modified or deleted, a new sequence number is added to the by-seq BTree and the document's previous seq node is deleted.  Therefore, for cross-site replication, view index update and compaction, we can quickly locate all the live documents in the order of their update sequence.  When a vBucket replicator asks for the list of updates since a particular time, it provides the last sequence number from the previous update; the system then scans through the by-seq BTree to locate all the documents that have a larger sequence number, which effectively includes all the documents that have been modified since the last replication.

As time goes by, certain data becomes garbage (see the greyed-out region above) and becomes unreachable in the file.  Therefore, we need a garbage collection mechanism to clean up the garbage.  To trigger this process, each by-id and by-seq BTree node keeps track of the data size of the live documents (those that are not garbage) under its subtree.  Therefore, by examining the root BTree node, we can determine the size of all live documents within the vBucket.  When the ratio of live data size to vBucket file size falls below a certain threshold, a compaction process is triggered whose job is to open the vBucket file and copy the surviving data to another file.

Technically, the compaction process opens the file and reads the by-seq BTree at the end of the file.  It traces the BTree all the way to the leaf nodes and copies the corresponding document content to the new file.  The compaction process happens while the vBucket is being updated.  However, since the file is append-only, new changes are recorded after the BTree root that the compaction has opened, so subsequent data updates won’t interfere with the compaction process.  When the compaction is completed, the system needs to copy over to the new file the data that was appended since the beginning of the compaction.
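
The trigger condition itself is a one-line ratio test.  In the sketch below the function name and the 0.5 default threshold are placeholders chosen for illustration; the text does not state the actual threshold.

```python
def needs_compaction(live_bytes: int, file_bytes: int, threshold: float = 0.5) -> bool:
    """True when live data makes up less than `threshold` of the file.

    live_bytes would come from the size tracked at the root BTree node;
    the 0.5 default is a hypothetical value, not a documented setting.
    """
    if file_bytes == 0:
        return False  # empty file: nothing to compact
    return live_bytes / file_bytes < threshold
```

For example, a 1000-byte file holding 400 bytes of live documents would qualify under this placeholder threshold, while one holding 600 bytes would not.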

View Index Structure

Unlike most indexing structures, which provide a pointer from the search attribute back to the document, the CouchDb index (called the View Index) is better perceived as a denormalized table with arbitrary keys and values loosely associated with the document.

Such a denormalized table is defined by user-provided map() and reduce() functions.

map = function(doc) {
   …
   emit(k1, v1)
   …
   emit(k2, v2)
   …
}

reduce = function(keys, values, isRereduce) {
    if (isRereduce) {
        // Do the re-reduce only on values (keys will be null)
    } else {
        // Do the reduce on keys and values
    }
    // result must be ready for input values to re-reduce

    return result
}

Whenever a document is created, updated or deleted, the corresponding map(doc) function is invoked (in an asynchronous manner) to generate a set of key/value pairs.  These key/value pairs are stored in a B-Tree structure.  All the key/value pairs of each B-Tree node are passed into the reduce() function, which computes an aggregated value for that B-Tree node.  Re-reduce also happens in non-leaf B-Tree nodes, which further aggregate the aggregated values of their child B-Tree nodes.

The management server maintains the view index and persists it to a separate file.

Creating a view index is performed by broadcasting the index creation request to all machines in the cluster.  The management process of each machine reads its active vBucket files and feeds each surviving document to the map function.  The key/value pairs emitted by the map function are stored in a separate BTree index file.  When writing out a BTree node, the reduce() function is called with the list of all values in the tree node.  Its return value, which represents a partially reduced value, is attached to the BTree node.

The view index is updated incrementally as documents subsequently get into the system.  Periodically, the management process opens the vBucket file and scans all documents since the last sequence number.  For each document changed since the last sync, it invokes the corresponding map function to determine the key/value pairs to put into the BTree node.  The BTree node is split if appropriate.

Underneath, Couchbase uses a back index to keep track of each document and the keys that it previously emitted.  Later, when the document is deleted, it can look up the back index to determine what those keys are and remove them.  In case the document is updated, the back index can also be examined; semantically, a modification is equivalent to a delete followed by an insert.

The following diagram illustrates how the view index file will be incrementally updated via the append-only mechanism.



Query Processing

A query in Couchbase is made against the view index.  A query is composed of the view name, a start key and an end key.  If the reduce() function isn’t defined, the query result is the list of values sorted by key within the key range.  If the reduce() function is defined, the query result is a single value aggregated over all keys within the key range.



If the view has no reduce() function defined, the query processing proceeds as follows:
  • The client issues a query (with view, start/end key) to the management process of any server (unlike a key-based lookup, there is no need to locate a specific server).
  • The management process broadcasts the request to the management processes on all servers (including itself) within the cluster.
  • Each management process (after receiving the broadcast request) does a local search for values within the key range by traversing the BTree nodes of its view file, and starts sending back the results (automatically sorted by key) to the initial server.
  • The initial server merges the sorted results and streams them back to the client.
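
The final merge step is a classic k-way merge of already-sorted streams.  A minimal sketch, assuming each server's view rows are available as a key-sorted list of (key, value) pairs; the `scatter_gather` name and the in-process lists standing in for remote servers are illustrative only.

```python
import heapq


def scatter_gather(per_server_rows, start_key, end_key):
    """per_server_rows: one key-sorted list of (key, value) rows per server."""
    # "Local search": each server filters its own rows to the key range.
    partials = [
        [(k, v) for k, v in rows if start_key <= k <= end_key]
        for rows in per_server_rows
    ]
    # "Merge": the initial server interleaves the sorted partial results.
    return list(heapq.merge(*partials, key=lambda row: row[0]))
```

Because every partial result arrives sorted, the merge never needs to hold or re-sort the full result set, which is what allows the initial server to stream rows back as they arrive.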
However, if the view has a reduce() function defined, the query processing involves computing a single aggregated value as follows:
  • The client issues a query (with view, start/end key) to the management process of any server (unlike a key-based lookup, there is no need to locate a specific server).
  • The management process broadcasts the request to the management processes on all servers (including itself) within the cluster.
  • Each management process does a local reduce over the values within the key range by traversing the BTree nodes of its view file to compute the reduce value of the key range.  If the key range spans an entire BTree node, the pre-computed reduce value of that sub-range can be used.  This way, the reduce function can reuse a lot of partially reduced values and doesn’t need to recompute every value of the key range from scratch.
  • The original server does a final re-reduce() over all the values returned from the other servers, and then passes the final reduced value back to the client.
To illustrate the re-reduce concept, let's say the query has a key range from A to F.



Instead of calling reduce([A,B,C,D,E,F]), the system recognizes that the BTree node containing [B,C,D] has been pre-reduced and that the result P is stored in the BTree node, so it only needs to call reduce([A,P,E,F]).
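
A small Python sketch of the same idea, using a row count as the aggregate since it shows why the re-reduce flag matters: a first-level reduce counts rows, while a re-reduce must add up partial counts.  The row values and helper names are made up for illustration.

```python
def count_reduce(keys, values, is_rereduce):
    if is_rereduce:
        return sum(values)  # values are partial counts from earlier reduces
    return len(values)      # values are rows emitted by map()


rows = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 60)]


def reduce_rows(subset):
    keys = [k for k, _ in subset]
    values = [v for _, v in subset]
    return count_reduce(keys, values, False)


p = reduce_rows(rows[1:4])                   # stored with the [B, C, D] node
edges = reduce_rows(rows[:1] + rows[4:])     # A, E, F reduced at query time
total = count_reduce(None, [p, edges], True) # re-reduce over partial results
```

Here `total` equals `reduce_rows(rows)`, but the B, C and D rows were never touched at query time.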

Update View Index as vBucket migrates
Since the view index is synchronized with the vBuckets on the same server, when a vBucket has migrated to a different server, the view index is no longer correct; the key/value pairs that belong to a migrated vBucket should be discarded, and the reduce values that include them cannot be used anymore.

To keep track of the vBuckets and keys in the view index, each BTree node has a 1024-bit mask indicating all the vBuckets that are covered in its subtree (ie: it contains a key emitted from a document belonging to that vBucket).  This bitmask is maintained whenever the BTree node is updated.

At the server level, a global bitmask is used to indicate all the vBuckets that this server is responsible for.

When processing a query on a map-only view, before a key/value pair is returned, an extra check is performed to make sure its associated vBucket is one that this server is responsible for.

When processing a query on a view that has a reduce() function, we cannot use the pre-computed reduce value if the BTree node contains a vBucket that the server is not responsible for.  In this case, the BTree node’s bitmask is compared with the global bitmask.  If they are not aligned, the reduce value needs to be recomputed.
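
The bitmask comparison can be sketched with Python integers, which work as arbitrary-width bit sets.  `mask_of` and `can_reuse_reduce` are hypothetical helper names for this illustration.

```python
def mask_of(vbucket_ids):
    """Build a bitmask with one bit set per vBucket id (0..1023)."""
    mask = 0
    for vb in vbucket_ids:
        mask |= 1 << vb
    return mask


def can_reuse_reduce(node_mask: int, server_mask: int) -> bool:
    # The stored reduce value is usable only if every vBucket under the
    # BTree node is still owned by this server, i.e. the node's mask has
    # no bit set outside the server's global mask.
    return (node_mask & ~server_mask) == 0
```

With a server responsible for vBuckets 1, 2 and 3, a node covering vBuckets 1 and 3 can reuse its stored reduce value, while a node covering vBuckets 3 and 4 has to be recomputed without the rows from vBucket 4.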




Here is an example to illustrate this process


Couchbase is one of the popular NOSQL technologies, built on a solid foundation designed for high performance.  In this post, we have examined a number of its key features:
  • Load balancing between servers inside a cluster that can grow and shrink according to workload conditions.  Data migration can be used to restore workload balance.
  • Asynchronous writes provide the lowest possible latency to the client, since the write returns once the data is stored in memory.
  • The append-only update model turns most update transactions into sequential disk access, and hence provides high throughput for write-intensive applications.
  • Automatic compaction keeps the data layout on disk optimized.
  • The map function can be used to pre-compute a view index to enable query access.  Summary data can be pre-aggregated using the reduce function.  Overall, this cuts down the workload of query processing dramatically.

For a review of NOSQL architecture in general and some theoretical foundations, I have written a NOSQL design pattern blog post, as well as one on some fundamental differences between SQL and NOSQL.

For other NOSQL technologies, please read my other blog posts on MongoDb, Cassandra and HBase, and Memcached.

Special thanks to Damien Katz and Frank Weigel from the Couchbase team, who provided a lot of implementation details of Couchbase.

Monday, April 2, 2012

MongoDb Architecture

NOSQL has become a very hot topic for large web-scale deployments, where scalability and semi-structured data drive the DB requirements towards NOSQL. Many NOSQL products have evolved over the last couple of years. In my past blog posts, I have covered the underlying distributed system theory of NOSQL, as well as some specific products such as CouchDB and Cassandra/HBase.

Last Friday I was very lucky to meet Jared Rosoff from 10gen at a technical conference and have a discussion about the technical architecture of MongoDb. I found the information very useful and want to share it with more people.

One thing that impresses me about MongoDb is that it is extremely easy to use, and the underlying architecture is also very easy to understand.

Here are some simple admin steps to start/stop the MongoDb server:
# Install MongoDB (create the default data directory)
mkdir -p /data/db

# Start Mongod server
.../bin/mongod # data stored in /data/db

# Start the command shell
.../bin/mongo
> show dbs
> show collections

# Remove collection
> db.person.drop()

# Stop the Mongod server from shell
> use admin
> db.shutdownServer()

Major difference from RDBMS
MongoDb differs from an RDBMS in the following ways:
  • Unlike an RDBMS record, which is "flat" (a fixed number of simple data types), the basic unit of MongoDb is the "document", which is "nested" and can contain multi-value fields (arrays, hashes).
  • Unlike an RDBMS, where all records stored in a table must conform to the table schema, documents of any structure can be stored in the same collection.
  • There is no "join" operation in the query. Overall, data is encouraged to be organized in a more denormalized manner, and more of the burden of ensuring data consistency is pushed to the application developers.
  • There is no concept of "transaction" in MongoDb. "Atomicity" is guaranteed only at the document level (no partial update of a document will occur).
  • There is no concept of "isolation": any data read by one client may have its value modified by another concurrent client.
By removing some of the features that a classical RDBMS provides, MongoDb can be more lightweight and more scalable in processing big data.
Query processing
MongoDb belongs to the family of document-oriented DBs. In this model, data is organized as JSON documents and stored in collections. A collection can be thought of as equivalent to a table, and a document as equivalent to a record, in the RDBMS world.

Here are some basic examples.
# create a doc and save into a collection
> p = {firstname:"Dave", lastname:"Ho"}
> db.person.save(p)
> db.person.insert({firstname:"Ricky", lastname:"Ho"})

# Show all docs within a collection
> db.person.find()

# Iterate result using cursor
> var c = db.person.find()
> p1 = c.next()
> p2 = c.next()

To specify the search criteria, provide an example document containing the fields that need to match.
> p3 = db.person.findOne({lastname:"Ho"})
Notice that in this form of query, the value portion needs to be determined before the query is made (in other words, it cannot be based on other attributes of the document). For example, if we have a collection of "Person", it is not possible to express a query that returns the persons whose weight is larger than 10 times their height.
# Return a subset of fields (ie: projection)
> db.person.find({lastname:"Ho"}, {firstname:true})

# Delete some records
> db.person.remove({firstname:"Ricky"})
To speed up queries, indexes can be used. In MongoDb, an index is stored as a BTree structure (so range queries are automatically supported). Since the document itself is a tree, the index can be specified as a path that drills into a deep nesting level inside the document.
# To build an index for a collection
> db.person.ensureIndex({firstname:1})

# To show all existing indexes
> db.person.getIndexes()

# To remove an index
> db.person.dropIndex({firstname:1})

# An index can be built on a path within the doc.
> db.person.ensureIndex({"address.city":1})

# A composite key can be used to build index
> db.person.ensureIndex({lastname:1, firstname:1})
An index can also be built on a multi-valued attribute such as an array. In this case, each element in the array will have a separate node in the BTree.

Building an index can be done in either offline foreground mode or online background mode. Foreground mode proceeds much faster, but the DB cannot be accessed while the index is being built. If the system is running in a replica set (described below), it is recommended to rotate each member DB offline and build the index in the foreground.

When there are multiple selection criteria in a query, MongoDb attempts to use the single best index to select a candidate set and then sequentially iterates through the candidates to evaluate the other criteria.

When there are multiple indexes available for a collection and a query is handled for the first time, MongoDb creates multiple execution plans (one for each available index) and lets them take turns (within a certain number of ticks) to execute until the fastest plan finishes. The result of the fastest executor is returned, and the system remembers the index used by that executor. Subsequent queries will use the remembered index until a certain number of updates have happened in the collection; then the system repeats the process to figure out which index is best at that time.
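
The "take turns until the fastest plan finishes" idea can be sketched in plain JavaScript, independent of MongoDb. Everything here is a made-up model for illustration: `scanPlan` and `racePlans` are hypothetical names, each plan is a generator that yields once per unit of work, and the cost numbers are arbitrary.

```javascript
// Each candidate plan is a generator that yields once per "tick" of work
// and returns its result when done. (Hypothetical model, not MongoDb code.)
function* scanPlan(name, docs, predicate, costPerDoc) {
  const out = [];
  for (const d of docs) {
    for (let i = 0; i < costPerDoc; i++) yield; // simulate work for this doc
    if (predicate(d)) out.push(d);
  }
  return { plan: name, result: out };
}

// Let the plans take turns, one tick each, until the first one finishes.
function racePlans(plans) {
  while (true) {
    for (const p of plans) {
      const step = p.next();
      if (step.done) return step.value;
    }
  }
}

const people = [
  { firstname: "Dave", lastname: "Ho" },
  { firstname: "Ricky", lastname: "Ho" },
  { firstname: "Ann", lastname: "Lee" },
];
const isRickyHo = (d) => d.firstname === "Ricky" && d.lastname === "Ho";

const winner = racePlans([
  scanPlan("lastname_1", people, isRickyHo, 3),  // assumed to be the slower plan
  scanPlan("firstname_1", people, isRickyHo, 1), // assumed to be the faster plan
]);
console.log(winner.plan); // firstname_1
```

A real system would also cache `winner.plan` per query shape and throw the cache away after enough writes, as described above.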

Since only one index will be used, it is important to look at the search and sorting criteria of the query and build additional composite indexes that match the query better. Maintaining an index is not without cost: indexes need to be updated when docs are created, deleted and updated, which adds overhead to the update operations. To maintain an optimal balance, we need to periodically measure the effectiveness of each index (e.g. the read/write ratio) and delete the less efficient ones.

Storage Model
Written in C++, MongoDB uses memory-mapped files that directly map an on-disk data file to an in-memory byte array, where the data access logic is implemented using pointer arithmetic. Each document collection is stored in one namespace file (which contains metadata information) as well as multiple extent data files (whose sizes increase exponentially, doubling each time).


The data structure uses doubly-linked lists extensively. Each collection of data is organized as a linked list of extents, each of which represents a contiguous disk space. Each extent points to the head/tail of another linked list of docs. Each doc contains links to other documents as well as the actual data encoded in BSON format.

Data modification happens in place. In case the modification increases the size of a record beyond its originally allocated space, the whole record will be moved to a bigger region with some extra padding bytes. The padding bytes are used as a growth buffer so that future expansion doesn't necessarily require moving the data again. The amount of padding is dynamically adjusted per collection based on its modification statistics. The space occupied by the original doc will then be freed up. Free space is tracked by a set of free lists of different sizes.

As we can imagine, holes will be created over time as objects are created, deleted or modified. This fragmentation hurts performance because less data is read or written per disk I/O. Therefore, we need to run the "compact" command periodically, which copies the data to a contiguous space. This "compact" operation, however, is an exclusive operation and has to be done offline. Typically this is done in a replica set by rotating each member offline, one at a time, to perform the compaction.

Indexes are implemented as BTrees. Each BTree node contains a number of keys (within this node), as well as pointers to the left child BTree node of each key.

Data update and Transaction
To update an existing doc, we can do the following:
var p1 = db.person.findOne({lastname:"Ho"})
p1["address"] = "San Jose"
db.person.save(p1)

# Do the same in one command
db.person.update({lastname:"Ho"},
              {$set:{address:"San Jose"}},
              false,
              true)
Writes by default don't wait. There are various wait options with which the client can specify what conditions to wait for before the call returns (this can also be achieved by a follow-up "getlasterror" call), such as whether the change has been persisted on disk, or whether the change has been propagated to a sufficient number of members in the replica set. MongoDb also provides a sophisticated way to assign tags to members of a replica set to reflect their physical topology, so that a customized write policy can be made for each collection based on its reliability requirement.

In RDBMS, "serializability" is a very fundamental concept: the net effect of concurrently executing work units is equivalent to arranging these work units in some order of sequential execution (one at a time). Therefore, each client can act as if the DB were exclusively available to it. The underlying implementation of the DB server may use locks or multi-versioning to provide the isolation. However, this concept is not available in MongoDb (nor in many other NOSQL stores).

In MongoDb, every piece of data you read should be treated as a snapshot of the past, which means that by the time you look at it, it may have been changed in the DB. Therefore, if you are making a modification based on some previously read data, by the time you send the modification request, the condition your modification is based on may have changed. If this is not acceptable for your application's consistency requirement, you may need to re-validate the condition at the time you request the modification (ie: a "conditional_modify" should be made).

Under this scheme, a "condition" is attached to the modification request so that the DB server can validate the condition before applying the modification (of course, the condition check and the modification must be atomic so that no update can happen in between). In MongoDb, this can be achieved by the "findAndModify" call.
var account = db.bank.findOne({id:1234})
var old_bal = account['balance']
var new_bal = old_bal + fund

# Pre-condition is specified in search criteria
var prev = db.bank.findAndModify({
   query: {id:1234, balance:old_bal},
   update: {$set: {balance: new_bal}}})

# findAndModify returns the matched doc, or null
# if the pre-condition no longer holds
if (prev == null) {
 // retry from the beginning
}
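
To make the retry loop concrete, here is a minimal sketch that runs without a MongoDb server. The `Map` stands in for the collection, and `compareAndSet` is a hypothetical helper that plays the role of a conditional update; the `interfere` callback only exists to simulate a concurrent writer.

```javascript
// In-memory stand-in for a collection. compareAndSet plays the role of a
// conditional modify: the write only happens if the pre-condition holds.
const accounts = new Map([[1234, { id: 1234, balance: 100 }]]);

function compareAndSet(id, expectedBalance, newBalance) {
  const doc = accounts.get(id);
  if (!doc || doc.balance !== expectedBalance) return false; // condition failed
  doc.balance = newBalance;
  return true;
}

// Read, compute, then conditionally write; retry from the beginning on failure.
function deposit(id, fund, interfere) {
  for (let attempt = 1; ; attempt++) {
    const oldBal = accounts.get(id).balance;
    if (interfere) interfere(attempt); // simulate a concurrent writer
    if (compareAndSet(id, oldBal, oldBal + fund)) return attempt;
  }
}

// Another client changes the balance during our first attempt only.
const attempts = deposit(1234, 50, (n) => {
  if (n === 1) accounts.get(1234).balance += 10;
});
console.log(attempts, accounts.get(1234).balance); // 2 160
```

The first attempt fails because the balance it read (100) is stale; the second attempt re-reads 110 and succeeds, so neither update is lost.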
The concept of "transaction" is also missing in MongoDb. While MongoDb guarantees that each document is modified atomically (so no partial update will happen within a doc), if an update modifies multiple documents, there is no guarantee of atomicity across documents.

Therefore, it is the application developer's responsibility to implement atomicity for updates that span multiple documents. We describe a common design pattern to achieve that. This technique is not specific to MongoDb and is applicable to any other NOSQL store that can at least guarantee atomicity at the single-record level.

The basic idea is to first create a separate document (called a transaction) that links together all the documents that you want to modify, and then create a reverse link from each document (to be modified) back to the transaction. By carefully designing the sequence of updates to the documents and the transaction, we can achieve atomicity when modifying multiple documents.
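
Here is one possible shape of that pattern, as an in-memory sketch rather than the exact sequence from any particular write-up. The names (`accts`, `txns`, `begin`, `applyTo`, `unlink`, `resume`) and the state names are my own assumptions. Every step touches exactly one document, which is the only atomicity we assume from the store.

```javascript
// In-memory stand-ins for two collections. Each function below changes
// exactly one document, mirroring single-document atomicity.
const accts = { A: { balance: 100, pending: [] }, B: { balance: 20, pending: [] } };
const txns = {};

function begin(id, from, to, amount) {
  txns[id] = { state: "pending", from, to, amount }; // forward links
}

// Apply one side of the transfer unless this transaction is already linked.
function applyTo(acctId, txnId, delta) {
  const a = accts[acctId];
  if (a.pending.includes(txnId)) return; // already applied: idempotent
  a.balance += delta;
  a.pending.push(txnId); // reverse link back to the transaction
}

function unlink(acctId, txnId) {
  const a = accts[acctId];
  a.pending = a.pending.filter((t) => t !== txnId);
}

// Drive a transaction forward from whatever state it is in, so the same
// function also serves as crash recovery.
function resume(id) {
  const t = txns[id];
  if (t.state === "pending") {
    applyTo(t.from, id, -t.amount);
    applyTo(t.to, id, +t.amount);
    t.state = "applied";
  }
  if (t.state === "applied") {
    unlink(t.from, id);
    unlink(t.to, id);
    t.state = "done";
  }
}

begin("t1", "A", "B", 30);
applyTo("A", "t1", -30); // simulate a crash right after the first update
resume("t1");            // recovery finishes the job without double-debiting
console.log(accts.A.balance, accts.B.balance, txns.t1.state); // 70 50 done
```

The reverse link is what makes recovery safe: when `resume` replays the debit, the account already points at `t1`, so the debit is skipped. In a real store, the check and the change inside `applyTo` would need to be one conditional single-document update.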


MongoDb's web site also describes a similar technique (based on the same concept, but with a slightly different implementation).

Replication Model
High availability is achieved in MongoDb via the Replica Set, which provides data redundancy across multiple physical servers, including a single primary DB as well as multiple secondary DBs. For data consistency, all modification requests (inserts / updates / deletes) go to the primary DB, where the modification is made and then asynchronously replicated to the secondary DBs.


Within the replica set, members are interconnected with each other to exchange heartbeat messages. A crashed server with missing heartbeats will be detected by the other members and removed from the replica set membership. After the dead secondary recovers, it can rejoin the cluster by connecting to the primary and catching up on the updates made since it crashed. If the crash lasted so long that the change log on the primary doesn't cover the whole crash period, then the recovered secondary needs to reload the whole data set from the primary (as if it were a brand new server).

In case the primary DB crashes, a leader election protocol will be run among the remaining members to nominate the new primary, based on many factors such as node priority, node uptime, etc. After getting a majority vote, the new primary server takes over. Notice that due to async replication, the newly elected primary DB doesn't necessarily have all the latest updates from the crashed DB.

The client lib provides the API for the app to access the MongoDB server. At startup, the client lib connects to some member of the replica set (based on a seed list) and issues an "isMaster" command to gather the current picture of the set (who is the primary and who are the secondaries). After that, the client lib connects to the single primary (where it will send all DB modification requests) and some number of secondaries (where it will send read-only queries). The client library periodically re-runs the "isMaster" command to detect whether any new members have joined the set. When an existing member of the set crashes, its connections to all existing clients are dropped, which forces a resynchronization of the latest picture.

There is also a special secondary DB called a slave delay, which guarantees that data is propagated with a certain time lag behind the master. This is used mainly to recover data after accidental deletion.

For a data modification request, the client sends the request to the primary DB. By default the request returns once it is written to the primary. An optional parameter can be specified to indicate that a certain number of secondaries need to receive the modification before the call returns, so the client can ensure that a majority of members have got the request. Of course, there is a tradeoff between latency and reliability.

For a query request, by default the client contacts the primary, which has the latest data. Optionally, the client can specify its willingness to read from any secondary and tolerate that the returned data may be outdated. This provides an opportunity to load balance the read requests across all secondaries. Notice that in this case, a read that follows a write may not see the update.

For read-mostly applications, reading from any secondary can be a big performance improvement. To select the fastest secondary DB member for a query, the client driver periodically pings the members and favors the one with the lowest latency. Notice that a read request is issued to only one node; there is no quorum read or read from multiple nodes in MongoDb.

The main purpose of the replica set is to provide data redundancy and to load balance read requests. It doesn't provide load balancing for write requests, since all modifications still have to go to the single master.

Another benefit of the replica set is that members can be taken offline on a rotating basis to perform expensive operations such as compaction, indexing or backup, without impacting online clients that use the live members.

Sharding Model
To load balance write requests, we can use MongoDb shards. In a sharding setup, a collection can be partitioned (by a partition key) into chunks (each of which is a key range), and the chunks are distributed across multiple shards (each shard is a replica set). MongoDb sharding effectively provides an unlimited size for a data collection, which is important for any big data scenario.

To reiterate, in the sharding model, a single partition key is specified for each collection stored in the sharding cluster. The key space of the partition key is divided into contiguous key ranges called chunks, which are hosted by the corresponding shards.


# To define the partition key (run against the admin db)
use admin
db.runCommand({shardcollection: "testdb.person",
         key: {firstname:1, lastname:1}})
In the shard setting, the client lib connects to a stateless routing server "MongoS", which behaves as if it were a "MongoD". The routing server plays an important role in forwarding each client request to the appropriate shard server according to the characteristics of the request.


For an insert/delete/update request containing the partition key, the routing server uses the chunk/shard mapping information (obtained from the config server and cached locally) to forward the request to the primary server hosting the chunk whose key range covers the partition key of the modified doc. Given a particular partition key, the primary server containing that chunk can be unambiguously determined.
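
As a rough illustration of how a key maps to exactly one chunk, here is a small sketch. The `chunkMap` layout (chunks sorted by lower bound, each covering up to the next chunk's lower bound) and the `routeKey` function are assumptions made for this example, not the actual format kept by the config server.

```javascript
// Hypothetical cached chunk map: chunks are sorted by their lower bound,
// and each one covers [min, next chunk's min).
const chunkMap = [
  { min: "", shard: "shard0" },
  { min: "g", shard: "shard1" },
  { min: "p", shard: "shard2" },
];

// Binary search for the last chunk whose lower bound is <= key.
function routeKey(chunks, key) {
  let lo = 0;
  let hi = chunks.length - 1;
  while (lo < hi) {
    const mid = Math.ceil((lo + hi) / 2);
    if (chunks[mid].min <= key) lo = mid;
    else hi = mid - 1;
  }
  return chunks[lo].shard;
}

console.log(routeKey(chunkMap, "dave"));  // shard0
console.log(routeKey(chunkMap, "ricky")); // shard2
```

Because the ranges are contiguous and non-overlapping, every key lands in exactly one chunk, which is why the target primary can be determined unambiguously.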

In the case of a query request, the routing server examines whether the partition key is part of the selection criteria, and if so it "routes" the request only to the corresponding shard (primary or secondary). However, if the partition key is not part of the selection criteria, the routing server "scatters" the request to every shard (picking one member of each shard); each shard computes its local search, and the results are gathered at the routing server and returned to the client. When the query requires the result to be sorted and the partition key is involved in the sort order, the routing server routes the request sequentially to multiple shards as the client iterates over the result. In case the sort involves another key that is not the partition key, the routing server scatters the request to all shards, each of which performs its local sort, and then merges the results at the routing server (basically a distributed merge-sort).
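
The gather step of that distributed merge-sort can be sketched as follows. This is a plain in-memory illustration with a hypothetical `mergeSorted` function; each inner array stands for the already-sorted result returned by one shard.

```javascript
// Each shard returns its own matches already sorted; the router merges them
// by repeatedly taking the smallest head element among all shard results.
function mergeSorted(lists, compare) {
  const pos = lists.map(() => 0); // one cursor per shard result
  const out = [];
  while (true) {
    let best = -1;
    for (let i = 0; i < lists.length; i++) {
      if (pos[i] >= lists[i].length) continue; // this shard is exhausted
      if (best < 0 || compare(lists[i][pos[i]], lists[best][pos[best]]) < 0) best = i;
    }
    if (best < 0) return out;
    out.push(lists[best][pos[best]++]);
  }
}

const shardResults = [
  [{ age: 21 }, { age: 40 }],
  [{ age: 18 }, { age: 35 }, { age: 52 }],
  [], // a shard with no matches
];
const merged = mergeSorted(shardResults, (a, b) => a.age - b.age);
console.log(merged.map((d) => d.age).join(",")); // 18,21,35,40,52
```

The router never re-sorts anything; it only compares the current head of each shard's stream, so it can hand results to the client as they arrive.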

As data is inserted into a chunk and the chunk gets close to its full capacity, we need to split the chunk. The routing server can detect this situation statistically, based on the number of requests it has forwarded as well as the number of other routing servers that exist. The routing server then contacts the primary server of the shard that contains the full chunk and requests a chunk split. The shard server computes the midpoint of the key range that evenly distributes the data, splits the chunk, and informs the configuration server of the split point. Notice that so far there is no data movement, as the data still resides on the same shard server.

On the other hand, there is a separate "balancer" process (running in one of the routing servers) whose job is to make sure each shard carries approximately the same number of chunks. When an imbalance is detected, the balancer contacts the busy shard to trigger a chunk migration process. This migration happens online: the origination contacts the destination to initiate a data transfer, and data starts to be copied from the origination to the destination. This process may take some time (depending on the data volume), during which updates can continue to happen at the origination. These changes are tracked at the origination, and when the copy finishes, the delta is transferred and applied to the destination. After multiple rounds of applying deltas, the migration enters the final round, and the origination halts and withholds all requests coming from the routing server. After the last round of changes has been applied to the destination, the destination updates the configuration server with the new shard configuration and notifies the origination shard (which is still withholding the requests) to return a StaleConfigException to the routing server, which then re-reads the latest configuration from the config server and re-submits the previous requests. At some future point in time, the data at the origination will be physically deleted.

It is possible that under a high-frequency update condition, the changes being applied to the destination are unable to catch up with the update rate at the origination. When this situation is detected, the whole migration process is aborted. The routing server may pick a different chunk to start a migration afterwards.

Map/Reduce Execution
MongoDb provides a Map/Reduce framework to perform parallel data processing. The concept is similar to Hadoop Map/Reduce, but with the following small differences ...
  • It takes input from the query result of a collection rather than an HDFS directory
  • The reduce output can be appended to an existing collection rather than written to an empty HDFS directory
Map/Reduce in Mongo works in a slightly different way, as follows
  1. The client defines a map function, a reduce function, a query that scopes the input data, and an output collection that stores the output result.
  2. The client sends the request to the MongoS routing server.
  3. MongoS forwards the request to the appropriate shards (route or scatter, depending on whether the partition key appears in the query). Notice that MongoS picks one member of each shard; currently it always sends to the primary DB.
  4. The primary DB of each shard executes the query and pipes the output to the user-defined map function, which emits a bunch of key-value pairs that are stored in a memory buffer. When the memory buffer is full, the user-defined reduce function is invoked to partially reduce the key-value pairs in the memory buffer, and the result is stored in a local collection.
  5. When step (4) completes, the reduce function is executed on all the previous partially reduced results to merge them into a single reduced result on this server.
  6. When step (5) finishes, MongoS notifies the shard servers that will store the output collection (if the output collection is non-partitioned, only a single shard is notified; otherwise all shards are notified).
  7. The primary DB of each shard storing the final collection calls every shard to collect the partially reduced data produced earlier. It only asks for the results within its own key range.
  8. The primary DB runs the reduce() function again on the list of partially reduced results, then stores the final reduced result locally. If the user provided a finalize function, it is invoked as well.
Here is a simple example that builds an inverted index from topics to documents
db.book.insert({title:"NOSQL",
             about:["software", "db"]})

db.book.insert({title:"Java programming",
             about:["software", "program"]})

db.book.insert({title:"Mongo",
             about:["db", "technology"]})

db.book.insert({title:"Oracle",
             about:["db", "software"]})

db.book.find()

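m = function() {
 for (var i in this.about) {
     // emit the same shape that reduce returns
     emit(this.about[i], {titles:[this.title]})
 }
}

// reduce may run again on its own output (see steps 4, 5
// and 8 above), so it must accept what it returns
r = function(k, vals) {
 var titles = []
 for (var i in vals) {
     titles = titles.concat(vals[i].titles)
 }
 return({titles:titles})
}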

db.book.mapReduce(m, r, {query:{},
               out:{replace:"mroutput"}})

db.mroutput.find()
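
To see why the partial reduce in steps (4), (5) and (8) matters, here is a sketch that replays the flow in memory with the same book data, split across two imaginary shards. The function names (`mapBook`, `reduceTitles`, `runOnShard`, `finalReduce`) and the `{titles: [...]}` value shape are my own choices for this illustration; the point is that the reduce output has the same shape as its input, so it can safely be reduced again.

```javascript
// Map: emit one (topic, value) pair per topic of a book.
function mapBook(book, emit) {
  for (const topic of book.about) emit(topic, { titles: [book.title] });
}

// Reduce: output has the same shape as each input value, so partially
// reduced results can be fed back in.
function reduceTitles(key, vals) {
  return { titles: vals.flatMap((v) => v.titles) };
}

// Steps 4 and 5: map and locally reduce on one shard.
function runOnShard(docs) {
  const groups = new Map();
  for (const d of docs) {
    mapBook(d, (k, v) => groups.set(k, (groups.get(k) || []).concat([v])));
  }
  const partial = new Map();
  for (const [k, vals] of groups) partial.set(k, reduceTitles(k, vals));
  return partial;
}

// Steps 7 and 8: collect the partial results and reduce them once more.
function finalReduce(partials) {
  const groups = new Map();
  for (const p of partials) {
    for (const [k, v] of p) groups.set(k, (groups.get(k) || []).concat([v]));
  }
  const out = {};
  for (const [k, vals] of groups) out[k] = reduceTitles(k, vals).titles;
  return out;
}

const shardA = [
  { title: "NOSQL", about: ["software", "db"] },
  { title: "Java programming", about: ["software", "program"] },
];
const shardB = [
  { title: "Mongo", about: ["db", "technology"] },
  { title: "Oracle", about: ["db", "software"] },
];
const index = finalReduce([runOnShard(shardA), runOnShard(shardB)]);
console.log(index.db.join(", ")); // NOSQL, Mongo, Oracle
```

If the reduce function returned a different shape from what map emits, the second pass would receive values it cannot interpret, which is the usual pitfall with re-reduce.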
Overall, I found MongoDb very powerful and easy to use. I look forward to using MongoDb with Node.js and will share my experience in future blogs.

Special thanks to Jared Rosoff, who provided me with a lot of details on how MongoDb is implemented.

Wednesday, October 6, 2010

BigTable Model with Cassandra and HBase

Recently, in a number of "scalability discussion meetings", I've seen the following pattern coming up repeatedly ...
  1. To make your app scalable, you try to make your app layer “stateless”.
  2. OK, so you move the "state" out of your application layer to a shared DB, or shared data layer.
  3. Now, how do we make the data tier scalable? By definition, we cannot make the data tier stateless.
  4. OK, now let's think about how to "partition" your data and spread it across multiple machines in such a way that the workload is balanced.
  5. Now there are more boxes, and what if some of them crash?
  6. OK, we should replicate the data across machines.
  7. Now, how do we keep this data in sync ...
  8. And then cloud computing gets into the picture (as it always does). Now we don't just have a pool of machines; the pool size can also grow and shrink according to workload fluctuation (you don't want to pay for something idle, right?).
  9. Now we need to figure out how we should "redistribute" the data as we add machines to the pool or remove machines from it.
This is an area where NOSQL shines. In the last 18 months, NOSQL has become one of the hottest topics in the software industry. It has been introduced as a solution to the large-scale data storage problem in the range of terabytes or petabytes. Dozens of NOSQL products have come to the market, but two leaders, HBase and Cassandra, seem to stand out from the rest in terms of adoption.

Given the increasing demand for explanations of these 2 products recently, I decided to write a post on them.

I won't repeat the basic theory of NOSQL here; for the foundation of distributed system theory underlying the NOSQL design, please refer to my earlier blog.

Both HBase and Cassandra are based on the Google BigTable model, so let's first introduce some key characteristics underlying Bigtable.

Fundamentally Distributed
BigTable is built from the ground up on a "highly distributed", "share nothing" architecture. Data is meant to be stored on a large number of unreliable, commodity server boxes by "partitioning" and "replication". Data partitioning means the data is partitioned by its key and stored on different servers. Replication means the same data element is replicated multiple times on different servers.


Column Oriented
Unlike a traditional RDBMS implementation, where each "row" is stored contiguously on disk, BigTable stores each column contiguously on disk. The underlying assumption is that in most cases not all columns are needed for a data access, so a column-oriented layout allows more records to sit in a disk block and hence can reduce the disk I/O.

A column-oriented layout is also very effective for storing very sparse data (many cells have a NULL value) as well as multi-value cells. The following diagram illustrates the difference between a row-oriented layout and a column-oriented layout


Variable number of Columns
In RDBMS, each row must have a fixed set of columns defined by the table schema, and therefore it is not easy to support columns with multi-value attributes. The BigTable model introduces the "Column Family" concept such that a row has a fixed number of "column families", but within each "column family", a row can have a variable number of columns that can be different in each row.


In the Bigtable model, the basic data storage unit is a cell (addressed by a particular row and column). Bigtable allows multiple timestamped versions of data within a cell. In other words, the user can address a data element by the rowid, column name and timestamp. At the configuration level, Bigtable allows the user to specify how many versions can be stored within each cell, either by count (how many) or by freshness (how old).
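
A tiny in-memory model may help picture the (rowid, column name, timestamp) addressing. The `VersionedTable` class and its "keep the newest maxVersions" policy are assumptions made for this sketch; only the count-based limit is modelled, not the freshness-based one.

```javascript
// Hypothetical in-memory model: each (row, column) cell keeps its versions
// newest first, trimmed to maxVersions.
class VersionedTable {
  constructor(maxVersions) {
    this.maxVersions = maxVersions;
    this.cells = new Map(); // "row/column" -> [{ ts, value }, ...]
  }
  put(row, column, ts, value) {
    const key = `${row}/${column}`;
    const versions = this.cells.get(key) || [];
    versions.push({ ts, value });
    versions.sort((a, b) => b.ts - a.ts); // newest first
    this.cells.set(key, versions.slice(0, this.maxVersions));
  }
  // Return the newest value written at or before ts (default: the latest).
  get(row, column, ts = Infinity) {
    const versions = this.cells.get(`${row}/${column}`) || [];
    const hit = versions.find((v) => v.ts <= ts);
    return hit ? hit.value : undefined;
  }
}

const table = new VersionedTable(2);
table.put("u1", "info:email", 100, "a@old.example");
table.put("u1", "info:email", 200, "a@mid.example");
table.put("u1", "info:email", 300, "a@new.example");
console.log(table.get("u1", "info:email"));      // a@new.example
console.log(table.get("u1", "info:email", 250)); // a@mid.example
console.log(table.get("u1", "info:email", 150)); // undefined (version 100 was trimmed)
```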

At the physical level, BigTable stores each column family contiguously on disk (imagine one file per column family), and physically sorts the data by rowid, column name and timestamp. After that, the sorted data is compressed so that a disk block can store more data. Moreover, since data within a column family usually has a similar pattern, data compression can be very effective.

Note: Although not shown in this example, the rowids of different column families can be of completely different types. For example, in the above example, I can have another column family "UserIdx" whose rowid is a string (the user's name) and whose columns have a columnKey of u1, u2 (ie: the row ids of the User column family) and a columnValue of null (ie: not used). This is a common technique to build an index at the application level.

Sequential write
The BigTable model is highly optimized for write operations (insert/update/delete) through sequential writes (no disk seek is needed). Basically, a write happens by first appending a transaction entry to a log file (hence the disk write I/O is sequential with no disk seek), followed by writing the data into an in-memory Memtable. In case the machine crashes and all in-memory state is lost, the recovery step brings the Memtable up to date by replaying the updates in the log file.

All the latest updates are therefore stored in the Memtable, which grows until it reaches a size threshold; the Memtable is then flushed to disk as an SSTable (sorted by the string key). Over a period of time there will be multiple SSTables on the disk that store the data.
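
The write path above (log first, then Memtable, then flush) can be sketched with arrays and a `Map` standing in for the files. `LsmStore`, `flushThreshold` and the decision to clear the log at flush time are assumptions of this sketch, not details taken from any of the products discussed.

```javascript
// Hypothetical in-memory model of the write path: append to a log, update
// the memtable, and flush it as a sorted, immutable SSTable when it is full.
class LsmStore {
  constructor(flushThreshold) {
    this.flushThreshold = flushThreshold;
    this.log = [];             // stands in for the on-disk log file
    this.memtable = new Map();
    this.sstables = [];        // oldest first; each is a sorted array of [key, value]
  }
  put(key, value) {
    this.log.push([key, value]); // sequential append comes first
    this.memtable.set(key, value);
    if (this.memtable.size >= this.flushThreshold) this.flush();
  }
  flush() {
    const sorted = [...this.memtable].sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
    this.sstables.push(sorted);
    this.memtable = new Map();
    this.log = []; // flushed entries no longer need replaying
  }
  // Crash recovery: rebuild the memtable by replaying the log.
  static recover(log, sstables, flushThreshold) {
    const s = new LsmStore(flushThreshold);
    s.sstables = sstables;
    s.log = log.slice();
    for (const [k, v] of log) s.memtable.set(k, v);
    return s;
  }
}

const store = new LsmStore(3);
store.put("k2", "b");
store.put("k1", "a");
store.put("k3", "c"); // the third key triggers a flush
store.put("k4", "d"); // only in the log and the memtable so far
const recovered = LsmStore.recover(store.log, store.sstables, 3);
console.log(store.sstables[0].map(([k]) => k).join(",")); // k1,k2,k3
console.log(recovered.memtable.get("k4"));                // d
```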

Merged read
Whenever a read request is received, the system first looks up the Memtable by row key to see if it contains the data. If not, it looks at the on-disk SSTables to see if the row key is there. We call this the "merged read", as the system needs to look in multiple places for the data. To speed up the detection, each SSTable has a companion Bloom filter that can rapidly detect the absence of a row key. In other words, only when the Bloom filter returns positive does the system do a detailed lookup within the SSTable.
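
Here is a minimal sketch of a merged read guarded by Bloom filters. The filter size, the number of hash probes and the toy hash function are arbitrary choices for illustration, and `makeSSTable` and `mergedRead` are hypothetical names. A Bloom filter can give false positives but never false negatives, which is why a negative answer lets us skip the SSTable safely.

```javascript
// A tiny Bloom filter: several hash probes into a bit array.
class BloomFilter {
  constructor(size, hashCount) {
    this.bits = new Uint8Array(size);
    this.hashCount = hashCount;
  }
  positions(key) {
    const out = [];
    for (let seed = 1; seed <= this.hashCount; seed++) {
      let h = seed * 7919; // toy seeded hash, for illustration only
      for (const ch of key) h = (h * 31 + ch.charCodeAt(0)) % 4294967296;
      out.push(h % this.bits.length);
    }
    return out;
  }
  add(key) {
    for (const p of this.positions(key)) this.bits[p] = 1;
  }
  mightContain(key) {
    return this.positions(key).every((p) => this.bits[p] === 1);
  }
}

function makeSSTable(pairs) {
  const table = { rows: new Map(pairs), filter: new BloomFilter(64, 3) };
  for (const [key] of pairs) table.filter.add(key);
  return table;
}

// Check the memtable first, then SSTables from newest to oldest, skipping
// any SSTable whose filter says the key is definitely absent.
function mergedRead(memtable, sstables, key) {
  if (memtable.has(key)) return memtable.get(key);
  for (let i = sstables.length - 1; i >= 0; i--) {
    const t = sstables[i];
    if (!t.filter.mightContain(key)) continue;
    if (t.rows.has(key)) return t.rows.get(key); // the detailed lookup
  }
  return undefined;
}

const memtable = new Map([["k9", "fresh"]]);
const sstables = [
  makeSSTable([["k1", "old"]]),              // older
  makeSSTable([["k1", "new"], ["k2", "x"]]), // newer
];
console.log(mergedRead(memtable, sstables, "k9")); // fresh
console.log(mergedRead(memtable, sstables, "k1")); // new
```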

Periodic Data Compaction
As you can imagine, reads can be quite inefficient when there are too many SSTables scattered around. Therefore, the system periodically merges the SSTables. Notice that since each SSTable is individually sorted by key, a simple "merge sort" is sufficient to merge multiple SSTables into one. The merge mechanism is based on a logarithmic property: two SSTables of the same size are merged into a single SSTable of double the size. Therefore the number of SSTables is O(logN), where N is the number of rows.
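
The merge step itself is short enough to show. This is a sketch with a hypothetical `mergeSSTables` function over arrays of [key, value] pairs; the rule that the newer table wins on a duplicate key is my assumption about how the two versions should be reconciled.

```javascript
// Merge two SSTables (arrays of [key, value] sorted by key) into one.
// When both hold the same key, the entry from the newer table wins.
function mergeSSTables(older, newer) {
  const out = [];
  let i = 0;
  let j = 0;
  while (i < older.length && j < newer.length) {
    const ko = older[i][0];
    const kn = newer[j][0];
    if (ko < kn) out.push(older[i++]);
    else if (ko > kn) out.push(newer[j++]);
    else {
      out.push(newer[j++]); // same key: keep the newer value
      i++;
    }
  }
  while (i < older.length) out.push(older[i++]);
  while (j < newer.length) out.push(newer[j++]);
  return out;
}

const compacted = mergeSSTables(
  [["a", 1], ["c", 3], ["d", 4]],
  [["b", 20], ["c", 30]]
);
console.log(JSON.stringify(compacted)); // [["a",1],["b",20],["c",30],["d",4]]
```

Both inputs are read front to back exactly once, so the merge needs only sequential I/O, in keeping with the write path described earlier.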



After looking at the common part, let's look at the differences between HBase and Cassandra.

HBase
Based on the BigTable model, HBase uses the Hadoop Filesystem (HDFS) as its data storage engine. The advantage of this approach is that HBase doesn't need to worry about data replication, data consistency and resiliency, because HDFS has handled them already. Of course, the downside is that it is also constrained by the characteristics of HDFS, which is not optimized for random read access. In addition, there is extra network latency between the DB server and the file server (which is the data node of Hadoop).


In the HBase architecture, data is stored in a farm of Region Servers. A "key-to-server" mapping is needed to locate the corresponding server, and this mapping is stored as a "Table", similar to other user data tables.

Before a client does any DB operation, it needs to first locate the corresponding region server.
  1. The client contacts a predefined Master server, which replies with the endpoint of a region server that holds a "Root Region" table.
  2. The client contacts that region server, which replies with the endpoint of a second region server that holds a "Meta Region" table, which contains a mapping from "user table" to "region server".
  3. The client contacts this second region server, passing along the user table name. This second region server looks up its meta region and replies with the endpoint of a third region server that holds a "User Region", which contains a mapping from "key range" to "region server".
  4. The client contacts this third region server, passing along the row key that it wants to look up. This third region server looks up its user region and replies with the endpoint of a fourth region server that holds the data the client is looking for.
  5. The client caches the results along the way, so subsequent requests don't need to go through this multi-step process again to resolve the corresponding endpoint.
In HBase, the in-memory data storage (what we refer to as the "Memtable" in the paragraphs above) is implemented in the MemStore (called Memcache in earlier HBase releases). The on-disk data storage (what we refer to as the "SSTable" in the paragraphs above) is implemented as an HDFS file residing on a Hadoop data node server. The log file is also stored as an HDFS file. (I feel that storing a transaction log file remotely will hurt performance.)

Also in the HBase architecture, there is a special machine playing the "role of master", which monitors and coordinates the activities of all region servers (the heavy-duty worker nodes). To the best of my knowledge, the master node is a single point of failure at this moment.

For a more detailed architecture description, Lars George has a very good explanation of the log file implementation as well as the data storage architecture of HBase.

Cassandra
Also based on the BigTable model, Cassandra uses the DHT (distributed hash table) model to partition its data, based on the design described in the Amazon Dynamo paper.

Consistent Hashing via O(1) DHT
Each machine (node) is associated with a particular id that is distributed in a key space (e.g. 128 bit). Every data element is also associated with a key (in the same key space). A server owns all the data whose keys lie between its id and the preceding server's id.
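
The ownership rule can be sketched on a toy ring. The key space of 0..99, the node ids and the function names (`ownerIndex`, `replicasFor`) are all made up for readability. `replicasFor` also anticipates the simple replication strategy, in which the replicas go to the nodes that succeed the owner on the ring.

```javascript
// Node ids and keys live in the same circular key space (0..99 here for
// readability; a real ring would use a much larger space).
const ring = [
  { id: 10, name: "n1" },
  { id: 40, name: "n2" },
  { id: 75, name: "n3" },
]; // sorted by id

// The owner is the first node whose id is >= key, wrapping around the ring.
function ownerIndex(nodes, key) {
  const i = nodes.findIndex((n) => n.id >= key);
  return i === -1 ? 0 : i;
}

// Simple replication: the owner plus the next n-1 nodes clockwise.
function replicasFor(nodes, key, n) {
  const start = ownerIndex(nodes, key);
  const out = [];
  for (let k = 0; k < Math.min(n, nodes.length); k++) {
    out.push(nodes[(start + k) % nodes.length].name);
  }
  return out;
}

console.log(replicasFor(ring, 33, 2).join(",")); // n2,n3
console.log(replicasFor(ring, 90, 2).join(",")); // n1,n2
```

Key 33 lies between n1's id (10) and n2's id (40), so n2 owns it. Key 90 is past the last node, so it wraps around to n1.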

Data is also replicated across multiple servers. Cassandra offers multiple replication schemes, including storing the replicas on neighbor servers (whose ids succeed that of the server owning the data), or a rack-aware strategy that takes physical location into account when placing the replicas. The simple partition strategy is as follows ...


Tunable Consistency Level
Unlike HBase, Cassandra allows you to choose the consistency level that is suitable for your application, so you can gain more scalability if you are willing to trade off some data consistency.

For example, it allows you to choose how many ACKs to receive from different replicas before considering a WRITE to be successful. Similarly, you can choose how many replica responses must be received in the case of a READ before the result is returned to the client.

By choosing the appropriate numbers of W and R responses, you can choose the level of consistency you like. For example, to achieve strict consistency, we just need to pick W and R such that W + R > N, where N is the number of replicas. This includes the possibilities of (W = one and R = all), (R = one and W = all), and (W = quorum and R = quorum). Of course, if you don't need strict consistency, you can choose smaller values for W and R and gain higher availability. Regardless of what consistency level you choose, the data will be eventually consistent thanks to the "hinted handoff", "read repair" and "anti-entropy sync" mechanisms described below.
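
The reason W + R > N works is that any set of W replicas and any set of R replicas must then share at least one member, so every read touches at least one replica that saw the latest successful write. The sketch below checks this by brute force for small N; the function names are my own.

```javascript
// All subsets of {0..n-1} with exactly `size` members, encoded as bitmasks.
function subsetsOfSize(n, size) {
  const out = [];
  for (let mask = 0; mask < (1 << n); mask++) {
    let count = 0;
    for (let b = 0; b < n; b++) if (mask & (1 << b)) count++;
    if (count === size) out.push(mask);
  }
  return out;
}

// True when every possible set of W acking replicas shares at least one
// member with every possible set of R responding replicas.
function alwaysOverlaps(n, w, r) {
  return subsetsOfSize(n, w).every((ws) =>
    subsetsOfSize(n, r).every((rs) => (ws & rs) !== 0));
}

const N = 3;
const quorum = Math.floor(N / 2) + 1;
console.log(alwaysOverlaps(N, 1, N));           // true  (W = one, R = all)
console.log(alwaysOverlaps(N, quorum, quorum)); // true  (quorum / quorum)
console.log(alwaysOverlaps(N, 1, 1));           // false (W + R <= N)
```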

Hinted Handoff
The client performs a write by sending the request to any Cassandra node, which will act as the proxy for the client. This proxy node will locate the N corresponding nodes that hold the data replicas and forward the write request to all of them. In case any node has failed, it will pick a random node as a handoff node and write the request with a hint telling it to forward the write request to the failed node after it recovers. The handoff node will then periodically check for the recovery of the failed node and forward the write to it. Therefore, the original node will eventually receive all the write requests.
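Here is a small in-memory simulation of that flow. It is only a sketch under my own assumptions: the `Node` class, `coordinate_write` and `deliver_hints` are hypothetical names, failure detection is reduced to an `up` flag, and the handoff node is chosen at random among the live nodes.

```python
import random


class Node:
    def __init__(self, name):
        self.name = name
        self.up = True
        self.data = {}
        self.hints = []  # (target_node, key, value) writes held for failed nodes

    def write(self, key, value):
        self.data[key] = value


def coordinate_write(key, value, replicas, all_nodes, rng=random):
    """Proxy logic: write to every live replica, leave a hint for each failed one."""
    for target in replicas:
        if target.up:
            target.write(key, value)
            continue
        candidates = [n for n in all_nodes if n.up]
        if not candidates:
            raise RuntimeError("no live node available to hold the hint")
        handoff = rng.choice(candidates)
        handoff.hints.append((target, key, value))


def deliver_hints(node):
    """Run periodically on each node: replay hints whose target has recovered."""
    remaining = []
    for target, key, value in node.hints:
        if target.up:
            target.write(key, value)
        else:
            remaining.append((target, key, value))
    node.hints = remaining
```

If one of three replicas is down during `coordinate_write`, the other two get the value immediately and some live node keeps a hint. Once the failed node is marked up again, the next `deliver_hints` pass on the handoff node pushes the missed write to it.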

Conflict Resolution
Since writes can reach different replicas, the corresponding timestamp of the data is used to resolve conflicts. In other words, the latest timestamp wins, and data with earlier timestamps is pushed into an earlier version (it is not lost).

Read Repair
When the client performs a "read", the proxy node will issue N reads but only wait for R responses and return the one with the latest version. In case some nodes respond with an older version, the proxy node will send the latest version to them asynchronously, hence these left-behind nodes will still eventually catch up with the latest version.
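A simplified sketch of read repair follows. The `Replica` class and `read_with_repair` function are hypothetical, values are stored as `(timestamp, value)` pairs so that the latest timestamp wins, the first R replicas in the list stand in for "the R replicas that answered first", and the repair is done synchronously here rather than asynchronously.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Versioned = Tuple[int, str]  # (timestamp, value)


@dataclass
class Replica:
    store: Dict[str, Versioned] = field(default_factory=dict)

    def read(self, key: str) -> Optional[Versioned]:
        return self.store.get(key)

    def write(self, key: str, versioned: Versioned) -> None:
        # Latest timestamp wins: ignore writes that are not newer than what we hold.
        current = self.store.get(key)
        if current is None or versioned[0] > current[0]:
            self.store[key] = versioned


def read_with_repair(key: str, replicas: List[Replica], r: int) -> Optional[str]:
    # Pretend the first r replicas are the ones whose responses arrived in time.
    answers = [rep.read(key) for rep in replicas[:r]]
    found = [a for a in answers if a is not None]
    if not found:
        return None
    latest = max(found, key=lambda v: v[0])
    # Repair: push the latest version to every replica; up-to-date ones ignore it.
    for rep in replicas:
        rep.write(key, latest)
    return latest[1]
```

After one such read, every replica that was behind holds the version that was returned to the client.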

Anti-Entropy data sync
To ensure the data stays in sync even when no READ or WRITE occurs on the data, replica nodes periodically gossip with each other to figure out if anyone is out of sync. For each key range of data, each member in the replica group computes a Merkle tree (a hash encoding tree where the difference can be located quickly) and sends it to its neighbors. By comparing the received Merkle tree with its own tree, each member can quickly determine which data portion is out of sync. If so, it will send the diff to the left-behind members.
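The comparison step can be sketched as follows. This is an illustration rather than Cassandra's implementation: I assume both replicas split the key range into the same number of buckets, each bucket is summarized as a byte string, and SHA-256 is used for hashing. The function names are hypothetical.

```python
import hashlib


def _h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_tree(buckets):
    """Return {(lo, hi): hash} for every node of a binary hash tree over the buckets."""
    tree = {}

    def visit(lo, hi):
        if hi - lo == 1:
            digest = _h(buckets[lo])  # leaf: hash of one bucket
        else:
            mid = (lo + hi) // 2
            digest = _h(visit(lo, mid) + visit(mid, hi))  # parent: hash of children
        tree[(lo, hi)] = digest
        return digest

    visit(0, len(buckets))
    return tree


def out_of_sync(tree_a, tree_b, lo, hi):
    """Indices of the buckets that differ, descending only into mismatching subtrees."""
    if tree_a[(lo, hi)] == tree_b[(lo, hi)]:
        return []  # whole subtree matches, nothing below needs checking
    if hi - lo == 1:
        return [lo]
    mid = (lo + hi) // 2
    return out_of_sync(tree_a, tree_b, lo, mid) + out_of_sync(tree_a, tree_b, mid, hi)
```

When two replicas agree, only the root hashes are compared. When a single bucket differs, the descent follows just one path from the root to that leaf, which is why the difference can be located quickly.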

Anti-entropy is the "catch-all" way to guarantee eventual consistency, but is also pretty expensive and therefore is not done frequently. By combining the data sync with read repair and hinted handoff, we can keep the replicas pretty up-to-date.

BigTable trade offs
To retain the scalability features of BigTable, some of the basic features that an RDBMS provides are missing in the BigTable model. Here we highlight the rough edges of Bigtable.

1) Primitive transaction support
Transaction protection is only guaranteed within a single row. In other words, you cannot start an atomic transaction to modify multiple rows.

2) Primitive isolation support
While you are reading a row, other people may have modified the same row and updated it before you. Your view is no longer current, but your later update can easily wipe out other people's changes.

There are many techniques for isolating concurrent updates, including pessimistic approaches like locking and optimistic approaches that use a vector clock as the version stamp. But to the best of my understanding, there is no robust test-and-set operation in the BigTable model (there is some getLock mechanism in Hbase which I haven't looked into). My impression is that there is no easy way to check that no concurrent update has happened in between.

Because of this limitation, I think the BigTable model is more suitable for applications where concurrent updates to the same row are very rare, or where some inconsistency is tolerable at the application level. Fortunately, there are still a lot of applications falling into this bucket.

3) No indexes
Notice that data within BigTable are all physically sorted; by rowid, column name and timestamp. There is no index from the column value to its containing rowid.

This model is quite different from RDBMS where you typically define a table and worry about defining the index later. There is no such "index" concept in BigTable and you need to carefully plan out the physical sorting order of your data layout.

Lacking indexes turns out to be quite inconvenient, and many people using Bigtable end up building their own index at the application level. This usually results in a highly denormalized data model with lots of column families that store links to other tables. Any update to the base data needs to carefully update these other column families as well. From a performance angle, this is actually better than maintaining an index in an RDBMS because Bigtable is optimized for writes. However, since it is now up to the application logic to maintain the index, this can be a source of application bugs.

4) No referential integrity enforcement
As mentioned above, since you are building an artificial index at the application level, you need to maintain the integrity of your index as well. This includes updating your index when the base data is inserted, modified or deleted. This kind of handling logic traditionally resides at the RDBMS level, but since BigTable has no such referential integrity concept, this responsibility now lands on your application logic.

5) Lack of surrounding tools
As NOSQL or BigTable is very new, the tools surrounding it are definitely not comparable to the RDBMS world at this moment. Such tools include report generation, BI, data warehouse ... etc.

I observe a general trend that most NOSQL products are moving towards providing an ODBC / JDBC interface to integrate more easily with the existing tool market. But at this moment, to the best of my understanding, such interfaces are not widespread yet.


Design Patterns for Bigtable model
Due to the very different model of Bigtable, the data model design methodology is also quite different from traditional RDBMS schema design. Here is a sequence of steps that are pretty common ...

1) Identify all your query scenarios
Since there is no index concept, you have to plan out carefully how your data is physically sorted. Therefore it is important to find all your query use cases first.

2) Define your "entity table" and its corresponding column families
For an entity table, it is pretty common to have one column family storing all the entity attributes, and multiple column families to store the links to other entities. (e.g. A "UserTable" may contain a column family "baseInfo" to store all attributes of the user, a column family "friend" to store the links to another user, a column family "company" to store links to another CompanyTable)

3) Define your "index table"
The "index table" is what your application builds to support reverse lookup. The "key" is typically based on the search criteria you have identified in your query scenarios. It is not uncommon for each query to have its own specific index table.

4) Make sure your application logic updates the index correctly
Since the index table has to be maintained by application logic, you need to check to make sure it is done correctly. In many cases, this can be quite a source of bugs.
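As an illustration of steps 2 to 4, here is a toy sketch where plain Python dictionaries stand in for the tables. The `UserStore` class, the "company" attribute and the method names are made up for this example. The point is that every insert, modification and delete of the entity table has to touch the index table too.

```python
class UserStore:
    """Toy stand-in for an entity table plus one application-maintained index table."""

    def __init__(self):
        self.user_table = {}     # row key (user id) -> attributes
        self.company_index = {}  # index row key (company name) -> set of user ids

    def put_user(self, user_id, attrs):
        old = self.user_table.get(user_id)
        if old is not None:
            # Modification: remove the stale index entry before adding the new one.
            self._unindex(user_id, old)
        self.user_table[user_id] = dict(attrs)
        self.company_index.setdefault(attrs["company"], set()).add(user_id)

    def delete_user(self, user_id):
        old = self.user_table.pop(user_id, None)
        if old is not None:
            self._unindex(user_id, old)

    def _unindex(self, user_id, attrs):
        ids = self.company_index.get(attrs["company"])
        if ids is not None:
            ids.discard(user_id)
            if not ids:
                del self.company_index[attrs["company"]]

    def users_at(self, company):
        # Reverse lookup served by the index table instead of a full scan.
        return sorted(self.company_index.get(company, ()))
```

Forgetting the `_unindex` call in `put_user` is exactly the kind of bug mentioned above: a user who changes company would keep showing up under the old one. Note also that in a real BigTable-style store the two writes are not atomic, since they touch different rows.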

It is important to realize that NOSQL is not advocating a replacement of RDBMS, which has been proven in many lines of application. NOSQL should be considered a complementary technology for some niche areas that RDBMS does not cover well.

5) Design your update to be idempotent
Max's post has a good articulation of this. The basic idea is that, due to the eventual consistency model based on "read repair" and "quorum update", it is possible that a failed update is in fact successful. Here is how this can happen.
  1. The client issues a quorum update.
  2. The server distributes the update to all replica servers, but unfortunately doesn't get more than half of them to respond successfully. So it returns a failure to the client.
  3. Nevertheless, the update has been received by a minority of the replicas (in other words, they don't roll back even though the update was not successful).
  4. Later, if the client reads from one of this minority, it will get this update (even though it has failed). Even more, since this update has a later version, it will read-repair the other copies (i.e. further propagate the failed update).
Therefore, the usual recovery is that the user should retry the operation when it fails, and the application logic needs to deal with potentially duplicated updates. One way is to find some way to detect duplicates and ignore them once detected.
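One way to detect duplicates is to tag each update with a unique id chosen by the client and to remember which ids have already been applied. The sketch below is a minimal single-process illustration; the `Account` class and the id scheme are assumptions of mine, and in a real store the set of applied ids would have to live in the same row as the data so that both change together.

```python
class Account:
    def __init__(self):
        self.balance = 0
        self.applied = set()  # ids of updates that have already been applied

    def apply(self, update_id, amount):
        """Apply the update once; a retry with the same id is detected and ignored."""
        if update_id in self.applied:
            return False
        self.balance += amount
        self.applied.add(update_id)
        return True
```

The client reuses the same `update_id` when it retries after a reported failure. If the "failed" update had in fact gone through, the retry is a no-op and the balance is only changed once.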

Sunday, May 2, 2010

The NOSQL Debate

I attended the Stanford InfoLab conference, where there were 2 panel discussions on Cloud computing: transaction processing and analytic processing.

The session turned out to be a debate between people from the academia side and open source developers. Both sides have their points ...

Background of RDBMS

RDBMS is based on a clear separation between application logic and data management. This loose coupling allows application and DB technologies to evolve independently.

This philosophy drives the DBMS architecture to be more general in order to support a wide range of applications. Over many years of evolution, it has developed a well-defined data model and query model based on relational algebra, as well as a well-defined transaction processing model.

Applications also benefit from having a unified data model. They have more freedom to switch their DB vendor without too many code changes.

For OLTP applications, the RDBMS model has been proven to be highly successful in many large enterprises. The success of both Oracle and MySQL can speak to that.

For Analytic applications, the RDBMS model is also used widely for implementing data warehousing based on a STAR schema model (composed of Fact tables and Dimension tables).

This model also puts the DBA into a very important position in the enterprise. They are equipped with sophisticated management tools and specialized knowledge to control the commonly shared DB schema.

Background of NOSQL

In the last 10 years, a few highly successful web 2.0 companies have seen their applications grow beyond what a centralized RDBMS can handle. Partitioning data across many servers and spreading the workload across them seems to be a reasonable first thing to try. Many RDBMS solutions provide a Master/Slave replication mechanism where one can spread the READ-ONLY workload across many servers, and it helps.

In a partitioned environment, applications need to be more tolerant of data integrity issues, especially when data is asynchronously replicated between servers. The famous CAP theorem from Eric Brewer captures the essence of the tradeoff: highly scalable applications (which must have partition tolerance) have to choose between "Consistency" and "Availability".

Fortunately, most of these web-scale applications have a high tolerance for data integrity issues, so they choose "availability" over "consistency". This is a major deviation from the transaction processing model of RDBMS, which typically weighs "consistency" much higher than "availability".

On the other hand, the data structures used in these web-scale applications (e.g. user profile, preference, social graph ... etc) are much richer than the simple row/column/table model. Some of the operations involve navigation within a large social graph, which requires too many join operations in an RDBMS model.

The "higher tolerance for data integrity issues" as well as "efficient support for rich data structures" challenge some of the very fundamental assumptions of RDBMS. Amazon has experimented with its large scale key/value store called Dynamo, and Google has its BigTable column-oriented storage model; both are designed from the ground up with a distributed architecture in mind.

Since then, many open source clones based on these two models have appeared. To represent this trend, Eric Evans from Rackspace coined the term "NOSQL". This term, in my opinion, does not accurately reflect the underlying technologies, but it nevertheless provides a marketing term for all non-RDBMS technologies to gather under, even those (e.g. CouchDB, Neo4j) that were not originally trying to tackle the scalability problem.

Today, NOSQL stands for an alternative, non-relational approach to databases.

For analytical applications, the NOSQL camp also takes a highly parallel, brute-force processing approach based on the Map/Reduce model.

The Debate

There was relatively little criticism of the data model aspects. Jun Rao from IBM Lab summarized the key difference between the two philosophies. The traditional RDBMS approach is first to figure out the right model, and then provide an implementation and hope it is scalable. The modern NOSQL approach does the opposite by first figuring out what a highly scalable architecture looks like and then layering a data model on top of that. Basically, people in both camps agree that we should use a data model that is optimized for the application's access patterns, weakening the "one-size-fits-all" philosophy of RDBMS.

Most of the debate centered on the transaction processing model itself. Basically, RDBMS proponents think the NOSQL camp hasn't spent enough time understanding the theoretical foundation of the transaction processing model. The new "eventual consistency" model is not well-defined, and different implementations may differ significantly from each other. This means that figuring out all these inconsistent behaviors becomes the application developer's responsibility and makes their life much harder. A DB whose behavior is hard to reason about can be very dangerous if the application makes wrong assumptions about the underlying data integrity guarantees.

While agreeing that application developers now have more to worry about, the NOSQL camp argues that this is actually a benefit because it gives the domain-specific optimization opportunities back to the application developers, who are no longer constrained by a one-size-fits-all model. But they admit that making such optimization decisions requires a lot of experience and can be very error-prone and dangerous if not done by experts.

On the other hand, the academia side also noted that the movement to NOSQL may seem fashionable and cool to new technologists who may not have the expertise and skills. The community as a whole should articulate the pros and cons of NOSQL.

Notice that this is not the first time this debate has taken place. Stonebraker has written a very good article from the RDBMS side.