R and Hadoop together seem to be a must-have toolkit for people who work with both statistics and large data sets.
An aggregation example
The Hadoop version used here is Cloudera's CDH4, and the underlying Linux OS is CentOS 6. The data is a simulated sales data set from a Udacity training course. Each line of the data set has six tab-separated fields: date, time, store name, item description, cost and method of payment. Only two of them, store name and cost, are used, to aggregate the cost for each store.
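To make the record layout concrete, here is a small shell sketch using two invented records in the six-field format. `cut` keeps fields 3 and 5 (store and cost), which is the same projection the mapper performs. The sample values and the `sample_data` function name are hypothetical, not taken from the course data.

```shell
#!/bin/sh
# Hypothetical records in the six-field layout described above:
# date, time, store, item, cost, payment (all values are invented).
sample_data() {
  printf '2012-01-01\t09:00\tSan Jose\tMusic\t214.05\tAmex\n'
  printf '2012-01-01\t09:00\tFort Worth\tToys\t153.57\tVisa\n'
}

# Keep field 3 (store) and field 5 (cost). cut splits on tabs by default,
# so a store name containing a space stays in a single field.
sample_data | cut -f3,5
```

Because the separator is a tab rather than a space, multi-word store names such as "San Jose" survive the split intact.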
With Hadoop streaming, a typical MapReduce job consists of two R scripts: mapper.R and reducer.R.
mapper.R
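#!/usr/bin/env Rscript
# The shebang must be the very first line. Rscript runs R in batch mode and is
# located via env, so no hard-coded path such as /usr/bin/R is needed.
options(warn=-1)
# Read tab-separated records from stdin; write tab-separated "store<TAB>cost" pairs to stdout
input = file("stdin", "r")
# Use <- here: "=" inside length() would be read as an argument name, not an assignment
while(length(currentLine <- readLines(input, n=1, warn=FALSE)) > 0) {
  fields = unlist(strsplit(currentLine, "\t"))
  # Make sure the line has six fields
  if (length(fields)==6) {
    # Emit store (field 3) and cost (field 5) without a trailing tab
    cat(fields[3], "\t", fields[5], "\n", sep="")
  }
}
close(input)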
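For checking the mapper's logic without R, the same filter-and-project step can be written as a short awk program. This is an illustrative, roughly equivalent substitute, not part of the original job, and `awk_mapper` is a hypothetical name.

```shell
#!/bin/sh
# Hypothetical awk counterpart of mapper.R: split on tabs, keep only lines
# with exactly six fields, and print "store<TAB>cost".
awk_mapper() {
  awk 'BEGIN { FS = "\t"; OFS = "\t" } NF == 6 { print $3, $5 }'
}

# One well-formed record and one truncated record (invented data);
# only the first one produces output.
printf '2012-01-01\t09:00\tSan Jose\tMusic\t214.05\tAmex\n2012-01-01\t09:00\tReno\n' | awk_mapper
```

Like the R version, it drops malformed lines silently rather than failing, so bad input shows up only as missing output.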
reducer.R
#!/usr/bin/env Rscript
options(warn=-1)
salesTotal = 0
oldKey = ""
# Read the sorted "store<TAB>cost" pairs from stdin, one per line
input = file("stdin", "r")
while(length(currentLine <- readLines(input, n=1, warn=FALSE)) > 0) {
  data_mapped = unlist(strsplit(currentLine, "\t"))
  if (length(data_mapped) != 2) {
    # Malformed line: skip it (R uses next, not continue)
    next
  }
  thisKey = data_mapped[1]
  thisSale = as.double(data_mapped[2])
  # Input is sorted by key, so a new key means the previous store is complete
  if (!identical(oldKey, "") && !identical(oldKey, thisKey)) {
    # sprintf keeps two decimals; cat alone would round to 7 significant digits
    cat(oldKey, "\t", sprintf("%.2f", salesTotal), "\n", sep="")
    salesTotal = 0
  }
  oldKey = thisKey
  salesTotal = salesTotal + thisSale
}
# Emit the total for the last store
if (!identical(oldKey, "")) {
  cat(oldKey, "\t", sprintf("%.2f", salesTotal), "\n", sep="")
}
close(input)
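The key-change logic of the reducer can be traced with a small awk sketch on already-sorted input. It is a hypothetical stand-in for reducer.R (the `awk_reducer` name and the sample values are invented), shown only to make the "print when the key changes" pattern easy to follow.

```shell
#!/bin/sh
# Hypothetical awk counterpart of reducer.R. It assumes the input is sorted
# by key: whenever the key changes, the previous store's total is complete.
awk_reducer() {
  awk 'BEGIN { FS = "\t" }
       NF != 2 { next }    # skip malformed lines
       $1 != key && key != "" { printf "%s\t%.2f\n", key, total; total = 0 }
       { key = $1; total += $2 }
       END { if (key != "") printf "%s\t%.2f\n", key, total }'
}

# Sorted mapper output (invented values): two Reno sales, one San Jose sale.
printf 'Reno\t10.50\nReno\t4.25\nSan Jose\t214.05\n' | awk_reducer
```

If the same lines arrived unsorted, a store could appear twice in the output with partial totals, which is why the sort step (or Hadoop's shuffle) must sit between mapper and reducer.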
Testing
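Before running the MapReduce job, it is a good idea to test the scripts locally with a few Linux commands. The pipeline mimics the map, shuffle-and-sort and reduce phases on a small sample.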
# Make the R scripts executable
chmod +x mapper.R
chmod +x reducer.R
# Check the permissions
ls -l
# Extract a small sample file to test
head -n 500 purchases.txt > test1.txt
# sort stands in for Hadoop's shuffle-and-sort phase
cat test1.txt | ./mapper.R | sort | ./reducer.R
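Because the mapper silently skips lines that do not have six fields, an empty or suspiciously small result can hide a format problem. A quick check is to count such lines first; the sketch below is one way to do it, with a hypothetical `count_malformed` helper and invented sample data. On the real sample it would be run as `count_malformed < test1.txt`.

```shell
#!/bin/sh
# Hypothetical helper: count stdin lines that do not have exactly six
# tab-separated fields (the lines mapper.R would drop).
count_malformed() {
  awk 'BEGIN { FS = "\t" } NF != 6 { bad++ } END { print bad + 0 }'
}

# Invented sample: two good records and one truncated record.
printf '2012-01-01\t09:00\tSan Jose\tMusic\t214.05\tAmex\n2012-01-01\t09:00\tReno\n2012-01-02\t10:15\tReno\tToys\t4.25\tCash\n' | count_malformed
```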
Execution
One way is to call the streaming JAR directly and specify all the paths:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar \
  -mapper mapper.R -reducer reducer.R \
  -file mapper.R -file reducer.R \
  -input myinput \
  -output joboutput
Alternatively, the hs alias available in this CDH4 environment wraps the same command and saves a lot of typing:
hs mapper.R reducer.R myinput joboutput
Overall, the R-driven MapReduce job runs smoothly. The Hadoop JobTracker web UI can be used to monitor the job and diagnose problems.
RHadoop or streaming?
RHadoop is a package developed by Revolution Analytics that lets users write and run MapReduce jobs directly in R, and it is a more popular way to integrate R and Hadoop. However, the package is currently evolving quickly and has complicated dependencies. As an alternative, Hadoop streaming ships with Hadoop itself and works with any language that can read from standard input and write to standard output, including R. If installing RHadoop properly is a challenge, streaming is a good starting point.