Tuesday, July 20, 2010

Graph Processing in Map Reduce

In my previous post about Google's Pregel model, I described a general pattern of parallel graph processing: multiple iterations of processing run until a termination condition is reached. Within each iteration, the same processing happens at a set of nodes (i.e., context nodes).

Each context node performs a sequence of steps independently (hence achieving parallelism):
  1. Aggregate all incoming messages received from its direct inward arcs during the last iteration
  2. With this aggregated message, perform some local computation (i.e., update the local state of the node and its direct outward arcs)
  3. Pass the result of the local computation along all outward arcs to its direct neighbors

This processing pattern can be implemented using the Map/Reduce model, with one MR job per iteration. The sequence is a little different from the above. Typically a mapper performs (2) and (3), emitting each message with the neighbor's node id as the key. The reducer is responsible for performing (1).

Issue of using Map/Reduce

However, due to the functional programming nature of Map() and Reduce(), M/R does not automatically retain "state" between jobs. To retain the graph across iterations, the mapper needs to explicitly pass the corresponding portion of the graph to the reducer, in addition to the messages themselves. Similarly, the reducer needs to handle two different types of data: graph records and messages.

map(id, node) {
  emit(id, node)    # pass the graph structure itself along to the reducer
  partial_result = local_compute(node)
  for each neighbor in node.outE.inV {
      emit(neighbor.id, partial_result)
  }
}

reduce(id, list_of_msg) {
  node = null
  result = 0

  for each msg in list_of_msg {
      if type_of(msg) == Node
          node = msg
      else
          result = aggregate(result, msg)
      end
  }

  node.value = result
  emit(id, node)
}

The downside of this approach is that a substantial amount of I/O and bandwidth is consumed just passing the graph itself around.
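
The map() and reduce() pseudocode above can be exercised in a small single-process simulation. This is only a sketch: the dictionary-based node layout, the tagging of records as "node" or "msg", and the choice of local computation (splitting a node's value evenly among its neighbors) are assumptions made for illustration, not part of the original pattern.

```python
from collections import defaultdict

def map_node(node_id, node):
    # Pass the graph structure along so the reducer can rebuild the node.
    yield node_id, ("node", node)
    # Assumed local computation: split the node's value evenly among neighbors.
    if node["out"]:
        share = node["value"] / len(node["out"])
        for neighbor_id in node["out"]:
            yield neighbor_id, ("msg", share)

def reduce_node(node_id, values):
    node, total = None, 0.0
    for kind, payload in values:
        if kind == "node":
            node = payload          # the graph record passed along by the mapper
        else:
            total += payload        # aggregate incoming messages
    # Assumes every message targets a node that exists in the graph.
    return node_id, dict(node, value=total)

def run_iteration(graph):
    shuffled = defaultdict(list)    # stands in for the framework's shuffle/sort
    for node_id, node in graph.items():
        for key, value in map_node(node_id, node):
            shuffled[key].append(value)
    return dict(reduce_node(k, v) for k, v in sorted(shuffled.items()))

graph = {
    1: {"value": 1.0, "out": [2, 3]},
    2: {"value": 1.0, "out": [3]},
    3: {"value": 1.0, "out": [1]},
}
next_graph = run_iteration(graph)
```

Note that every node record travels through the shuffle on each iteration, which is exactly the overhead described above.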

Google's Pregel model provides an alternative message distribution model so that state can be retained at the processing node across iterations.

The Schimmy Trick

In a recent research paper, Jimmy Lin and Michael Schatz use a clever partition() algorithm in Map/Reduce that achieves "stickiness" of the graph distribution while maintaining a sorted order of node ids on disk.

The whole graph is broken down into multiple files stored in HDFS. Each file contains multiple records, and each record describes a node and its corresponding adjacency list.

id -> [nodeProps, [[arcProps, toNodeId], [arcProps, toNodeId], ...]]

In addition, the records are physically sorted within the file by their node id.

There are as many reducers as there are files, so each reducer task is assigned exactly one of these files. The partition() function, in turn, routes all messages for the nodes in a file to that file's reducer.
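
As a rough illustration of such a "sticky" partition() function, here is a sketch that assumes the files were split by node id range. The split points are hypothetical; any deterministic function works as long as it is the same one that was used to lay out the files.

```python
from bisect import bisect_right

# Hypothetical layout: file i holds node ids starting at FIRST_ID_OF_FILE[i]
# and ending just before FIRST_ID_OF_FILE[i + 1].
FIRST_ID_OF_FILE = [0, 1000, 2000]

def partition(node_id):
    """Return the index of the file (and therefore the reducer) that owns node_id."""
    return bisect_right(FIRST_ID_OF_FILE, node_id) - 1
```

With this layout, partition(999) selects reducer 0 and partition(1000) selects reducer 1, so a message always lands on the reducer that holds its target node's file.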

The mapper does the same thing as before, except that the first line in the method is removed, as it no longer needs to emit the graph.

The reducer receives all the messages emitted by the mappers, sorted by the Map/Reduce framework by key (which happens to be the node id). The reducer can also open the corresponding file in HDFS, which likewise holds a list of nodes sorted by id. The reducer can then read the HDFS file sequentially across reduce() calls, confident that all preceding nodes in the file have already received their corresponding messages.

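reduce(id, list_of_msg) {
   nodeInFile = readFromFile()

   # Emit preceding nodes that received no message
   while(nodeInFile.id < id)
       emit(nodeInFile.id, nodeInFile)
       nodeInFile = readFromFile()    # advance to the next record in the file
   end

   result = 0

   for each msg in list_of_msg {
       result = aggregate(result, msg)
   }

   nodeInFile.value = result
   emit(id, nodeInFile)
}
# Nodes that follow the last message key must still be emitted
# when the reducer closes (not shown).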
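
The reducer logic above is essentially a merge of two sorted streams. The following sketch simulates it in a single process; the function name, the tuple layout, and the use of sum() as the aggregate are assumptions for illustration, and it assumes every message targets a node that is present in the file.

```python
def schimmy_merge(nodes_on_disk, grouped_messages):
    """Merge a node file sorted by id with message groups sorted by id.

    nodes_on_disk:    iterable of (node_id, node_dict), sorted by node_id
    grouped_messages: iterable of (node_id, [msg, ...]), sorted by node_id
    """
    node_iter = iter(nodes_on_disk)
    for target_id, messages in grouped_messages:
        node_id, node = next(node_iter)
        # Emit preceding nodes that received no message, unchanged.
        while node_id < target_id:
            yield node_id, node
            node_id, node = next(node_iter)
        # Assumed aggregate: sum of the incoming messages.
        yield node_id, dict(node, value=sum(messages))
    # Flush the nodes that come after the last message key.
    yield from node_iter

nodes = [
    (1, {"value": 9, "out": [2]}),
    (2, {"value": 9, "out": [3]}),
    (3, {"value": 9, "out": []}),
    (4, {"value": 9, "out": [1]}),
]
messages = [(2, [0.5, 0.25]), (3, [1.0])]
merged = list(schimmy_merge(nodes, messages))
```

Nodes 1 and 4 receive no messages and pass through untouched, while nodes 2 and 3 get their aggregated values; the graph structure never goes through the shuffle.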

Although the Schimmy trick improves on the classical Map/Reduce approach, it only eliminates shuffling the graph structure between the mapper and the reducer. At each iteration, the mapper still needs to read the whole graph from HDFS and the reducer still needs to write the whole graph back to HDFS, which maintains 3-way replication for each file.

Hadoop provides a co-location mechanism for the mapper and tries to assign it files that sit on the same machine. However, this co-location mechanism is not available for the reducer, so the reducer still needs to write the graph back over the network.

Pregel Advantage

Since the Pregel model retains worker state across iterations (the same worker is responsible for the same set of nodes), the graph can be loaded into memory once and reused across iterations. This reduces I/O overhead, as there is no need to read from and write to disk at each iteration. For fault resilience, there is a periodic checkpoint where every worker writes its in-memory state to disk.

Also, Pregel (with its stateful characteristic) only sends locally computed results (but not the graph structure) over the network, which keeps bandwidth consumption low.

Of course, Pregel is very new and relatively immature compared to Map/Reduce.

Monday, July 12, 2010

Google Pregel Graph Processing

A lot of real-life problems can be expressed in terms of entities related to each other and are best captured using graph models. Well-defined graph theory can be applied to process the graph and return interesting results. The general processing patterns can be categorized into the following ...
  1. Capture (e.g. When John is connected to Peter in a social network, a link is created between two Person nodes)
  2. Query (e.g. Find all of John's friends of friends who are younger than 30 and married)
  3. Mining (e.g. Find out the most influential person in Silicon Valley)

Distributed and Parallel Graph Processing

Although using a graph to represent a relationship network is not new, the size of networks has increased dramatically in the past decade, such that storing the whole graph in one place is often impossible. Therefore, the graph needs to be broken down into multiple partitions stored in different places. Traditional graph algorithms that assume the whole graph resides in memory no longer apply. We need to redesign the algorithms so that they work in a distributed environment. On the other hand, by breaking the graph into different partitions, we can manipulate the graph in parallel to speed up processing.

Property Graph Model

The paper “Constructions from Dots and Lines” by Marko A. Rodriguez and Peter Neubauer illustrates the idea very well. Basically, a graph contains nodes and arcs.

A node has a "type" which defines a set of properties (name/value pairs) that the node can be associated with.

An arc defines a directed relationship between nodes, and hence contains the fromNode, toNode as well as a set of properties defined by the "type" of the arc.



General Parallel Graph Processing

Most graph processing algorithms can be expressed in terms of a combination of "traversal" and "transformation".

Parallel Graph Traversal

A "traversal" can be expressed as a path that contains a sequence of segments. Each segment contains a traversal from a node to an arc, followed by a traversal from an arc to a node. In Marko and Peter's model, a Node (Vertex) contains a collection of "inE" and another collection of "outE". An Arc (Edge), on the other hand, contains one "inV" and one "outV". So to express a "Friend-of-a-friend" relationship over a social network, we can use the following

./outE[@type='friend']/inV/outE[@type='friend']/inV

Loops can also be expressed in the path. To express all persons that are reachable from this person, we can use the following

.(/outE[@type='friend']/inV)*[@cycle='infinite']

On the implementation side, a traversal can be processed in the following way
  1. Start with a set of "context nodes", which can be defined by a list of node ids or by search criteria (in which case the search result determines the starting context nodes)
  2. Repeat until all segments in the path are exhausted: perform a walk from all context nodes in parallel, evaluating all outward arcs (i.e., outE) against the conditions (e.g., @type='friend'). The nodes that the matching arcs point to (i.e., inV) become the context nodes of the next round
  3. Return the final context nodes

Such a traversal path can also be used to express inferred (or derived) relationships, which don't have a physical arc stored in the graph model.
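
The traversal steps can be sketched over a tiny in-memory graph. The adjacency layout, the names, and the traverse() helper below are hypothetical, and the walk is sequential here rather than parallel; each entry in the path plays the role of one /outE[@type=...]/inV segment.

```python
# Hypothetical in-memory graph: node id -> list of (arc_type, target_id)
OUT_ARCS = {
    "john": [("friend", "peter"), ("colleague", "mary")],
    "peter": [("friend", "anna"), ("friend", "john")],
    "mary": [("friend", "zoe")],
    "anna": [],
    "zoe": [],
}

def traverse(start_nodes, path):
    """Walk one segment per arc type in `path` and return the final context nodes."""
    context = set(start_nodes)
    for arc_type in path:
        # Follow every outward arc of the required type; the arc targets
        # become the context nodes of the next round.
        context = {
            target
            for node in context
            for kind, target in OUT_ARCS.get(node, [])
            if kind == arc_type
        }
    return context

friends_of_friends = traverse({"john"}, ["friend", "friend"])
```

In this example the result contains john himself, because peter lists john as a friend; a real query would probably filter out the starting node.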

Parallel Graph Transformation

The main goal of graph transformation is to modify the graph. This includes modifying the properties of existing nodes and arcs, creating new arcs / nodes and removing existing arcs / nodes. The modification logic is provided by a user-defined function, which is applied to all active nodes.

The graph transformation process can be implemented in the following steps
  1. Start with a set of "active nodes", which can be defined by a list of node ids or by search criteria (in which case the search result determines the starting active nodes)
  2. Repeat until there are no more active nodes: execute the user-defined transformation, which modifies the properties of the active nodes and their outward arcs. It can also remove outward arcs or create new arcs that point to existing or new nodes (in other words, the graph connectivity can be modified). It can also send messages to other nodes (to be picked up in the next round) as well as receive messages sent from other nodes in the previous round.
  3. Return the transformed graph, or perform a traversal to return a subset of the transformed graph.

Google's Pregel

Pregel can be thought of as a generalized parallel graph transformation framework. In this model, the most basic (atomic) unit is a "node" that contains its properties, its outward arcs (and their properties), and the id (just the id) of the node that each outward arc points to. The node also has a logical inbox to receive all messages sent to it.

The whole graph is broken down into multiple "partitions", each containing a large number of nodes. A partition is a unit of execution and typically has an execution thread associated with it. A "worker" machine can host multiple "partitions".

The execution model is based on the BSP (Bulk Synchronous Parallel) model. In this model, multiple processing units proceed in parallel through a sequence of "supersteps". Within each "superstep", each processing unit first receives all messages delivered to it from the preceding "superstep", then manipulates its local data and may queue up messages that it intends to send to other processing units. This happens asynchronously and simultaneously among all processing units. The queued messages are delivered to the destination processing units but won't be seen until the next "superstep". When all the processing units finish message delivery (hence the synchronization point), the next superstep can start, and the cycle repeats until the termination condition has been reached.
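
To make the superstep cycle concrete, here is a single-process sketch that propagates the maximum value through a graph. The function name, the data layout, and the choice of maximum-value propagation are illustrative assumptions; a real system would run the per-node work in parallel across partitions.

```python
from collections import defaultdict

def run_supersteps(values, out_arcs, max_supersteps=30):
    """Propagate the maximum value through the graph, one superstep at a time."""
    values = dict(values)
    inbox = defaultdict(list)
    active = set(values)                 # every node is active in superstep 0
    for superstep in range(max_supersteps):
        if not active:                   # termination: no node has work left
            break
        outbox = defaultdict(list)       # messages become visible next superstep
        for node in active:
            best = max([values[node]] + inbox[node])
            changed = best > values[node]
            values[node] = best
            if superstep == 0 or changed:
                for neighbor in out_arcs[node]:
                    outbox[neighbor].append(best)
        inbox = outbox                   # synchronization barrier
        active = set(inbox)              # nodes are reactivated by incoming messages
    return values

final_values = run_supersteps(
    {1: 3, 2: 6, 3: 2, 4: 1},
    {1: [2], 2: [1, 3], 3: [2, 4], 4: [3]},
)
```

Messages queued during one superstep are swapped into the inbox only after every active node has run, which is the role the barrier plays in the description above.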

Notice that, depending on the graph algorithm, the assignment of nodes to partitions may have an impact on overall performance. Pregel provides a default assignment where partition = hash(nodeId) mod N, but users can override this assignment algorithm if they want. In general, it is a good idea to put close-neighbor nodes into the same partition so that messages between these nodes don't need to flow over the network, which reduces communication overhead. Of course, this also means that traversal of the neighboring nodes all happens within the same machine, which hinders parallelism. This is usually not a problem when the context nodes are very diverse. In my experience with parallel graph processing, coarse-grained parallelism is preferred over fine-grained parallelism as it reduces communication overhead.

The complete picture of execution can be implemented as follows:


The basic processing unit is a "thread" associated with each partition, running inside a worker. Each worker receives messages from the previous "superstep" through its "inQ" and dispatches each message to the partition where the destination node resides. After that, a user-defined "compute()" function is invoked on each node of the partition. Notice that there is a single thread per partition, so nodes within a partition are executed sequentially, and the order of execution is nondeterministic.

The "master" plays a central role in coordinating the execution of supersteps in sequence. It signals the beginning of a new superstep to all workers after learning that all of them have completed the previous one. It also pings each worker to learn its processing status and periodically issues a "checkpoint" command to all workers, which then save their partitions to a persistent graph store. Pregel doesn't define or mandate the graph storage model, so any persistent mechanism should work well. There is a "load" phase at the beginning where each partition starts empty and reads a slice of the graph storage. For each node read from the storage, a "partition()" function is invoked; the node is loaded into the current partition if the function returns the current partition, otherwise the node is queued to the partition it is assigned to.

Fault resilience is achieved through the checkpoint mechanism, where each worker is instructed to save its in-memory graph partitions to the graph storage periodically (at the beginning of a superstep). If a worker is detected to be dead (not responding to the "ping" message from the master), the master instructs the surviving workers to take over the partitions of the failed worker. The whole processing is reverted to the previous checkpoint and proceeds again from there (even the healthy workers need to redo the previous processing). The Pregel paper mentions a potential optimization that re-executes only the failed partitions from the previous checkpoint by replaying previously received messages; of course, this requires keeping a log of all messages exchanged between nodes at every superstep since the previous checkpoint. This optimization, however, relies on the algorithm being deterministic (in other words, the same input executed at a later time produces the same output).

Further optimization is available in Pregel to reduce network bandwidth usage. Messages destined for the same node can be combined using a user-defined "combine()" function, which is required to be associative and commutative. This is similar to the combine() method in Google's Map/Reduce model.
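
A combiner can be sketched as a per-destination fold applied before messages leave the worker. The helper name and the (destination, value) message layout below are assumptions for illustration; addition is used because it is both associative and commutative.

```python
def combine_outgoing(messages, combine):
    """Collapse (destination, value) messages into one value per destination."""
    combined = {}
    for dest, value in messages:
        if dest in combined:
            combined[dest] = combine(combined[dest], value)
        else:
            combined[dest] = value
    return combined

outgoing = [(7, 0.5), (9, 1.0), (7, 0.25), (7, 0.25)]
combined = combine_outgoing(outgoing, lambda a, b: a + b)
```

Here four outgoing messages shrink to two, one per destination node, so less data crosses the network.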

In addition, each node can also emit an "aggregate value" at the end of "compute()". The worker invokes a user-defined "aggregate()" function that aggregates all nodes' aggregate values into a partition-level aggregate value, and so on all the way up to the master. The final aggregated value is made available to all nodes in the next superstep. Such aggregate values can be used to calculate summary statistics across nodes as well as to coordinate the progress of the processing units.

I think the Pregel model is general enough for a large portion of classical graph algorithms. I'll cover how to map these traditional algorithms onto Pregel in subsequent postings.

Reference

http://www.slideshare.net/slidarko/graph-windycitydb2010

Saturday, June 12, 2010

Strategy for software engineering position interview

From my experience in hiring and attending job interviews, employers generally look at 5 areas when hiring a software engineer ...
  1. Technical knowledge of a specific technology product
  2. Experience in the business problem domain
  3. General technical and architecture sense
  4. Personality and working style
  5. IQ and problem-solving skills

While 1 to 4 are very straightforward to answer, the most interesting and challenging part is item (5), because there is effectively no time to prepare. Depending on how well you manage pressure, your brain can go totally blank during the interview session.

I personally believe that a short interview doesn't necessarily reflect a person's ability to do the actual work. I have worked with many people who are high performers at work but poor interviewees. So don't lose confidence because you failed an interview; there is a factor of luck involved in this process.

Nevertheless, most employers are willing to accept a higher false negative rate, but the chance of a false positive must be very low. Because the pass rate of these IQ and algorithm questions is generally pretty low, they are a very effective means of filtering candidates. Therefore, being able to tackle these kinds of questions well is a critical success factor in interviewing.

Notice that there is no substitute for "good knowledge", "high IQ" and "the ability to speak/think in a pressured environment", but I've found some very useful techniques and strategies.

1. Rephrase the question slowly in your own words

This helps you make sure you fully understand the question and clarify any hidden assumptions you have made. Repeating the question "slowly" also gives you more time to think.

From the interviewer's perspective, he/she can clearly see the candidate's ability to digest a problem.

2. Construct a Visual model of the problem

Use a whiteboard, or paper (if this is a phone interview), to diagram the problem as you perceive it. Our brains are better at understanding pictures than words, so having a diagram is very useful for coming up with solution ideas.

From the interviewer's perspective, he/she can see a clear picture of how you analyze the problem.

3. Use a special, simple case to guide you

Never try to tackle the general problem first. Start with a super-simple special case and think about how you would solve it. This reduces the number of things you need to consider and lets you focus on the core part of the problem.

4. Start with a very naive solution as a baseline

Tell the interviewer that you want to start with a very naive solution to establish a baseline. The naive solution can usually be constructed using a brute-force approach (try all combinations until you find a matching solution). After that, analyze the complexity of this naive solution as a baseline for future comparison.

5. Improve your solution

At this stage, you need to evolve and improve your solution. Here are some general techniques.
  • Divide and conquer: Decompose the problem into smaller ones and solve each sub-problem separately, then combine the solutions for the overall problem.
  • Reduce to well-known algorithm models: Try to model your problem in terms of well-studied computer science data structure and algorithm models (e.g. Tree, Graph, Search, Sort) and then apply well-known algorithms to solve it.
  • Recursive structure: Try to structure your problem in a recursive form. Find the solution for the base case and then expand the solution in a recursive manner.
  • Greedy method: Try to modify attributes of your current best solution to see if you can get a better one. Watch out for being trapped in a locally optimal solution.
  • Approximation: Instead of finding an exact solution, see if an approximate solution is acceptable, e.g. a probabilistic approach (try 100 random combinations and pick the best outcome).

6. Keep talking while you think

Don't wait until you have fully figured out the answer. Keep talking while you think about the solution so the interviewer understands how you analyze things; you are also showing how well you can express your thoughts. It is also easier for the interviewer to guide you or give you hints. And finally, you may impress the interviewer even if you cannot get to the exact answer.


7. Generalize your solution for the final answer

After you find a working solution for the simple case, extend the simple case and see how you would solve it. See if you can find a general pattern in what the solution looks like. Once you find it, generalize your solution to the general problem.

From the interviewer's perspective, he/she can assess whether the candidate can think at different levels of abstraction and apply a solution to a broader scope of problems.

8. Remember the interview doesn't end at the office building

You always have the chance to think through a question that you haven't answered satisfactorily after you walk out of the office. Submit a solution via email once you get back home (do it ASAP though), along with a thank-you note to the interviewer.

Sunday, May 2, 2010

The NOSQL Debate

I attended the Stanford InfoLab conference, where there were 2 panel discussions on Cloud computing: transaction processing and analytic processing.

The session turned out to be a debate between people from the academic side and open source developers. Both sides have their points ...

Background of RDBMS

RDBMS is based on a clear separation between application logic and data management. This loose coupling allows application and DB technologies to evolve independently.

This philosophy drives the DBMS architecture to be more general in order to support a wide range of applications. Over many years of evolution, it has gained a well-defined data model and query model based on relational algebra. It also has a well-defined transaction processing model.

Applications, in turn, benefit from having a unified data model. They have more freedom to switch DB vendors without too many code changes.

For OLTP applications, the RDBMS model has proven to be highly successful in many large enterprises. The success of both Oracle and MySQL speaks to that.

For analytic applications, the RDBMS model is also widely used for implementing data warehousing based on a STAR schema model (composed of Fact tables and Dimension tables).

This model also puts the DBA in a very important position in the enterprise. DBAs are equipped with sophisticated management tools and specialized knowledge to control the commonly shared DB schema.

Background of NOSQL

In the last 10 years, a few highly successful web2.0 companies have built applications that go beyond what a centralized RDBMS can handle. Partitioning data across many servers and spreading the workload across them seems a reasonable first thing to try. Many RDBMS solutions provide a Master/Slave replication mechanism through which one can spread the READ-ONLY workload across many servers, and it helps.

In a partitioned environment, applications need to be more tolerant of data integrity issues, especially when data is asynchronously replicated between servers. The famous CAP theorem from Eric Brewer captures the essence of the tradeoff: highly scalable applications (which must have partition tolerance) have to choose between "Consistency" and "Availability".

Fortunately, most of these web-scale applications have a high tolerance for data integrity issues, so they choose "availability" over "consistency". This is a major deviation from the transaction processing model of RDBMS, which typically weights "consistency" much higher than "availability".

In addition, the data structures used in these web-scale applications (e.g. user profile, preference, social graph ... etc) are much richer than the simple row/column/table model. Some operations involve navigation within a large social graph, which requires too many join operations in an RDBMS model.

The "higher tolerance of data integrity issues" as well as "efficient support for rich data structures" challenge some of the very fundamental assumptions of RDBMS. Amazon has built its large-scale key/value store called Dynamo, and Google has its BigTable column-oriented storage model; both were designed from the ground up with a distributed architecture in mind.

Since then, many open source clones based on these two models have appeared. To represent this trend, Eric Evans from Rackspace coined the term "NOSQL". In my opinion this term does not accurately reflect the underlying technologies, but it nevertheless provides a marketing term under which all non-RDBMS technologies can gather, even those (e.g. CouchDB, Neo4j) that were not originally trying to tackle the scalability problem.

Today, NOSQL serves as the umbrella term for non-relational database alternatives.

For analytical applications, the NOSQL camp also takes a highly parallel, brute-force processing approach based on the Map/Reduce model.

The Debate

There is relatively little criticism of the data model aspects. Jun Rao from IBM Lab summarized the key difference between the two philosophies. The traditional RDBMS approach is first to figure out the right model, and then provide an implementation and hope it is scalable. The modern NOSQL approach does the opposite by first figuring out what a highly scalable architecture looks like and then layering a data model on top of that. Basically, people in both camps agree that we should use a data model that is optimized for the application's access patterns, weakening the "one-size-fits-all" philosophy of RDBMS.

Most of the debate centers on the transaction processing model itself. Basically, RDBMS proponents think the NOSQL camp hasn't spent enough time understanding the theoretical foundation of the transaction processing model. The new "eventual consistency" model is not well-defined, and different implementations may differ significantly from each other. This means that figuring out all this inconsistent behavior becomes the application developer's responsibility and makes their life much harder. A DB whose behavior is hard to reason about can be very dangerous if the application makes wrong assumptions about the underlying data integrity guarantees.

While agreeing that application developers now have more to worry about, the NOSQL camp argues that this is actually a benefit because it gives domain-specific optimization opportunities back to application developers, who are no longer constrained by a one-size-fits-all model. But they admit that making such optimization decisions requires a lot of experience and can be very error-prone and dangerous if not done by experts.

The academic side also notes that the move to NOSQL may seem fashionable and cool to new technologists who may not have the expertise and skills. The community as a whole should articulate the pros and cons of NOSQL.

Notice that this is not the first time this debate has happened. Stonebraker has written a very good article from the RDBMS side.

Tuesday, March 2, 2010

Two approaches on Multi-tenancy in Cloud

Continuing from my previous blog post on how multi-tenancy relates to cloud computing.

My thoughts have changed: I now think the Amazon approach (Hypervisor isolation) and the Salesforce approach (DB isolation) are both valid but attract different audiences.

First of all, increasing efficiency through sharing is a fundamental value proposition underlying all cloud computing initiatives. There is no debate that ...
  • We should "share resources" to increase utilization and hence improve efficiency
  • We should accommodate highly dynamic growth and shrinkage requirements rapidly and smoothly
  • We should "isolate" the tenants so there is no leakage of sensitive information

But at which layer should we facilitate that? At the Hypervisor level or the DB level?

Hypervisor level Isolation

A Hypervisor is a very low-level layer of software that maps the physical machine to a virtualized machine on which a regular OS runs. When the regular OS issues system calls to the VM, they are intercepted by the Hypervisor, which maps them to the underlying hardware. The Hypervisor also provides some traditional OS functions, such as scheduling to determine which VM to run. A Hypervisor can be considered a very lean OS that sits very close to the bare hardware.

Depending on the specific implementation, the Hypervisor introduces an extra layer of indirection and hence incurs a certain % of overhead. If we need a VM with less capacity than a physical machine, the Hypervisor allows us to partition the hardware at a finer granularity and hence improve efficiency by running more tenants on the same physical machine. For light-usage tenants, this gain in efficiency should offset the loss from the overhead.

Since the Hypervisor focuses on low-level system primitives, it provides the cleanest separation and hence lessens security concerns. In addition, by intercepting at the lowest layer, the Hypervisor retains the familiar machine model that existing system/network admins already know. Since the application is completely agnostic to the presence of the Hypervisor, this minimizes the changes required to move existing apps into the cloud and makes cloud adoption easier.

Of course, the downside is that virtualization introduces a certain % of overhead. And the tenant still needs to pay for the smallest VM even if none of its users is using it.

DB level Isolation

Here is another school of thought: if tenants are running the same kind of application, the only difference is the data each tenant stores. Why can't we just introduce an extra attribute "tenantId" in every table and then append a "where tenantId = $thisTenantId" to every query? In other words, add a hidden column and modify each submitted query.
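
A minimal sketch of this idea, using an in-memory SQLite database: the table, the tenant names, and the query_for_tenant() helper are hypothetical, and a real provider would rewrite arbitrary submitted queries rather than build a simple SELECT.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (tenantId TEXT, item TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("acme", "laptop"), ("acme", "mouse"), ("globex", "desk")],
)

def query_for_tenant(conn, tenant_id, table, columns):
    """Run a SELECT with the tenant filter appended (hypothetical helper).

    `table` and `columns` must come from trusted code, not from user input;
    only the tenant id is bound as a query parameter.
    """
    sql = f"SELECT {', '.join(columns)} FROM {table} WHERE tenantId = ?"
    return conn.execute(sql, (tenant_id,)).fetchall()

acme_items = query_for_tenant(conn, "acme", "orders", ["item"])
globex_items = query_for_tenant(conn, "globex", "orders", ["item"])
```

Each tenant sees only its own rows even though all rows live in the same table, which is why the isolation is only as strong as the rewriting logic.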

In addition, the cloud provider usually needs to re-architect the underlying data layer and move to a distributed and partitioned DB. Some of the more sophisticated providers also need to invest in developing intelligent data placement algorithms based on workload patterns.

In this approach, the degree of isolation is only as good as the rewritten query. In my opinion, this doesn't seem to be hard, although it is less proven than the Hypervisor approach.

The advantage of DB level isolation is that there is no VM overhead and no minimum charge to the tenant.

However, we should compare these 2 approaches not just from a resource utilization / efficiency perspective, but from other perspectives as well, such as ...

Freedom of choice on technology stack

Hypervisor isolation gives its tenants maximum freedom in the underlying technology stack. Each tenant can choose the stack that best fits its application's needs and in-house IT skills. The tenant is also free to move to the latest technologies as they evolve.

This freedom of choice comes at a cost though. The tenant needs to hire system administrators to configure and maintain the technology stack.

With DB level isolation, the tenants live within a set of predefined data schemas and application flows. So their degree of freedom is limited to whatever set of parameters the cloud provider exposes. Also, the tenants' applications are "locked in" to the cloud provider's framework, and a tight coupling and dependency is created between the tenant and the cloud provider.

Of course, the advantage is that no administration of the technology stack is needed.

Reuse of Domain Specific Logic

Since it focuses on the lowest layer of resource sharing, Hypervisor isolation provides no reuse at the app logic level. Tenants need to build their own technology stack from the ground up and write their own application logic.

In the DB isolation approach, the cloud provider pre-defines a set of templates for DB schemas and application flow logic based on its domain expertise (it is important that the cloud provider be a recognized expert in that field). The tenant can leverage the cloud provider's domain expertise and focus purely on business operations.

Conclusion

I think each approach will attract a very different (and clearly disjoint) set of audiences.

Notice that DB-level isolation commoditize everything and make it very hard to create product feature differentiations. If I am a technology startup company trying to develop a killer product, then my core value is my domain expertise. In this case, I won't go with the DB-level isolation which impose too much constraints on me to distinguish my product from "anyone else". Hypervisor level isolation much better because I can outsource the infrastructure layer and focus in my core value.

On the other hand, if I am operating a business but not building a product, then I would like to outsource all supporting functions, including my applications. In this case, I would pick the best app framework provided by the market leader and follow their best practices (and would be very willing to live by their constraints). DB-level isolation is more compelling in this case.

Monday, March 1, 2010

Search Engine Basics

I have received the question "how does search work?" a couple of times recently, so I will try to document the whole process. This is intended to highlight the key concepts rather than specific implementation details, which are much more complicated and sophisticated than what is described here.

A very basic search engine includes a number of processing phases.
  • Crawling: to discover the web pages on the internet
  • Indexing: to build an index to facilitate query processing
  • Query Processing: Extract the most relevant pages based on the user's query terms
  • Ranking: Order the result based on relevancy


Notice that each element in the above diagram reflects a logical function unit, not a physical boundary. For example, the processing unit in each orange box is in fact executed across many machines in parallel. Similarly, each data store element is physically spread across many machines based on key partitioning.


Vector Space Model

Here we use the "Vector Space Model" where each document is modeled as a multi-dimensional vector (each word represents a dimension). If we put all documents together, we form a matrix where the rows are documents and columns are words, and each cell contains the TF/IDF value of the word within the document.


To determine the similarity between two documents, we can take the dot product of their vectors; the result represents the degree of similarity.
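As a rough illustration of the dot product similarity described above, here is a minimal Python sketch. It assumes a toy three-document corpus, whitespace tokenization, and the common `tf * log(N / df)` weighting; the post does not specify the exact TF/IDF formula, and the names `tfidf_vectors` and `dot` are made up for this example.

```python
import math
from collections import Counter

def tfidf_vectors(docs):
    """Return one sparse {word: tf-idf weight} vector per document."""
    tokenized = [doc.lower().split() for doc in docs]
    n = len(tokenized)
    # Document frequency: number of documents that contain each word.
    df = Counter(word for tokens in tokenized for word in set(tokens))
    vectors = []
    for tokens in tokenized:
        tf = Counter(tokens)
        # Assumed weighting: raw term frequency times log(N / df).
        vectors.append({w: tf[w] * math.log(n / df[w]) for w in tf})
    return vectors

def dot(u, v):
    """Dot product of two sparse vectors stored as dicts."""
    if len(u) > len(v):
        u, v = v, u  # iterate over the shorter vector
    return sum(weight * v.get(word, 0.0) for word, weight in u.items())

docs = ["apple banana apple", "apple banana cherry", "dog cat dog"]
vecs = tfidf_vectors(docs)
sim_close = dot(vecs[0], vecs[1])  # documents sharing "apple" and "banana"
sim_far = dot(vecs[0], vecs[2])    # documents with no words in common
```

In practice the vectors are often length-normalized first (cosine similarity) so that long documents do not dominate the score.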


Crawler

The crawler's job is to collect web pages on the internet. This is typically done by a farm of crawlers, each of which does the following.

Start from a set of seed URLs, then repeat the following ...
  1. Pick the URL that has the highest traversal priority.
  2. Download the page content from the URL to the content repository (which can be a distributed file system, or DHT), and update the entry in the doc index
  3. Discover new URL links from the downloaded page. Add the link relationships into the link index and add these links to the traversal candidates
  4. Prioritize the traversal candidates
The content repository can be any distributed file system; here let's say it is a DHT.

There are a number of considerations.
  • How do we make sure different crawlers are working on different sets of content (rather than crawling the same page twice)? When a crawler detects an overlap (the URL already exists in the page repository with a fairly recent timestamp), it skips this URL and picks up the next best URL to crawl.
  • How does the crawler determine which candidate to crawl next? We can use a heuristic algorithm based on some utility function (e.g. we can pick the URL candidate that has the highest page rank score)
  • How frequently do we re-crawl? We can track the rate of change of the page to determine the crawling frequency.
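The crawl loop and the skip-on-overlap rule can be sketched in Python as below. This is only an illustration under several assumptions: `fetch`, `extract_links`, and `priority` are hypothetical callables supplied by the caller, a plain dict stands in for the DHT, and "recently crawled" is simplified to "already in the repository".

```python
import heapq

def crawl(seed_urls, fetch, extract_links, priority, max_pages=100):
    """Crawl outward from seed_urls; returns (content_repo, link_index)."""
    content_repo = {}   # url -> page content (stands in for the DHT)
    link_index = []     # (from_url, to_url) pairs
    # heapq is a min-heap, so priorities are negated to pop the highest first.
    frontier = [(-priority(url), url) for url in seed_urls]
    heapq.heapify(frontier)
    while frontier and len(content_repo) < max_pages:
        _, url = heapq.heappop(frontier)
        if url in content_repo:
            continue  # overlap detected: skip and take the next best URL
        page = fetch(url)
        content_repo[url] = page
        for link in extract_links(page):
            link_index.append((url, link))
            if link not in content_repo:
                heapq.heappush(frontier, (-priority(link), link))
    return content_repo, link_index

# Toy in-memory "web": each page is represented by its list of outgoing links.
WEB = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
repo, links = crawl(
    ["a"],
    fetch=lambda url: WEB.get(url, []),
    extract_links=lambda page: page,
    priority=lambda url: 1.0,  # a real crawler might use a page rank estimate
)
```

A farm of crawlers would share the frontier and repository across machines; this single-process version only shows the control flow.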

Indexer


The Indexer's job is to build the inverted index for the query processor to serve the online search requests.

First the indexer will build the "forward index"
  1. The indexer will parse the documents from the content repository into a token stream.
  2. Build up a "hit list" which describes each occurrence of the token within the document (e.g. position in the doc, font size, is it a title, anchor text ... etc).
  3. Apply various "filters" to the token stream (like stop word filters to remove words like "a", "the", or a stemming filter to normalize words "happy", "happily", "happier" into "happy")
  4. Compute the term frequency within the document.
From the forward index, the indexer will proceed to build a reverse index (typically through a Map/Reduce mechanism). The result will be keyed by word and stored in a DHT.
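A small single-machine Python sketch of going from a forward index to an index keyed by word is shown below. It assumes whitespace tokenization and a tiny illustrative stop word list, and it omits stemming and the hit list; `build_forward_index` and `invert` are hypothetical names. In a Map/Reduce setting, the inner loop of `invert` corresponds to the map step (emit the word as key) and the grouping by word corresponds to the reduce step.

```python
from collections import Counter, defaultdict

STOP_WORDS = {"a", "the", "is"}  # illustrative only; real lists are longer

def build_forward_index(docs):
    """docs: {doc_id: text} -> {doc_id: {term: term_frequency}}."""
    forward = {}
    for doc_id, text in docs.items():
        tokens = [t for t in text.lower().split() if t not in STOP_WORDS]
        forward[doc_id] = Counter(tokens)
    return forward

def invert(forward):
    """{doc_id: {term: tf}} -> {term: [(doc_id, tf), ...]} sorted by doc_id."""
    inverted = defaultdict(list)
    for doc_id in sorted(forward):
        for term, tf in forward[doc_id].items():
            inverted[term].append((doc_id, tf))  # "emit(term, (doc_id, tf))"
    return dict(inverted)

docs = {1: "the cat sat", 2: "a cat is a cat", 3: "the dog sat"}
inverted = invert(build_forward_index(docs))
```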


Ranker


The ranker's job is to compute the rank of a document, based on how many in-links point to the document as well as the rank of the referrers (hence a recursive definition). Two popular ranking algorithms are "Page Rank" and "HITS".
  • Page Rank Algorithm
Page rank is a global rank mechanism. It is precomputed upfront and is independent of the query.

  • HITS Algorithm
In HITS, every page plays a dual role: a "hub" role and an "authority" role. It has a corresponding rank for each of these two roles. Hub rank measures the quality of the page's outlinks. A good hub is one that points to many good authorities. Authority rank measures the quality of the page's content. A good authority is one that has many good hubs pointing to it.

Notice that HITS doesn't pre-compute the hub and authority scores. Instead, it invokes a regular search engine (which only does TF/IDF matching but not ranking) to get a set of initial results (typically of a predefined fixed size) and then expands this result set by following the outlinks of the initial results. It also adds a fixed number of inlinks (by sampling the pages that link into the initial result set) to the expanded result set. After this expansion, it runs an iterative algorithm to compute the authority ranks and hub ranks, and uses a combination of these two ranks to calculate the ultimate rank of each page. Usually a high hub rank is given more weight than a high authority rank.

Notice that the HITS algorithm is performed at query time rather than pre-computed upfront. The advantage of HITS is that it is sensitive to the query (as compared to PageRank, which is not). The disadvantage is that it performs ranking per query and is hence expensive.
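The iterative hub/authority computation can be sketched in Python as follows. This is a minimal version under stated assumptions: `graph` stands for the already expanded result set as an adjacency dict, the iteration count is fixed rather than checked for convergence, and scores are rescaled to unit length after each round. The function name `hits` and the toy graph are illustrative.

```python
import math

def hits(graph, iterations=20):
    """graph: {page: [pages it links to]} -> (hub, authority) score dicts."""
    pages = set(graph) | {t for targets in graph.values() for t in targets}
    hub = {p: 1.0 for p in pages}
    auth = {p: 1.0 for p in pages}
    for _ in range(iterations):
        # Authority score: sum of the hub scores of the pages linking in.
        auth = {p: 0.0 for p in pages}
        for src, targets in graph.items():
            for target in targets:
                auth[target] += hub[src]
        # Hub score: sum of the authority scores of the pages linked to.
        hub = {p: sum(auth[t] for t in graph.get(p, [])) for p in pages}
        # Rescale both score sets to unit length so values stay bounded.
        for scores in (auth, hub):
            norm = math.sqrt(sum(v * v for v in scores.values())) or 1.0
            for p in scores:
                scores[p] /= norm
    return hub, auth

# Toy expanded result set: h1 and h2 act as hubs, a1 and a2 as authorities.
graph = {"h1": ["a1", "a2"], "h2": ["a1"], "a1": [], "a2": []}
hub, auth = hits(graph)
```

Here `a1` ends up with the highest authority score because both hubs point to it, and `h1` ends up as the better hub because it points to both authorities.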


Query Processor

When a user inputs a search query (containing multiple words), the query is treated as a "query document". Its relevancy to each candidate document is computed and combined with the rank of the document, and an ordered list of results is returned.

There are many ways to compute the relevancy. We can consider only the documents that contain all the terms specified in the query. In this model, for each term in the query we look up its list of document ids and then intersect these lists. If each list is ordered by document id, the intersection can be computed pretty efficiently.
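The intersection of doc-id lists can be done with a linear merge, sketched here in Python. The `postings` dict is made-up sample data, and starting with the shortest list is a common optimization that the post does not mention.

```python
def intersect(a, b):
    """Intersect two ascending doc-id lists in O(len(a) + len(b)) time."""
    i = j = 0
    out = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1  # a[i] cannot appear in b any more
        else:
            j += 1  # b[j] cannot appear in a any more
    return out

def docs_matching_all(query_terms, postings):
    """Return the doc ids that contain every term in query_terms."""
    # Start from the shortest list so intermediate results stay small.
    lists = sorted((postings.get(t, []) for t in query_terms), key=len)
    if not lists:
        return []
    result = lists[0]
    for other in lists[1:]:
        result = intersect(result, other)
    return result

# Hypothetical inverted index: term -> ascending list of doc ids.
postings = {
    "search": [1, 3, 5, 8, 9],
    "engine": [2, 3, 8, 10],
    "index": [3, 4, 8],
}
matches = docs_matching_all(["search", "engine", "index"], postings)
```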

Alternatively, we can return the union (instead of the intersection) of all documents and order them by a combination of the page rank and the TF/IDF score. Documents that have more terms in common with the query will have a higher TF/IDF score.

In some cases, an automatic query result feedback loop can be used to improve the relevancy.
  1. In first round, the search engine will perform a search (as described above) based on user query
  2. Construct a second round query by expanding the original query with additional terms found in the returned documents that ranked high in the first round result
  3. Perform a second round of query and return the result.
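Step 2 of this feedback loop might look like the Python sketch below. It assumes the first-round results are available as plain text strings and simply picks the most frequent terms that are not already in the query; real systems weight candidate terms more carefully. The name `expand_query` and the sample documents are invented for illustration.

```python
from collections import Counter

def expand_query(query_terms, top_docs, extra_terms=2):
    """Append the most frequent new terms from the top first-round documents."""
    counts = Counter(
        term
        for doc in top_docs
        for term in doc.lower().split()
        if term not in query_terms
    )
    return list(query_terms) + [t for t, _ in counts.most_common(extra_terms)]

# Top-ranked documents from a hypothetical first round for the query "jaguar".
first_round = ["jaguar car speed", "jaguar car engine", "jaguar car dealer"]
second_query = expand_query(["jaguar"], first_round, extra_terms=1)
```

The second round search is then run with `second_query` in place of the original terms.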

Outstanding Issues


Fighting spammers is a continuous battle for search engines. Because of the financial value of showing up on the first page of search results, many spammers try to manipulate their pages. An early approach was to modify a page to repeat terms many times (trying to increase the TF/IDF score). The evolution of Page rank has mitigated this to some degree because page rank is based on "out-of-page" information that is much harder for the site owner to manipulate.

But people use link farms to game the page rank algorithm. The idea is to trade links between different domains. There is active research in this area on how to catch these patterns and discount their ranks.

Wednesday, February 17, 2010

Spatial Index RTree

For location-based search, it is very common to search for objects based on their spatial location, e.g. find all restaurants within 5 miles of my current location, or find all schools within the zipcode of 95110 ... etc.

Every spatial object can be represented by an object id, a minimum bounding rectangle (MBR), as well as other attributes. So the space can be represented by a collection of spatial objects. (Here we use 2 dimensions to illustrate the idea, but the concept can be extended to N dimensions.)

A query can be represented as another rectangle. The query is about locating the spatial objects whose MBR overlaps with the query rectangle.
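Before any index is involved, the overlap test itself is simple. The Python sketch below assumes axis-aligned rectangles given by their min and max corners and treats touching edges as overlapping; the `Rect` type and the sample objects are illustrative, not from the post. The brute-force scan at the end is what the RTree is meant to avoid.

```python
from typing import NamedTuple

class Rect(NamedTuple):
    min_x: float
    min_y: float
    max_x: float
    max_y: float

def overlaps(a, b):
    """True if rectangles a and b share at least one point (edges count)."""
    return (a.min_x <= b.max_x and b.min_x <= a.max_x
            and a.min_y <= b.max_y and b.min_y <= a.max_y)

# Hypothetical spatial objects: object id -> MBR.
objects = {
    "cafe": Rect(1, 1, 2, 2),
    "school": Rect(5, 5, 7, 8),
    "park": Rect(3, 0, 6, 2),
}
query = Rect(0, 0, 4, 3)
# Brute-force scan over every object.
found = [oid for oid, mbr in objects.items() if overlaps(mbr, query)]
```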


RTree is a spatial indexing technique such that given a query rectangle, we can quickly locate the spatial object results.

The concept is similar to BTree. We group spatial objects that are close to each other and form a tree whose intermediate nodes contain "close-by" objects. Since the MBR of a parent node contains the MBRs of all its children, objects are close to each other when their parent's MBR is small, so we try to keep each parent's MBR as small as possible.


Search

Starting from the root, we examine each child's MBR to see if it overlaps with the query MBR. We skip the whole subtree if there is no overlap; otherwise, we recurse the search by drilling into that child.

Notice that unlike other tree algorithms, which traverse down only one path, our search here needs to traverse down multiple paths if the query overlaps more than one child. Therefore, we need to structure the tree to minimize overlapping as high in the tree as possible. This means we want to minimize the sum of MBR areas along each path (from the root to the leaf) as much as possible.
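A recursive search along these lines can be sketched in Python as below. The `Node` class is a simplified stand-in (MBRs are `(min_x, min_y, max_x, max_y)` tuples and spatial objects are modeled as childless nodes carrying an `object_id`), and the tree is built by hand rather than through insertion.

```python
class Node:
    def __init__(self, mbr, children=None, object_id=None):
        self.mbr = mbr                   # (min_x, min_y, max_x, max_y)
        self.children = children or []   # empty for a spatial object entry
        self.object_id = object_id       # set only on spatial object entries

def overlaps(a, b):
    """True if the two rectangles share at least one point."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]

def search(node, query, results=None):
    """Collect ids of all objects whose MBR overlaps the query rectangle."""
    if results is None:
        results = []
    if not overlaps(node.mbr, query):
        return results                   # prune the whole subtree
    if node.object_id is not None:
        results.append(node.object_id)
    for child in node.children:          # may descend into several children
        search(child, query, results)
    return results

left = Node((0, 0, 4, 4), [Node((0, 0, 1, 1), object_id="A"),
                           Node((3, 3, 4, 4), object_id="B")])
right = Node((6, 6, 9, 9), [Node((6, 6, 7, 7), object_id="C"),
                            Node((8, 8, 9, 9), object_id="D")])
root = Node((0, 0, 9, 9), [left, right])
found = search(root, (2, 2, 6.5, 6.5))
```

The sample query overlaps both children of the root, so the search descends both paths; a query that falls in the gap between them is pruned at the first level.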

Insert

To insert a new spatial object, start from the root node and pick the child node whose MBR will be extended least if the new spatial object is added. Walk down this path until reaching a leaf node.
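The "extended least" choice at each level can be sketched in Python as follows. Rectangles are `(min_x, min_y, max_x, max_y)` tuples, enlargement is measured as growth in area, and ties are broken in favor of the smaller existing MBR, which is a common convention but an assumption here. All function names are illustrative.

```python
def area(r):
    """Area of a rectangle given as (min_x, min_y, max_x, max_y)."""
    return (r[2] - r[0]) * (r[3] - r[1])

def union(a, b):
    """Smallest rectangle that contains both a and b."""
    return (min(a[0], b[0]), min(a[1], b[1]), max(a[2], b[2]), max(a[3], b[3]))

def enlargement(mbr, new_rect):
    """How much mbr's area grows if it is extended to cover new_rect."""
    return area(union(mbr, new_rect)) - area(mbr)

def choose_child(child_mbrs, new_rect):
    """Index of the child whose MBR needs the least enlargement."""
    return min(
        range(len(child_mbrs)),
        # Assumed tie-break: prefer the child with the smaller current area.
        key=lambda i: (enlargement(child_mbrs[i], new_rect), area(child_mbrs[i])),
    )

children = [(0, 0, 4, 4), (6, 6, 9, 9)]
idx = choose_child(children, (5, 5, 6, 6))
```

Applying `choose_child` repeatedly from the root down yields the leaf that receives the new object.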

If the leaf node has space left, insert the object into the leaf node and update the MBR of the leaf node as well as all its parents. Otherwise, split the leaf node into two (create a new leaf node and move some of the content of the original leaf node to this new one), and then add the newly created leaf node to the parent of the original leaf node. If the parent has no space left, the parent will be split as well.

If the split goes all the way to the root, the original root will then be split and a new root is created.

Delete

Deleting a spatial node starts by searching for the leaf node that contains it. Remove the spatial node from the leaf node's content and update the leaf's MBR and its parents' MBRs all the way to the root. If the leaf node now has fewer than m entries (m being the minimum number of entries a node must hold), we need to condense the tree by marking the leaf node for deletion. We then remove the leaf node from its parent's content and update the parent's MBR. If the parent now has fewer than m entries, we mark the parent for deletion as well and remove it from its own parent. At this point, all the nodes that are marked for deletion have been removed from the RTree.

Notice that the content of these deleted nodes is not all garbage, since they still have some children that are valid nodes (but were removed from the tree along with them). Now we need to reinsert all these valid nodes back into the tree.

Finally, if the root node contains only one child, we throw away the original root and make its only child the new root.

Update

Update happens when an existing spatial node changes its dimension. One way is to just change the spatial node's MBR without restructuring the RTree. A better (but more expensive) way is to delete the node, modify its MBR, and then insert it back into the RTree.