This blog post is about executing a simple Oozie workflow which imports user data from a MySQL database using Sqoop.
The DAG below was generated by Oozie. The fork spawns three Sqoop actions in parallel, which import the customer, sales and inventory tables from a MySQL database. Once all three Sqoop actions are done, the join node starts a Pig action, which joins the imported tables and totals the item prices per city.
Here are the steps to define the workflow and then execute it. This assumes that MySQL, Oozie and Hadoop have been installed and configured, and that they work properly.
Create Tables in SQL
-- Schema for the example database imported by the Sqoop actions.
-- Tables are created parent-first: a FOREIGN KEY can only reference a table
-- that already exists, so customer and inventory must precede the
-- constraints added to sales.

-- Customers; referenced by sales.c_id.
CREATE TABLE customer (
    c_id INT NOT NULL,
    first_name VARCHAR(14) NOT NULL,
    Age INT,
    gender VARCHAR(20) NOT NULL,
    city VARCHAR(14),
    PRIMARY KEY (c_id)
);

-- Items for sale; referenced by sales.item_id.
CREATE TABLE inventory (
    item_id INT NOT NULL,
    p_name VARCHAR(30) NOT NULL,
    price INT NOT NULL,
    UNIQUE KEY (p_name),
    PRIMARY KEY (item_id)
);

-- One row per sale of an item to a customer.
CREATE TABLE sales (
    c_id INT NOT NULL,
    item_id INT NOT NULL,
    date DATE,
    Status VARCHAR(20),
    quantity INT
);

-- Both referenced tables exist at this point, so the constraints can be added.
ALTER TABLE sales
    ADD FOREIGN KEY (item_id)
    REFERENCES inventory(item_id);
ALTER TABLE sales
    ADD FOREIGN KEY (c_id)
    REFERENCES customer(c_id);
Create the job.properties file
# Cluster endpoints and queue; the workflow reads these as ${nameNode},
# ${jobTracker} and ${queueName}.
nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default

# Root of the example in HDFS, under the submitting user's home directory.
examplesRoot=oozie-ingesting-example
examplesRootDir=/user/${user.name}/${examplesRoot}

# Oozie settings: where the workflow application lives in HDFS, plus the
# system libpath and fork/join validation switches.
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/ING
oozie.use.system.libpath=true
oozie.wf.validate.ForkJoin=false
Create the workflow.xml file
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Imports the customer, sales and inventory tables from MySQL with three
    parallel Sqoop actions, then runs id.pig once all three imports are done.
    The ${jobTracker}, ${nameNode}, ${queueName} and ${examplesRootDir}
    parameters are defined in job.properties.
-->
<workflow-app xmlns="uri:oozie:workflow:0.2" name="cs-wf-fork-join">
    <start to="fork-node"/>

    <!-- Run the three table imports in parallel. -->
    <fork name="fork-node">
        <path start="customer-node"/>
        <path start="sales-node"/>
        <path start="inventory-node"/>
    </fork>

    <action name="customer-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <!-- examplesRootDir already starts with "/", so no extra slash after nameNode. -->
                <delete path="${nameNode}${examplesRootDir}/input-data/customer"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table customer --target-dir ${examplesRootDir}/input-data/customer -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>

    <action name="sales-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/input-data/sales"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table sales --target-dir ${examplesRootDir}/input-data/sales -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>

    <action name="inventory-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/input-data/inventory"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table inventory --target-dir ${examplesRootDir}/input-data/inventory -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>

    <!-- Wait for all three imports before starting the Pig job. -->
    <join name="joining" to="pig-node"/>

    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/intermediate"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <script>id.pig</script>
            <!-- Parent directory of the three Sqoop target directories above. -->
            <param>INPUT=${examplesRootDir}/input-data</param>
            <param>OUTPUT=${examplesRootDir}/intermediate</param>
        </pig>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <!-- Shared failure target for the Sqoop actions and the Pig action. -->
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
Create the id.pig file
-- Totals the price of the sold items per customer city.
--
-- Parameters:
--   INPUT_DATA  HDFS directory holding the customer, sales and inventory
--               import directories; the default below is the location the
--               script previously hard-coded.
--   OUTPUT      HDFS directory the per-city totals are stored in.
%default INPUT_DATA '/user/vm4learning/oozie-ingesting-example/input-data'

-- Load each import directory as a whole rather than a single part file, so
-- no rows are missed if an import is ever run with more than one mapper.
customer = LOAD '$INPUT_DATA/customer' USING PigStorage(',') AS (c_id:int,first_name:chararray,age:int,gender:chararray,city:chararray);
sales = LOAD '$INPUT_DATA/sales' USING PigStorage(',') AS (c_id:int,item_id:int,date:chararray,status:chararray,quantity:int);
inventory = LOAD '$INPUT_DATA/inventory' USING PigStorage(',') AS (item_id:int,p_name:chararray,price:int);

-- customer -> sales on c_id, then -> inventory on item_id.
C1 = JOIN customer BY c_id, sales BY c_id;
C2 = JOIN C1 BY item_id , inventory BY item_id;

-- One output row per city: (city, sum of the joined item prices).
B = GROUP C2 BY city;
X = FOREACH B GENERATE group, SUM(C2.price);
STORE X into '$OUTPUT';
Copy the required libraries to HDFS
# Remove any previous copy of the share directory from HDFS, then upload the local one.
bin/hadoop fs -rmr /user/vm4learning/share/
bin/hadoop fs -put /home/vm4learning/Code/share/ /user/vm4learning/share/
Copy the data related files in HDFS
# Remove any previous copy of the example from HDFS, then upload the local one.
# The HDFS directory name must match examplesRoot in job.properties
# (oozie-ingesting-example), otherwise oozie.wf.application.path does not exist.
bin/hadoop fs -rmr /user/vm4learning/oozie-ingesting-example
bin/hadoop fs -put /home/vm4learning/Code/oozie-ingest-examples/ /user/vm4learning/oozie-ingesting-example
Start Oozie
# Start the Oozie server.
bin/oozied.sh start
The Oozie web console is then available at http://localhost:11000/oozie/
Submit an Oozie workflow
# Submit the workflow to the Oozie server at localhost:11000, configured by the local job.properties.
bin/oozie job -oozie http://localhost:11000/oozie -config /home/vm4learning/Code/oozie-ingest-examples/apps/cs/job.properties -run
The output should appear in the
/user/vm4learning/oozie-ingesting-example/intermediate folder in HDFS
after the workflow is complete.
Submit an Oozie coordinator
Modify the start time and the end time in
/home/vm4learning/Code/oozie-ingest-examples/apps/scheduler/coordinator.xml.
Make sure the start time is 1 hour before the current system time. The job will run every 10
minutes.
# Submit the coordinator to the Oozie server at localhost:11000.
# Kept on one line: split after "-oozie", the shell would run two separate commands.
bin/oozie job -oozie http://localhost:11000/oozie -config /home/vm4learning/Code/oozie-ingest-examples/apps/scheduler/coordinator.properties -run
- Initially the job will be in the `RUNNING` state and will finally reach the `SUCCEEDED` state. The progress of the workflow can be monitored from the Oozie console at http://localhost:11000/oozie/.
- The output should appear as below in the 'oozie-ingesting-example/intermediate/part-r-00000' file in HDFS.