在本章中,我们将讨论Apache Storm的实时应用程序。我们将看到Storm如何在Twitter中使用。
Twitter是一种在线社交网络服务,提供发送和接收用户推文的平台。注册用户可以阅读和发布tweet,但未注册的用户只能阅读tweet。Hashtag通过在相关关键字之前附加#,按关键字对tweet进行分类。现在让我们来看一个实时场景:找出每个主题中使用最多的hashtag。
spout的目的是尽快收到人们提交的tweets。Twitter提供了“Twitter Streaming API”,一个基于Web服务的工具,用于实时检索人们提交的tweets。Twitter Streaming API可以使用任何编程语言访问。
twitter4j是一个开源的非官方Java库,它提供了一个基于Java的模块,可以轻松访问Twitter Streaming API。twitter4j提供了一个基于监听器的框架来访问tweet。要访问Twitter Streaming API,我们需要登录Twitter开发人员帐户,并获取以下OAuth身份验证详细信息:consumer key、consumer secret、access token和access token secret。
Storm在其入门套件中提供了一个twitter spout,TwitterSampleSpout。我们将使用它来检索tweet。该spout需要OAuth身份验证详细信息和至少一个关键字。该spout将发出基于关键字的实时tweet。完整的程序代码如下。
import java.util.Map; |
import java.util.concurrent.LinkedBlockingQueue; |
import twitter4j.FilterQuery; |
import twitter4j.StallWarning; |
import twitter4j.Status; |
import twitter4j.StatusDeletionNotice; |
import twitter4j.StatusListener; |
import twitter4j.TwitterStream; |
import twitter4j.TwitterStreamFactory; |
import twitter4j.auth.AccessToken; |
import twitter4j.conf.ConfigurationBuilder; |
import backtype.storm.Config; |
import backtype.storm.spout.SpoutOutputCollector; |
import backtype.storm.task.TopologyContext; |
import backtype.storm.topology.OutputFieldsDeclarer; |
import backtype.storm.topology.base.BaseRichSpout; |
import backtype.storm.tuple.Fields; |
import backtype.storm.tuple.Values; |
import backtype.storm.utils.Utils; |
@SuppressWarnings("serial") |
public class TwitterSampleSpout extends BaseRichSpout { |
SpoutOutputCollector _collector; |
LinkedBlockingQueue<Status> queue = null; |
TwitterStream _twitterStream; |
String consumerKey; |
String consumerSecret; |
String accessToken; |
String accessTokenSecret; |
String[] keyWords; |
public TwitterSampleSpout(String consumerKey, String consumerSecret, |
String accessToken, String accessTokenSecret, String[] keyWords) { |
this.consumerKey = consumerKey; |
this.consumerSecret = consumerSecret; |
this.accessToken = accessToken; |
this.accessTokenSecret = accessTokenSecret; |
this.keyWords = keyWords; |
} |
public TwitterSampleSpout() { |
// TODO Auto-generated constructor stub |
} |
@Override |
public void open(Map conf, TopologyContext context, |
SpoutOutputCollector collector) { |
queue = new LinkedBlockingQueue<Status>(1000); |
_collector = collector; |
StatusListener listener = new StatusListener() { |
@Override |
public void onStatus(Status status) { |
queue.offer(status); |
} |
@Override |
public void onDeletionNotice(StatusDeletionNotice sdn) {} |
@Override |
public void onTrackLimitationNotice(int i) {} |
@Override |
public void onScrubGeo(long l, long l1) {} |
@Override |
public void onException(Exception ex) {} |
@Override |
public void onStallWarning(StallWarning arg0) { |
// TODO Auto-generated method stub |
} |
}; |
ConfigurationBuilder cb = new ConfigurationBuilder(); |
cb.setDebugEnabled(true) |
.setOAuthConsumerKey(consumerKey) |
.setOAuthConsumerSecret(consumerSecret) |
.setOAuthAccessToken(accessToken) |
.setOAuthAccessTokenSecret(accessTokenSecret); |
_twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); |
_twitterStream.addListener(listener); |
if (keyWords.length == 0) { |
_twitterStream.sample(); |
}else { |
FilterQuery query = new FilterQuery().track(keyWords); |
_twitterStream.filter(query); |
} |
} |
@Override |
public void nextTuple() { |
Status ret = queue.poll(); |
if (ret == null) { |
Utils.sleep(50); |
} else { |
_collector.emit(new Values(ret)); |
} |
} |
@Override |
public void close() { |
_twitterStream.shutdown(); |
} |
@Override |
public Map<String, Object> getComponentConfiguration() { |
Config ret = new Config(); |
ret.setMaxTaskParallelism(1); |
return ret; |
} |
@Override |
public void ack(Object id) {} |
@Override |
public void fail(Object id) {} |
@Override |
public void declareOutputFields(OutputFieldsDeclarer declarer) { |
declarer.declare(new Fields("tweet")); |
} |
} |
由spout发出的tweet将被转发到HashtagReaderBolt,它将处理该tweet并发出所有可用的hashtag。HashtagReaderBolt使用twitter4j提供的getHashtagEntities方法。getHashtagEntities读取tweet并返回hashtag的列表。完整的程序代码如下 -
import java.util.HashMap; |
import java.util.Map; |
import twitter4j.*; |
import twitter4j.conf.*; |
import backtype.storm.tuple.Fields; |
import backtype.storm.tuple.Values; |
import backtype.storm.task.OutputCollector; |
import backtype.storm.task.TopologyContext; |
import backtype.storm.topology.IRichBolt; |
import backtype.storm.topology.OutputFieldsDeclarer; |
import backtype.storm.tuple.Tuple; |
public class HashtagReaderBolt implements IRichBolt { |
private OutputCollector collector; |
@Override |
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { |
this.collector = collector; |
} |
@Override |
public void execute(Tuple tuple) { |
Status tweet = (Status) tuple.getValueByField("tweet"); |
for(HashtagEntity hashtage : tweet.getHashtagEntities()) { |
System.out.println("Hashtag: " + hashtage.getText()); |
this.collector.emit(new Values(hashtage.getText())); |
} |
} |
@Override |
public void cleanup() {} |
@Override |
public void declareOutputFields(OutputFieldsDeclarer declarer) { |
declarer.declare(new Fields("hashtag")); |
} |
@Override |
public Map<String, Object> getComponentConfiguration() { |
return null; |
} |
} |
发出的hashtag将被转发到HashtagCounterBolt。这个bolt将处理所有的hashtags,并使用Java Map对象将每个hashtags及其计数保存在内存中。完整的程序代码如下。
import java.util.HashMap; |
import java.util.Map; |
import backtype.storm.tuple.Fields; |
import backtype.storm.tuple.Values; |
import backtype.storm.task.OutputCollector; |
import backtype.storm.task.TopologyContext; |
import backtype.storm.topology.IRichBolt; |
import backtype.storm.topology.OutputFieldsDeclarer; |
import backtype.storm.tuple.Tuple; |
public class HashtagCounterBolt implements IRichBolt { |
Map<String, Integer> counterMap; |
private OutputCollector collector; |
@Override |
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { |
this.counterMap = new HashMap<String, Integer>(); |
this.collector = collector; |
} |
@Override |
public void execute(Tuple tuple) { |
String key = tuple.getString(0); |
if(!counterMap.containsKey(key)){ |
counterMap.put(key, 1); |
}else{ |
Integer c = counterMap.get(key) + 1; |
counterMap.put(key, c); |
} |
collector.ack(tuple); |
} |
@Override |
public void cleanup() { |
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ |
System.out.println("Result: " + entry.getKey()+" : " + entry.getValue()); |
} |
} |
@Override |
public void declareOutputFields(OutputFieldsDeclarer declarer) { |
declarer.declare(new Fields("hashtag")); |
} |
@Override |
public Map<String, Object> getComponentConfiguration() { |
return null; |
} |
} |
提交拓扑是主要应用程序。Twitter拓扑由TwitterSampleSpout,HashtagReaderBolt和HashtagCounterBolt组成。以下程序代码显示如何提交拓扑。
import java.util.*; |
import backtype.storm.tuple.Fields; |
import backtype.storm.tuple.Values; |
import backtype.storm.Config; |
import backtype.storm.LocalCluster; |
import backtype.storm.topology.TopologyBuilder; |
public class TwitterHashtagStorm { |
public static void main(String[] args) throws Exception{ |
String consumerKey = args[0]; |
String consumerSecret = args[1]; |
String accessToken = args[2]; |
String accessTokenSecret = args[3]; |
String[] arguments = args.clone(); |
String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); |
Config config = new Config(); |
config.setDebug(true); |
TopologyBuilder builder = new TopologyBuilder(); |
builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey, |
consumerSecret, accessToken, accessTokenSecret, keyWords)); |
builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt()) |
.shuffleGrouping("twitter-spout"); |
builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt()) |
.fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag")); |
LocalCluster cluster = new LocalCluster(); |
cluster.submitTopology("TwitterHashtagStorm", config, |
builder.createTopology()); |
Thread.sleep(10000); |
cluster.shutdown(); |
} |
} |
完整的应用程序包含四个Java源文件。它们如下 - TwitterSampleSpout.java、HashtagReaderBolt.java、HashtagCounterBolt.java、TwitterHashtagStorm.java
您可以使用以下命令编译应用程序 -
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java
使用以下命令执行应用程序 -
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":. \
TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret> \
<keyword1> <keyword2> … <keywordN>
应用程序将打印当前可用的主题标签及其计数。输出应类似于以下内容 -
Result: jazztastic : 1 Result: foodie : 1 Result: Redskins : 1 Result: Recipe : 1 Result: cook : 1 Result: android : 1 Result: food : 2 Result: NoToxicHorseMeat : 1 Result: Purrs4Peace : 1 Result: livemusic : 1 Result: VIPremium : 1 Result: Frome : 1 Result: SundayRoast : 1 Result: Millennials : 1 Result: HealthWithKier : 1 Result: LPs30DaysofGratitude : 1 Result: cooking : 1 Result: gameinsight : 1 Result: Countryfile : 1 Result: androidgames : 1