Showing posts with label parallel processing. Show all posts
Showing posts with label parallel processing. Show all posts

Friday, December 27, 2013

Spark: Low latency, massively parallel processing framework

While Hadoop fits well in most batch processing workload, and is the primary choice of big data processing today, it is not optimized for other types of workload  due to its following limitation
  • Lack of iteration support
  • High latency due to persisting intermediate data onto disk
 For a more detail elaboration of the Hadoop limitation, refer to my previous post.

Nevertheless, the Map/Reduce processing paradigm is a proven mechanism for dealing with large scale data.  On the other hand, many of Hadoop's infrastructure piece such as HDFS, HBase has been mature over time.

In this blog post, we'll look at a different architecture called Spark, which has taken the strength of Hadoop and make improvement in a number of Hadoop's weakness, and provides a more efficient batch processing framework with a much lower latency (from the benchmark result, Spark (using RAM cache) claims to be 100x faster than Hadoop, and 10x faster than Hadoop when running on disk.  Although competing with Hadoop MapRed, Spark integrates well with other parts of Hadoop Ecosystem (such as HDFS, HBase ... etc.).  Spark has generated a lot of excitement in the big data community and represents a very promising parallel execution stack for big data analytics.

Berkeley Spark

Within the Spark cluster, there is a driver program where the application logic execution is started, with multiple workers which processing data in parallel.  Although this is not mandated, data is typically collocated with the worker and partitioned across the same set of machines within the cluster.  During the execution, the driver program will passed code/closure into the worker machine where processing of corresponding partition of data will be conducted.  The data will undergoing different steps of transformation while staying in the same partition as much as possible (to avoid data shuffling across machines).  At the end of the execution, actions will be executed at the worker and result will be returned to the driver program.


Underlying the cluster, there is an important Distributed Data Structure called RDD (Resilient Distributed Dataset), which is a logically centralized entity but physically partitioned across multiple machines inside a cluster based on some notion of key.  Controlling how different RDD are co-partitioned (with the same keys) across machines can reduce inter-machine data shuffling within a cluster.  Spark provides a "partition-by" operator which create a new RDD by redistributing the data in the original RDD across machines within the cluster.



RDD can optionally be cached in RAM and hence providing fast access.  Currently the granularity of caching is done at the RDD level, either the whole or none of the RDD is cached.  Cached is a hint but not a guarantee.  Spark will try to cache the RDD if sufficient memory is available in the cluster, based on LRU (Least Recent Use) eviction algorithm.

RDD provides an abstract data structure from which application logic can be expressed as a sequence of transformation processing, without worrying about the underlying distributed nature of the data.

Typically an application logic are expressed in terms of a sequence of TRANSFORMATION and ACTION.  "Transformation" specifies the processing dependency DAG among RDDs and "Action" specifies what the output will be (ie: the sink node of the DAG with no outgoing edge).  The scheduler will perform a topology sort to determine the execution sequence of the DAG, tracing all the way back to the source nodes, or node that represents a cached RDD.


Notice that dependencies in Spark come in two forms.  "Narrow dependency" means the all partitions of an RDD will be consumed by a single child RDD (but a child RDD is allowed to have multiple parent RDDs).  "Wide dependencies" (e.g. group-by-keys, reduce-by-keys, sort-by-keys) means a parent RDD will be splitted with elements goes to different children RDDs based on their keys.  Notice that RDD with narrow dependencies preserve the key partitioning between parent and child RDD.  Therefore RDD can be co-partitioned with the same keys (parent key range to be a subset of child key range) such that the processing (generating child RDD from parent RDD) can be done within a machine with no data shuffling across network.  On the other hand, RDD will wide dependencies involves data shuffling.


Narrow transformation (involves no data shuffling) includes the following operators
  • Map
  • FlatMap
  • Filter
  • Sample
Wide transformation (involves data shuffling) includes the following operators
  •  SortByKey
  • ReduceByKey
  • GroupByKey
  • CogroupByKey
  • Join
  • Cartesian
Action output the RDD to the external world and includes the following operators
  • Collect
  • Take(n)
  • Reduce
  • ForEach
  • Sample
  • Count
  • Save
The scheduler will examine the type of dependencies and group the narrow dependency RDD into a unit of processing called a stage.  Wide dependencies will span across consecutive stages within the execution and require the number of partition of the child RDD to be explicitly specified.


A typical execution sequence is as follows ...
  1. RDD is created originally from external data sources (e.g. HDFS, Local file ... etc)
  2. RDD undergoes a sequence of TRANSFORMATION (e.g. map, flatMap, filter, groupBy, join), each provide a different RDD that feed into the next transformation.
  3. Finally the last step is an ACTION (e.g. count, collect, save, take), which convert the last RDD into an output to external data sources
The above sequence of processing is called a lineage (outcome of the topological sort of the DAG).  Each RDD produced within the lineage is immutable.  In fact, unless if it is cached, it is used only once to feed the next transformation to produce the next RDD and finally produce some action output.

In a classical distributed system, fault resilience is achieved by replicating data across different machines together with a active monitoring system.  In case of any machine crashes, there is always another copy of data residing in a different machine from where recovery can take place.

Fault resiliency in Spark takes a different approach.  First of all, as a large scale compute cluster, Spark is not meant to be a large scale data cluster at all.  Spark makes two assumptions of its workload.
  • The processing time is finite (although the longer it takes, the cost of recovery after fault will be higher)
  • Data persistence is the responsibility of external data sources, which keeps the data stable within the duration of processing.
Spark has made a tradeoff decision that in case of any data lost during the execution, it will re-execute the previous steps to recover the lost data.  However, this doesn't mean everything done so far is discarded and we need to start from scratch at the beginning.  We just need to re-executed the corresponding partition in the parent RDD which is responsible for generating the lost partitions, in case of narrow dependencies, this resolved to the same machine.

Notice that the re-execution of lost partition is exactly the same as the lazy evaluation of the DAG, which starts from the leaf node of the DAG, tracing back the dependencies on what parent RDD is needed and then eventually track all the way to the source node.  Recomputing the lost partition is done is a similar way, but taking partition as an extra piece of information to determine which parent RDD partition is needed.

However, re-execution across wide dependencies can touch a lot of parent RDD across multiple machines and may cause re-execution of everything. To mitigate this, Spark persist the intermediate data output from a Map phase before it shuffle them to different machines executing the reduce phase.  In case of machine crash, the re-execution (from another surviving machine) just need to trace back to fetch the intermediate data from the corresponding partition of the mapper's persisted output.  Spark also provide a checkpoint API to explicitly persist intermediate RDD so re-execution (when crash) doesn't need to trace all the way back to the beginning.  In future, Spark will perform check-pointing automatically by figuring out a good balance between the latency of recovery and the overhead of check-pointing based on statistical result.

Spark provides a powerful processing framework for building low latency, massively parallel processing for big data analytics.  It supports API around the RDD abstraction with a set of operation for transformation and action for a number of popular programming language like Scala, Java and Python.

In future posts, I'll cover other technologies in the Spark stack including real-time analytics using streaming as well as machine learning frameworks.

Sunday, August 29, 2010

Designing algorithms for Map Reduce

Since the emerging of Hadoop implementation, I have been trying to morph existing algorithms from various areas into the map/reduce model. The result is pretty encouraging and I've found Map/Reduce is applicable in a wide spectrum of application scenarios.

So I want to write down my findings but then found the scope is too broad and also I haven't spent enough time to explore different problem domains. Finally, I realize that there is no way for me to completely cover what Map/Reduce can do in all areas, so I just dump out what I know at this moment over the long weekend when I have an extra day.

Notice that Map/Reduce is good for "data parallelism", which is different from "task parallelism". Here is a description about their difference and a general parallel processing design methodology.

I'll cover the abstract Map/Reduce processing model below. For a detail description of the implementation of Hadoop framework, please refer to my earlier blog here.


Abstract Processing Model
There are no formal definition of the Map/reduce model. Basic on the Hadoop implementation, we can think of it as a "distributed merge-sort engine". The general processing flow is as follows.
  • Input data is "split" into multiple mapper process which executes in parallel
  • The result of the mapper is partitioned by key and locally sorted
  • Result of mapper of the same key will land on the same reducer and consolidated there
  • Merge sorted happens at the reducer so all keys arriving the same reducer is sorted

Within the processing flow, user defined functions can be plugged-in to the framework.
  • map(key1, value1) -> emit(key2, value2)
  • reduce(key2, value2_list) -> emit(key2, aggregated_value2)
  • combine(key2, value2_list) -> emit(key2, combined_value2)
  • partition(key2) return reducerNo
Design the algorithm for map/reduce is about how to morph your problem into a distributed sorting problem and fit your algorithm into the user defined functions of above.

To analyze the complexity of the algorithm, we need to understand the processing cost, especially the cost of network communication in such a highly distributed system.

Lets first consider the communication between Input data split and Mapper. To minimize this overhead, we need to run the mapper logic at the data split (without moving the data). How well we do this depends on how the input data is stored and whether we can run the mapper code there. For HDFS and Cassandra, we can the mapper at the storage node and the scheduler algorithm of JobTracker will assign the mapper to the data split that it collocates with and hence significantly reduce the data movement. Other data store such as Amazon S3 doesn't allow execution of mapper logic at the storage node and therefore incur more data traffic.

The communication between Mapper and Reducer cannot be collocated because it depends on the emit key. The only mechanism available is the combine() function which can perform a local consolidation and hence can reduce the data sent to the reducer.

Finally the communication between the reducer and the output data store depends on the store's implementation. For HDFS, the data is triply replicated and hence the cost of writing can be high. Cassandra (a NOSQL data store) allows configurable latency with various degree of data consistency trade-off. Fortunately, in most case the volume of result data after a Map/Reduce processing is not high.

Now, we see how to fit various different kinds of algorithms into the Map/Reduce model ...


Map-Only
"Embarrassing parallel" problems are those that the same processing is applied in each data element in a pretty independent way, in other words, there is no need to consolidate or aggregate individual results.

These kinds of problem can be expressed as a Map-only job (by specifying the number of reducers to zero). In this case, Mapper's emitted result will directly go to the output format.

Some examples of map-only examples are ...
  • Distributed grep
  • Document format conversion
  • ETL
  • Input data sampling

Sorting
As we described above, Hadoop is fundamentally a distributed sorting engine, so using it for sorting is a natural fit.

For example, we can use an Identity function for both map() and reduce(), then the output is equivalent to sorting the input data. Notice that we are using a single reducer here. So the merge is still sequential although the sorting is done at the mapper in parallel.

We can perform the merge in parallel by using multiple reducers. In this case, output of each reducer are sorted. We may need to do a final merge on all the reducer's output. Another way is to use a customized partition() function such that the keys are partitioned by range. In this case, each reducer is sorting a particular range and the final result is just to concatenate the each reducer's sorted result.
partition(key) {
  range = (KEY_MAX - KEY_MIN) / NUM_OF_REDUCERS
  reducer_no = (key - KEY_MIN) / range
  return reducer_no
}


Inverted Indexes
The map reduce model is originated from Google which has a lot of scenarios of building large scale inverted index. Building an inverted index is about parsing different documents to build a word -> document index for keyword search.

In fact, inverted index is pretty general and can be applied in many scenarios. To build an inverted index, we can feed the mapper each document (or lines within a document). The Mapper will parse the words in the document to emit [word, doc] pairs along with other metadata such as where in the document this word occurs ... etc. The reducer can simply be an identity function that just dump out the list, or it can perform some statistic aggregation per word.

In a more general form of Inverted index, there is a "container" and "element" concept. The Map and Reduce function will be organized in the following patterns.
map(key, container) {
  for each element in container {
      element_meta =
           extract_metadata(element, container)
      emit(element, [container_id, element_meta])
  }
}

reduce(element, container_ids) {
  element_stat =
       compute_stat(container_ids)
  emit(element, [element_stat, container_ids])
}

In Text index, we are not just counting the actual frequency of the terms but also adjust its weighting based on its frequency distribution so common words will have less significance when they appears in the document. The final value after normalization is called TF-IDF (term frequency times inverse document frequency) and can be computed using Map Reduce as well.


Simple Statistics ComputationComputing max, min, count is very straightforward since this operation is commutative and associative. Each mapper will perform the local computation and send the result to a single reducer to do the final computation.

Combine function is typically used to reduce the network traffic. Notice that the input to the combine function must look the same as the input to the reducer function and the output of the combine function must look the same as the output of the map function. There is also no guarantee that the combiner function will be invoked at all.

class Mapper {
  buffer

  map(key, number) {
      buffer.append(number)
      if (buffer.is_full) {
          max = compute_max(buffer)
          emit(1, max)
      }
  }
}


class Reducer {
  reduce(key, list_of_local_max) {
      global_max = 0
      for local_max in list_of_local_max {
          if local_max > global_max {
              global_max = local_max
          }
      }        emit(1, global_max)
  }
}


class Combiner {
  combine(key, list_of_local_max) {
      local_max = maximum(list_of_local_max)
      emit(1, local_max)
  }
}
Computing avg is done in a similar way except that instead of computing the local avg, we compute the local sum and local count. The reducer will do the final sum divided by the final count to come up with the final avg.

Computing a histogram is pretty common in statistics and can give a quick idea about the data distribution. A typical approach is to divide the number into different intervals. The mapper will compute the count per interval, and emit that per interval and the reducer will compute the sum of that interval.
class Mapper {
  interval_start = [0, 20, 40, 60, 80]

  map(key, number) {
      i = 0;
      while (i < NO_OF_INTERVALS) {
          if (number < interval_start[i]) {
              emit(i, 1)
              break
          }
      }
  }
}


class Reducer {
  reduce(interval, counts) {
      total_counts = 0
      for each count in counts {
          total_counts += count
      }
      emit(interval, total_counts)
  }
}


class Combiner {
  combine(interval, occurrence) {
      emit(interval, occurrence.size)
  }
}
Notice that a non-uniform distribution of values across intervals may cause an unbalanced workload among reducers and hence undermine the degree of parallelism. We'll address this in the later part of this post.


In-Mapper Combine
Jimmy Lin, in his excellent book, talks about a technique call "in-mapper combine" which regains control at the application level when the combine takes place. The general idea is to maintain a HashMap to buffer the intermediate result and has a separate logic to determine when to actually emit the data from the buffer. The general code structure is as follows ...
class Mapper {
  buffer

  init() {
      buffer = HashMap.new
  }

  map(key, data) {
      elements = process(data)
      for each element {
          ....
          check_and_put(buffer, k2, v2)
      }
  }

  check_and_put(buffer, k2, v2) {
      if buffer.full {
          for each k2 in buffer.keys {
              emit(k2, buffer[k2])
          }
      }
  }

  close() {        for each k2 in buffer.keys {
          emit(k2, buffer[k2])
      }    }
}

SQL Model
The SQL model can be used to extract data from the data source. It contains a number of primitives.

Projection / Filter
This logic is typically implemented in the Mapper
  • result = SELECT c1, c2, c3, c4 FROM source WHERE conditions
Aggregation / Group by / Having
This logic is typically implemented in the Reducer
  • SELECT sum(c3) as s1, avg(c4) as s2 ... FROM result GROUP BY c1, c2 HAVING conditions
The above example can be realized by the following map/reduce job
class Mapper {
  map(k, rec) {
      select_fields =
          [rec.c1, rec.c2, rec.c3, rec.c4]
      group_fields =
          [rec.c1, rec.c2]
      if (filter_condition == true) {
          emit(group_fields, select_fields)
      }
  }
}

class Reducer {
  reduce(group_fields, list_of_rec) {
      s1 = 0
      s2 = 0
      for each rec in list_of_rec {
          s1 += rec.c3
          s2 += rec.c4
      }
      s2 = s2 / rec.size
      if (having_condition == true) {
          emit(group_fields, [s1, s2])
      }
  }
}

Data Joins
Joining 2 data set is a very common operation in Relational Data Model and has been very mature in RDBMS implementation. The common join mechanism in a centralized DB architecture is as follows
  1. Nested loop join -- This is the most basic and naive mechanism and is organized as two loops. The outer loop reads from data set1, the inner loop scan through the whole data set2 and compare with the records just read from data set1.
  2. Indexed join -- An index (e.g. B-Tree index) is built for one of the data sets (say data set2 which is the smaller one). The join will scan through data set1 and lookup the index to find the matched records of data set2.
  3. Merge join -- Pre-sort both data sets so they are arranged physically in increasing order. The join is realized by just merging the two data sets. a) Locate the first record in both data set1 & set2, which is their corresponding minimum key b) In the one with a smaller minimum key (say data set1), keep scanning until finding the next key which is bigger than the minimum key of the other data set (ie. data set2), call this the next minimum key of data set1. c) Switch position and repeat the whole thing until one of the data set is exhausted.
  4. Hash / Partition join -- Partition the data set1 and data set2 into smaller size and apply other join algorithm in a smaller data set size. A linear scan with a hash() function is typically performed to partition the data sets such that data in set1 and data in set2 with the same key will land on the same partition.
  5. Semi join -- This is mainly used to join two sets of data that is stored at different locations and the goal is to reduce the amount of data transfer such that only the full records appears in the final joint result will be send through. a) Data set2 will send its key set to machine holding Data set1. b) Machine holding Data set1 will do a join and send back the records in Data set1 that matches one of the send-over keys. c) The machine holding data set2 will do a final join to the data send back.
In the map reduce environment, it has the corresponding joins.

General reducer-side join
This is the most basic one, records from data set1 and set2 with the same key will land on the same reducer, which will then do a cartesian product. The downside of this model is that the reducer need to have enough memory to hold all records of each key.
map(k1, rec) {
  emit(rec.key, [rec.type, rec])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  list_of_typeB = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else {
          list_of_typeB.append(rec)
      }
  }

  # Compute the catesian product
  products = []
  for recA in list_of_typeA {
      for recB in list_of_typeB {
          emit(k2, [recA, recB])
      }
  }
}

Optimized reducer-side join
You can "secondary sort" the data type for each key by defining a customized partition function. In this model, you arrange the data type (which has less records per key to arrive first) and you only need to store these types.
map(k1, rec) {
  emit([rec.key, rec.type], rec])
}

partition(key_pair) {
  super.partition(key_pair[0])
}

reduce(k2, list_of_rec) {
  list_of_typeA = []
  for each rec in list_of_rec {
      if (rec.type == 'A') {
          list_of_typeA.append(rec)
      } else { # receive records of typeA
          for recA in list_of_typeA {
              emit(k2, [recA, rec])
          }
      }
  }
}

While being very flexible, the downside of Reducer side join is that all data need to be transfer from the mapper to the reducer and then result write to HDFS. Map-side join explore some special arrangement of the input file such that the join is being perform at the mapper. The advantage of doing in the mapper is that we can exploit the collocation of the Map reduce framework such that the mapper will be allocated an input split in its local machine, hence reduce the data transfer from the disk to the mapper. After the map-side join, the result is written directly to the output HDFS files and hence eliminate the data transfer between the mapper and the reducer.

