Wednesday, October 28, 2015

Hive String to Date using Python UDF

After trying to get the date from various formats using HiveQL and Pig, it was time for a UDF. I don't know much Java, even though creating a temporary function from a Java class looked like the easier route. Thankfully, we have Python!

The input looked something like this (a sample with tab-separated fields; the actual data cannot be pasted here):

1904287	Customer 1	Jan 11, 2003
96391595	Customer 2	6/17/1969
2236067	Customer 3	08/22/54

hive> create table cust (id int, name String, mydate String) row format delimited fields terminated by '\t' lines terminated by '\n';
OK
Time taken: 0.149 seconds
hive>


hive> load data local inpath '/home/my/data/sample.txt' into table cust;
Loading data to table fads.cust
Table fads.cust stats: [numFiles=1, numRows=0, totalSize=91, rawDataSize=0]
OK
Time taken: 0.736 seconds

hive> select * from cust;
OK
1904287 Customer 1      Jan 11, 2003
96391595        Customer 2      6/17/1969
2236067 Customer 3      08/22/54
Time taken: 0.043 seconds, Fetched: 3 row(s)


Now that the data is loaded, the fun begins. We write a Python program to normalize these dates. Note that the program only handles certain date formats; it is shown as an example rather than a full-fledged date conversion program, and it does no exception handling.

import re
import sys

# Map month abbreviations to month numbers: Jan -> 1 ... Dec -> 12
monthdict = {}
monthsstr = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec"

index=1
for m in monthsstr.split(' '):
        monthdict[m] = index
        index = index+1

def correct_date(mydate):
        # Normalize dates such as 'Jan 11, 2003', '6/17/1969' or '08/22/54' to M/D/YYYY
        year = 0
        # Treat spaces, commas, slashes and dashes as separators
        sdate = re.sub('( +|,|/|-)',' ',mydate)
        splitdate = sdate.split()

        # Two-digit year: values above 14 are taken as 19xx, the rest as 20xx
        if len(splitdate[2].strip()) == 2:
                year = int(splitdate[2])
                if year > 14:
                        year+=1900
                else:
                        year+=2000

                mydate = splitdate[0] + ' ' + splitdate[1] + ' ' + str(year)

        # Month given by name: replace it with its number
        if re.search('[a-zA-Z]',sdate):
                month = monthdict[splitdate[0]]
                if len(splitdate[2].strip()) == 4:
                        mydate = str(month) + ' ' + splitdate[1] + ' ' + splitdate[2]
                else:
                        mydate = str(month) + ' ' + splitdate[1] + ' ' + str(year)

        return re.sub(' ','/',mydate.strip())


# Hive streams each row to stdin as tab-separated fields
for line in sys.stdin:
        fields = line.split('\t')
        print('%s , %s , %s' % (fields[0], fields[1], correct_date(fields[2].strip())))

We read the data from stdin because the select statement in Hive writes its rows to stdout as a tab-separated field list. We split each row, take the third field, convert the date and write the result back to stdout.
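As a rough alternative to the hand-rolled parsing, the standard library's datetime.strptime can try one format after another. The sketch below is written for Python 3 and is not the script used in this post. The names normalize_date, FORMATS and PIVOT are made up for illustration, and the two-digit-year rule simply copies the "greater than 14 means 19xx" assumption from the script above. It also assumes an English locale, since %b matches month abbreviations such as Jan.

```python
from datetime import datetime

# Candidate formats, tried in order; extend the tuple for other layouts.
FORMATS = ("%b %d, %Y", "%m/%d/%Y", "%m/%d/%y")
# Assumption copied from the script above: two-digit years above 14 are 19xx.
PIVOT = 14


def normalize_date(text):
    """Return text as M/D/YYYY, or None when no known format matches."""
    text = text.strip()
    for fmt in FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue  # this format did not match, try the next one
        year = parsed.year
        if fmt.endswith("%y"):
            # Override strptime's own century rule with the pivot above
            two_digit = year % 100
            year = 1900 + two_digit if two_digit > PIVOT else 2000 + two_digit
        return "%d/%d/%d" % (parsed.month, parsed.day, year)
    return None


if __name__ == "__main__":
    for sample in ("Jan 11, 2003", "6/17/1969", "08/22/54"):
        print(sample, "->", normalize_date(sample))
```

Returning None for unknown formats leaves it to the caller to decide whether to drop the row or pass the original string through.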

hive> add file /home/pgms/python/cdate.py;
Added resources: [/home/pgms/python/cdate.py]

hive> select transform(id, name, mydate) using 'python cdate.py' as mdate from cust;
*log4j details here***
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1   Cumulative CPU: 1.76 sec   HDFS Read: 328 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 760 msec
OK
1904287 , Customer 1 , 1/11/2003
96391595 , Customer 2  , 6/17/1969
2236067 , Customer 3 , 08/22/2054
Time taken: 20.125 seconds, Fetched: 3 row(s)

The dates are converted; now we store the results in HDFS.

hive> insert overwrite DIRECTORY  '/user/myuser/output' select transform(id, name, mydate) using 'python cdate.py' as mdate from cust;

 
Moving data to: /user/myuser/output
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1   Cumulative CPU: 1.49 sec   HDFS Read: 328 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 490 msec
OK
Time taken: 13.334 seconds
hive>


$ hadoop fs -cat output/000000_0
1904287 , Customer 1 , 1/11/2003
96391595 , Customer 2  , 6/17/1969
2236067 , Customer 3 , 08/22/2054

Twisted, but it shows that you can do things in Hadoop using Python and not just Java.




Getting Started with Apache Flume


I used Spring XD for data ingestion for a while, until I had to move to a different Hadoop distribution that didn't come with XD installed. I started using Flume. While XD makes life much simpler by letting you define a stream and a tap in one line, Flume needs at minimum a conf file with 3 things defined (source, channel and sink), but it has more bells and whistles.

Data Ingestion and Monitoring

Let's start off with a very simple way of getting data across (in a terminal), without being in the Hadoop ecosystem for a minute:

$ cp /src/path/file /dest/path/file
There is a source file (source), there is a destination (sink), and the means of getting the file across (channel) is the file system itself.

$ tail -F /log/file >> /mydir/logfile
This fetches a log file continuously, in real time, using tail. Simple enough. Let's write a small utility that raises an alert when an error is found in the log. The alert can be anything, from sending an SMS to emailing you; here we just print it.

tail -F /path/to/log/latest.log |  python errormonitor.py

The source is the log file, the channel is the Unix pipe and the sink is the Python script. The script keeps reading from standard input and alerts when an error is found. In less than 10 lines, we have a real-time monitoring system.

-----------Python listing-------------------
#errormonitor.py
import sys

# Read until EOF; readline() returns '' once the pipe is closed
for line in iter(sys.stdin.readline, ''):
        if "error" in line.lower():
                print("Alert!, Error Found")
                sys.stdout.flush()
-----------------------------------------------

Getting started with Flume

If we were to extend the Python script to support various sources, channels and sinks, it would take a lot more than 10 lines. If the script also has to guarantee that every line from the source reaches the sink, in a distributed system, the amount of code grows very quickly. Enter Flume.

Below are the sources, channels and sinks supported in Flume (version 1.5.2.2.2.4.2-2)


Each source, channel and sink has quite a few properties that need to be set (some of them mandatory). To specify these properties, a configuration file is required at a minimum. In a full implementation, there is a configuration directory that specifies the Flume environment, log4j properties, etc.

In a nutshell, all 3 components need to be set, so the conf file goes like this:

Source = Exec Source
Sink = HDFS
Channel = Memory

Exec Source => Memory => HDFS
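To make the three roles concrete, here is a toy Python 3 sketch of the source => channel => sink idea using a bounded in-memory queue. It only illustrates the model and is nowhere near what Flume does (no transactions, no durability, no recovery). run_pipeline and its parameters are hypothetical names.

```python
import queue
import threading


def run_pipeline(lines, capacity=100):
    """Move lines from a source, through a bounded channel, into a sink list."""
    channel = queue.Queue(maxsize=capacity)  # plays the role of a memory channel
    delivered = []                           # stands in for the sink's destination
    done = object()                          # sentinel marking the end of input

    def source():
        for line in lines:
            channel.put(line)  # blocks while the channel is full
        channel.put(done)

    def sink():
        while True:
            event = channel.get()
            if event is done:
                break
            delivered.append(event)

    workers = [threading.Thread(target=source), threading.Thread(target=sink)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return delivered


if __name__ == "__main__":
    print(run_pipeline(["line 1", "line 2", "line 3"], capacity=1))
```

The bounded queue is the interesting part: when the sink is slower than the source, the source blocks instead of filling memory without limit.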

The Flume conf file below does the same as the tail -F we had above (without the monitoring); to keep it simple, it uses a logger sink rather than HDFS.

#File Name - simplelogger.conf
#Declare

log.sources = src
log.sinks = snk
log.channels = chs

#Define Source

log.sources.src.type = exec
log.sources.src.command = tail -F /path/to/log/latest.log

#Define Sink

log.sinks.snk.type = logger

#Define Channels

log.channels.chs.type = memory

#Tie Source and Sink to Channel

log.sinks.snk.channel = chs
log.sources.src.channels = chs


Nitty-gritty: notice how all the declarations start with the same name, "log". That is the agent name. Much like Java expects the file name to be the same as the class name, the name passed when starting the Flume agent has to be the same as the one used above.

For the above config file, the Flume agent is started as:
flume-ng agent -f simplelogger.conf -n log
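A quick way to see which agent names a conf file actually declares, before passing one to -n, is to collect the first segment of every property key. The following Python 3 sketch does that for properties in the simple key = value form used above; agent_names is a hypothetical helper, not part of Flume.

```python
def agent_names(conf_text):
    """Return the set of agent names (first key segment) found in a Flume conf."""
    names = set()
    for raw in conf_text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments and anything that is not key = value
        if not line or line.startswith("#") or "=" not in line:
            continue
        key = line.split("=", 1)[0].strip()
        names.add(key.split(".", 1)[0])
    return names


SAMPLE_CONF = """
#File Name - simplelogger.conf
log.sources = src
log.sinks = snk
log.channels = chs
log.sources.src.type = exec
log.sinks.snk.type = logger
log.channels.chs.type = memory
"""

if __name__ == "__main__":
    print(agent_names(SAMPLE_CONF))
```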

If the names don't match, a warning is logged:
"WARN node.AbstractConfigurationProvider: No configuration found for this host:nnnn" and the agent doesn't work.


Nitty-gritty (2): when setting the channel properties of sources and sinks, there is a very subtle difference:

For the sink, it's singular (channel) - log.sinks.snk.channel = chs
For the source, it's plural (channels) - log.sources.src.channels = chs

The reason is that a source can write to more than one channel, while a sink reads from exactly one channel.

Anyway, happy fluming. Flume is a must-have tool for getting data from non-traditional sources into Hadoop or any other sink.


Wednesday, October 21, 2015

Hive Compression options and examples


The Hadoop/Hive/MapReduce paradigm leads to lots of I/O, network traffic between the nodes and, inherently, storage (not to mention the redundant storage that aids fault tolerance).

It only makes sense to compress the data so that we reduce all 3 factors mentioned above. The downside? The computation required to compress and decompress, and the memory associated with it. From what I have observed on a cluster with 24 CPUs and 126 GB of RAM, compression worked well for external tables (flat files coming from an RDBMS), and there were performance gains.

To begin with, the following Hive parameters are for compression. This is also a good time to mention that there are 414 parameters you can set in Hive (version 0.14.0.2.2.4.2-2).

hive.exec.compress.intermediate
hive.exec.compress.output

hive.exec.orc.compression.strategy
hive.exec.orc.default.compress

mapred.map.output.compression.codec
mapred.output.compression.codec

Let's not worry about the ORC file format for now. We are left with 4 compression parameters to play with (we'll talk about the compression types themselves, gzip, bzip2, Snappy etc., in a bit).

Check the defaults set in your installation for the above parameters; the config file is /etc/hive/conf/hive-site.xml.

In my installation:

hive.exec.compress.intermediate is set to false, which means the files produced in intermediate map steps are not compressed. Set this to true so that I/O and network transfers take up less bandwidth.

hive.exec.compress.output is false; this parameter controls whether the final output to HDFS is compressed.
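To check these defaults without opening the file by hand, something like the following Python 3 sketch could be used. It assumes the usual Hadoop-style layout of property elements with name and value children. read_properties and the inline sample XML are illustrative only; on a real cluster you would read the text of /etc/hive/conf/hive-site.xml instead.

```python
import xml.etree.ElementTree as ET

# Illustrative sample only, not copied from a real installation
SAMPLE_XML = """<configuration>
  <property><name>hive.exec.compress.intermediate</name><value>false</value></property>
  <property><name>hive.exec.compress.output</name><value>false</value></property>
  <property><name>hive.execution.engine</name><value>mr</value></property>
</configuration>"""


def read_properties(xml_text, wanted):
    """Return {name: value} for the wanted property names found in xml_text."""
    root = ET.fromstring(xml_text)
    found = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        if name in wanted:
            found[name] = prop.findtext("value")
    return found


if __name__ == "__main__":
    wanted = {"hive.exec.compress.intermediate", "hive.exec.compress.output"}
    for name, value in sorted(read_properties(SAMPLE_XML, wanted).items()):
        print(name, "=", value)
```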

The parameters mapred.map.output.compression.codec and mapred.output.compression.codec are in the config file /etc/hive/conf/mapred-site.xml. MapReduce is one type of application that can run on a Hadoop platform, so the settings for all apps using the MapReduce framework go in here. These two are the older names of mapreduce.map.output.compress.codec and mapreduce.output.fileoutputformat.compress.codec.

Noticing how the config is split into MapReduce compression and Hive's overall compression gives a fair idea of where and how compression applies.

Hive, as an application, can use the Tez and Spark engines as well (controlled by set hive.execution.engine=[mr,tez,spark]); compression for Tez and Spark is out of scope for this discussion. What this means is that MapReduce supports compression, and we have a few choices for setting the compression codec at the MapReduce framework level.

If I were to draw the flow chart, it would go something like this:

hive.exec.compress.intermediate   [true/false]
(should Hive request the execution engine to compress intermediate results?)
        if true and the engine is MR (hive.execution.engine)
                mapreduce.map.output.compress   [true/false]
                (should map output be compressed before it is sent across the network to the reducers?)
                if true
                        mapreduce.map.output.compress.codec   [Snappy/LZO/GzipCodec/BZip2Codec]
                        (the codec to be used for compression)

mapreduce.output.fileoutputformat.compress   [true/false]
(should the job output be compressed?)
        if true and hive.exec.compress.output   [true/false]
        (should Hive request the execution engine to produce compressed output?)
                mapreduce.output.fileoutputformat.compress.type   [RECORD/BLOCK/NONE]
                (at what level should the compression be done?)
                mapreduce.output.fileoutputformat.compress.codec   [Snappy/LZO/GzipCodec/BZip2Codec]
                (which codec should be used for the output files?)


Let's take an example:

hive> create table alice (line String) row format delimited fields terminated by '\n';
OK
Time taken: 0.167 seconds

#Note: I have not compressed the file alice.txt (since we are only looking at output and intermediate formats). If I were to compress the input as well, I would do:
$bzip2 alice.txt
hive> load data local inpath '/home/user/data/alice.txt.bz2' into table alice;

hive> load data local inpath '/home/user/data//alice.txt' into table alice;
Loading data to table fads.alice
Table fads.alice stats: [numFiles=1, totalSize=163742]
OK
Time taken: 0.756 seconds

hive> set hive.exec.compress.output=true;
hive> set mapreduce.output.fileoutputformat.compress=true;
hive> set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec;

INSERT OVERWRITE DIRECTORY '/hdfs/path/alice/output' SELECT * FROM alice;
......
.....
Stage-Stage-1: Map: 1   Cumulative CPU: 1.58 sec   HDFS Read: 163979 HDFS Write: 163742 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 580 msec

$ hadoop fs -ls output/
Found 1 items
-rw-r--r--   3 user hdfs      48962 2015-10-21 15:35 output/000000_0.bz2
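The output is 48962 bytes against 163742 bytes of input, roughly 30% of the original size. To get a feel for how codecs compare before touching the cluster, the Python 3 standard library can compress a local sample. This is only a rough local sketch: compressed_sizes is a made-up helper, and the sample text is synthetic and far more repetitive than a real book, so the ratios will not match what Hive produces.

```python
import bz2
import gzip


def compressed_sizes(data):
    """Return the size in bytes of data as-is, gzip-compressed and bzip2-compressed."""
    return {
        "raw": len(data),
        "gzip": len(gzip.compress(data)),
        "bzip2": len(bz2.compress(data)),
    }


# Synthetic sample; replace with open('alice.txt', 'rb').read() for a real file
sample = ("Alice was beginning to get very tired of sitting by her sister\n" * 2000).encode("utf-8")
sizes = compressed_sizes(sample)

if __name__ == "__main__":
    for name in ("raw", "gzip", "bzip2"):
        print("%-6s %8d bytes  ratio %.3f" % (name, sizes[name], sizes[name] / sizes["raw"]))
```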





Friday, October 16, 2015

Another word count - Hive UDF with Python


For some reason, word count seems to be a good example to start with for regular expressions, Python and map/reduce. Well, the reason is clear: it has now become the "Hello World" of map/reduce.

Input: alice.txt, downloaded from http://www.gutenberg.org. To make reading easier, I am working with only the top 10 rows, using limit and head.

Steps:
1. Create a table in hive to store the book and load the text file
2. Create a python script to count the words and display
3. Use the python script as UDF (why? Regular expression, loops and other goodies available)

1. Create a table in hive to store the book and load the text file

hive> create table alice (line String) row format delimited lines terminated by '\n';
OK
Time taken: 0.196 seconds

hive> load data local inpath '/home/username/python/alice.txt' into table alice;
Loading data to table database.alice
Table database.alice stats: [numFiles=1,  totalSize=163771]
OK
Time taken: 0.757 seconds

2. Create a python script to count the words and display (I have saved it as 1udf.py)

------------------------------------------------------------------------------------
import sys
import re

count = {}
#select statement in hive prints it to stdout, so the script reads from stdin
for line in sys.stdin: 
        words =  map(lambda x: re.sub(r'[^\w\s]','',x),line.lower().strip().split(' '))
        for w in words:
                count[w] = count.get(w,0) + 1

# Print the words ordered by descending count, as word<TAB>count
for x in sorted(count.items(),key = lambda z: z[1], reverse=True):
        print('%s\t%s' % (x[0], x[1]))
------------------------------------------------------------------------------------
The script can be tested like so:
$ cat alice.txt  | python 1udf.py | head -n 10
the     1804
          1251
and     912
to       800
a         685
of       625
it        541
she     538
said    462
you    429
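The second row of the output is an empty "word": splitting on a single space turns blank lines and repeated spaces into empty strings, and those get counted too. A Python 3 sketch of the same counting step with collections.Counter, which skips empty tokens, might look like this. count_words is a hypothetical name, and this is not the script that produced the numbers in this post.

```python
import re
from collections import Counter


def count_words(lines):
    """Count lower-cased words with punctuation stripped, ignoring empty tokens."""
    counts = Counter()
    for line in lines:
        # split() with no argument collapses runs of whitespace
        for token in line.lower().split():
            word = re.sub(r"[^\w\s]", "", token)
            if word:
                counts[word] += 1
    return counts


if __name__ == "__main__":
    demo = count_words(["The cat, the hat.", "", "the  end"])
    for word, total in demo.most_common(3):
        print("%s\t%d" % (word, total))
```

Counter.most_common(n) returns the n most frequent words already sorted, which replaces the explicit sorted(...) call.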

3. Use the python script as part of TRANSFORM 

hive> add file /home/username/python/1udf.py; (Important! See errors encountered)

Added resources: [/home/username/python/1udf.py]

select transform(line) using 'python 1udf.py' as t_line , count from alice limit 10;

Stage-Stage-1: Map: 2   Cumulative CPU: 4.9 sec   HDFS Read: 164257 HDFS Write: 126 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 900 msec
OK
the     1804
        1251
and     912
to      800
a       685
of      625
it      541
she     538
said    462
you     429
Time taken: 16.577 seconds, Fetched: 10 row(s)

Using the Tez engine:

hive> set hive.execution.engine=tez;
hive> select transform(line) using 'python 1udf.py' as t_line , count from alice limit 10;
Query ID = username_20151016120505_20e5f39d-7dcc-4dd2-8247-baa8e57b49b9
Total jobs = 1
Launching Job 1 out of 1

Status: Running (Executing on YARN cluster with App id application_1443556515457_25523)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 5.32 s
--------------------------------------------------------------------------------
OK
the     1804
        1251
and     912
to      800
a       685
of      625
it      541
she     538
said    462
you     429
Time taken: 13.724 seconds, Fetched: 10 row(s)



Errors Encountered:
FAILED: Execution Error, return code 20003 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. An error occurred when trying to close the Operator running your custom script
Reason: I used the query as "select transform(line) using 'python /home/username/python/1udf.py' as t_line , count from alice limit 10;"
Solution: run add file /home/username/python/1udf.py so that the script is available in the Hive resource cache, then drop the absolute path in the Hive query and use just the script name:

select transform(line) using 'python 1udf.py' as t_line , count from alice limit 10;

Wednesday, October 14, 2015

Machine Learning - SGD with scikit-learn

**************Work in Progress********** Will update with comments and the file to use

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn import preprocessing
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import confusion_matrix


x_temp_list = []
y_temp_list = []
y_train_text = []
temp_list = []
another_temp_list = []

# Comma separated test/train file
with open('train.csv','r') as fp:
        lines = fp.readlines()

for line in lines:
        if len(line.strip()) > 1:
                fields = line.split(',')
                if len(line.split(',')) == 2:
                        x_temp_list.append(fields[0].strip())
                        y_temp_list.append(fields[1].strip())

X_train = np.array(x_temp_list)
y_train_text = np.array(y_temp_list)

#X_test = np.array(['Cashier Colestru Value Foods'])


#mlb = preprocessing.LabelBinarizer()
#Y = mlb.fit_transform(y_train_text)
count_vect = CountVectorizer()
y_train_counts = count_vect.fit_transform(y_train_text)

tfidf_transformer = TfidfTransformer()
Y=tfidf_transformer.fit_transform(y_train_counts)

# max_iter replaces the n_iter argument, which newer scikit-learn releases no longer accept
classifier =  Pipeline([('vect', CountVectorizer()),
                      ('tfidf', TfidfTransformer()),
                      ('clf', SGDClassifier(loss='hinge', penalty='l2', alpha=1e-3, max_iter=50)), ])

classifier.fit(X_train, y_train_text)


print ("Predicting and creating confusion matrix")

y_pred = classifier.predict(X_train)

print(confusion_matrix(y_train_text, y_pred,labels=None))
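For readers new to the confusion matrix, the following plain Python 3 sketch shows what the counts mean: rows are true labels, columns are predicted labels, with the labels in sorted order. confusion_counts is a hypothetical helper for illustration only, and the labels in the demo are invented; in practice sklearn.metrics.confusion_matrix does this for you.

```python
def confusion_counts(y_true, y_pred):
    """Return (labels, matrix) where matrix[i][j] counts true labels[i] predicted as labels[j]."""
    labels = sorted(set(y_true) | set(y_pred))
    index = {label: position for position, label in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for truth, guess in zip(y_true, y_pred):
        matrix[index[truth]][index[guess]] += 1
    return labels, matrix


if __name__ == "__main__":
    # Invented labels, purely for the demo
    truth = ["cashier", "clerk", "cashier", "clerk"]
    guess = ["cashier", "cashier", "cashier", "clerk"]
    labels, matrix = confusion_counts(truth, guess)
    print(labels)
    for row in matrix:
        print(row)
```

Since the post predicts on the same data it trained on, the diagonal will look better than it would on held-out data.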

Simple Lambda examples in Python


Simple lambda function examples (Python)

Use this to learn lambda; I do not recommend using these in your production code.

The exercises are here:

https://github.com/zhiwehu/Python-programming-exercises/blob/master/100%2B%20Python%20challenging%20programming%20exercises.txt

Solutions, with the question numbers, are below. I had some help from http://stackoverflow.com/questions/33113720/count-in-python-lambda-function

import math
from itertools import count
from operator import itemgetter, attrgetter

mylist = ['abc', '123', '758', 'cde']
hello = "Hello world this is Testing for Caps and Lower"


# 1 print filter(lambda x : x%7 == 0 and x%5!=0, range(2000, 3200))

# 3 print  map(lambda x: (x, x*x) ,range(1,input()+1))

# 6 print map(lambda x: round(math.sqrt((2 * 50 * x)/30)) , list(input()))

# 9 print ''.join(map(lambda x: x.upper(),raw_input()))

# 11 print filter(lambda x : int(x,2)%5 == 0, raw_input().split(','))

# 12 WIP print (lambda y : int(y[0])%2 ==0 ,map(lambda x: x, str(range(1,10))))

# 13 print reduce(lambda (x,y),(a,b): (x+a, y+b), [(e.isdigit(),e.isalpha()) for e in mylist], (0,0))
# 13 print reduce(lambda (x,y),(a,b): (x+a, y+b), map(lambda x: (1 if x.isalpha() else 0, 1 if x.isdigit() else 0), mylist), (0,0))


# 14 print reduce(lambda (x,y), (a,b) : (x+a, y+b) , [(e[0].isupper(), e[0].islower()) for e in hello.split(' ')], (0,0))

# 16 intlist = [1,2,3,4,5,6,7,8,9] print filter(lambda x: x%2!=0, intlist)
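The commented solutions above are Python 2 (print statement, tuple parameters in lambda, built-in reduce). A few of them could look like this in Python 3. This is only a sketch: reduce is imported from functools, filter results are wrapped in list(), and the tuple unpacking is done by indexing because Python 3 lambdas cannot unpack tuple parameters.

```python
from functools import reduce

hello = "Hello world this is Testing for Caps and Lower"

# 1: numbers from 2000 up to (not including) 3200 divisible by 7 but not by 5
divisible = list(filter(lambda x: x % 7 == 0 and x % 5 != 0, range(2000, 3200)))

# 14: count words starting with an upper-case letter and with a lower-case letter
upper_lower = reduce(
    lambda acc, pair: (acc[0] + pair[0], acc[1] + pair[1]),
    [(word[0].isupper(), word[0].islower()) for word in hello.split(" ")],
    (0, 0),
)

# 16: keep the odd numbers
intlist = [1, 2, 3, 4, 5, 6, 7, 8, 9]
odds = list(filter(lambda x: x % 2 != 0, intlist))

if __name__ == "__main__":
    print(divisible[:5])
    print(upper_lower)
    print(odds)
```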