Wednesday, 19 August 2015

Hadoop MapReduce Programs using Test Driven Development

This one is for the hadoop fans who want to write and test Map Reduce programs on their windows desktops but can't set up a hadoop cluster. This blog also introduces new Hadoop fans to implement TDD for Map Reduce Programs.

Let us list down the pre requisites first. You will need following softwares installed on your desktop.

1) JDK 1.8.
2) Eclipse 4.x.
3) Apache Maven 3.x

Make sure that the JAVA_HOME is set to JDK installation directory. You will also need an internet connection so that maven can download the necessary dependencies while building a Map Reduce application.

Let us get the ball rolling and go through the following steps to write and test a Map Reduce Program.

Setting up the workspace and the project 

1) Create a folder that will act as an eclipse workspace. Open eclipse and select the newly created folder as the workspace. 
2) Select the option  from the File Menu. File->New->Project  
3) In the New Project window select Maven->Maven Project.
4) Check the option 'Use Default WorkSpace' and click next.
5) Select the archetype maven-archetype-quickstart and click next.
6) Enter the Group Id and Artefact. For eg you can enter group id as com.mr.practice and the artefact as HadoopLocal.
7) A project will be created in the workspace with the same name that was entered as artefact id. 
8) Make sure that the pom of your project include properties, dependencies and plugins as mentioned in the snippet below.
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hadoopbook</groupId>
  <artifactId>hadoop-book-mr-dev</artifactId>
  <version>4.0</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.5.1</hadoop.version>
  </properties>
  <dependencies>
    <!-- Hadoop main client artifact -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- Unit test artifacts -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier>
      <scope>test</scope>
    </dependency>
    <!-- Hadoop test artifact for running mini clusters -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-minicluster</artifactId>
      <version>${hadoop.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>hadoop-examples</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.5</version>
        <configuration>
          <outputDirectory>${basedir}</outputDirectory>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

9) Save pom and build your project. 
10) The workspace and the project is now ready to write and Test Map Reduce 
programs.

11) Create a java package as com.hadoop.test.HadoopLocal under the src/test/java folder.

12) Inside the Package Create the following class MaxTemperatureMapperTest.


package com.hadoop.test.HadoopLocal;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Assert;
import org.junit.Test;

public class MaxTemperatureMapperTest {
  @Test
  public void processesValidRecord() throws IOException, InterruptedException {
     Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                                   // Year ^^^^
         "99999V0203201N00261220001CN9999999N9-00111+99999999999");
                               // Temperature ^^^^^
     new MapDriver<LongWritable, Text, Text, IntWritable>()
       .withMapper(new MaxTemperatureMapper())
       .withInput(new LongWritable(0), value)
       .withOutput(new Text("1950"), new IntWritable(-11))
       .runTest();
   }
  
  @Test
   public void ignoresMissingTemperatureRecord() throws IOException,
       InterruptedException {
     Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                                   // Year ^^^^
         "99999V0203201N00261220001CN9999999N9+99991+99999999999");
                               // Temperature ^^^^^
     new MapDriver<LongWritable, Text, Text, IntWritable>()
       .withMapper(new MaxTemperatureMapper())
       .withInput(new LongWritable(0), value)
       .runTest();
   }
  
   @Test
   public void parsesMalformedTemperature() throws IOException,
       InterruptedException {
     Text value = new Text("0335999999433181957042302005+37950+139117SAO  +0004" +
                                   // Year ^^^^
         "RJSN V02011359003150070356999999433201957010100005+353");
                               // Temperature ^^^^^
     Counters counters = new Counters();
     new MapDriver<LongWritable, Text, Text, IntWritable>()
       .withMapper(new MaxTemperatureMapper())
       .withInput(new LongWritable(0), value)
       .withOutput(new Text("1957"),new IntWritable(1957))
       .withCounters(counters)
       .runTest();
     Counter c = counters.findCounter(MaxTemperatureMapper.Temperature.MALFORMED);
     Assert.assertEquals(c.getValue(), 1l);
   }
}

13) The class MaxTemperatureMapperTest has tests to Test our Mapper for our first hadoop job. The jobs function is to fetch temperatures recorded in a year from each row from huge no of files and find out maximum temeperature recorded per year. The output of the mapper would be a map which has the year as the key and the temperature as the value. Each row is parsed to obtain an year and the temperature. A sample row has been used as the test data in  both the tests.
14) As expected the test will fail. Now create a java package com.hadoop.test.HadoopLocal
package com.hadoop.test.HadoopLocal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
 private NCDTempParser parser = new NCDTempParser();
 
 enum Temperature {
     MALFORMED
   }
   @Override
   public void map(LongWritable key, Text value, Context context)
       throws IOException, InterruptedException {
     
    parser.parse(value);
      if (parser.isValidTemperature()) {
       int airTemperature = parser.getAirTemperature();
          if (airTemperature > 1000) {
              System.err.println("Temperature over 100 degrees for input: " + value);
              context.setStatus("Detected possibly corrupt record: see logs.");
              context.getCounter(Temperature.MALFORMED).increment(1);
          }
          context.write(new Text(parser.getYear()),
            new IntWritable(parser.getAirTemperature()));
      }
   }
}
15) Also create a helper class (in the same package). This class  parses rows in the file to fetch temerature and year.
package com.hadoop.test.HadoopLocal; import org.apache.hadoop.io.Text; public class NCDTempParser { private static final int MISSING_TEMPERATURE = 9999; private String year; private int airTemperature; private String quality; public void parse(String record) { year = record.substring(15, 19); String airTemperatureString; // Remove leading plus sign as parseInt doesn't like them (pre-Java 7) if (record.charAt(87) == '+') { airTemperatureString = record.substring(88, 92); } else { airTemperatureString = record.substring(87, 92); } airTemperature = Integer.parseInt(airTemperatureString); quality = record.substring(92, 93); } public void parse(Text record) { parse(record.toString()); } public boolean isValidTemperature() { return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]"); } public String getYear() { return year; } public int getAirTemperature() { return airTemperature; } } 

 
16) Run the Test MaxTemperatureMapperTest. It should now be green.
17) Let us now write the Test for the reducer. The reducer will work on the mapper's out put to find out maximum temperature recorded in each year.
18) Create the following class in the package com.hadoop.test.HadoopLocal in the package under the folder src/test/java.
19) public class MaxTemperatureReducerTest {
 @Test
   public void returnsMaximumIntegerInValues() throws IOException,
       InterruptedException {
     new ReduceDriver<Text, IntWritable, Text, IntWritable>()
       .withReducer(new MaxTemperatureReducer())
       .withInput(new Text("1950"),
           Arrays.asList(new IntWritable(10), new IntWritable(5)))
       .withOutput(new Text("1950"), new IntWritable(10))
       .runTest();
   }
}
20)  As expected the test will fail. Let us now write the reducer. Create following class in the package com.hadoop.test.HadoopLocal under the folder src/main/java.
public class MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
21) Run the test MaxTemperatureReducerTest. All tests should become green.
22) Let us the write the test for the Driver Class. Before writing the test create the directory structure input/local under the Projects root directory which is HadoopLocal.
23) Create a file called sample.txt with the following contents. Please note that each line has taken 2 rows below. Please copy the following content as is to the file. You should have 5 rows in the file after copying the following contents.
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
24) Create the following class in the package com.hadoop.test.HadoopLocal in the package under the folder src/test/java.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
public class MaxTemperatureDriverTest {
  
  public void test() throws Exception {
    Configuration conf = new Configuration();
    //Config to use local file system has Hadoop File System
    conf.set("fs.defaultFS", "file:///");
    conf.set("mapreduce.framework.name", "local");
    conf.setInt("mapreduce.task.io.sort.mb", 1);
    
    Path input = new Path("input/local");
    Path output = new Path("output");
    
    FileSystem fs = FileSystem.getLocal(conf);
    fs.delete(output, true); // delete old output
    
    MaxTemperatureDriver driver = new MaxTemperatureDriver();
    driver.setConf(conf);
    
    int exitCode = driver.run(new String[] {
        input.toString(), output.toString() });
    //assertThat(exitCode, is(0));
    
    //checkOutput(conf, output);
  }
}
25)  As expected the test will fail.
26) Create following class in the package com.hadoop.test.HadoopLocal under the folder src/main/java.
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MaxTemperatureDriver extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
    
    Job job = new Job(getConf(), "Max temperature");
    job.setJarByClass(getClass());
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    return job.waitForCompletion(true) ? 0 : 1;
  }
  
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
    System.exit(exitCode);
  }
}
27) Run the test MaxTemperatureDriverTest. All the tests should now turn green.
28) You have now learned to write Hadoop Programs using TDD.
29) The following picture displays the project structure in eclipse.

