mapReduce Reduced (& Ported to R)

September 10, 2009

Calling MapReduce, and Sector’s implementation of User Defined Functions (UDFs) over a storage cloud, innovative is only partly correct. The programming models they implement are quite old. Any programmer versed in functional languages recognizes this.

But mapReduce does come with two important innovations. The first is a framework specifically designed for large clusters of low-priced, commodity servers. What mapReduce has done is take concurrent programming models and apply them to the economic realities of the day. Large and formerly expensive computations can thus be accomplished cheaply when distributed to inexpensive machines. The complexity of managing individual machines and tasks is masked from the coder, who does not need to worry about which tasks run on which machine.

The second innovation is the recognition that a large class of practical problems (but not all) can be solved using the mapReduce framework. Because the first innovation allowed solutions to problems that were intractable with conventional techniques, technologists began framing problems to run with mapReduce. They had a hammer; everything began looking like a nail. Fortunately, there were a lot of nails.

As mentioned above, the algorithmic pattern, itself, is not new. It is actually decades old and is a throwback to the early days of functional programming (think Lisp!) and big mainframes. The method was rediscovered, applied over a distributed virtual filesystem, applied to Google’s toughest problems, renamed mapReduce and the rest is history.

The mapReduce algorithm provides a framework for dividing a problem and working on it in parallel. There are two steps: a map step and a reduce step. Although the two steps must proceed serially (map must precede reduce), each step can be accomplished in parallel. In the map step, data is mapped to key-value pairs. In the reduce step, the values that share the same key are transformed (‘reduced’) by some algorithm. More complexity can be added; other functions can be used; arbitrary UDFs can be supported, as in Sector. But, in essence, the algorithm is a series of function calls.
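To make the two steps concrete, here is a minimal sketch in base R that counts words. The input vector and every variable name are made up for illustration, and everything runs serially on one machine; it only shows the shape of the map, group-by-key and reduce stages.

```r
# Toy input (hypothetical): a handful of words to count.
words <- c("map", "reduce", "map", "apply", "reduce", "map")

# Map step: emit one (key, value) pair per word; the value is always 1.
pairs <- lapply(words, function(w) list(key = w, value = 1L))

# Group the emitted values by key, so values sharing a key sit together.
keys    <- vapply(pairs, function(p) p$key, character(1))
values  <- vapply(pairs, function(p) p$value, integer(1))
grouped <- split(values, keys)

# Reduce step: collapse each key's values to a single number.
counts <- vapply(grouped, sum, integer(1))
print(counts)
```

Each call to the mapping function is independent of the others, and so is each call to sum, which is what makes both steps candidates for parallel execution.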

The pattern is fairly common, and most programmers have used it without knowing it, thinking about it, or calling it mapReduce. In fact, much of SAS is set up in a mapReduce style. SAS programs are composed of DATA steps and PROCEDURE steps. In certain problems, the DATA step can be a mapper and either a DATA or PROCEDURE step can function as a reducer. If you disregard the distribution of the problem across servers, I’d venture to say that every SAS programmer has followed this paradigm, often numerous times in the course of a single program. This simplicity is what allowed the pattern to be applied to a wide range of problems, the second innovation.

The same can be said for our favorite statistical programming language, R. In fact, because R is a vectorized, functional language, mapReduce boils down to a single line of code:

apply( map(data), reduce )

Here, map and reduce are the user-defined functions for the mapper and reducer, respectively, and apply distributes the problem in parallel. Any R programmer who was taking advantage of R’s vectorization was probably writing mapReduce programs from day one. Most often, the jobs were vectorized on an individual, single-core machine.
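One way to read that one-liner with concrete functions is sketched below. It assumes the built-in mtcars data set, and the names map_fn and reduce_fn are placeholders invented for this example: the mapper groups miles-per-gallon values by cylinder count, and the reducer is simply mean.

```r
# Map (hypothetical name): split the mpg values into groups keyed by cylinder count.
map_fn <- function(d) split(d$mpg, d$cyl)

# Reduce (hypothetical name): collapse each group to its mean.
reduce_fn <- mean

# The apply-map-reduce one-liner, with sapply standing in for apply.
result <- sapply(map_fn(mtcars), reduce_fn)
print(result)
```

Here sapply runs serially; the point is only that the grouping and the reduction are ordinary functions handed to an apply-style loop.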

Coupled with R packages such as Rmpi, rpvm and nws, the apply-map-reduce pattern can be distributed to several machines. More recently, the multicore package has provided the easiest implementation on multicore machines.
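As a rough sketch of what swapping the backend looks like, the example below passes the looping function in as an argument. It assumes the parallel package that ships with current versions of R (its mclapply function originated in multicore) rather than any of the packages named above, and map_reduce, chunk_of and par_apply are hypothetical names used only here.

```r
library(parallel)

# Hypothetical helper: run `reduce` over each part produced by `map`,
# with the looping function left as a pluggable argument.
map_reduce <- function(data, map, reduce, apply = lapply) {
  apply(map(data), reduce)
}

# Map: deal the input into four chunks. Reduce: sum each chunk.
chunk_of <- function(x) split(x, rep(1:4, length.out = length(x)))

# Serial run with the default lapply.
serial <- map_reduce(1:1000, chunk_of, sum)

# Parallel run: mclapply forks worker processes, which is not available
# on Windows, so fall back to a single core there.
cores <- if (.Platform$OS.type == "windows") 1L else 2L
par_apply <- function(X, FUN) mclapply(X, FUN, mc.cores = cores)
parallel_res <- map_reduce(1:1000, chunk_of, sum, apply = par_apply)
```

Only the apply argument changes between the two runs; the map and reduce functions are untouched.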

We recognized this several years ago, wrote some simple code and have been distributing work across available servers for some time. More recently, we released our work as an open source package on CRAN for implementing this pattern. Our implementation closely follows Google’s mapReduce paper, is written in pure R and is agnostic to the parallelization backend, whether rpvm, Rmpi, nws, multicore, or others. (Revolution Computing recognized this as a good idea and adopted the same approach with their ParallelR package.)

Using mapReduce is exceedingly simple. The package provides a single function, mapReduce. The function is defined as:

Usage
mapReduce( map, ..., data, apply = sapply)

Arguments

map     An expression evaluated on data that yields a vector, which is then
        used to split the data into parts that can be operated on independently.
...     The reduce step(s). One or more expressions that are evaluated for each of the partitions made.
data    An R data structure such as a matrix, list or data.frame.
apply   The function used for parallelization.
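To give a feel for how a function with this signature could work, here is a hypothetical re-implementation in base R. It is not the package's source code: the name map_reduce_sketch and its internals are assumptions, and it only handles a data.frame with two or more reduce expressions that each return a single number.

```r
# Hypothetical sketch, NOT the package's actual implementation.
map_reduce_sketch <- function(map, ..., data, apply = sapply) {
  env <- parent.frame()

  # Capture the map and reduce expressions without evaluating them.
  map_expr     <- substitute(map)
  reduce_exprs <- as.list(substitute(list(...)))[-1L]

  # Map: evaluate the map expression inside `data` to get one key per row,
  # then split the rows into one part per key.
  keys  <- eval(map_expr, data, env)
  parts <- split(data, keys)

  # Reduce: evaluate every reduce expression inside each part.
  reduce_part <- function(part) {
    vapply(reduce_exprs, function(e) eval(e, part, env), numeric(1))
  }
  out <- apply(parts, reduce_part)

  # sapply returns reducers in rows; lapply-style backends return a list.
  # Either way, end up with one row per key and one column per reducer.
  if (is.list(out)) do.call(rbind, out) else t(out)
}
```

Because the expressions are captured unevaluated, column names can be written bare, and the apply argument is the only place where a parallel backend would plug in.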

Take the iris dataset, data(iris). Using mapReduce, we can quickly compute the mean and max petal lengths like so:


mapReduce(
  map = Species,
  mean.petal.length = mean(Petal.Length),
  max.petal.length = max(Petal.Length),
  data = iris
)

           mean.petal.length max.petal.length
setosa                 1.462              1.9
versicolor             4.260              5.1
virginica              5.552              6.9

The mean and max petal lengths are computed for each Species and returned as a matrix with two columns, mean.petal.length and max.petal.length, and one row for each Species.

Because we use expressions in our implementation, you can use almost any R function for the map and reduce steps. (Most work; there are a few edge-case exceptions.) For example, suppose we wanted to do the above calculation but with versicolor and virginica lumped together.


mapReduce(
  substr(Species, 1, 1),
  mean.petal.length = mean(Petal.Length),
  max.petal.length = max(Petal.Length),
  data = iris
)

  mean.petal.length max.petal.length
s             1.462              1.9
v             4.906              6.9

There you have it, simple yet powerful mapReduce in R. mapReduce can be downloaded from any CRAN mirror. If you get a chance to use it, please let me know what you think.