Showing posts with label map reduce. Show all posts
Showing posts with label map reduce. Show all posts

Wednesday, January 2, 2013

MapReduce: Detecting Cycles in Network Graph

I recently received an email from an audience of my blog on Map/Reduce algorithm design regarding how to detect whether a graph is acyclic using Map/Reduce.  I think this is an interesting problem and can imagine there can be wide range of application to it.

Although I haven't solved this exact problem in the past, I'd like to sketch out my thoughts on a straightforward approach, which may not be highly optimized.  My goal is to invite other audience who has solved this problem to share their tricks.

To define the problem:  Given a simple directed graph, we want to tell whether it contains any cycles.

Lets say the graph is represented in incidence edge format where each line represent an edge from one node to another node.  The graph can be split into multiple files.

node1, node2
node4, node7
node7, node1
....

Here is a general description of the algorithm.

We'll maintain two types of graph data.
  1. A set of link files where each line represent an edge in the graph.  This file will be static.
  2. A set of path files where each line represent a path from one node to another node.  This file will be updated in each round of map/reduce.
The general idea is to grow the path file until either the path cannot grow further, or the path contain a cycle, we'll use two global flags to keep track of that: "change" flag to keep track of whether new path is discovered, and "cycle" flag to keep track whether a cycle is detected.

The main driver program will create the initial path file by copying the link file.  And it will set the initial flag condition:  Set cycle and change flag to true.  In each round, the driver will
  • Check if the cycle is still false and change is true, exit if this is not the case
  • Otherwise, it will clear the change flag and invoke the map/reduce
Within each round, the map function will do the following
  • Emit the tuple [key=toNode, value=fromNode] if it is a path
  • Emit the tuple [key=fromNode, value=toNode] if it is a link
This ensure a path and all extended link reaches the same reducer, which will do the following
  • Check to see if the beginning point of the path is the same as the endpoint of the link.  If so, it detects a cycle and mark the cycle flag to be true.
  • Otherwise, it compute the product of every path to every link, and form the new path which effectively extend the length by one.

The following diagram illustrates the algorithm ...




Sunday, September 23, 2012

Location Sensitive Hashing in Map Reduce

Inspired by Dr. Gautam Shroff who teaches the class: Web Intelligence and Big data in coursera.org, there are many scenarios where we want to compute similarity between large amount of items (e.g. photos, products, persons, resumes ... etc).  I want to add another algorithm to my Map/Reduce algorithm catalog.

For the background of Map/Reduce implementation on Hadoop.  I have a previous post that covers the details.

Large Scale Similarity Computation

Lets say there are N items (N is billions) and we want to find all those that are similar to each other.  (similarity is defined by a distance function).  The goal is to output a similarity matrix.  (Notice that this matrix is very sparse and most of the cells are infinite)

One naive way is to compute the similarity of each possible pairs of items, hence an O(N^2) problem which is huge.  Can we reduce the order of complexity ?

Location Sensitive Hashing

First idea: Find a hashing function such that similar items (say distance is less than some predefined threshold) will be hashed to the same bucket.

Lets say if we pick the hash function such that Probability(H(a) == H(b)) is proportional to similarity between a and b.  And then we only perform detail comparison on items that falls into the same bucket.

Here is some R code that plots the relationship between similarity and the chance of performing a detail comparison.

x <- seq(0, 1, 0.01)
y <- x
plot(x, y, xlab="similarity", ylab="prob of detail compare")



Lets say we are interested in comparing all pairs of items whose similarity is above 0.3, we have a problem here because we have probability 0.7 = 1 - 0.3 of missing them (as they are not landing on the same bucket).  We want a mechanism that is highly selective; probability of performing a detail comparison should be close to one when similarity is above 0.3 and close to zero when similarity is below 0.3.

Second idea: Lets use 100 hash functions and 2 items that has 30 or more matches of such hash functions will be selected for detail comparison.

Here is some R code that plots the relationship between similarity and the chance of performing a detail comparison.

# Probability of having more than "threshold" matches out 
# of "no_of_hash" with a range of varying similarities

prob_select <- function(threshold, similarity, no_of_hash) {
  sum <- rep(0, length(similarity))
  for (k in 0:floor(no_of_hash * threshold)) {
    sum <- sum + dbinom(k, no_of_hash, similarity)
  }
  return(1 - sum)
}

x <- seq(0, 1, 0.01)
y <- prob_select(0.3, x, 100)
plot(x, y, main="black: 100 hashes, Red: 1000 hashes", 
xlab="similarity", ylab="prob of detail compare")
lines(x, y)
y <- prob_select(0.3, x, 1000)
lines(x, y, col="red")


The graph looks much better this time, the chance of being selected for detail comparison jumps from zero to one sharply when the similarity crosses 0.3

To compare the items that are similar, we first compute 100 hashes (based on 100 different hash functions) for each item and output all combination 30 hashes as a key.  Then we perform pairwise comparison for all items that has same key.

But look at the combination of 30 out of 100, it is 100!/(30! * 70!) = 2.93 * 10^25, which is impractically huge.  Even the graph is a nice, we cannot use this mechanism in practice.

Third idea: Lets use 100 hash function and break them into b groups of r each (ie: b*r = 100).  Further let assume b = 20, and r = 5.  In other words, we have 20 groups and Group1 has hash1 to hash5, Group2 has hash6 to hash10 ... etc.  Now, we call itemA's group1 matches itemB's group1 if all their hash1 to hash5 are equal to each other.  Now, we'll perform a detail comparison of itemA and itemB if any of the groups are equal to each other.

Probability of being selected is  1 - (1-s^r)^b

The idea can be visualized as follows




Notice that in this model, finding r and b based on s is a bit trial and error.  Here we try 20 by 5, 33 by 3, 10 by 10.

prob_select2 <- function(similarity, row_per_grp, no_of_grp) {
  return(1 - (1 - similarity^row_per_grp)^no_of_grp)
}

x <- seq(0, 1, 0.01)
y <- prob_select2(x, 5, 20)

plot(x, y, 
main="black:20 by 5, red:10 by 10, blue:33 by 3", 
xlab="similarity", ylab="prob of detail compare")

lines(x, y)
y <- prob_select2(x, 10, 10)
lines(x, y, col="red")
y <- prob_select2(x, 3, 33)
lines(x, y, col="blue")



From the graph, we see the blue curve fits better to select the similarity at 0.3.  So lets use 33 by 3.

Notice that the ideal graph should be a step function where probability jumps from 0 to 1 when similarity cross over the similarity threshold that we are interested to capture (ie: we want to put all pairs whose similarity bigger than this threshold to be in a same bucket and all pairs whose similarity less that this threshold to be in a different bucket).  Unfortunately, our curve is a S-curve, not a step function.  This means there will be false positive and false negative.  False position lies on the left side of the similarity threshold where we have a small chance to put them into the same bucket, which will cost up some extra work to compare them later and throw them away.  On the other hand, false negative lies on the right side where we have a small chance of putting items that are very similar into different buckets and not considering them in the detail comparison.  Depends on whether we need to catch all the similar items above the threshold, we may need shift the S curve left or right by tuning the r and b parameters. 

To perform the detail comparison, we can use a parallel Map/Reduce implementation

Map Reduce Implementation

Here we have two round of Map/Reduce.  In the first round, map function will compute all the groupKeys for each item and emit the groupKey with the item.  All the items that has the groupKey matches will land on the same reducer, which creates all the possible pairs of items (these are candidates for pairwise comparison).

However, we don't want to perform the detail comparison in the first round as there may be many duplicates for item pairs that matches more than one group.  Therefore we want to perform another round of Map/reduce to remove the duplicates.

The first round proceeds as follows ...




After that, the second round proceeds as follows ...




By combining Location Sensitive Hashing and Map/Reduce,  we can perform large scale similarity calculation in an effective manner.

Thursday, April 21, 2011

K-Means Clustering in Map Reduce

Unsupervised machine learning has broad application in many e-commerce sites and one common usage is to find clusters of consumers with common behaviors. In clustering methods, K-means is the most basic and also efficient one.

K-Means clustering involve the following logical steps

1) Determine the value of k
2) Determine the initial k centroids
3) Repeat until converge
- Determine membership: Assign each point to the closest centroid
- Update centroid position: Compute new centroid position from assigned members

Determine the value of K
This is basically asking the question of: "How many clusters you are interested to discover ?"
So the answer is specific to the problem domain.

One way is to try different K. At some point, we'll see increasing K doesn't help much to improve the overall quality of clustering. Then that is the right value of K.

Notice that the overall quality of cluster is the average distance from each data point to its associated cluster.


Determine the initial K centroids
We need to pick K centroids to start the algorithm. So one way to pick them is to randomly pick K points from the whole data set.

However, picking a good set of centroids can reduce the number of subsequent iterations and by "good" I mean the K centroid should be as far apart to each other as possible, or even better the initial K centroid is close to the final K centroid. As you can see, choosing the random K points is reasonable but non-optimum.

Another approach is to take a small random sample set from the input data set and do a hierarchical clustering within this smaller set (note that hierarchical clustering is not-scaling to large data set).

We can also partition the space into overlapping region using canopy cluster technique (describe below) and pick the center of each canopy as the initial centroid.

Iteration
Each iteration is implemented as a Map/Reduce job. First of all, we need a control program on the client side to initialize the centroid positions, kickoff the iteration of Map/Reduce jobs and determine whether the iteration should end ...

kmeans(data) {
  initial_centroids = pick(k, data)
  upload(data)
  writeToS3(initial_centroids)
  old_centroids = initial_centroids
  while (true){
    map_reduce()
    new_centroids = readFromS3()
    if change(new_centroids, old_centroids) < delta {
      break
    } else {
      old_centroids = new_centroids
    }
  }
  result = readFromS3()
  return result
}


Within each iteration, most of the processing will be done in the Map task, which determine the membership for each point, as well as compute a partial sum of each member points of each cluster.

The reducer did the easy job by aggregating all partial sums and compute the update centroid position, and then out them into a shared store (S3 in this case) that can be picked up by the Map/Reduce job of next round.



Complexity Analysis
Most of the work is done by the Mapper and the workload is pretty balanced. So the time complexity will be O(k*n/p) where k is number of clusters, n is number of data points and p is number of machines. Note that the factor of k comes in at the closest_centroid() function above when comparing each data point with each intermediate centroid as follows ...
closest_centroid(point, listOfCentroids) {
  bestCentroid = listOfCentroids[0]
  minDistance = INFINITY
  for each centroid in listOfCentroids {
    distance = dist(point, centroid)
    if distance < minDistance {
      minDistance = distance
      bestCentroid = centroid
    }
  }
  return bestCentroid
}

If we partition the space into proximity regions, we only need to compare each point with centroid within the same proximity region and treat other centroids infinite distance. In other words, we don't have to compare each point with all k centroids.

Canopy clustering provide such a partitioning mechanism.


Canopy Clustering
To define the proximity region (canopy), we can draw a circle (or hypersphere) centered at a data point. Points outside this sphere is considered to be too far.

However, if we apply this definition to every point, then we will have as many proximity region as the number of points, which ends up doesn't save much processing. We also observed that points are very close by each other can stay in the same region without each point creating their own. Therefore, we can draw a smaller circle within the big circle (with the same center) such that data points within the small circle is not allowed to form its own proximity region.


Notice that each proximity region can overlap with each other and the degree of overlapping will be affected by the choice of T1. Also the choice of T2 affects how many canopies will be formed. Picking the right number of T1 and T2 is domain-specific, and also depends on the number of clusters and the space volume. If there is a small number of clusters within a big space, then a bigger T1 should be chosen.

To create the canopies (and mark the data points with the canopies), we will do the following steps ...
1) Create the canopy centers, with one scan
  • Keep a list of canopies, initially an empty list
  • Scan each data point, if it is within T2 distance of existing canopies, discard it. Otherwise, add this point into the list of canopies

2) Assign data points to the canopies, with another scan
  • Start with a list of canopies from last step
  • Scan each data point, if it is within T1 of the canopyA, add A as the assigned canopy to the data point. Notice that the data point can be assigned to multiple canopies
  • When done, each data point will look like

Notice that now the input data points has been added with an extra attribute that contains the assigned canopies. When compare the point with the intermediate centroids, we only need to compare centroids within the same canopy. Here is the modified version of the algorithm ...

closest_centroid(point, listOfCentroids) {
  bestCentroid = listOfCentroids[0]
  minDistance = INFINITY
  for each cent in listOfCentroids {
    if (not point.myCanopy.intersects(cent.myCanopy)) {
      continue
    }
    distance = dist(point, centroid)
    if distance < minDistance {
      minDistance = distance
      bestCentroid = centroid
    }
  }
  return bestCentroid
}

Friday, November 5, 2010

Map Reduce and Stream Processing

Hadoop Map/Reduce model is very good in processing large amount of data in parallel. It provides a general partitioning mechanism (based on the key of the data) to distribute aggregation workload across different machines. Basically, map/reduce algorithm design is all about how to select the right key for the record at different stage of processing.