Thursday, 30 July 2015

Spark yourself

Time has come to move or to push myself into SPARK ..i have gone though many Youtube videos and books for spark.

Friday, 19 June 2015

Tips to clear Cloudera Apache Hadoop Certification CCD 410

Last weekend i cleared Cloudera Apache Hadoop CCD410 certification.The level of certification is neither hard nor very easy.One who is good in understanding Mapreduce and HDFS and have good practice will clear this exam.

I started taking interest in BigData and Hadoop in end of March.Here is my advice on how to prepare for CCD-410:

  • Hadoop: The Definitive Guide is the bible and one of the best book which gives you good understanding.Try making your own notes when u start reading this book.It would be good to have cloudera quickstart or hortonworks sandbox installed on your laptop . http://datatwist.blogspot.in/2015/05/getting-started.html will help u installing the same.
  • get yourself clear on every topic in detail.serde,serialization,sequencefile, mapfile,compression,setjarbyclassname().hidden files will not be processed,split,recordreader.etc.
  • Read Part 2 of this book in detail,you will get maximum question from this section.
  • You will get 20-25 practical questions question which will ask you about the output of the mapreduce flow .you will be asked questions on on conditions like if you comment fileoutput path what will happen and so on..
  • Try commenting driver class  and see what you get as output .eg comment job.setjarclassname().
  • Start practising on Hive and Sqoop.(SQOOP inport/export are very common but lil confusing some times.) 
  • A strong Java foundation is required before taking the exam. Since Hadoop and many of its most useful features are written in Java, extending abstract classes and implementing its base class interfaces is critical. Hadoop streaming (along with Avro) lend many Hadoop features to other languages, but unlocking the full potential of Hadoop requires Java (for now). In addition, many of Java’s Collections (ArrayList, TreeSets, HashMap classes), string manipulation(String, Pattern, and Matcher classes), and architectural/design capabilities (base serialization, primitive types, etc.) are considered fair game for the exam — and you will find many of these concepts on the exam in typical use cases.
  •  Read the differnce between Mapreduce 1 and YARN.
  • Dont overlook Flume,Sqoop,Hive and crunch.If you are bored of reading you can watch youtube videos.
  • There is a Youtube channel IT Versity by Durga Gadiraju,which explains every aspect of this certification in depth,specially Sqoop,Hive and Performance tuning .
  • Rest All the best dnt panic during the exam if youarnt able to get the answer of practical questions.look for the wrong answer and you will get right one.

Some useful links


Sunday, 31 May 2015

Hive


Designed by some guy called Jeff from Facebook..this mapreduce tool/application is very much useful for guys who are good in SQL based programming.though Hive isn't good for applications where Machine Learning Algos are needed for complex logic.

One can Install it very easily and can run it in 3 different modes.
  1. Hive Shell (hive> SHOW TABLES;)
  2. Calling scripts containing hive commands (hive -f script.q)
  3. Short scripts, you can use the -e option to specify the commands in line, in which case the final semicolon is not required.(hive -S -e 'SELECT * FROM dummy')
Hive converts the above queries to Java mapreduce code and execute it in same way as we do it from hadoop jar command.

Hive creates logical tables from the data files in HDFS.There are two components of Hive .


  1. Metastore 

  2. Warehouse(/user/cloudera/any.db/tablename/0000_m )

Metastore is the Database in the default hive mysql schema.It store matadata information of the tables created by Hive.
Warehouse is nothing but the physical location of the data files stored for Hive Tables.(marked as red).

One can set properties in Hive like we set in  Mapreduce..
There is a precedence hierarchy to setting properties. In the following list, lower numbers take precedence over higher numbers:
  1. The Hive SET command(set a particular property)
  2. The command-line -hiveconf option(for a particular session hive --config /Users/tom/dev/hive-conf)
  3. hive-site.xml and the Hadoop site files (core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml)
  4. The Hive defaults and the Hadoop default files (core-default.xml, hdfs-default.xml, mapred-default.xml, and yarn-default.xml)

