12.30 大数据 Kafka 知识 要点

大数据 Kafka 知识 要点

信息时代

kafka消息服务器使用场景 90% 缓冲 消息通讯组件

1、jd订单系统

2、解决数据的并发写

Topic :消息分类,以日志分区存储,每一个分区都会有leader和Follower

Record:组成Topic的基本单元,一则消息 key value ts

broker:运行Kafka服务-broker 一台机器只运行一个Broker

Producer:生产消息

Consumer:消费消息

ConsumerGroup:把消费者归类,同一Group的消费者默认是对Topic分区消息实现负载均衡(Fair Shard)

不同Group之间相互不影响,消息是以广播的形式发布。

大数据 Kafka 知识 要点

Kafka消息服务搭建:

1.解压到/usr目录

[root@CentOS ~]# tar zxf kafka_2.11-0.11.0.0.tgz -C /usr/

2.创建软连接(可选)

[root@CentOS ~]# ln -s /usr/kafka_2.11-0.11.0.0/ kafka

3.启动zookeeper (对broker监控,记录集群的元数据信息)

4.搭建kafka的伪分布式

1.创建三个kafka的配置文件

[root@CentOS config]# cp server.properties server-1.properties

[root@CentOS config]# cp server.properties server-2.properties

[root@CentOS config]# cp server.properties server-3.properties

修改配置文件:

server-1.peroperties

broker.id=0 -- 一个kafka实例唯一标示必须唯一

delete.topic.enable=true --允许用户删除topic

listeners=PLAINTEXT://CentOS:9092 --因为kafka服务是通过TCP/IP实现

log.dirs=/tmp/kafka-logs-1 --配置kafka消息存储路径

zookeeper.connect=CentOS:2181

server-2.peroperties

broker.id=1 -- 一个kafka实例唯一标示必须唯一

delete.topic.enable=true --允许用户删除topic

listeners=PLAINTEXT://CentOS:9093 --因为kafka服务是通过TCP/IP实现

log.dirs=/tmp/kafka-logs-2 --配置kafka消息存储路径

zookeeper.connect=CentOS:2181

server-3.peroperties

broker.id=3 -- 一个kafka实例唯一标示必须唯一

delete.topic.enable=true --允许用户删除topic

listeners=PLAINTEXT://CentOS:9094 --因为kafka服务是通过TCP/IP实现

log.dirs=/tmp/kafka-logs-3 --配置kafka消息存储路径

zookeeper.connect=CentOS:2181

启动Kafka

start.sh

echo 'start kafka cluster..'

for i in {1..3}

do

/usr/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /usr/kafka_2.11-0.11.0.0/config/server-$i.properties 1>/dev/null 2>&1 &

sleep 5

done

shutdown.sh

for i in `jps|grep Kafka | awk '{print $1}'`

do

echo 'kill kafka '$i

kill -9 $i

done

Topic管理

创建Topic

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --create --topic topic01 --partitions 3 --replication-factor 2

查看所有Topic

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --list

查看Tpoic详情

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --describe --topic topic01

Topic:topic01PartitionCount:3ReplicationFactor:2Configs:

Topic: topic01Partition: 0Leader: 2Replicas: 2,0Isr: 2,0

Topic: topic01Partition: 1Leader: 0Replicas: 0,1Isr: 0,1

Topic: topic01Partition: 2Leader: 1Replicas: 1,2Isr: 1,2

修改分区

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --alter --topic topic01 --partitions 4

删除

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --delete --topic topic01

Topic topic01 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

订阅消息:

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning

生产消息

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01

介绍:http://kafka.apache.org/documentation/

Java集成Kafka

<dependency>

<groupid>org.apache.kafka/<groupid>

<artifactid>kafka-clients/<artifactid>

<version>0.11.0.0/<version>

大数据 Kafka 知识 要点

1.消费方offset管理

①:手动一定offset

kafkaConsumer.seek(new TopicPartition("t_user_topic",part),offset);

②kafkaConsumer.commitAsync();

提交指定分区的offset

①props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

②将所有的消息的消费放置在try{}catch(Exception e)

try{

//消费消息

//提交offset+1

}catch(Exception e){

//seek到上一次消费的消息

}

提交到指定分区

Map<topicpartition> offsetMetaMap=new HashMap<topicpartition>() ;/<topicpartition>/<topicpartition>

OffsetAndMetadata offsetMeta=new OffsetAndMetadata(offset+1);//必须必当前offset大1

offsetMetaMap.put(new TopicPartition("topic01",分区号),offsetMeta);

kafkaConsumer.commitAsync(offsetMetaMap,null);

2、如何使用Kafka发送复杂消息

Deserializer、Serializer接口

3.如何实现Partition数据的负载均衡

①生产方:实现 Partitioner

②消费方:

1.subscribe fair shared 均分策略

2.assign 指定指定分区


分享到:


相關文章: