
Sunday, February 22, 2015

Big Data Processing in Spark

In the traditional 3-tier architecture, data processing is performed by the application server while the data itself is stored in the database server.  The application server and the database server are typically two different machines.  Therefore, the processing cycle proceeds as follows
  1. The application server sends a query to the database server to retrieve the necessary data
  2. The application server performs the processing on the received data
  3. The application server saves the changed data back to the database server
In other words, in the traditional data processing paradigm, we move data to the code.
It can be depicted as follows ...




Then the big data phenomenon arrives.  Because the data volume is huge, it cannot be held by a single database server.  Big data is typically partitioned and stored across many physical DB server machines.  On the other hand, application servers need to be added to increase the processing power over the big data.


However, as we increase the number of App servers and DB servers for storing and processing the big data, more data needs to be transferred back and forth across the network during the processing cycle, up to a point where the network becomes a major bottleneck.

Moving code to data

To overcome the network bottleneck, we need a new computing paradigm.  Instead of moving data to the code, we move the code to the data and perform the processing where the data is stored.



Notice the change in the program structure
  • The program execution starts at a driver, which orchestrates the execution happening remotely across many worker servers within a cluster.
  • Data is no longer transferred to the driver program.  The driver program holds a data reference in its variable rather than the data itself.  The data reference is basically an id used to locate the corresponding data residing in the database server
  • Code is shipped from the program to the database server, where the execution happens, and data is modified at the database server without leaving the server machine.
  • Finally the program requests a save of the modified data.  Since the modified data resides in the database server, no data transfer happens over the network.
By moving the code to the data, the volume of data transferred over the network is significantly reduced.  This is an important paradigm shift for big data processing.

In the following section, I will use Apache Spark to illustrate how this big data processing paradigm is implemented.

RDD

Resilient Distributed Dataset (RDD) is how Spark implements the data reference concept.  An RDD is a logical reference to a dataset which is partitioned across many server machines in the cluster.


To make a clear distinction between the data reference and the data itself, a Spark program is organized as a sequence of execution steps, each of which is either a "transformation" or an "action".

Programming Model

A typical program is organized as follows
  1. From an environment variable "context", create some initial data reference RDD objects
  2. Transform the initial RDD objects to create more RDD objects.  A transformation is expressed in terms of functional programming: a code block is shipped from the driver program to multiple remote worker servers, each of which holds a partition of the RDD.  A variable appearing inside the code block can either be an item of the RDD or a local variable of the driver program, which gets serialized over to the worker machine.  After the code (and the copy of the serialized variables) is received by the remote worker server, it is executed there by feeding in the items of the RDD partition residing on that server.  Notice that the result of a transformation is a brand new RDD (the original RDD is not mutated)
  3. Finally, the RDD object (the data reference) needs to be materialized.  This is achieved through an "action", which dumps the RDD into a storage, or returns its value data to the driver program.
Here is a word count example

# Get the initial RDD from the context ("spark" here is the SparkContext)
file = spark.textFile("hdfs://...")

# Three consecutive transformations of the RDD
counts = (file.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))

# Materialize the RDD using an action
counts.saveAsTextFile("hdfs://...")

When the driver program starts its execution, it builds up a graph where nodes are RDDs and edges are transformation steps.  However, no execution happens on the cluster until an action is encountered.  At that point, the driver program will ship the execution graph as well as the code block to the cluster, where every worker server will get a copy.

The execution graph is a DAG.
  • Each DAG is an atomic unit of execution.
  • Each source node (no incoming edge) is an external data source or driver memory
  • Each intermediate node is an RDD
  • Each sink node (no outgoing edge) is an external data store or driver memory
  • A green edge connecting to an RDD represents a transformation.  A red edge connecting to a sink node represents an action



Data Shuffling

Although we ship the code to the worker server where the data processing happens, data movement cannot be completely eliminated.  For example, if the processing requires data residing in different partitions to be grouped first, then we need to shuffle data among the worker servers.

Spark carefully distinguishes "transformation" operations into two types (a small PySpark sketch contrasting them follows this list).
  • "Narrow transformation" refers to processing where the logic depends only on data that is already residing in the partition, so data shuffling is unnecessary.  Examples of narrow transformations include filter(), sample(), map(), flatMap() ... etc.
  • "Wide transformation" refers to processing where the logic depends on data residing in multiple partitions, and therefore data shuffling is needed to bring them together in one place.  Examples of wide transformations include groupByKey(), reduceByKey() ... etc.



Joining two RDDs can also affect the amount of data being shuffled.  Spark provides two ways to join data.  In a shuffle join implementation, data of the two RDDs with the same key will be redistributed to the same partition.  In other words, each of the items in each RDD will be shuffled across worker servers.


Besides the shuffle join, Spark provides another alternative called the broadcast join.  In this case, one of the RDDs is broadcasted and copied over to every partition.  Imagine the situation where one of the RDDs is significantly smaller than the other; the broadcast join will then reduce the network traffic because only the small RDD needs to be copied to all worker servers, while the large RDD doesn't need to be shuffled at all.
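As a rough sketch of how a broadcast join can be written by hand with the RDD API (the datasets and names below are hypothetical), the small table is shipped with sc.broadcast and the large RDD is joined with a plain map(), so it is never shuffled:

# Hand-rolled broadcast join (illustrative sketch)
from pyspark import SparkContext

sc = SparkContext("local[2]", "broadcast-join")

# Large RDD of (user_id, event) pairs, small in-memory lookup table of names
events = sc.parallelize([(1, "click"), (2, "view"), (1, "buy"), (3, "view")])
names = sc.broadcast({1: "alice", 2: "bob", 3: "carol"})

# Each worker gets a read-only copy of the small table; no shuffle of the large RDD
joined = events.map(lambda kv: (kv[0], (names.value.get(kv[0]), kv[1])))
print(joined.collect())
sc.stop()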



In some cases, transformations can be re-ordered to reduce the amount of data shuffling.  Below is an example of a JOIN between two huge RDDs followed by a filtering.



Plan 1 is a naive implementation which follows the given order.  It first joins the two huge RDDs and then applies the filter on the join result.  This ends up causing a big data shuffle because the two RDDs are huge, even though the result after filtering is small.

Plan 2 offers a smarter way by using the "push-down-predicate" technique: we first apply the filtering on both RDDs before joining them.  Since the filtering reduces the number of items of each RDD significantly, the join processing becomes much cheaper.
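The same idea in a small PySpark sketch (the RDDs here are tiny, made-up stand-ins for the "huge" ones in the figure):

# Join-then-filter vs. filter-then-join (illustrative sketch)
from pyspark import SparkContext

sc = SparkContext("local[2]", "pushdown")
rdd1 = sc.parallelize([(i, "a%d" % i) for i in range(10000)])
rdd2 = sc.parallelize([(i, "b%d" % i) for i in range(10000)])

# Plan 1: join first, filter later -- both full RDDs get shuffled
plan1 = rdd1.join(rdd2).filter(lambda kv: kv[0] % 1000 == 0)

# Plan 2: push the predicate down -- only the filtered subsets get shuffled
small1 = rdd1.filter(lambda kv: kv[0] % 1000 == 0)
small2 = rdd2.filter(lambda kv: kv[0] % 1000 == 0)
plan2 = small1.join(small2)

print(sorted(plan1.collect()) == sorted(plan2.collect()))   # same result, less shuffling
sc.stop()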

Execution planning

As explained above, data shuffling incurs the most significant cost in the overall data processing flow.  Spark provides a mechanism that generates an execution plan from the DAG that minimizes the amount of data shuffling.
  1. Analyze the DAG to determine the order of transformations.  Notice that we start from the action (the terminal node) and trace back to all dependent RDDs.
  2. To minimize data shuffling, we group the narrow transformations together in a "stage" where all transformation tasks can be performed within the partition and no data shuffling is needed.  The transformations become tasks that are chained together within a stage
  3. A wide transformation sits at the boundary of two stages, which requires data to be shuffled to a different worker server.  When a stage finishes its execution, it persists the data into different files (one per partition) on the local disks.  Worker nodes of the subsequent stage will come to pick up these files, and this is where data shuffling happens
Below is an example of how the execution planning turns the DAG into an execution plan involving stages and tasks.
 

Reliability and Fault Resiliency

Since the DAG defines deterministic transformation steps between the different partitions of data within each RDD, fault recovery is very straightforward.  Whenever a worker server crashes during the execution of a stage, another worker server can simply re-execute the stage from the beginning by pulling the input data from its parent stage, which has its output data stored in local files.  In case the result of the parent stage is not accessible (e.g. the worker server lost the file), the parent stage needs to be re-executed as well.  Imagine this as a lineage of transformation steps, where any failure of a step triggers a restart of execution from its previous step.

Since the DAG itself is an atomic unit of execution, all the RDD values will be forgotten after the DAG finishes its execution.  Therefore, after the driver program finishes an action (which executes a DAG to its completion), all the RDD values will be forgotten, and if the program accesses the RDD again in a subsequent statement, the RDD needs to be recomputed from its dependencies.  To reduce this repetitive processing, Spark provides a caching mechanism to remember RDDs in worker server memory (or local disk).  Once the execution planner finds that an RDD is already cached in memory, it will use the RDD right away without tracing back to its parent RDDs.  This way, we prune the DAG once we reach an RDD that is in the cache.
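A minimal sketch of the caching hint, reusing the SparkContext variable "spark" and the "hdfs://..." placeholder from the word count example above:

# Cache an RDD that is reused by more than one action (illustrative sketch)
words = spark.textFile("hdfs://...").flatMap(lambda line: line.split(" "))
words.cache()                         # hint: keep this RDD in worker memory

total = words.count()                 # first action computes the RDD and caches it
distinct = words.distinct().count()   # second action reuses the cached RDD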


Overall, Apache Spark provides a powerful framework for big data processing.  With the caching mechanism that holds previous computation results in memory, Spark out-performs Hadoop significantly because it doesn't need to persist all the data onto disk for each round of parallel processing.  Although it is still very new, I think Spark will take off as the mainstream approach to process big data.

Friday, November 28, 2014

Spark Streaming

In this post, we'll discuss another important topic of big data processing: real-time stream processing.  This is an area where Hadoop falls short because of its high latency, and another open source framework, Storm, was developed to cover the need for real-time processing.  Unfortunately, Hadoop and Storm provide quite different programming models, resulting in high development and maintenance cost.

Continuing from my previous post on Spark, which provides a highly efficient parallel processing framework, Spark Streaming is a natural extension of its core programming paradigm to provide large-scale, real-time data processing.  The biggest benefit of using Spark Streaming is that it is based on a similar programming paradigm as the core, and there is no need to develop and maintain a completely different programming paradigm for batch and real-time processing.


Spark Core Programming Paradigm Recap

The core Spark programming paradigm consists of the following steps ...
  1. Take input data from an external data source and create an RDD (a data set distributed across many servers)
  2. Transform the RDD into other RDDs (these transformations define a directed acyclic graph of dependencies between RDDs)
  3. Output the final RDD to an external data source


Notice that an RDD is immutable, therefore the sequence of transformations is deterministic, and recovery from an intermediate processing failure is done simply by tracing back to the parent of the failed node (in the DAG) and redoing the processing from there.


Spark Streaming

Spark Streaming introduces a data structure called a DStream, which is basically a sequence of RDDs where each RDD contains the data associated with a time interval.  A DStream is created with a frequency parameter which defines how often a new RDD is added to the sequence.

Transformation of a DStream boils down to the transformation of each RDD (within the sequence of RDDs that the DStream contains).  Within the transformation, the RDD inside the DStream can "join" with another RDD (outside the DStream), hence providing a mixed processing paradigm between the DStream and other RDDs.  Also, since each transformation produces an output RDD, the result of transforming a DStream is another sequence of RDDs that defines an output DStream.

Here is the basic transformation, where each RDD in the output DStream has a one to one correspondence with each RDD in the input DStream.


Instead of performing a 1 to 1 transformation of each RDD in the DStream, Spark Streaming enables a sliding window operation by defining a WINDOW which groups consecutive RDDs along the time dimension.  The window is defined by 2 parameters (see the sketch after this list) ...
  1. Window length: defines how many consecutive RDDs will be combined for performing the transformation.
  2. Slide interval: defines how many RDDs will be skipped before the next transformation executes.
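Here is a small sketch of a windowed word count using the PySpark Streaming API; the socket source, host, port and intervals are made up, and a batch interval of 5 seconds means each RDD in the DStream covers 5 seconds of data:

# Sliding window over a DStream (illustrative sketch)
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "dstream-window")
ssc = StreamingContext(sc, 5)                     # a new RDD every 5 seconds

lines = ssc.socketTextStream("localhost", 9999)   # hypothetical text source
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda w: (w, 1))

# Window length = 30s (6 consecutive RDDs), slide interval = 10s (every 2 RDDs)
windowed = pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, 30, 10)
windowed.pprint()

ssc.start()
ssc.awaitTermination()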


By providing a similar set of transformation operations for both RDDs and DStreams, Spark enables a unified programming paradigm across both batch and real-time processing, and hence reduces the corresponding development and maintenance cost.

Friday, December 27, 2013

Spark: Low latency, massively parallel processing framework

While Hadoop fits well in most batch processing workloads, and is the primary choice of big data processing today, it is not optimized for other types of workloads due to the following limitations
  • Lack of iteration support
  • High latency due to persisting intermediate data onto disk
For a more detailed elaboration of Hadoop's limitations, refer to my previous post.

Nevertheless, the Map/Reduce processing paradigm is a proven mechanism for dealing with large scale data.  On the other hand, many of Hadoop's infrastructure pieces, such as HDFS and HBase, have matured over time.

In this blog post, we'll look at a different architecture called Spark, which has taken the strengths of Hadoop, made improvements in a number of Hadoop's weaknesses, and provides a more efficient batch processing framework with a much lower latency (from the benchmark results, Spark using a RAM cache claims to be 100x faster than Hadoop, and 10x faster when running on disk).  Although competing with Hadoop MapRed, Spark integrates well with other parts of the Hadoop Ecosystem (such as HDFS, HBase ... etc.).  Spark has generated a lot of excitement in the big data community and represents a very promising parallel execution stack for big data analytics.

Berkeley Spark

Within the Spark cluster, there is a driver program where the application logic execution starts, with multiple workers processing the data in parallel.  Although this is not mandated, data is typically collocated with the workers and partitioned across the same set of machines within the cluster.  During the execution, the driver program passes code/closures to the worker machines, where the processing of the corresponding partition of data is conducted.  The data undergoes different steps of transformation while staying in the same partition as much as possible (to avoid data shuffling across machines).  At the end of the execution, actions are executed at the workers and the result is returned to the driver program.


Underlying the cluster, there is an important distributed data structure called the RDD (Resilient Distributed Dataset), which is a logically centralized entity but physically partitioned across multiple machines inside a cluster based on some notion of key.  Controlling how different RDDs are co-partitioned (with the same keys) across machines can reduce inter-machine data shuffling within a cluster.  Spark provides a "partition-by" operator which creates a new RDD by redistributing the data of the original RDD across the machines within the cluster.
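A minimal PySpark sketch of that idea (the data and partition count are illustrative): partitioning both key/value RDDs the same way before a join, so that matching keys already sit together.

# Co-partitioning two RDDs before a join (illustrative sketch)
from pyspark import SparkContext

sc = SparkContext("local[2]", "copartition")
a = sc.parallelize([(i, i * i) for i in range(1000)]).partitionBy(8)
b = sc.parallelize([(i, str(i)) for i in range(1000)]).partitionBy(8)

# With the same partitioner on both sides, the join can avoid reshuffling them
joined = a.join(b)
print(joined.take(5))
sc.stop()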



RDDs can optionally be cached in RAM, providing fast access.  Currently the granularity of caching is at the RDD level: either the whole RDD is cached or none of it is.  Caching is a hint but not a guarantee.  Spark will try to cache the RDD if sufficient memory is available in the cluster, evicting based on an LRU (Least Recently Used) algorithm.

The RDD provides an abstract data structure from which application logic can be expressed as a sequence of transformation steps, without worrying about the underlying distributed nature of the data.

Typically, application logic is expressed in terms of a sequence of TRANSFORMATIONs and ACTIONs.  A "transformation" specifies the processing dependency DAG among RDDs, and an "action" specifies what the output will be (ie: the sink node of the DAG with no outgoing edge).  The scheduler performs a topological sort to determine the execution sequence of the DAG, tracing all the way back to the source nodes, or to nodes that represent a cached RDD.


Notice that dependencies in Spark come in two forms.  "Narrow dependency" means that all partitions of an RDD will be consumed by a single child RDD (but a child RDD is allowed to have multiple parent RDDs).  "Wide dependency" (e.g. group-by-keys, reduce-by-keys, sort-by-keys) means a parent RDD will be split, with its elements going to different child RDDs based on their keys.  Notice that an RDD with narrow dependencies preserves the key partitioning between parent and child RDD.  Therefore RDDs can be co-partitioned with the same keys (the parent key range being a subset of the child key range) such that the processing (generating the child RDD from the parent RDD) can be done within a machine with no data shuffling across the network.  On the other hand, an RDD with wide dependencies involves data shuffling.


Narrow transformations (involving no data shuffling) include the following operators
  • Map
  • FlatMap
  • Filter
  • Sample
Wide transformations (involving data shuffling) include the following operators
  • SortByKey
  • ReduceByKey
  • GroupByKey
  • CogroupByKey
  • Join
  • Cartesian
Actions output the RDD to the external world and include the following operators
  • Collect
  • Take(n)
  • Reduce
  • ForEach
  • Sample
  • Count
  • Save
The scheduler will examine the type of dependencies and group the narrow-dependency RDDs into a unit of processing called a stage.  Wide dependencies span across consecutive stages within the execution and require the number of partitions of the child RDD to be explicitly specified.


A typical execution sequence is as follows ...
  1. An RDD is created originally from external data sources (e.g. HDFS, local file ... etc)
  2. The RDD undergoes a sequence of TRANSFORMATIONs (e.g. map, flatMap, filter, groupBy, join), each producing a different RDD that feeds into the next transformation.
  3. Finally the last step is an ACTION (e.g. count, collect, save, take), which converts the last RDD into an output to external data sources
The above sequence of processing is called a lineage (the outcome of the topological sort of the DAG).  Each RDD produced within the lineage is immutable.  In fact, unless it is cached, it is used only once to feed the next transformation to produce the next RDD and finally produce some action output.

In a classical distributed system, fault resilience is achieved by replicating data across different machines together with an active monitoring system.  In case any machine crashes, there is always another copy of the data residing on a different machine from which recovery can take place.

Fault resiliency in Spark takes a different approach.  First of all, as a large scale compute cluster, Spark is not meant to be a large scale data cluster at all.  Spark makes two assumptions about its workload.
  • The processing time is finite (although the longer it takes, the higher the cost of recovery after a fault)
  • Data persistence is the responsibility of external data sources, which keep the data stable within the duration of processing.
Spark has made the tradeoff decision that in case of any data loss during the execution, it will re-execute the previous steps to recover the lost data.  However, this doesn't mean everything done so far is discarded and we need to start from scratch.  We just need to re-execute the corresponding partition in the parent RDD which is responsible for generating the lost partitions; in the case of narrow dependencies, this resolves to the same machine.

Notice that the re-execution of a lost partition is exactly the same as the lazy evaluation of the DAG, which starts from the leaf node of the DAG, tracing back the dependencies on which parent RDDs are needed and eventually tracking all the way to the source node.  Recomputing the lost partition is done in a similar way, but taking the partition as an extra piece of information to determine which parent RDD partitions are needed.

However, re-execution across wide dependencies can touch a lot of parent RDD partitions across multiple machines and may cause re-execution of everything.  To mitigate this, Spark persists the intermediate data output from a map phase before shuffling it to the different machines executing the reduce phase.  In case of a machine crash, the re-execution (on another surviving machine) just needs to trace back to fetch the intermediate data from the corresponding partition of the mapper's persisted output.  Spark also provides a checkpoint API to explicitly persist an intermediate RDD so that re-execution (after a crash) doesn't need to trace all the way back to the beginning.  In the future, Spark will perform check-pointing automatically by figuring out a good balance between the latency of recovery and the overhead of check-pointing based on statistical results.
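A short sketch of the checkpoint API (the checkpoint directory below is a placeholder):

# Checkpointing an intermediate RDD (illustrative sketch)
from pyspark import SparkContext

sc = SparkContext("local[2]", "checkpointing")
sc.setCheckpointDir("hdfs://.../checkpoints")     # placeholder directory

derived = sc.parallelize(range(100000)).map(lambda x: x * 2).filter(lambda x: x % 3 == 0)
derived.checkpoint()    # persist to reliable storage so recovery need not replay the full lineage

print(derived.count())
sc.stop()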

Spark provides a powerful processing framework for building low latency, massively parallel processing for big data analytics.  It supports an API around the RDD abstraction with a set of operations for transformation and action for a number of popular programming languages like Scala, Java and Python.

In future posts, I'll cover other technologies in the Spark stack including real-time analytics using streaming as well as machine learning frameworks.

Monday, August 13, 2012

BIG Data Analytics Pipeline

"Big Data Analytics" has recently been one of the hottest buzzwords.  It is a combination of "Big Data" and "Deep Analysis".  The former is a phenomenon of Web2.0 where a lot of transaction and user activity data has been collected which can be mined for extracting useful information.  The later is about using advanced mathematical/statistical technique to build models from the data.  In reality I've found these two areas are quite different and disjoint and people working in each area have a pretty different background.

Big Data Camp

People working in this camp typically come from a Hadoop, PIG/Hive background.  They usually have implemented some domain-specific logic to process large amounts of raw data.  Often the logic is relatively straightforward, based on domain-specific business rules.

From my personal experience, most of the people working in big data come from a computer science and distributed parallel processing system background, but not from a statistical or mathematical discipline.

Deep Analysis Camp

On the other hand, people working in this camp usually come from a statistical and mathematical background, where the first thing taught is how to use sampling to understand a large population's characteristics.  Notice the magic of "sampling": the accuracy of estimating the large population depends only on the size of the sample, not on the actual size of the population.  In their world, there is never a need to process all the data in the population in the first place.  Therefore, Big Data Analytics is unnecessary under this philosophy.

Typical Data Processing Pipeline

Learning from my previous projects, I observe that most data processing pipelines fall into the following pattern.



In this model, data is created from the OLTP (On Line Transaction Processing) system and flows into the BIG Data Analytics system, which produces various outputs, including data marts/cubes for OLAP (On Line Analytic Processing), reports for the consumption of business executives, and predictive models that feed decision support back into OLTP.

Big Data + Deep Analysis

The BIG data analytics box usually runs in a batch fashion (e.g. once a day), and we usually see big data processing and deep data analysis happen at different stages of this batch process.



The big data processing part (colored in orange) is usually done using Hadoop/PIG/Hive technology with classical ETL logic implementation.  By leveraging the Map/Reduce model that Hadoop provides, we can linearly scale up the processing by adding more machines into the Hadoop cluster.  Drawing on cloud computing resources (e.g. Amazon EMR) is a very common approach to performing this kind of task.

The deep analysis part (colored in green) is usually done in R, SPSS or SAS using a much smaller amount of carefully sampled data that fits into a single machine's capacity (usually less than a couple hundred thousand data records).  The deep analysis part usually involves data visualization, data preparation, model learning (e.g. linear regression and regularization, K-nearest-neighbour/support vector machine/Bayesian network/neural network, decision tree and ensemble methods), and model evaluation.  For those who are interested, please read my earlier posts on these topics.

Implementation Architecture

There are many possible ways to implement the data pipeline described above.  Here is one common implementation that works well in many projects.


In this architecture, "Flume" is used to move data from OLTP system to Hadoop File System HDFS.  A workflow scheduler (typically a cron-tab entry calling a script) will periodically run to process the data using Map/Reduce.  The data has two portions:  a) Raw transaction data from HDFS  b) Previous model hosted in some NOSQL server.  Finally the "reducer" will update the previous model which will be available to the OLTP system.

For most the big data analytic projects that I got involved, the above architecture works pretty well.  I believe projects requiring real-time feedback loop may see some limitation in this architecture.  Real-time big data analytics is an interesting topic which I am planning to discuss in future posts.

Thursday, April 21, 2011

K-Means Clustering in Map Reduce

Unsupervised machine learning has broad application in many e-commerce sites, and one common usage is to find clusters of consumers with common behaviors.  Among clustering methods, K-means is the most basic and also an efficient one.

K-Means clustering involves the following logical steps

1) Determine the value of k
2) Determine the initial k centroids
3) Repeat until convergence
- Determine membership: Assign each point to the closest centroid
- Update centroid position: Compute the new centroid position from the assigned members

Determine the value of K
This is basically asking the question: "How many clusters are you interested in discovering?"
So the answer is specific to the problem domain.

One way is to try different values of K.  At some point, we'll see that increasing K doesn't help much to improve the overall quality of the clustering.  Then that is the right value of K.

Notice that the overall quality of the clustering is the average distance from each data point to its associated centroid.


Determine the initial K centroids
We need to pick K centroids to start the algorithm.  One way to pick them is to randomly pick K points from the whole data set.

However, picking a good set of centroids can reduce the number of subsequent iterations, and by "good" I mean the K centroids should be as far apart from each other as possible, or even better, the initial K centroids should be close to the final K centroids.  As you can see, choosing K random points is reasonable but non-optimal.

Another approach is to take a small random sample set from the input data set and do a hierarchical clustering within this smaller set (note that hierarchical clustering does not scale to large data sets).

We can also partition the space into overlapping regions using the canopy clustering technique (described below) and pick the center of each canopy as an initial centroid.

Iteration
Each iteration is implemented as a Map/Reduce job.  First of all, we need a control program on the client side to initialize the centroid positions, kick off the iterations of Map/Reduce jobs, and determine whether the iterations should end ...

kmeans(data) {
  initial_centroids = pick(k, data)
  upload(data)
  writeToS3(initial_centroids)
  old_centroids = initial_centroids
  while (true){
    map_reduce()
    new_centroids = readFromS3()
    if change(new_centroids, old_centroids) < delta {
      break
    } else {
      old_centroids = new_centroids
    }
  }
  result = readFromS3()
  return result
}


Within each iteration, most of the processing is done in the Map task, which determines the membership for each point, as well as computes a partial sum over the member points of each cluster.

The reducer does the easy job: it aggregates all the partial sums, computes the updated centroid positions, and then writes them out to a shared store (S3 in this case) that can be picked up by the Map/Reduce job of the next round.
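To make the division of labor concrete, here is a small single-process Python sketch that emulates these Map and Reduce roles on 2-D points (the data, partitioning and helper names are all made up; a real job would run the mappers on separate machines and put the partial sums in S3):

# Emulated K-Means Map/Reduce iteration (illustrative sketch)
import math
from collections import defaultdict

def closest(point, centroids):
    # Index of the nearest centroid (Euclidean distance)
    return min(range(len(centroids)), key=lambda i: math.dist(point, centroids[i]))

def mapper(points, centroids):
    # Assign each point to a centroid and emit a partial (sum, count) per cluster
    partial = defaultdict(lambda: ([0.0, 0.0], 0))
    for p in points:
        c = closest(p, centroids)
        (sx, sy), n = partial[c]
        partial[c] = ([sx + p[0], sy + p[1]], n + 1)
    return list(partial.items())

def reducer(partials, k):
    # Aggregate all partial sums/counts and recompute each centroid position
    totals = {c: ([0.0, 0.0], 0) for c in range(k)}
    for c, ((sx, sy), n) in partials:
        (tx, ty), tn = totals[c]
        totals[c] = ([tx + sx, ty + sy], tn + n)
    return [[s[0] / n, s[1] / n] for s, n in totals.values() if n > 0]

partitions = [[(1, 1), (1, 2)], [(8, 8), (9, 8)]]   # two "mappers", one partition each
centroids = [[0.0, 0.0], [10.0, 10.0]]
for _ in range(5):
    partials = [kv for part in partitions for kv in mapper(part, centroids)]
    centroids = reducer(partials, len(centroids))
print(centroids)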



Complexity Analysis
Most of the work is done by the Mapper and the workload is pretty balanced. So the time complexity will be O(k*n/p) where k is the number of clusters, n is the number of data points and p is the number of machines. Note that the factor of k comes in at the closest_centroid() function shown below, when comparing each data point with each intermediate centroid as follows ...
closest_centroid(point, listOfCentroids) {
  bestCentroid = listOfCentroids[0]
  minDistance = INFINITY
  for each centroid in listOfCentroids {
    distance = dist(point, centroid)
    if distance < minDistance {
      minDistance = distance
      bestCentroid = centroid
    }
  }
  return bestCentroid
}

If we partition the space into proximity regions, we only need to compare each point with the centroids within the same proximity region and treat the other centroids as infinitely far away. In other words, we don't have to compare each point with all k centroids.

Canopy clustering provides such a partitioning mechanism.


Canopy Clustering
To define the proximity region (canopy), we can draw a circle (or hypersphere) centered at a data point. Points outside this sphere are considered to be too far away.

However, if we apply this definition to every point, then we will have as many proximity regions as there are points, which doesn't end up saving much processing. We also observe that points which are very close to each other can stay in the same region without each creating their own. Therefore, we can draw a smaller circle within the big circle (with the same center) such that data points within the small circle are not allowed to form their own proximity region.


Notice that the proximity regions can overlap with each other, and the degree of overlap is affected by the choice of T1. The choice of T2 affects how many canopies will be formed. Picking the right values of T1 and T2 is domain-specific, and also depends on the number of clusters and the space volume. If there is a small number of clusters within a big space, then a bigger T1 should be chosen.

To create the canopies (and mark the data points with the canopies), we do the following steps ...
1) Create the canopy centers, with one scan
  • Keep a list of canopies, initially an empty list
  • Scan each data point; if it is within T2 distance of an existing canopy, discard it. Otherwise, add this point to the list of canopies

2) Assign data points to the canopies, with another scan
  • Start with the list of canopies from the last step
  • Scan each data point; if it is within T1 of a canopy A, add A as an assigned canopy of the data point. Notice that a data point can be assigned to multiple canopies
  • When done, each data point will be tagged with its assigned canopies

Notice that the input data points have now been extended with an extra attribute that contains the assigned canopies. When comparing a point with the intermediate centroids, we only need to compare centroids within the same canopy. Here is the modified version of the algorithm ...

closest_centroid(point, listOfCentroids) {
  bestCentroid = listOfCentroids[0]
  minDistance = INFINITY
  for each centroid in listOfCentroids {
    # Skip centroids that do not share any canopy with this point
    if (not point.myCanopy.intersects(centroid.myCanopy)) {
      continue
    }
    distance = dist(point, centroid)
    if distance < minDistance {
      minDistance = distance
      bestCentroid = centroid
    }
  }
  return bestCentroid
}

Sunday, August 29, 2010

Designing algorithms for Map Reduce

Since the emergence of the Hadoop implementation, I have been trying to morph existing algorithms from various areas into the map/reduce model. The result is pretty encouraging and I've found Map/Reduce is applicable in a wide spectrum of application scenarios.

So I wanted to write down my findings, but then found the scope is too broad and I haven't spent enough time to explore different problem domains. Finally, I realized that there is no way for me to completely cover what Map/Reduce can do in all areas, so I'll just dump out what I know at this moment over the long weekend when I have an extra day.

Notice that Map/Reduce is good for "data parallelism", which is different from "task parallelism". Here is a description about their difference and a general parallel processing design methodology.

I'll cover the abstract Map/Reduce processing model below. For a detailed description of the implementation of the Hadoop framework, please refer to my earlier blog here.


Abstract Processing Model
There is no formal definition of the Map/Reduce model. Based on the Hadoop implementation, we can think of it as a "distributed merge-sort engine". The general processing flow is as follows.
  • Input data is "split" into multiple mapper processes which execute in parallel
  • The result of each mapper is partitioned by key and locally sorted
  • Mapper results with the same key will land on the same reducer and be consolidated there
  • A merge sort happens at the reducer, so all keys arriving at the same reducer are sorted

Within the processing flow, user defined functions can be plugged into the framework.
  • map(key1, value1) -> emit(key2, value2)
  • reduce(key2, value2_list) -> emit(key2, aggregated_value2)
  • combine(key2, value2_list) -> emit(key2, combined_value2)
  • partition(key2) return reducerNo
Designing an algorithm for map/reduce is about morphing your problem into a distributed sorting problem and fitting your algorithm into the user defined functions above (a toy sketch of these hooks follows).
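Here is a toy, single-process Python emulation of these four hooks using word count; it only illustrates the contract (map -> combine -> partition -> sorted reduce) and none of the actual distribution:

# Toy emulation of the map/combine/partition/reduce hooks (illustrative sketch)
from collections import defaultdict

def map_fn(key, line):
    return [(word, 1) for word in line.split()]

def combine_fn(key, values):
    return [(key, sum(values))]              # local consolidation on the mapper side

def partition_fn(key, num_reducers):
    return hash(key) % num_reducers          # which reducer receives this key

def reduce_fn(key, values):
    return [(key, sum(values))]

def run_job(records, num_reducers=2):
    buckets = [defaultdict(list) for _ in range(num_reducers)]   # per-reducer buffers
    for key, value in records:
        local = defaultdict(list)
        for k2, v2 in map_fn(key, value):
            local[k2].append(v2)
        for k2, v2s in local.items():
            for ck, cv in combine_fn(k2, v2s):
                buckets[partition_fn(ck, num_reducers)][ck].append(cv)
    output = []
    for bucket in buckets:
        for k2 in sorted(bucket):            # keys arrive at each reducer sorted
            output.extend(reduce_fn(k2, bucket[k2]))
    return output

print(run_job([(0, "the quick brown fox"), (1, "the lazy dog")]))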

To analyze the complexity of the algorithm, we need to understand the processing cost, especially the cost of network communication in such a highly distributed system.

Let's first consider the communication between the input data splits and the Mappers. To minimize this overhead, we need to run the mapper logic at the data split (without moving the data). How well we can do this depends on how the input data is stored and whether we can run the mapper code there. For HDFS and Cassandra, we can run the mapper at the storage node, and the scheduling algorithm of the JobTracker will assign the mapper to the data split it is collocated with, hence significantly reducing the data movement. Other data stores such as Amazon S3 don't allow execution of mapper logic at the storage node and therefore incur more data traffic.

The communication between Mapper and Reducer cannot be collocated because it depends on the emitted key. The only mechanism available is the combine() function, which can perform a local consolidation and hence reduce the data sent to the reducer.

Finally, the communication between the reducer and the output data store depends on the store's implementation. For HDFS, the data is triply replicated and hence the cost of writing can be high. Cassandra (a NOSQL data store) allows configurable latency with various degrees of data consistency trade-off. Fortunately, in most cases the volume of result data after a Map/Reduce processing is not high.

Now, let's see how to fit various kinds of algorithms into the Map/Reduce model ...


Map-Only
"Embarrassing parallel" problems are those that the same processing is applied in each data element in a pretty independent way, in other words, there is no need to consolidate or aggregate individual results.

These kinds of problem can be expressed as a Map-only job (by specifying the number of reducers to zero). In this case, Mapper's emitted result will directly go to the output format.

Some examples of map-only examples are ...
  • Distributed grep
  • Document format conversion
  • ETL
  • Input data sampling

Sorting
As we described above, Hadoop is fundamentally a distributed sorting engine, so using it for sorting is a natural fit.

For example, we can use an identity function for both map() and reduce(); then the output is equivalent to sorting the input data. Notice that we are using a single reducer here, so the merge is still sequential, although the sorting is done at the mappers in parallel.

We can perform the merge in parallel by using multiple reducers. In this case, the output of each reducer is sorted. We may need to do a final merge on all the reducers' output. Another way is to use a customized partition() function such that the keys are partitioned by range. In this case, each reducer sorts a particular range and the final result is just the concatenation of each reducer's sorted output.
partition(key) {
  range = (KEY_MAX - KEY_MIN) / NUM_OF_REDUCERS
  reducer_no = (key - KEY_MIN) / range
  return reducer_no
}


Inverted Indexes
The map reduce model originated from Google, which has a lot of scenarios involving building large scale inverted indexes. Building an inverted index is about parsing different documents to build a word -> document index for keyword search.

In fact, the inverted index is pretty general and can be applied in many scenarios. To build an inverted index, we can feed the mapper each document (or lines within a document). The Mapper will parse the words in the document and emit [word, doc] pairs along with other metadata such as where in the document this word occurs ... etc. The reducer can simply be an identity function that just dumps out the list, or it can perform some statistical aggregation per word.

In a more general form of inverted index, there is a "container" and "element" concept. The Map and Reduce functions will be organized in the following pattern.
map(key, container) {
  for each element in container {
      element_meta =
           extract_metadata(element, container)
      emit(element, [container_id, element_meta])
  }
}

reduce(element, container_ids) {
  element_stat =
       compute_stat(container_ids)
  emit(element, [element_stat, container_ids])
}

In a text index, we are not just counting the actual frequency of the terms but also adjusting their weighting based on their frequency distribution, so common words will have less significance when they appear in a document. The final value after normalization is called TF-IDF (term frequency times inverse document frequency) and can be computed using Map/Reduce as well.
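As a rough illustration of the weighting (a single-machine Python sketch with made-up documents, not a Map/Reduce job):

# TF-IDF weighting sketch (illustrative, single machine)
import math
from collections import Counter

docs = {
    "d1": "spark makes big data processing fast",
    "d2": "hadoop is a big data batch framework",
}

tf = {doc_id: Counter(text.split()) for doc_id, text in docs.items()}   # term frequency per doc
df = Counter(term for counts in tf.values() for term in counts)         # document frequency per term

n_docs = len(docs)
tfidf = {doc_id: {term: count * math.log(n_docs / df[term])
                  for term, count in counts.items()}
         for doc_id, counts in tf.items()}

print(tfidf["d1"])    # terms appearing in every doc (e.g. "big", "data") get weight 0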


Simple Statistics Computation
Computing max, min, and count is very straightforward since these operations are commutative and associative. Each mapper will perform the local computation and send the result to a single reducer to do the final computation.

A combine function is typically used to reduce the network traffic. Notice that the input to the combine function must look the same as the input to the reducer function, and the output of the combine function must look the same as the output of the map function. There is also no guarantee that the combiner function will be invoked at all.

class Mapper {
  buffer

  map(key, number) {
      buffer.append(number)
      if (buffer.is_full) {
          max = compute_max(buffer)
          emit(1, max)
          buffer.clear
      }
  }

  close() {
      # Emit the max of any leftover numbers still in the buffer
      if (not buffer.is_empty) {
          emit(1, compute_max(buffer))
      }
  }
}


class Reducer {
  reduce(key, list_of_local_max) {
      global_max = 0
      for local_max in list_of_local_max {
          if local_max > global_max {
              global_max = local_max
          }
      }
      emit(1, global_max)
  }
}


class Combiner {
  combine(key, list_of_local_max) {
      local_max = maximum(list_of_local_max)
      emit(1, local_max)
  }
}
Computing the avg is done in a similar way, except that instead of computing the local avg, we compute the local sum and local count. The reducer will divide the final sum by the final count to come up with the final avg (see the sketch below).
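A tiny Python sketch of that idea (the partitions and numbers are made up):

# Average via local (sum, count) pairs (illustrative sketch)
def avg_mapper(numbers):
    return (sum(numbers), len(numbers))      # one partial (sum, count) per mapper

def avg_reducer(partials):
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count                     # divide only once, at the end

partitions = [[1, 2, 3], [10, 20], [100]]
print(avg_reducer([avg_mapper(p) for p in partitions]))   # 136 / 6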

Computing a histogram is pretty common in statistics and can give a quick idea about the data distribution. A typical approach is to divide the numbers into different intervals. The mapper will emit a count per interval, and the reducer will compute the sum for each interval.
class Mapper {
  interval_start = [0, 20, 40, 60, 80]

  map(key, number) {
      i = 0
      while (i < NO_OF_INTERVALS) {
          # Emit interval i if the number falls before the next interval's start
          if (i == NO_OF_INTERVALS - 1 or number < interval_start[i + 1]) {
              emit(i, 1)
              break
          }
          i = i + 1
      }
  }
}


class Reducer {
  reduce(interval, counts) {
      total_counts = 0
      for each count in counts {
          total_counts += count
      }
      emit(interval, total_counts)
  }
}


class Combiner {
  combine(interval, counts) {
      local_count = 0
      for each count in counts {
          local_count += count
      }
      emit(interval, local_count)
  }
}
Notice that a non-uniform distribution of values across intervals may cause an unbalanced workload among reducers and hence undermine the degree of parallelism. We'll address this in the later part of this post.


In-Mapper Combine
Jimmy Lin, in his excellent book, talks about a technique called "in-mapper combine" which regains control at the application level over when the combine takes place. The general idea is to maintain a HashMap to buffer the intermediate results and have separate logic to determine when to actually emit the data from the buffer. The general code structure is as follows ...
class Mapper {
  buffer

  init() {
      buffer = HashMap.new
  }

  map(key, data) {
      elements = process(data)
      for each element {
          ....
          check_and_put(buffer, k2, v2)
      }
  }

  check_and_put(buffer, k2, v2) {
      # Flush the buffer when it is full, then accumulate the new value
      if buffer.full {
          for each k2 in buffer.keys {
              emit(k2, buffer[k2])
          }
          buffer.clear
      }
      buffer[k2] = combine(buffer[k2], v2)
  }

  close() {
      # Emit whatever is left in the buffer at the end of the input split
      for each k2 in buffer.keys {
          emit(k2, buffer[k2])
      }
  }
}

SQL Model
The SQL model can be used to extract data from the data source. It contains a number of primitives.

Projection / Filter
This logic is typically implemented in the Mapper
  • result = SELECT c1, c2, c3, c4 FROM source WHERE conditions
Aggregation / Group by / Having
This logic is typically implemented in the Reducer
  • SELECT sum(c3) as s1, avg(c4) as s2 ... FROM result GROUP BY c1, c2 HAVING conditions
The above example can be realized by the following map/reduce job
class Mapper {
  map(k, rec) {
      select_fields =
          [rec.c1, rec.c2, rec.c3, rec.c4]
      group_fields =
          [rec.c1, rec.c2]
      if (filter_condition == true) {
          emit(group_fields, select_fields)
      }
  }
}

class Reducer {
  reduce(group_fields, list_of_rec) {
      s1 = 0
      s2 = 0
      for each rec in list_of_rec {
          s1 += rec.c3
          s2 += rec.c4
      }
      s2 = s2 / list_of_rec.size
      if (having_condition == true) {
          emit(group_fields, [s1, s2])
      }
  }
}

Data Joins
Joining 2 data sets is a very common operation in the relational data model and is very mature in RDBMS implementations. The common join mechanisms in a centralized DB architecture are as follows
  1. Nested loop join -- This is the most basic and naive mechanism and is organized as two loops. The outer loop reads from data set1; the inner loop scans through the whole data set2 and compares with the record just read from data set1.
  2. Indexed join -- An index (e.g. B-Tree index) is built for one of the data sets (say data set2, which is the smaller one). The join will scan through data set1 and look up the index to find the matched records of data set2.
  3. Merge join -- Pre-sort both data sets so they are arranged physically in increasing order. The join is realized by just merging the two data sets. a) Locate the first record in both data set1 & set2, which is their corresponding minimum key b) In the one with the smaller minimum key (say data set1), keep scanning until finding the next key which is bigger than the minimum key of the other data set (ie. data set2); call this the next minimum key of data set1. c) Switch positions and repeat the whole thing until one of the data sets is exhausted.
  4. Hash / Partition join -- Partition data set1 and data set2 into smaller sizes and apply another join algorithm on the smaller data sets. A linear scan with a hash() function is typically performed to partition the data sets such that data in set1 and data in set2 with the same key will land on the same partition.
  5. Semi join -- This is mainly used to join two sets of data that are stored at different locations, and the goal is to reduce the amount of data transfer such that only the full records appearing in the final join result will be sent through. a) Data set2 sends its key set to the machine holding data set1. b) The machine holding data set1 does a join and sends back the records in data set1 that match one of the sent-over keys. c) The machine holding data set2 does a final join on the data sent back.
In the map reduce environment, there are corresponding joins.

General reducer-side join
This is the most basic one: records from data set1 and set2 with the same key will land on the same reducer, which will then compute a cartesian product. The downside of this model is that the reducer needs to have enough memory to hold all records of each key.
map(k1, rec) {
  emit(rec.key, [rec.type, rec])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  list_of_typeB = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else {
          list_of_typeB.append(rec)
      }
  }

  # Compute the cartesian product
  for recA in list_of_typeA {
      for recB in list_of_typeB {
          emit(k2, [recA, recB])
      }
  }
}

Optimized reducer-side join
You can "secondary sort" the data type for each key by defining a customized partition function. In this model, you arrange the data type (which has less records per key to arrive first) and you only need to store these types.
map(k1, rec) {
  emit([rec.key, rec.type], rec)
}

partition(key_pair) {
  return super.partition(key_pair[0])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else { # type B records arrive after all the type A records
          for recA in list_of_typeA {
              emit(k2, [recA, rec])
          }
      }
  }
}

While being very flexible, the downside of the reducer-side join is that all data needs to be transferred from the mappers to the reducers and the result then written to HDFS. Map-side joins exploit a special arrangement of the input files such that the join is performed at the mapper. The advantage of doing it in the mapper is that we can exploit the collocation of the Map/Reduce framework: the mapper will be allocated an input split on its local machine, hence reducing the data transfer from the disk to the mapper. After the map-side join, the result is written directly to the output HDFS files, hence eliminating the data transfer between the mapper and the reducer.

Map-side partition join
In this model, the 2 data sets need to be partitioned into 2 sets of partition files (the same number of partitions for each set). The size of each partition is such that it fits into the memory of the Mapper machine. We also need to configure the Map/Reduce job such that there is no split in the partition file; in other words, the whole partition is assigned to a single mapper task.

The mapper will detect the partition of the input file and then read the corresponding partition file of the other data set into an in-memory hashtable. After that, the mapper will look up the hashtable to do the join.
class Mapper {
  table = Hashtable.new

  init() {
      partition = detect_input_filename()
      # Load the matching partition of the other data set into memory
      table = load("hdfs://dataset2/" + partition)
  }

  map(k1, rec1) {
      rec2 = table[rec1.key]
      if (rec2 != nil) {
          emit(rec1.key, [rec1, rec2])
      }
  }
}

Map-side partition merge join
In addition, if the partition files are also sorted, then the mapper can use a merge join, which has an even smaller memory footprint.
class Mapper {
  rec2_key = nil
  next_rec2 = nil
  list_of_rec2 = []
  file = nil

  init() {
      partition = detect_input_filename()
      file = open("hdfs://dataset2/" + partition, "r")
      next_rec2 = file.read()
      fill_rec2_list()
  }

  # Fill up the list of rec2 records which share the same key
  fill_rec2_list() {
      list_of_rec2 = []
      rec2_key = next_rec2.key
      while (next_rec2 != nil and next_rec2.key == rec2_key) {
          list_of_rec2.append(next_rec2)
          next_rec2 = file.read()
      }
  }

  map(k1, rec1) {
      # Advance through dataset2 until its key catches up with rec1's key
      while (rec1.key > rec2_key) {
          fill_rec2_list()
      }
      if (rec1.key == rec2_key) {
          for rec2 in list_of_rec2 {
              emit(rec1.key, [rec1, rec2])
          }
      }
  }
}

Memcache join
The model is very straightforward: the second data set is loaded into a distributed hash table (like memcache) which has effectively unlimited size. The mapper receives input splits from the first data set and then looks up the memcache for the corresponding record of the other data set.

There are also some other more sophisticated join mechanisms, such as the semi-join described in this paper.

Graph Algorithms
Many problems can be modeled as a graph of nodes and edges. In the search engine environment, computing the rank of a document using PageRank or HITS can be modeled as a sequence of iterations of Map/Reduce jobs.

In the past, I have blogged about a number of very basic graph algorithms in map reduce, including topological sort, finding the shortest path, minimum spanning tree, etc., and also how to recommend people connections using Map/Reduce.

Due to the fact that graph traversal is inherently sequential, I am not sure Map/Reduce is the best parallel processing model for graph processing. Another problem is that, due to the "stateless nature" of the map() and reduce() functions, the whole graph needs to be transferred between the mapper and the reducer, which incurs significant communication costs. Jimmy Lin has described a clever technique called Shimmy which exploits a special partitioning function that lets the reducer retain ownership of nodes across map/reduce jobs. I have described this technique as well as a general model of Map/Reduce graph processing in a previous blog.

I think a parallel programming model specific to graph processing will perform much better. Google's Pregel model is a good example of that.


Machine Learning
Many machine learning algorithms involve multiple iterations of parallel processing that fit very well into the Map/Reduce model.

For example, we can use map reduce to calculate the statistics for probabilistic methods such as naive Bayes.

A simple example of computing K-Means clusters can also be done in the following way.
  • Input: A set of points, with k initial centroids
  • Output: K final centroids
Iterate until there is no more change of membership
  1. For each point, assign it to be a member of the closest centroid
  2. Re-compute each centroid from its assigned point members


For a complete list of Machine learning algorithms and how they can be implemented using the Map/Reduce model, here is a very good paper.


Matrix arithmetic
A lot of real-life relationships can be represented as a matrix. One example is the vector space model of Information Retrieval, where the columns represent docs and the rows represent terms. Another example is the social network graph, where both the columns and the rows represent people and a binary value in each cell represents a "friend" relationship. In this case, M + M.M represents all the people that I can reach within 2 degrees.
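A tiny numpy sketch of that 2-degree example (the graph is made up; note the result here also includes reaching yourself in two hops):

# Friends within two degrees via M + M.M (illustrative sketch)
import numpy as np

M = np.array([[0, 1, 0],      # adjacency matrix: M[i][j] == 1 means i knows j
              [1, 0, 1],
              [0, 1, 0]])

two_degree = ((M + M @ M) > 0).astype(int)   # treat the sum as a boolean reachability matrix
print(two_degree)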

Processing of a dense matrix is very easy to parallelize. But since the sequential version is O(N^3), it is not that interesting for matrices of large size (millions of rows and columns).

A lot of real-world graph problems can be represented as sparse matrices. So my interest is to focus more on the processing of sparse matrices. I don't have much to share at this moment, but I hope this is something I will blog about in the future.

Sunday, May 2, 2010

The NOSQL Debate

I attended the Stanford InfoLab conference, where there were 2 panel discussions on cloud computing transaction processing and analytic processing.

The session turned out to be a debate between people from the academic side and open source developers. Both sides have their points ...

Background of RDBMS

RDBMS is based on a clear separation between application logic and data management. This loose coupling allows application and DB technologies to evolve independently.

This philosophy drives the DBMS architecture to be more general in order to support a wide range of applications. Over many years of evolution, it has a well-defined data model + query model based on relational algebra. On the other hand, it also has a well-defined transaction processing model.

Applications also benefit from having a unified data model. They have more freedom to switch their DB vendor without too many code changes.

For OLTP applications, the RDBMS model has been proven to be highly successful in many large enterprises. The success of both Oracle and MySQL can speak to that.

For analytic applications, the RDBMS model is also used widely for implementing data warehouses based on a STAR schema model (composed of fact tables and dimension tables).

This model also puts the DBA in a very important position in the enterprise. They are equipped with sophisticated management tools and specialized knowledge to control the commonly shared DB schema.

Background of NOSQL

In the last 10 years, there are a very few of highly successful web2.0 companies whose applications have gone beyond what a centralized RDBMS can handle. Partition data across many servers and spread the workload across them seems to be a reasonable first thing to try. Many RDBMS solution provides a Master/Slave replication mechanism where one can spread the READ-ONLY workload across many servers, and it helps.

In a partitioned environment, application needs to be more tolerant to data integrity issues, especially when data is asynchronously replicated between servers. The famous CAP theorem from Eric Brewer capture the essence of the tradeoff decisions for highly scalable applications (which must have partition tolerance), they have to choose between "Consistency" and "Availability".

Fortunately, most of these web-scale application have a high tolerance in data integrity, so they choose "availability" over "consistency". This is a very major deviation from the transaction processing model of RDBMS which typically weight "consistency" much higher than "availability".

On the other hand, the data structure used in these web-scale application (e.g. user profile, preference, social graph ... etc) are much richer than the simple row/column/table model. Some of the operations involves navigation within a large social graph which involves too many join operations in a RDBMS model.

The "higher tolerance of data integrity" as well as "efficiency support for rich data structure" challenges some of the very fundamental assumption of RDBMS. Amazon has experimented their large scale key/value store called Dynamo and Google also has their BigTable column-oriented storage model, both are designed from the ground up with a distributed architecture in mind.

Since then, there are many open source clones based on these two models. To represent the movement of this trend, Eric Evans from Rackspace coin a term "NOSQL". This term in my opinion is not accurately reflecting the underlying technologies but nevertheless provide a marketing term for every non RDBMS technologies to get together, even those (e.g. CouchDB, Neo4j) who is not originally trying to tackle the scalability problem.

Today, NOSQL provides an alternative, non-relational approach to databases.

For analytic applications, these systems also adopt a highly parallel, brute-force processing model based on Map/Reduce.

The Debate

There is relatively little criticism of the data model aspects. Jun Rao from IBM Lab summarized the key difference in philosophy. The traditional RDBMS approach is to first figure out the right model, then provide an implementation and hope it is scalable. The modern NOSQL approach does the opposite: first figure out what a highly scalable architecture looks like, then layer a data model on top of it. Basically, people in both camps agree that we should use a data model optimized for the application's access patterns, weakening the "one-size-fits-all" philosophy of the RDBMS.

Most of the debate centers on the transaction processing model itself. Basically, the RDBMS proponents think the NOSQL camp hasn't spent enough time understanding the theoretical foundation of the transaction processing model. The new "eventual consistency" model is not well defined, and different implementations may differ significantly from each other. This means that reasoning about all these inconsistent behaviors lands on the application developer and makes their life much harder. A DB whose behavior is hard to reason about can be very dangerous if the application makes wrong assumptions about the underlying data integrity guarantees.

While agreeing that application developers now have more to worry about, the NOSQL camp argues that this is actually a benefit, because it hands domain-specific optimization opportunities back to application developers, who are no longer constrained by a one-size-fits-all model. But they admit that making such optimization decisions requires a lot of experience and can be very error-prone and dangerous if not done by experts.

The academics also note that the move to NOSQL may seem fashionable and cool to new technologists who may not have the necessary expertise and skills. The community as a whole should articulate the pros and cons of NOSQL.

Notice that this is not the first time this debate has happened. Stonebraker has written a very good article from the RDBMS side.

Saturday, November 28, 2009

Query Processing for NOSQL DB

The recent rise of NoSQL provides an alternative model for building extremely large-scale storage systems. Nevertheless, compared to the more mature RDBMS, NoSQL has some fundamental limitations that we need to be aware of.
  1. It calls for a more relaxed data consistency model
  2. It provides primitive querying and searching capability
There are techniques we can employ to mitigate each of these issues. Regarding the data consistency concern, I have discussed a number of design patterns in my previous blog for implementing systems with different strengths of consistency guarantees.

Here I would like to take a shot at the second issue.

So what is the problem?

Many of the NoSQL DBs today are based on the DHT (Distributed Hash Table) model, which provides hashtable access semantics. To access or modify any object, the client is required to supply the primary key of the object, and the DB will then look up the object using an equality match on the supplied key.

For example, if we use a DHT to implement a customer DB, we can choose the customer id as the key. We can then get/set/operate on any customer object if we know its id (a minimal stand-in for this API is sketched after the list)
  • cust_data = DHT.get(cust_id)
  • DHT.set(cust_id, modified_cust_data)
  • DHT.execute(cust_id, func(cust) {cust.add_credit(200)})
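To make these access semantics concrete, here is a minimal, single-process stand-in for such a DHT client written in Python. The class and its get/set/execute methods simply mirror the pseudocode above and are purely illustrative; a real DHT client would route each call to the remote node that owns the hash of the key.

import hashlib

class DHT:
    """Toy single-process stand-in for a distributed hash table client."""
    def __init__(self, num_nodes=3):
        # One dict per "node"; a real client would talk to remote servers.
        self.nodes = [dict() for _ in range(num_nodes)]

    def _node_for(self, key):
        digest = hashlib.sha1(key.encode()).hexdigest()
        return self.nodes[int(digest, 16) % len(self.nodes)]

    def get(self, key):
        return self._node_for(key).get(key)

    def set(self, key, value):
        self._node_for(key)[key] = value

    def execute(self, key, func):
        # In a real DHT this would ship func to the node owning the key.
        func(self._node_for(key)[key])

dht = DHT()
dht.set("cust-42", {"name": "Alice", "credit": 0})
dht.execute("cust-42", lambda cust: cust.update(credit=cust["credit"] + 200))
print(dht.get("cust-42")["credit"])   # 200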
In the real world, we may want to search data by attributes other than the primary key. We may also want to search attributes using "greater than / less than" relationships, or combine multiple search criteria with a boolean expression.

Using our customer DB example, we may do ...
  • Lookup customers within a zip code
  • Lookup customers whose income is above 200K
  • Lookup customer using keywords "chief executives"
Although query processing and indexing techniques are commonplace in the RDBMS world, they are seriously lacking in the NoSQL world because of the distributed architecture underlying most NoSQL DBs.

It seems to me that the responsibility of building an indexing and query mechanism lands on the NoSQL user. Therefore, I want to explore some possible techniques to handle this.


Companion SQL-DB

A very straightforward approach to providing querying capability is to augment the NoSQL DB with an RDBMS or a text search DB (for keyword search), e.g. we add the metadata of each object into an RDBMS so we can query the metadata using a standard SQL query.

Of course, this requires the RDBMS to be large enough to store the searchable attributes of each object. Since we only store the attributes required for search, rather than the whole object, in the RDBMS, this turns out to be a very practical and common approach.
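Here is a rough sketch of the idea, with a plain Python dict standing in for the NoSQL store and an in-memory SQLite table holding only the searchable attributes (the schema and attribute names are made up for the example):

import sqlite3

# Hypothetical stand-ins: a dict plays the role of the NoSQL DHT,
# and an in-memory SQLite DB holds only the searchable metadata.
dht = {
    "cust-1": {"name": "Alice", "zip": "94107", "income": 250_000, "resume": "..."},
    "cust-2": {"name": "Bob",   "zip": "10001", "income": 150_000, "resume": "..."},
}

meta = sqlite3.connect(":memory:")
meta.execute("CREATE TABLE customer_meta (id TEXT PRIMARY KEY, zip TEXT, income INTEGER)")
for cust_id, cust in dht.items():
    meta.execute("INSERT INTO customer_meta VALUES (?, ?, ?)",
                 (cust_id, cust["zip"], cust["income"]))

# Query the companion SQL DB for ids, then fetch the full objects from the DHT.
rows = meta.execute("SELECT id FROM customer_meta WHERE income > ?", (200_000,))
rich_customers = [dht[row[0]] for row in rows]
print([c["name"] for c in rich_customers])   # ['Alice']

Only ids flow out of the SQL side; the full objects are still fetched from the NoSQL store.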


Scatter/Gather Local Search

Some of the NOSQL DBs provide an indexing and query processing mechanism within the local DB. In this case, the query processor can broadcast the query to every node in the DHT; a local search is conducted on each node and the results are sent back to the query processor, which aggregates them into a single response.

Notice that the search is happening in parallel across all nodes in the DHT.
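A rough sketch of the scatter/gather flow is shown below; the node layout and the local_search helper are invented for illustration, and a real system would push the predicate down to each node's local index instead of scanning:

from concurrent.futures import ThreadPoolExecutor

# Hypothetical setup: each "node" is just a dict of locally stored customer
# objects; in a real system each node would run its own local index/search.
nodes = [
    {"cust-1": {"name": "Alice", "zip": "94107"}},
    {"cust-2": {"name": "Bob",   "zip": "10001"},
     "cust-3": {"name": "Carol", "zip": "94107"}},
]

def local_search(node, predicate):
    """Runs on each node: scan (or use a local index over) its own partition."""
    return [obj for obj in node.values() if predicate(obj)]

def scatter_gather(predicate):
    # Scatter: broadcast the query to every node in parallel.
    with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
        partial_results = pool.map(lambda n: local_search(n, predicate), nodes)
    # Gather: aggregate the partial results into a single response.
    return [obj for partial in partial_results for obj in partial]

print(scatter_gather(lambda c: c["zip"] == "94107"))   # Alice and Carol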


Distributed B-Tree

The B+Tree is a common indexing structure used in RDBMS, and a distributed version of the B+Tree can also be used in a DHT environment. The basic idea is to hash the searchable attribute to locate the root node of the B+Tree. The "value" of the root node contains the ids of its child nodes, so the client can issue another DHT lookup call to find a child node. Continuing this process, the client eventually navigates down to a leaf node, where the object ids matching the search criteria are found. The client then issues a final DHT lookup to extract the actual objects.
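The sketch below simulates this lookup walk, with a plain dict playing the role of the DHT; the node ids, node layout and index name are invented for the example:

# Hypothetical layout: each B+Tree node is itself stored in the DHT under a
# node id. Interior nodes hold (separator_key, child_node_id) pairs; leaf
# nodes hold (attribute_value, object_id) pairs.
dht = {
    "idx:income:root": {"leaf": False, "children": [(200_000, "idx:income:n1"),
                                                    (float("inf"), "idx:income:n2")]},
    "idx:income:n1":   {"leaf": True,  "entries": [(150_000, "cust-2")]},
    "idx:income:n2":   {"leaf": True,  "entries": [(250_000, "cust-1")]},
    "cust-1": {"name": "Alice"},
    "cust-2": {"name": "Bob"},
}

def btree_lookup(attr_value, root_id="idx:income:root"):
    node = dht[root_id]                       # first DHT lookup: the root node
    while not node["leaf"]:
        # Follow the first child whose separator key covers the search value.
        child_id = next(cid for sep, cid in node["children"] if attr_value <= sep)
        node = dht[child_id]                  # one more DHT lookup per level
    obj_ids = [oid for val, oid in node["entries"] if val == attr_value]
    return [dht[oid] for oid in obj_ids]      # final DHT lookups for the objects

print(btree_lookup(250_000))   # [{'name': 'Alice'}]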

Caution is needed when a B+Tree node is updated due to a split or merge caused by object creation and deletion; this should ideally be done in an atomic fashion. This paper from Microsoft, HP and Toronto U describes a distributed transaction protocol to provide the required atomicity. A distributed transaction is an expensive operation, but its use here is justified because most B+Tree updates rarely involve more than a single machine.


Prefix Hash Table (distributed Trie)

A Trie is an alternative data structure in which every path from the root spells out a prefix of the keys stored below it; every node in the Trie contains all the data whose keys start with that prefix. Berkeley and Intel Research have a paper describing this mechanism.

1. Lookup a key
To locate a particular key, we start with its one-digit prefix and do a DHT lookup to see if we get a leaf node. If so, we search within this leaf node, since we know the key must be contained inside. If it is not a leaf node, we extend the prefix by one more digit and repeat the whole process.
# Locate the entry next to the input key
def locate(key)
  leaf = locate_leaf_node(key)
  return leaf.locate(key)
end

# Locate the leaf node containing the input key by probing
# successively longer prefixes of the key
def locate_leaf_node(key)
  for i in 1 .. key.length
    node = DHT.lookup(key.prefix(i))   # look up the node owning this prefix
    return node if node.is_leaf?
  end
  raise "no leaf node found for key"
end

2. Range Query
Performing a range query can be done by first locating the leaf node that contains the start key and then walking in ascending order until we pass the end key. Note that we can walk across leaf nodes by following the leaf node chain.
# Return all keys between startkey and endkey
def range(startkey, endkey)
  result = Array.new
  leaf = locate_leaf_node(startkey)
  while leaf != nil
    result.concat(leaf.range(startkey, endkey))
    # Stop once this leaf already covers the end key
    break if leaf.largestkey >= endkey
    leaf = leaf.nextleaf   # follow the leaf node chain
  end
  return result
end
To speed up the search, we can use a parallel search mechanism. Instead of walking from the start key sequentially, we can find the common prefix of the start key and end key (since all the results must live under its subtree) and perform a parallel search of the child leaf nodes of this subtree.
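Here is a rough sketch of that parallel search, assuming a dict-based stand-in for the prefix hash table (the node layout and example keys are invented): we look up the subtree root for the common prefix once, then fan out one worker per child subtree.

from concurrent.futures import ThreadPoolExecutor

# Hypothetical prefix hash table: each trie node lives in the DHT under its
# prefix; leaves carry the actual keys, interior nodes carry child prefixes.
dht = {
    "3":  {"leaf": False, "children": ["30", "31"]},
    "30": {"leaf": True,  "keys": ["301", "304"]},
    "31": {"leaf": True,  "keys": ["310", "315", "319"]},
}

def common_prefix(a, b):
    n = 0
    while n < min(len(a), len(b)) and a[n] == b[n]:
        n += 1
    return a[:n]

def collect_leaves(prefix):
    """Walk one child subtree; done entirely inside a single worker."""
    node = dht[prefix]
    if node["leaf"]:
        return [node]
    return [leaf for child in node["children"] for leaf in collect_leaves(child)]

def parallel_range(startkey, endkey):
    subtree_root = dht[common_prefix(startkey, endkey)]  # covers the whole range
    if subtree_root["leaf"]:
        leaves = [subtree_root]
    else:
        with ThreadPoolExecutor() as pool:   # fan out: one child subtree per worker
            leaves = [leaf for batch in pool.map(collect_leaves, subtree_root["children"])
                      for leaf in batch]
    return sorted(k for leaf in leaves for k in leaf["keys"] if startkey <= k <= endkey)

print(parallel_range("304", "315"))   # ['304', '310', '315']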

3. Insert and Delete keys
To insert a new key, we first identify the leaf node that should contain it. If the leaf node has available capacity (fewer than B keys), we simply add the key there. Otherwise, we split the leaf node into two branches and redistribute its existing keys to the newly created child nodes.

To delete a key, we similarly identify the leaf node that contains it and remove it there. This may cause the parent node to end up with fewer than B + 1 keys, in which case we may need to merge some child nodes.
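A much simplified sketch of the insert path, using the same dict-based node layout as the previous sketch (the leaf capacity is arbitrary, and recursive splits as well as the delete/merge path are left out):

LEAF_CAPACITY = 4   # plays the role of "B"; purely illustrative

def insert_key(dht, key):
    # Probe successively longer prefixes until we reach the leaf for this key.
    prefix = next(key[:i] for i in range(1, len(key) + 1) if dht[key[:i]]["leaf"])
    leaf = dht[prefix]
    leaf["keys"].append(key)
    if len(leaf["keys"]) <= LEAF_CAPACITY:
        return
    # Overflow: push every key one digit deeper into newly created child
    # leaves, then turn the old leaf into an interior node.
    children = sorted({k[:len(prefix) + 1] for k in leaf["keys"]})
    for child in children:
        dht[child] = {"leaf": True,
                      "keys": [k for k in leaf["keys"] if k.startswith(child)]}
    dht[prefix] = {"leaf": False, "children": children}

dht = {"3": {"leaf": True, "keys": ["301", "304", "310", "315"]}}
insert_key(dht, "319")   # overflows the root leaf and triggers a split
print(sorted(dht))       # ['3', '30', '31']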


Combining Multiple Search Criteria

When a search has multiple criteria, each criterion may use a different index that resides on a different set of machines in the DHT. Multiple criteria can be combined using boolean operators such as OR and AND. Performing an OR is straightforward: we simply union the results of each individual index search, which can be performed separately. Performing an AND is trickier, because each individual criterion may have a large number of matches while their intersection is small. The challenge is: how can we efficiently compute the intersection of two potentially very large sets?

One naive implementation is to send all matched object ids of each criterion to a server that performs the set intersection. If the data sets are large, this approach consumes a lot of bandwidth just to ship all the candidate object ids across the network.

A number of techniques to address this are described in this paper.

1. Bloom Filter
Instead of sending the whole set of matched object ids, we can send a much more compact representation called a Bloom filter, which can be used to test set membership. It produces zero false negatives, but has a controllable probability p of false positives.


To minimize bandwidth, we typically pick the machine with the larger set as the sender of the Bloom filter and perform the intersection on the receiving machine, which holds the smaller set.

Notice that the false positives can actually be completely eliminated by sending the matched results from the set2 machine back to the set1 machine, which double-checks membership against set1. In most cases, though, 100% precision is not needed and a small probability of false positives is acceptable.
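Below is a rough, self-contained sketch of this Bloom-filter-based intersection; the bit-array size, hash count and example sets are arbitrary choices for illustration:

import hashlib

class BloomFilter:
    """Compact bit array for membership tests: no false negatives,
    a controllable chance of false positives."""
    def __init__(self, num_bits=16_384, num_hashes=3):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray(num_bits)

    def _positions(self, item):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = 1

    def might_contain(self, item):
        return all(self.bits[pos] for pos in self._positions(item))

# The machine holding the larger set (set1) ships only its bloom filter ...
set1 = {f"cust-{i}" for i in range(1000)}
bf1 = BloomFilter()
for obj_id in set1:
    bf1.add(obj_id)

# ... the machine holding the smaller set (set2) filters locally.
set2 = {"cust-17", "cust-42", "cust-2000"}
candidates = {obj_id for obj_id in set2 if bf1.might_contain(obj_id)}

# Optional round trip back to the set1 machine removes any false positives.
exact = sorted(obj_id for obj_id in candidates if obj_id in set1)
print(exact)   # ['cust-17', 'cust-42']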

2. Caching
It is possible that certain search criteria are very popular and will be issued over and over again. The Bloom filters for these hot spots can be cached on the receiving machine. Since a Bloom filter has a small footprint, we can cache a lot of Bloom filters for popular search criteria.

3. Incremental fetch
In case the client doesn't need the full set of matched results, we can stream the data back to the client in a cursor mode. Basically, on the sending side, set1 is sorted and broken into smaller chunks, with a Bloom filter computed and attached to each chunk. On the receiving side, every element of set2 is checked against the Bloom filter of each chunk.

Notice that we save computation on the sending side (we compute a Bloom filter per chunk rather than for the whole set1) at the cost of doing more work on the receiving side (we re-check the whole set2 against each chunk of set1). The assumption is that the client only needs a small subset of all the matched data.

An optimization we can make is to mark the key range of each chunk of set1, so that the set2 machine only needs to check the objects that fall within that chunk's range and can skip the rest.
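To make the chunked flow concrete, the sketch below reuses the hypothetical BloomFilter class from the earlier sketch; the chunk size and example sets are arbitrary, and the range check implements the skipping optimization just described:

def make_chunks(set1, chunk_size=1000):
    """Sender side: sort set1, break it into chunks, and attach a bloom
    filter plus the chunk's key range to each chunk."""
    ordered = sorted(set1)
    for i in range(0, len(ordered), chunk_size):
        chunk = ordered[i:i + chunk_size]
        bf = BloomFilter()                  # class from the earlier sketch
        for obj_id in chunk:
            bf.add(obj_id)
        yield {"min": chunk[0], "max": chunk[-1], "filter": bf}

def candidates_for_chunk(chunk, set2):
    """Receiver side: check set2 against one chunk at a time; objects outside
    the chunk's key range are skipped without touching the bloom filter."""
    return [obj_id for obj_id in sorted(set2)
            if chunk["min"] <= obj_id <= chunk["max"]
            and chunk["filter"].might_contain(obj_id)]

set1 = {f"cust-{i:05d}" for i in range(5000)}
set2 = {"cust-00017", "cust-00042", "cust-09999"}
# Cursor style: the client can stop iterating once it has enough results.
for chunk in make_chunks(set1):
    print(candidates_for_chunk(chunk, set2))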