Friday, January 15, 2010

RoadMap to SaaS

I have observed a pattern from multiple enterprises moving from a traditional web app to a SaaS. Trying to capture this pattern and a number of lessons learned. I use a J2EE Web App architecture to illustrate but the same principles apply to other technology platforms.

Stage 1: Some working Web App

At the very beginning, we have an web application that works well. We analyze the function of the web application and group the implementation classes accordingly


Stage 2: Separate functionality across processes

We analyze the functions and partition them into different processes (JVM). The partition needs to be coarse-grain and each process will communicate with each other via a service interface exposed by a Facade class. The service interface can be any remote object invocation interface or XML over HTTP. Restful web service is the de-facto way for the service interface.


Stage 3: Move different processes to different machines

To scale out beyond a single server's capacity, we move the process to a separate machine. Notice that the machine can be a physical machine, or a virtual machine running on top of hypervisor.



Stage 4: Build service pools

If the service itself is stateless, we can easily scale out the service capacity by putting multiple machines (running the same service) into a server pool. A network load balancer will be used to spread the workload evenly to the member servers.

When the workload increase, more machines can be added to the pool to boost up the overall capacity of the service. Elastic scalability provided by Cloud computing provider make growing and shrinking the pool even more rapid, and can hence dynamic workload more effectively.



Stage 5: Scale the data tier by partitioning

After we scale out the processing tier, we found the data tier becomes the bottleneck. So we also need to distribute the data access workload by partition the database according to the data key. Here are some classical techniques how we can build a distributed database.


Stage 6: Add Cache to reduce server load

If the application is has a high read/write ratio, or has some tolerance of data staleness, we can add a cache layer to reduce the hit of the actual services. Clients will check the cache before sending the request to the service.

We need to make sure the cached items to remain fresh. There are various schemes to achieve this. e.g. cached items can be expired after certain timeout, or an explicit invalidation request can be make to specific cached items when the corresponding backend data has been changed.

We can use local cache (reside in the same machine as the client) or a distributed cache engine such as Memcached or Oracle Coherence.



Stage 7: Consider which service to expose to public

At this point, we want to expose some of the services to the public either because this can bring revenue to our company or can facilitate a better B2B integration with our business partners. There are a lot of considerations to decide what to be expose, such as security consideration, scalability, service level agreement, utilization tracking ... etc.


Stage 8: Deploy an ingress service gateway

Once we decide what services to be exposed, we decide to use a specialized ingress service gateway to handle the concern that we outline above. Most of the XML service gateway is equipped with message validation, security checking, message transformation, routing logic.



Stage 9: Deploy an egress service gateway

We not only providing service to the public, but may also consume other public services. In this case we deploy an egress service gateway which can help to lookup the service provider endpoints, extract the service policy of the public service providers ... etc.


One important function of the egress service gateway is to manage my dependencies to external service providers. It typically keeps a list of equivalent service providers together with their availability and response time, and routing my request to the one according to my selection criteria (e.g. cheapest, most-reliable, lowest-latency ... etc.)



Stage 10: Implement service version control

My service will evolve after exposing to the public. In the ideal case, only the service implementation change but not the service interface so there is no need to change the client code.

But mostly likely it is not that ideal. There are chances that I need to change the service interface or the message format. In this case, I may need to run multiple versions of the services simultaneously to make sure I am not breaking an existing clients. This means I need the ingress service gateway to be intelligent about routing the client request to the right version of the service implementation.


A typical way is to maintain a matrix of version and keep track of their compatibilities. For example, we can use a release convention such that all minor releases is required to be backward compatible but major release is not required for that. Having this compatibility matrix information, the ingress gateway can determine the client version from its incoming request and route it to the server which has the latest compatible version.


Stage 11: Outsource infrastructure to public Cloud provider

Purchase necessary hardware equipment and maintaining them can be very costly, especially when there are idle time in the usage of computing resources. Idle time is usually unavoidable because we need to budget the resource at the peak workload scenarios so there are idle time at non-peak hours.

For a more efficient use of computing resources, we can consider some public cloud provider such as Amazon AWS or Microsoft Azure.

But it is important that running an application in the cloud may need to redesign the application to cope with some unique characteristics of the cloud environment, such as how to deal with high network latency and bandwidth cost, as well as how to design application to live with an eventually consistent DB.

Tuesday, January 12, 2010

Notes on Oracle Coherence

Oracle Coherence is a distributed cache that functionally comparable to Memcached. On top of the basic cache API function, it has some additional capabilities that is attractive for building large scale enterprise applications.

The API is based on the Java Map (Hashtable) Interface, which provides a key/value store semantics where the value can be any Java Serializable object. Coherence allows data stored in multiple caches identified by a unique name (which they called a "named cache").

Below code examples are extracted from the great presentation from Brian Oliver of Oracle

The common usage pattern is to first locate a cache by its name, and then act on the cache.

Basic cache function (Map, JCache)
  • Get data by key
  • Update data by key
  • Remove data by key
NamedCache nc = CacheFactory.getCache("mine");

Object previous = nc.put("key", "hello world");

Object current = nc.get("key");

int size = nc.size();

Object value = nc.remove("key");

Set keys = nc.keySet();

Set entries = nc.entrySet();

boolean exists = nc.containsKey("key");

Cache Modification Event Listener (ObservableMap)

You can register an event listener on a cache to catch certain change events happen within the cache.
  • New cache item is inserted
  • Existing cache item is deleted
  • Existing cache item is updated
