一:簡介
Kafka作為消息源Spout,Redis作為Bolt存儲實時計算的結果。
二:啟動zookeeper、Kafka服務、Redis服務
<code># 啟動redis
redis-server
# 啟動zookeeper
./zkServer.sh start
# 啟動Kafka
sudo ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
# 創建test主題
./bin/kafka-topics --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
# 生產者控制檯
./bin/kafka-console-producer --broker-list localhost:9092 --topic test
</code>
三:示例
![Storm+Kafka+Redis WordCount代碼示例](http://p2.ttnews.xyz/loading.gif)
1. pom.xml
<code><dependency>
<groupid>org.apache.storm/<groupid>
<artifactid>storm-redis/<artifactid>
<version>2.1.0/<version>
/<dependency>
<dependency>
<groupid>org.apache.storm/<groupid>
<artifactid>storm-kafka-client/<artifactid>
<version>2.1.0/<version>
/<dependency>
<dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka_2.13/<artifactid>
<version>2.4.0/<version>
<exclusions>
<exclusion>
<groupid>org.apache.zookeeper/<groupid>
<artifactid>zookeeper/<artifactid>
/<exclusion>
<exclusion>
<groupid>log4j/<groupid>
<artifactid>log4j/<artifactid>
/<exclusion>
/<exclusions>
/<dependency>
<dependency>
<groupid>org.apache.storm/<groupid>
<artifactid>storm-core/<artifactid>
<version>2.1.0/<version>
/<dependency>
<dependency>
<groupid>org.projectlombok/<groupid>
<artifactid>lombok/<artifactid>
<version>1.18.12/<version>
/<dependency>
/<code>
2. SplitSentenceBolt
<code>import lombok.extern.slf4j.Slf4j;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* 將句子分隔成單詞
*/
@Slf4j
public class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map<string> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
try {
String sentence = input.getStringByField("sentence");
String[] words = sentence.split(" ");
// 將每個單詞流向到下一個Bolt
for (String word : words) {
// 發射時攜帶發射過來的input
collector.emit(input, new Values(word));
}
// 處理成功了給當前tuple做一個成功的標記,調用上游的ack方法
collector.ack(input);
} catch (Exception e) {
log.error("SplitSentenceBolt#execute exception", e);
// 異常做一個失敗的標記,調用上游的fail方法
collector.fail(input);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
</code>
3. WordCountBolt
<code>import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private Map<string> wordCountMap = null;
/**
* 大部分示例變量通常在prepare中進行實例化
* @param topoConf
* @param context
* @param collector
*/
@Override
public void prepare(Map<string> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.wordCountMap = new HashMap<>();
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = wordCountMap.get(word);
if (count == null) {
count = 0L;
}
count++;
wordCountMap.put(word, count);
collector.emit(new Values(word, count));
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
</code>
4. WriteRedisBolt
<code>import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.JedisCommands;
public class WriteRedisBolt extends AbstractRedisBolt {
public WriteRedisBolt(JedisPoolConfig config) {
super(config);
}
@Override
protected void process(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = tuple.getLongByField("count");
JedisCommands jedisCommands = getInstance();
jedisCommands.hset("wordcount", word, count.toString());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
</code>
5. WordCountTopology
<code>import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.example.demo.bolt.SplitSentenceBolt;
import org.example.demo.bolt.WordCountBolt;
import org.example.demo.bolt.WriteRedisBolt;
public class WordCountTopology {
public static void main(String[] args) throws Exception {
// Redis配置
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
.setHost("127.0.0.1")
.setPort(6379)
.setPassword("123456")
.setTimeout(3000)
.build();
String topic = "test";
// 該類將傳入的kafka記錄轉換為storm的tuple
ByTopicRecordTranslator<string> brt = new ByTopicRecordTranslator<>(
(r) -> new Values(r.value(), r.topic()),
new Fields("sentence", topic));
// 設置要消費的topic
brt.forTopic(topic, (r) -> new Values(r.value(), r.topic()), new Fields("sentence", topic));
KafkaSpoutConfig<string> kafkaSpoutConfig = KafkaSpoutConfig
.builder("localhost:9092", topic)
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
.setRecordTranslator(brt)
.build();
KafkaSpout<string> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout);
builder.setBolt("split-bolt", new SplitSentenceBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("word-count-bolt", new WordCountBolt()).shuffleGrouping("split-bolt");
builder.setBolt("write-redis-bolt", new WriteRedisBolt(jedisPoolConfig)).globalGrouping("word-count-bolt");
StormTopology topology = builder.createTopology();
Config config = new Config();
if (args == null || args.length == 0) {
// 本地模式
config.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCountTopology", config, topology);
} else {
// 集群模式
StormSubmitter.submitTopology(args[0],config,builder.createTopology());
}
}
}
</code>
6. 運行 WordCountTopology#main
7. Kafka生產消息
![Storm+Kafka+Redis WordCount代碼示例](http://p2.ttnews.xyz/loading.gif)
本號主要用於發佈工作中比較常用實用的技術,更側重於乾貨,如有需要請關注本賬號,本號將持續發佈乾貨文章。
閱讀更多 Java實用技術 的文章