Saturday, March 19, 2011

Compare Machine Learning models with ROC Curve

ROC Curve is a common method to compare performance between different models. It can also be used to pick trade-off decisions between "false positives" and "false negatives". The ROC curve plots the "true positive rate" against the "false positive rate". However, I don't find the ROC concept intuitive and struggled for a while to grasp it.

Here is my attempt to explain the ROC curve from a different angle. We use a binary classification example to illustrate the idea (i.e., predicting whether a patient has cancer or not).

First of all, no predictive model is 100% correct. The desirable state is that a person who actually has cancer gets a positive test result, and a person who has no cancer gets a negative test result. Since the test is imperfect, it is possible that a person who actually has cancer is tested negative (i.e., a failure to detect) or a person who has no cancer is tested positive (i.e., a false alarm).


In reality, there is always a tradeoff between the false negative rate and the false positive rate. We can tune the decision threshold to adjust them (e.g., in a random forest, we can set the threshold to predict positive when more than 30% of the decision trees predict positive). Usually, the threshold is set based on the consequence or cost of misclassification (e.g., in this example, a failure to detect has a much higher cost than a false alarm).
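To make this concrete, here is a minimal sketch (assuming scikit-learn and a synthetic data set) of lowering the threshold of a random forest from the default 0.5 to 0.3, trading more false alarms for fewer missed detections.

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Synthetic stand-in for the cancer screening data
X, y = make_classification(n_samples=1000, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

model = RandomForestClassifier(n_estimators=100, random_state=0).fit(X_train, y_train)

# Fraction of trees voting "positive" for each test case
prob_positive = model.predict_proba(X_test)[:, 1]

# Predict positive when more than 30% of the trees vote positive
prediction = (prob_positive > 0.3).astype(int)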


This can also be used to compare model performance. A good model is one that has both a low false positive rate and a low false negative rate, which is indicated by the size of the gray area below (the smaller the better).

A "random guess" is the worst prediction model and is used as a baseline for comparison. The decision threshold of a random guess is a number between 0 and 1 used to decide between a positive and a negative prediction.


The ROC curve is basically what I have described above with one transformation: the y-axis changes from "fail to detect" to 1 - "fail to detect", which becomes "success to detect" (the true positive rate). Honestly, I don't understand why this representation is better though.

Now, the ROC curve will look as follows ...
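For completeness, here is a small sketch (assuming scikit-learn and matplotlib, and reusing the hypothetical scores from the random forest sketch above) of computing and plotting the ROC curve together with its AUC.

from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt

# y_test and prob_positive come from the earlier sketch
fpr, tpr, thresholds = roc_curve(y_test, prob_positive)
auc = roc_auc_score(y_test, prob_positive)

plt.plot(fpr, tpr, label="model (AUC = %.2f)" % auc)
plt.plot([0, 1], [0, 1], linestyle="--", label="random guess")
plt.xlabel("False positive rate")
plt.ylabel("True positive rate (success to detect)")
plt.legend()
plt.show()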

Thursday, March 17, 2011

Predictive Analytics Conference 2011

I attended the San Francisco Predictive Analytics conference this week and got a chance to chat with some of the best data mining practitioners in the country. Here is a summary of my key takeaways.

How is the division of labor between human and machine?

Another way to ask this question is how "machine learning" and "domain expertise" work together and complement each other, since each has different strengths and weaknesses.


Machine learning is very good at processing large amounts of data in an unbiased way, while humans cannot process the same data volume and their judgment is usually biased. However, the machine cannot look beyond the data it is given. For example, if the prediction power is low, machine learning methods cannot distinguish whether it is because the data is not clean, the wrong model has been chosen, or some important input feature is not captured. Domain expertise must be brought in to figure out the problem.

So the consensus is that data mining / machine learning is simply a toolbox that can augment human domain expertise, but can never replace it. For example, the domain expert can throw a large number of input features at the machine learning model, which can determine the subset that is most influential. But if the domain expert doesn't recognize an important input feature (and doesn't capture it), there is no way the machine learning model can figure out what is missing, or even recognize that something is missing.


On the other hand, humans are also very good at visualizing data patterns. "Data visualization" techniques can be a powerful means to get a good sense of the data and quickly identify the areas where drilldown analysis should be conducted. Of course, visualization is limited to low-dimensional data, as humans cannot comprehend more than a handful of dimensions. Humans are also easily biased, so they may find patterns that are actually coincidence. By having human and machine work together, they complement each other very well.

What are some of the key design decisions in data mining?
  1. Balance between false +ve and false –ve based on cost / consequence of making a wrong decision.
  2. We don't have to use one method from beginning to end. We can use different methods at different stages of the analysis. For example, in a multi-class (A, B, C) problem, we can use a decision tree to distinguish A from not-A (i.e., B and C) and then use a support vector machine to separate B and C. As another example, we can use a decision tree to determine the best input attributes to feed a neural network, as sketched below.
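As a rough illustration of mixing methods, here is a sketch (assuming scikit-learn; the data set is synthetic) where a decision tree first selects the most influential attributes and a neural network is then trained only on that subset.

from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier
from sklearn.feature_selection import SelectFromModel
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import make_pipeline

X, y = make_classification(n_samples=500, n_features=20, random_state=0)

# Stage 1: a decision tree ranks the input attributes
# Stage 2: a neural network trains on the selected subset only
pipeline = make_pipeline(
    SelectFromModel(DecisionTreeClassifier(random_state=0)),
    MLPClassifier(max_iter=1000, random_state=0),
)
pipeline.fit(X, y)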

What is the most powerful / most commonly used supervised machine learning modeling technique?


The general answer is that each modeling technique has its strengths and weaknesses and none of them wins in all situations. So understanding their corresponding strengths and weaknesses is important for picking the right one.

Generalized Linear Regression
Linear and logistic regression are based on fitting a linear plane to a set of data points such that the root mean square error (the distance between predicted output and actual output) is minimized. They are by far the most commonly used techniques, one for numeric output and the other for categorical output. They have a long history in statistics and are supported in pretty much all commercial and open source data mining tools.

Linear and logistic regression models require a certain amount of data preparation, such as handling missing data. They also assume that the output (or logit of the output) is a linear combination of input features and that the error is normally distributed. However, real-life scenarios are not always linear. To deal with non-linearity, input terms are mixed (usually by cross-multiplication) in different ways to generate additional input terms called "interactions". This process is trial and error and can generate a huge number of combinations. Nevertheless, these models do a reasonably good job in a wide spectrum of business problems and are well understood by statisticians and data miners. They are commonly used as a baseline for comparison with other models.
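As a small illustration, here is a sketch (assuming scikit-learn; synthetic data) of generating interaction terms by cross-multiplying the inputs before fitting a logistic regression.

from sklearn.datasets import make_classification
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline

X, y = make_classification(n_samples=500, n_features=5, random_state=0)

# Cross-multiplied input terms ("interactions") feed the linear model
model = make_pipeline(
    PolynomialFeatures(degree=2, interaction_only=True, include_bias=False),
    LogisticRegression(max_iter=1000),
)
model.fit(X, y)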

Neural Network
Neural Network is based on multiple layers of perceptrons (each is like a logistic regression unit with binary inputs and output). There is typically one hidden layer (so the number of layers is 3) with N perceptrons (where N is determined by trial and error). Because of the extra layer and the non-linear logistic function, a neural network can handle non-linearity very well. If it has good predictors in its input data, a neural network can achieve very high prediction performance.

Similar to linear regression, a neural network requires careful data preparation to remove noisy data as well as redundant input attributes (those that are highly correlated). Neural networks also take much longer to train than other methods, and the model a neural network has learned is hard to explain or make sense of.

Support Vector Machine
Support Vector Machine is a binary classifier (input features are numeric). It is based on finding a linear plane that separates the two output classes such that the margin is maximized. The optimal solution is expressed in terms of dot products of vectors. If the points are not linearly separable, we can use a function to transform the points to a higher-dimensional space such that they become linearly separable. The math shows that the dot product (after transforming to a high-dimensional space) can be generalized into a kernel function (the radial basis function being the most common one). Although the underlying math is not easy for everyone to understand, SVM has demonstrated outstanding performance in a wide spectrum of problems and has recently become one of the most effective methods.
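To give a feel for the kernel trick, here is a tiny sketch (assuming scikit-learn) of an SVM with the common radial basis function kernel on a data set that is not linearly separable in its original two dimensions.

from sklearn.svm import SVC
from sklearn.datasets import make_circles

# Two concentric circles: not separable by a linear plane in 2D
X, y = make_circles(n_samples=400, noise=0.1, factor=0.3, random_state=0)

model = SVC(kernel="rbf", gamma="scale").fit(X, y)
print(model.score(X, y))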

Despite its powerful capability, SVM is not broadly implemented in commercial products as there are patent issues (AT&T holds a patent on SVM). On the other hand, the non-linear kernel functions (such as the common radial basis function) are difficult to implement in a parallel programming model such as Map/Reduce. SVM is undergoing active research, and a derivative, Support Vector Regression, can be used to predict numeric output.


Tree Ensembles

This is combining “ensemble methods” with “decision tree”.

The decision tree is a first-generation machine learning algorithm based on a greedy approach. For a classification problem, the decision tree tries to split a branch where the combined "purity" (measured by the Gini index or entropy) after the split is maximized. For a regression problem, it tries to split where the combined "between-class variance" divided by "within-class variance" is maximized, which is equivalent to maximizing the F-value after the split. The splitting continues until a terminating condition is reached, such as too few members remaining in the branch, or the gain of a further split being insignificant.

Decision trees are very good at dealing with missing values (simply not using that value in learning and going down both paths in scoring). The decision model captured by a tree is also very comprehensible and explainable. However, decision trees are relatively sensitive to noise and can easily overfit the data. Although the learning mechanism is easy to understand, a single decision tree doesn't perform very well in general and is rarely used in real systems. However, when decision trees are used together with ensemble methods, they become extraordinarily powerful, as these weaknesses largely disappear.


The idea of an ensemble is simple. Instead of learning one model, we learn multiple models and combine the estimates of the individual learners (e.g., we let them vote on categorical output and compute the average for numeric output).


There are two main models for creating different learners. One is called "bagging", which is basically drawing samples (with replacement) from the training set and then having the same tree algorithm learn on the different sample data sets. The other model is called "boosting", which runs a sequence of iterations where samples are drawn from the training set based on a probability distribution in which the wrongly predicted items from the last round have a higher chance of being selected. In other words, the algorithm pays more attention to learning from wrongly-classified examples.
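As a quick illustration, here is a sketch (assuming scikit-learn; synthetic data) of building both styles of ensemble over decision trees.

from sklearn.datasets import make_classification
from sklearn.ensemble import BaggingClassifier, AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=1000, random_state=0)

# Bagging: each tree learns from a bootstrap sample of the training set
bagging = BaggingClassifier(DecisionTreeClassifier(), n_estimators=100).fit(X, y)

# Boosting: each round pays more attention to wrongly classified examples
boosting = AdaBoostClassifier(DecisionTreeClassifier(max_depth=3), n_estimators=100).fit(X, y)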


It turns out that tree ensembles are the most popular method at this moment, as they achieve very good prediction across the board, are easy to understand, and can be implemented in Map/Reduce. Google recently published a good paper on their PLANET project, which implements ensemble trees on Map/Reduce.

Sunday, December 5, 2010

BI at large scale

As more and more data is being collected from pretty much everything a user does, such as transaction activities, social interactions, and information searches, enterprises have been actively looking into ways to turn this vast amount of raw data into useful information.

BI process flow

It includes the following stages of processing
  1. ETL: Extract operational data (inside enterprise or external sources) into data warehouse (typically organized in Star/Snowflake schema with Fact and Dimension tables).
  2. Data exploration: Get insight into data using simple visualization tools (e.g. histogram, summary statistics) or sophisticated OLAP tools (slice, dice, rollup, drilldown)
  3. Report generation: Produce executive reports
  4. Data mining: Extract patterns of the underlying data to form models (e.g. bayesian networks, linear regression, neural networks, decision trees, support vector machines, nearest neighbors, association rules, principal component analysis)
  5. Feedback: The model will be used to assist business decision making (predicting the future)
The gap of processing BIG data
Many data mining and machine learning algorithms are available in both commercial packages (e.g. SAS, SPSS) as well as open source libraries (e.g. Weka, R). Nevertheless, most of these ML algorithm implementations are based on fitting all data in memory and are not designed to process big data (e.g. terabyte data volumes).

On the other hand, massively parallel processing platforms such as Hadoop Map/Reduce have, over the last few years, been proven in processing terabyte or even petabyte ranges of data. Although many sequential algorithms, including a big portion of machine learning algorithms, can be restructured to run in map/reduce, corresponding ML implementations are often not readily available in massively parallel form.

Approach 1: Apache Mahout
One approach is to "re-implement" the ML algorithms in Map/Reduce, and this is the path of the Apache Mahout project. Mahout seems to have implemented an impressive list of algorithms, although I haven't used them for my projects yet.

Approach 2: Ensemble of parallel independent learners
This is an alternative path that doesn't require re-implementation of existing algorithms. It works in the following way (see the sketch after this list).
  1. Draw samples from the big data into many sample data sets, each of which can fit into the memory of a single, individual learner.
  2. Assign each sample data set to an individual learner, which uses existing algorithms to learn the model. After learning, each individual learner keeps its own learned model.
  3. When a decision / prediction request is received, each individual learner comes up with its own prediction and then the results are combined in some way (e.g. for a classification task, the learners vote for the predicted class and the majority wins; for regression, the average of the estimated values is used as the output value).
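Here is a rough sketch (assuming scikit-learn and numpy; the sample size and the voting rule are illustrative) of the three steps above.

import numpy as np
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=10000, random_state=0)

# 1. Draw samples small enough to fit into a single learner's memory
# 2. Train an independent model on each sample
rng = np.random.default_rng(0)
learners = []
for _ in range(10):
    idx = rng.choice(len(X), size=1000, replace=True)
    learners.append(DecisionTreeClassifier().fit(X[idx], y[idx]))

# 3. Combine the predictions: majority vote for classification
def predict(x):
    votes = [m.predict([x])[0] for m in learners]
    return max(set(votes), key=votes.count)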

I also found this approach can smoothly fade out outdated models. As user behavior may change over time, so does the validity of a learned model. With this ensemble approach, I can have multiple learners, each learning its model periodically. Every time a prediction is needed, I pick the latest k models and combine the final prediction based on a time-decayed weighted voting model. Outdated models automatically slide out of the k-size window.

One gotcha of the sampling approach is the handling of rare events (since you may lose those rare events in sampling). In this case, stratified sampling (instead of simple random sampling) should be used.

Friday, November 5, 2010

Map Reduce and Stream Processing

The Hadoop Map/Reduce model is very good at processing large amounts of data in parallel. It provides a general partitioning mechanism (based on the key of the data) to distribute the aggregation workload across different machines. Basically, map/reduce algorithm design is all about how to select the right key for the record at different stages of processing.

However, "time dimension" has a very different characteristic compared to other dimensional attributes of data, especially when real-time data processing is concerned. It presents a different set of challenges to the batch oriented, Map/Reduce model.
  1. Real-time processing demands a very low latency of response, which means there isn't too much data accumulated at the "time" dimension for processing.
  2. Data collected from multiple sources may not have all arrived at the point of aggregation.
  3. In the standard model of Map/Reduce, the reduce phase cannot start until the map phase is completed, and all the intermediate data is persisted to disk before being downloaded by the reducer. All of this adds significant latency to the processing.
Here is a more detailed description of this high-latency characteristic of Hadoop.

Although Hadoop Map/Reduce is designed for batch-oriented workloads, certain applications, such as fraud detection, ad display, and network monitoring, require real-time responses while processing large amounts of data, and people have started to look at various ways of tweaking Hadoop to fit a more real-time processing environment. Here I try to look at some techniques for performing low-latency parallel processing based on the Map/Reduce model.


General stream processing model

In this model, data is produced by various OLTP systems, which update the transaction data store and also asynchronously send additional data for analytic processing. The analytic processing writes its output to a decision model, which feeds information back to the OLTP system for real-time decision making.

Notice the "asynchronous nature" of the analytic processing, which is decoupled from the OLTP system; this way the OLTP system won't be slowed down waiting for the completion of the analytic processing. Nevertheless, we still need to perform the analytic processing ASAP, otherwise the decision model will not be very useful as it won't reflect the current picture of the world. What latency is tolerable is application specific.

Micro-batch in Map/Reduce


One approach is to cut the data into small batches based on a time window (e.g. every hour) and submit the data collected in each batch to a Map/Reduce job. A staging mechanism is needed so that the OLTP application can continue independently of the analytic processing. A job scheduler is used to regulate the producer and the consumer so each of them can proceed independently.

Continuous Map/Reduce

Here let's imagine some possible modifications of the Map/Reduce execution model to cater for real-time stream processing. I am not trying to preserve the backward compatibility of Hadoop, which is the approach that the Hadoop Online Prototype (HOP) is taking.

Long running
The first modification is to make the mapper and reducer long-running. Therefore, we cannot wait for the end of the map phase before starting the reduce phase, as the map phase never ends. This implies the mapper pushes data to the reducer once it completes its processing and lets the reducer sort the data. A downside of this approach is that it offers no opportunity to run the combine() function on the map side to reduce bandwidth utilization. It also shifts more workload to the reducer, which now needs to do the sorting.

Notice there is a tradeoff between latency and optimization. Optimization requires more data to be accumulated at the source (ie: the Mapper) so local consolidation (ie: combine) can be performed. Unfortunately, low latency requires the data to be sent ASAP so not much accumulation can be done.

HOP suggests an adaptive flow control mechanism such that data is pushed out to the reducer ASAP until the reducer is overloaded and pushes back (using some sort of flow control protocol). Then the mapper will buffer the processed messages and perform combine() before it sends them to the reducer. This approach automatically shifts the aggregation workload back and forth between the reducer and the mapper.

Time Window: Slice and Range
There is a "time slice" concept and a "time range" concept. "Slice" defines a time window in which results are accumulated before the reduce processing is executed. It is also the minimum amount of data that the mapper should accumulate before sending it to the reducer.

"Range" defines the time window over which results are aggregated. It can be a landmark window with a well-defined starting point, or a jumping window (consider a moving landmark scenario). It can also be a sliding window, where a fixed-size window ending at the current time is aggregated.

After receiving a specific time slice from every mapper, the reducer can start the aggregation processing and combine the result with the previous aggregation result. The slice can be dynamically adjusted based on the amount of data sent by the mappers.

Incremental processing
Notice that the reducer needs to compute the aggregated slice value after receiving all records of the same slice from all mappers. After that, it calls the user-defined merge() function to merge the slice value with the range value. In case the range needs to be refreshed (e.g. on reaching a jumping window boundary), the init() function will be called to get a fresh range value. If the range value needs to be updated (when a certain slice value falls outside a sliding range), the unmerge() function will be invoked.

Here is an example of how we keep track of the average hit rate (i.e., total hits per hour) within a 24-hour sliding window, with an update happening every hour (i.e., a one-hour slice).
# Called at each hit record
map(k1, hitRecord) {
    site = hitRecord.site
    # Look up the current slice of this site
    slice = lookupSlice(site)
    if (now - slice.time > 60.minutes) {
        # Notify the reducer that the whole slice of this site has been sent
        advance(site, slice)
        slice = lookupSlice(site)
    }
    emitIntermediate(site, slice, 1)
}

combine(site, slice, countList) {
    hitCount = 0
    for count in countList {
        hitCount += count
    }
    # Send the partial count to the downstream node
    emitIntermediate(site, slice, hitCount)
}

# Called when the reducer has received the full slice from all mappers
reduce(site, slice, countList) {
    hitCount = 0
    for count in countList {
        hitCount += count
    }
    sv = SliceValue.new
    sv.hitCount = hitCount
    return sv
}

# Called at each jumping window boundary
init(slice) {
    rangeValue = RangeValue.new
    rangeValue.hitCount = 0
    return rangeValue
}

# Called after each reduce()
merge(rangeValue, slice, sliceValue) {
    rangeValue.hitCount += sliceValue.hitCount
}

# Called when a slice falls out of the sliding window
unmerge(rangeValue, slice, sliceValue) {
    rangeValue.hitCount -= sliceValue.hitCount
}

Friday, October 15, 2010

Scalable System Design Patterns

Looking back after 2.5 years since my previous post on scalable system design techniques, I've observed the emergence of a set of commonly used design patterns. Here is my attempt to capture and share them.

Load Balancer

In this model, there is a dispatcher that determines which worker instance will handle a request based on different policies. The application should ideally be "stateless" so that any worker instance can handle the request.

This pattern is deployed in almost every medium to large web site setup.



Scatter and Gather

In this model, the dispatcher multicasts the request to all workers in the pool. Each worker computes a local result and sends it back to the dispatcher, which consolidates them into a single response and sends it back to the client.

This pattern is used in search engines like Yahoo and Google to handle users' keyword search requests, etc.
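A rough sketch (in Python; the worker objects and their handle() method are hypothetical) of the dispatcher's scatter-and-gather logic might look like this.

from concurrent.futures import ThreadPoolExecutor

def consolidate(partial_results):
    # e.g. merge the partial hit lists into a single ranked response
    merged = []
    for r in partial_results:
        merged.extend(r)
    return sorted(merged)

def dispatch(request, workers):
    # Scatter the request to every worker, then gather the partial results
    with ThreadPoolExecutor(max_workers=len(workers)) as pool:
        partial_results = list(pool.map(lambda w: w.handle(request), workers))
    return consolidate(partial_results)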



Result Cache

In this model, the dispatcher first looks up whether the request has been made before and tries to return the previous result, in order to save the actual execution.

This pattern is commonly used in large enterprise applications. Memcached is a very commonly deployed cache server.



Shared Space

This model is also known as "Blackboard"; all workers monitor information in the shared space and contribute partial knowledge back to the blackboard. The information is continuously enriched until a solution is reached.

This pattern is used in JavaSpaces and also in the commercial product GigaSpaces.



Pipe and Filter

This model is also known as "Data Flow Programming"; all workers are connected by pipes through which data flows.

This pattern is a very common EAI pattern.



Map Reduce

This model targets batch jobs where disk I/O is the major bottleneck. It uses a distributed file system so that disk I/O can be done in parallel.

This pattern is used in many of Google's internal applications, as well as implemented in the open source Hadoop parallel processing framework. I also find this pattern applicable in many application design scenarios.



Bulk Synchronous Parallel

This model is based on lock-step execution across all workers, coordinated by a master. Each worker repeats the following steps until the exit condition is reached, when there are no more active workers.
  1. Each worker reads data from the input queue
  2. Each worker performs local processing on the data read
  3. Each worker pushes its local result along its direct connections
This pattern has been used in Google's Pregel graph processing model as well as the Apache Hama project.



Execution Orchestrator

This model is based on an intelligent scheduler / orchestrator that schedules ready-to-run tasks (based on a dependency graph) across a cluster of dumb workers.

This pattern is used in Microsoft's Dryad project.



Although I have tried to cover the whole set of commonly used design patterns for building large scale systems, I am sure I have missed some other important ones. Please drop me a comment with feedback.

Also, there is a whole set of scalability patterns around the data tier that I haven't covered here. This includes some very basic patterns underlying NOSQL. It is worth taking a deep look at some leading implementations.

Wednesday, October 6, 2010

BigTable Model with Cassandra and HBase

Recently, in a number of "scalability discussion meetings", I've seen the following pattern come up repeatedly ...
  1. To make your app scalable, you try to make your app layer “stateless”.
  2. OK, so you move the "state" out of your application layer into a shared DB, or a shared data layer.
  3. Now, how do we make the data tier scalable? By definition, we cannot make the data tier stateless.
  4. OK, now let's think about how to "partition" your data and spread it across multiple machines in such a way that the workload is balanced.
  5. Now there are more boxes, and what if some of them crash?
  6. OK, we should replicate the data across machines.
  7. Now, how do we keep these data in sync ...
  8. And then Cloud computing gets into the picture (as it always does). Now we are not just having a pool of machines but also the pool size can grow and shrink according to workload fluctuation (you don't want to pay for something idle, right ?).
  9. Now we need to figure out as we add more machines into the pool or remove machine from the pool, how we should "redistribute" the data.
This is an area where NOSQL shines. In the last 18 months, NOSQL has become one of the hottest topics in the software industry. It has been introduced as a solution to large-scale data storage problems in the range of terabytes or petabytes. Dozens of NOSQL products have come to the market, but two leaders, HBase and Cassandra, seem to stand out from the rest in terms of their adoption.

Given the increasing demand for explanations of these two products recently, I decided to write a post on them.

Rather than repeating the basic theory of NOSQL here, for a foundation of the distributed system theory underlying the NOSQL design, please refer to my earlier blog.

Both HBase and Cassandra are based on the Google BigTable model, so let's first introduce some key characteristics underlying BigTable.

Fundamentally Distributed
BigTable is built from the ground up on a "highly distributed", "share nothing" architecture. Data is meant to be stored on a large number of unreliable, commodity server boxes through "partitioning" and "replication". Data partitioning means the data is partitioned by its key and stored on different servers. Replication means the same data element is replicated multiple times on different servers.


Column Oriented
Unlike a traditional RDBMS implementation where each "row" is stored contiguously on disk, BigTable stores each column contiguously on disk. The underlying assumption is that in most cases not all columns are needed for data access; a column-oriented layout allows more records to sit in a disk block and hence can reduce the disk I/O.

A column-oriented layout is also very effective for storing very sparse data (where many cells have a NULL value) as well as multi-value cells. The following diagram illustrates the difference between a row-oriented layout and a column-oriented layout.


Variable number of Columns
In an RDBMS, each row must have a fixed set of columns defined by the table schema, and therefore it is not easy to support columns with multi-value attributes. The BigTable model introduces the "Column Family" concept: a row has a fixed number of "column families", but within a "column family" a row can have a variable number of columns, which can be different in each row.


In the BigTable model, the basic data storage unit is a cell (addressed by a particular row and column). BigTable allows multiple timestamped versions of data within a cell. In other words, a user can address a data element by rowid, column name, and timestamp. At the configuration level, BigTable allows the user to specify how many versions can be stored within each cell, either by count (how many) or by freshness (how old).

At the physical level, BigTable stores each column family contiguously on disk (imagine one file per column family), and physically sorts the data by rowid, column name, and timestamp. After that, the sorted data is compressed so that a disk block can store more data. Since data within a column family usually has a similar pattern, data compression can be very effective.

Note: Although not shown in this example, the rowids of different column families can be completely different types. For example, in the above example, I can have another column family "UserIdx" whose rowid is a string (the user's name) and which has columns whose columnKey is u1, u2 (i.e., the row ids of the User column family) and whose columnValue is null (i.e., not used). This is a common technique to build an index at the application level.

Sequential write
The BigTable model is highly optimized for write operations (insert/update/delete) with sequential writes (no disk seek is needed). Basically, a write happens by first appending a transaction entry to a log file (hence the disk write I/O is sequential with no disk seek), followed by writing the data into an in-memory Memtable. In case the machine crashes and all in-memory state is lost, the recovery step will bring the Memtable up to date by replaying the updates in the log file.

All the latest updates are therefore stored in the Memtable, which grows until reaching a size threshold, at which point it is flushed to disk as an SSTable (sorted by the string key). Over a period of time there will be multiple SSTables on disk storing the data.

Merged read
Whenever a read request is received, the system first looks up the Memtable by its row key to see if it contains the data. If not, it looks at the on-disk SSTables to see if the row key is there. We call this the "merged read" as the system needs to look at multiple places for the data. To speed up the detection, each SSTable has a companion Bloom filter so that the absence of the row key can be detected rapidly. In other words, only when the Bloom filter returns positive will the system do a detailed lookup within the SSTable.
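A minimal sketch of this read path (the Memtable is a plain dictionary here; the SSTable objects and their Bloom filter API are hypothetical) looks like this.

def merged_read(row_key, memtable, sstables):
    # 1. Check the in-memory Memtable first
    if row_key in memtable:
        return memtable[row_key]
    # 2. Consult an SSTable only when its Bloom filter says the key may exist
    for sstable in sstables:                    # newest first
        if sstable.bloom_filter.might_contain(row_key):
            value = sstable.lookup(row_key)     # detailed on-disk lookup
            if value is not None:
                return value
    return None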

Periodic Data Compaction
As you can imagine, reads can become quite inefficient when there are too many SSTables scattered around. Therefore, the system periodically merges the SSTables. Notice that since each SSTable is individually sorted by key, a simple "merge sort" is sufficient to merge multiple SSTables into one. The merge mechanism is based on a logarithmic property where two SSTables of the same size are merged into a single SSTable of double the size. Therefore the number of SSTables is proportional to O(log N), where N is the number of rows.
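Since each SSTable is already sorted by key, the compaction itself can be sketched as a plain k-way merge (here using Python's heapq.merge over iterables of (key, value) pairs).

import heapq

def compact(sstables):
    # Each "sstable" is an iterable of (key, value) pairs sorted by key
    return list(heapq.merge(*sstables, key=lambda kv: kv[0]))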



After looking at the common parts, let's look at the differences between HBase and Cassandra.

HBase
Based on BigTable, HBase uses the Hadoop Filesystem (HDFS) as its data storage engine. The advantage of this approach is that HBase doesn't need to worry about data replication, data consistency, and resiliency because HDFS handles them already. Of course, the downside is that it is also constrained by the characteristics of HDFS, which is not optimized for random read access. In addition, there will be extra network latency between the DB server and the file server (which is the data node of Hadoop).


In the HBase architecture, data is stored in a farm of Region Servers. A "key-to-server" mapping is needed to locate the corresponding server, and this mapping is stored as a "Table" similar to other user data tables.

Before a client does any DB operation, it needs to first locate the corresponding region server.
  1. The client contacts a predefined Master server, which replies with the endpoint of a region server that holds a "Root Region" table.
  2. The client contacts that region server, which replies with the endpoint of a second region server holding a "Meta Region" table, which contains a mapping from "user table" to "region server".
  3. The client contacts this second region server, passing along the user table name. This second region server looks up its meta region and replies with the endpoint of a third region server holding a "User Region", which contains a mapping from "key range" to "region server".
  4. The client contacts this third region server, passing along the row key that it wants to look up. This third region server looks up its user region and replies with the endpoint of a fourth region server holding the data that the client is looking for.
  5. The client caches the results along this process so subsequent requests don't need to go through this multi-step process again to resolve the corresponding endpoints.
In HBase, the in-memory data storage (what we referred to as the "Memtable" in the paragraphs above) is implemented as the MemStore. The on-disk data storage (what we referred to as the "SSTable" above) is implemented as an HDFS file residing on a Hadoop data node server. The log file is also stored as an HDFS file. (I feel storing a transaction log file remotely will hurt performance.)

Also in the HBase architecture, there is a special machine playing the "role of master", which monitors and coordinates the activities of all region servers (the heavy-duty worker nodes). To the best of my knowledge, the master node is a single point of failure at this moment.

For a more detailed architecture description, Lars George has a very good explanation of the log file implementation as well as the data storage architecture of HBase.

Cassandra
Also based on the BigTable model, Cassandra uses a DHT (distributed hash table) model to partition its data, following the approach described in the Amazon Dynamo paper.

Consistent Hashing via O(1) DHT
Each machine (node) is associated with a particular id that is distributed in a key space (e.g. 128 bits). Every data element is also associated with a key (in the same key space). A server owns all the data whose keys lie between its id and the preceding server's id.
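A minimal sketch of this ownership rule (using MD5 as the hash and the bisect module; the node names are made up) could look like this.

import bisect
import hashlib

def ring_position(name, space=2**128):
    return int(hashlib.md5(name.encode()).hexdigest(), 16) % space

nodes = ["node-a", "node-b", "node-c"]
ring = sorted((ring_position(n), n) for n in nodes)

def owner(key):
    # The first node at or after the key's position on the ring owns the key
    # (wrapping around to the first node when we pass the end of the ring)
    pos = ring_position(key)
    idx = bisect.bisect_left(ring, (pos,)) % len(ring)
    return ring[idx][1]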

Data is also replicated across multiple servers. Cassandra offers multiple replication schemes, including storing the replicas on neighbor servers (whose ids succeed the server owning the data), or a rack-aware strategy that places the replicas in different physical locations. The simple partition strategy is as follows ...


Tunable Consistency Level
Unlike HBase, Cassandra allows you to choose the consistency level that is suitable for your application, so you can gain more scalability if you are willing to trade off some data consistency.

For example, it allows you to choose how many ACKs to receive from different replicas before considering a WRITE to be successful. Similarly, you can choose how many replicas' responses to receive for a READ before returning the result to the client.

By choosing appropriate numbers for the W and R responses, you can choose the level of consistency you like. For example, to achieve strict consistency, we just need to pick W and R such that W + R > N. This includes the possibilities (W = one and R = all), (R = one and W = all), and (W = quorum and R = quorum). Of course, if you don't need strict consistency, you can choose smaller values for W and R and gain better availability. Regardless of which consistency level you choose, the data will be eventually consistent through the "hinted handoff", "read repair", and "anti-entropy sync" mechanisms described below.
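As a tiny illustration of the quorum rule, with N = 3 replicas the pairs (W=2, R=2), (W=1, R=3) and (W=3, R=1) all satisfy W + R > N, so every read quorum overlaps the latest write quorum.

N = 3
for W in range(1, N + 1):
    for R in range(1, N + 1):
        mode = "strict" if W + R > N else "eventual"
        print("W=%d R=%d -> %s consistency" % (W, R, mode))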

Hinted Handoff
The client performs a write by sending the request to any Cassandra node, which acts as a proxy for the client. This proxy node will locate the N corresponding nodes that hold the data replicas and forward the write request to all of them. In case any node has failed, it will pick a random node as a handoff node and write the request with a hint telling it to forward the write request back to the failed node after it recovers. The handoff node will then periodically check for the recovery of the failed node and forward the write to it. Therefore, the original node will eventually receive all the write requests.

Conflict Resolution
Since writes can reach different replicas, the corresponding timestamp of the data is used to resolve conflicts; in other words, the latest timestamp wins and pushes the earlier timestamps into earlier versions (they are not lost).

Read Repair
When the client performs a "read", the proxy node issues N reads but only waits for R copies of the responses and returns the one with the latest version. In case some nodes respond with an older version, the proxy node will send the latest version to them asynchronously, so these left-behind nodes will still eventually catch up with the latest version.

Anti-Entropy data sync
To ensure the data is still in sync even when no READ or WRITE occurs to the data, replica nodes periodically gossip with each other to figure out if any of them are out of sync. For each key range of data, each member of the replica group computes a Merkle tree (a hash encoding tree where differences can be located quickly) and sends it to the other neighbors. By comparing the received Merkle tree with its own tree, each member can quickly determine which data portions are out of sync. If so, it will send the diff to the left-behind members.

Anti-entropy is the "catch-all" way to guarantee eventual consistency, but it is also pretty expensive and therefore is not done frequently. By combining the data sync with read repair and hinted handoff, we can keep the replicas pretty up-to-date.

BigTable trade offs
To retain the scalability features of BigTable, some of the basic features that an RDBMS provides are missing from the BigTable model. Here we highlight the rough edges of BigTable.

1) Primitive transaction support
Transaction protection is only guaranteed within a single row. In other words, you cannot start an atomic transaction to modify multiple rows.

2) Primitive isolation support
While you are reading a row, other people may have modified the same row and updated it before you. Your view is no longer current, but your later update can easily wipe out other people's changes.

There are many techniques for isolating concurrent updates, including a pessimistic approach like locking or an optimistic approach using a vector clock as the version stamp. But to the best of my understanding, there is no robust test-and-set operation in the BigTable model (there is some getLock mechanism in HBase which I haven't looked into), so my impression is that there is no easy way to check that no concurrent update has happened in between.

Because of this limitation, I think the BigTable model is more suitable for applications where concurrent updates to the same row are very rare, or where some inconsistency is tolerable at the application level. Fortunately, a lot of applications fall into this bucket.

3) No indexes
Notice that data within BigTable is all physically sorted: by rowid, column name, and timestamp. There is no index from a column value to its containing rowid.

This model is quite different from an RDBMS, where you typically define a table and worry about defining the indexes later. There is no such "index" concept in BigTable, and you need to carefully plan out the physical sorting order of your data layout.

The lack of indexes turns out to be quite inconvenient, and many people using BigTable end up building their own indexes at the application level. This usually results in a highly denormalized data model with lots of column families that store links to other tables. Any update to the base data needs to carefully update these other column families as well. From a performance angle, this is actually better than maintaining indexes in an RDBMS because BigTable is optimized for writes. However, since the application logic now maintains the index, this can be a source of application bugs.

4) No referential integrity enforcement
As mentioned above, since you are building an artificial index at the application level, you need to maintain the integrity of your index as well. This includes updating your index when the base data is inserted, modified, or deleted. This kind of handling logic traditionally resides at the RDBMS level, but since BigTable has no such referential integrity concept, the responsibility now lands on your application logic.

5) Lack of surrounding tools
As NOSQL and BigTable are very new, the tools surrounding them are definitely not comparable to the RDBMS world at this moment; such tools include report generation, BI, data warehousing ... etc.

I observe a general trend that most NOSQL products are moving towards providing an ODBC / JDBC interface to integrate with the existing tool market more easily. But at this moment, to the best of my understanding, such interfaces are not widespread yet.


Design Patterns for Bigtable model
Due to the very different model of BigTable, the data model design methodology is also quite different from traditional RDBMS schema design. Here is a sequence of steps that is pretty common ...

1) Identify all your query scenarios
Since there is no index concept, you have to plan carefully how your data is physically sorted. It is therefore important to find all your query use cases first.

2) Define your "entity table" and its corresponding column families
For an entity table, it is pretty common to have one column family storing all the entity attributes, and multiple column families storing the links to other entities. (e.g. a "UserTable" may contain a column family "baseInfo" to store all attributes of the user, a column family "friend" to store links to other users, and a column family "company" to store links to a CompanyTable.)

3) Define your "index table"
The "index table" is what your application builds to support reverse lookups. The "key" is typically based on the search criteria you have identified in your query scenarios. It is not uncommon for each query to have its own specific index table.

4) Make sure your application logic updates the index correctly
Since the index table has to be maintained by application logic, you need to check to make sure it is done correctly. In many cases, this can be quite a source of bugs.
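A hypothetical sketch (the db client and table names are made up) of keeping the index table in step with the entity table looks like this; forgetting the index half of the write is exactly the kind of bug mentioned above.

def update_user_name(db, user_id, old_name, new_name):
    # Base entity table: row key is the user id
    db.put("UserTable", row=user_id, family="baseInfo",
           column="name", value=new_name)
    # Index table: row key is the name, column key is the user id
    db.delete("UserIdx", row=old_name, family="users", column=user_id)
    db.put("UserIdx", row=new_name, family="users",
           column=user_id, value="")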

It is important to realize that NOSQL is not advocating a replacement of the RDBMS, which has been proven in many lines of application. NOSQL should be considered a complementary technology for some niche areas that an RDBMS doesn't cover well.

5) Design your update to be idempotent
Max's post has a good articulation of this. The basic idea is that, due to the eventual consistency model based on "read repair" and "quorum update", it is possible that a failed update is in fact successful. Here is how this can happen.
  1. The client issues a quorum update.
  2. The server distributes the update to all replica servers, but unfortunately doesn't get more than half of them to respond successfully. So it returns a failure to the client.
  3. Nevertheless, the update has been received by a minority of replicas (in other words, they don't roll back even though the update was not successful).
  4. Later, if the client reads one of these minority replicas, it will get this update (even though it has failed). Even more, since this update has a later version, it will read-repair the other copies (i.e., further propagate the failed update).
Therefore, the usual recovery is that the user should retry the operation when it fails, and the application logic needs to deal with potentially duplicated updates. One way is to find some means of detecting duplicates and ignore them once detected, as sketched below.
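One possible sketch (illustrative only) is to tag every update with a client-generated operation id and skip ids that have already been applied, so a retried "failed" update cannot take effect twice.

import uuid

def apply_update(store, applied_ids, update):
    # The client attaches a unique op_id the first time it issues the update
    op_id = update.setdefault("op_id", str(uuid.uuid4()))
    if op_id in applied_ids:
        return                      # duplicate caused by a retry; ignore it
    store[update["key"]] = update["value"]
    applied_ids.add(op_id)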

Sunday, August 29, 2010

Designing algorithms for Map Reduce

Since the emergence of the Hadoop implementation, I have been trying to morph existing algorithms from various areas into the map/reduce model. The results are pretty encouraging and I've found Map/Reduce applicable to a wide spectrum of application scenarios.

So I wanted to write down my findings, but then found the scope too broad, and I haven't spent enough time exploring the different problem domains. Finally, I realized that there is no way for me to completely cover what Map/Reduce can do in all areas, so I'll just dump out what I know at this moment over the long weekend when I have an extra day.

Notice that Map/Reduce is good for "data parallelism", which is different from "task parallelism". Here is a description of their difference and a general parallel processing design methodology.

I'll cover the abstract Map/Reduce processing model below. For a detailed description of the implementation of the Hadoop framework, please refer to my earlier blog here.


Abstract Processing Model
There is no formal definition of the Map/Reduce model. Based on the Hadoop implementation, we can think of it as a "distributed merge-sort engine". The general processing flow is as follows.
  • Input data is "split" across multiple mapper processes which execute in parallel
  • The result of each mapper is partitioned by key and locally sorted
  • Mapper results with the same key land on the same reducer and are consolidated there
  • A merge sort happens at the reducer, so all keys arriving at the same reducer are sorted

Within the processing flow, user defined functions can be plugged-in to the framework.
  • map(key1, value1) -> emit(key2, value2)
  • reduce(key2, value2_list) -> emit(key2, aggregated_value2)
  • combine(key2, value2_list) -> emit(key2, combined_value2)
  • partition(key2) return reducerNo
Designing an algorithm for map/reduce is about morphing your problem into a distributed sorting problem and fitting your algorithm into the user defined functions above.

To analyze the complexity of the algorithm, we need to understand the processing cost, especially the cost of network communication in such a highly distributed system.

Let's first consider the communication between the input data splits and the Mapper. To minimize this overhead, we need to run the mapper logic at the data split (without moving the data). How well we do this depends on how the input data is stored and whether we can run the mapper code there. For HDFS and Cassandra, we can run the mapper at the storage node, and the scheduling algorithm of the JobTracker will assign the mapper to the data split it is collocated with, hence significantly reducing data movement. Other data stores such as Amazon S3 don't allow execution of mapper logic at the storage node and therefore incur more data traffic.

The communication between Mapper and Reducer cannot be collocated because it depends on the emit key. The only mechanism available is the combine() function, which can perform a local consolidation and hence reduce the data sent to the reducer.

Finally, the communication between the reducer and the output data store depends on the store's implementation. For HDFS, the data is triply replicated and hence the cost of writing can be high. Cassandra (a NOSQL data store) allows configurable latency with various degrees of data consistency trade-off. Fortunately, in most cases the volume of result data after a Map/Reduce job is not high.

Now, let's see how to fit various kinds of algorithms into the Map/Reduce model ...


Map-Only
"Embarrassingly parallel" problems are those where the same processing is applied to each data element in a pretty independent way; in other words, there is no need to consolidate or aggregate individual results.

These kinds of problems can be expressed as a map-only job (by setting the number of reducers to zero). In this case, the Mapper's emitted results go directly to the output format.

Some examples of map-only jobs are listed below (a small mapper sketch follows the list) ...
  • Distributed grep
  • Document format conversion
  • ETL
  • Input data sampling
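For instance, a distributed grep can be sketched as a Hadoop-Streaming-style mapper that just prints matching lines (the pattern here is only an example); with zero reducers, the map output goes straight to the output files.

import re
import sys

PATTERN = r"ERROR"   # whatever we are grepping for

for line in sys.stdin:
    if re.search(PATTERN, line):
        print(line.rstrip())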

Sorting
As we described above, Hadoop is fundamentally a distributed sorting engine, so using it for sorting is a natural fit.

For example, we can use the identity function for both map() and reduce(); then the output is equivalent to sorting the input data. Notice that we are using a single reducer here, so the merge is still sequential although the sorting is done at the mappers in parallel.

We can perform the merge in parallel by using multiple reducers. In this case, the output of each reducer is sorted, and we may need to do a final merge of all the reducers' outputs. Another way is to use a customized partition() function such that the keys are partitioned by range. In this case, each reducer sorts a particular range and the final result is just the concatenation of each reducer's sorted output.
partition(key) {
  range = (KEY_MAX - KEY_MIN) / NUM_OF_REDUCERS
  reducer_no = (key - KEY_MIN) / range
  return reducer_no
}


Inverted Indexes
The map/reduce model originated at Google, which has a lot of scenarios requiring large-scale inverted indexes. Building an inverted index is about parsing different documents to build a word -> document index for keyword search.

In fact, the inverted index is pretty general and can be applied in many scenarios. To build an inverted index, we can feed the mapper each document (or lines within a document). The Mapper will parse the words in the document and emit [word, doc] pairs along with other metadata, such as where in the document the word occurs ... etc. The reducer can simply be an identity function that just dumps out the list, or it can perform some statistical aggregation per word.

In a more general form of inverted index, there are "container" and "element" concepts. The map and reduce functions are organized in the following pattern.
map(key, container) {
  for each element in container {
      element_meta =
           extract_metadata(element, container)
      emit(element, [container_id, element_meta])
  }
}

reduce(element, container_ids) {
  element_stat =
       compute_stat(container_ids)
  emit(element, [element_stat, container_ids])
}

In a text index, we are not just counting the actual frequency of the terms but also adjusting their weighting based on their frequency distribution, so common words have less significance when they appear in a document. The final value after normalization is called TF-IDF (term frequency times inverse document frequency) and can be computed using Map/Reduce as well.
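A minimal sketch of the weighting itself (plain Python; the map/reduce plumbing that collects the counts is omitted) is shown below.

import math

def tf_idf(term_count_in_doc, doc_length, num_docs, docs_containing_term):
    tf = term_count_in_doc / doc_length                # term frequency
    idf = math.log(num_docs / docs_containing_term)    # inverse document frequency
    return tf * idf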


Simple Statistics Computation
Computing max, min, and count is very straightforward since these operations are commutative and associative. Each mapper performs the local computation and sends its result to a single reducer to do the final computation.

The combine function is typically used to reduce the network traffic. Notice that the input to the combine function must look the same as the input to the reduce function, and the output of the combine function must look the same as the output of the map function. There is also no guarantee that the combiner function will be invoked at all.

class Mapper {
  buffer

  map(key, number) {
      buffer.append(number)
      if (buffer.is_full) {
          emit(1, compute_max(buffer))
          buffer.clear
      }
  }

  # Flush the remaining buffered numbers when the mapper finishes
  close() {
      emit(1, compute_max(buffer))
  }
}


class Reducer {
  reduce(key, list_of_local_max) {
      global_max = 0
      for local_max in list_of_local_max {
          if local_max > global_max {
              global_max = local_max
          }
      }
      emit(1, global_max)
  }
}


class Combiner {
  combine(key, list_of_local_max) {
      local_max = maximum(list_of_local_max)
      emit(1, local_max)
  }
}
Computing the average is done in a similar way, except that instead of computing the local average, we compute the local sum and local count. The reducer does the final sum divided by the final count to come up with the final average.
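A small sketch (plain Python, with the framework plumbing omitted) of the sum/count trick for the average:

def map_avg(numbers):
    # Each mapper contributes its local (sum, count) pair
    return (sum(numbers), len(numbers))

def reduce_avg(pairs):
    total = sum(s for s, _ in pairs)
    count = sum(c for _, c in pairs)
    return total / count

# e.g. two mappers, each seeing part of the data
print(reduce_avg([map_avg([1, 2, 3]), map_avg([4, 5])]))   # -> 3.0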

Computing a histogram is pretty common in statistics and can give a quick idea of the data distribution. A typical approach is to divide the numbers into different intervals. The mapper computes the count per interval and emits it, and the reducer computes the sum for each interval.
class Mapper {
  interval_start = [0, 20, 40, 60, 80]

  map(key, number) {
      i = 0
      while (i < NO_OF_INTERVALS) {
          if (number < interval_start[i]) {
              emit(i, 1)
              break
          }
          i = i + 1
      }
  }
}


class Reducer {
  reduce(interval, counts) {
      total_counts = 0
      for each count in counts {
          total_counts += count
      }
      emit(interval, total_counts)
  }
}


class Combiner {
  combine(interval, occurrence) {
      emit(interval, occurrence.size)
  }
}
Notice that a non-uniform distribution of values across intervals may cause an unbalanced workload among reducers and hence undermine the degree of parallelism. We'll address this in the later part of this post.


In-Mapper Combine
Jimmy Lin, in his excellent book, talks about a technique called "in-mapper combine" which regains control at the application level over when the combine takes place. The general idea is to maintain a HashMap to buffer the intermediate results and have separate logic to determine when to actually emit the data from the buffer. The general code structure is as follows ...
class Mapper {
  buffer

  init() {
      buffer = HashMap.new
  }

  map(key, data) {
      elements = process(data)
      for each element {
          ....
          check_and_put(buffer, k2, v2)
      }
  }

  # Aggregate v2 into the buffered value of k2; flush when the buffer is full
  check_and_put(buffer, k2, v2) {
      buffer[k2] = combine(buffer[k2], v2)
      if buffer.full {
          flush(buffer)
      }
  }

  flush(buffer) {
      for each k2 in buffer.keys {
          emit(k2, buffer[k2])
      }
      buffer.clear
  }

  # Flush whatever remains when the mapper finishes
  close() {
      flush(buffer)
  }
}

SQL Model
The SQL model can be used to extract data from the data source. It contains a number of primitives.

Projection / Filter
This logic is typically implemented in the Mapper
  • result = SELECT c1, c2, c3, c4 FROM source WHERE conditions
Aggregation / Group by / Having
This logic is typically implemented in the Reducer
  • SELECT sum(c3) as s1, avg(c4) as s2 ... FROM result GROUP BY c1, c2 HAVING conditions
The above example can be realized by the following map/reduce job
class Mapper {
  map(k, rec) {
      select_fields =
          [rec.c1, rec.c2, rec.c3, rec.c4]
      group_fields =
          [rec.c1, rec.c2]
      if (filter_condition == true) {
          emit(group_fields, select_fields)
      }
  }
}

class Reducer {
  reduce(group_fields, list_of_rec) {
      s1 = 0
      s2 = 0
      for each rec in list_of_rec {
          s1 += rec.c3
          s2 += rec.c4
      }
      s2 = s2 / list_of_rec.size
      if (having_condition == true) {
          emit(group_fields, [s1, s2])
      }
  }
}

Data Joins
Joining two data sets is a very common operation in the relational data model and is very mature in RDBMS implementations. The common join mechanisms in a centralized DB architecture are as follows
  1. Nested loop join -- This is the most basic and naive mechanism and is organized as two loops. The outer loop reads from data set1; the inner loop scans through the whole data set2 and compares with the record just read from data set1.
  2. Indexed join -- An index (e.g. a B-Tree index) is built for one of the data sets (say data set2, which is the smaller one). The join scans through data set1 and looks up the index to find the matched records of data set2.
  3. Merge join -- Pre-sort both data sets so they are arranged physically in increasing order. The join is realized by just merging the two data sets. a) Locate the first record in both data set1 & set2, which is their corresponding minimum key. b) In the one with the smaller minimum key (say data set1), keep scanning until finding the next key which is bigger than the minimum key of the other data set (i.e. data set2); call this the next minimum key of data set1. c) Switch positions and repeat the whole thing until one of the data sets is exhausted.
  4. Hash / Partition join -- Partition data set1 and data set2 into smaller pieces and apply another join algorithm on the smaller data sets. A linear scan with a hash() function is typically performed to partition the data sets such that data in set1 and data in set2 with the same key land on the same partition.
  5. Semi join -- This is mainly used to join two sets of data that are stored at different locations, and the goal is to reduce the amount of data transfer such that only the full records appearing in the final join result are sent through. a) Data set2 sends its key set to the machine holding Data set1. b) The machine holding Data set1 does a join and sends back the records in Data set1 that match one of the sent-over keys. c) The machine holding Data set2 does a final join with the data sent back.
The map reduce environment has corresponding join strategies.

General reducer-side join
This is the most basic one: records from data set1 and data set2 with the same key land on the same reducer, which then computes a Cartesian product. The downside of this model is that the reducer needs to have enough memory to hold all records of each key.
map(k1, rec) {
  emit(rec.key, [rec.type, rec])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  list_of_typeB = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else {
          list_of_typeB.append(rec)
      }
  }

  # Emit the Cartesian product of the two record lists
  for recA in list_of_typeA {
      for recB in list_of_typeB {
          emit(k2, [recA, recB])
      }
  }
}

Optimized reducer-side join
You can "secondary sort" the data type for each key by defining a customized partition function. In this model, you arrange the data type (which has less records per key to arrive first) and you only need to store these types.
map(k1, rec) {
  emit([rec.key, rec.type], rec)
}

partition(key_pair) {
  super.partition(key_pair[0])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else { # the other type arrives after all typeA records of this key
          for recA in list_of_typeA {
              emit(k2, [recA, rec])
          }
      }
  }
}

While very flexible, the downside of the reducer-side join is that all data needs to be transferred from the mappers to the reducers before the result is written to HDFS. Map-side joins exploit a special arrangement of the input files such that the join is performed at the mapper. The advantage of joining in the mapper is that we can exploit the data locality of the Map/Reduce framework: the mapper is allocated an input split on its local machine, which reduces the data transfer from disk to the mapper. After a map-side join, the result is written directly to the output HDFS files, which eliminates the data transfer between the mapper and the reducer.

Map-side partition join
This model requires the two data sets to be partitioned into two sets of partition files, using the same partitioning function and the same number of partitions for each set. Each partition must be small enough to fit into the memory of the mapper machine. We also need to configure the Map/Reduce job so that a partition file is not split; in other words, the whole partition is assigned to a single mapper task.

The mapper will detect the partition of the input file and then read the corresponding partition file of the other data set into an in-memory hashtable. After that, the mapper will look up the hashtable to do the join.
class Mapper {
  hashtable

  init() {
      partition = detect_input_filename()
      hashtable = load("hdfs://dataset2/" + partition)
  }

  map(k1, rec1) {
      rec2 = hashtable[rec1.key]
      if (rec2 != nil) {
          emit(rec1.key, [rec1, rec2])
      }
  }
}

Map-side partition merge join
In addition, if the partition files are also sorted by key, then the mapper can use a merge join, which has an even smaller memory footprint.
class Mapper {
  rec2_key = nil
  next_rec2 = nil
  list_of_rec2 = []
  file = nil

  init() {
      partition = detect_input_filename()
      file = open("hdfs://dataset2/" + partition, "r")
      next_rec2 = file.read()
      fill_rec2_list()
  }

  # Fill up the list of rec2 records that share the same key
  fill_rec2_list() {
      list_of_rec2 = []
      rec2_key = next_rec2.key
      while (next_rec2 != nil && next_rec2.key == rec2_key) {
          list_of_rec2.append(next_rec2)
          next_rec2 = file.read()
      }
  }

  map(k1, rec1) {
      # Advance the dataset2 cursor until its key catches up with rec1.key
      while (rec1.key > rec2_key && next_rec2 != nil) {
          fill_rec2_list()
      }
      if (rec1.key == rec2_key) {
          for rec2 in list_of_rec2 {
              emit(rec1.key, [rec1, rec2])
          }
      }
  }
}

Memcache join
The model is very straightforward: the second data set is loaded into a distributed hash table (such as memcache), which has effectively unlimited size. The mapper receives an input split from the first data set and then looks up memcache for the corresponding record of the other data set.
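A minimal sketch of the mapper in Python, assuming dataset2 has already been loaded into a memcached cluster with records serialized as JSON; the client library (pymemcache) and the "dataset2:<key>" naming scheme are assumptions of this example, not part of the original description.

import json
from pymemcache.client.base import Client   # assumed client library

client = Client(('localhost', 11211))       # assumed memcached endpoint

def mapper(rec1):
    # rec1 is assumed to be a dict with a 'key' field; the matching record of
    # dataset2 is assumed to be stored under the key "dataset2:<join key>"
    cached = client.get('dataset2:' + str(rec1['key']))
    if cached is not None:
        rec2 = json.loads(cached)
        yield rec1['key'], (rec1, rec2)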

There are also other, more sophisticated join mechanisms, such as the semi-join described in this paper.

Graph Algorithms
Many problems can be modeled as a graph of nodes and edges. In the search engine environment, computing the rank of a document using PageRank or HITS can be modeled as a sequence of Map/Reduce job iterations.

In the past, I have blogged about a number of very basic graph algorithms in map reduce, including topological sort, finding shortest paths, minimum spanning tree, etc., as well as how to recommend people connections using Map/Reduce.

Because graph traversal is inherently sequential, I am not sure Map/Reduce is the best parallel processing model for graph processing. Another problem is that, due to the "stateless" nature of the map() and reduce() functions, the whole graph needs to be transferred between mapper and reducer, which incurs significant communication cost. Jimmy Lin has described a clever technique called Shimmy that uses a special partitioning function to let the reducer retain ownership of nodes across map/reduce jobs. I have described this technique, as well as a general model of Map/Reduce graph processing, in a previous blog.

I think a parallel programming model designed specifically for graph processing will perform much better. Google's Pregel model is a good example of that.


Machine Learning
Many machine learning algorithms involve multiple iterations of parallel processing that fit very well into the Map/Reduce model.

For example, we can use map reduce to calculate the statistics for probabilistic methods such as naive Bayes.
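As a rough illustration (a sketch, not taken from the original post), the mapper can emit one count per class and per (class, feature) pair, and the reducer simply sums them; these counts are exactly the frequencies needed to estimate the naive Bayes probabilities. The record format below is a made-up example.

from collections import defaultdict

def mapper(record):
    # record is assumed to be (label, list_of_features)
    label, features = record
    yield ('class', label), 1            # document count per class
    for f in features:
        yield (label, f), 1              # feature count within the class

def reducer(key, counts):
    yield key, sum(counts)

# Simulate the shuffle on a tiny data set
data = [('spam', ['buy', 'now']), ('ham', ['hello', 'now'])]
grouped = defaultdict(list)
for rec in data:
    for k, v in mapper(rec):
        grouped[k].append(v)
for k, counts in grouped.items():
    for out in reducer(k, counts):
        print(out)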

A simple example is K-Means clustering, which can be computed in the following way (a sketch follows the steps below).
  • Input: A set of points, with k initial centroids
  • Output: K final centroids
Iterate until there is no more change of membership:
  1. For each point, assign it to the closest centroid
  2. Re-compute each centroid from its assigned member points
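Here is a minimal single-machine sketch of how each iteration maps onto a map phase (assign each point to its nearest centroid) and a reduce phase (average the points of each cluster). This is an illustrative sketch (it needs Python 3.8+ for math.dist), not code from the original post.

import math

def mapper(point, centroids):
    # Assign the point to the index of its nearest centroid
    nearest = min(range(len(centroids)),
                  key=lambda i: math.dist(point, centroids[i]))
    yield nearest, point

def reducer(cluster_id, points):
    # Re-compute the centroid as the mean of its member points
    n = len(points)
    yield cluster_id, tuple(sum(coord) / n for coord in zip(*points))

def kmeans(points, centroids, max_iter=20):
    for _ in range(max_iter):
        groups = {}
        for p in points:
            for cid, pt in mapper(p, centroids):
                groups.setdefault(cid, []).append(pt)
        new_centroids = list(centroids)
        for cid, pts in groups.items():
            for _, centroid in reducer(cid, pts):
                new_centroids[cid] = centroid
        if new_centroids == centroids:   # centroids stable => membership stable
            break
        centroids = new_centroids
    return centroids

print(kmeans([(0, 0), (0, 1), (5, 5), (6, 5)], [(0, 0), (5, 5)]))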


For a complete list of Machine learning algorithms and how they can be implemented using the Map/Reduce model, here is a very good paper.


Matrix arithmetic
A lot of real-life relationships can be represented as a matrix. One example is the vector space model of information retrieval, where columns represent documents and rows represent terms. Another example is the social network graph, where both the rows and columns represent people and a binary value in each cell represents a "friend" relationship. In this case, M + M.M represents all the people I can reach within 2 degrees.
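To make this concrete, here is a small numpy sketch (an illustrative example with a made-up friendship matrix, not from the original post) that computes who is reachable within 2 degrees.

import numpy as np

# Toy friendship matrix: person i and person j are friends if M[i, j] == 1
M = np.array([[0, 1, 0],
              [1, 0, 1],
              [0, 1, 0]])

# M + M.M: a nonzero entry marks a person reachable within 2 degrees
reach = M + M @ M
within_two_degrees = (reach > 0).astype(int)
print(within_two_degrees)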

Processing a dense matrix is very easy to parallelize. But since the sequential version is O(N^3), it is not that interesting for matrices of large size (millions of rows and columns).

A lot of real-world graph problems can be represented as sparse matrices, so my interest is to focus more on the processing of sparse matrices. I don't have much to share at this moment, but I hope this is something I will blog about in the future.