Map-side partition join
In this model, it requires the 2 data sets to be partitioned into 2 sets of partition files (same number of partitions for each set). The size of the partition is such that it can fit into the memory of the Mapper machine. We also need to configure the Map/Reduce job such that there is no split in the partition file, in other words, the whole partition is assigned to a mapper task.

The mapper will detect the partition of the input file and then read the corresponding partition file of the other data set into an in-memory hashtable. After that, the mapper will lookup the Hashtable to do the join.
class Mapper {
  map = Hashtable.new

  init() {
      partition = detect_input_filename()
      map = load("hdfs://dataset2/" + partition)
  }

  map(k1, rec1) {
      rec2 = map[rec1.key]
      if (rec2 != nil) {
          emit(rec1.key, [rec1, rec2])
      }
  }
}

Map-side partition merge join
In additional, if the partition file is also sorted, then the mapper can use a merge join, which has an even smaller memory footprint.
class Mapper {
  rec2_key = nil
  next_rec2 = nil
  list_of_rec2 = []
  file = nil

  init() {
      partition = detect_input_filename()
      file = open("hdfs://dataset2/" + partition, "r")
      next_rec2 = file.read()
      fill_rec2_list()
  }

  # Fill up the list of rec2 list which has the same key
  fill_rec2_list() {
      rec2_key = next_rec2.key
      list_of_rec2.append(next_rec2)
      next_rec2 = file.read
      while(next_rec2.key == key) {
          list_of_rec2.append(next_rec2)
      }
  }

  map(k1, rec1) {
      while (rec1.key > rec2_key) {
          fill_rec2_list()
      }
        while (rec1.key == rec2.key) {
          for rec2 in list_of_rec2 {
              emit(rec1.key, [rec1, rec2])
          }
      }
    }
}

Memcache join
The model is very straightforward, the second data set is loaded into a distributed hash table (like memcache) which has effectively unlimited size. The mapper will receive input split from the first data set and then lookup the memcache for the corresponding record of the other data set.

There are also some other more sophisticated join mechanism such as semi-join described in this paper.

Graph Algorithms
Many problems can be modeled as a graph of Node and Edges. In the Search engine environment, computing the rank of a document using Page Rank or Hits can be model as a sequence of iterations of Map/Reduce jobs.

In the past, I have been blog a number of very basic graph algorithms in map reduce including doing topological sort, finding shortest path, minimum spanning tree etc. and also how to recommend people connection using Map/Reduce.

Due to the fact that graph traversal is inherently sequential, I am not sure Map/Reduce is the best parallel processing model for graph processing. Another problem is that due to the "stateless nature" of map() and reduce() functions, the whole graph need to be transferred between mapper and reducer which incur significant communication costs. Jimmy Lin has described a clever technique called Shimmy which exploit using a special partitioning function which let the reducer to retain the ownership of nodes across map/reduce jobs. I have described this technique as well as a general model of Map/Reduce graph processing in a previous blog.

I think a parallel programming model specific for Graph processing will perform much better. Google's Pregel model is a good example of that.


Machine Learning
Many of the machine learning algorithm involve multiple iterations of parallel processing that fits very well into Map/Reduce model.

For example, we can use map reduce to calculate the statistics for probabilistic methods such as naive Bayes.

A simple example of computing K-Means cluster can also be done in the following way.
  • Input: A set of points, with k initial centrods
  • Output: K final centroids
Iterate until no more change of membership
  1. For each point, assign it to be the member of closest centroid
  2. Re-compute the centroid from the assigned point members