NamedCache nc = CacheFactory.getCache("stocks");

nc.addMapListener(new MapListener() {
public void onInsert(MapEvent mapEvent) {
...
}

public void onUpdate(MapEvent mapEvent) {
...
}

public void onDelete(MapEvent mapEvent) {
...
}
});


View of Filtered Cache (QueryMap)

You can also define a "view" by providing a "filter" which is basically a boolean function. Only items that is evaluated to be true by this function will be visible in this view.

NamedCache nc = CacheFactory.getCache("people");

Set keys =
nc.keySet(new LikeFilter("getLastName", "%Stone%"));

Set entries =
nc.entrySet( new EqualsFilter("getAge", 35));


Continuous Query Support (ContinuousQueryCache)

The view can also be used as a "continuous query". All new coming data that fulfilled the filter criteria will be included automatically in the view.

NamedCache nc = CacheFactory.getCache("stocks");

NamedCache expensiveItems =
new ContinuousQueryCache(nc,
new GreaterThan("getPrice", 1000));


Parallel Query Support (InvocableMap)

We can also spread a query execution and partial aggregation across all nodes and have them execute in parallel, followed by the final aggregation.
NamedCache nc = CacheFactory.getCache("stocks");

Double total =
(Double)nc.aggregate(AlwaysFilter.INSTANCE,
new DoubleSum("getQuantity"));

Set symbols =
(Set)nc.aggregate(new EqualsFilter("getOwner", "Larry"),
new DistinctValue("getSymbol"));


Parallel Execution Processing Support (InvocableMap)

We can ship a piece of processing logic to all nodes which will execute the processing in parallel
NamedCache nc = CacheFactory.getCache("stocks");

nc.invokeAll(new EqualsFilter("getSymbol", "ORCL"),
new StockSplitProcessor());

class StockSplitProcessor extends AbstractProcessor {
Object process(Entry entry) {
Stock stock = (Stock)entry.getValue();
stock.quantity *= 2;
entry.setValue(stock);
return null;
}
}


Implementation Architecture

Oracle Coherence runs on a cluster of identical server machines connected via a network. Within each server, there are multiple layers of software provide a unified data storage and processing abstraction over a distributed environment.


Smart Data Proxy

Application typically runs within a node of the cluster as well. The cache interface is implemented by a set of smart data proxy which knows the location of master (primary) and slave (backup) copy of data based on its key.

Read through with 2 level cache

When the client "read" data from the proxy, it first try to find the data in a local cache (also called the "near cache" within the same machine). If it is not found, the smart proxy will then locate the distributed cache for the corresponding copy (also called the L2 cache). Since this is a read, either a master or a slave copy is fine. If the smart proxy wouldn't find data from the distributed cache, it will lookup data from the backend DB. The return data will then propagate back to the client and the cache will be populated.

Master/Slave data partitioning

Updating data (insert, update, delete) is done in the reverse way. Under the master/slave architecture, all updates will go to the corresponding master node that owns that piece of data. Coherence support two modes of update; "Write through" and "Write behind". "Write through" will update the DB backend immediately after updating the master copy, but before updating the slave copy, and therefore keep the DB always up to date. "Write behind" will update the slave copy and then the DB in an asynchronous fashion. Data lost is possible in "write behind" mode, which has a higher throughput because multiple write can be merge in a single write, resulting in a fewer number of writes.

Moving processing logic towards data

While extracting data from the cache to the application is a typical way of processing data, it is not very scalable when large volume of data is required to be processed. Instead of shipping the data to the processing logic, a much more efficient way is to ship the processing logic to where the data is residing. This is exactly why Oracle Coherence provide an invocableMap interface where the client can provide a "processor" class that get shipped to every node where processing can be conducted with local data. Moving code towards the data dstributed across many nodes also enable parallel processing because now every node can conduct local processing in parallel.

The processor logic is shipped into the processing queue of the execution node, which has an active processor dequeue the processor object and execute it. Notice that this execution is performed in a serial manner, in other words, the processor will completely finished a processing job before proceeding to the next job. There is no worry about multi-threading issue and no need to use locks, and therefore no dead lock issue.

Monday, December 21, 2009

Takeaways on Responsive Design

I recently have attended a great talk from Kent Beck who is the thought leader in Extreme Programming, Test-Driven-Development and Code refactoring. This talk "Responsive Design" outline a set of key principles of how to create a design that is "malleable" so it can respond quickly to future changes.

Anticipating Changes
I don't think one can design a system without knowing the context of the problem. You have to know what problem your solution is trying to solve. Of course, you may not know the detail of every aspects, or you may know that certain parts of the requirements may undergoing some drastic changes. In these areas where changes are anticipated, you need to build in more flexibilities into your design.

In my opinion, knowing what you know well and what you don't know is important. Good designers usually have good instinct in sensing between the "known" and "unknown" and adjust the flexibility of his design along the way as more information is gathered.

As more information is gathered, the dynamics of "change anticipation" also evolves. Certain parts of your system has reduced its anticipated changes due to less unknowns so now you can trade off some flexibility for efficiency or simplicity. On the other hand, you may discover that certain parts of the system has increased its anticipated changes and so even more flexibility is needed.

Safe Steps
If the system is already in production, making changes in the architecture is harder because there are other systems that may already depend on it. And any changes on it may break those other systems. "Safe Steps" is about how we can design the changes to existing system with minimal impact to those other systems that depends on it.

