Counting with MapReduce seems straightforward: all that is needed is to map all the pairs to the same intermediate key, and let the reducer take care of counting the items. But wait, what if we have millions of items? Then one reducer, that is to say one process on one computer, will be forced to handle millions of pairs at once. Not only is this going to be very slow, wasting all the benefits of having a cluster, but there is something more important: what if the data is too big to fit in memory? Here I am showing how to count elements using MapReduce in a way that really splits up the task between multiple workers.

## The one-iteration solution

Let us have a look at the solution discussed above. This solution counts the items in a dataset in only one MapReduce iteration. Note that the values are replaced with the value 1. Indeed, as counting does not require keeping track of the values, they are all changed to a common simple value to simplify computations. This solution seems pretty sweet, except that as we can see in Figure 1, reducing all the pairs to the same intermediate key gives one reducer, and one reducer only, a huge workload for counting the items. This can be efficient if the dataset is small. But there are cases in which the dataset is so big that it does not even fit into the memory of a single computer, or maybe it is so big that the computation on only one reducer is going to be very slow, and we need to know the count as soon as possible. As we will see in the next section, there is a way to improve workload balance along with computation time, at the cost of an additional iteration.

Figure 1: MapReduce - Counting with one iteration
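The one-iteration solution above can be sketched with two plain Python functions, written in the same mapper/reducer style as the full listing further down. The names are illustrative, not part of any real API:

```python
def naive_count_mapper(key, value):
    """Map every item to the same intermediate key, with value 1."""
    for item in value.split():
        yield 'total', 1   # single key: one reducer receives everything

def naive_count_reducer(key, values):
    """Sum up all the 1s; this single reducer handles the whole dataset."""
    yield key, sum(int(v) for v in values)

# Simulating the flow locally: four items, all funneled to one key.
pairs = list(naive_count_mapper(0, 'a b c d'))
result = list(naive_count_reducer('total', [v for _, v in pairs]))
# result == [('total', 4)]
```

The single `'total'` key is exactly the bottleneck discussed above: every pair emitted by every mapper ends up in the same reducer.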

## The two-iteration solution

The solution to the counting problem on a distributed system is to use an additional iteration, in order to better split up the workload. Instead of reducing all the pairs to a unique intermediate key, here the mappers reduce the pairs to intermediate keys taken sequentially out of a common set. It does not really matter what this set is, as long as the keys are the same for all the mappers, and are taken sequentially. The set of intermediate keys can simply be the series of integers from 1 to n: (1, 2, 3, …, n). By using such a set, we guarantee that the workload will be correctly shared between several reducers, in fact between as many reducers as there are intermediate keys. So the number of possible intermediate keys, denoted N, has to be chosen carefully. Indeed, you don’t want N to be too small, because then the memory issues of the one-iteration solution might not be fixed, and you don’t want N to be too big, because you will end up running one reducer per pair, which is quite inefficient.
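A small illustrative sketch (not part of the article's code) shows how taking intermediate keys sequentially with a modulo spreads the pairs over N reducers instead of a single one:

```python
from collections import Counter

N = 4  # number of intermediate keys, i.e. number of reducers

def spread_mapper(items, n=N):
    """Assign each item the next key from (0, 1, ..., n-1), round-robin."""
    for index, item in enumerate(items):
        yield index % n, 1

# Ten items land on four keys: no reducer receives more than three pairs.
loads = Counter(key for key, _ in spread_mapper(range(10)))
# loads == {0: 3, 1: 3, 2: 2, 3: 2}
```

With N = 4, each reducer handles roughly a quarter of the pairs, which is exactly the workload balancing described above.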

Of course, the input dataset is never partitioned exactly equally between the mappers, thus the first intermediate keys in the set will be associated with more values than the last keys of the set. However, the pairs will be distributed enough to improve workload balance. Figure 2 shows this computation, and we can see that Reducer 4, which handles all pairs whose intermediate key is 4, gets fewer pairs to handle than the other reducers. Then the reducers count the pairs directed to them. At this stage, we do not have the count that we want, only intermediate counts. We need to sum up these counts, which requires another iteration.

In Figure 2, we can see that the second iteration is very simple: it is the same as the iteration we used in the one-iteration solution. All pairs are reduced to the same intermediate key. The previous iteration guarantees us that there are only N pairs left, so by choosing N appropriately, the mapper and the reducer of this iteration will receive workloads adapted to the architecture and the available memory of the machines they are running on. Finally, the output of this second iteration is the count of all items.

Figure 2: MapReduce - Counting efficiently with two iterations
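The whole two-iteration pipeline can be simulated locally in a single process, just to make the data flow explicit (illustrative only, no Hadoop involved):

```python
from collections import defaultdict

def first_iteration(items, n=4):
    """Spread the items over n intermediate keys, then count per key."""
    buckets = defaultdict(int)
    for index, _item in enumerate(items):  # map: key = index % n, value = 1
        buckets[index % n] += 1            # reduce: sum the 1s for each key
    return dict(buckets)

def second_iteration(partial_counts):
    """Map the n partial counts to a single key and sum them."""
    return sum(partial_counts.values())

partial = first_iteration(['item%d' % i for i in range(10)])
total = second_iteration(partial)
# partial == {0: 3, 1: 3, 2: 2, 3: 2}, total == 10
```

The second iteration only ever sees n values (here 4), however large the original dataset was, which is why a single reducer is safe at that stage.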

## An implementation of the two-iteration solution using Python and the Prince API

Prince is an API that allows Python programs to run on Hadoop Streaming. The Prince API is available here: http://wiki.github.com/goossaert/prince/. Using Python, we can implement the solution discussed in the previous section. Here is the source code for efficiently counting words in input data, which you can also download here: totalcount.py.

For simplicity, I have chosen to use a modulo to loop over the intermediate keys. Remember that the modulo is a costly operation, and that alternatives exist to improve computation time in case optimization is required. The `count_mapper()` and `count_reducer()` methods are used for the first iteration, and the `sum_mapper()` and `count_reducer()` methods are used for the second iteration. The `count_items()` method takes care of chaining the two iterations and of running the tasks.

```python
import sys
import prince


def count_mapper(key, value):
    """
    Distribute the values over the keys equally enough to even out the
    computational load on the reducers. By using a modulo, we are sure to
    balance the number of values for each key in the reducers. Therefore,
    this method avoids the case where a single reducer faces the whole
    data set.
    """
    nb_buckets = 100  # this value is correct for small- and medium-sized
                      # data sets, but should be adjusted to each case
    key = int(key)
    for index, item in enumerate(value.split()):
        yield (key + index) % nb_buckets, 1


def count_reducer(key, values):
    """Sum up the items of the same key"""
    try: yield key, sum([int(v) for v in values])
    except ValueError: pass  # discard non-numerical values


def sum_mapper(key, value):
    """Map all intermediate sums to the same key"""
    (index, count) = value.split()
    yield 1, count


def count_items(input, output):
    """Sum all the items in the input data set"""
    # Intermediate file name
    inter = output + '_inter'

    # Run the tasks with the specified mapper and reducer methods
    prince.run(count_mapper, count_reducer, input, inter, inputformat='text', outputformat='text')
    prince.run(sum_mapper, count_reducer, inter + '/part*', output, inputformat='text', outputformat='text')

    # Read the output file and return the count
    file = prince.dfs.read(output + '/part*', first=1)
    return int(file.split()[1])


def display_usage():
    print 'usage: %s input output' % sys.argv[0]
    print '    input:  input file on the DFS'
    print '    output: output file on the DFS'


if __name__ == "__main__":
    # Always call prince.init() at the beginning of the program
    prince.init()

    if len(sys.argv) != 3:
        display_usage()
        sys.exit(0)

    input = sys.argv[1]
    output = sys.argv[2]

    # Count all the items in the input data set and print the result
    print 'Total items:', count_items(input, output)
```