For a complete list of Machine learning algorithms and how they can be implemented using the Map/Reduce model, here is a very good paper.


Matrix arithmetic
A lot of real-life relationships can be represented as a Matrix. One example is the vector space model of Information Retrieval where the column represents docs and the row represents terms. Another example is the social network graph where the column as well as the row representing people and a binary value of each cell to represent a "friend" relationship. In this case, M + M.M represents all the people that I can reach within 2 degree.

Processing for dense matrix is very easy to parallelized. But since the sequential version is O(N^3), it is not that interesting for Matrix with large size (millions range in rows and columns).

A lot of real-world graph problem can be represented as sparse matrix. So my interests is to focus more in the processing of sparse matrix. I don't have much to share at this moment but I hope this is something I will blog about in future.

Thursday, November 5, 2009

What Hadoop is good at

Hadoop is getting more popular these days. Lets look at what it is good at and what not.

The Map/Reduce Programming model
Map/Reduce offers a different programming model for handling concurrency than the traditional multi-thread model.

Multi-thread programming model allows multiple processing units (with different execution logic) to access the shared set of data. To maintain data integrity, each processing units co-ordinate their access to the shared data by using Locks, Semaphores. Problem such as "race condition", "deadlocks" can easily happen but hard to debug. This makes multi-thread programming difficult to write and hard to maintain. (Java provides a concurrent library package to ease the development of multi-thread programming)

Data-driven programming model feeds data into different processing units (with same or different execution logic). Execution is triggered by arrival of data. Since processing units can only access data piped to them, data sharing between processing units is prohibited upfront. Because of this, there is no need to co-ordinate access to data.

This doesn't mean there is no co-ordination for data access. We should think of the co-ordination is done explicitly by the graph. ie: by defining how the nodes (processing units) are connected to each other via data pipes.

Map-Reduce programming model is a specialized form of data-driven programming model where the graph is defined as a "sequential" list of MapReduce jobs. Within each Map/Reduce job, execution is broken down into a "map" phase and a "reduce" phase. In the map phase, each data split is processed and one or multiple output is produced with a key attached. This key is used to route the outputs (of the Map phase) to the second "reduce" phase, where data with the same key is collected and processed in an aggregated way.

Note that in a Map/Reduce model, parallelism happens only within a Job and execution between jobs are done in a sequential manner. As different jobs may access the same set of data, knowing that jobs is executed serially eliminate the needs of coordinating data access between jobs.

Design application to run in Hadoop is a matter of breaking down the algorithm in a number of sequential jobs and then exploit data parallelism within each job. Not all algorithms can fit in to the Map Reduce model. For a more general approach to break down an algorithm into parallel, please visit here.

Characteristics of Hadoop Processing
A detail explanation of Hadoop implementation can be found here. Basically Hadoop has the following characteristics ...
  • Hadoop is "data-parallel", but "process-sequential". Within a job, parallelism happens within a map phase as well as a reduce phase. But these two phases cannot run in parallel, the reduce phase cannot be started until the map phase is fully completed.
  • All data being accessed by the map process need to be freezed (update cannot happen) until the whole job is completed. This means Hadoop processes data in chunks using a batch-oriented fashion, making it not very suitable for stream-based processing where data flows in continuously and immediate processing is needed.
  • Data communication happens via a distributed file system (HDFS). Latency is introduced as extensive network I/O is involved in moving data around (ie: Need to write 3 copies of data synchronously). This latency is not an issue for batch-oriented processing where throughput is the primary factor. But this means Hadoop is not suitable for online access where low latency is critical.
