This post covers more advanced Hive concepts: dynamic partitions, static partitions, custom map/reduce scripts, and Hive UDFs written in Java and Python.
Configuring Hive to allow partitions
A query across all partitions can trigger an enormous MapReduce job if the table data and the number of partitions are large. A highly suggested safety measure is putting Hive into strict mode, which prohibits queries of a partitioned table without a WHERE clause that filters on the partition column.
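For instance, with strict mode on, a select over a partitioned table is only accepted when the WHERE clause filters on the partition column (a sketch; sales_by_region and region are hypothetical names, not tables from this post):
Hive> set hive.mapred.mode=strict;
Hive> select * from sales_by_region where region = 'APAC';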
For dynamic partitioning, we set the mode to nonstrict, as in the following session.
Dynamic partitioning: configuration
Hive> set hive.exec.dynamic.partition.mode=nonstrict;
Hive> set hive.exec.dynamic.partition=true;
Hive> set hive.enforce.bucketing=true;
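In the Hive shell, entering set with just a property name prints its current value, which is a quick way to confirm the configuration took effect:
Hive> set hive.exec.dynamic.partition.mode;
Hive> set hive.exec.dynamic.partition;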
Once Hive is configured, we can create a dynamically partitioned table.
Example:
Source table:
Hive> create table transaction_records(txnno INT, txndate STRING, custno INT, amount DOUBLE, category STRING, product STRING, city STRING, state STRING, spendby STRING) row format delimited fields terminated by ',' stored as textfile;
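The insert-select shown later copies rows out of this source table, so it needs data first. A hedged example load (the file name is hypothetical; use whatever file holds your transaction records):
Hive> load data local inpath '/usr/local/hive_demo/transactions.txt' into table transaction_records;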
Create Partitioned table:
Hive> create table transaction_recordsByCat(txnno INT, txndate STRING, custno INT, amount DOUBLE, product STRING, city STRING, state STRING, spendby STRING)
partitioned by (category STRING)
clustered by (state) into 10 buckets
row format delimited fields terminated by ',' stored as textfile;
In the above query we partition the table by category and bucket it by state into 10 buckets (numbered 0-9); each row is assigned to a bucket based on the hash of its state value modulo 10. The category column is not listed in the table's column list because it is declared as the partition column.
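To see how the schema was stored, describe the table; category is reported under the partition information rather than with the regular columns:
Hive> describe transaction_recordsByCat;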
Insert the existing table's data into the newly created partitioned table:
Hive> from transaction_records txn
INSERT OVERWRITE TABLE transaction_recordsByCat PARTITION(category)
select txn.txnno, txn.txndate, txn.custno, txn.amount, txn.product, txn.city, txn.state, txn.spendby, txn.category
DISTRIBUTE BY category;
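Once the insert completes, each distinct category value becomes its own partition, which you can list to verify:
Hive> show partitions transaction_recordsByCat;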
Static partition
If we receive data every month and need to process it month by month, we can use static partitions.
Hive> create table logmessage(name string, id int) partitioned by (year int, month int) row format delimited fields terminated by '\t';
How do we insert data into a statically partitioned table? First, add the partition:
Hive>alter table logmessage add partition(year=2014,month=2);
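With the partition added, that month's data can be loaded directly into it. A hedged example (the file name is hypothetical):
Hive> load data local inpath '/usr/local/hive_demo/logmessage_2014_02.txt' into table logmessage partition(year=2014, month=2);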
Custom Map Reduce script using Hive
HiveQL allows traditional map/reduce programmers to plug in their own custom mappers and reducers for more sophisticated analysis that may not be supported by the built-in capabilities of the language.
Sample data scenario
We have movie rating data: different users give different ratings, each with a timestamp.
The user_movie_data.txt file contains tab-separated data like the following:
userid,rating,unixtime
1 1 134564324567
2 3 134564324567
3 1 134564324567
4 2 134564324567
5 2 134564324567
6 1 134564324567
Now, with the above data, we create a table called u_movie_data and load the data into it.
Hive> CREATE TABLE u_movie_data(userid INT, rating INT, unixtime STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
Hive> LOAD DATA LOCAL INPATH '/usr/local/hive_demo/user_movie_data.txt' OVERWRITE INTO TABLE u_movie_data;
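A quick check that the rows were loaded:
Hive> select * from u_movie_data limit 5;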
Any logic can be used to convert the Unix timestamp into a weekday; here we use a Python script (weekday_mapper.py).
import sys
import datetime

for line in sys.stdin:
    line = line.strip()
    # input columns must match the TRANSFORM(...) list: userid, rating, unixtime
    userid, rating, unixtime = line.split('\t')
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    print '\t'.join([userid, rating, str(weekday)])
To execute the Python script from Hive, first add the file in the Hive shell:
Hive> add FILE /usr/local/hive_demo/weekday_mapper.py;
Now transform the data and load it into a new table, u_movie_data_new, using TRANSFORM. The target table must exist before running the insert.
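A minimal sketch of that target table, assuming its columns match the AS clause of the query below:
Hive> create table u_movie_data_new(userid INT, rating INT, weekday INT) row format delimited fields terminated by '\t' stored as textfile;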
Hive> INSERT OVERWRITE TABLE u_movie_data_new
SELECT TRANSFORM(userid, rating, unixtime)
USING 'python weekday_mapper.py'
AS (userid, rating, weekday)
FROM u_movie_data;
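After the insert completes, the transformed rows can be inspected:
Hive> select * from u_movie_data_new limit 5;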
HiveQL - User-defined functions
Suppose we have two columns: id of type string and unixtimestamp of type string.
Create a data set with these two columns (udf_input.txt) and place it inside /usr/local/hive_demo/:
one,1456432145676
two,1456432145676
three,1456432145676
four,1456432145676
five,1456432145676
six,1456432145676
Now we can create a table and load the data into it.
Hive> create table udf_testing (id string, unixtimestamp string)
row format delimited fields terminated by ',';
Hive> load data local inpath '/usr/local/hive_demo/udf_input.txt' into table udf_testing;
Hive>select * from udf_testing;
Now we will write a user-defined function in Java to convert the timestamp into a more meaningful date and time format.
Open Eclipse, create a new Java project and a new class, and add the code below inside the class.
Add the required Hive and Hadoop jars from the Hive installation's lib directory to the project's build path.
import java.text.DateFormat;
import java.util.Date;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class UnixTimeToDate extends UDF {

    // Converts a Unix timestamp (in seconds) held in a Text field to a formatted date string
    public Text evaluate(Text text) {
        if (text == null) return null;
        long timestamp = Long.parseLong(text.toString());
        return new Text(toDate(timestamp));
    }

    private String toDate(long timestamp) {
        // Unix time is in seconds; java.util.Date expects milliseconds
        Date date = new Date(timestamp * 1000);
        return DateFormat.getInstance().format(date);
    }
}
Once the class is created, export it as a jar file named unixtime_to_java_date.jar.
Now we need to use the jar from Hive. First, add the jar file in the Hive shell:
Hive> add JAR /usr/local/hive_demo/unixtime_to_java_date.jar;
Hive> create temporary FUNCTION userdate AS 'UnixTimeToDate';
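To verify that the function was registered in this session:
Hive> describe function userdate;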
Hive> select id,userdate(unixtimestamp) from udf_testing;
This is how we work with advanced Hive features. Hope you like this post.
Thank you for reading.