Twitter Data Analysis Using Hadoop Flume
In this post, we will try the experimental Twitter Source provided by Apache’s Flume distribution to get streaming tweets into HDFS, process them in Hive, and create structured tables that can be used for further analytics with any other tool such as Tableau, QlikView or Hunk.
Flume TwitterAgent Setup
We will set up a Twitter agent in the Apache Flume distribution (apache-flume-1.5.2-bin, the latest version at the time of writing this post).
Twitter Source Overview
Apache’s Twitter Source, org.apache.flume.source.twitter.TwitterSource, is highly experimental and may change between minor versions of Flume. We use it here at our own risk, out of curiosity to analyze real-time streaming data.
The Twitter Source connects to the Twitter firehose via the Streaming API and continuously downloads tweets. These tweets are converted into Avro format, and the Avro events are sent to the downstream Flume sink, an HDFS sink in our use case. To connect to the Twitter streaming data, we need the consumer and access tokens and secrets of a Twitter developer account.
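For reference, the Apache variant of the source is configured with the same credential properties. A minimal sketch (property names as documented for Flume 1.5.x; the values are placeholders):

# sketch: Apache's experimental TwitterSource, which emits Avro events
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = <consumerKey>
TwitterAgent.sources.Twitter.consumerSecret = <consumerSecret>
TwitterAgent.sources.Twitter.accessToken = <accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret = <accessTokenSecret>
# optional batching controls (defaults shown)
TwitterAgent.sources.Twitter.maxBatchSize = 1000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 1000

In the rest of this post we will use the TwitterSource class bundled in the Cloudera flume-sources jar, which additionally supports a keywords property for filtering the stream.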
Creation of Twitter Developer Account
A Twitter developer account can be created at the Twitter Developers apps page. On this page, in the website field, we need to provide the valid Twitter account page from which we want to get streaming data. If we provide valid details on this page, our app will be created as shown in the screenshots below. For security reasons, I have blurred the consumer key and secret values in the screens below.
We need the below four values to authenticate with Twitter.
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
On the same screen, we can create access tokens by clicking on ‘Create my access token’. The access tokens will appear as shown in the below screen.
Creation of Agent in flume.conf
Let’s create an agent named TwitterAgent in the flume.conf file under the FLUME_CONF_DIR directory with the below configuration properties. In the Twitter source setup, we need to correctly specify the access tokens collected from Twitter.
Download flume-sources-1.0-SNAPSHOT.jar and add it to the Flume classpath in the conf/flume-env.sh file as shown below:
FLUME_CLASSPATH="/home/training/Installations/apache-flume-1.3.1-bin/flume-sources-1.0-SNAPSHOT.jar"
This jar contains the Java classes needed to pull the tweets and save them into HDFS.
The conf/flume.conf file should define all the agent components (the Twitter source, the memory channel and the HDFS sink) as below:
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = <consumerKey>
TwitterAgent.sources.Twitter.consumerSecret = <consumerSecret>
TwitterAgent.sources.Twitter.accessToken = <accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret = <accessTokenSecret>
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/flume/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
Start Flume using the below command:
bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent
After a couple of minutes the Tweets should appear in HDFS.
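A quick way to verify this is to list the sink directory from the command line (the path matches the hdfs.path configured above; file names are generated by the HDFS sink, FlumeData being the default prefix):

hdfs dfs -ls /user/flume/tweets/
# sample a few records from the collected files
hdfs dfs -cat /user/flume/tweets/FlumeData.* | head -5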
Next, download and extract Hive. Modify conf/hive-site.xml to include the locations of the NameNode and the JobTracker as below:
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>
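With this in place we can start the Hive shell from the Hive installation directory and make sure it comes up cleanly (a simple smoke test; adjust the paths to your installation):

bin/hive
hive> SHOW DATABASES;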
Let’s create structured tables in Hive on top of the tweets collected in HDFS, with the help of a SerDe that can parse the tweet records.
We will use the below HiveQL statements to create the external tweets table in Hive. Save this into a file named avrodataread.hql:
CREATE EXTERNAL TABLE tweets (
id BIGINT,
created_at STRING,
source STRING,
favorited BOOLEAN,
retweet_count INT,
retweeted_status STRUCT<
text:STRING,
user:STRUCT<screen_name:STRING,name:STRING>>,
entities STRUCT<
urls:ARRAY<STRUCT<expanded_url:STRING>>,
user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
hashtags:ARRAY<STRUCT<text:STRING>>>,
text STRING,
user STRUCT<
screen_name:STRING,
name:STRING,
friends_count:INT,
followers_count:INT,
statuses_count:INT,
verified:BOOLEAN,
utc_offset:INT,
time_zone:STRING>,
in_reply_to_screen_name STRING
)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';
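Note that com.cloudera.hive.serde.JSONSerDe is not bundled with Hive, so the jar that provides it must be added to the Hive session before creating or querying the table. A minimal sketch from the Hive shell, assuming the SerDe is packaged as hive-serdes-1.0-SNAPSHOT.jar (the jar name and path are illustrative):

-- register the jar providing com.cloudera.hive.serde.JSONSerDe (path is illustrative)
ADD JAR /home/training/Installations/hive-serdes-1.0-SNAPSHOT.jar;
-- run the table-creation script saved above
SOURCE avrodataread.hql;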
One way to determine the most influential person in a particular field is to figure out whose tweets are retweeted the most. Give Flume enough time to collect tweets from Twitter into HDFS, and then run the below query in Hive to determine the most influential person.
SELECT t.retweeted_screen_name, sum(retweets) AS total_retweets, count(*) AS tweet_count
FROM (SELECT retweeted_status.user.screen_name AS retweeted_screen_name,
             retweeted_status.text,
             max(retweet_count) AS retweets
      FROM tweets
      GROUP BY retweeted_status.user.screen_name, retweeted_status.text) t
GROUP BY t.retweeted_screen_name
ORDER BY total_retweets DESC
LIMIT 10;
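As another quick example of what the structured table allows, we can count the most used hashtags in the collected tweets with a query along these lines (a sketch; the hashtags array is exploded from the entities struct defined in the table above, and the LIMIT value is arbitrary):

SELECT LOWER(ht.text) AS hashtag, count(*) AS total_count
FROM tweets
LATERAL VIEW EXPLODE(entities.hashtags) hashtag_view AS ht
GROUP BY LOWER(ht.text)
ORDER BY total_count DESC
LIMIT 10;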