Given the above characteristics, Hadoop is NOT good at the following ...
  • Perform online data access where low latency is critical (Hadoop can be used together with HBase or NOSQL store to deliver low latency query response)
  • Perform random ad/hoc processing of a small subset of data within a large data set (Hadoop is designed to scan all data in parallel)
  • Process small data volume (for data volume less than hundred GB range, many more mature solutions exist)
  • Perform real-time, stream-based processing where data is arrived continuously and immediate processing is needed (to keep the overhead small enough, typically data need to be batched for at least 30 minutes, which you won't be able to see the current data until 30 minutes has passed)

Tuesday, November 25, 2008

Hadoop Map/Reduce Implementation

In my previous post, I talk about the methodology of transforming a sequential algorithm into parallel. After that, we can implement the parallel algorithm, one of the popular framework we can use is the Apache Opensource Hadoop Map/Reduce framework.

Functional Programming

Multithreading is one of the popular way of doing parallel programming, but major complexity of multi-thread programming is to co-ordinate the access of each thread to the shared data. We need things like semaphores, locks, and also use them with great care, otherwise dead locks will result.

If we can eliminate the shared state completely, then the complexity of co-ordination will disappear. This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values which can only be changed by the active function at that moment. Imagine functions are connected to each other via a directed acyclic graph. Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyze the parallelism is much easier when there is no hidden dependency from shared state.

Map/Reduce functions

Map/reduce is a special form of such a DAG which is applicable in a wide range of use cases. It is organized as a “map” function which transform a piece of data into some number of key/value pairs. Each of these elements will then be sorted by their key and reach to the same node, where a “reduce” function is use to merge the values (of the same key) into a single result.


map(input_record) {
…
emit(k1, v1)
…
emit(k2, v2)
…
}

reduce (key, values) {
aggregate = initialize()
while (values.has_next) {
    aggregate = merge(values.next)
}
collect(key, aggregate)
}

The Map/Reduce DAG is organized in this way.



A parallel algorithm is usually structure as multiple rounds of Map/Reduce




Distributed File Systems

The distributed file system is designed to handle large files (multi-GB) with sequential read/write operation. Each file is broken into chunks, and stored across multiple data nodes as local OS files.



There is a master “NameNode” to keep track of overall file directory structure and the placement of chunks. This NameNode is the central control point and may re-distributed replicas as needed.

To read a file, the client API will calculate the chunk index based on the offset of the file pointer and make a request to the NameNode. The NameNode will reply which DataNodes has a copy of that chunk. From this points, the client contacts the DataNode directly without going through the NameNode.

To write a file, client API will first contact the NameNode who will designate one of the replica as the primary (by granting it a lease). The response of the NameNode contains who is the primary and who are the secondary replicas. Then the client push its changes to all DataNodes in any order, but this change is stored in a buffer of each DataNode. After changes are buffered at all DataNodes, the client send a “commit” request to the primary, which determines an order to update and then push this order to all other secondaries. After all secondaries complete the commit, the primary will response to the client about the success.

All changes of chunk distribution and metadata changes will be written to an operation log file at the NameNode. This log file maintain an order list of operation which is important for the NameNode to recover its view after a crash. The NameNode also maintain its persistent state by regularly check-pointing to a file.

In case of the NameNode crash, all lease granting operation will fail and so any write operation is effectively fail also. Read operation should continuously to work as long as the clinet program has a handle to the DataNode. To recover from NameNode crash, a new NameNode can take over after restoring the state from the last checkpoint file and replay the operation log.

When a DataNode crashes, it will be detected by the NameNode after missing its hearbeat for a while. The NameNode removes the crashed DataNode from the cluster and spread its chunks to other surviving DataNodes. This way, the replication factor of each chunk will be maintained across the cluster.

Later when the DataNode recover and rejoin the cluster, it reports all its chunks to the NameNode at boot time. Each chunk has a version number which will advanced at each update. Therefore, the NameNode can easily figure out if any of the chunks of a DataNode becomes stale. Those stale chunks will be garbage collected at a later time.


Job Execution

Hadoop MapRed is based on a “pull” model where multiple “TaskTrackers” poll the “JobTracker” for tasks (either map task or reduce task).

The job execution starts when the client program uploading three files: “job.xml” (the job config including map, combine, reduce function and input/output data path, etc.), “job.split” (specifies how many splits and range based on dividing files into ~16 – 64 MB size), “job.jar” (the actual Mapper and Reducer implementation classes) to the HDFS location (specified by the “mapred.system.dir” property in the “hadoop-default.conf” file). Then the client program notifies the JobTracker about the Job submission. The JobTracker returns a Job id to the client program and starts allocating map tasks to the idle TaskTrackers when they poll for tasks.




Each TaskTracker has a defined number of "task slots" based on the capacity of the machine. There are heartbeat protocol allows the JobTracker to know how many free slots from each TaskTracker. The JobTracker will determine appropriate jobs for the TaskTrackers based on how busy thay are, their network proximity to the data sources (preferring same node, then same rack, then same network switch). The assigned TaskTrackers will fork a MapTask (separate JVM process) to execute the map phase processing. The MapTask extracts the input data from the splits by using the “RecordReader” and “InputFormat” and it invokes the user provided “map” function which emits a number of key/value pair in the memory buffer.

When the buffer is full, the output collector will spill the memory buffer into disk. For optimizing the network bandwidth, an optional “combine” function can be invoked to partially reduce values of each key. Afterwards, the “partition” function is invoked on each key to calculate its reducer node index. The memory buffer is eventually flushed into 2 files, the first index file contains an offset pointer of each partition. The second data file contains all records sorted by partition and then by key.

When the map task has finished executing all input records, it start the commit process, it first flush the in-memory buffer (even it is not full) to the index + data file pair. Then a merge sort for all index + data file pairs will be performed to create a single index + data file pair.

The index + data file pair will then be splitted into are R local directories, one for each partition. After all the MapTask completes (all splits are done), the TaskTracker will notify the JobTracker which keeps track of the overall progress of job. JobTracker also provide a web interface for viewing the job status.

When the JobTracker notices that some map tasks are completed, it will start allocating reduce tasks to subsequent polling TaskTrackers (there are R TaskTrackers will be allocated for reduce task). These allocated TaskTrackers remotely download the region files (according to the assigned reducer index) from the completed map phase nodes and concatenate (merge sort) them into a single file. Whenever more map tasks are completed afterwards, JobTracker will notify these allocated TaskTrackers to download more region files (merge with previous file). In this manner, downloading region files are interleaved with the map task progress. The reduce phase is not started at this moment yet.

Eventually all the map tasks are completed. The JobTracker then notifies all the allocated TaskTrackers to proceed to the reduce phase. Each allocated TaskTracker will fork a ReduceTask (separate JVM) to read the downloaded file (which is already sorted by key) and invoke the “reduce” function, which collects the key/aggregatedValue into the final output file (one per reducer node). Note that each reduce task (and map task as well) is single-threaded. And this thread will invoke the reduce(key, values) function in assending (or descending) order of the keys assigned to this reduce task. This provides an interesting property that all entries written by the reduce() function is sorted in increasing order. The output of each reducer is written to a temp output file in HDFS. When the reducer finishes processing all keys, the temp output file will be renamed atomically to its final output filename.

The Map/Reduce framework is resilient to crashes of any components. TaskTracker nodes periodically report their status to the JobTracker which keeps track of the overall job progress. If the JobTracker hasn’t heard from any TaskTracker nodes for a long time, it assumes the TaskTracker node has been crashed and will reassign its tasks appropriately to other TaskTracker nodes. Since the map phase result is stored in the local disk, which will not be available when the TaskTracker node crashes. In case a map-phase TaskTracker node crashes, the crashed MapTasks (regardless of whether it is complete or not) will be reassigned to a different TaskTracker node, which will rerun all the assigned splits. However, the reduce phase result is stored in HDFS, which is available even the TaskTracker node crashes. Therefore, in case a reduce-phase TaskTracker node crashes, only the incomplete ReduceTasks need to be reassigned to a different TaskTracker node, where the incompleted reduce tasks will be re-run.

The job submission process is asynchronous. Client program can poll for the job status at any time by supplying the job id.

Tuesday, June 10, 2008

Exploring Erlang with Map/Reduce

Under the category of "Concurrent Oriented Programming", Erlang has got some good attention recently due to some declared success from Facebook engineers of using Erlang in large scale applications. Tempted to figure out the underlying ingredients of Erlang, I decided to spent some time to learn the language.


Multi-threading Problem

Multiple threads of execution is a common programming model in modern languages because it enable a more efficient use of computing resources provided by multi-core and multi-machine architecture. One of question to be answered though, is how these parallel threads of execution interact and work co-operative to solve the application problem.

There are basically two models for communication between concurrent executions. One is based on a "Shared Memory" model which one thread of execution write the information into a shared place where other threads will read from. Java's thread model is based on such a "shared memory" semantics. The typical problem of this model is that concurrent update requires very sophisticated protection scheme, otherwise uncoordinated access can result in inconsistent data.

Unfortunately, this protection scheme is very hard to analyze once there are multiple threads start to interact in combinatorial explosion number of different ways. Hard to debug deadlock problem are frequently pop up. To reduce the complexity, using a coarse grain locking model is usually recommended but this may reduce the concurrency.

Erlang has picked the other model based on "message passing". In this model, any information that needs to be shared will be "copied" into a message and send to other executions. In this model, each thread of execution has its state "completely local" (not viewable by other thread of executions). Their local state is updated when they learn what is going on in other threads by receiving their messages. This model mirrors how people in real life interact with each other.


Erlang Sequential Processing

Coming from an object oriented imperative programming background, there are a couple of things I need to unlearn/learn in Erlang.

Erlang is a functional programming language and have no OO concepts. Erlang code is structured as "function" at a basic unit, grouped under a "module". Each "function" takes a number of inputs parameters and produce an output value. Like many functional programming language, Erlang encourage the use of "pure function" which is "side-effect-free" and "deterministic". "Side-effect-free" means there is no state changes within the execution of the function. "Deterministic" means the same output will always be produced from the same input.

Erlang has a very different concept in variable assignment in that all variables in Erlang is immutable. In other words, every variable can only be assigned once and from then onwards can never be changed. So I cannot do X = X + 1, and I have to use a new variable and assigned it with the changed value, e.g. Y = X + 1. This "immutability" characteristic simplify debugging a lot because I don't need to worry about how the value of X is changed at different point of execution (it simply won't change).

Another uncommon thing about Erlang is that there is no "while loop" construct in the language. To achieve the looping effect, you need to code the function in a recursive way, basically putting a terminal clause to check for the exit condition, as well as carefully structure the logic in a tail recursion fashion. Otherwise, you may run out of memory in case the stack grow too much. Tail recursion function means the function either returns a value (but not an expression) or a recursive function call. Erlang is smart enough to do tail recursion across multiple functions, such as if funcA calls funcB, which calls funcC, which call funcA. Tail recursion is especially important in writing server daemon which typically make a self recursive call after process a request.


Erlang Parallel Processing

The execution thread in Erlang is called a "Process". Don't be confused with OS-level processes, Erlang process is extremely light-weight, much lighter than Java threads. A process is created by a spawn(Node, Module, Function, Arguments) function call and it terminates when that function is return.

Erlang processes communicate with each other by passing messages. Process ids are used by the sender to specify the recipient addresses. The send call happens asynchronously and returns immediately. The receiving process will make a synchronous receive call and specify a number of matching patterns. Arriving messages that match the pattern will be delivered to the receiving process, otherwise it will stay in the queue forever. Therefore, it is good practices to have a match all pattern to clean up garbage message. The receive call also accepts a timeout parameter so that it will return if no matched messages happen within the timeout period.

