这分析绝了,代码实战演示Kafka消费者分组(Consumer Group)策略

一、原理介绍

每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)

Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个consumer group里的consumer均匀消费数据,优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。

这分析绝了,代码实战演示Kafka消费者分组(Consumer Group)策略

二、SpringBoot集成kafka

代码下载地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git

项目名:spring-boot-kafka-group-consumer

1、添加依赖

1)spring-kafka与kafka的版本选择问题

官网介绍:https://spring.io/projects/spring-kafka

这分析绝了,代码实战演示Kafka消费者分组(Consumer Group)策略

This matrix is client compatibility; in most cases (since 0.10.2.0) newer clients can communicate with older brokers. All users with brokers >= 0.10.x.x are recommended to use spring-kafka version 1.3.x or higher

版本选择:

SpringBoot:1.5.16

Kafka:1.0.2

SpringKafka:1.3.2

2、配置kafka连接属性

spring.kafka.bootstrap-servers=192.168.0.108:9093,192.168.0.108:9094,192.168.0.108:9095
## 当两个消费者使用不同的分组时:都会收到消息,类似于"订阅"功能!!!
## 当使用相同的分组时,当某一个消费者不消费时,另外一个消费者开始消费
spring.kafka.consumer.group-id=myGroup2
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3、生产者

@Component
public class Sender {

@Autowired
private KafkaTemplate kafkaTemplate;

public void sendMessage(){
Message m = new Message();
m.setId(System.currentTimeMillis());
m.setMsg(UUID.randomUUID().toString());
m.setSendTime(new Date());
kafkaTemplate.send("jikeh", JSONObject.toJSONString(m));
}
}

4、构建两个消费者

@Component
public class Receiver {

@KafkaListener(topics = "jikeh")
public void processMessage(String content) {
Message m = JSONObject.parseObject(content, Message.class);
System.out.println("接受信息:"+m.getMsg());
}
}

三、场景测试

  • 启动单节点单broker环境

bin/kafka-server-start.sh config/server.properties

  • 创建一个测试topic:jikeh_consumer_group

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jikeh_consumer_group

  • 启动一个生产者,启动两个消费者同属一个消费者组

测试结果:只有一个消费者能够收到消息

  • 启动一个生产者,启动两个消费者分别属于两个不同的消费者组

测试结果:两个消费者都能收到消息

如果理解上有困难,环境参考视频教程:


分享到:


相關文章: