Introduction
We have already covered the basics of Kafka and of Storm; if you are not familiar with either, please refer to the earlier articles. This article is hands-on, with little theory. If you have any questions, leave a comment below and we will reply as soon as possible. A complete code example is linked at the end of the article.
I. Starting the Kafka Environment
Version selection: versions matter — with mismatched versions you will run into many setup problems. Note that this article has been verified only against the versions below; test other versions yourself.
Zookeeper: 3.4.12
Kafka: 0.9.0.1
Storm: 1.1.1
1. Start ZooKeeper
zkServer.sh start
2. Start Kafka
cd /usr/local/kafka_2.11-0.9.0.1
bin/kafka-server-start.sh -daemon config/server-1.properties
(The -daemon flag already runs the broker in the background, so a trailing & is unnecessary.)
3. Check the processes to confirm everything started
[root@jikeh ~]# jps
2322 Jps
2052 ConsoleProducer
1556 QuorumPeerMain #ZooKeeper is up
1966 Kafka #Kafka is up
II. Integrating Storm with Kafka
1. Kafka consumer
1) pom dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring-boot.version>1.5.16.RELEASE</spring-boot.version>
    <spring-boot-maven-plugin.version>1.5.16.RELEASE</spring-boot-maven-plugin.version>
    <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
    <storm.version>1.1.1</storm.version>
    <kafka.clients.version>0.9.0.1</kafka.clients.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.clients.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.clients.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
2) Kafka Spout: receiving messages from Kafka
// This is the zookeeper.connect value from the Kafka configuration file; copy it from there.
// ZkStr has the form ip:port (e.g. localhost:2181). brokerZkPath is the ZK root path that stores
// all topic and partition metadata; by default Kafka uses /brokers.
String brokerZkStr = "192.168.199.147:2181";
ZkHosts zkHosts = new ZkHosts(brokerZkStr);
String topic = "jikeh";
// Root ZK path under which this spout reports consumed offsets
String offsetZkRoot = "/" + topic;
// ID under which this spout's offset state is stored, e.g. the topology name.
// Note: a random UUID means stored offsets are not found again after a restart; use a stable id in production.
String offsetZkId = UUID.randomUUID().toString();
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// kafka.api.OffsetRequest.EarliestTime(): read from the start of the topic (i.e. the oldest messages)
// kafka.api.OffsetRequest.LatestTime(): read from the tail of the topic (i.e. only newly written messages)
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
KafkaSpout spout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout);
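For reference, StringScheme does nothing more than decode the raw message bytes as UTF-8 and emit them as a one-field tuple named "str" — the field the consumer bolt in the next step reads. A stand-alone sketch of just that decoding step (the class and method names here are ours, for illustration only):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StringSchemeSketch {
    // Field name used by Storm's StringScheme; the consumer bolt reads this field.
    public static final String STRING_SCHEME_KEY = "str";

    // Decode a raw Kafka message payload the way StringScheme does: plain UTF-8.
    public static String deserialize(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("visit--0".getBytes(StandardCharsets.UTF_8));
        System.out.println(deserialize(payload)); // prints: visit--0
    }
}
```

This is why the bolt can call tuple.getStringByField("str") without any extra deserialization work.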
3) Processing the Kafka messages
public class KafkaConsumerBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            // "str" is the field name StringScheme assigns to the decoded message
            String message = tuple.getStringByField("str");
            // String message = tuple.getString(0);
            System.out.println("--->" + message);
            this.collector.ack(tuple);
        } catch (Exception e) {
            // Failing the tuple makes the spout replay it
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt is a sink; it emits nothing downstream
    }
}
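The ack/fail calls above are what gives the topology at-least-once delivery: an acked tuple is finished, while a failed tuple is replayed by the spout and processed again. A stand-alone sketch of that replay loop — the pending queue stands in for the spout's replay mechanism, and the forced failure is purely illustrative:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AckFailSketch {
    // Drain a queue of messages with at-least-once semantics: a failed message
    // is re-queued (modeling Storm replaying a failed tuple from the spout).
    public static int processAll(Deque<String> pending, int failFirstN) {
        int processed = 0;
        int failures = 0;
        while (!pending.isEmpty()) {
            String msg = pending.poll();
            if (failures < failFirstN) {
                failures++;           // simulate execute() throwing -> collector.fail(tuple)
                pending.addLast(msg); // the tuple is replayed, so the message is seen again
            } else {
                processed++;          // collector.ack(tuple): done with this tuple
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<String> q = new ArrayDeque<>();
        q.add("visit--0");
        q.add("visit--1");
        // One forced failure, yet both messages are eventually processed.
        System.out.println(processAll(q, 1)); // prints: 2
    }
}
```

The practical consequence: the bolt's processing should be idempotent, because a replayed tuple means the same message can be printed (or written) more than once.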
4) Running the Storm job in local mode
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());
2. Kafka producer
1) pom
Same as above.
2) Data source
public class MessageSpout extends BaseRichSpout {
    private Fields fields = null;
    private SpoutOutputCollector collector;

    // The field names are passed in by the caller; they must match what the
    // downstream KafkaBolt's tuple-to-Kafka mapper expects
    public MessageSpout(Fields fields) {
        this.fields = fields;
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Emit five two-field tuples, then pause so the topology is not flooded
        for (int i = 0; i < 5; i++) {
            this.collector.emit(new Values("jikeh", "visit--" + i));
        }
        Utils.sleep(2000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.fields);
    }

    @Override
    public void ack(Object o) {
    }

    @Override
    public void fail(Object o) {
    }
}
3) Processing the data and writing it to Kafka
// 2. Write to Kafka
// Set the producer properties
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.199.147:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, 3);
KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector(topicName))
        // .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"))
        ;
builder.setBolt("bolt", bolt).shuffleGrouping("spout");
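Since no mapper is set explicitly above, KafkaBolt falls back to FieldNameBasedTupleToKafkaMapper with its defaults, which looks up the Kafka record key and value in the tuple by the field names "key" and "message" — so the MessageSpout must be constructed with matching Fields. A stand-alone sketch of that lookup, with the tuple modeled as a plain map purely for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TupleToKafkaSketch {
    // Default field names used by FieldNameBasedTupleToKafkaMapper
    public static final String BOLT_KEY = "key";
    public static final String BOLT_MESSAGE = "message";

    // Pick the Kafka record key and value out of a tuple (modeled here as a map)
    public static String[] toKeyValue(Map<String, String> tuple) {
        return new String[] { tuple.get(BOLT_KEY), tuple.get(BOLT_MESSAGE) };
    }

    public static void main(String[] args) {
        Map<String, String> tuple = new LinkedHashMap<>();
        tuple.put(BOLT_KEY, "jikeh");        // first value emitted by MessageSpout
        tuple.put(BOLT_MESSAGE, "visit--0"); // second value emitted by MessageSpout
        String[] kv = toKeyValue(tuple);
        System.out.println(kv[0] + " -> " + kv[1]); // prints: jikeh -> visit--0
    }
}
```

In other words, the spout should be wired up with new Fields("key", "message"); if you declare different field names, pass them to FieldNameBasedTupleToKafkaMapper explicitly, as the commented-out line shows.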
Code download: https://gitee.com/jikeh/JiKeHCN-RELEASE.git
Project name: spring-boot-storm-kafka