Error handling in Erlang is also quite different from other programming languages. Although Erlang provides a try/catch model, it is not the preferred approach. Instead of catching the error and handle it within the local process, the process should simply die and let another process to take care of what should be done after its crash. Erlang have the concept of having processes "linked" to each other and monitor the life status among themselves. In a default setting, a dying process will propagate an exit signal to all the processes it links to (links are bi-directional). So there is a chaining effect that when one process die, the whole chain of processes will die. However, a process can redefine its behavior after receiving the exit signal. Instead of "dying", a process can choose to handle the error (perhaps by restarting the dead process).


Other Erlang Features
Pattern matching is a common programming construct in many places of Erlang, namely "Function calls", "Variable assignment", "Case statements" and "Receive messages". It takes some time to get used to this style. After that I feel this construct to be very powerful.

Another cool feature that Erlang provides is the code hot swap. By specifying the module name when making the function call, a running Erlang process can execute the latest code without restarting itself. This is a powerful features for code evolution because you don't need to shutdown the VM when deploying new code.

Since the function itself can be passed as a message to a remote process, execute code remotely is extremely easy in Erlang. The problem of installation, deployment is pretty much non-existent in Erlang

Map/Reduce using Erlang

After learning the basic concepts, my next step is to search for a problem and get some hands on with the language. Based on a work-partition, aggregation, parallel processing model, Map/Reduce seems to have the characteristic model that aligns very nicely into Erlang's parallel processing model. So I pick my project to implement a simple Map/Reduce framework in Erlang.

Here is the Erlang implementation ...




First of all, I need some Helper functions

-module(mapreduce).
-export([reduce_task/2, map_task/2,
        test_reduce_task/0, test_map_reduce/0,
        repeat_exec/2]).

%%% Execute the function N times,
%%%   and put the result into a list
repeat_exec(N,Func) ->
 lists:map(Func, lists:seq(0, N-1)).
 

%%% Identify the reducer process by
%%%   using the hashcode of the key
find_reducer(Processes, Key) ->
 Index = erlang:phash(Key, length(Processes)),
 lists:nth(Index, Processes).

%%% Identify the mapper process by random
find_mapper(Processes) ->
 case random:uniform(length(Processes)) of
   0 ->
     find_mapper(Processes);
   N ->
     lists:nth(N, Processes)
 end.

%%% Collect result synchronously from
%%%   a reducer process
collect(Reduce_proc) ->
 Reduce_proc ! {collect, self()},
 receive
   {result, Result} ->
     Result
 end.


Main function
The MapReduce() function is the entry point of the system.
  1. It first starts all the R number of Reducer processes
  2. It starts all the M number of Mapper processes, passing them the R reducer processes ids
  3. For each line of input data, it randomly pick one of the M mapper processes and send the line to it
  4. Wait until the completion has finished
  5. Collect result from the R reducer processes
  6. Return the collected result
The corresponding Erlang code is as follows ...
%%% The entry point of the map/reduce framework
map_reduce(M, R, Map_func,
          Reduce_func, Acc0, List) ->

 %% Start all the reducer processes
 Reduce_processes =
   repeat_exec(R,
     fun(_) ->
       spawn(mapreduce, reduce_task,
             [Acc0, Reduce_func])
     end),

 io:format("Reduce processes ~w are started~n",
           [Reduce_processes]),

 %% Start all mapper processes
 Map_processes =
   repeat_exec(M,
     fun(_) ->
       spawn(mapreduce, map_task,
             [Reduce_processes, Map_func])
     end),

 io:format("Map processes ~w are started~n",
           [Map_processes]),

 %% Send the data to the mapper processes
 Extract_func =
   fun(N) ->
     Extracted_line = lists:nth(N+1, List),
     Map_proc = find_mapper(Map_processes),
     io:format("Send ~w to map process ~w~n",
               [Extracted_line, Map_proc]),
     Map_proc ! {map, Extracted_line}
   end,

 repeat_exec(length(List), Extract_func),

 timer:sleep(2000),

 %% Collect the result from all reducer processes
 io:format("Collect all data from reduce processes~n"),
 All_results =
   repeat_exec(length(Reduce_processes),
     fun(N) ->
       collect(lists:nth(N+1, Reduce_processes))
     end),
 lists:flatten(All_results).


Map Process

The Map processes, once started, will perform the following ...
  1. Receive the input line
  2. Execute the User provided Map function to turn into a list of key, value pairs
  3. For each key and value, select a reducer process and send the key, value to it
The corresponding Erlang code will be as follows ...

%%% The mapper process
map_task(Reduce_processes, MapFun) ->
 receive
   {map, Data} ->
     IntermediateResults = MapFun(Data),
     io:format("Map function produce: ~w~n",
               [IntermediateResults ]),
     lists:foreach(
       fun({K, V}) ->
         Reducer_proc =
           find_reducer(Reduce_processes, K),
         Reducer_proc ! {reduce, {K, V}}
       end, IntermediateResults),

     map_task(Reduce_processes, MapFun)
 end.


Reduce Process
On the other hand, the reducer processes will execute as follows ...
  1. Receive the key, value from the Mapper process
  2. Get the current accumulated value by the key. If no accumulated value is found, use the initial accumulated value
  3. Invoke the user provided reduce function to calculate the new accumulated value
  4. Store the new accumulated value under the key

The corresponding Erlang code will be as follows ...

%%% The reducer process
reduce_task(Acc0, ReduceFun) ->
 receive
   {reduce, {K, V}} ->
     Acc = case get(K) of
             undefined ->
               Acc0;
             Current_acc ->
               Current_acc
           end,
     put(K, ReduceFun(V, Acc)),
     reduce_task(Acc0, ReduceFun);
   {collect, PPid} ->
     PPid ! {result, get()},
     reduce_task(Acc0, ReduceFun)
 end.

Word Count Example
To test the Map/Reduce framework using a word count example ...

%%% Testing of Map reduce using word count
test_map_reduce() ->
 M_func = fun(Line) ->
            lists:map(
              fun(Word) ->
                {Word, 1}
              end, Line)
          end,

 R_func = fun(V1, Acc) ->
            Acc + V1
          end,

 map_reduce(3, 5, M_func, R_func, 0,
            [[this, is, a, boy],
             [this, is, a, girl],
             [this, is, lovely, boy]]).

This is the result when execute the test program.

Erlang (BEAM) emulator version 5.6.1 [smp:2] [async-threads:0]

Eshell V5.6.1  (abort with ^G)
1> c (mapreduce).
{ok,mapreduce}
2>
2> mapreduce:test_map_reduce().
Reduce processes [<0.37.0>,<0.38.0>,<0.39.0>,<0.40.0>,<0.41.0>] are started
Map processes [<0.42.0>,<0.43.0>,<0.44.0>] are started
Send [this,is,a,boy] to map process <0.42.0>
Send [this,is,a,girl] to map process <0.43.0>
Map function produce: [{this,1},{is,1},{a,1},{boy,1}]
Send [this,is,lovely,boy] to map process <0.44.0>
Map function produce: [{this,1},{is,1},{a,1},{girl,1}]
Map function produce: [{this,1},{is,1},{lovely,1},{boy,1}]
Collect all data from reduce processes
[{is,3},{this,3},{boy,2},{girl,1},{a,2},{lovely,1}]
3>


The complete Erlang code is attached here ...

-module(mapreduce).
-export([reduce_task/2, map_task/2,
        test_reduce_task/0, test_map_reduce/0,
        repeat_exec/2]).