However, "time dimension" has a very different characteristic compared to other dimensional attributes of data, especially when real-time data processing is concerned. It presents a different set of challenges to the batch oriented, Map/Reduce model.
  1. Real-time processing demands a very low latency of response, which means there isn't too much data accumulated at the "time" dimension for processing.
  2. Data collected from multiple sources may not have all arrived at the point of aggregation.
  3. In the standard model of Map/Reduce, the reduce phase cannot start until the map phase is completed. And all the intermediate data is persisted in the disk before download to the reducer. All these added to significant latency of the processing.
Here is a more detail description of this high latency characteristic of Hadoop.

Although Hadoop Map/Reduce is designed for batch-oriented work load, certain application, such as fraud detection, ad display, network monitoring requires real-time response for processing large amount of data, have started to looked at various way of tweaking Hadoop to fit in the more real-time processing environment. Here I try to look at some technique to perform low-latency parallel processing based on the Map/Reduce model.


General stream processing model

In this model, data are produced at various OLTP system, which update the transaction data store and also asynchronously send additional data for analytic processing. The analytic processing will write the output to a decision model, which will feed back information to the OLTP system for real-time decision making.

Notice the "asynchronous nature" of the analytic processing which is decoupled from the OLTP system, this way the OLTP system won't be slow down waiting for the completion of the analytic processing. Nevetheless, we still need to perform the analytic processing ASAP, otherwise the decision model will not be very useful if it doesn't reflect the current picture of the world. What latency is tolerable is application specific.

Micro-batch in Map/Reduce


One approach is to cut the data into small batches based on time window (e.g. every hour) and submit the data collected in each batch to the Map Reduce job. Staging mechanism is needed such that the OLTP application can continue independent of the analytic processing. A job scheduler is used to regulate the producer and consumer so each of them can proceed independently.

Continuous Map/Reduce

Here lets imagine some possible modification of the Map/Reduce execution model to cater for real-time stream processing. I am not trying to worry about the backward compatibility of Hadoop which is the approach that Hadoop online prototype (HOP) is taking.

Long running
The first modification is to make the mapper and reducer long-running. Therefore, we cannot wait for the end of the map phase before starting the reduce phase as the map phase never ends. This implies the mapper push the data to the reducer once it complete its processing and let the reducer to sort the data. A downside of this approach is that it offers no opportunity to run the combine() function on the map side to reduce the bandwidth utilization. It also shift more workload to the reducer which now needs to do the sorting.

Notice there is a tradeoff between latency and optimization. Optimization requires more data to be accumulated at the source (ie: the Mapper) so local consolidation (ie: combine) can be performed. Unfortunately, low latency requires the data to be sent ASAP so not much accumulation can be done.

HOP suggest an adaptive flow control mechanism such that data is pushed out to reducer ASAP until the reducer is overloaded and push back (using some sort of flow control protocol). Then the mapper will buffer the processed message and perform combine() before it send to the reducer. This approach automatically shift back and forth the aggregation workload between the reducer and the mapper.

Time Window: Slice and Range
This is a "time slice" concept and a "time range" concept. "Slice" defines a time window where result is accumulated before the reduce processing is executed. This is also the minimum amount of data that the mapper should accumulate before sending to the reducer.

"Range" defines the time window where results are aggregated. It can be a landmark window where it has a well-defined starting point, or a jumping window (consider a moving landmark scenario). It can also be a sliding window where is a fixed size window from the current time is aggregated.

After receiving a specific time slice from every mapper, the reducer can start the aggregation processing and combine the result with the previous aggregation result. Slice can be dynamically adjusted based on the amount of data sent from the mapper.

Incremental processing
Notice that the reducer need to compute the aggregated slice value after receive all records of the same slice from all mappers. After that it calls the user-defined merge() function to merge the slice value with the range value. In case the range need to be refreshed (e.g. reaching a jumping window boundary), the init() functin will be called to get a refreshed range value. If the range value need to be updated (when certain slice value falls outside a sliding range), the unmerge() function will be invoked.

Here is an example of how we keep tracked of the average hit rate (ie: total hits per hour) within a 24 hour sliding window with update happens per hour (ie: an one-hour slice).
# Call at each hit record
map(k1, hitRecord) {
site = hitRecord.site
# lookup the slice of the particular key
slice = lookupSlice(site)
if (slice.time - now > 60.minutes) {
# Notify reducer whole slice of site is sent
advance(site, slice)
slice = lookupSlice(site)
}
emitIntermediate(site, slice, 1)
}

combine(site, slice, countList) {
hitCount = 0
for count in countList {
hitCount += count
}
# Send the message to the downstream node
emitIntermediate(site, slice, hitCount)
}

# Called when reducer receive full slice from all mappers
reduce(site, slice, countList) {
hitCount = 0
for count in countList {
hitCount += count
}
sv = SliceValue.new
sv.hitCount = hitCount
return sv
}

# Called at each jumping window boundary
init(slice) {
rangeValue = RangeValue.new
rangeValue.hitCount = 0
return rangeValue
}

# Called after each reduce()
merge(rangeValue, slice, sliceValue) {
rangeValue.hitCount += sliceValue.hitCount
}

# Called when a slice fall out the sliding window
unmerge(rangeValue, slice, sliceValue) {
rangeValue.hitCount -= sliceValue.hitCount
}

Friday, October 15, 2010

Scalable System Design Patterns

Looking back after 2.5 years since my previous post on scalable system design techniques, I've observed an emergence of a set of commonly used design patterns. Here is my attempt to capture and share them.

Load Balancer

In this model, there is a dispatcher that determines which worker instance will handle the request based on different policies. The application should best be "stateless" so any worker instance can handle the request.

This pattern is deployed in almost every medium to large web site setup.



Scatter and Gather

In this model, the dispatcher multicast the request to all workers of the pool. Each worker will compute a local result and send it back to the dispatcher, who will consolidate them into a single response and then send back to the client.

This pattern is used in Search engines like Yahoo, Google to handle user's keyword search request ... etc.



Result Cache

In this model, the dispatcher will first lookup if the request has been made before and try to find the previous result to return, in order to save the actual execution.

This pattern is commonly used in large enterprise application. Memcached is a very commonly deployed cache server.



Shared Space

This model also known as "Blackboard"; all workers monitors information from the shared space and contributes partial knowledge back to the blackboard. The information is continuously enriched until a solution is reached.

This pattern is used in JavaSpace and also commercial product GigaSpace.



Pipe and Filter

This model is also known as "Data Flow Programming"; all workers connected by pipes where data is flow across.

This pattern is a very common EAI pattern.



Map Reduce

The model is targeting batch jobs where disk I/O is the major bottleneck. It use a distributed file system so that disk I/O can be done in parallel.

This pattern is used in many of Google's internal application, as well as implemented in open source Hadoop parallel processing framework. I also find this pattern can be used in many many application design scenarios.



Bulk Synchronous Parellel

This model is based on lock-step execution across all workers, coordinated by a master. Each worker repeat the following steps until the exit condition is reached, when there is no more active workers.
  1. Each worker read data from input queue
  2. Each worker perform local processing based on the read data
  3. Each worker push local result along its direct connection
This pattern has been used in Google's Pregel graph processing model as well as the Apache Hama project.



Execution Orchestrator

This model is based on an intelligent scheduler / orchestrator to schedule ready-to-run tasks (based on a dependency graph) across a clusters of dumb workers.

This pattern is used in Microsoft's Dryad project



Although I tried to cover the whole set of commonly used design pattern for building large scale system, I am sure I have missed some other important ones. Please drop me a comment and feedback.

Also, there is a whole set of scalability patterns around data tier that I haven't covered here. This include some very basic patterns underlying NOSQL. And it worths to take a deep look at some leading implementations.

Sunday, August 29, 2010

Designing algorithms for Map Reduce

Since the emerging of Hadoop implementation, I have been trying to morph existing algorithms from various areas into the map/reduce model. The result is pretty encouraging and I've found Map/Reduce is applicable in a wide spectrum of application scenarios.

So I want to write down my findings but then found the scope is too broad and also I haven't spent enough time to explore different problem domains. Finally, I realize that there is no way for me to completely cover what Map/Reduce can do in all areas, so I just dump out what I know at this moment over the long weekend when I have an extra day.

Notice that Map/Reduce is good for "data parallelism", which is different from "task parallelism". Here is a description about their difference and a general parallel processing design methodology.

I'll cover the abstract Map/Reduce processing model below. For a detail description of the implementation of Hadoop framework, please refer to my earlier blog here.


Abstract Processing Model
There are no formal definition of the Map/reduce model. Basic on the Hadoop implementation, we can think of it as a "distributed merge-sort engine". The general processing flow is as follows.
  • Input data is "split" into multiple mapper process which executes in parallel
  • The result of the mapper is partitioned by key and locally sorted
  • Result of mapper of the same key will land on the same reducer and consolidated there
  • Merge sorted happens at the reducer so all keys arriving the same reducer is sorted

Within the processing flow, user defined functions can be plugged-in to the framework.
  • map(key1, value1) -> emit(key2, value2)
  • reduce(key2, value2_list) -> emit(key2, aggregated_value2)
  • combine(key2, value2_list) -> emit(key2, combined_value2)
  • partition(key2) return reducerNo
Design the algorithm for map/reduce is about how to morph your problem into a distributed sorting problem and fit your algorithm into the user defined functions of above.

To analyze the complexity of the algorithm, we need to understand the processing cost, especially the cost of network communication in such a highly distributed system.

Lets first consider the communication between Input data split and Mapper. To minimize this overhead, we need to run the mapper logic at the data split (without moving the data). How well we do this depends on how the input data is stored and whether we can run the mapper code there. For HDFS and Cassandra, we can the mapper at the storage node and the scheduler algorithm of JobTracker will assign the mapper to the data split that it collocates with and hence significantly reduce the data movement. Other data store such as Amazon S3 doesn't allow execution of mapper logic at the storage node and therefore incur more data traffic.

The communication between Mapper and Reducer cannot be collocated because it depends on the emit key. The only mechanism available is the combine() function which can perform a local consolidation and hence can reduce the data sent to the reducer.

Finally the communication between the reducer and the output data store depends on the store's implementation. For HDFS, the data is triply replicated and hence the cost of writing can be high. Cassandra (a NOSQL data store) allows configurable latency with various degree of data consistency trade-off. Fortunately, in most case the volume of result data after a Map/Reduce processing is not high.

Now, we see how to fit various different kinds of algorithms into the Map/Reduce model ...


Map-Only
"Embarrassing parallel" problems are those that the same processing is applied in each data element in a pretty independent way, in other words, there is no need to consolidate or aggregate individual results.

These kinds of problem can be expressed as a Map-only job (by specifying the number of reducers to zero). In this case, Mapper's emitted result will directly go to the output format.

Some examples of map-only examples are ...
  • Distributed grep
  • Document format conversion
  • ETL
  • Input data sampling

Sorting
As we described above, Hadoop is fundamentally a distributed sorting engine, so using it for sorting is a natural fit.

For example, we can use an Identity function for both map() and reduce(), then the output is equivalent to sorting the input data. Notice that we are using a single reducer here. So the merge is still sequential although the sorting is done at the mapper in parallel.

We can perform the merge in parallel by using multiple reducers. In this case, output of each reducer are sorted. We may need to do a final merge on all the reducer's output. Another way is to use a customized partition() function such that the keys are partitioned by range. In this case, each reducer is sorting a particular range and the final result is just to concatenate the each reducer's sorted result.
partition(key) {
  range = (KEY_MAX - KEY_MIN) / NUM_OF_REDUCERS
  reducer_no = (key - KEY_MIN) / range
  return reducer_no
}


Inverted Indexes
The map reduce model is originated from Google which has a lot of scenarios of building large scale inverted index. Building an inverted index is about parsing different documents to build a word -> document index for keyword search.

In fact, inverted index is pretty general and can be applied in many scenarios. To build an inverted index, we can feed the mapper each document (or lines within a document). The Mapper will parse the words in the document to emit [word, doc] pairs along with other metadata such as where in the document this word occurs ... etc. The reducer can simply be an identity function that just dump out the list, or it can perform some statistic aggregation per word.

In a more general form of Inverted index, there is a "container" and "element" concept. The Map and Reduce function will be organized in the following patterns.
map(key, container) {
  for each element in container {
      element_meta =
           extract_metadata(element, container)
      emit(element, [container_id, element_meta])
  }
}

reduce(element, container_ids) {
  element_stat =
       compute_stat(container_ids)
  emit(element, [element_stat, container_ids])
}

In Text index, we are not just counting the actual frequency of the terms but also adjust its weighting based on its frequency distribution so common words will have less significance when they appears in the document. The final value after normalization is called TF-IDF (term frequency times inverse document frequency) and can be computed using Map Reduce as well.


Simple Statistics ComputationComputing max, min, count is very straightforward since this operation is commutative and associative. Each mapper will perform the local computation and send the result to a single reducer to do the final computation.

Combine function is typically used to reduce the network traffic. Notice that the input to the combine function must look the same as the input to the reducer function and the output of the combine function must look the same as the output of the map function. There is also no guarantee that the combiner function will be invoked at all.

class Mapper {
  buffer

  map(key, number) {
      buffer.append(number)
      if (buffer.is_full) {
          max = compute_max(buffer)
          emit(1, max)
      }
  }
}


class Reducer {
  reduce(key, list_of_local_max) {
      global_max = 0
      for local_max in list_of_local_max {
          if local_max > global_max {
              global_max = local_max
          }
      }        emit(1, global_max)
  }
}


class Combiner {
  combine(key, list_of_local_max) {
      local_max = maximum(list_of_local_max)
      emit(1, local_max)
  }
}
Computing avg is done in a similar way except that instead of computing the local avg, we compute the local sum and local count. The reducer will do the final sum divided by the final count to come up with the final avg.

Computing a histogram is pretty common in statistics and can give a quick idea about the data distribution. A typical approach is to divide the number into different intervals. The mapper will compute the count per interval, and emit that per interval and the reducer will compute the sum of that interval.
class Mapper {
  interval_start = [0, 20, 40, 60, 80]

  map(key, number) {
      i = 0;
      while (i < NO_OF_INTERVALS) {
          if (number < interval_start[i]) {
              emit(i, 1)
              break
          }
      }
  }
}


class Reducer {
  reduce(interval, counts) {
      total_counts = 0
      for each count in counts {
          total_counts += count
      }
      emit(interval, total_counts)
  }
}


class Combiner {
  combine(interval, occurrence) {
      emit(interval, occurrence.size)
  }
}
Notice that a non-uniform distribution of values across intervals may cause an unbalanced workload among reducers and hence undermine the degree of parallelism. We'll address this in the later part of this post.


In-Mapper Combine
Jimmy Lin, in his excellent book, talks about a technique call "in-mapper combine" which regains control at the application level when the combine takes place. The general idea is to maintain a HashMap to buffer the intermediate result and has a separate logic to determine when to actually emit the data from the buffer. The general code structure is as follows ...
class Mapper {
  buffer

  init() {
      buffer = HashMap.new
  }

  map(key, data) {
      elements = process(data)
      for each element {
          ....
          check_and_put(buffer, k2, v2)
      }
  }

  check_and_put(buffer, k2, v2) {
      if buffer.full {
          for each k2 in buffer.keys {
              emit(k2, buffer[k2])
          }
      }
  }

  close() {        for each k2 in buffer.keys {
          emit(k2, buffer[k2])
      }    }
}

SQL Model
The SQL model can be used to extract data from the data source. It contains a number of primitives.

Projection / Filter
This logic is typically implemented in the Mapper
  • result = SELECT c1, c2, c3, c4 FROM source WHERE conditions
Aggregation / Group by / Having
This logic is typically implemented in the Reducer
  • SELECT sum(c3) as s1, avg(c4) as s2 ... FROM result GROUP BY c1, c2 HAVING conditions
The above example can be realized by the following map/reduce job
class Mapper {
  map(k, rec) {
      select_fields =
          [rec.c1, rec.c2, rec.c3, rec.c4]
      group_fields =
          [rec.c1, rec.c2]
      if (filter_condition == true) {
          emit(group_fields, select_fields)
      }
  }
}

class Reducer {
  reduce(group_fields, list_of_rec) {
      s1 = 0
      s2 = 0
      for each rec in list_of_rec {
          s1 += rec.c3
          s2 += rec.c4
      }
      s2 = s2 / rec.size
      if (having_condition == true) {
          emit(group_fields, [s1, s2])
      }
  }
}

Data Joins
Joining 2 data set is a very common operation in Relational Data Model and has been very mature in RDBMS implementation. The common join mechanism in a centralized DB architecture is as follows
  1. Nested loop join -- This is the most basic and naive mechanism and is organized as two loops. The outer loop reads from data set1, the inner loop scan through the whole data set2 and compare with the records just read from data set1.
  2. Indexed join -- An index (e.g. B-Tree index) is built for one of the data sets (say data set2 which is the smaller one). The join will scan through data set1 and lookup the index to find the matched records of data set2.
  3. Merge join -- Pre-sort both data sets so they are arranged physically in increasing order. The join is realized by just merging the two data sets. a) Locate the first record in both data set1 & set2, which is their corresponding minimum key b) In the one with a smaller minimum key (say data set1), keep scanning until finding the next key which is bigger than the minimum key of the other data set (ie. data set2), call this the next minimum key of data set1. c) Switch position and repeat the whole thing until one of the data set is exhausted.
  4. Hash / Partition join -- Partition the data set1 and data set2 into smaller size and apply other join algorithm in a smaller data set size. A linear scan with a hash() function is typically performed to partition the data sets such that data in set1 and data in set2 with the same key will land on the same partition.
  5. Semi join -- This is mainly used to join two sets of data that is stored at different locations and the goal is to reduce the amount of data transfer such that only the full records appears in the final joint result will be send through. a) Data set2 will send its key set to machine holding Data set1. b) Machine holding Data set1 will do a join and send back the records in Data set1 that matches one of the send-over keys. c) The machine holding data set2 will do a final join to the data send back.
In the map reduce environment, it has the corresponding joins.

General reducer-side join
This is the most basic one, records from data set1 and set2 with the same key will land on the same reducer, which will then do a cartesian product. The downside of this model is that the reducer need to have enough memory to hold all records of each key.
map(k1, rec) {
  emit(rec.key, [rec.type, rec])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  list_of_typeB = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else {
          list_of_typeB.append(rec)
      }
  }

  # Compute the catesian product
  products = []
  for recA in list_of_typeA {
      for recB in list_of_typeB {
          emit(k2, [recA, recB])
      }
  }
}

Optimized reducer-side join
You can "secondary sort" the data type for each key by defining a customized partition function. In this model, you arrange the data type (which has less records per key to arrive first) and you only need to store these types.
map(k1, rec) {
  emit([rec.key, rec.type], rec])
}

partition(key_pair) {
  super.partition(key_pair[0])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else { # receive records of typeA
          for recA in list_of_typeA {
              emit(k2, [recA, rec])
          }
      }
  }
}

While being very flexible, the downside of Reducer side join is that all data need to be transfer from the mapper to the reducer and then result write to HDFS. Map-side join explore some special arrangement of the input file such that the join is being perform at the mapper. The advantage of doing in the mapper is that we can exploit the collocation of the Map reduce framework such that the mapper will be allocated an input split in its local machine, hence reduce the data transfer from the disk to the mapper. After the map-side join, the result is written directly to the output HDFS files and hence eliminate the data transfer between the mapper and the reducer.

Map-side partition join
In this model, it requires the 2 data sets to be partitioned into 2 sets of partition files (same number of partitions for each set). The size of the partition is such that it can fit into the memory of the Mapper machine. We also need to configure the Map/Reduce job such that there is no split in the partition file, in other words, the whole partition is assigned to a mapper task.

The mapper will detect the partition of the input file and then read the corresponding partition file of the other data set into an in-memory hashtable. After that, the mapper will lookup the Hashtable to do the join.
class Mapper {
  map = Hashtable.new

  init() {
      partition = detect_input_filename()
      map = load("hdfs://dataset2/" + partition)
  }

  map(k1, rec1) {
      rec2 = map[rec1.key]
      if (rec2 != nil) {
          emit(rec1.key, [rec1, rec2])
      }
  }
}

Map-side partition merge join
In additional, if the partition file is also sorted, then the mapper can use a merge join, which has an even smaller memory footprint.
class Mapper {
  rec2_key = nil
  next_rec2 = nil
  list_of_rec2 = []
  file = nil

  init() {
      partition = detect_input_filename()
      file = open("hdfs://dataset2/" + partition, "r")
      next_rec2 = file.read()
      fill_rec2_list()
  }

  # Fill up the list of rec2 list which has the same key
  fill_rec2_list() {
      rec2_key = next_rec2.key
      list_of_rec2.append(next_rec2)
      next_rec2 = file.read
      while(next_rec2.key == key) {
          list_of_rec2.append(next_rec2)
      }
  }

  map(k1, rec1) {
      while (rec1.key > rec2_key) {
          fill_rec2_list()
      }
        while (rec1.key == rec2.key) {
          for rec2 in list_of_rec2 {
              emit(rec1.key, [rec1, rec2])
          }
      }
    }
}

Memcache join
The model is very straightforward, the second data set is loaded into a distributed hash table (like memcache) which has effectively unlimited size. The mapper will receive input split from the first data set and then lookup the memcache for the corresponding record of the other data set.

There are also some other more sophisticated join mechanism such as semi-join described in this paper.

Graph Algorithms
Many problems can be modeled as a graph of Node and Edges. In the Search engine environment, computing the rank of a document using Page Rank or Hits can be model as a sequence of iterations of Map/Reduce jobs.

In the past, I have been blog a number of very basic graph algorithms in map reduce including doing topological sort, finding shortest path, minimum spanning tree etc. and also how to recommend people connection using Map/Reduce.

Due to the fact that graph traversal is inherently sequential, I am not sure Map/Reduce is the best parallel processing model for graph processing. Another problem is that due to the "stateless nature" of map() and reduce() functions, the whole graph need to be transferred between mapper and reducer which incur significant communication costs. Jimmy Lin has described a clever technique called Shimmy which exploit using a special partitioning function which let the reducer to retain the ownership of nodes across map/reduce jobs. I have described this technique as well as a general model of Map/Reduce graph processing in a previous blog.

I think a parallel programming model specific for Graph processing will perform much better. Google's Pregel model is a good example of that.


Machine Learning
Many of the machine learning algorithm involve multiple iterations of parallel processing that fits very well into Map/Reduce model.

For example, we can use map reduce to calculate the statistics for probabilistic methods such as naive Bayes.

A simple example of computing K-Means cluster can also be done in the following way.
  • Input: A set of points, with k initial centrods
  • Output: K final centroids
Iterate until no more change of membership
  1. For each point, assign it to be the member of closest centroid
  2. Re-compute the centroid from the assigned point members


For a complete list of Machine learning algorithms and how they can be implemented using the Map/Reduce model, here is a very good paper.


Matrix arithmetic
A lot of real-life relationships can be represented as a Matrix. One example is the vector space model of Information Retrieval where the column represents docs and the row represents terms. Another example is the social network graph where the column as well as the row representing people and a binary value of each cell to represent a "friend" relationship. In this case, M + M.M represents all the people that I can reach within 2 degree.

Processing for dense matrix is very easy to parallelized. But since the sequential version is O(N^3), it is not that interesting for Matrix with large size (millions range in rows and columns).

A lot of real-world graph problem can be represented as sparse matrix. So my interests is to focus more in the processing of sparse matrix. I don't have much to share at this moment but I hope this is something I will blog about in future.

Wednesday, August 4, 2010

Map/Reduce to recommend people connection

Once common feature in Social Network site is to recommend people connection. e.g. "People you may know" from Linkedin. The basic idea is very simple; if person A and person B doesn't know each other but they have a lot of common friends, then the system should recommend person B to person A and vice versa.

From a graph theory perspective, for each person who is 2-degree reachable from person A, we count how many distinct paths (with 2 connecting edges) exist between this person and person A. Rank this list in terms the number of paths and show the top 10 persons that person A should connect with.

We should how we can use Map/Reduce to compute this top-10 connection list for every person. The problem can be stated as: For every person X, we determine a list of person X1, X2 ... X10 which is the top 10 persons that person X has common friends with.

The social network graph is generally very sparse. Here we assume the input records is an adjacency list sorted by name.
"ricky" => ["jay", "peter", "phyllis"]
"peter" => ["dave", "jack", "ricky", "susan"]
We use two rounds of Map/Reduce job to compute the top-10 list

First Round MR Job
The purpose of this MR job is to compute the number of distinct path between all pairs of people who is 2 degree separated from each other.
  • In Map(), we do a cartesian product for all pairs of friends (since these friends may be connected in 2-dgrees). We also need to eliminate the pairs if they already have a direct connection. Therefore, the The Map() function should also emit pairs of direct connected persons. We need to order the key space such that all keys with the same pair of people with go to the same reducer. On the other hand, we need the pair of direct connection come before the pairs of 2 degree of separations.
  • In Reduce(), we know all the key pairs reaching the same reducer will be sorted. So the direct connect pair will come before the 2-degree pairs. So the reducer just need to check if the first pair is a direct connected one and if so skip the rest.
Input record ...  person -> connection_list
e.g. "ricky" => ["jay", "john", "mitch", "peter"]
also the connection list is sorted by alphabetical order

def map(person, connection_list)
  # Compute a cartesian product using nested loops
  for each friend1 in connection_list
     # Eliminate all 2-degree pairs if they already
     # have a one-degree connection
     emit([person, friend1, 0])
     for each friend2 > friend1 in connection_list
         emit([friend1, friend2, 1],  1)

def partition(key)
  #use the first two elements of the key to choose a reducer
  return super.partition([key[0], key[1]])

def reduce(person_pair, frequency_list)
  # Check if this is a new pair
  if @current_pair != [person_pair[0], person_pair[1]]
      @current_pair = [person_pair[0], person_pair[1]]
      # Skip all subsequent pairs if these two person
      # already know each other
      @skip = true if person_pair[2] == 0

  if !skip
      path_count = 0
      for each count in frequency_list
          path_count += count
      emit(person_pair, path_count)

Output record ... person_pair => path_count
e.g. ["jay", "john"] => 5


Second Round MR Job

The purpose of this MR job is to rank the connections for every person by the number of distinct path between them.
  • In Map(), we rearrange the input records so it will be sorted before reaching the reducer
  • In Reduce(), all the connections from the person is sorted, we just need to aggregate the top 10 to a list and then write the list out.
Input record = Output record of round 1

def map(person_pair, path_count)
  emit([person_pair[0], path_count], person_pair[1])

def partition(key)
  #use the first element of the key to choose a reducer
  return super.partition(key[0])

def reduce(connection_count_pair, candidate_list)
  # Check if this is a new person
  if @current_person != connection_count_pair[0]
      emit(@current_person, @top_ten)
      @top_ten = []
      @current_person = connection_count_pair[0]

  #Pick the top ten candidates to connect with
  if @top_ten.size < 10
      for each candidate in candidate_list
          @top_ten.append([candidate, connection_count_pair[1]])
          break if @pick_count > 10

Output record ... person -> candidate_count_list
e.g.  "ricky" => [["jay", 5],  ["peter", 3] ...]

Tuesday, July 20, 2010

Graph Processing in Map Reduce

In my previous post about Google's Pregel model, a general pattern of parallel graph processing can be expressed as multiple iterations of processing until a termination condition is reached. Within each iteration, same processing happens at a set of nodes (ie: context nodes).