Design for Evolution
One important aspect when design a system is not just by looking at what the end result should be, but also look at what the evolution path of the system should look like. The key idea is that a time dimension is introduced here and the overall cost and risk should be summed along the time dimension.

In other words, it is not about whether you have designed a solution that finally meet the business requirement. What is important is how your solution bring value to the business as it evolves over time. A good design is a live animal that can breath and evolve together with your business.

Kent also talk about 4 key design approaches under different conditions

1. Leap

"Leap" is a brave approach where you go ahead to design and implement the new system. When complete, swap out the existing components with the new one. This approach requires a very good understanding of the functionality of system you want to build, how the existing system and how other systems depends on the existing system.

"Leap" can be a very effective approach if the system is very self-contained with clearly defined responsibilities. But in general, this approach is somewhat high-risk and is an exception rather than a norm for large enterprise applications.

2. Parallel

"Parallel" takes a "wrap and replace" approach. The new system is designed to run in parallel with existing system so that migration can be conducted gradually in a risk-containing manner. If there is any problem happens during the migration, the whole system can be switched back to the original system immediately so the risk is contained.

After 100% client has been migrated to the new system for some period of time, the old system can be shutdown without even the clients notice it.

"Parallel" approach still requires you to have a clear understanding of what you want to build. But it relax you from knowing the dependencies of existing system. Of course, the design may be more complicated because it needs to run in parallel with the existing system and has to deal with things like data consistency and synchronization issues.

"Parallel" is a predominant approach that I've seen people used in reality.

3. Stepping Stone

"Stepping Stone" is very useful when you don't exactly know your destination, but you know that there are some intermediate steps that you have to do. In this approach, the designer focus in those intermediate steps that will lead to the final destination.

"Stepping Stone" requires the designer to have a "scope of variability" in mind about the final solution and then identify some common ground across the perceived variability. Knowing what you don't know is important to define the "stepping stone".

This approach is also very useful to design the evolution path of the system. Since you need to look at how the "stepping stone" will provide value to the existing systems while it evolves.

4. Simplification

"Simplification" is the about how you can simplify your intermediate design by breaking down the requirement into multiple phases. I personally don't think the designer has the flexibility to change the ultimate requirement but she definitely can break down the ultimate requirement into multiple phases so she can control the evolution path of her design. In other words, by simplifying the requirement in each phase, she can pick the challenges that she want to tackle in different phases.

"Simplification" is also an important skill for experienced designers. The only way to tackle any complex system beyond a human's brain power is to break the original complexity down into simpler systems and tackle them in incremental steps.

"Simplification" is also an important abstraction skills where experience designer can generalize a specific problem case into a generic problem pattern where a generic solution can be found (e.g. design pattern), and then customize a specific solution from the design pattern.

Saturday, November 28, 2009

Query Processing for NOSQL DB

The recent rise of NoSQL provides an alternative model in building extremely large scale storage system. Nevetheless, compare to the more mature RDBMS, NoSQL has some fundamental limitations that we need to be aware of.
  1. It calls for a more relaxed data consistency model
  2. It provides primitive querying and searching capability
There are techniques we can employ to mitigate each of these issue. Regarding the data consistency concern, I have discussed a number of design patterns in my previous blog to implement system with different strength of consistency guarantee.

Here I like to give myself a try to tackle the second issue.

So what is the problem ?

Many of the NoSQL DB today is based on the DHT (Distributed Hash Table) model, which provides a hashtable access semantics. To access or modify any object data, the client is required to supply the primary key of the object, then the DB will lookup the object using an equality match to the supplied key.

For example, if we use DHT to implement a customer DB, we can choose the customer id as the key. And then we can get/set/operate on any customer object if we know its id
  • cust_data = DHT.get(cust_id)
  • DHT.set(cust_id, modified_cust_data)
  • DHT.execute(cust_id, func(cust) {cust.add_credit(200)})
In the real world, we may want to search data based on other attributes than its primary key, we may also search attributes based on "greater/less than" relationship, or we may want to combine multiple search criteria using a boolean expression.

Using our customer DB example, we may do ...
  • Lookup customers within a zip code
  • Lookup customers whose income is above 200K
  • Lookup customer using keywords "chief executives"
Although query processing and indexing technique is pretty common in RDBMS world, it is seriously lacking in the NoSQL world because of the very nature of the "distributed architecture" underlying most of NoSQL DB.

It seems to me that the responsibility of building an indexing and query mechanism lands on the NoSQL user. Therefore, I want to explore some possible techniques to handle these.


Companion SQL-DB

A very straighforward approach is provide querying capability is to augment NoSQL with an RDBMS or TextDB (for keyword search). e.g. We add the metadata of the object into a RDBMS so we can query its metadata using standard SQL query.

Of course, this requires the RDBMS to be large enough to store the search-able attributes of each object. Since we only store the attributes required for search, rather than the whole object into the RDBMS, this turns out to be a very practical and common approach.


Scatter/Gather Local Search

Some of the NOSQL DB provides indexing and query processing mechanism within the local DB. In this case, we can have the query processor broadcast the query to every node in the DHT where a local search will be conducted with results sent back to the query processor which aggregates into a single response.

Notice that the search is happening in parallel across all nodes in the DHT.


Distributed B-Tree

B+Tree is a common indexing structure using in RDBMS. A distributed version of B+Tree can also be used in a DHT environment. The basic idea is to hash the search-able attribute to locate the root node of the B+ Tree. The "value" of the root node contains the id of its children node. So the client can then issue another DHT lookup call to find the children node. Continue this process, the client eventually navigate down to the leaf node, where the object id of the matching the search criteria is found. Then the client will issue another DHT lookup to extract the actual object.

