Wednesday, 19 August 2015

Hadoop MapReduce Programs using Test Driven Development

This one is for Hadoop fans who want to write and test MapReduce programs on their Windows desktops but can't set up a Hadoop cluster. It also introduces newcomers to applying Test Driven Development (TDD) to MapReduce programs.

Let us list the prerequisites first. You will need the following software installed on your desktop.

1) JDK 1.8.
2) Eclipse 4.x.
3) Apache Maven 3.x.

Make sure that JAVA_HOME is set to the JDK installation directory. You will also need an internet connection so that Maven can download the necessary dependencies while building the MapReduce application.

Let us get the ball rolling and go through the following steps to write and test a MapReduce program.

Setting up the workspace and the project 

1) Create a folder that will act as an eclipse workspace. Open eclipse and select the newly created folder as the workspace. 
2) Select File->New->Project from the File menu.
3) In the New Project window select Maven->Maven Project.
4) Check the option 'Use default Workspace location' and click Next.
5) Select the archetype maven-archetype-quickstart and click next.
6) Enter the Group Id and Artifact Id. For example, you can enter the group id as com.mr.practice and the artifact id as HadoopLocal.
7) A project will be created in the workspace with the same name as the artifact id.
8) Make sure that the pom of your project includes the properties, dependencies and plugins shown in the snippet below.
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hadoopbook</groupId>
  <artifactId>hadoop-book-mr-dev</artifactId>
  <version>4.0</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.5.1</hadoop.version>
  </properties>
  <dependencies>
    <!-- Hadoop main client artifact -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- Unit test artifacts -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier>
      <scope>test</scope>
    </dependency>
    <!-- Hadoop test artifact for running mini clusters -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-minicluster</artifactId>
      <version>${hadoop.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>hadoop-examples</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.5</version>
        <configuration>
          <outputDirectory>${basedir}</outputDirectory>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
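Note the hadoop2 classifier on the mrunit dependency: MRUnit 1.1.0 is published in two builds, and the hadoop2 classifier picks the one compiled against the Hadoop 2 (new MapReduce) API, matching the hadoop.version property above.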

9) Save the pom and build your project.
10) The workspace and the project are now ready for writing and testing MapReduce programs.

11) Create a Java package named com.hadoop.test.HadoopLocal under the src/test/java folder.

12) Inside the package, create the following class, MaxTemperatureMapperTest.


package com.hadoop.test.HadoopLocal;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Assert;
import org.junit.Test;

public class MaxTemperatureMapperTest {
  @Test
  public void processesValidRecord() throws IOException, InterruptedException {
     Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                                   // Year ^^^^
         "99999V0203201N00261220001CN9999999N9-00111+99999999999");
                               // Temperature ^^^^^
     new MapDriver<LongWritable, Text, Text, IntWritable>()
       .withMapper(new MaxTemperatureMapper())
       .withInput(new LongWritable(0), value)
       .withOutput(new Text("1950"), new IntWritable(-11))
       .runTest();
   }
  
  @Test
   public void ignoresMissingTemperatureRecord() throws IOException,
       InterruptedException {
     Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                                   // Year ^^^^
         "99999V0203201N00261220001CN9999999N9+99991+99999999999");
                               // Temperature ^^^^^
     new MapDriver<LongWritable, Text, Text, IntWritable>()
       .withMapper(new MaxTemperatureMapper())
       .withInput(new LongWritable(0), value)
       .runTest();
   }
  
   @Test
   public void parsesMalformedTemperature() throws IOException,
       InterruptedException {
     Text value = new Text("0335999999433181957042302005+37950+139117SAO  +0004" +
                                   // Year ^^^^
         "RJSN V02011359003150070356999999433201957010100005+353");
                               // Temperature ^^^^^
     // The temperature field of this record parses as 1957 (i.e. 195.7 degrees,
     // past the 100-degree sanity check), so the mapper should flag the record
     // as malformed while still emitting the parsed value.
     Counters counters = new Counters();
     new MapDriver<LongWritable, Text, Text, IntWritable>()
       .withMapper(new MaxTemperatureMapper())
       .withInput(new LongWritable(0), value)
       .withOutput(new Text("1957"), new IntWritable(1957))
       .withCounters(counters)
       .runTest();
     Counter c = counters.findCounter(MaxTemperatureMapper.Temperature.MALFORMED);
     Assert.assertEquals(1L, c.getValue());
   }
}

13) The class MaxTemperatureMapperTest contains the tests for the mapper of our first Hadoop job. The job's function is to read the temperature recorded for a year in each row of a huge number of files and find the maximum temperature recorded per year. The output of the mapper is a map with the year as the key and the temperature as the value. Each row is parsed to obtain a year and a temperature. Sample rows are used as the test data in the tests above.
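To see where the expected keys and values come from, here is a minimal standalone sketch (the class RecordLayoutDemo is just an illustration, not part of the project) that extracts the year and temperature from the first test record using the same fixed-width NCDC offsets the parser below relies on:

public class RecordLayoutDemo {
  public static void main(String[] args) {
    String record = "0043011990999991950051518004+68750+023550FM-12+0382"
        + "99999V0203201N00261220001CN9999999N9-00111+99999999999";
    // The year occupies characters 15-18, the signed temperature characters 87-91
    String year = record.substring(15, 19);                  // "1950"
    int temp = Integer.parseInt(record.substring(87, 92));   // "-0011" -> -11
    System.out.println(year + " " + temp);                   // temperatures are in tenths of a degree
  }
}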
14) As expected, the test will fail; it will not even compile, since the mapper does not exist yet. Now create the package com.hadoop.test.HadoopLocal under the folder src/main/java and add the following mapper class to it.
package com.hadoop.test.HadoopLocal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
    Mapper<LongWritable, Text, Text, IntWritable> {

  private NCDTempParser parser = new NCDTempParser();

  enum Temperature {
    MALFORMED
  }

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    parser.parse(value);
    if (parser.isValidTemperature()) {
      int airTemperature = parser.getAirTemperature();
      if (airTemperature > 1000) {
        // Temperatures are in tenths of a degree, so > 1000 means over 100 degrees
        System.err.println("Temperature over 100 degrees for input: " + value);
        context.setStatus("Detected possibly corrupt record: see logs.");
        context.getCounter(Temperature.MALFORMED).increment(1);
      }
      context.write(new Text(parser.getYear()),
          new IntWritable(airTemperature));
    }
  }
}
15) Also create the following helper class in the same package. This class parses rows in the file to fetch the temperature and the year.
package com.hadoop.test.HadoopLocal;

import org.apache.hadoop.io.Text;

public class NCDTempParser {

  private static final int MISSING_TEMPERATURE = 9999;

  private String year;
  private int airTemperature;
  private String quality;

  public void parse(String record) {
    year = record.substring(15, 19);
    String airTemperatureString;
    // Remove leading plus sign as parseInt doesn't like them (pre-Java 7)
    if (record.charAt(87) == '+') {
      airTemperatureString = record.substring(88, 92);
    } else {
      airTemperatureString = record.substring(87, 92);
    }
    airTemperature = Integer.parseInt(airTemperatureString);
    quality = record.substring(92, 93);
  }

  public void parse(Text record) {
    parse(record.toString());
  }

  public boolean isValidTemperature() {
    return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
  }

  public String getYear() {
    return year;
  }

  public int getAirTemperature() {
    return airTemperature;
  }
}
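Before re-running the mapper test, you can optionally sanity-check the parser on its own with a quick unit test under src/test/java (a suggested extra, not part of the original steps):

package com.hadoop.test.HadoopLocal;

import org.junit.Assert;
import org.junit.Test;

public class NCDTempParserTest {

  @Test
  public void parsesYearAndTemperature() {
    NCDTempParser parser = new NCDTempParser();
    parser.parse("0043011990999991950051518004+68750+023550FM-12+0382"
        + "99999V0203201N00261220001CN9999999N9-00111+99999999999");
    Assert.assertEquals("1950", parser.getYear());
    Assert.assertEquals(-11, parser.getAirTemperature());
    Assert.assertTrue(parser.isValidTemperature());
  }
}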

 
16) Run the test MaxTemperatureMapperTest. It should now be green.
17) Let us now write the test for the reducer. The reducer works on the mapper's output to find the maximum temperature recorded in each year.
18) Create the following class in the package com.hadoop.test.HadoopLocal under the folder src/test/java.
19) The reducer test class, with its package declaration and imports, looks like this:

package com.hadoop.test.HadoopLocal;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class MaxTemperatureReducerTest {

  @Test
  public void returnsMaximumIntegerInValues() throws IOException,
      InterruptedException {
    new ReduceDriver<Text, IntWritable, Text, IntWritable>()
      .withReducer(new MaxTemperatureReducer())
      .withInput(new Text("1950"),
          Arrays.asList(new IntWritable(10), new IntWritable(5)))
      .withOutput(new Text("1950"), new IntWritable(10))
      .runTest();
  }
}
20) As expected, the test will fail. Let us now write the reducer. Create the following class in the package com.hadoop.test.HadoopLocal under the folder src/main/java.
package com.hadoop.test.HadoopLocal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends
    Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Track the largest temperature seen for this year (the key)
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
21) Run the test MaxTemperatureReducerTest. All tests should become green.
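As an optional extra (not part of the original steps), MRUnit's MapReduceDriver lets you test the mapper and reducer together as a mini pipeline. A sketch, assuming the same classes as above:

package com.hadoop.test.HadoopLocal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Test;

public class MaxTemperaturePipelineTest {

  @Test
  public void mapsAndReducesTogether() throws IOException {
    Text record = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
        + "99999V0203201N00261220001CN9999999N9-00111+99999999999");
    // The mapper emits (1950, -11); the reducer then takes the max of that single value
    new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>()
      .withMapper(new MaxTemperatureMapper())
      .withReducer(new MaxTemperatureReducer())
      .withInput(new LongWritable(0), record)
      .withOutput(new Text("1950"), new IntWritable(-11))
      .runTest();
  }
}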
22) Let us now write the test for the driver class. Before writing the test, create the directory structure input/local under the project's root directory, which is HadoopLocal.
23) Create a file called sample.txt inside input/local with the following contents. Please note that each line may wrap across two rows below; copy the content as is, so that you end up with 5 rows in the file.
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
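For reference, given these records the finished job should report a maximum of 111 for 1949 and 22 for 1950 (NCDC temperatures are stored in tenths of a degree Celsius, so 111 means 11.1 degrees).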
24) Create the following class in the package com.hadoop.test.HadoopLocal under the folder src/test/java.
package com.hadoop.test.HadoopLocal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;

public class MaxTemperatureDriverTest {

  @Test
  public void test() throws Exception {
    Configuration conf = new Configuration();
    // Configure the job to use the local file system as the Hadoop file system
    // and to run MapReduce in-process rather than on a cluster
    conf.set("fs.defaultFS", "file:///");
    conf.set("mapreduce.framework.name", "local");
    conf.setInt("mapreduce.task.io.sort.mb", 1);

    Path input = new Path("input/local");
    Path output = new Path("output");

    FileSystem fs = FileSystem.getLocal(conf);
    fs.delete(output, true); // delete old output

    MaxTemperatureDriver driver = new MaxTemperatureDriver();
    driver.setConf(conf);

    int exitCode = driver.run(new String[] {
        input.toString(), output.toString() });
    Assert.assertEquals(0, exitCode);

    // checkOutput(conf, output); // a possible implementation is sketched below
  }
}
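The commented-out checkOutput call hints at verifying the job's actual output files. One possible implementation is sketched below; it is my own suggestion rather than part of the original post, and it assumes a single local reducer output file named part-r-00000 plus the expected maxima noted under step 23 (you would also need imports for java.io.BufferedReader, java.io.InputStreamReader and java.io.IOException):

  private void checkOutput(Configuration conf, Path output) throws IOException {
    // Read the single reducer output file from the local file system
    FileSystem fs = FileSystem.getLocal(conf);
    BufferedReader reader = new BufferedReader(new InputStreamReader(
        fs.open(new Path(output, "part-r-00000"))));
    try {
      // Keys come out sorted, so 1949 precedes 1950
      Assert.assertEquals("1949\t111", reader.readLine());
      Assert.assertEquals("1950\t22", reader.readLine());
      Assert.assertNull(reader.readLine()); // no further output expected
    } finally {
      reader.close();
    }
  }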
25) As expected, the test will fail.
26) Create the following class in the package com.hadoop.test.HadoopLocal under the folder src/main/java.
package com.hadoop.test.HadoopLocal;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MaxTemperatureDriver extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
    
    Job job = Job.getInstance(getConf(), "Max temperature");
    job.setJarByClass(getClass());
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    return job.waitForCompletion(true) ? 0 : 1;
  }
  
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
    System.exit(exitCode);
  }
}
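One detail worth calling out in the driver: job.setCombinerClass reuses MaxTemperatureReducer as a combiner. This is safe because taking a maximum is associative and commutative, so pre-aggregating map outputs locally cannot change the final result.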
27) Run the test MaxTemperatureDriverTest. All the tests should now turn green.
28) You have now learned to write Hadoop Programs using TDD.
29) The following picture displays the project structure in eclipse.