Comparison with Traditional Databases

Schema on Read Versus Schema on Write

In traditional database  we have to mention schema (check data ,its integrity ,etc)while loading data in a table which is called as schema on write.where as in hive we need not to specify any schema or data comparison is performed while loading data in hive tables.it is just copy or move operation of data files .
Having a schema makes data performance fast but if schema is not known at the time of creation of data tables..hive is a good choice at that time.

  Tables

1)Managed

CREATE TABLE NYSE_2014(symbol string,date string,high float,low float,close float,open float,quantity string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','

LOAD DATA INPATH '/user/cloudera/nyse_2014.csv' OVERWRITE INTO TABLE NYSE_2014;

/user/cloudera/nyse_2014.csv
 file will be moved to warehouse directory and a schema definition will be created in Metastore DB.when we drop this table (drop table managed_table) ,both file and schema definition of the table will be deleted.(files will be copied to warehouse directory ,if we are using load data local inpath i.e loacl file)

2)External

 CREATE EXTERNAL TABLE managed_table (dummy STRING) LOCATION 'user/rakesh/external_table';
LOAD DATA INPATH '/user/Rakesh/data.txt' INTO table managed_table;

Files will not be copied/moved to warehouse directory.it doesn’t even check whether the external location exists at the time it is defined..and when we drop this table only the schema definition will be deleted not the file.

Partitioning and Bucketing

Both partitioning and bucketing help us in performance while looking though the data present in hive metastore files.increase efficiency of the queries on the table(as these will look for the specific file not the whole data set)..

 Partitioning:

create table nyse_2014_partition (symbol string,date string,high float,low float,close float,open float,quantity string) partitioned by (dt string,country string);

insert overwrite table nyse_2014_partition partition(dt='2014',country='US') select * from default.nyse_2014;


 
At the filesystem level, partitions are simply nested subdirectories of the table directory. After loading a few more files into the logs table, the directory structure might look like this:

/user/hive/warehouse/nyse_2014_partition/
dt=2014/
country=US/

00000

 now as partition columns are pseudo columns and one can query on the same and get results.

select * from nyse_2014_partition where country = GB this query will look for the filesin the partition having country ='GB' not the whole data set.

Bucketing:

CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;

insert overwrite table bucket_user select * from user:

will create 4 directories in warehouse/bucketed_users directory.

SELECT * FROM bucketed_users
     TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
will return 25% of records of all the buckets.

If you want to create a new, empty table with the same schema as another table, then use the LIKE keyword:

create table nyse_2014_like like nyse_2014;

CTAS

create table symbols_nyse as select distinct(symbol) from nyse_2014;

Using a custom SerDe: RegexSerDe

Let’s see how to use a custom SerDe for loading data. We’ll use a contrib SerDe that uses a regular expression for reading the fixed-width station metadata from a text file:
CREATE TABLE stations (usaf STRING, wban STRING, name STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\\d{6}) (\\d{5}) (.{29}) .*"
);
In previous examples, we have used the DELIMITED keyword to refer to delimited text in the ROW FORMAT clause. In this example, we instead specify a SerDe with the SERDE keyword and the fully qualified classname of the Java class that implements the SerDe,org.apache.hadoop.hive.contrib.serde2.RegexSerDe
to be continued,,,,

Thursday, 28 May 2015

Sorting in Mapreduce

We know that the output of map produces key's in sorted format(not values).
The simplest way to sort he files is to run then through a map only job and output is stored in sequence file having key as Int,Long Writable.(Preparation)
Then passing the same output through default mapreduce job having number of reducers as n.This will sort the output in given sort order.(Partial Sort)
But the above will produce n files which are in sorted order.one can concat them to produce a global sorted file.

Total Sort:


This approach take input sequence files from preparation phase.and the pass it through default mapreduce but with the help of input sampler..which is nothing but divide our partitions in even range..  functionally it is still the same.produce 2 sorted reduce files and combine then to produce a singe sorted file.
Input sampler(returns a sample of keys given an InputFormat and Job) to see if an even partitioning of files have occurred .Client calls writePartitionFile()method of input sampler which produce a sequence file.The same sequence file is used by TotalOrderPartitioner to create partitions for the sort job.
below is the code:
               
 job.setPartitionerClass(TotalOrderPartitioner.class);

   InputSampler.Sampler<IntWritable, LongWritable> sampler =
       new InputSampler.RandomSampler<IntWritable, LongWritable>(0.1, 10000, 10);
 
   InputSampler.writePartitionFile(job, sampler);

   // Add to DistributedCache
  // Configuration conf = job.getConfiguration();
   String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
   URI partitionUri = new URI(partitionFile);
   job.addCacheFile(partitionUri);

Secondary Sort:

One can easily understand by below example.  
                                   

  • Make the key a composite of the natural key and the natural value.
  • The sort comparator should order by the composite key (i.e., the natural key and natural value).
  • The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping.
SortComparator:Used to define how map output keys are sorted
Excerpts from the book Hadoop - Definitive Guide:
Sort order for keys is found as follows: 1.If the property mapred.output.key.comparator.class is set, either explicitly or by calling setSortComparatorClass() on Job, then an instance of that class is used. (In the old API the equivalent method is setOutputKeyComparatorClass() on JobConf.)
2.Otherwise, keys must be a subclass of WritableComparable, and the registered comparator for the key class is used.
3.If there is no registered comparator, then a RawComparator is used that deserializes the byte streams being compared into objects and delegates to the WritableCompar able’s compareTo() method.
SortComparator Vs GroupComparator in a one liner: SortComparator decides how map output keys are sorted while GroupComparator decides which map output keys within the Reducer go to the same reduce method call.

Thursday, 14 May 2015

HDFS...Hadoop Distributed Filesystem



Hadoop Distributed Filesystem  is like any other other file system with few exceptions. 

HDFS is described by below three points.

·   Works best when u have large files.

·   Streaming access(write once read many) .

·   Works on commodity hardware.

It is different from Portable Operating System Interface(POSIX) as if files saved is less than the block size of disk(4kb is default) it will waste/use who of the block but in HDFS if one is having file of 2kb then the rest of 2kb is free for other systems to use.

HDFS is having a block size which is greater that POSIX by default it is 128MB.means each file is divided into n blocks (file>128 MB).Each block is stored on different data nodes and name node holds metadata only(which data node holds which block).

We have a concept of replication (default is 3),means each block is stored on 3 different locations.to handle fault tolerance.We have a secondary name node to handle name node failures.


HDFS’s fsck command understands blocks. For example, running:
% hdfs fsck / -files -blocks
 which gives the block info of att the files in hdfs.
Java Api for HDFS:
http://hadoop.apache.org/core/docs/current/api/org/apache/hadoop/fs/package-summary.html

I have developed below code which reads,writes,deletes and stores in HDFS.

package com.hadoop.tutorial;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {

  public static void main(String[] args) throws Exception {
    String uri = args[0];
    String dest = args[1];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    FSDataInputStream in = null;
    FSDataOutputStream out = null;
    try {
      in = fs.open(new Path(uri));
      //fs.mkdirs(new Path(dest));
      out=fs.create(new Path(dest));
      
      IOUtils.copyBytes(in, System.out, 4096, false);
      in.seek(0);
      IOUtils.copyBytes(in, out, 4096, false);
      fs.delete(new Path(uri),true);
    } finally {
      IOUtils.closeStream(in);
      
    }
  }

}
$ hadoop jar gettingstarted_mapreduce.jar com.hadoop.tutorial.FileSystemCat hdfs://quickstart.cloudera:8020/user/cloudera/abc.txt hdfs://quickstart.cloudera:8020/user/cloudera/xyz.txt


Coherency Model :


If More than a block of data is written to the hdfs ,it is available for others users to read.
and the current written box wont b available to the reader.
HDFS provides a way to force all buffers to be flushed to the datanodes via the hflush() method on FSDataOutputStream
Once data is written and  hflush is called it is available to users to read.use hsync() to make sure it is written on directory.

Parallel Copying with distcp
one can run mapreduce jobs to paraller process and transfer data between two clusters,nodes and directories by calling distcp

hadoop distcp dir1 dir2