This blog post is about executing a simple Oozie workflow that imports customer, sales, and inventory data from a MySQL database using Sqoop and then aggregates it with Pig.
The DAG below was generated by Oozie. The fork spawns three Sqoop actions in parallel, one for each of the customer, sales, and inventory tables in MySQL. Once all three imports are done, the join passes control to a Pig action, which joins the imported tables and sums the prices per city.
Here are the steps to define the workflow and then execute it, assuming that MySQL, Oozie, and Hadoop have already been installed, configured, and are working properly.
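As a quick sanity check before starting, the following commands should all succeed. This is a minimal sketch; the MySQL credentials are an assumption, so adjust them to your setup.

# Hadoop is installed and HDFS responds
bin/hadoop version
bin/hadoop fs -ls /

# MySQL is reachable
mysql -u root -p -e "SELECT VERSION();"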
Create Tables in MySQL
- Customer Table
CREATE TABLE customer (
c_id INT NOT NULL,
first_name VARCHAR(14) NOT NULL,
Age INT,
gender VARCHAR(20) NOT NULL,
city VARCHAR(14),
PRIMARY KEY (c_id)
);
- Sales Table
CREATE TABLE sales (
c_id int NOT NULL,
item_id int NOT NULL,
date DATE,
Status VARCHAR(20),
quantity INT
);
- Inventory Table

CREATE TABLE inventory (
item_id INT NOT NULL,
p_name VARCHAR(30) NOT NULL,
price INT NOT NULL,
UNIQUE KEY (p_name),
PRIMARY KEY (item_id)
);

Since sales references both customer and inventory, add its foreign keys only after all three tables exist:

ALTER TABLE sales
ADD FOREIGN KEY (item_id)
REFERENCES inventory(item_id);

ALTER TABLE sales
ADD FOREIGN KEY (c_id)
REFERENCES customer(c_id);
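The Sqoop imports later in the workflow need rows to pull. As a quick test fixture, a few rows can be loaded through the mysql client. The values below are made-up samples purely for illustration; the database name final comes from the JDBC URL used in the workflow.

mysql -u root -p final <<'EOF'
INSERT INTO customer VALUES (1, 'Alice', 30, 'F', 'Bangalore');
INSERT INTO customer VALUES (2, 'Bob', 35, 'M', 'Chennai');
INSERT INTO inventory VALUES (100, 'Laptop', 500);
INSERT INTO inventory VALUES (101, 'Phone', 200);
INSERT INTO sales VALUES (1, 100, '2014-01-01', 'Shipped', 2);
INSERT INTO sales VALUES (2, 101, '2014-01-02', 'Shipped', 1);
EOF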
Create Job.Properties File

nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default
examplesRoot=oozie-ingesting-example
examplesRootDir=/user/${user.name}/${examplesRoot}
oozie.use.system.libpath=true
oozie.wf.validate.ForkJoin=false
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/ING
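The Sqoop actions also need the MySQL JDBC driver on their classpath. Assuming the standard Oozie sharelib layout under the share/ directory uploaded below (the exact path is an assumption), it is worth confirming the connector jar is present:

bin/hadoop fs -ls /user/vm4learning/share/lib/sqoop
# a mysql-connector-java-*.jar should appear in the listing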
Create Workflow.xml File

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="cs-wf-fork-join">
    <start to="fork-node"/>
    <fork name="fork-node">
        <path start="customer-node"/>
        <path start="sales-node"/>
        <path start="inventory-node"/>
    </fork>
    <action name="customer-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${examplesRootDir}/input-data/customer"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table customer --target-dir ${examplesRootDir}/input-data/customer -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <action name="sales-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${examplesRootDir}/input-data/sales"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table sales --target-dir ${examplesRootDir}/input-data/sales -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <action name="inventory-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${examplesRootDir}/input-data/inventory"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/final --table inventory --target-dir ${examplesRootDir}/input-data/inventory -m 1</command>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <join name="joining" to="pig-node"/>
    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/intermediate"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <script>id.pig</script>
            <param>INPUT=${examplesRootDir}/input-data/clickstream</param>
            <param>OUTPUT=${examplesRootDir}/intermediate</param>
        </pig>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
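Before uploading anything, the Oozie CLI can schema-check the workflow definition locally. The path here assumes the workflow.xml sits next to the job.properties used for submission below:

bin/oozie validate /home/vm4learning/Code/oozie-ingest-examples/apps/cs/workflow.xml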
Create Id.pig File

customer = LOAD '/user/vm4learning/oozie-ingesting-example/input-data/customer/part-m-00000' USING PigStorage(',') AS (c_id:int,first_name:chararray,age:int,gender:chararray,city:chararray);
sales = LOAD '/user/vm4learning/oozie-ingesting-example/input-data/sales/part-m-00000' USING PigStorage(',') AS (c_id:int,item_id:int,date:chararray,status:chararray,quantity:int);
inventory = LOAD '/user/vm4learning/oozie-ingesting-example/input-data/inventory/part-m-00000' USING PigStorage(',') AS (item_id:int,p_name:chararray,price:int);
C1 = JOIN customer BY c_id, sales BY c_id;
C2 = JOIN C1 BY item_id, inventory BY item_id;
B = GROUP C2 BY city;
X = FOREACH B GENERATE group, SUM(C2.price);
STORE X INTO '$OUTPUT';
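The Pig script can be smoke-tested outside Oozie in local mode. This is only a sketch: it assumes local copies of the part-m-00000 files exist at the same paths, so adjust the LOAD paths first if they do not.

pig -x local -param OUTPUT=/tmp/pig-output id.pig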
Copy the required libraries to HDFS
bin/hadoop fs -rmr /user/vm4learning/share/
bin/hadoop fs -put /home/vm4learning/Code/share/ /user/vm4learning/share/
Copy the data-related files to HDFS
bin/hadoop fs -rmr /user/vm4learning/oozie-ingest-examples
bin/hadoop fs -put /home/vm4learning/Code/oozie-ingest-examples/ /user/vm4learning/oozie-ingest-examples
Start Oozie
bin/oozied.sh start

The Oozie console should now be available at http://localhost:11000/oozie/.
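To confirm the server actually came up, the CLI also exposes an admin status check; it should report System mode: NORMAL.

bin/oozie admin -oozie http://localhost:11000/oozie -status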
Submit an Oozie workflow
bin/oozie job -oozie http://localhost:11000/oozie -config /home/vm4learning/Code/oozie-ingest-examples/apps/cs/job.properties -run

The output should appear in the /user/vm4learning/oozie-ingest-examples/finaloutput folder in HDFS after the workflow is complete.
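The submit command prints a job ID, which can also be used to follow the run from the CLI (the job ID below is a placeholder):

bin/oozie job -oozie http://localhost:11000/oozie -info 0000001-140101000000000-oozie-oozi-W
bin/oozie job -oozie http://localhost:11000/oozie -log 0000001-140101000000000-oozie-oozi-W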
Submit an Oozie coordinator
Modify the start and end times in /home/vm4learning/Code/oozie-ingest-examples/apps/scheduler/coordinator.xml. Make sure the start time is about an hour behind the current system time. The job will run every 10 minutes. Then submit the coordinator:

bin/oozie job -oozie http://localhost:11000/oozie -config /home/vm4learning/Code/oozie-ingest-examples/apps/scheduler/coordinator.properties -run
- Initially the job will be in the `RUNNING` state and will eventually reach the `SUCCEEDED` state. The progress of the workflow can be monitored from the Oozie console at http://localhost:11000/oozie/ or from the CLI, as shown below.
- The output should appear in the 'oozie-ingesting-example/intermediate/part-r-00000' file in HDFS (see the commands below).
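Both checks can also be done from the command line; the coordinator listing and the output path follow the configuration above:

# list coordinator jobs and their states
bin/oozie jobs -oozie http://localhost:11000/oozie -jobtype coordinator

# inspect the aggregated output once a run has succeeded
bin/hadoop fs -cat /user/vm4learning/oozie-ingesting-example/intermediate/part-r-00000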