Caution is needed when the B+Tree node is updated due to split/merge caused by object creation and deletion. This should be ideally done in an atomic fashion. This paper from Microsoft, HP and Toronto U describe a distributed transaction protocol to provide the required atomicity. Distributed transaction is an expensive operation but its uses here is justified because most of the B+ tree updates rarely involve more than a single machine.


Prefix Hash Table (distributed Trie)

Trie is an alternative data structure, where every path (from the root) contains the prefix of the key. Basically, every node in the Trie contains all the data whose key is prefixed by it. Berkeley and Intel research has a paper to describe this mechanism.

1. Lookup a key
To locate a particular key, we start with its one digit prefix and do a DHT lookup to see if we get a leaf node. If so, we search within this leaf node as we know the key must be contained inside. If it is not a leaf node, we extend the prefix with an extra digit and repeat the whole process again.
# Locate the key next to input key
def locate(key)
 leaf = locate_leaf_node(key)
 return leaf.locate(key)
end

# Locate leaf node containing input key
def locate_leaf_node(key)
 for (i in 1 .. key.length)
   node = DHT.lookup(key.prefix(n))
   return node if node.is_leaf?
 end
 raise exception
end

2. Range Query
Perform a range query can be done by first locate the leaf node that contains the start key and then walk in the ascending order direction until we exceed the end key. Note that we can walk across a leaf node by following the leaf node chain.
def range(startkey, endkey) {
 result = Array.new
 leaf = locate_leaf_node(startkey)
 while leaf != nil
   result.append(leaf.range(startkey, endkey))
   if (leaf.largestkey < endkey)
     leaf = leaf.nextleaf
   end
 end
 return result
end
To speedup the search, we can use a parallel search mechanism. Instead of walking from the start key in a sequential manner, we can find the common prefix of the start key and end key (as we know all the result is under its subtree) and perform a parallel search of the children leaf nodes of this subtree.

3. Insert and Delete keys
To insert a new key, we first identify the leaf node that contains the inserted key. If the leaf node has available capacity (less than B keys), then simply add it there. Otherwise, we need to split the leaf node into two branches and redistribute its existing keys to the newly created child nodes.

To delete a key, we similarly identify the leaf node that contains the deleted key and then remove it there. This may cause some of my parents to have less than B + 1 keys so I may need to merge some child nodes.


Combining Multiple Search Criteria

When we have multiple criteria in the search, each criteria may use a different index that resides within a different set of machines in the DHT. Multiple criterias can be combined using boolean operators such as OR / AND. Performing OR operation is very straightforward because we just need to union the results of each individual index search that is performed separately. On the other hand, performing AND operation is trickier because we need to deal with the situation that each individual criteria may have a large number of matches but their intersection is small. The challenge is: how can we efficiently perform an intersection between two potentially very large sets ?

One naive implementation is to send all matched object ids of each criteria to a server that performs the set intersection. If each data set is large, this approach may cause a large bandwidth consumption for sending across all the potential object ids.

A number of techniques are described here in this paper

1. Bloom Filter
Instead of sending the whole set of matched object id, we can send a more compact representation called "Bloom Filter". Bloom filter is a much more compact representation that can be used for testing set membership. The output has zero false negative, but has a chance of false positive p, which is controllable.


For minimizing bandwidth, we typically pick the one with the larger set as the sending machine and perform the intersection on the receiving machine who has the smaller set.

Notice that the false positive can actually be completely eliminated by sending the matched result of Set2 back to Set1 machine, which double check the membership of set1 again. In most cases, 100% precision is not needed and a small probability of false positive is often acceptable.

2. Caching
It is possible that certain search criteria is very popular and will be issued over and over again. The corresponding bloom filter of this hot spots can be cached in the receiving machine. Since the bloom filter has a small footprint, we can cache a lot of bloom filters of popular search criterias.

3. Incremental fetch
In case if the client doesn't need to get the full set of matched results, we can stream the data back to client using a cursor mode. Basically, at the sending side, set1 is sorted and broken into smaller chunks with a bloom filter computed and attached to each chunk. At the receiving side, every element of set2 is checked for every bloom filter per chunk.

Notice that we save computation at the sending side (compute the bloom filter for the chunk rather than the whole set1) at the cost of doing more at the receiving side (since we need to repeat the checking of the whole set2 for each chunk of set1). The assumption is that client only needs a small subset of all the matched data.

An optimization we can do is to mark the range of each chunk in set1 and ask set2 to skip the objects that falls within the same range.

Thursday, November 19, 2009

Cloud Computing Patterns

I have attended a presentation by Simon Guest from Microsoft on their cloud computing architecture. Although there was no new concept or idea introduced, Simon has provided an excellent summary on the major patterns of doing cloud computing.

I have to admit that I am not familiar with Azure and this is my first time hearing a Microsoft cloud computing presentation. I felt Microsoft has explained their Azure platform in a very comprehensible way. I am quite impressed.

Simon talked about 5 patterns of Cloud computing. Let me summarize it (and mix-in a lot of my own thoughts) ...

1. Use Cloud for Scaling
The key idea is to spin up and down machine resources according to workload so the user only pay for the actual usage. There is two types of access patterns: passive listener model and active worker model.

Passive listener model uses a synchronous communication pattern where the client pushes request to the server and synchronously wait for the processing result.
In the passive listener model, machine instances are typically sit behind a load balancer. To scale the resource according to the work load, we can use a monitor service that send NULL client request and use the measured response time to spin up and down the size of the machine resources.

On the other hand, Active worker model uses an asynchronous communication patterns where the client put the request to a queue, which will be periodically polled by the server. After queuing the request, the client will do some other work and come back later to pickup the result. The client can also provide a callback address where the server can push the result into after the processing is done.
In the active worker model, the monitor can measure the number of requests sitting in the queue and use that to determine whether machine instances (at the consuming end) need to be spin up or down.


2. Use Cloud for Multi-tenancy
Multi-tenancy is more a SaaS provider (rather than an enterprise) usage scenario. The key idea is to use the same set of code / software to host the application for different customers (tenants) who may have slightly different requirement in
  • UI branding
  • Business rules for decision criteria
  • Data schema
The approach is to provide sufficient "customization" capability for their customer. The most challenging part is to determine which aspects should be opened for customization and which shouldn't. After identifying these configurable parameters, it is straightforward to define configuration metadata to capture that.

3. Use Cloud for Batch processingThis is about running things like statistics computation, report generation, machine learning, analytics ... etc. These task is done in batch mode and so it is more economical to use the "pay as you go" model. On the other hand, batch processing has very high tolerance in latency and so is a perfect candidate of running in the cloud.
Here is an example of how to run Map/Reduce framework in the cloud. Microsoft hasn't provided a Map/Reduce solution at this moment but Simon mentioned that Dryad in Microsoft research may be a future Microsoft solution. Interestingly, Simon also recommended Hadoop.

Of course, one challenge is how to move the data from the cloud in the first place. In my earlier blog, I have describe some best practices on this.

4. Use Cloud for Storage
The idea of storing data into the cloud and no need to worry about DBA tasks. Most cloud vendor provide large scale key/value store as well as RDBMS services. Their data storage services will also take care of data partitioning, replication ... etc. Building cloud storage is a big topic involving many distributed computing concepts and techniques, I have covered it in a separate blog.

5. Use Cloud for Communication
A queue (or mailbox) service provide a mechanism for different machines to communicate in an asynchronous manner via message passing.

Azure also provide a relay service in the cloud which is quite useful for machines behind different firewall to communicate. In a typical firewall setup, incoming connection is not allowed so these machine cannot directly establish a socket to each other. In order for them to communicate, each need to open an on-going socket connection to the cloud relay, which will route traffic between these connections.

I have used the same technique in a previous P2P project where user's PC behind their firewall need to communicate, and I know this relay approach works very well.

Monday, November 16, 2009

Impression on Scala

I have been hearing quite a lot of good comments about the Scala programming language. I personally use Java extensively in the past and have switched to Ruby (and some Erlang) in last 2 years. The following features that I heard about Scala really attracts me ...
  • Scala code is compiled in Java byte code and run natively in JVMs. Code written in Scala immediately enjoy the performance and robustness of Java VM technologies.
  • Easy to integrate with Java code and libraries, immediately enjoy the wide portfolio of exiting Java libraries.
  • It has good support to the Actor model, which I believe is an important programming paradigm for multi-core machine architecture.
So I decide to take a Scala tutorial from Dean Wampler today in the Qcon conference. This is a summary of my impression on Scala after the class.

First of all, Scala is a strongly typed language. However it has a type inference mechanism so you don't have to type declaration is optional. But in some place (like a method signature), type declaration is mandatory. It is not very clear to me when I have to declare a type.

Having the "val" and "var" declaration in variables is very nice because it makes immutability explicit. In Ruby, you can make an object immutable by sending it a freeze() method but Scala do this more explicitly.

But I found it confusing to have a method define in 2 different ways

class A() {
def hello {
...
}
}
class A() {
def hello = {
...
}
}
The MyFunction[+A1, -A2] is really confusing to me. I feel the typeless language is much more easy.

Removing the open and close bracket is also causing a lot of confusion to me.
class Person(givenName: String) {
var myName = givenName
def name =(anotherName: String) = {
myName = anotherName
}
}

class Person(givenName: String) {
var myName = givenName
def name =(anotherName: String) = myName = anotherName
}
The special "implicit" conversion method provides a mechanism to develop DSL (Domain Specific Language) in Scala but it also looks very odd to me. Basically, you need to import a SINGLE implicit conversion method that needs to take care of all possible conversions.

All the method that ends with ":" has a reverse calling order is also an odd stuff to me.

Traits provides mixins for Scala but I feel the "Module" mechanism in Ruby has done a better job.

Scala has the notion of "function" and can pass "function" as parameters. Again, I feel Ruby blocks has done a better job.

Perhaps due to JVM's limitation of supporting a dynamic language, Scala is not very strong in doing meta-programming, Scala doesn't provide the "open class" property where you can modify an existing class (add methods, change method implementation, add class ... etc.) at run time

Scala also emulate a number of Erlang features but I don't feel it is doing a very clean job. For example, it emulate the pattern matching style of Erlang programming using the case Class and unapply() method but it seems a little bit odd to me.

Erlang has 2 cool features which I couldn't find in Scala (maybe I am expecting too much)
  • The ability to run two version of class at the same time
  • Able to create and pass function objects to a remote process (kinda like a remote code loading)
Overall impression

I have to admit that my impression on Scala is not as good as before I attend the tutorial. Scala tries to put different useful programming paradigm in the JVM but I have a feeling of force-fit. Of course its close tie to JVM is still a good reason to use Scala. But from a pure programming perspective, I will prefer to use a combination of Ruby and Erlang, rather than Scala.

Sunday, November 15, 2009

NOSQL Patterns

Over the last couple years, we see an emerging data storage mechanism for storing large scale of data. These storage solution differs quite significantly with the RDBMS model and is also known as the NOSQL. Some of the key players include ...
  • GoogleBigTable, HBase, Hypertable
  • AmazonDynamo, Voldemort, Cassendra, Riak
  • Redis
  • CouchDB, MongoDB
These solutions has a number of characteristics in common
  • Key value store
  • Run on large number of commodity machines
  • Data are partitioned and replicated among these machines
  • Relax the data consistency requirement. (because the CAP theorem proves that you cannot get Consistency, Availability and Partitioning at the the same time)
The aim of this blog is to extract the underlying technologies that these solutions have in common, and get a deeper understanding on the implication to your application's design. I am not intending to compare the features of these solutions, nor to suggest which one to use.


API model

The underlying data model can be considered as a large Hashtable (key/value store).

The basic form of API access is
  • get(key) -- Extract the value given a key
  • put(key, value) -- Create or Update the value given its key
  • delete(key) -- Remove the key and its associated value
More advance form of API allows to execute user defined function in the server environment
  • execute(key, operation, parameters) -- Invoke an operation to the value (given its key) which is a special data structure (e.g. List, Set, Map .... etc).
  • mapreduce(keyList, mapFunc, reduceFunc) -- Invoke a map/reduce function across a key range.

Machines layout

The underlying infratructure is composed of large number (hundreds or thousands) of cheap, commoditized, unreliable machines connected through a network. We call each machine a physical node (PN). Each PN has the same set of software configuration but may have varying hardware capacity in terms of CPU, memory and disk storage. Within each PN, there will be a variable number of virtual node (VN) running according to the available hardware capacity of the PN.


Data partitioning (Consistent Hashing)

Since the overall hashtable is distributed across many VNs, we need a way to map each key to the corresponding VN.

One way is to use
partition = key mod (total_VNs)

The disadvantage of this scheme is when we alter the number of VNs, then the ownership of existing keys has changed dramatically, which requires full data redistribution. Most large scale store use a "consistent hashing" technique to minimize the amount of ownership changes.


In the consistent hashing scheme, the key space is finite and lie on the circumference of a ring. The virtual node id is also allocated from the same key space. For any key, its owner node is defined as the first encountered virtual node if walking clockwise from that key. If the owner node crashes, all the key it owns will be adopted by its clockwise neighbor. Therefore, key redistribution happens only within the neighbor of the crashed node, all other nodes retains the same set of keys.


Data replication

To provide high reiability from individually unreliable resource, we need to replicate the data partitions.

Replication not only improves the overall reliability of data, it also helps performance by spreading the workload across multiple replicas.


While read-only request can be dispatched to any replicas, update request is more challenging because we need to carefully co-ordinate the update which happens in these replicas.

Membership Changes

Notice that virtual nodes can join and leave the network at any time without impacting the operation of the ring.

When a new node joins the network
  1. The joining node announce its presence and its id to some well known VNs or just broadcast)
  2. All the neighbors (left and right side) will adjust the change of key ownership as well as the change of replica memberships. This is typically done synchronously.
  3. The joining node starts to bulk copy data from its neighbor in parallel asynchronously.
  4. The membership change is asynchronously propagate to the other nodes.

Notice that other nodes may not have their membership view updated yet so they may still forward the request to the old nodes. But since these old nodes (which is the neighbor of the new joined node) has been updated (in step 2), so they will forward the request to the new joined node.

On the other hand, the new joined node may still in the process of downloading the data and not ready to serve yet. We use the vector clock (described below) to determine whether the new joined node is ready to serve the request and if not, the client can contact another replica.

When an existing node leaves the network (e.g. crash)
  1. The crashed node no longer respond to gossip message so its neighbors knows about it.
  2. The neighbor will update the membership changes and copy data asynchronously

We haven't talked about how the virtual nodes is mapped into the physical nodes. Many schemes are possible with the main goal that Virtual Node replicas should not be sitting on the same physical node. One simple scheme is to assigned Virtual node to Physical node in a random manner but check to make sure that a physical node doesn't contain replicas of the same key ranges.

Notice that since machine crashes happen at the physical node level, which has many virtual nodes runs on it. So when a single Physical node crashes, the workload (of its multiple virtual node) is scattered across many physical machines. Therefore the increased workload due to physical node crashes is evenly balanced.


Client Consistency

Once we have multiple copies of the same data, we need to worry about how to synchronize them such that the client can has a consistent view of the data.

There is a number of client consistency models
  1. Strict Consistency (one copy serializability): This provides the semantics as if there is only one copy of data. Any update is observed instantaneously.
  2. Read your write consistency: The allows the client to see his own update immediately (and the client can switch server between requests), but not the updates made by other clients
  3. Session consistency: Provide the read-your-write consistency only when the client is issuing the request under the same session scope (which is usually bind to the same server)
  4. Monotonic Read Consistency: This provide the time monotonicity guarantee that the client will only see more updated version of the data in future requests.
  5. Eventual Consistency: This provides the weakness form of guarantee. The client can see an inconsistent view as the update are in progress. This model works when concurrent access of the same data is very unlikely, and the client need to wait for some time if he needs to see his previous update.

