Sunday, 31 May 2015

Hive


Built at Facebook by the team of some guy called Jeff, this MapReduce-based tool is very useful for people who are good at SQL-based programming, though Hive isn't a good fit for applications that need machine learning algorithms or other complex logic.

Hive is easy to install and can be run in three different ways:
  1. The Hive shell (hive> SHOW TABLES;)
  2. Running a script file that contains Hive commands (hive -f script.q)
  3. For short scripts, passing the commands inline with the -e option, in which case the final semicolon is not required (hive -S -e 'SELECT * FROM dummy')
Hive converts these queries into MapReduce jobs and executes them on the cluster, much as we would run a job ourselves with the hadoop jar command.

Hive creates logical tables on top of the data files in HDFS. Hive has two main components:


  1. Metastore 

  2. Warehouse(/user/cloudera/any.db/tablename/0000_m )

The metastore is a database (in this setup, the default hive schema in MySQL). It stores the metadata of the tables created by Hive.
The warehouse is simply the physical location in HDFS where the data files of Hive tables are stored (the path shown next to Warehouse above).

One can set properties in Hive just as in MapReduce.
There is a precedence hierarchy to setting properties. In the following list, lower numbers take precedence over higher numbers:
  1. The Hive SET command (sets a particular property from within Hive)
  2. The command-line -hiveconf option (sets a property for a particular session, e.g. hive -hiveconf fs.defaultFS=hdfs://localhost; note that hive --config /Users/tom/dev/hive-conf is a different option, which points Hive at another configuration directory)
  3. hive-site.xml and the Hadoop site files (core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml)
  4. The Hive defaults and the Hadoop default files (core-default.xml, hdfs-default.xml, mapred-default.xml, and yarn-default.xml)

Comparison with Traditional Databases

Schema on Read Versus Schema on Write

In a traditional database, the schema is enforced while data is loaded into a table: the data is checked against the schema (types, integrity, etc.) and rejected if it does not conform. This is called schema on write. Hive, in contrast, does not verify the data when it is loaded into a table; a load is just a copy or move of the data files, and the schema is applied only when a query is issued. This is called schema on read.
Schema on write makes queries faster, but loading takes longer. If the schema is not known at load time, Hive's schema on read is a good choice.

  Tables

1)Managed

CREATE TABLE NYSE_2014(symbol string,date string,high float,low float,close float,open float,quantity string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

LOAD DATA INPATH '/user/cloudera/nyse_2014.csv' OVERWRITE INTO TABLE NYSE_2014;

Here the file /user/cloudera/nyse_2014.csv is moved into the warehouse directory, and the schema definition is created in the metastore DB. When we drop this table (DROP TABLE NYSE_2014;), both the data file and the schema definition are deleted. (If we use LOAD DATA LOCAL INPATH, i.e. a local file, the file is copied into the warehouse directory rather than moved.)

2)External

CREATE EXTERNAL TABLE external_table (dummy STRING) LOCATION '/user/rakesh/external_table';
LOAD DATA INPATH '/user/Rakesh/data.txt' INTO TABLE external_table;

Files are not copied or moved into the warehouse directory; Hive does not even check whether the external location exists at the time the table is defined. When we drop this table, only the schema definition is deleted, not the files.

Partitioning and Bucketing

Both partitioning and bucketing improve performance when reading the data files in the Hive warehouse. They make queries on the table more efficient, because a query can read only the relevant files instead of the whole data set.

 Partitioning:

create table nyse_2014_partition (symbol string,date string,high float,low float,close float,open float,quantity string) partitioned by (dt string,country string);

insert overwrite table nyse_2014_partition partition(dt='2014',country='US') select * from default.nyse_2014;


 
At the filesystem level, partitions are simply nested subdirectories of the table directory. After loading data into the nyse_2014_partition table, the directory structure might look like this:

/user/hive/warehouse/nyse_2014_partition/
    dt=2014/
        country=US/
            00000

Partition columns are pseudo columns, so one can filter on them in a query just like ordinary columns:

select * from nyse_2014_partition where country = 'GB';

This query reads only the files in the partitions where country = 'GB', not the whole data set.

Bucketing:

CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;

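insert overwrite table bucketed_users select * from user;

will create 4 files (one per bucket) in the warehouse/bucketed_users directory. Rows are assigned to a bucket by hashing the id column. On older Hive versions, run SET hive.enforce.bucketing = true; first so that the insert produces one file per bucket.

SELECT * FROM bucketed_users
     TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
returns the rows of one bucket out of the four, i.e. roughly 25% of the records if the ids are spread evenly across the buckets.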
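
To see why sampling one bucket out of four gives roughly a quarter of the rows, here is a small sketch in plain Java (no Hive needed). It assumes the bucket is the non-negative hash of the column value modulo the number of buckets, and that the hash of an int is the int itself; the class and method names are made up for illustration and are not part of Hive.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics how rows could be assigned to buckets.
final class BucketSketch {

    // Assumed rule: non-negative hash of the column value, modulo the bucket count.
    static int bucketFor(int id, int numBuckets) {
        int hash = Integer.hashCode(id);   // for an int this is the value itself
        return (hash & Integer.MAX_VALUE) % numBuckets;
    }

    // Ids in [firstId, lastId] that land in the given zero-based bucket.
    static List<Integer> idsInBucket(int firstId, int lastId, int bucket, int numBuckets) {
        List<Integer> ids = new ArrayList<>();
        for (int id = firstId; id <= lastId; id++) {
            if (bucketFor(id, numBuckets) == bucket) {
                ids.add(id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Twelve consecutive ids spread over 4 buckets: 3 ids per bucket.
        for (int bucket = 0; bucket < 4; bucket++) {
            System.out.println("bucket " + bucket + " -> " + idsInBucket(0, 11, bucket, 4));
        }
    }
}
```

With evenly spread ids each bucket holds about the same number of rows, which is why reading a single bucket behaves like a 25% sample. Skewed ids would give uneven buckets.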

If you want to create a new, empty table with the same schema as another table, then use the LIKE keyword:

create table nyse_2014_like like nyse_2014;

CTAS

create table symbols_nyse as select distinct(symbol) from nyse_2014;

Using a custom SerDe: RegexSerDe

Let’s see how to use a custom SerDe for loading data. We’ll use a contrib SerDe that uses a regular expression for reading the fixed-width station metadata from a text file:
CREATE TABLE stations (usaf STRING, wban STRING, name STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\\d{6}) (\\d{5}) (.{29}) .*"
);
In previous examples, we used the DELIMITED keyword to refer to delimited text in the ROW FORMAT clause. In this example, we instead specify a SerDe with the SERDE keyword and the fully qualified class name of the Java class that implements the SerDe, org.apache.hadoop.hive.contrib.serde2.RegexSerDe.
To be continued...

Thursday, 28 May 2015

Sorting in Mapreduce

We know that map output is sorted by key (the values are not sorted).
Preparation: the simplest way to get the files ready for sorting is to run them through a map-only job and store the output in a sequence file whose key is a Writable type such as IntWritable or LongWritable.
Partial sort: pass that output through a default MapReduce job with n reducers. Each reducer sorts its share of the keys in the given sort order.
This produces n output files, each of which is sorted. However, simply concatenating them does not give a globally sorted file, because the default partitioner spreads keys across the reducers by hash rather than by range.

Total Sort:


This approach takes the sequence files from the preparation phase as input and passes them through a default MapReduce job, but with a partitioner that divides the key space into ranges of roughly even size. Functionally it is still the same sort: it produces n sorted reducer output files, but now every key in one file is smaller than every key in the next, so the files can be concatenated to produce a single sorted file.
To choose partition boundaries that split the data evenly, we use InputSampler, which returns a sample of keys given an InputFormat and a Job. The client calls the writePartitionFile() method of InputSampler, which produces a sequence file. The same sequence file is used by TotalOrderPartitioner to create the partitions for the sort job.
Below is the code:
               
   // Partition by key range instead of by hash
   job.setPartitionerClass(TotalOrderPartitioner.class);

   // Sample keys with probability 0.1, up to 10000 samples from at most 10 splits
   InputSampler.Sampler<IntWritable, LongWritable> sampler =
       new InputSampler.RandomSampler<IntWritable, LongWritable>(0.1, 10000, 10);

   // Write the partition boundaries to the partition file
   InputSampler.writePartitionFile(job, sampler);

   // Add the partition file to the distributed cache
   Configuration conf = job.getConfiguration();
   String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
   URI partitionUri = new URI(partitionFile);
   job.addCacheFile(partitionUri);
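
To see the difference between a partial sort and a total sort without a cluster, here is a rough simulation in plain Java. It is only a sketch: the class, the sample keys and the split point of 40 are made up for illustration, and the hash rule is a simplified stand-in for a hash-based partitioner rather than Hadoop's actual classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.IntUnaryOperator;

final class TotalSortSketch {

    // Hash-style partitioning: which reducer a key goes to depends only on its hash.
    static int hashPartition(int key, int numReducers) {
        return (Integer.hashCode(key) & Integer.MAX_VALUE) % numReducers;
    }

    // Range partitioning: splitPoints must be sorted ascending.
    // Keys below splitPoints[0] go to partition 0, and so on.
    static int rangePartition(int key, int[] splitPoints) {
        int partition = 0;
        while (partition < splitPoints.length && key >= splitPoints[partition]) {
            partition++;
        }
        return partition;
    }

    // Sends each key to a partition, sorts every partition (as each reducer would),
    // then concatenates the partitions in order.
    static List<Integer> partitionSortConcat(List<Integer> keys, int numPartitions,
                                             IntUnaryOperator partitioner) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int key : keys) {
            partitions.get(partitioner.applyAsInt(key)).add(key);
        }
        List<Integer> result = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            Collections.sort(partition);
            result.addAll(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(42, 7, 19, 3, 88, 56, 21, 64);
        int[] splitPoints = {40};   // hypothetical boundary, as a sampler might pick

        System.out.println("hash : " + partitionSortConcat(keys, 2, k -> hashPartition(k, 2)));
        System.out.println("range: " + partitionSortConcat(keys, 2, k -> rangePartition(k, splitPoints)));
    }
}
```

With hash partitioning each part is sorted but the concatenation is not; with range partitioning the concatenation is globally sorted. Picking split points that balance the partitions is the job of the sampler.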

Secondary Sort:

The recipe for a secondary sort can be summarized as follows:

  • Make the key a composite of the natural key and the natural value.
  • The sort comparator should order by the composite key (i.e., the natural key and natural value).
  • The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping.
SortComparator: used to define how map output keys are sorted.
Excerpt from the book Hadoop: The Definitive Guide:
Sort order for keys is found as follows:
  1. If the property mapred.output.key.comparator.class is set, either explicitly or by calling setSortComparatorClass() on Job, then an instance of that class is used. (In the old API the equivalent method is setOutputKeyComparatorClass() on JobConf.)
  2. Otherwise, keys must be a subclass of WritableComparable, and the registered comparator for the key class is used.
  3. If there is no registered comparator, then a RawComparator is used that deserializes the byte streams being compared into objects and delegates to the WritableComparable’s compareTo() method.
SortComparator vs GroupComparator in one line: the SortComparator decides how map output keys are sorted, while the GroupComparator decides which map output keys within the reducer go to the same reduce() method call.
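
The interplay of the two comparators can be tried out in plain Java, without Hadoop. The sketch below is an assumption-laden illustration: the YearTemp key, the sample records and the reduceInput() helper are invented for the example, and partitioning is left out because everything runs in one process.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SecondarySortSketch {

    // Composite key: natural key (year) plus natural value (temperature).
    static final class YearTemp {
        final int year;
        final int temp;

        YearTemp(int year, int temp) {
            this.year = year;
            this.temp = temp;
        }
    }

    // Sort comparator: orders by the whole composite key (year ascending, temperature descending).
    static final Comparator<YearTemp> SORT = (a, b) -> {
        int byYear = Integer.compare(a.year, b.year);
        return byYear != 0 ? byYear : Integer.compare(b.temp, a.temp);
    };

    // Grouping comparator: looks at the natural key only.
    static final Comparator<YearTemp> GROUP = (a, b) -> Integer.compare(a.year, b.year);

    // Returns, per year, the temperatures in the order one reduce() call would see them.
    static Map<Integer, List<Integer>> reduceInput(List<YearTemp> mapOutput) {
        List<YearTemp> sorted = new ArrayList<>(mapOutput);
        sorted.sort(SORT);

        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        YearTemp previous = null;
        List<Integer> current = null;
        for (YearTemp key : sorted) {
            // A new group starts whenever the grouping comparator sees a different key.
            if (previous == null || GROUP.compare(previous, key) != 0) {
                current = new ArrayList<>();
                groups.put(key.year, current);
            }
            current.add(key.temp);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<YearTemp> mapOutput = Arrays.asList(
                new YearTemp(1950, 22), new YearTemp(1949, 78), new YearTemp(1950, 0),
                new YearTemp(1949, 111), new YearTemp(1950, -11));
        System.out.println(reduceInput(mapOutput));   // prints {1949=[111, 78], 1950=[22, 0, -11]}
    }
}
```

Because the values arrive already sorted within each group, a reducer that wants the maximum temperature per year only has to read the first value.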

Thursday, 14 May 2015

HDFS...Hadoop Distributed Filesystem



The Hadoop Distributed Filesystem is like any other filesystem, with a few exceptions.

HDFS can be described by the three points below.

·   Works best when you have large files.

·   Streaming data access (write once, read many times).

·   Works on commodity hardware.

Like a filesystem for a single disk, HDFS stores files in blocks, but it differs in one respect. On a disk filesystem, a file smaller than the block size (4 KB is a common default) still occupies a whole block, whereas in HDFS a file smaller than a block does not occupy a full block's worth of underlying storage. For example, a 2 KB file uses only 2 KB of disk space.

The HDFS block size is much larger than a disk block: 128 MB by default. A file larger than 128 MB is divided into n blocks. The blocks are stored on different datanodes, and the namenode holds only the metadata (which datanode holds which block).

We also have the concept of replication (the default factor is 3), which means each block is stored in 3 different locations for fault tolerance. There is also a secondary namenode. It does not take over when the namenode fails; it periodically merges the namespace image with the edit log, and its copy can help in recovering from a namenode failure.
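
To make the block and replication arithmetic concrete, here is a small sketch in plain Java. The class and method names are made up for illustration, and the 300 MB file is just an example; the 128 MB block size and replication factor of 3 are the defaults mentioned above.

```java
// Illustrative arithmetic only; this does not talk to HDFS.
final class BlockMath {

    static final long MB = 1024L * 1024L;

    // Number of blocks needed for a file (the last block may be only partly filled).
    static long blockCount(long fileBytes, long blockBytes) {
        return (fileBytes + blockBytes - 1) / blockBytes;
    }

    // Size of the last block: a file does not have to fill its final block.
    static long lastBlockBytes(long fileBytes, long blockBytes) {
        long remainder = fileBytes % blockBytes;
        return (remainder == 0 && fileBytes > 0) ? blockBytes : remainder;
    }

    // Raw storage used across the cluster once every block is replicated.
    static long rawBytes(long fileBytes, int replication) {
        return fileBytes * replication;
    }

    public static void main(String[] args) {
        long file = 300 * MB;
        long block = 128 * MB;
        System.out.println("blocks: " + blockCount(file, block));
        System.out.println("last block MB: " + lastBlockBytes(file, block) / MB);
        System.out.println("raw MB with replication 3: " + rawBytes(file, 3) / MB);
    }
}
```

For the 300 MB example this works out to 3 blocks (128 MB, 128 MB and 44 MB) and 900 MB of raw storage.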


HDFS’s fsck command understands blocks. For example, running:
% hdfs fsck / -files -blocks
lists the blocks that make up every file in HDFS.
Java API for HDFS:
http://hadoop.apache.org/core/docs/current/api/org/apache/hadoop/fs/package-summary.html

I have developed the code below, which reads a file in HDFS, writes (stores) a copy of it in HDFS, and then deletes the original.

package com.hadoop.tutorial;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {

  public static void main(String[] args) throws Exception {
    String uri = args[0];   // source file in HDFS
    String dest = args[1];  // destination file in HDFS
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    FSDataInputStream in = null;
    FSDataOutputStream out = null;
    try {
      in = fs.open(new Path(uri));
      out = fs.create(new Path(dest));

      // Read: print the source file to standard output
      IOUtils.copyBytes(in, System.out, 4096, false);
      // Rewind, then write: copy the source file to the destination
      in.seek(0);
      IOUtils.copyBytes(in, out, 4096, false);
    } finally {
      // Close both streams so the written data is flushed to HDFS
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
    // Delete the source file only after the copy has been closed
    fs.delete(new Path(uri), true);
  }

}
$ hadoop jar gettingstarted_mapreduce.jar com.hadoop.tutorial.FileSystemCat hdfs://quickstart.cloudera:8020/user/cloudera/abc.txt hdfs://quickstart.cloudera:8020/user/cloudera/xyz.txt


Coherency Model :


Once more than a block's worth of data has been written to a file in HDFS, the first block is visible to other readers.
The block currently being written is not visible to readers.
HDFS provides a way to force all buffers to be flushed to the datanodes via the hflush() method on FSDataOutputStream.
Once data has been written and hflush() has been called, it is visible to new readers. Use hsync() to make sure the data is also written to disk.

Parallel Copying with distcp
distcp runs as a MapReduce job, so one can copy data in parallel between two clusters, or between directories on the same cluster:

hadoop distcp dir1 dir2


 







Hadoop mapreduce basics

Below are the basic points to remember about MapReduce.

    map: (K1, V1) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)
  1. MapReduce works in 2 phases: the map phase and the reduce phase.
  2. The map phase is a data preparation phase; it is also where unwanted records are filtered out.
  3. Reducers get an Iterator that iterates over the values associated with a key.
  4. Writing to an output directory that already exists raises an exception.
  5. 1 reducer = 1 output file.
  6. We can set the number of reducers in a MapReduce program.
  7. Input is divided into splits; the split size should ideally be equal to the HDFS block size.
  8. If there are no reducers (a map-only job), the map output is written directly to HDFS.
  9. Data locality: Hadoop tries to run each map task on a node where its input data resides.
  10. Map output is written to local disk, not HDFS; in the shuffle and sort phase it is sent to the reducers.
  11. A combiner works well with associative and commutative operations.
  12. Combiners cut down the data shuffled across the network between the map and reduce phases.
  13. Hadoop Streaming lets you write MapReduce programs in languages other than Java.
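
The points above can be tried out with a toy word count in plain Java, with no Hadoop involved. This is only a sketch of the data flow: the class and method names are invented, each input line stands in for one map task's split, and the "network" is just a list.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MapReduceSketch {

    // map: (offset, line) -> list(word, 1); the offset key is ignored here.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Shuffle and sort: group values by key, with keys in sorted order.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // reduce: (word, list(counts)) -> sum. It can double as the combiner
    // because addition is associative and commutative.
    static int reduce(String word, Iterable<Integer> counts) {
        int sum = 0;
        for (int count : counts) {
            sum += count;
        }
        return sum;
    }

    // Runs the whole flow; each element of splits plays the role of one map task's input.
    static TreeMap<String, Integer> wordCount(List<String> splits, boolean useCombiner) {
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        for (String split : splits) {
            List<Map.Entry<String, Integer>> mapOutput = map(split);
            if (useCombiner) {
                // Pre-aggregate the local map output so fewer records are shuffled.
                for (Map.Entry<String, List<Integer>> e : shuffle(mapOutput).entrySet()) {
                    shuffled.add(new SimpleEntry<>(e.getKey(), reduce(e.getKey(), e.getValue())));
                }
            } else {
                shuffled.addAll(mapOutput);
            }
        }
        TreeMap<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffle(shuffled).entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("big data big cluster", "big data");
        System.out.println(wordCount(splits, false));
        System.out.println(wordCount(splits, true));
    }
}
```

Both runs give the same counts; the combiner only changes how many records are handed to the shuffle, which is the saving described in point 12.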

Get Set Hadooooop Installing Cloudera quickstart

Hi Guys,

This blog is for people who are interested in words like Big Data, Hadoop, Spark, etc.

Let me tell you one thing: I may be wrong on many steps, but this is what I like about myself and I am happy with it.
There are no rules and no start or stop point for learning Hadoop, Big Data, Spark, etc., but one of the main rules is to unlearn everything if you are starting from scratch.
It would be good to have Java knowledge, but it is not mandatory.

Why Hadoop, Spark, etc.

There are many definitions, but I like simple and short ones.
Hadoop is an open source platform. To handle big data, its processing and its analytics on commodity hardware, we need tools like Hadoop, Spark, etc.

  Running applications on commodity hardware is the main advantage: it can reduce cost by something like 50 times.

(Big data is much, much greater than 1 GB.)

When Hadoop?

You might say: OK, I need Hadoop to analyse my logs, for learning purposes. (Don't do this.)
Use Hadoop and its ecosystem for bigger things like trade analytics, analysis of browsing data on a particular website, building a recommender system, analysing Twitter feeds, etc.

How Hadoop?

There are many vendors who provide the Hadoop ecosystem:
Cloudera, Hortonworks, MapR, Apache Bigtop, IBM BigInsights, etc. There is one from Microsoft, but it is paid.

You need to understand what you want to be: a big data analyst, a Hadoop administrator, or a tester.

I chose the analyst part. For that you need to install the Cloudera QuickStart VM, the Hortonworks Sandbox, or any other Hadoop platform on your laptop (>8 GB). For installation, please follow the link below or watch YouTube videos.
 Books to follow:

Hadoop: The Definitive Guide by Tom White, and Hadoop in Action.