Each context node perform a sequence of steps independently (hence achieving parallelism)
  1. Aggregate all incoming messages received from its direct inward arcs during the last iteration
  2. With this aggregated message, perform some local computation (ie: the node and its direct outward arcs' local state)
  3. Pass the result of local computation along all outward arcs to its direct neighbors
This processing pattern can be implemented using Map/Reduce model, using a MR job for each iteration. The sequence is a little different from above. Typically a mapper will perform (2) and (3) where it emits the message using its neighbor's node id as key. Reducer will be responsible to perform (1).

Issue of using Map/Reduce

However, due to the functional programming nature of Map() and Reduce(), M/R does not automatically retain "state" between jobs. To retain the graph across iterations, the mapper need to explicitly pass along the corresponding portion of the graph to the reducer, in additional to the messages itself. Similarly, the reducer need to handle a different type of data passed along.

map(id, node) {
  emit(id, node)
  partial_result = local_compute()
  for each neighbor in node.outE.inV {
      emit(neighbor.id, partial_result)
  }
}

reduce(id, list_of_msg) {
  node = null
  result = 0

  for each msg in list_of_msg {
      if type_of(msg) == Node
          node = msg
      else
          result = aggregate(result, msg)
      end
  }

  node.value = result
  emit(id, node)
}

This downside of this approach is a substantial amount of I/O processing and bandwidth is consumed to just passing the graph itself around.

Google's Pregel model provides an alternative message distribution model so that state can be retained at the processing node across iterations.

The Schimmy Trick

In a recent research paper, Jimmy Lin and Michael Schatz use a clever partition() algorithm in Map /Reduce which can achieve "stickiness" of graph distribution as well as maintaining a sorted-order of node id on disk.

The whole graph is broken down into multiple files and stored in HDFS. Each file contains multiple records and each record describe a node and its corresponding adjacency list.

id -> [nodeProps, [[arcProps, toNodeId], [arcProps, toNodeId] ...]

In addition, the records are physically sorted within the file by their node id.

There will be as many reducers as the number of above files and so each Reducer task is assigned with one of this file. On the other hand, the partition() function assign all nodes within the file to land on its associated reducer.

Mapper does the same thing before, except the first line in the method is removed as it no longer need to emit the graph.

Reducer will receive all the message emitted from the mapper, which is sorted by the Map/Reduce framework by the key (which happens to be the node id). On the other hand, the reducer can open the corresponding file in HDFS, which also maintain a sorted list of nodes based on their ids. The reducer can just read the HDFS file sequentially on each reduce() call and confident that all preceding nodes in the file has already received their corresponding messages.

reduce(id, list_of_msg) {
   nodeInFile = readFromFile()

   # Emit preceding nodes that receives no message
   while(nodeInFile.id < id)
       emit(nodeInFile.id, nodeInFile)
   end

   result = 0

   for each msg in list_of_msg {
       result = aggregate(result, msg)
   }

   nodeInFile.value = result
   emit(id, nodeInFile)
}

Although the Schimmy trick provides an improvement over the classical way of map/reduce, it only eliminates the communication between the mapper and the reducer. At each iteration, the mapper still needs to read the whole graph from HDFS to the mapper node and the reducer still need to write the whole graph back to HDFS, which maintains a 3-way replication for each file.

Hadoop provides some co-location mechanism for the mapper and try to assign files that is sitting at the same machine to the mapper. However, this co-location mechanism is not available for the reducer and so reducer still need to write the graph back over the network.

Pregel Advantage

Since Pregel model retain worker state (the same worker is responsible for the same set of nodes) across iteration, the graph can be loaded in memory once and reuse across iterations. This will reduce I/O overhead as there is no need to read and write to disk at each iteration. For fault resilience, there will be a periodic check point where every worker write their in-memory state to disk.

Also, Pregel (with its stateful characteristic), only send local computed result (but not the graph structure) over the network, which implies the minimal bandwidth consumption.

Of course, Pregel is very new and relative immature as compared to Map/Reduce.

Monday, July 12, 2010

Google Pregel Graph Processing

A lot of real life problems can be expressed in terms of entities related to each other and best captured using graphical models. Well defined graph theory can be applied to processing the graph and return interesting results. The general processing patterns can be categorized into the following ...
  1. Capture (e.g. When John is connected to Peter in a social network, a link is created between two Person nodes)
  2. Query (e.g. Find out all of John's friends of friends whose age is less than 30 and is married)
  3. Mining (e.g. Find out the most influential person in Silicon Valley)

Distributed and Parallel Graph Processing

Although using a Graph to represent a relationship network is not new, the size of network has been dramatically increase in the past decade such that storing the whole graph in one place is impossible. Therefore, the graph need to be broken down into multiple partitions and stored in different places. Traditional graph algorithm that assume the whole graph can be resided in memory becomes invalid. We need to redesign the algorithm such that it can work in a distributed environment. On the other hand, by breaking the graph into different partitions, we can manipulate the graph in parallel to speed up the processing.

Property Graph Model

The paper “Constructions from Dots and Lines” by Marko A. Rodriguez and Peter Neubauer illustrate the idea very well. Basically, a graph contains nodes and arcs.

A node has a "type" which defines a set of properties (name/value pairs) that the node can be associated with.

An arc defines a directed relationship between nodes, and hence contains the fromNode, toNode as well as a set of properties defined by the "type" of the arc.



General Parallel Graph Processing

Most of the graph processing algorithm can be expressed in terms of a combination of "traversal" and "transformation".

Parallel Graph Traversal

In the case of "traversal", it can be expressed as a path which contains a sequence of segments. Each segment contains a traversal from a node to an arc, followed by a traversal from an arc to a node. In Marko and Peter's model, a Node (Vertex) contains a collection of "inE" and another collection of "outE". On the other hand, an Arc (Edge) contains one "inV", one "outV". So to expressed a "Friend-of-a-friend" relationship over a social network, we can use the following

./outE[@type='friend']/inV/outE[@type='friend']/inV

Loops can also be expressed in the path, to expressed all persons that is reachable from this person, we can use the following

.(/outE[@type='friend']/inV)*[@cycle='infinite']

On the implementation side, a traversal can be processed in the following way
  1. Start with a set of "context nodes", which can be defined by a list of node ids, or a search criteria (in this case, the search result determines the starting context nodes)
  2. Repeat until all segments in the path are exhausted. Perform a walk from all context nodes in parallel. Evaluate all outward arcs (ie: outE) with conditions (ie: @type='friend'). The nodes that this arc points to (ie: inV) will become the context node of next round
  3. Return the final context nodes
Such traversal path can also be used to expressed inference (or derived) relationships, which doesn't have a physical arc stored in the graph model.

Parallel Graph Transformation

The main goal of Graph transformation is to modify the graph. This include modifying the properties of existing nodes and arcs, creating new arcs / nodes and removing existing arcs / nodes. The modification logic is provided by a user-defined function, which will be applied to all active nodes.

The Graph transformation process can be implemented in the following steps
  1. Start with a set of "active nodes", which can be defined by a lost of node ids, or a search criteria (in this case, the search result determines the starting context nodes)
  2. Repeat until there is no more active nodes. Execute the user-defined transformation which modifies the properties of the context nodes and outward arcs. It can also remove outwards arcs or create new arcs that point to existing or new nodes (in other words, the graph connectivity can be modified). It can also send message to other nodes (the message will be picked up in the next round) as well as receive message sent from other nodes in the previous round.
  3. Return the transformed graph, or a traversal can be performed to return a subset of the transformed graph.
Google's Pregel

Pregel can be thought as a generalized parallel graph transformation framework. In this model, the most basic (atomic) unit is a "node" that contains its properties, outward arcs (and its properties) as well as the node id (just the id) that the outward arc points to. The node also has a logical inbox to receive all messages sent to it.


The whole graph is broken down into multiple "partitions", each contains a large number of nodes. Partition is a unit of execution and typically has an execution thread associated with it. A "worker" machine can host multiple "partitions".


The execution model is based on BSP (Bulk Synchronous Processing) model. In this model, there are multiple processing units proceeding in parallel in a sequence of "supersteps". Within each "superstep", each processing units first receive all messages delivered to them from the preceding "superstep", and then manipulate their local data and may queue up the message that it intends to send to other processing units. This happens asynchronously and simultaneously among all processing units. The queued up message will be delivered to the destined processing units but won't be seen until the next "superstep". When all the processing unit finishes the message delivery (hence the synchronization point), the next superstep can be started, and the cycle repeats until the termination condition has been reached.

Notice that depends on the graph algorithms, the assignment of nodes to a partition may have an overall performance impact. Pregel provides a default assignment where partition = nodeId % N but user can overwrite this assignment algorithm if they want. In general, it is a good idea to put close-neighbor nodes into the same partition so that message between these nodes doesn't need to flow into the network and hence reduce communication overhead. Of course, this also means traversing the neighboring nodes all happen within the same machine and hinder parallelism. This usually is not a problem when the context nodes are very diverse. In my experience of parallel graph processing, coarse-grain parallelism is preferred over fine-grain parallelism as it reduces communication overhead.

The complete picture of execution can be implemented as follows:


The basic processing unit is a "thread" associated with each partition, running inside a worker. Each worker receive messages from previous "superstep" from its "inQ" and dispatch the message to the corresponding partition that the destination node is residing. After that, a user defined "compute()" function is invoked on each node of the partition. Notice that there is a single thread per partition so nodes within a partition are executed sequentially and the order of execution is undeterministic.

The "master" is playing a central role to coordinate the execute of supersteps in sequence. It signals the beginning of a new superstep to all workers after knowing all of them has completed the previous one. It also pings each worker to know their processing status and periodically issue "checkpoint" command to all workers who will then save its partition to a persistent graph store. Pregel doesn't define or mandate the graph storage model so any persistent mechanism should work well. There is a "load" phase at the beginning where each partition starts empty and read a slice of the graph storage. For each node read from the storage, a "partition()" function will be invoked and load the node in the current partition if the function returns the same node, otherwise the node is queue to another partition who the node is assigned to.

Fault resilience is achieved by having the checkpoint mechanism where each worker is instructed to save its in-memory graph partition to the graph storage periodically (at the beginning of a superstep). If the worker is detected to be dead (not responding to the "ping" message from the master), the master will instruct the surviving workers to take up the partitions of the failed worker. The whole processing will be reverted back to the previous checkpoint and proceed again from there (even the healthy worker need to redo the previous processing). The Pregel paper mention a potential optimization to just re-execute the processing of the failed partitions from the previous checkpoint by replaying the previous received message, of course this requires keeping a log of all received messages between nodes at every super steps since previous checkpoint. This optimization, however, rely on the algorithm to be deterministic (in other words, same input execute at a later time will achieve the same output).

Further optimization is available in Pregel to reduce the network bandwidth usage. Messages destined to the same node can be combined using a user-defined "combine()" function, which is required to be associative and commutative. This is similar to the same combine() method in Google Map/Reduce model.

In addition, each node can also emit an "aggregate value" at the end of "compute()". Worker will invoke an user-defined "aggregate()" function that aggregate all node's aggregate value into a partition level aggregate value and all the way to the master. The final aggregated value will be made available to all nodes in the next superstep. Just aggregate value can be used to calculate summary statistic of each node as well as coordinating the progress of each processing units.

I think the Pregel model is general enough for a large portion of classical graph algorithm. I'll cover how we map these traditional algorithms in Pregel in subsequent postings.

Reference

http://www.slideshare.net/slidarko/graph-windycitydb2010