Depends on which consistency model to provide, 2 mechanisms need to be arranged ...
  • How the client request is dispatched to a replica
  • How the replicas propagate and apply the updates
There are various models how these 2 aspects can be done, with different tradeoffs.

Master Slave (or Single Master) Model

Under this model, each data partition has a single master and multiple slaves. In above model, B is the master of keyAB and C, D are the slaves. All update requests has to go to the master where update is applied and then asynchronously propagated to the slaves. Notice that there is a time window of data lost if the master crashes before it propagate its update to any slaves, so some system will wait synchronously for the update to be propagated to at least one slave.

Read requests can go to any replicas if the client can tolerate some degree of data staleness. This is where the read workload is distributed among many replicas. If the client cannot tolerate staleness for certain data, it also need to go to the master.

Note that this model doesn't mean there is one particular physical node that plays the role as the master. The granularity of "mastership" happens at the virtual node level. Each physical node has some virtual nodes acts as master of some partitions while other virtual nodes acts as slaves of other partitions. Therefore, the write workload is also distributed across different physical node, although this is due to partitioning rather than replicas

When a physical node crashes, the masters of certain partitions will be lost. Usually, the most updated slave will be nominated to become the new master.

Master Slave model works very well in general when the application has a high read/write ratio. It also works very well when the update happens evenly in the key range. So it is the predominant model of data replication.

