Each graph node performs a sequence of steps independently (hence achieving parallelism), as sketched below:
- Aggregate all incoming messages received from its direct inward arcs during the last iteration
- With this aggregated message, perform some local computation (i.e., update the local state of the node and its direct outward arcs)
- Pass the result of local computation along all outward arcs to its direct neighbors
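As a rough illustration, the Python sketch below runs one such iteration over an in-memory graph. The graph representation and the local_compute() helper are hypothetical placeholders, not any particular framework's API.

def local_compute(node_id, aggregated):
    # Placeholder local computation; a real algorithm (e.g. PageRank) goes here.
    return aggregated

def run_iteration(graph, inbox):
    # graph: node_id -> list of ids reachable via direct outward arcs
    # inbox: node_id -> list of messages received during the last iteration
    values = {}
    next_inbox = {node_id: [] for node_id in graph}
    for node_id, out_neighbors in graph.items():
        aggregated = sum(inbox.get(node_id, []))     # 1. aggregate incoming messages
        result = local_compute(node_id, aggregated)  # 2. local computation
        values[node_id] = result
        for neighbor in out_neighbors:               # 3. pass result along outward arcs
            next_inbox[neighbor].append(result)
    return values, next_inbox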
Issues with using Map/Reduce
However, due to the functional programming nature of Map() and Reduce(), M/R does not automatically retain "state" between jobs. To retain the graph across iterations, the mapper needs to explicitly pass its portion of the graph along to the reducer, in addition to the messages themselves, and the reducer needs to handle two different types of data (graph nodes and messages).
map(id, node) {
  # Pass the node itself along so the reducer can reconstruct the graph
  emit(id, node)
  # Perform the local computation on this node's state
  partial_result = local_compute(node)
  # Send the result to every direct neighbor along the outward arcs
  for each neighbor in node.outE.inV {
    emit(neighbor.id, partial_result)
  }
}
reduce(id, list_of_msg) {
  node = null
  result = 0
  # Separate the graph structure from the computed messages
  for each msg in list_of_msg {
    if type_of(msg) == Node
      node = msg                         # the node passed along by its mapper
    else
      result = aggregate(result, msg)    # combine messages from neighbors
    end
  }
  node.value = result
  # Write the updated node back out for the next iteration
  emit(id, node)
}
The downside of this approach is that a substantial amount of I/O and network bandwidth is consumed just passing the graph structure itself around.
Google's Pregel model provides an alternative message distribution model so that state can be retained at the processing node across iterations.
The Schimmy Trick
In a recent research paper, Jimmy Lin and Michael Schatz use a clever partition() algorithm in Map/Reduce which achieves "stickiness" of graph distribution as well as maintaining a sorted order of node ids on disk.
The whole graph is broken down into multiple files stored in HDFS. Each file contains multiple records, and each record describes a node and its corresponding adjacency list.
id -> [nodeProps, [[arcProps, toNodeId], [arcProps, toNodeId] ...]]
In addition, the records are physically sorted within the file by their node id.
There are as many reducers as there are files, so each reducer task is assigned one of these files. Correspondingly, the partition() function routes all nodes stored in a file to that file's associated reducer.
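As a rough illustration of the routing idea, the sketch below assumes the files were created by chunking the sorted node ids into fixed-size ranges; NODES_PER_FILE and the exact scheme are assumptions for illustration, not the paper's actual code.

NODES_PER_FILE = 1_000_000  # hypothetical chunk size used when the files were written

def partition(node_id, num_reducers):
    # Route every message keyed by node_id to the reducer that owns the
    # file (i.e., the range of ids) containing that node.
    return (node_id // NODES_PER_FILE) % num_reducers

With the number of reducers equal to the number of files, every message lands on the reducer that will read its target node's file sequentially.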
The mapper does the same thing as before, except that the first emit (of the node itself) is removed, since the mapper no longer needs to pass the graph along.
The reducer receives all the messages emitted by the mappers, sorted by the Map/Reduce framework on the key (which happens to be the node id). Meanwhile, the reducer can open its corresponding file in HDFS, which also maintains a sorted list of nodes by id. The reducer can therefore read the HDFS file sequentially as each reduce() call arrives: any node in the file whose id precedes the current key has either already been processed or received no messages this iteration, and can simply be re-emitted unchanged.
reduce(id, list_of_msg) {
  nodeInFile = readFromFile()
  # Emit preceding nodes that received no messages, unchanged
  while (nodeInFile.id < id)
    emit(nodeInFile.id, nodeInFile)
    nodeInFile = readFromFile()        # advance to the next node in the file
  end
  # Aggregate the messages destined for this node
  result = 0
  for each msg in list_of_msg {
    result = aggregate(result, msg)
  }
  nodeInFile.value = result
  emit(id, nodeInFile)
}
Although the Schimmy trick is an improvement over the classical map/reduce approach, it only eliminates the shuffling of the graph structure between the mappers and the reducers. At each iteration, the mappers still need to read the whole graph from HDFS, and the reducers still need to write the whole graph back to HDFS, which maintains 3-way replication for each file.
Hadoop provides a co-location mechanism for the mapper and tries to assign files sitting on the same machine to the local mapper. However, this co-location mechanism is not available for the reducer, so the reducer still needs to write the graph back over the network.
Pregel Advantage
Since the Pregel model retains worker state (the same worker is responsible for the same set of nodes) across iterations, the graph can be loaded into memory once and reused across iterations. This reduces I/O overhead, as there is no need to read and write the graph to disk at each iteration. For fault resilience, there is a periodic checkpoint where every worker writes its in-memory state to disk.
Also, because of its stateful characteristic, Pregel only sends locally computed results (but not the graph structure) over the network, which implies minimal bandwidth consumption.
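The sketch below captures this at a toy level; the Worker class and the compute() and send() hooks are assumptions for illustration, not Pregel's actual API. The graph partition stays in memory across supersteps, only messages cross the network, and the in-memory state is periodically checkpointed to disk.

import pickle

class Worker:
    def __init__(self, partition):
        self.nodes = partition   # node_id -> node state, loaded from disk once
        self.inbox = {}          # node_id -> messages delivered for this superstep

    def superstep(self, step, compute, send, checkpoint_every=5):
        for node_id, state in self.nodes.items():
            messages = self.inbox.pop(node_id, [])
            new_state, outgoing = compute(state, messages)  # local computation only
            self.nodes[node_id] = new_state                 # state retained in memory
            for target_id, msg in outgoing:
                send(target_id, msg)                        # only results cross the network
        if step % checkpoint_every == 0:                    # periodic fault-tolerance checkpoint
            with open(f"checkpoint-{step}.bin", "wb") as fh:
                pickle.dump(self.nodes, fh)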
Of course, Pregel is very new and relatively immature compared to Map/Reduce.
1 comment:
Thanks, I finally understand how/why Pregel can work better on graphs than MapReduce.
Just one more question: if the entire graph is too big to fit in memory, and on each iteration only a small portion of the graph participates and changes, is it better to continue using MapReduce?