kafka
kafka是个消息队列系统,用于消息的发布和订阅。
核心概念
kafka是个集群,意味着有很多节点组成。
broker,每个节点称作broker。
message,指的是发布到kafka的每一条记录。
topic,指的是message的分类。方便发布者和订阅者根据需要读取不同的内容。比如可以分为订单topic、日志topic等。
partition,指的是topic内部按照message划分为不同的类型,一个message只能属于一个topic中的某一个partition。目的是为了让同一个topic的数据分散在不同的broker上,实现负载均衡。比如订单topic可以按照ip分为不同的partition,一个partition中的数据在一个broker中存储。
replica,指的是同一个topic内部消息存放多少份,每一份存放在不同的broker中。目的是为了备份,保证数据安全。
leader,指的接收客户读写请求的broker。
follower,指的是同步数据的broker。
producer,指的是发布消息。
consumer,指的是订阅消息。
offset(偏移量),偏移量实际上就是数据的索引,类似于数组中的下标0、1、2、3。
消费方式
消费者需要分组。
同一组内的消费者只能有一个消费者读取同一条消息;目的是为了防止多线程读取数据的时候,产生重复读的问题。
不同组之间的消费者可以多次读取同一条消息;目的是为了共享消息。
安装
修改配置文件config/server.properties
修改以下参数
broker.id
host.name
log.dirs
zookeeper.connect
delete.topic.enable=true
把kafka复制到其他节点后,记得修改server.properties中的broker.id和host.name的值。
启动命令
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
命令行操作
创建主题
bin/kafka-topics.sh --create --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --replication-factor 1 --partitions 1 --topic test
描述主题
bin/kafka-topics.sh --describe --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test
删除主题
bin/kafka-topics.sh --delete --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test
生产者
bin/kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.02:9092 --topic test
消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test --from-beginning
查看消费进度
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.1.100:2181 --topic wuchao -group group1
java操作
依赖
org.apache.kafka kafka_2.11 0.8.2.2 org.scala-lang scala-library 2.11.8 org.scala-lang scala-reflect 2.11.8
生产者
public static void main(String[] args) throws IOException { Properties originalProps = new Properties(); //该originalProps存储着我们访问kafka集群所需要的信息 originalProps.load(KafkaProducerTest.class.getResourceAsStream("producer.properties")); //如果发送String类型的消息,一定要是有StringEncoder originalProps.put("serializer.class", "kafka.serializer.StringEncoder"); //1.获取可以操作的对象 Producerproducer = new Producer (new ProducerConfig(originalProps)); //2.发送数据 for(int i=0; i<5; i++){ KeyedMessage message = new KeyedMessage (topic, i+"--"); producer.send(message); } //3.关闭 producer.close();}
分区类
kafka默认使用的分区类,是随机分区。
我们可以指定分区方式,好处是让同一类输入一个partition,方便客户端消费的时候,可以从同一个partition取到这一类数据。
//自定义分区,需要实现一个接口类public class MyParitioner implements Partitioner{ //一定要使用有参的构造方法 public MyParitioner(VerifiableProperties p) { } /** * @param key表示在生产者生产数据的时候,需要指定的。 */ @Override public int partition(Object key, int numPartitions) { if(key==null) return 0; return Integer.parseInt(key.toString())%2; }}
在生产者的代码中,需要修改2个地方:
一个是在配置中,指定自定义的分区类
originalProps.put("partitioner.class", MyParitioner.class.getName());
一个是发送消息的时候,指定key
KeyedMessagemessage = new KeyedMessage (topic, i+"", i+"----");producer.send(message);
消费者
//1.获取消费者对象ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(originalProps ));//2.消费数据MaptopicCountMap = new HashMap<>();//topicCountMap中key指的是topic名称,value指的是多少个线程来消费该topictopicCountMap.put("wuchao", 1); //只有1个线程消费topic主题//messageStreams结构中,key指的是topicMap >> messageStreams = consumer.createMessageStreams(topicCountMap);List > list = messageStreams.get("wuchao");//因为上面设置了1个线程,所以list中只有一个元素。返回值KafkaStream就是指向kafka集群中wuchao这个主题的流KafkaStream kafkaStream = list.get(0);ConsumerIterator iterator = kafkaStream.iterator();while(iterator.hasNext()){ //这是个阻塞操作。如果流中没有数据,就在这里等待 MessageAndMetadata mmd = iterator.next(); //获得主题 String topic = mmd.topic(); //获得分区编号 int partition = mmd.partition(); //指的是该消息的偏移量 long offset = mmd.offset(); //读取消息 String message = new String(mmd.message()); System.out.println("topic="+topic+"\tpartition="+partition+"\toffset="+offset+"\tmessage="+message); //立刻修改zk中关于偏移量的值 consumer.commitOffsets();}
消费者如何查看自己消费到了哪个位置
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.1.100:2181 --topic wuchao -group group1
消费者是否可以随意指定自己消费的位置
直接在zk中使用set命令修改消费的offset。
是否可以指定每次都从开始消费
在consumer.properties中增加配置信息auto.offset.reset=earliest
閱讀更多 動作要快姿勢要帥 的文章