There are 2 ways how the master propagate updates to the slave; State transfer and Operation transfer. In State transfer, the master passes its latest state to the slave, which then replace its current state with the latest state. In operation transfer, the master propagate a sequence of operations to the slave which then apply the operations in its local state.

The state transfer model is more robust against message lost because as long as a latter more updated message arrives, the replica still be able to advance to the latest state.

Even in state transfer mode, we don't want to send the full object for updating other replicas because changes typically happens within a small portion of the object. In will be a waste of network bandwidth if we send the unchanged portion of the object, so we need a mechanism to detect and send just the delta (the portion that has been changed). One common approach is break the object into chunks and compute a hash tree of the object. So the replica can just compare their hash tree to figure out which chunk of the object has been changed and only send those over.

In operation transfer mode, usually much less data need to be send over the network. However, it requires a reliable message mechanism with delivery order guarantee.


Multi-Master (or No Master) Model

If there is hot spots in certain key range, and there is intensive write request, the master slave model will be unable to spread the workload evenly. Multi-master model allows updates to happen at any replica (I think call it "No-Master" is more accurate).

If any client can issue any update to any server, how do we synchronize the states such that we can retain client consistency and also eventually every replica will get to the same state ? We describe a number of different approaches in following ...

Quorum Based 2PC

To provide "strict consistency", we can use a traditional 2PC protocol to bring all replicas to the same state at every update. Lets say there is N replicas for a data. When the data is update, there is a "prepare" phase where the coordinator ask every replica to confirm whether each of them is ready to perform the update. Each of the replica will then write the data to a log file and when success, respond to the coordinator.

After gathering all replicas responses positively, the coordinator will initiate the second "commit" phase and then ask every replicas to commit and each replica then write another log entry to confirm the update. Notice that there are some scalability issue as the coordinator need to "synchronously" wait for quite a lot of back and forth network roundtrip and disk I/O to complete.

On the other hand, if any one of the replica crashes, the update will be unsuccessful. As there are more replicas, chance of having one of them increases. Therefore, replication is hurting the availability rather than helping. This make traditional 2PC not a popular choice for high throughput transactional system.

A more efficient way is to use the quorum based 2PC (e.g. PAXOS). In this model, the coordinator only need to update W replicas (rather than all N replicas) synchronously. The coordinator still write to all the N replicas but only wait for positive acknowledgment for any W of the N to confirm. This is much more efficient from a probabilistic standpoint.

However, since no all replicas are update, we need to be careful when reading the data to make sure the read can reach at least one replica that has been previously updated successful. When reading the data, we need to read R replicas and return the one with the latest timestamp.

For "strict consistency", the important condition is to make sure the read set and the write set overlap. ie: W + R > N


As you can see, the quorum based 2PC can be considered as a general 2PC protocol where the traditional 2PC is a special case where W = N and R = 1. The general quorum-based model allow us to pick W and R according to our tradeoff decisions between read and write workload ratio.

If the user cannot afford to pick W, R large enough, ie: W + R <= N, then the client is relaxing its consistency model to a weaker one.
If the client can tolerate a more relax consistency model, we don't need to use the 2PC commit or quorum based protocol as above. Here we describe a Gossip model where updates are propagate asynchronous via gossip message exchanges and an auto-entropy protocol to apply the update such that every replica eventually get to the latest state.

Vector Clock


Vector Clock is a timestamp mechanism such that we can reason about causal relationship between updates. First of all, each replica keeps vector clock. Lets say replica i has its clock Vi. Vi[i] is the logical clock which if every replica follows certain rules to update its vector clock.
  • Whenever an internal operation happens at replica i, it will advance its clock Vi[i]
  • Whenever replica i send a message to replica j, it will first advance its clock Vi[i] and attach its vector clock Vi to the message
  • Whenever replica j receive a message from replica i, it will first advance its clock Vj[j] and then merge its clock with the clock Vm attached in the message. ie: Vj[k] = max(Vj[k], Vm[k])

A partial order relationship can be defined such that Vi > Vj iff for all k, Vi[k] >= Vj[k]. We can use these partial ordering to derive causal relationship between updates. The reasoning behind is
  • The effect of an internal operation will be seen immediately at the same node
  • After receiving a message, the receiving node knows the situation of the sending node at the time when the message is send. The situation is not only including what is happening at the sending node, but also all the other nodes that the sending node knows about.
  • In other words, Vi[i] reflects the time of the latest internal operation happens at node i. Vi[k] = 6 reflects replica i has known the situation of replica k up to its logical clock 6.
