kafka消息服务器使用场景 90% 缓冲 消息通讯组件
1、jd订单系统
2、解决数据的并发写
Topic :消息分类,以日志分区存储,每一个分区都会有leader和Follower
Record:组成Topic的基本单元,一则消息 key value ts
broker:运行Kafka服务-broker 一台机器只运行一个Broker
Producer:生产消息
Consumer:消费消息
ConsumerGroup:把消费者归类,同一Group的消费者默认是对Topic分区消息实现负载均衡(Fair Shard)
不同Group之间相互不影响,消息是以广播的形式发布。
Kafka消息服务搭建:
1.解压到/usr目录
[root@CentOS ~]# tar zxf kafka_2.11-0.11.0.0.tgz -C /usr/
2.创建软连接(可选)
[root@CentOS ~]# ln -s /usr/kafka_2.11-0.11.0.0/ kafka
3.启动zookeeper (对broker监控,记录集群的元数据信息)
4.搭建kafka的伪分布式
1.创建三个kafka的配置文件
[root@CentOS config]# cp server.properties server-1.properties
[root@CentOS config]# cp server.properties server-2.properties
[root@CentOS config]# cp server.properties server-3.properties
修改配置文件:
server-1.peroperties
broker.id=0 -- 一个kafka实例唯一标示必须唯一
delete.topic.enable=true --允许用户删除topic
listeners=PLAINTEXT://CentOS:9092 --因为kafka服务是通过TCP/IP实现
log.dirs=/tmp/kafka-logs-1 --配置kafka消息存储路径
zookeeper.connect=CentOS:2181
server-2.peroperties
broker.id=1 -- 一个kafka实例唯一标示必须唯一
delete.topic.enable=true --允许用户删除topic
listeners=PLAINTEXT://CentOS:9093 --因为kafka服务是通过TCP/IP实现
log.dirs=/tmp/kafka-logs-2 --配置kafka消息存储路径
zookeeper.connect=CentOS:2181
server-3.peroperties
broker.id=3 -- 一个kafka实例唯一标示必须唯一
delete.topic.enable=true --允许用户删除topic
listeners=PLAINTEXT://CentOS:9094 --因为kafka服务是通过TCP/IP实现
log.dirs=/tmp/kafka-logs-3 --配置kafka消息存储路径
zookeeper.connect=CentOS:2181
启动Kafka
start.sh
echo 'start kafka cluster..'
for i in {1..3}
do
/usr/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /usr/kafka_2.11-0.11.0.0/config/server-$i.properties 1>/dev/null 2>&1 &
sleep 5
done
shutdown.sh
for i in `jps|grep Kafka | awk '{print $1}'`
do
echo 'kill kafka '$i
kill -9 $i
done
Topic管理
创建Topic
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --create --topic topic01 --partitions 3 --replication-factor 2
查看所有Topic
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --list
查看Tpoic详情
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --describe --topic topic01
Topic:topic01PartitionCount:3ReplicationFactor:2Configs:
Topic: topic01Partition: 0Leader: 2Replicas: 2,0Isr: 2,0
Topic: topic01Partition: 1Leader: 0Replicas: 0,1Isr: 0,1
Topic: topic01Partition: 2Leader: 1Replicas: 1,2Isr: 1,2
修改分区
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --alter --topic topic01 --partitions 4
删除
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --delete --topic topic01
Topic topic01 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
订阅消息:
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning
生产消息
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01
介绍:http://kafka.apache.org/documentation/
Java集成Kafka
<dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka-clients/<artifactid>
<version>0.11.0.0/<version>
1.消费方offset管理
①:手动一定offset
kafkaConsumer.seek(new TopicPartition("t_user_topic",part),offset);
②kafkaConsumer.commitAsync();
提交指定分区的offset
①props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
②将所有的消息的消费放置在try{}catch(Exception e)
try{
//消费消息
//提交offset+1
}catch(Exception e){
//seek到上一次消费的消息
}
提交到指定分区
Map<topicpartition> offsetMetaMap=new HashMap<topicpartition>() ;/<topicpartition>/<topicpartition>
OffsetAndMetadata offsetMeta=new OffsetAndMetadata(offset+1);//必须必当前offset大1
offsetMetaMap.put(new TopicPartition("topic01",分区号),offsetMeta);
kafkaConsumer.commitAsync(offsetMetaMap,null);
2、如何使用Kafka发送复杂消息
Deserializer、Serializer接口
3.如何实现Partition数据的负载均衡
①生产方:实现 Partitioner
②消费方:
1.subscribe fair shared 均分策略
2.assign 指定指定分区
閱讀更多 JackYang1993 的文章