Monday, November 16, 2015

Hive Cheat Sheet

--Create Partitioned table from a non partitioned table
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;

create table (id int, name string, countrycode string, district string, population string);
load data local inpath '/home/user/data/cities.pipes' into table;

create external table part_city (id int, name string, district string, population int) partitioned by (countrycode string) location '/user/user/fads/city';

insert overwrite table part_city partition(countrycode) select id, name, district, population, countrycode from city;

***Careful, insert overwrite does some weird stuff to the source table, use insert into instead

--Create Partitioned table from a non partitioned table  (Compressed)
set mapred.output.compress=true;

create external table part_city (id string, name string, district string, population string) partitioned by (countrycode string) row format delimited lines terminated by '\n' stored as orcfile location '/user/user/fads/city';
insert overwrite table part_city partition(countrycode) select id, name, district, population, countrycode from city;

--Equivalent of drop column, in the columns () select the columns you want to retain, data will not be lost
alter table city replace columns(id string, name string, countrycode string, district string, population string);

--Create Table from existing table
create table dupcity like city;

--Get insights how the table was created
show create table city;

--Use UDF, transform and create a new table
add file /path/to/;
insert into table temp select transform(id, name, countrycode, district, population) using 'python' AS (id, name, countrycode, district, population) from city;

Examples use the cities database found in mysql example databases

python code for udf used:(Expands few abbreviated  country codes)

import sys

lkp = {'VAT':'Vatican','SYR':'Syria','BRA':'Brazil','PSE':'Palestine'}

for lines in sys.stdin:
        fields = lines.split('\t')
        print '\t'.join(map(lkp.get, fields, fields)).strip()

Tuesday, November 10, 2015

Comparison of Compression codecs in Hive

Just some stats about the compression codecs with file types (Hive stored as) to check which one to choose.

To compress data in HDFS, three are 2 ways:

Compress the source files (gz/bzip2) and store them in the hive table or copy to HDFS and build a table on it (catalog) - usually using load data (the combinations of load data and type of storage is provided below)

Secondly, data can be compressed using the map/reduce output (query output) by setting the codec that's required and using that compressed output to insert into the table

set mapred.output.compress=true;
set mapred.output.compression.type=BLOCK;

insert overwrite table seqbzip_block_tbl select * from texttable;

Select from text table generates the zip files as the output, we use that output as input to the table.
Again, different combinations and theirs results are provided in the table below (more of a truth table)

Finally, did an test to figure out the best combnation using a 9.8 GB File on a cluster (24 nodes). From storage point of view, ORCFILE/GZipCodec seems to be the go to choice. My test indicates that the time taken for a full table scan is also ORCFILE/GZipCodec the winner. Widely  used combination to 'store' the data is ORC/Gzip and for queries the intermediate results's codec seems to be snappy.

Wednesday, October 28, 2015

Hive String to Date using Python UDF

After trying to get the date from various formats using HiveQL and Pig, it was time for a UDF. I dont know much Java, it seemed much easier to create a temp function from a class. Thankfully, we have Python!

So the Input was something like: (this is a sample, the actual data cannot be pasted here)

1904287Customer 1Jan 11, 2003
96391595Customer 26/17/1969
2236067Customer 308/22/54

hive> create table cust (id int, name String, mydate String) row format delimited fields terminated by '\t' lines terminated by '\n';
Time taken: 0.149 seconds

hive> load data local inpath '/home/my/data/sample.txt' into table cust;
Loading data to table fads.cust
Table fads.cust stats: [numFiles=1, numRows=0, totalSize=91, rawDataSize=0]
Time taken: 0.736 seconds

hive> select * from cust;
1904287 Customer 1      Jan 11, 2003
96391595        Customer 2      6/17/1969
2236067 Customer 3      08/22/54
Time taken: 0.043 seconds, Fetched: 3 row(s)

So now the fun begins, the data is loaded. We start coding python program to manage these dates - Note the program only handles certain types of date and this is shown as an example rather than a full fledged data conversion program. No exceptions have been handled as well. 

import re
import sys

monthdict = {}
monthsstr = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec"

datelist = []

for m in monthsstr.split(' '):
        monthdict[m] = index
        index = index+1

#monthdict = sorted(monthdict.items(), key = lambda x : x[1])

def correct_date(mydate):
        year = 0
        sdate = re.sub('( +|,|/|-)',' ',mydate)
        splitdate = sdate.split()

        if len(splitdate[2].strip()) == 2:
                year = int(splitdate[2])
                if year > 14:

                mydate = splitdate[0] + ' ' + splitdate[1] + ' ' + str(year)

                month = monthdict[splitdate[0]]
                if len(splitdate[2].strip()) == 4:
                        mydate = str(month) + ' ' + splitdate[1] + ' ' + splitdate[2]
                        mydate = str(month) + ' ' + splitdate[1] + ' ' + str(year)

        return re.sub(' ','/',mydate.strip())

for line in sys.stdin:
        fields = line.split('\t')
        print fields[0], ',', fields[1] ,',',correct_date(fields[2].strip())

We read the data from stdin, becuase select statement at hive writes to stdout with \t separated field list, we split it and get the 3rd field and do our date conversion and write it back to stdout. 

hive> add file /home/pgms/python/;
Added resources: [/home/pgms/python/]

hive> select transform(id, name, mydate) using 'python' as mdate from cust;
*log4j details here***
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1   Cumulative CPU: 1.76 sec   HDFS Read: 328 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 760 msec
1904287 , Customer 1 , 1/11/2003
96391595 , Customer 2  , 6/17/1969
2236067 , Customer 3 , 08/22/2054
Time taken: 20.125 seconds, Fetched: 3 row(s)

The dates are converted, now we stored the results in HDFS

hive> insert overwrite DIRECTORY  '/user/myuser/output' select transform(id, name, mydate) using 'python' as mdate from cust;

Moving data to: /user/myuser/output
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1   Cumulative CPU: 1.49 sec   HDFS Read: 328 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 490 msec
Time taken: 13.334 seconds

$ hadoop fs -cat output/000000_0
1904287 , Customer 1 , 1/11/2003
96391595 , Customer 2  , 6/17/1969
2236067 , Customer 3 , 08/22/2054

Twisted, but can do thins using Python and not just Java in Hadoop. 

Getting Started with Apache Flume

I used spring-XD for data ingestion for a while until I had to move to a different distribution of Hadoop which didn't come with XD installed. I started using Flume and while XD makes life much simpler by giving the ability to define stream and tap in one line, Flume does need at the min a conf file with 3 things (source, channel and sink defined), but has more bells.

Data Ingestion and Monitoring

Let's start off with a very simple way of getting data across (in a terminal), with out being in Hadoop ecosystem for a min

$ cp /src/path/file /dest/path/file
There is a source file (source), there is destination (sink) and the means of getting the file across (channel) is the file-system itself

$ tail -F /log/file >> /mydir/logfile
Continuous, real time data fetching of a log file using tail - simple enough. Lets write a small utility which alerts (from sending you an SMS to emailing you, the alert can be anything),  lets just print it as alter if there is error found in the log.

tail -F /path/to/log/latest.log |  python

The source (source) is the log file, (channel) is unix pipe and (sink) is the python script. The script continues to read from standard input and alerts when error is found - In less than 10 lines, we have a real time monitoring system

-----------Python listing-------------------
import sys

while 1:
        line = sys.stdin.readline()
        if "error" in line.lower():
                print "Alert!, Error Found"

Getting started with Flume

If we were to extend the python script to support various sources, channels and sinks it's going to be more than 10 lines. If the script has to guarantee every line is from source to sink is reached, in a distributed system, the number of lines grows exponentially - Enter Flume.