%%% Execute the function N times,
%%%   and put the result into a list
repeat_exec(N,Func) ->
 lists:map(Func, lists:seq(0, N-1)).
 

%%% Identify the reducer process by
%%%   using the hashcode of the key
find_reducer(Processes, Key) ->
 Index = erlang:phash(Key, length(Processes)),
 lists:nth(Index, Processes).

%%% Identify the mapper process by random
find_mapper(Processes) ->
 case random:uniform(length(Processes)) of
   0 ->
     find_mapper(Processes);
   N ->
     lists:nth(N, Processes)
 end.

%%% Collect result synchronously from
%%%   a reducer process
collect(Reduce_proc) ->
 Reduce_proc ! {collect, self()},
 receive
   {result, Result} ->
     Result
 end.


%%% The reducer process
reduce_task(Acc0, ReduceFun) ->
 receive
   {reduce, {K, V}} ->
     Acc = case get(K) of
             undefined ->
               Acc0;
             Current_acc ->
               Current_acc
           end,
     put(K, ReduceFun(V, Acc)),
     reduce_task(Acc0, ReduceFun);
   {collect, PPid} ->
     PPid ! {result, get()},
     reduce_task(Acc0, ReduceFun)
 end.

%%% The mapper process
map_task(Reduce_processes, MapFun) ->
 receive
   {map, Data} ->
     IntermediateResults = MapFun(Data),
     io:format("Map function produce: ~w~n",
               [IntermediateResults ]),
     lists:foreach(
       fun({K, V}) ->
         Reducer_proc =
           find_reducer(Reduce_processes, K),
         Reducer_proc ! {reduce, {K, V}}
       end, IntermediateResults),

     map_task(Reduce_processes, MapFun)
 end.


%%% The entry point of the map/reduce framework
map_reduce(M, R, Map_func,
          Reduce_func, Acc0, List) ->

 %% Start all the reducer processes
 Reduce_processes =
   repeat_exec(R,
     fun(_) ->
       spawn(mapreduce, reduce_task,
             [Acc0, Reduce_func])
     end),

 io:format("Reduce processes ~w are started~n",
           [Reduce_processes]),

 %% Start all mapper processes
 Map_processes =
   repeat_exec(M,
     fun(_) ->
       spawn(mapreduce, map_task,
             [Reduce_processes, Map_func])
     end),

 io:format("Map processes ~w are started~n",
           [Map_processes]),

 %% Send the data to the mapper processes
 Extract_func =
   fun(N) ->
     Extracted_line = lists:nth(N+1, List),
     Map_proc = find_mapper(Map_processes),
     io:format("Send ~w to map process ~w~n",
               [Extracted_line, Map_proc]),
     Map_proc ! {map, Extracted_line}
   end,

 repeat_exec(length(List), Extract_func),

 timer:sleep(2000),

 %% Collect the result from all reducer processes
 io:format("Collect all data from reduce processes~n"),
 All_results =
   repeat_exec(length(Reduce_processes),
     fun(N) ->
       collect(lists:nth(N+1, Reduce_processes))
     end),
 lists:flatten(All_results).

%%% Testing of Map reduce using word count
test_map_reduce() ->
 M_func = fun(Line) ->
            lists:map(
              fun(Word) ->
                {Word, 1}
              end, Line)
          end,

 R_func = fun(V1, Acc) ->
            Acc + V1
          end,

 map_reduce(3, 5, M_func, R_func, 0,
            [[this, is, a, boy],
             [this, is, a, girl],
             [this, is, lovely, boy]]).
  


Summary

From this exercise of implementing a simple Map/Reduce model using Erlang, I found that Erlang is very powerful in developing distributed systems.

Sunday, May 25, 2008

Parallel data processing language for Map/Reduce

In my previous post, I introduce Map/Reduce model as a powerful model for parallelism. However, although Map/Reduce is simple, powerful and provide a good opportunity to parallelize algorithm, it is based on a rigid procedural structure that require injection of custom user code and therefore it not easy to understand the big picture from a high level. You need to drill into the implementation code of the map and reduce function in order to figure out what is going on.

It will be desirable to have a higher level declarative language that describe the parallel data processing model. This is similar to the idea of SQL query where the user specify the "what" and leave the "how" to the underlying processing engine. In this post, we will explore the possibility of such a declarative language. We will start from the Map/Reduce model and see how it can be generalized into a "Parallel data processing model".

Lets revisit Map/Reduce in a more abstract sense.

The Map/Reduce processing model composes of the following steps ...
  • From many distributed data store, InputReader extract out data tuples A = <a1,a2,...> and feed them randomly into the many Map tasks.
  • For each tuple A, the Map task emit zero to many tuples A'
  • The output A' will be sorted by its key, A' with the same key will reach the same Reduce task
  • The Reduce task aggregate over the group of tuples A' (of the same key) and then turn them into a tuple B = reduce(array<A'>)
  • The OutputWriter store the data tuple B into the distributed data store.
Paralleizing more sophisticated algorithm typically involve multiple phases of Map/Reduce phases, each phase may have a different Map task and Reduce task.


Looking at the abstract Map/Reduce model, there are some similarities with the SQL query model. We can express the above Map/Reduce model using a SQL-like query language.

INSERT INTO A FROM InputReader("dfs:/data/myInput")

INSERT INTO A'
 SELECT flatten(map(*)) FROM A

INSERT INTO B
 SELECT reduce(*) FROM A' GROUP BY A'.key

INSERT INTO  "dfs:/data/myOutput"  FROM B

Similarly, SQL queries can also be expressed by different forms of map() and reduce() functions. Lets look at a couple typical SQL query examples.

Simple Query
SELECT a1, a2 FROM A
 WHERE a3 > 5 AND a4 < 6

Here is the corresponding Map and Reduce function
def map(tuple)
 /* tuple is implemented as a map, key by attribute name */
 if  (tuple["a3"] > 5  &&  tuple["a4"] < 6)
   key = random()
   emit key, "a1" => tuple["a1"], "a2" => tuple["a2"]
 end
end

def reduce(tuples)
 tuples.each do |tuple|
   store tuple
 end
end

Query with Grouping
SELECT sum(a1), avg(a2) FROM A
 GROUP BY a3, a4
   HAVING count() < 10
Here is the coresponding Map and Reduce function
def map(tuple)
 key = [tuple["a3"], tuple["a4"]]
 emit key, "a1" => tuple["a1"], "a2" => tuple["a2"]
end

def reduce(tuples)
 sums = {"a1" => 0, "a2" => 0}
 count = 0

 tuples.each do |tuple|
   count += 1
   sums.each_key do |attr|
     sums[attr] += tuple[attr]
   end
 end

 if count < 10
 /* omit denominator check for simplcity */
   store {"type" => B, "b1" => sums["a1"], "b2" => sums["a2"] / count}
 end
end

Query with Join
SELECT a2, p2
 FROM A JOIN P
         ON A.a1 = P.p1
Here is the corresponding Map and Reduce function
def map(tuple)
 if (tuple["type"] == A)
   key = tuple["a1"]
   emit key, "a2" => tuple["a2"]
 elsif (tuple["type"] == P)
   key = tuple["p1"]
   emit key, "p2" => tuple["p2"]
 end
end

def reduce(tuples)
 all_A_tuples = []
 all_P_tuples = []

 tuples.each do |tuple|
   if (tuple["type"] == A)
     all_A_tuples.add(tuple)
     all_P_tuples.each do |p_tuple|
       joined_tuple = p_tuple.merge(tuple)
       joined_tuple["type"] = B
       store joined_tuple
     end
   elsif (tuple["type"] == P)
     /* do similar things */
   end
 end
