
USING JMSL IN HADOOP MAPREDUCE APPLICATIONS

Big Data has become an industry buzzword over the last decade, even though it can mean different things to different people. To be sure, the size and complexity of data has exploded. Some estimates [CIO Insight, 2014] put the size of the digital universe by the year 2020 at 44 zettabytes [1]. Thanks to the internet, smart phones, and other devices making up the Internet of Things, plus the availability of inexpensive commodity hardware, more data and newer types of data are being collected, processed, stored, and analyzed.

While the scale today may be unprecedented, research into solving large computational problems has been going on for some time. The canonical strategy is to break the problem down first into parts of manageable size and then send the parts out to independent workers to process them in parallel. Each worker reports results to some centralized controller, which in turn combines the information into an actionable form. Distributed computing is the term generally used for breaking the problem down into parts and the communication of those parts across a network. Parallel computing, on the other hand, refers to the simultaneous processing of parts of the larger problem [2].

MapReduce is a programming model for just such a strategy. MapReduce was developed by Google scientists in 2004 in response to their own challenges in processing large-scale data. The two basic steps in a MapReduce program are map and reduce. The map step reads raw data and collates it for the reduce step, which combines the data in a meaningful way. One of the most widely used implementations of the MapReduce model is Apache Hadoop. Apache Hadoop is a Java open source project for distributed computing. Hadoop has two main components: the Hadoop Distributed File System (HDFS) and the MapReduce programming and job management framework.

The JMSL Numerical Library is a pure Java numerical library, providing a broad range of advanced mathematics, statistics, and charting for the Java environment. It extends core Java numerics and allows developers to seamlessly integrate advanced analytics into their Java applications.

In a distributed computing environment, JMSL can be made available to each node in a cluster to process a section of the data and produce output in a form that can be gathered and combined in a logical fashion. The following sections illustrate using JMSL in Hadoop MapReduce applications.

[1] A zettabyte is 1,000 exabytes = 1 million petabytes = 1 billion terabytes. 44 zettabytes is 44 billion terabytes!
[2] Some authors define parallel computing as the use of multiple threads on the same device.

THE STRUCTURE OF A HADOOP MAPREDUCE APPLICATION

The prototypical Hadoop MapReduce application has at least one Mapper class, one Reducer class, and one Driver class. The Mapper class reads the input data and creates intermediate data. The Reducer class accumulates intermediate information and emits a combined result. The Driver class extends the Hadoop class Configured and implements the Hadoop Tool interface, and is used to configure and execute the job.

Figure 1: Components of a simple Hadoop application

The programmer must write (override) the methods to handle the format of the data, the instructions for how the data should be processed or combined, and the desired form of the output. All of this depends on the nature of the data analysis problem that the programmer needs to solve. For our first example we begin with a very simple problem: summarizing a numeric value by key.

Example 1: JMSL summary

MapReduce operates on data organized into <key, item> pairs. It assumes that every data "item" belongs to one and only one "key", and this is how the Mapper expects the raw data. In the first example, the key is text and the item is a scalar value, such as <ABC, 10>. A sample of the contents of a comma-delimited input file is shown below.
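For illustration, a few hypothetical lines of such an input file (the keys and values here are made up) might be:

ABC,10.0
ABC,12.5
DEF,7.3
DEF,9.1
DEF,8.8

Each line pairs a text key with one numeric observation; all observations sharing a key are summarized together.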

The JMSL Summary class produces summary statistical information such as counts, means, variances, and other descriptive statistics. To summarize data by key on a Hadoop cluster, we write three classes: SummaryDriver, SummaryMapper, and SummaryReducer.

Figure 2: Summary components

SummaryMapper extends the Hadoop class Mapper and overrides the map() method. The map() method reads the input data and writes intermediate output that is sent to the SummaryReducer. This intermediate output is managed by Hadoop and is written to the local file system.

public static class SummaryMapper extends
        Mapper<Object, Text, Text, DoubleWritable> {

    Text word = new Text("key");
    DoubleWritable val = new DoubleWritable();

    @Override
    public void map(Object key, Text value, Context context)
            throws InterruptedException, IOException {
        // Split the comma-delimited input line into its key and value columns.
        String columns[] = value.toString().split(",");
        word.set(columns[0]);
        val.set(Double.parseDouble(columns[1]));

        // Emit the intermediate <key, value> pair for the reducer.
        context.write(word, val);
    }
}
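The reducer, shown next, feeds each of these intermediate values into a JMSL Summary object. To see what Summary accumulates on its own, here is a minimal standalone sketch; the accessors getMean() and getVariance() reflect our reading of the JMSL Summary API, so verify the exact method names against the documentation for your JMSL version.

import com.imsl.stat.Summary;

public class SummarySketch {
    public static void main(String[] args) {
        Summary summary = new Summary();
        double[] data = {10.0, 12.5, 7.3, 9.1, 8.8};   // arbitrary sample values

        // Feed the values one at a time, exactly as the reducer will do.
        for (double d : data) {
            summary.update(d);
        }

        // Assumed accessors -- check the JMSL API documentation.
        System.out.println("mean     = " + summary.getMean());
        System.out.println("variance = " + summary.getVariance());
    }
}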

SummaryReducer extends the Hadoop class Reducer and overrides the method reduce(). The reduce() method combines all the intermediate values for each unique key and merges the output. It is in the reduce() method that we use the JMSL Summary class to define how the items should be combined.

public static class SummaryReducer extends
        Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    Text out = new Text();
    DoubleWritable result = new DoubleWritable();

    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values,
            Context context) throws IOException, InterruptedException {
        // JMSL
        Summary summary = new Summary();

        // For manual calculation.
        double sum = 0;
        double count = 0;

        for (DoubleWritable val : values) {
            // JMSL
            summary.update(val.get());

            // Manual calculation, just for validation.
            sum += val.get();
            count += 1;
        }
        sum = sum / count;
        System.out.println(key + "," + sum);

        // Emit the key and the mean accumulated by the JMSL Summary object.
        out.set(key.toString());
        result.set(summary.getMean());
        context.write(out, result);
    }
}

On a large cluster, data for a specific key may reside on several different nodes. The map tasks take care of pulling all the data for a specific key together before calling the reduce() method. Within the reduce() method, summary.update() accepts one value at a time, so we don't need to create additional data arrays.

Of course, calculating the mean is simple; we could do it manually and Hadoop would keep track of the results by key for us. The advantage comes when we can leverage JMSL methods for more advanced analyses on distributed networks, knowing that Hadoop manages the communication overhead for us.

Next, the run() method in SummaryDriver handles the setup and configuration of the MapReduce job. First, instantiate a new job with this line:

Job job = new Job(super.getConf(), "calculate mean");

The following lines set the input and output types for the Mapper and Reducer, and the input and output paths.

@Override
public int run(String[] args) throws IOException, InterruptedException,
        ClassNotFoundException {
    // Set up the configuration for the MapReduce job.
    Job job = new Job(super.getConf(), "calculate mean");
    job.setJarByClass(getClass());

    // Set the Mapper and Reducer classes.
    job.setMapperClass(SummaryMapper.class);
    job.setReducerClass(SummaryReducer.class);

    // Set the input and output data types.
    // Note that Mapper output key and value must match
    // reducer input key and value.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    // Set the input path and output path.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
    return 0;
}

The statement to run the job is:

System.exit(job.waitForCompletion(true) ? 0 : 1);

The main() method in SummaryDriver calls the run() method.

public static void main(String[] args) throws IOException,
        InterruptedException, ClassNotFoundException, Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: SummaryDriver <in> <out>");
        System.exit(2);
    }
    int res = ToolRunner.run(conf, new SummaryDriver(), args);
    System.exit(res);
}
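As noted earlier, the driver class extends the Hadoop class Configured and implements the Tool interface. For reference, a minimal sketch of the surrounding class declaration and the imports it relies on could look like the following; the package layout is an assumption, so adjust it to your own project.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.imsl.stat.Summary;

public class SummaryDriver extends Configured implements Tool {
    // SummaryMapper and SummaryReducer are declared here as static nested
    // classes, followed by the run() and main() methods shown above.
}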

Once on the cluster, the format to run the code from the command line is:

hadoop jar <nameOfJarFile>.jar <class directory>/<nameOfDriverClass> <Hadoop options> <args to driver class>

We use the Hadoop option -libjars to make JMSL available to the nodes in the cluster. For example, the command for our first example will look something like:

hadoop jar HadoopJMSLExamples.jar <class directory>/SummaryDriver \
    -libjars /lib/jmsl.jar /user/hdfs/SummaryInput /user/hdfs/SummaryOutput

See the appendix for a few other considerations when running the job on the cluster.

Example 2: JMSL linear regression

Multiple linear regression is one of the most widely used predictive models. The model assumes that the variation in one variable, the dependent or response variable, can be explained by the variation in a set of independent variables, the explanatory or predictor variables, and that the relationship is linear in the coefficients [3].

[3] The technical assumptions behind the model should be considered and validated for a specific problem using diagnostic tests. See, for example, Neter, et al., for more details.

For example, in order to plan for new store locations, a products company may set up a regression model with sales as a dependent variable and predictors such as population size, per capita income, and other demographic variables. Large companies, with thousands of products and customers and potentially millions of transactions a day, may need a distributed solution, such as Hadoop MapReduce, to first accumulate the data to a relevant level of detail and then to fit the regression model.

In this example, we illustrate using the JMSL linear regression in a Hadoop application, similar to what might be used by a business planning for new stores.

There are usually many steps to organize the raw data before observations are ready for the regression model. Assuming these steps have been done (Hadoop MapReduce may be leveraged for this purpose as well), the observations, when ready for the regression model, are of the form:

Y1, x11, x12, x13, ..., x1m
Y2, x21, x22, x23, ..., x2m
Y3, x31, x32, x33, ..., x3m
...
Yn, xn1, xn2, xn3, ..., xnm

That is, the value of the dependent variable Y is paired with the observed values of the predictors. The linear regression algorithm must have knowledge of the complete (m+1)-tuple (Yi, xi1, xi2, xi3, ..., xim) for each index i.

While example 1 worked directly on the <key, item> pairs, which is the form of data for the prototypical MapReduce job, the Mapper class in this example needs some additional sophistication so it can read and process <key, (Item1, ..., ItemM)> type data, where different values of the key separate different regression data sets. One option is to treat the entire row as a single text object and then parse and re-cast the elements into double values. For clarity, we prefer to group the response and predictor variables as one set so the observation is communicated by Hadoop as one piece of data.
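Concretely, with two predictors (m = 2) and keys identifying sales regions, a few hypothetical input lines (all numbers are made up for illustration) might look like:

R1,58.2,295,4310
R1,66.9,340,4650
R2,41.0,210,3980
R3,97.4,505,5120

Each line carries the key, then the response Y, then the m predictor values, which is exactly the layout the Mapper below expects.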

To begin with, we create the new type, DoubleArrayWritable, building on two Hadoop data types, DoubleWritable and ArrayWritable.

public class DoubleArrayWritable extends ArrayWritable {

    public DoubleArrayWritable() {
        super(DoubleWritable.class);
    }

    public DoubleArrayWritable(DoubleWritable[] values) {
        super(DoubleWritable.class, values);
    }

    public void set(DoubleWritable[] values) {
        super.set(values);
    }

    public DoubleWritable get(int idx) {
        return (DoubleWritable) get()[idx];
    }

    public double[] getArray(int from, int to) {
        int sz = to - from + 1;
        double[] vector = new double[sz];
        for (int i = from; i <= to; i++) {
            vector[i - from] = get(i).get();
        }
        return vector;
    }
}

In addition to our new class, once again we need a driver, mapper, and reducer:

Figure 3: Linear regression components
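Before moving on to the Mapper and Reducer, here is a quick illustration of how the new type is used. The fragment below (the values are arbitrary) wraps one observation and then reads the response and the predictor slice back out.

// Wrap one observation: the response followed by two predictor values.
DoubleWritable[] obs = new DoubleWritable[] {
        new DoubleWritable(58.2), new DoubleWritable(295.0), new DoubleWritable(4310.0)
};
DoubleArrayWritable row = new DoubleArrayWritable(obs);

double y = row.get(0).get();                   // the response value
double[] x = row.getArray(1, obs.length - 1);  // the predictor values x1..xm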

The Mapper class handles the new input and creates the DoubleArrayWritable object. The map() method emits the key and the array for the reducer in the line context.write(word, val).

public static class LinearRegressionMapper extends
        Mapper<Object, Text, Text, DoubleArrayWritable> {

    @Override
    public void map(Object key, Text value, Context context)
            throws InterruptedException, IOException {
        String origColumns[] = value.toString().split(",");
        Text word = new Text(origColumns[0]);
        DoubleWritable[] x = new DoubleWritable[origColumns.length - 1];

        for (int i = 0; i < x.length; i++) {
            x[i] = new DoubleWritable(Double.parseDouble(origColumns[i + 1]));
        }
        DoubleArrayWritable val = new DoubleArrayWritable(x);

        // Emit (word, val) for the reducer where word
        // is the key and val is a DoubleArrayWritable.
        context.write(word, val);
    }
}

In LinearRegressionReducer, we extract the observations from the DoubleArrayWritable object, assigning the response value and then the predictor values. (A check for the number of predictors allows for potentially different sized problems by different keys.) After extracting information from the observation, an update is applied to the LinearRegression object, here named lr.

public static class LinearRegressionReducer extends
        Reducer<Text, DoubleArrayWritable, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<DoubleArrayWritable> values,
            Context context) throws IOException, InterruptedException {
        int numPred = 0;
        double y = 0.0;
        double[] x = null;
        DoubleWritable[] dwa = null;
        LinearRegression lr = null;

        for (DoubleArrayWritable val : values) {
            dwa = (DoubleWritable[]) val.toArray();
            if (numPred == 0) {
                numPred = dwa.length - 1;
                // JMSL
                lr = new LinearRegression(numPred, false);
            }

            y = dwa[0].get();
            x = new double[numPred];
            for (int i = 0; i < numPred; i++) {
                x[i] = dwa[i + 1].get();
            }
            // JMSL
            lr.update(x, y);
        }

        // Now we emit the regression model coefficients.
        double[] coefs = lr.getCoefficients();
        StringBuilder val = new StringBuilder();
        String out = "Problem " + key.toString() + " Coefficients ";
        val.append("\n");

        for (int i = 0; i < coefs.length; i++) {
            if (i < coefs.length - 1) {
                val.append(String.format("%5.4f,", coefs[i]));
            } else {
                val.append(String.format("%5.4f", coefs[i]));
            }
        }
        context.write(new Text(out), new Text(val.toString()));
    }
}

There are many options for the format and content of the output. Here we extract the estimated coefficients and append them into a Text object. Note that the format of the output from the reducer must match the last two arguments in:

Reducer<Text, DoubleArrayWritable, Text, Text>

The run() method in LinearRegressionDriver contains statements analogous to those in SummaryDriver.run(), setting up the job configuration, and LinearRegressionDriver.main() calls the run() method, as before.

@Override
public int run(String[] args) throws Exception {
    // Set up a new job.
    Job job = new Job(super.getConf(), "Linear Regression");

    // Sets the Jar file to this specific class.
    job.setJarByClass(getClass());

    // Sets the Mapper and Reducer to LinearRegressionMapper and
    // LinearRegressionReducer.
    job.setMapperClass(LinearRegressionMapper.class);
    job.setReducerClass(LinearRegressionReducer.class);

    // Set the Map output classes and the Reducer output classes.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleArrayWritable.class);
    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(Text.class);

    // Set the input and output paths.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
    return 0;
}

public static void main(String[] args) throws IOException,
        InterruptedException, ClassNotFoundException, Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: LinearRegressionDriver <in> <out>");
        System.exit(2);
    }
    int res = ToolRunner.run(conf, new LinearRegressionDriver(), args);
    System.exit(res);
}

Similarly, the command to run the job on the cluster is:

hadoop jar HadoopJMSLExamples.jar <class directory>/LinearRegressionDriver \
    -libjars /lib/jmsl.jar /user/hdfs/LinearRegressionInput /user/hdfs/LinearRegressionOutput

This Hadoop MapReduce application performs linear regression on distributed data, such as might arise when a large company wants to predict sales based on demographic data to help them plan for new store locations. As is, this code fits linear regression models by key, assuming the input data is in the observation format we described earlier. The output, by key, is the estimated coefficients of each model. For example, we simulated sales and demographic data for three regions, R1, R2, and R3, and ran three regression problems using sales as the dependent variable and population and per capita disposable income as the independent variables. There is one output file with the estimated coefficients for each problem:

hadoop fs -cat LinearRegressionOutput/part-r-00000
Problem R1 Coefficients
4.5092, 0.0592, 0.0089
Problem R2 Coefficients
2.0779, 0.0640, 0.0095
Problem R3 Coefficients
14.7010, 0.1002, 0.0057
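To turn these coefficients into a prediction, only a small client-side calculation is needed. The sketch below uses the R1 coefficients from the output above and computes the same predicted value that the next paragraph walks through.

// Coefficients for region R1, taken from the MapReduce output above.
double b0 = 4.5092, b1 = 0.0592, b2 = 0.0089;

double population = 302;     // X1, in 1,000s
double income = 4500;        // X2, per capita
double predictedSales = b0 + b1 * population + b2 * income;   // 62.4376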

Thus, for region 1, the fitted regression line is:

Y = 4.5092 + 0.0592*X1 + 0.0089*X2

For a new marketing area within this region, having a population of X1 = 302 (in 1,000s) and mean disposable income of X2 = 4500 (per capita), the predicted sales (in 1,000s) are:

Y = 4.5092 + 0.0592*302 + 0.0089*4500 = 62.4376

Similarly, we can obtain predicted sales for the other two regions. Other big data applications for linear regression include public survey data, social network analysis, and genome association analysis, to name just a few.

Example 3: Apriori for association rule discovery

Association rule discovery refers to the problem of detecting associations among discrete items. In market basket analysis, for example, the discrete items are the different products purchased together in individual transactions. Companies that sell many and varied products might use market basket analysis to help them make better promotional decisions. For example, knowing that two products are highly associated, a company would run a promotion on one or the other, but not both. There are applications for association rule discovery in the areas of text mining and bioinformatics as well.

The Apriori algorithm (Agrawal and Srikant, 1994) is one of the most popular algorithms for association rule discovery in transactional datasets. Apriori first mines the transactions for the frequent item sets. An item set (a set of items) is frequent if it appears in more than a minimum number of transactions. The number of transactions containing an item set is known as its support, and the minimum support (as a percentage of transactions) is a control parameter in the algorithm. From the collection of frequent item sets, the algorithm then filters for significant associations amongst the items.

If an item set is frequent in one transaction data set, it does not necessarily mean that it is frequent in the entire collection of data. This is why Apriori needs two passes through the data when it is distributed. The main reference for this procedure is Savasere, Omiecinski, and Navathe (1995). The procedure is compared with alternative approaches in Rajaraman and Ullman (2011).

For distributed data, the procedure is:

1. Find the frequent item sets in each transaction partition. This first pass through the data uses a MapReduce pair.
2. Find the union of all the frequent item sets. This step reads the output of the separate tasks in step 1 but does not need to re-read the transactions. It is accomplished with a second MapReduce pair (see the sketch after this list).
3. For every item set in the union, count the number of times it occurs in each transaction set. This requires a second pass through the transaction data sets. The outcome of step 3 is the collection of all the item sets which may be frequent for all of the data, i.e., the candidate item sets.
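Step 2 only needs to deduplicate the item sets emitted in step 1, so its MapReduce pair can be very small. The following is our own minimal sketch, not code from the JMSL examples: it assumes step 1 wrote each locally frequent item set as one line of text, and it emits each distinct item set exactly once.

// Step-2 sketch: form the union of the locally frequent item sets.
public static class UnionMapper extends Mapper<Object, Text, Text, NullWritable> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // The item set itself becomes the key; the value carries no information.
        context.write(value, NullWritable.get());
    }
}

public static class UnionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    public void reduce(Text itemset, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Each distinct item set arrives here exactly once, giving the union for step 3.
        context.write(itemset, NullWritable.get());
    }
}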