Below are the sources, channels and sinks supported in Flume (version

Each source, channel and sink has quite a few properties that needs to be set (some properties are mandatory). In order to specify these properties there is a configuration file required at the minimum. In a full implementation,there is configuration directory that specifies the flume environment, log4j properties etc

In a nut shell, all the 3 components needs to be set, so the conf file goes like this:

Source = Exec Source
Sink = HDFS
Channel = Memory

Exec Source => Memory => HDFS

The below flume conf file does the same as the tail -F we had above (with out monitoring)

#File Name - simplelogger.conf

log.sources = src
log.sinks = snk
log.channels = chs

#Define Source

log.sources.src.type = exec
log.sources.src.command = tail -F /path/to/log/latest.log

#Define Sink

log.sinks.snk.type = logger

#Define Channels

log.channels.chs.type = memory

#Tie Source and Sink to Channel = chs
log.sources.src.channels = chs

Nitty-Gritty: See how all the declaration start with same name "log", that's in line with "Java" (the file name to be the same as the class name) - so when the flume agent has to be started, the name should be same as the one used above:

For the above config file, the flume agent to be started as:
flume-ng agent -f simplelogger.conf -n log

If the names don't match, there is an error thrown:
"WARN node.AbstractConfigurationProvider: No configuration found for this host:nnnn" and the agent doesnt work.

Nitty-Gritty (2) - While setting the channel properties of sources and sinks, there is a very subtle difference, not sure why this was introduced, if you know why, please leave a comment

For the sink, it's singular (channel) - = chs
For the source, it's plural (channels) - log.sources.src.channels = chs

Perhaps because there can be more than one source?

Anyway, happy fluming, this is a must tool to get data from non-traditional sources into Hadoop or any other sink.

Wednesday, October 21, 2015

Hive Compression options and examples

Hadoop/Hive/Mapreduce paradim - all lead lots of I/O, network bandwidth between the nodes and inherently storage (not to mention redundant storage to aid fault tolerance)

It only makes sense to compress the data so we reduce all the 3 factors mentioned above. Downside? computation when compress/decompress is required and the memory associated - with what I have observed 24 CPUs - 126 GB RAM cluster, for external tables (flat files coming from RDBMS), the compression worked well, there were performance gains.

To begin with, the following parameters in hive are for compressoin. This is also a good time to mention there are 414 parameters that you can set in hive (versoin



Let's not worry about the orc file format for now. We are left with 4 compression parameters to play with (The compression types itself, we'll talk about it in a bit - gzip, bzip2, Snappy etc)

Check the default that's set in your installation for the above parameters, the config file is in (/etc/hive/conf/hive-site.xml)

In my installation:

hive.exec.compress.intermediate is set to false, which means the files produced in intermediate map steps are not compressed - set this to true so that we can get the I/O and Network taking up less bandwidth

hive.exec.compress.output is false, this parameter sets if the final output to HDFS will be compressed

The parameters and mapred.output.compression.codec in the config file (/etc/hive/conf/mapred-site.xml) - MapReduce is one type of application that can run on a Hadoop platform so all the apps using map reduce framework goes in here.

If we notice how the config is split into mapreduce compression and hive over all comperession it gives a fair idea where and how compression applies.

Hive installation, as an application can use the tez and spark engine as well (controlled by set hive.execution.engin=[mr,tez,spark]), the compression for tez and spark is out of scope in this discussion. What this means is map reduce supports compression and we have a few choices to set the compression codec at map reduce frame work.

If I were to draw write the flow chart, it goes something like this:

hive.exec.compress.intermediate   [true/false]
(should hive request execution engine to compress intermediate results?)
        if true and engine is MR (hive.execution.engine)
                (Output to be compressed before sending across network, between mappers?)
                if true
                        (the codec to be used for compressoin)
                        mapreduce.output.fileoutputformat.compress  [true/false]
                        (Should the job output be compressed)
                        if true and hive.exec.compress.output  [true/false]
                                 (if hive should request execution engine to produce compressed output)
                                (at what level should the compression be done?)
                                (what codec should the fileformat be?)

Let's taken an example:

hive> create table alice (line String) row format delimited fields terminated by '\n';
Time taken: 0.167 seconds

#Note I have not zipped the file alice.txt (since we are only looking at output and intermediate formats). If I were to zip the input as well. I would:
$bzip2 alice.txt
hive> load data local inpath '/home/user/data/alice.txt.bz2' into table alice;

hive> load data local inpath '/home/user/data//alice.txt' into table alice;
Loading data to table fads.alice
Table fads.alice stats: [numFiles=1, totalSize=163742]
Time taken: 0.756 seconds

hive> set hive.exec.compress.output=true;
hive> set mapreduce.output.fileoutputformat.compress=true;
hive> set;

INSERT OVERWRITE DIRECTORY '/hdfs/path/alice/output' SELECT * FROM alice;
Stage-Stage-1: Map: 1   Cumulative CPU: 1.58 sec   HDFS Read: 163979 HDFS Write: 163742 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 580 msec

$ hadoop fs -ls output/
Found 1 items
-rw-r--r--   3 user hdfs      48962 2015-10-21 15:35 output/000000_0.bz2

Friday, October 16, 2015

Another word count - Hive UDF with Python

For some reason word count seems to be a good example to start with regular expression, python and map/reduce, well the reason is clear, it's now become the "Hello World" for map/reduce

Input : Alice.txt download from, to make reading easier, I am working with only top 10 rows using limit and head;

1. Create a table in hive to store the book and load the text file
2. Create a python script to count the words and display
3. Use the python script as UDF (why? Regular expression, loops and other goodies available)

1. Create a table in hive to store the book and load the text file

hive> create table alice (line String) row format delimited lines terminated by '\n';
Time taken: 0.196 seconds

hive> load data local inpath '/home/username/python/alice.txt' into table alice;
Loading data to table database.alice
Table database.alice stats: [numFiles=1,  totalSize=163771]
Time taken: 0.757 seconds

2. Create a python script to count the words and display (I have saved it as

import sys
import re

count = {}
#select statement in hive prints it to stdout, so the script reads from stdin
for line in sys.stdin: 
        words =  map(lambda x: re.sub(r'[^\w\s]','',x),line.lower().strip().split(' '))
        for w in words:
                count[w] = count.get(w,0) + 1

for x in sorted(count.items(),key = lambda z: z[1], reverse=True):
        print x[0], '\t', x[1]
script can be tested like so: 
$ cat alice.txt  | python | head -n 10
the     1804
and     912
to       800
a         685
of       625
it        541
she     538
said    462
you    429

3. Use the python script as part of TRANSFORM 

hive> add file /home/username/python/; (Important! See errors encountered)

Added resources: [/home/username/python/]

select transform(line) using 'python' as t_line , count from alice limit 10;

Stage-Stage-1: Map: 2   Cumulative CPU: 4.9 sec   HDFS Read: 164257 HDFS Write: 126 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 900 msec
the     1804
and     912
to      800
a       685
of      625
it      541
she     538
said    462
you     429
Time taken: 16.577 seconds, Fetched: 10 row(s)

using TEZ engine:

hive> set hive.execution.engine=tez;
hive> select transform(line) using 'python' as t_line , count from alice limit 10;
Query ID = username_20151016120505_20e5f39d-7dcc-4dd2-8247-baa8e57b49b9
Total jobs = 1
Launching Job 1 out of 1

Status: Running (Executing on YARN cluster with App id application_1443556515457_25523)

Map 1 ..........   SUCCEEDED      1          1        0        0       0       0
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 5.32 s
the     1804
and     912
to      800
a       685
of      625
it      541
she     538
said    462
you     429
Time taken: 13.724 seconds, Fetched: 10 row(s)

Errors Encountered:
FAILED: Execution Error, return code 20003 from An error occurred when trying to close the Operator running your custom script
Reason: I used the query as "select transform(line) using 'python /home/username/python/' as t_line , count from alice limit 10;"
Solution: use add file /home/username/python/ so that the script is available in hive cache
then drop the absolute path in the hive query and just use the python script name

select transform(line) using 'python' as t_line , count from alice limit 10;

Wednesday, October 14, 2015

Machine Learning - SGD with sci-kit learning

**************Work in Progress********** Will update with comments and the file to use

import numpy as np
import ast
import copy
from numpy import *
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.svm import LinearSVC
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.multiclass import OneVsRestClassifier
from sklearn import preprocessing
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import confusion_matrix

x_temp_list = []
y_temp_list = []
y_train_text = []
temp_list = []
another_temp_list = []

# Comma seperated test/train file
with open('train.csv','r') as fp:
        lines = fp.readlines()

for line in lines:
        if len(line.strip()) > 1:
                fields = line.split(',')
                if len(line.split(',')) == 2:

X_train = np.array(x_temp_list)
y_train_text = np.array(y_temp_list)

#X_test = np.array(['Cashier Colestru Value Foods'])

#mlb = preprocessing.LabelBinarizer()
#Y = mlb.fit_transform(y_train_text)
count_vect = CountVectorizer()
y_train_counts = count_vect.fit_transform(y_train_text)

tfidf_transformer = TfidfTransformer()

classifier =  Pipeline([('vect', CountVectorizer()),
                      ('tfidf', TfidfTransformer()),
                      ('clf', SGDClassifier(loss='hinge', penalty='l2', alpha=1e-3, n_iter=50)), ]), y_train_text)

print ("Predticing and creating confusion matrix")

y_pred = classifier.predict(X_train)

print(confusion_matrix(y_train_text, y_pred,labels=None))