end

As you can see, transforming a SQL query to Map/Reduce function is pretty straightforward.

We put the following logic inside the map() function
  • Select columns that appears in the SELECT clause
  • Evaluate the WHERE clause and filter out tuples that doesn't match the condition
  • Compute the key for the JOIN clause or the GROUP clause
  • Emit the tuple

On the other hand, we put the following logic inside the reduce() function
  • Compute the aggregate value of the columns appears in the SELECT clause
  • Evaluate the HAVING clause and filter things out
  • Compute the cartesian product of the JOIN clause
  • Store the final tuple
As we've seen the potential opportunity to use a "SQL-like" declarative language to express the parallel data processing and use a Map/Reduce model to execute it, the open source Hadoop community is working on a project call Pig to develop such a language.

PIG is similar to SQL in the following way.
  • PIG's tuple is same as SQL record, containing multiple fields
  • PIG has define its own set
  • Like SQL optimizer which compiles the query into an execution plan, PIG compiler compiles its query into a Map/Reduce task.

However, there are a number of important difference between PIG (in its current form) and the SQL language.
  • While fields within a SQL record must be atomic (contain one single value), fields within a PIG tuple can be multi-valued, e.g. a collection of another PIG tuples, or a map with key be an atomic data and value be anything
  • Unlike relational model where each DB record must have a unique combination of data fields, PIG tuple doesn't require uniqueness.
  • Unlike SQL query where the input data need to be physically loaded into the DB tables, PIG extract the data from its original data sources directly during execution.
  • PIG is lazily executed. It use a backtracking mechansim from its "store" statement to determine which statement needs to be executed.
  • PIG is procedural and SQL is declarative. In fact, PIG looks a lot like a SQL query execution plan.
  • PIG enable easy plug-in of user defined functions
For more details, please refer to PIG's project site.

Monday, April 28, 2008

Parallelism with Map/Reduce

We explore the Map/Reduce approach to turn sequential algorithm into parallel

Map/Reduce Overview

Since the "reduce" operation need to accumulate results for the whole job, as well as communication overhead in sending and collecting data, Map/Reduce model is more suitable for long running, batch-oriented jobs.

In the Map/Reduce model, "parallelism" is achieved via a "split/sort/merge/join" process and is described as follows.
  • A MapReduce Job starts from a predefined set of Input data (usually sitting in some directory of a distributed file system). A master daemon (which is a central co-ordinator) is started and get the job configuration.
  • According to the job config, the master daemon will start multiple Mapper daemons as well as Reducer daemons in different machines. And then it start the input reader to read data from some DFS directory. The input reader will chunk the read data accordingly and send them to "randomly" chosen Mapper. This is the "split" phase and begins the parallelism.
  • After getting the data chunks, the mapper daemon will run a "user-supplied map function" and produce a collection of (key, value) pairs. Each item within this collection will be sorted according to the key and then send to the corresponding Reducer daemon. This is the "sort" phase.
  • All items with the same key will come to the same Reducer daemon, which collect all the items of that key and invoke a "user-supplied reduce function" and produce a single entry (key, aggregatedValue) as a result. This is the "merge" phase.
  • The output of reducer daemon will be collected by the Output writer, which is effective the "join" phase and ends the parallelism.
Here is an simple word-counting example ...






















Tuesday, April 15, 2008

Parallelizing Algorithms

The growth of a single CPU has been limited by physical factors such as clock rate, generated heat, power ... etc. Current trend is moving to multi-core system, ie: multiple CPU within a chip, multiple CPU within a machine, or just a cluster of machines connected to a high speed network.

However, most traditional algorithms are developed in a sequential way (which is easier to design and analyze). Without redesigning the algorithm in a parallelized form, they are not ready to run on multiple CPUs. Recently, Google's Map/Reduce model has gained momentum to become the de facto approach to handle high volume processing using large number of low-cost commodity hardware. In the Opensource community, Hadoop is a Java clone of Google's Map/Reduce model, and there are a couple of Ruby clone as well. Since then, parallelizing traditionally sequential algorithm to run on a multi-CPU network has been drawing a lot of attention in the software community.

Model

A sequential algorithm contains a number of "steps" ordered by the sequence of execution. Parallelizing such an algorithm means trying to run these steps "simultaneously" on multiple CPUs, and hopefully can speed up the whole process of execution.

Lets define T(p) to be the time it takes to execute the algorithm in p CPUs.
So, T(1) is the time takes to execute on a single CPU.
Obviously, T(p) >= T(1) / p.

When T(p) == T(1) / p, we say it has linear speedup. Unfortunately, linear speedup is usually not possible when p increase beyond a certain number, due to "sequential dependency" and "coordination overhead".


Sequential Dependency
StepA and StepB cannot be executed simultaneously if there is a sequential dependency between them. Sequential dependency means one step cannot be started before the other step has completed, which happens if
  • StepB reads some data that StepA writes
  • StepA reads some data that StepB writes
  • StepA and StepB write to same data
Let T(infinity) be the execution time given infinite number of CPUs. Due to sequential dependency, at some point throwing in more CPUs won't help. If we use a DAG to represent dependency, T(infinity) is the time take to execute the longest path within the DAG.

T(p) >= max(T(1)/p, T(infinity))


Coordination Overhead

Even steps can be execute in parallel, there are certain processing overhead such as
  • Data need to be transfered to the corresponding CPU before processing can take place
  • Schedule the CPU for execution and keep track of their corresponding work load
  • Monitor the completion of all parallel tasks and move forward to next steps
We need to make sure the coordination overhead does not offset the gain in parallelizing the execution. That means we cannot break the steps into too fine-grain, we need to control the granularity of the steps at the right level.


Design Goal

Given T(p) >= max(T(1)/p, T(infinity)), there is no benefit to increase p beyond T(1)/T(infinity), which is called parallelism.

  • Let O-1(n) be the time complexity of the parallel algorithm when there is one CPU
  • Let O-infinity(n) be the time complexity of the parallel algorithm when there is infinite CPUs

  • Our goal is to design the parallel algorithm to maximize parallelism: O-1(n) / O-infinity(n).

    If we can do this, we can throw more CPUs to help when n increases.

    Recall master method
    T(n) = a.T(n/b) + f(n)

    case 1:  if  f(n) << n ** log(a, base=b)
            T(n) = O(n ** log(a, base=b))
    
    case 2:  if  f(n) ~ n ** log(a, base=b)
            T(n) = O((lg(n) ** k+1) * (n ** log(a, base=b)))
    
    case 3:  if  f(n) >> n ** log(a, base=b)
            T(n) = O(f(n))

    Lets walk through an example ... of adding two arrays of size n.

    Sequential Algorithm:
    def sum(a, b)
     for i in 0 .. a.size
       c[i] = a[i] + b[i]
     return c
    
    This is of O(n) complexity


    Parallel Algorithm:
    def sum(a, b, start, end)
     if start == end
       c[start] = a[start] + b[start]
       return
    
     mid = start + (end - start) / 2
    
     spawn sum(a, b, start, mid)
     spawn sum(a, b, mid, end)

    For a single CPU, the algorithm will be ...

    T(n) = 2.T(n/2) + O(1)
    This is case 1, and so it is O(n)

    For infinite number of CPU, the algorithm will be ...
    T(n) = T(n/2) + O(1)
    This is case 2, k = 0, so it is O(lg(n))

    So the parallelism = O(n / lg(n))

    In other words, we can improve the performance from the sequential algorithm O(n) to the parallel algorithm O(n/p) by throwing in p CPUs. And the growth of p is limited by n/lg(n)