"Term" is a generalized element contains within a document. A "term" is a generalized idea of what a document contains. (e.g. a term can be a word, a phrase, or a concept).
Intuitively, the relevancy of a document to a term can be measured by how often that term shows up in the document (i.e. the count of the term in that document divided by the total number of terms in it). We call this the "term frequency".
On the other hand, if the term is very common and appears in many other documents, its relevancy should be reduced. The fraction of documents containing the term (i.e. the count of documents having this term divided by the total number of documents) is called the "document frequency".
The overall relevancy of a document with respect to a term can be computed using both the term frequency and document frequency.
relevancy = term frequency * log (1 / document frequency)
This is called tf-idf. A "document" can then be represented as a multi-dimensional vector where each dimension corresponds to a term, with that term's tf-idf as its value.
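As a quick worked example (the numbers are made up purely for illustration): suppose a term appears 3 times in a document of 100 terms, so its term frequency is 3/100 = 0.03. If the term appears in 10 out of 1,000 documents, its document frequency is 10/1000 = 0.01, and the relevancy is 0.03 * log(1/0.01) = 0.03 * log(100) ≈ 0.14 (using the natural log, as the Java UDF later in this post does).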
Compute TF-IDF using Map/Reduce
To extract the terms from a document, the following process is common (a minimal Java sketch of some of these steps follows the list):
- Extract words by tokenizing the input stream
- Make the words case-insensitive (e.g. transform to all lower case)
- Apply n-gram analysis to extract phrases (e.g. statistically frequent n-grams are likely phrases)
- Filter out stop words
- Stemming (e.g. transform "cat" and "cats" to "cat")
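Here is a minimal Java sketch of the tokenization, lower-casing, and stop-word filtering steps (n-gram extraction and stemming are omitted); the class name, the tiny stop-word list, and the tokenization regex are assumptions made for this example and are not part of the original post.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TermExtractor {
    // A tiny illustrative stop-word list; a real one would be much longer
    private static final Set<String> STOP_WORDS =
            new HashSet<>(Arrays.asList("a", "an", "the", "of", "to", "and", "is"));

    public static List<String> extractTerms(String document) {
        return Arrays.stream(document.split("\\W+"))          // tokenize on non-word characters
                     .map(String::toLowerCase)                // make the words case-insensitive
                     .filter(w -> !w.isEmpty())
                     .filter(w -> !STOP_WORDS.contains(w))    // filter out stop words
                     .collect(Collectors.toList());
    }
}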
We use multiple rounds of Map/Reduce to gradually compute …
- the word count per word/doc combination
- the total number of words per doc
- the total number of docs per word, and finally compute the TF-IDF
Implementation in Apache PIG
There are many ways to implement the Map/Reduce paradigm above. Apache Hadoop is a popular approach, using Java or other programming languages (i.e. via Hadoop Streaming).
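For illustration only, here is a minimal sketch of what Round 1 (the word count per word/doc combination) could look like as plain Hadoop mapper/reducer classes. The class names, the tokenization, and the use of the input file name as the docId are assumptions made for this sketch; they are not code from the original post.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class Round1WordCount {

    // Map: emit ((word, docId), 1) for every word in the input line
    public static class WordDocMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String docId = ((FileSplit) context.getInputSplit()).getPath().getName();
            for (String word : line.toString().toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    context.write(new Text(word + "\t" + docId), ONE);
                }
            }
        }
    }

    // Reduce: sum the 1s to get the word count per word/doc combination
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text wordDoc, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable c : counts) {
                sum += c.get();
            }
            context.write(wordDoc, new IntWritable(sum));
        }
    }
}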
Apache PIG is another approach, based on a higher-level language with parallel processing constructs built in. Here are the 3 rounds of map/reduce logic implemented in PIG script:
REGISTER rickyudf.jar;
/* Build up the input data stream */
A1 = LOAD 'dirdir/data.txt' AS (words:chararray);
DocWordStream1 = FOREACH A1 GENERATE
                     'data.txt' AS docId,
                     FLATTEN(TOKENIZE(words)) AS word;

A2 = LOAD 'dirdir/data2.txt' AS (words:chararray);
DocWordStream2 = FOREACH A2 GENERATE
                     'data2.txt' AS docId,
                     FLATTEN(TOKENIZE(words)) AS word;

A3 = LOAD 'dirdir/data3.txt' AS (words:chararray);
DocWordStream3 = FOREACH A3 GENERATE
                     'data3.txt' AS docId,
                     FLATTEN(TOKENIZE(words)) AS word;

InStream = UNION DocWordStream1, DocWordStream2, DocWordStream3;

/* Round 1: word count per word/doc combination */
B = GROUP InStream BY (word, docId);
Round1 = FOREACH B GENERATE
             group AS wordDoc,
             COUNT(InStream) AS wordCount;

/* Round 2: total word count per doc */
C = GROUP Round1 BY wordDoc.docId;
WW = GROUP C ALL;
C2 = FOREACH WW GENERATE
         FLATTEN(C),
         COUNT(C) AS totalDocs;
Round2 = FOREACH C2 GENERATE
             FLATTEN(Round1),
             SUM(Round1.wordCount) AS wordCountPerDoc,
             totalDocs;

/* Round 3: Compute the total doc count per word */
D = GROUP Round2 BY wordDoc.word;
D2 = FOREACH D GENERATE
         FLATTEN(Round2),
         COUNT(Round2) AS docCountPerWord;
Round3 = FOREACH D2 GENERATE
             $0.word AS word,
             $0.docId AS docId,
             com.ricky.TFIDF(wordCount, wordCountPerDoc, totalDocs, docCountPerWord) AS tfidf;

/* Order the output by relevancy */
ORDERRound3 = ORDER Round3 BY word ASC, tfidf DESC;

DUMP ORDERRound3;
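Assuming the script above is saved as tfidf.pig (the filename is an assumption; any name will do), it can be run in local mode with:

pig -x local tfidf.pig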
Here is the corresponding User Defined Function in Java (contained in rickyudf.jar)
package com.ricky;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class TFIDF extends EvalFunc<Double> {

    @Override
    public Double exec(Tuple input) throws IOException {
        // Input tuple: (wordCount, wordCountPerDoc, totalDocs, docCountPerWord)
        long wordCount = (Long) input.get(0);
        long wordCountPerDoc = (Long) input.get(1);
        long totalDocs = (Long) input.get(2);
        long docCountPerWord = (Long) input.get(3);

        // tf = how often the word occurs in this doc;
        // idf = log of the inverse fraction of docs containing the word
        double tf = (wordCount * 1.0) / wordCountPerDoc;
        double idf = Math.log((totalDocs * 1.0) / docCountPerWord);

        return tf * idf;
    }
}
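For completeness, one way to compile the UDF and package it into rickyudf.jar is sketched below; the Pig jar path is an assumption (point it at the jar shipped with your Pig installation), and the source file is assumed to live under a directory layout matching its package.

# compile against the Pig jar, then package the class into rickyudf.jar
javac -cp /path/to/pig.jar com/ricky/TFIDF.java
jar cf rickyudf.jar com/ricky/TFIDF.class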