Introduction
This guide introduces an answer to the following question: how do I coordinate partial results in a MapReduce algorithm, given that all the mappers run in parallel and in isolation? In short, you have to hide the synchronization in the reduce phase. This guide illustrates the three tools at your disposal for doing so:
- The ability to hold state in the reducer across multiple key-value pairs.
- The ability to control the sort order of keys.
- The ability to specify different partitioners.
Put together: sort the key-value pairs in the order in which you want to use them in your computation, and have the reducer perform that computation as the pairs arrive. The partitioner ensures that all the key-value pairs you need arrive at the correct reducer. This will be illustrated with the task of computing simple conditional probabilities.
The Problem
The example we'll discuss is DemoWordCondProbTuple in Cloud9, which builds on DemoWordCountTuple1. (DemoWordCondProbJSON does the same thing, but uses JSON objects instead of tuples.) DemoWordCountTuple1 generates tuples of the following form:
...
(admirable, 0)    9
(admirable, 1)    6
(admiral, 0)      2
(admiral, 1)      4
(admiration, 0)   10
(admiration, 1)   6
(admire, 0)       5
(admire, 1)       3
(admired, 0)      12
(admired, 1)      7
...
The key is a tuple: the first field contains a token, and the second field indicates whether the token was found on an even-length or an odd-length line. The value is the number of times that tuple occurs in the collection.
DemoWordCondProbTuple generates tuples of the following form:
...
(admirable, *)    15.0
(admirable, 0)    0.6
(admirable, 1)    0.4
(admiral, *)      6.0
(admiral, 0)      0.33333334
(admiral, 1)      0.6666667
(admiration, *)   16.0
(admiration, 0)   0.625
(admiration, 1)   0.375
(admire, *)       8.0
(admire, 0)       0.625
(admire, 1)       0.375
(admired, *)      19.0
(admired, 0)      0.6315789
(admired, 1)      0.36842105
...
In other words, the demo converts the counts into conditional probabilities, i.e., p(EvenOrOdd | token). How is this accomplished? By using exactly the three tools described above.
The Solution
First, we need the total count of each token, regardless of the type of line it appeared on. We can get this by emitting (admirable, *) every time we see the word "admirable", in addition to either (admirable, 0) or (admirable, 1) as appropriate.
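The following plain-Java sketch simulates this map step without Hadoop, so it can run on its own. It rests on assumptions: keys are plain strings of the form token,field rather than Cloud9 Tuples, emitted (key, 1) pairs are tallied in a sorted map instead of being written to a Hadoop OutputCollector, and, as a guess at what DemoWordCountTuple1 does, the second field is the line length modulo 2 (0 for even, 1 for odd). The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MarginalMapperSketch {
    // Marker for the total-count key, as in (admirable, *).
    static final String MARGINAL = "*";

    // Simulates the map phase over some lines. Instead of writing to a Hadoop
    // OutputCollector, each emitted (key, 1) pair is tallied in a sorted map.
    static Map<String, Integer> mapAndCount(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Assumption: field 0 = even-length line, 1 = odd-length line.
            String parity = Integer.toString(line.length() % 2);
            for (String token : line.trim().split("\\s+")) {
                if (token.isEmpty()) continue; // blank line yields no tokens
                emit(counts, token + "," + parity);   // e.g. (admirable, 0)
                emit(counts, token + "," + MARGINAL); // and (admirable, *) as well
            }
        }
        return counts;
    }

    // Stand-in for emitting the pair (key, 1).
    private static void emit(Map<String, Integer> counts, String key) {
        counts.merge(key, 1, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                mapAndCount(List.of("admirable admiral", "admirable xy"));
        counts.forEach((k, v) -> System.out.println("(" + k + ")\t" + v));
    }
}
```

Because the marginal key is emitted alongside every conditional key, the (token, *) count always equals the sum of the (token, 0) and (token, 1) counts.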
Intuitively, to compute the conditional probability, we first need the total count, and then we divide each conditional count by it. In other words, we need to compute:
- count(admirable, 0) / count(admirable, *)
- count(admirable, 1) / count(admirable, *)
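Writing c(·) for a count and plugging in the admirable counts from the DemoWordCountTuple1 output above (9 and 6), the two quantities work out as follows, matching the DemoWordCondProbTuple output:

```latex
c(\text{admirable}, *) = c(\text{admirable}, 0) + c(\text{admirable}, 1) = 9 + 6 = 15
\\
p(0 \mid \text{admirable}) = \frac{c(\text{admirable}, 0)}{c(\text{admirable}, *)} = \frac{9}{15} = 0.6,
\qquad
p(1 \mid \text{admirable}) = \frac{c(\text{admirable}, 1)}{c(\text{admirable}, *)} = \frac{6}{15} = 0.4
```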
This is accomplished by making sure that (admirable, *) arrives at the reducer before either of the other keys. When the reducer receives (admirable, *), it holds the total in memory (i.e., the reducer holds state across multiple key-value pairs). When (admirable, 0) or (admirable, 1) arrives afterward, the reducer simply divides its count by the stored total, computing the two quantities above, and emits the resulting probability as the final value under the same key. (As the sample output shows, the total itself is emitted too.) We enforce the order in which the key-value pairs arrive at the reducer by controlling the sort order of the keys, which is defined by the key class's comparator. For the Tuple class you don't need to do anything, since Tuple already defines a natural ordering consistent with this computation (tuples containing special symbols are sorted first).
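The sketch below simulates the sort, group, and reduce steps in plain Java so it runs without Hadoop. The Key class is a hypothetical stand-in for Cloud9's Tuple, and SORT_ORDER spells out the ordering rule described above (by token, then the * key before all others) rather than reproducing Tuple's actual implementation. Float division is used because the sample output looks like single-precision values; that is an inference, not something the demo's source confirms here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CondProbReducerSketch {
    static final String MARGINAL = "*";

    // Hypothetical stand-in for a (token, field) Tuple; field is "*", "0", or "1".
    static final class Key {
        final String token, field;
        Key(String token, String field) { this.token = token; this.field = field; }
        @Override public String toString() { return "(" + token + ", " + field + ")"; }
    }

    // Sort by token; within one token, the marginal (*) key comes first.
    static final Comparator<Key> SORT_ORDER = (a, b) -> {
        int c = a.token.compareTo(b.token);
        if (c != 0) return c;
        boolean aStar = a.field.equals(MARGINAL), bStar = b.field.equals(MARGINAL);
        if (aStar != bStar) return aStar ? -1 : 1;
        return a.field.compareTo(b.field);
    };

    // Groups map output by key in SORT_ORDER, then runs the reduce logic once
    // per key, carrying the current token's total as state between calls.
    static Map<String, Float> run(List<Map.Entry<Key, Integer>> mapOutput) {
        Map<Key, List<Integer>> grouped = new TreeMap<>(SORT_ORDER);
        for (Map.Entry<Key, Integer> kv : mapOutput) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, Float> output = new LinkedHashMap<>();
        float marginal = 0f; // state held across key-value pairs
        for (Map.Entry<Key, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            if (e.getKey().field.equals(MARGINAL)) {
                marginal = sum; // the total arrives first...
                output.put(e.getKey().toString(), (float) sum);
            } else {
                output.put(e.getKey().toString(), sum / marginal); // ...so we can divide
            }
        }
        return output;
    }

    static Map.Entry<Key, Integer> pair(String token, String field, int count) {
        return Map.entry(new Key(token, field), count);
    }

    public static void main(String[] args) {
        // Map output in arbitrary order; the * values are partial counts.
        List<Map.Entry<Key, Integer>> mapOutput = List.of(
                pair("admiral", "1", 4), pair("admirable", "0", 9),
                pair("admirable", "*", 9), pair("admiral", "*", 2),
                pair("admirable", "1", 6), pair("admirable", "*", 6),
                pair("admiral", "0", 2), pair("admiral", "*", 4));
        run(mapOutput).forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

If the comparator placed the * key after the conditional keys instead, the reducer would divide by a stale or zero total, which is why the sort order and the reducer's state have to be designed together.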
There is one more thing needed to make this work. The Hadoop Partitioner is responsible for splitting the key space and determining which reducer each key-value pair is sent to. HashPartitioner, the default Partitioner, simply takes the hash value of the key modulo the number of reduce tasks. The problem is that (admirable, 0), (admirable, 1), and (admirable, *) generally have different hash values and may therefore be sent to different reducers, which would break the algorithm above. The solution is to write a custom Partitioner that only pays attention to the token: it uses the hash code of the token (not of the entire key) to determine which reduce task the key is sent to. The setPartitionerClass method of JobConf lets you specify a custom partitioner.
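A minimal sketch of the partitioning rule, again in plain Java so it runs without Hadoop. hashPartition reproduces the formula HashPartitioner uses, (hashCode & Integer.MAX_VALUE) % numReduceTasks, applied to the whole key (here a string stand-in for the tuple); tokenPartition applies the same formula to the token alone. In an actual job, this logic would go in the getPartition(key, value, numPartitions) method of a class implementing org.apache.hadoop.mapred.Partitioner, registered with JobConf.setPartitionerClass; the names in the sketch are hypothetical.

```java
public class TokenPartitionerSketch {
    // Same formula as HashPartitioner, applied to the entire key.
    static int hashPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Custom rule: hash only the token; the second field is deliberately ignored.
    static int tokenPartition(String token, String field, int numReduceTasks) {
        return (token.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        for (String field : new String[] {"*", "0", "1"}) {
            String wholeKey = "admirable," + field; // stand-in for the full tuple key
            System.out.printf("(admirable, %s): whole-key -> %d, token-only -> %d%n",
                    field, hashPartition(wholeKey, reducers),
                    tokenPartition("admirable", field, reducers));
        }
    }
}
```

Whatever the number of reducers, tokenPartition sends all three admirable keys to the same reduce task; hashPartition on the whole key gives no such guarantee. Masking with Integer.MAX_VALUE keeps the result non-negative even when hashCode is negative.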
Potential gotcha: be very careful in your use of a Combiner class! In the basic word count demo, the reducer can also be used as the combiner, since all it does is compute a sum (an operation that is both associative and commutative). This is no longer the case for the reducer in DemoWordCondProb, which ends with a division to compute the conditional probability. The division must happen at the reduce stage, because only then are you guaranteed that all values with the same key have been collected; a combiner only sees the partial counts produced by a single map task. So use the identity combiner, or write a custom combiner that simply sums the partial counts.
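To see why reusing the reducer as a combiner goes wrong, the plain-Java sketch below runs a hypothetical split of admirable's counts across two map tasks through each kind of combiner and then the reducer. reduceLogic is a simplified stand-in for the reducer described above that handles the (admirable, *) and (admirable, 0) values in one call; the split (5 and 4 in one task, 10 and 5 in the other) is invented for illustration and only chosen to total 15 and 9.

```java
import java.util.function.BinaryOperator;

public class CombinerSketch {
    // Simplified reducer: sum the totals, sum the conditional counts, divide.
    // Returns {total, probability}.
    static float[] reduceLogic(float[] marginalValues, float[] condValues) {
        float marginal = 0f, cond = 0f;
        for (float v : marginalValues) marginal += v;
        for (float v : condValues) cond += v;
        return new float[] {marginal, cond / marginal};
    }

    // Safe combiner: only sums, so both outputs are still counts.
    static float[] sumCombiner(float[] marginalValues, float[] condValues) {
        float marginal = 0f, cond = 0f;
        for (float v : marginalValues) marginal += v;
        for (float v : condValues) cond += v;
        return new float[] {marginal, cond};
    }

    // Two map tasks, each followed by the given combiner, then the reducer.
    // Hypothetical split: task 1 saw (admirable, *) 5 times and (admirable, 0)
    // 4 times; task 2 saw them 10 and 5 times.
    static float pipeline(BinaryOperator<float[]> combiner) {
        float[] t1 = combiner.apply(new float[] {5f}, new float[] {4f});
        float[] t2 = combiner.apply(new float[] {10f}, new float[] {5f});
        float[] out = reduceLogic(new float[] {t1[0], t2[0]}, new float[] {t1[1], t2[1]});
        return out[1]; // p(0 | admirable)
    }

    public static void main(String[] args) {
        System.out.println("summing combiner:    " + pipeline(CombinerSketch::sumCombiner));
        System.out.println("reducer as combiner: " + pipeline(CombinerSketch::reduceLogic));
    }
}
```

With the summing combiner, the reducer receives counts that add up to 15 and 9 and produces 9/15 = 0.6. Reusing the reducer as the combiner hands it the per-task ratios 0.8 and 0.5 in place of counts, so it computes (0.8 + 0.5)/15, roughly 0.087.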
Postscript
For more details, including a more efficient way to compute conditional probabilities, see the following paper:
Jimmy Lin. (2008) Scalable Language Processing Algorithms for the Masses: A Case Study in Computing Word Co-occurrence Matrices with MapReduce. Proceedings of the 2008 Conference on Empirical Methods in Natural Language Processing (EMNLP 2008).
What I described above corresponds to the "pairs" algorithm discussed in the paper.