Notice that the term "situation" is used here in an abstract sense. Depends on what information is passed in the message, the situation can be different. This will affect how the vector clock will be advanced. In below, we describe the "state transfer model" and the "operation transfer model" which has different information passed in the message and the advancement of their vector clock will also be different.

Because state is always flow from the replica to the client but not the other way round, the client doesn't have an entry in the Vector clock. The vector clock contains only one entry for each replica. However, the client will also keep a vector clock from the last replica it contacts. This is important for support the client consistency model we describe above. For example, to support monotonic read, the replica will make sure the vector clock attached to the data is > the client's submitted vector clock in the request.


Gossip (State Transfer Model)

In a state transfer model, each replica maintain a vector clock as well as a state version tree where each state is neither > or < among each other (based on vector clock comparison). In other words, the state version tree contains all the conflicting updates.

At query time, the client will attach its vector clock and the replica will send back a subset of the state tree which precedes the client's vector clock (this will provide monotonic read consistency). The client will then advance its vector clock by merging all the versions. This means the client is responsible to resolve the conflict of all these versions because when the client sends the update later, its vector clock will precede all these versions.


At update, the client will send its vector clock and the replica will check whether the client state precedes any of its existing version, if so, it will throw away the client's update.


Replicas also gossip among each other in the background and try to merge their version tree together.

Gossip (Operation Transfer Model)

In an operation transfer approach, the sequence of applying the operations is very important. At the minimum causal order need to be maintained. Because of the ordering issue, each replica has to defer executing the operation until all the preceding operations has been executed. Therefore replicas save the operation request to a log file and exchange the log among each other and consolidate these operation logs to figure out the right sequence to apply the operations to their local store in an appropriate order.

"Causal order" means every replica will apply changes to the "causes" before apply changes to the "effect". "Total order" requires that every replica applies the operation in the same sequence.

In this model, each replica keeps a list of vector clock, Vi is the vector clock the replica itself and Vj is the vector clock when replica i receive replica j's gossip message. There is also a V-state that represent the vector clock of the last updated state.

When a query is submitted by the client, it will also send along its vector clock which reflect the client's view of the world. The replica will check if it has a view of the state that is later than the client's view.


When an update operation is received, the replica will buffer the update operation until it can be applied to the local state. Every submitted operation will be tag with 2 timestamp, V-client indicates the client's view when he is making the update request. V-@receive is the replica's view when it receives the submission.

This update operation request will be sitting in the queue until the replica has received all the other updates that this one depends on. This condition is reflected in the vector clock Vi when it is larger than V-client


On the background, different replicas exchange their log for the queued updates and update each other's vector clock. After the log exchange, each replica will check whether certain operation can be applied (when all the dependent operation has been received) and apply them accordingly. Notice that it is possible that multiple operations are ready for applying at the same time, the replica will sort these operation in causal order (by using the Vector clock comparison) and apply them in the right order.


The concurrent update problem at different replica can also happen. Which means there can be multiple valid sequences of operation. In order for different replica to apply concurrent update in the same order, we need a total ordering mechanism.

One approach is whoever do the update first acquire a monotonic sequence number and late comers follow the sequence. On the other hand, if the operation itself is commutative, then the order to apply the operations doesn't matter

After applying the update, the update operation cannot be immediately removed from the queue because the update may not be fully exchange to every replica yet. We continuously check the Vector clock of each replicas after log exchange and after we confirm than everyone has receive this update, then we'll remove it from the queue.

Map Reduce Execution

Notice that the distributed store architecture fits well into distributed processing as well. For example, to process a Map/Reduce operation over an input key list.

The system will push the map and reduce function to all the nodes (ie: moving the processing logic towards the data). The map function of the input keys will be distributed among the replicas of owning those input, and then forward the map output to the reduce function, where the aggregation logic will be executed.


Handling Deletes

In a multi-master replication system, we use Vector clock timestamp to determine causal order, we need to handle "delete" very carefully such that we don't lost the associated timestamp information of the deleted object, otherwise we cannot even reason the order of when to apply the delete.

Therefore, we typically handle delete as a special update by marking the object as "deleted" but still keep its metadata / timestamp information around. Around a long enough time that we are confident that every replica has marked this object deleted, then we garbage collected the deleted object to reclaim its space.


Storage Implementaton

One strategy is to use make the storage implementation pluggable. e.g. A local MySQL DB, Berkeley DB, Filesystem or even a in memory Hashtable can be used as a storage mechanism.

Another strategy is to implement the storage in a highly scalable way. Here are some techniques that I learn from CouchDB and Google BigTable.

CouchDB has a MVCC model that uses a copy-on-modified approach. Any update will cause a private copy being made which in turn cause the index also need to be modified and causing the a private copy of the index as well, all the way up to the root pointer.

Notice that the update happens in an append-only mode where the modified data is appended to the file and the old data becomes garbage. Periodic garbage collection is done to compact the data. Here is how the model is implemented in memory and disks

In Google BigTable model, the data is broken down into multiple generations and the memory is use to hold the newest generation. Any query will search the mem data as well as all the data sets on disks and merge all the return results. Fast detection of whether a generation contains a key can be done by checking a bloom filter.

When update happens, both the mem data and the commit log will be written so that if the machine crashes before the mem data flush to disk, it can be recovered from the commit log.