Wednesday, April 13, 2016

Oozie actions with Sqoop and Pig Joining

This post walks through a simple Oozie workflow that imports the customer, sales and inventory tables from a MySQL database using Sqoop and then joins the imported data with Pig.

The DAG below was generated by Oozie. The fork spawns three Sqoop actions in parallel, each importing one table (customer, sales or inventory) from the MySQL database. Once all three Sqoop actions are done, the join node passes control to a Pig action, which joins the imported data and aggregates it by city.
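
The fork and join in the workflow.xml of this post can be pictured with plain shell background jobs. This is only a rough sketch of the control flow, not how Oozie runs actions: `import_table` is a hypothetical stand-in for a Sqoop action and does nothing but write a line to a temporary log.

```shell
# Sketch of fork/join control flow using background jobs.
# import_table is a hypothetical placeholder for a Sqoop import action.
log=$(mktemp)

import_table() {
    echo "imported $1" >> "$log"
}

# fork: start the three stand-in imports in parallel
import_table customer &
pid_customer=$!
import_table sales &
pid_sales=$!
import_table inventory &
pid_inventory=$!

# join: wait for every branch and remember whether any of them failed
failed=0
for pid in "$pid_customer" "$pid_sales" "$pid_inventory"; do
    wait "$pid" || failed=1
done

# the follow-up step (the Pig action in the workflow) runs only after the join
if [ "$failed" -eq 0 ]; then
    echo "pig step runs now" >> "$log"
fi

imports_done=$(grep -c '^imported ' "$log")
last_line=$(tail -n 1 "$log")
rm -f "$log"
echo "$imports_done imports finished; last step: $last_line"
```

The order of the three "imported" lines can differ between runs, which is the point of the fork: the branches are independent, and only the step after the join depends on all of them.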



Here are the steps to define the workflow and then execute it. This assumes that MySQL, Oozie and Hadoop have been installed and configured and are working properly.

Create Tables in SQL

  • Customer Table
CREATE TABLE customer (
c_id INT NOT NULL,
first_name VARCHAR(14) NOT NULL,
Age INT,
gender VARCHAR(20) NOT NULL,
city VARCHAR(14),
PRIMARY KEY (c_id)
);

  • Inventory Table (created before the sales table, because the sales table references it)
CREATE TABLE inventory (
item_id INT NOT NULL,
p_name VARCHAR(30) NOT NULL,
price INT NOT NULL,
UNIQUE KEY (p_name),
PRIMARY KEY (item_id)
);

  • Sales Table
CREATE TABLE sales (
c_id INT NOT NULL,
item_id INT NOT NULL,
date DATE,
Status VARCHAR(20),
quantity INT
);
ALTER TABLE sales
ADD FOREIGN KEY (item_id)
REFERENCES inventory(item_id);
ALTER TABLE sales
ADD FOREIGN KEY (c_id)
REFERENCES customer(c_id);

Create job.properties File

nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default
examplesRoot=oozie-ingesting-example
examplesRootDir=/user/${user.name}/${examplesRoot}
oozie.use.system.libpath=true
oozie.wf.validate.ForkJoin=false
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/ING
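
To see which HDFS locations these properties resolve to, the substitutions can be replayed in a shell. This is a sketch under one assumption: `user_name` stands in for Oozie's `${user.name}` and is set to vm4learning here because the later commands in this post use that home directory.

```shell
# Replays the ${...} substitutions from job.properties.
# user_name is an assumption standing in for Oozie's ${user.name}.
user_name=vm4learning
nameNode=hdfs://localhost:9000
examplesRoot=oozie-ingesting-example

examplesRootDir=/user/${user_name}/${examplesRoot}
app_path=${nameNode}/user/${user_name}/${examplesRoot}/apps/ING

echo "examplesRootDir           = ${examplesRootDir}"
echo "oozie.wf.application.path = ${app_path}"
```

The application path is where Oozie looks for workflow.xml, so the workflow files have to be copied to that HDFS directory before the job is submitted.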

Create workflow.xml File

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="cs-wf-fork-join">
    <start to="fork-node"/>
    <fork name="fork-node">
        <path start="customer-node"/>
        <path start="sales-node"/>
        <path start="inventory-node"/> 
    </fork>
    <action name="customer-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/input-data/customer"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table customer --target-dir ${examplesRootDir}/input-data/customer -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <action name="sales-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/input-data/sales"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table sales --target-dir ${examplesRootDir}/input-data/sales -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <action name="inventory-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/input-data/inventory"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table inventory --target-dir ${examplesRootDir}/input-data/inventory -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <join name="joining" to="pig-node"/>
    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/intermediate"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <script>id.pig</script>
            <param>INPUT=${examplesRootDir}/input-data</param>
            <param>OUTPUT=${examplesRootDir}/intermediate</param>
        </pig>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Create id.pig File

-- Load the three tables imported by Sqoop (comma-delimited text files).
customer = LOAD '/user/vm4learning/oozie-ingesting-example/input-data/customer/part-m-00000' USING PigStorage(',') AS (c_id:int,first_name:chararray,age:int,gender:chararray,city:chararray);
sales = LOAD '/user/vm4learning/oozie-ingesting-example/input-data/sales/part-m-00000' USING PigStorage(',') AS (c_id:int,item_id:int,date:chararray,status:chararray,quantity:int);
inventory = LOAD '/user/vm4learning/oozie-ingesting-example/input-data/inventory/part-m-00000' USING PigStorage(',') AS (item_id:int,p_name:chararray,price:int);
-- Join customers to their sales, then attach the price of each item sold.
C1 = JOIN customer BY c_id, sales BY c_id;
C2 = JOIN C1 BY item_id, inventory BY item_id;
-- Sum the item prices per city (the quantity column is not used).
B = GROUP C2 BY city;
X = FOREACH B GENERATE group, SUM(C2.price);
STORE X INTO '$OUTPUT';
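
To make the result of the Pig script concrete, here is a small sketch that reproduces the same join and per-city sum with awk on made-up rows. The sample data is hypothetical and only follows the column order of the three tables; the output is tab-separated, which is what PigStorage writes by default.

```shell
# Hypothetical sample rows in the same column order as the three tables.
workdir=$(mktemp -d)

cat > "$workdir/customer.csv" <<'EOF'
1,Asha,30,F,Pune
2,Ravi,41,M,Delhi
3,Meera,25,F,Pune
EOF

cat > "$workdir/inventory.csv" <<'EOF'
10,pen,5
20,book,50
EOF

cat > "$workdir/sales.csv" <<'EOF'
1,10,2016-04-01,shipped,2
2,20,2016-04-02,shipped,1
3,20,2016-04-03,pending,1
1,20,2016-04-04,shipped,3
EOF

# Inner join of sales with customer (on c_id) and inventory (on item_id),
# then the sum of price per city, as in the Pig script above.
result=$(awk -F',' '
    FILENAME == ARGV[1] { city[$1] = $5; next }    # customer: c_id to city
    FILENAME == ARGV[2] { price[$1] = $3; next }   # inventory: item_id to price
    ($1 in city) && ($2 in price) {                # sales row with both matches
        total[city[$1]] += price[$2]
    }
    END { for (c in total) print c "\t" total[c] }
' "$workdir/customer.csv" "$workdir/inventory.csv" "$workdir/sales.csv" | sort)

rm -rf "$workdir"
echo "$result"
```

With these rows, Pune has three sales (prices 5, 50 and 50) and Delhi has one (price 50). Like the Pig script, the sketch adds up the price once per sales row and ignores the quantity column.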

Copy the required libraries to HDFS

bin/hadoop fs -rmr /user/vm4learning/share/
bin/hadoop fs -put /home/vm4learning/Code/share/ /user/vm4learning/share/

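Copy the workflow-related files to HDFS

The HDFS directory name must match the examplesRoot value in job.properties (oozie-ingesting-example).

bin/hadoop fs -rmr /user/vm4learning/oozie-ingesting-example
bin/hadoop fs -put /home/vm4learning/Code/oozie-ingest-examples/ /user/vm4learning/oozie-ingesting-example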

Start Oozie

bin/oozied.sh start

Once Oozie has started, the web console is available at http://localhost:11000/oozie/.

Submit an Oozie workflow

bin/oozie job -oozie http://localhost:11000/oozie -config /home/vm4learning/Code/oozie-ingest-examples/apps/cs/job.properties -run

The output should appear in the /user/vm4learning/oozie-ingesting-example/intermediate folder in HDFS after the workflow is complete.

Submit an Oozie coordinator

Modify the start time and the end time in /home/vm4learning/Code/oozie-ingest-examples/apps/scheduler/coordinator.xml. Make sure the start time is 1 hour behind the system time. The job will run every 10 minutes.
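
A start time one hour in the past can be generated rather than typed by hand. This is a sketch with several assumptions: it needs GNU date (the -d option), it assumes the Oozie server uses its default UTC timezone and the usual yyyy-MM-ddTHH:mmZ coordinator timestamp format, and the end time of one day ahead is an arbitrary choice for illustration.

```shell
# Assumes GNU date; BSD/macOS date uses different flags for relative times.
# Timestamps are in UTC, in the yyyy-MM-ddTHH:mmZ form used by coordinators.
start_time=$(date -u -d '1 hour ago' '+%Y-%m-%dT%H:%MZ')

# Hypothetical end time, one day from now.
end_time=$(date -u -d '1 day' '+%Y-%m-%dT%H:%MZ')

echo "start=${start_time}"
echo "end=${end_time}"
```

The two printed values can then be pasted into the start and end attributes of the coordinator definition.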

bin/oozie job -oozie http://localhost:11000/oozie -config /home/vm4learning/Code/oozie-ingest-examples/apps/scheduler/coordinator.properties -run

  • Initially the job will be in the `RUNNING` state and will finally reach the `SUCCEEDED` state. The progress of the workflow can be monitored from the Oozie console at http://localhost:11000/oozie/.

  • The output should appear as below in the 'oozie-ingesting-example/intermediate/part-r-00000' file in HDFS.



