Storm–Kafka Integration: Environment Setup, How It Works, Development, and Testing

Introduction

In earlier posts we covered the basics of Kafka and of Storm. If you are not yet familiar with them, please read those articles first. This post is hands-on and contains little theory; if you have any questions, leave a comment below and we will reply as soon as possible. The complete code example is linked at the end of the article.

I. Starting the Kafka Environment

Version selection: versions matter here; with mismatched versions the environment setup can run into many problems. Note that this article has only been verified against the versions below; please test other versions yourself.

ZooKeeper: 3.4.12

Kafka: 0.9.0.1

Storm: 1.1.1

1. Start the ZooKeeper environment

zkServer.sh start
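
Before starting Kafka, you can optionally confirm that ZooKeeper is really up with the standard status subcommand (on a single-node setup it should report standalone mode):

zkServer.sh status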

2. Start the Kafka environment

cd /usr/local/kafka_2.11-0.9.0.1

bin/kafka-server-start.sh -daemon config/server-1.properties &

3. Check the processes to confirm everything started

[root@jikeh ~]# jps
2322 Jps
2052 ConsoleProducer
1556 QuorumPeerMain # ZooKeeper started successfully
1966 Kafka # Kafka started successfully
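
The examples below read from and write to a topic named jikeh. If it does not exist yet, it can be created with the kafka-topics.sh tool that ships with Kafka; a single partition and replication factor 1 are assumptions for a one-broker test setup, and the ZooKeeper address should be adjusted to your own environment:

bin/kafka-topics.sh --create --zookeeper 192.168.199.147:2181 --replication-factor 1 --partitions 1 --topic jikeh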

II. Integrating Storm with Kafka

1. Kafka consumer

1) pom dependencies

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring-boot.version>1.5.16.RELEASE</spring-boot.version>
    <spring-boot-maven-plugin.version>1.5.16.RELEASE</spring-boot-maven-plugin.version>
    <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
    <storm.version>1.1.1</storm.version>
    <kafka.clients.version>0.9.0.1</kafka.clients.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.clients.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.clients.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

2) Kafka Spout: receiving messages from Kafka

// This is the same value as the zookeeper.connect setting in the Kafka config file; you can copy it from there.
// The ZkStr format is ip:port (e.g. localhost:2181). brokerZkPath is the ZooKeeper root path that stores all topic and partition information; by default Kafka uses /brokers.
String brokerZkStr = "192.168.199.147:2181";
ZkHosts zkHosts = new ZkHosts(brokerZkStr);

String topic = "jikeh";

// ZooKeeper root path under which the consumed offsets are reported
String offsetZkRoot = "/" + topic;

// ID under which this spout's consumed offsets are stored, e.g. you could name it after the topology
String offsetZkId = UUID.randomUUID().toString();

SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);

kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

// kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. starting with the oldest message)
// kafka.api.OffsetRequest.LatestTime(): read from the tail of the topic (i.e. only messages newly written to the topic)
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
KafkaSpout spout = new KafkaSpout(kafkaConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout);

3) Processing the Kafka messages

public class KafkaConsumerBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            // "str" is the output field declared by the StringScheme used in the spout config
            String message = tuple.getStringByField("str");

            // String message = tuple.getString(0);

            System.out.println("--->" + message);

            this.collector.ack(tuple);

        } catch (Exception e) {
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
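
The bolt still needs to be attached to the KafkaSpout registered earlier. A minimal sketch of that wiring is shown below; the component id kafkaConsumerBolt is an arbitrary name chosen here, and the downloadable project may wire it differently:

// attach the bolt to the KafkaSpout registered above under the id "spout"
builder.setBolt("kafkaConsumerBolt", new KafkaConsumerBolt()).shuffleGrouping("spout");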

4) Running the Storm topology in local mode

Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());
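
With the topology running locally, you can test it end to end by typing a few lines into the console producer that ships with Kafka (broker address assumed to be the same host as above); each line should then be printed by KafkaConsumerBolt:

bin/kafka-console-producer.sh --broker-list 192.168.199.147:9092 --topic jikeh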

2. Kafka producer

1) pom

Same as above.

2) Data source

public class MessageSpout extends BaseRichSpout {

    private Fields fields = null;
    private SpoutOutputCollector collector;

    public MessageSpout(Fields fields) {
        this.fields = fields;
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // emit a small batch of test tuples, then pause so the topic is not flooded
        for (int i = 0; i < 5; i++) {
            this.collector.emit(new Values("jikeh", "visit--" + i));
        }
        Utils.sleep(2000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.fields);
    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }
}

3) Processing the data and writing it to Kafka

// 2. Write to Kafka
// set producer properties
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.199.147:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, 3);

KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector(topicName))
        // .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"))
        ;
builder.setBolt("bolt", bolt).shuffleGrouping("spout");
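
For completeness, here is a minimal sketch of the pieces not shown above. It is an assumption that the spout's output fields are named key and message, which is what KafkaBolt's default FieldNameBasedTupleToKafkaMapper reads; the topology name and topic are likewise placeholders, so check the downloaded project for the exact wiring:

String topicName = "jikeh"; // assumed: reuse the topic from the consumer example

TopologyBuilder builder = new TopologyBuilder();
// field names must match what the default FieldNameBasedTupleToKafkaMapper expects
builder.setSpout("spout", new MessageSpout(new Fields("key", "message")));

// ... build props and the KafkaBolt exactly as shown above, then call builder.setBolt(...) ...

// run in local mode for testing
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaProducerTopology", config, builder.createTopology());

To verify that the messages actually reach Kafka, the console consumer can be used (ZooKeeper address as above):

bin/kafka-console-consumer.sh --zookeeper 192.168.199.147:2181 --topic jikeh --from-beginning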

Code download: https://gitee.com/jikeh/JiKeHCN-RELEASE.git

Project name: spring-boot-storm-kafka

