Kafka快速入门

一、介绍

是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的消息队列系统

1、点对点模式

生产者发送消息到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。即使有多个消费者同时消费数据,也能保证顺序


Kafka快速入门


生产者发送一条消息到queue,只有一个消费者能收到

2、发布-订阅模式

生产者生产数据到一个topic中。与点对点消不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。


Kafka快速入门


发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息


Kafka快速入门


3、broker

Kafka 集群包含一个或多个服务器,服务器节点称为broker。

4、Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。类似于数据库的表名

5、partition

topic中的数据分割为一个或多个partition。每个topic至少有一个partition。

6、Producer

生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

7、Consumer

消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

8、Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。


9、Leader

每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

10、Follower

Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。



Topic相当于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪个topic,但是不需要指定topic下的哪个partition,因为kafka会把收到的message进行load balance,均匀的分布在这个topic下的不同的partition上( hash(message) % [broker数量] )。

物理上存储上,这个topic会分成一个或多个partition,每个partiton相当于是一个子queue。在物理结构上,每个partition对应一个物理的目录(文件夹),文件夹命名是[topicname]_[partition]_[序号],一个topic可以有无数多的partition,根据业务需求和数据量来设置。

在kafka配置文件中可随时更高num.partitions参数来配置更改topic的partition数量,在创建Topic时通过参数指定parittion数量。Topic创建之后通过Kafka提供的工具也可以修改partiton数量。

一般来说,

(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率。

(2)同一个Partition的Replica尽量分散到不同的机器,高可用。

当add a new partition的时候,partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance

- Partition Replica:每个partition可以在其他的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。replica副本的方式是按照kafka broker的顺序存。

例如有5个kafka broker节点,某个topic有3个partition,每个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)。

这样如果某个broker宕机,其实整个kafka内数据依然是完整的。但是,replica副本数越高,系统虽然越稳定,但是回来带资源和性能上的下降;replica副本少的话,也会造成系统丢数据的风险。

二、Kafka的优点

2.1 解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2.2 冗余(副本)

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

2.3 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

2.4 灵活性&峰值处理能力

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

2.5 可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

2.6 顺序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。

2.7 缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。

2.8 异步通信

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

先安装zookeeper

<code>#上传zookeeper,kafka
tar -zxvf zookeeper
tar -zxvf kafka

#修改zookeeper配置文件 zoo.cfg
vi zoo.cfg
#数据文件目录
dataDir=/opt/module/zookeeper-3.4.11/data
#集群配置,此处没有集群可以不配置
server.1=hadoop101:2888:3888
server.2=hadoop102:2888:3888
server.3=hadoop1032888:3888
/<code>

安装kafka

<code>mkdir logs 
#实际的数据也就放在logs
cd config/
vi server.properties
#必须是一个唯一的值
broker.id=0
#添加,是否可以删除topic,默认是false,不能删除
delete.topic.enable=true
log.dirs=/opt/module/kafka/logs
#存储的时间和大小
log.retention.hours=168
log.segment.bytes=1073741824
#zookeeper集群 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
/<code>
<code>启动kafka
bin/kafka-server-start.sh config/server.properties

创建主题
bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --partitions 1 --replication-factor 1 --topic first

查看主题
bin/kafka-topics.sh --list --zookeeper hadoop102:2181


删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

发送消息
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --from-beginning/<code>


分享到:


相關文章: