关于kafka集群,程序员的学习笔记,超详细,看过都说好

kafka

kafka是个消息队列系统,用于消息的发布和订阅。

核心概念

kafka是个集群,意味着有很多节点组成。

broker,每个节点称作broker。

message,指的是发布到kafka的每一条记录。

topic,指的是message的分类。方便发布者和订阅者根据需要读取不同的内容。比如可以分为订单topic、日志topic等。

partition,指的是topic内部按照message划分为不同的类型,一个message只能属于一个topic中的某一个partition。目的是为了让同一个topic的数据分散在不同的broker上,实现负载均衡。比如订单topic可以按照ip分为不同的partition,一个partition中的数据在一个broker中存储。

replica,指的是同一个topic内部消息存放多少份,每一份存放在不同的broker中。目的是为了备份,保证数据安全。

leader,指的接收客户读写请求的broker。

follower,指的是同步数据的broker。

producer,指的是发布消息。

consumer,指的是订阅消息。

offset(偏移量),偏移量实际上就是数据的索引,类似于数组中的下标0、1、2、3。

消费方式

消费者需要分组。

同一组内的消费者只能有一个消费者读取同一条消息;目的是为了防止多线程读取数据的时候,产生重复读的问题。

不同组之间的消费者可以多次读取同一条消息;目的是为了共享消息。

安装

修改配置文件config/server.properties

修改以下参数

broker.id

host.name

log.dirs

zookeeper.connect

delete.topic.enable=true

把kafka复制到其他节点后,记得修改server.properties中的broker.id和host.name的值。

启动命令

nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

命令行操作

创建主题

bin/kafka-topics.sh --create --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --replication-factor 1 --partitions 1 --topic test

描述主题

bin/kafka-topics.sh --describe --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test

删除主题

bin/kafka-topics.sh --delete --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test

生产者

bin/kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.02:9092 --topic test

消费者

bin/kafka-console-consumer.sh --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test --from-beginning

查看消费进度

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.1.100:2181 --topic wuchao -group group1

关于kafka集群,程序员的学习笔记,超详细,看过都说好

java操作

依赖

 org.apache.kafka kafka_2.11 0.8.2.2 org.scala-lang scala-library 2.11.8 org.scala-lang scala-reflect 2.11.8

生产者

public static void main(String[] args) throws IOException { Properties originalProps = new Properties(); //该originalProps存储着我们访问kafka集群所需要的信息 originalProps.load(KafkaProducerTest.class.getResourceAsStream("producer.properties")); //如果发送String类型的消息,一定要是有StringEncoder originalProps.put("serializer.class", "kafka.serializer.StringEncoder");  //1.获取可以操作的对象 Producer producer = new Producer(new ProducerConfig(originalProps));  //2.发送数据 for(int i=0; i<5; i++){ KeyedMessage message = new KeyedMessage(topic, i+"--"); producer.send(message); }    //3.关闭 producer.close();}

分区类

kafka默认使用的分区类,是随机分区。

我们可以指定分区方式,好处是让同一类输入一个partition,方便客户端消费的时候,可以从同一个partition取到这一类数据。

//自定义分区,需要实现一个接口类public class MyParitioner implements Partitioner{ //一定要使用有参的构造方法 public MyParitioner(VerifiableProperties p) { } /** * @param key表示在生产者生产数据的时候,需要指定的。 */ @Override public int partition(Object key, int numPartitions) { if(key==null) return 0; return Integer.parseInt(key.toString())%2; }}

在生产者的代码中,需要修改2个地方:

一个是在配置中,指定自定义的分区类

originalProps.put("partitioner.class", MyParitioner.class.getName());

一个是发送消息的时候,指定key

KeyedMessage message = new KeyedMessage(topic, i+"", i+"----");producer.send(message);

消费者

//1.获取消费者对象ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(originalProps ));//2.消费数据Map topicCountMap = new HashMap<>();//topicCountMap中key指的是topic名称,value指的是多少个线程来消费该topictopicCountMap.put("wuchao", 1); //只有1个线程消费topic主题//messageStreams结构中,key指的是topicMap>> messageStreams = consumer.createMessageStreams(topicCountMap);List> list = messageStreams.get("wuchao");//因为上面设置了1个线程,所以list中只有一个元素。返回值KafkaStream就是指向kafka集群中wuchao这个主题的流KafkaStream 
kafkaStream = list.get(0);ConsumerIterator iterator = kafkaStream.iterator();while(iterator.hasNext()){ //这是个阻塞操作。如果流中没有数据,就在这里等待 MessageAndMetadata mmd = iterator.next(); //获得主题 String topic = mmd.topic(); //获得分区编号 int partition = mmd.partition(); //指的是该消息的偏移量 long offset = mmd.offset(); //读取消息 String message = new String(mmd.message()); System.out.println("topic="+topic+"\tpartition="+partition+"\toffset="+offset+"\tmessage="+message); //立刻修改zk中关于偏移量的值 consumer.commitOffsets();}

消费者如何查看自己消费到了哪个位置

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.1.100:2181 --topic wuchao -group group1

消费者是否可以随意指定自己消费的位置

直接在zk中使用set命令修改消费的offset。

关于kafka集群,程序员的学习笔记,超详细,看过都说好

是否可以指定每次都从开始消费

在consumer.properties中增加配置信息auto.offset.reset=earliest


分享到:


相關文章: