Apache Kafka Stream Character Count

Introduction#

Apache Kafka is a distributed stream-processing platform. A stream-processing platform has to support three key capabilities:

  • Publish records to a stream and subscribe to records from it, similar to a message queue
  • Store streams of records durably, for fault tolerance
  • Process records promptly as they appear in the stream

Kafka runs as a cluster on one or more servers. It stores streams of records grouped into topics, and every record consists of a key, a value, and a timestamp.

Kafka provides the Producer, Consumer, Streams, and Connector APIs: the Producer API publishes records, the Consumer API consumes them, the Streams API provides stream-processing capabilities, and the Connector API moves data between Kafka and other systems.


Install Kafka#

Download Kafka, then extract it to a local directory:

<code>> tar -xzf kafka_2.12-2.2.0.tgz
> cd kafka_2.12-2.2.0</code>

Start Kafka#

Kafka depends on ZooKeeper, so ZooKeeper must be started before Kafka. Kafka ships with zookeeper-server-start.sh for starting a single-node ZooKeeper instance.
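
The start command itself is not shown in the original text; using the zookeeper.properties file included in the Kafka distribution, it looks like this:

<code>zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties</code>

Once ZooKeeper is up, start the Kafka broker: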

<code>kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2019-07-06 00:31:59,766] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-07-06 00:32:00,836] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-07-06 00:32:00,836] INFO starting (kafka.server.KafkaServer)
[2019-07-06 00:32:00,838] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-07-06 00:32:00,881] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-07-06 00:32:00,896] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:00,897] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:00,897] INFO Client environment:java.version=1.8.0_181 (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:00,897] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:00,897] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper)
...
[2019-07-06 00:32:01,006] INFO Client environment:java.compiler= (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:os.name=Mac OS X (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:os.arch=x86_64 (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:os.version=10.14.6 (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:user.name=jet (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:user.home=/Users/jet (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:user.dir=/Users/jet (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,015] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@15c43bd9 (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,100] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2019-07-06 00:32:06,119] INFO Opening socket connection to server localhost/fe80:0:0:0:0:0:0:1%1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-07-06 00:32:06,224] INFO Socket connection established to localhost/fe80:0:0:0:0:0:0:1%1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-07-06 00:32:06,284] INFO Session establishment complete on server localhost/fe80:0:0:0:0:0:0:1%1:2181, sessionid = 0x100292d2f990000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-07-06 00:32:06,315] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2019-07-06 00:32:07,075] INFO Cluster ID = 4mgiWFzSTfKoRznZ6a-zsw (kafka.server.KafkaServer)
[2019-07-06 00:32:07,298] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    ...
    unclean.leader.election.enable = false
    zookeeper.connect = localhost:2181
    zookeeper.connection.timeout.ms = 6000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2019-07-06 00:32:07,396] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-07-06 00:32:07,399] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-07-06 00:32:07,408] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-07-06 00:32:07,550] INFO Loading logs. (kafka.log.LogManager)
[2019-07-06 00:32:07,847] INFO [Log partition=test-0, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2019-07-06 00:32:07,874] INFO [Log partition=test-0, dir=/tmp/kafka-logs] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 162 ms (kafka.log.Log)
[2019-07-06 00:32:07,906] INFO Logs loading complete in 355 ms. (kafka.log.LogManager)
[2019-07-06 00:32:08,003] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2019-07-06 00:32:08,007] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2019-07-06 00:32:09,228] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2019-07-06 00:32:09,760] INFO [SocketServer brokerId=0] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
[2019-07-06 00:32:09,767] INFO [SocketServer brokerId=0] Started 1 acceptor threads for data-plane (kafka.network.SocketServer)
[2019-07-06 00:32:10,124] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:10,131] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:10,201] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:10,247] INFO [ExpirationReaper-0-ElectPreferredLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:10,553] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-07-06 00:32:11,293] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2019-07-06 00:32:11,389] INFO Stat of the created znode at /brokers/ids/0 is: 55,55,1562344331342,1562344331342,1,0,0,72102868086751232,188,0,55
(kafka.zk.KafkaZkClient)
[2019-07-06 00:32:11,397] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 55 (kafka.zk.KafkaZkClient)
[2019-07-06 00:32:11,560] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:11,568] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:11,569] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:11,607] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2019-07-06 00:32:11,609] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2019-07-06 00:32:11,618] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-07-06 00:32:11,639] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:1000,blockEndProducerId:1999) by writing to Zk with path version 2 (kafka.coordinator.transaction.ProducerIdManager)
[2019-07-06 00:32:11,696] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-07-06 00:32:11,700] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-07-06 00:32:11,700] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-07-06 00:32:14,848] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-07-06 00:32:14,959] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-07-06 00:32:15,022] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-06 00:32:15,022] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-06 00:32:15,022] INFO Kafka startTimeMs: 1562344334967 (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-06 00:32:15,035] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2019-07-06 00:32:15,535] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(test-0) (kafka.server.ReplicaFetcherManager)
[2019-07-06 00:32:15,584] INFO Replica loaded for partition test-0 with initial high watermark 0 (kafka.cluster.Replica)
[2019-07-06 00:32:15,593] INFO [Partition test-0 broker=0] test-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
</code>

Create a Topic#

The --partitions flag sets the number of partitions, and --replication-factor sets how many replicas each partition has:

<code>kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test</code>

List the topics on the current node:

<code>kafka-topics.sh --list --bootstrap-server localhost:9092
test</code>

Sending Messages with the Producer#

kafka-console-producer publishes messages from standard input to the given topic:

<code>kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message</code>
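
Since every record carries a key as well as a value (see the introduction), the console producer can also attach keys. A small sketch using its parse.key and key.separator properties, where everything before the separator becomes the record key:

<code>kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=:
key1:This is a keyed message</code>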

Consuming Messages with the Consumer#

kafka-console-consumer.sh consumes the messages from the given topic:

<code>kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message</code>

Cluster Deployment#

The example above started only a single Kafka node; next we add two more nodes to run as a cluster:

<code>cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2</code>

Here broker.id identifies the server and must be unique within the cluster; if two nodes share the same id, the second one will fail to start.

Start server 1 and server 2:

<code>kafka-server-start.sh $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh $KAFKA_HOME/config/server-2.properties &</code>
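
The next step describes a topic named new-topic, whose creation is not shown above; given the describe output (replication factor 3, one partition), it would have been created with something along these lines:

<code>kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic new-topic</code>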

Inspect the topic's details:

<code>kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic new-topic
Topic:new-topic    PartitionCount:1    ReplicationFactor:3    Configs:segment.bytes=1073741824
    Topic: new-topic    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0</code>
  • Leader: the node that currently acts as leader for the partition (broker.id 2 here)
  • Replicas: the full set of nodes that replicate this partition, regardless of whether they are the leader or currently alive
  • Isr: the set of replicas that are currently in sync with the leader

When we terminate the leader node, a new leader is elected from the remaining replicas. Find the process running broker 2 (the current leader) and kill it:

<code>ps aux | grep server-2.properties
kill -9 <pid>    # <pid> is the process id from the ps output above</code>

Describing the topic again shows that leadership has moved to broker 1; Replicas still lists the terminated broker 2, but Isr no longer includes it:

<code>kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic new-topic
Topic:new-topic    PartitionCount:1    ReplicationFactor:3    Configs:segment.bytes=1073741824
    Topic: new-topic    Partition: 0    Leader: 1    Replicas: 2,1,0    Isr: 1,0</code>

Kafka Connect Import and Export#

Kafka Connect is a tool that ships with Kafka for importing data into and exporting data out of Kafka.

  • Import a file into a Kafka topic and export it back to a file
<code># create the input file
echo -e "import test\n import data" > test.txt

# the command below imports the lines of the text file `test.txt` into the topic
# `connect-test`, then exports the messages in `connect-test` to the file `test.sink.txt`
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties $KAFKA_HOME/config/connect-file-sink.properties</code>

connect-standalone.properties configures the Kafka connection and the key/value serialization format:

<code>bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000</code>

connect-file-source.properties configures the source, which reads the target file line by line and writes each line to the given topic:

<code>name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test</code>

connect-file-sink.properties configures the sink, which writes the messages from the given topic to a file:

<code>name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test</code>

The combined run above can also be split into a separate import and a separate export:

<code>## import
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties

## export
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties</code>

While the import and export are running, you can watch the messages in the topic in real time with a console consumer.
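
A minimal example, reusing the console consumer from earlier against the connect-test topic:

<code>kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning</code>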



How to Delete a Topic#

A topic's data is stored in the directory specified in the Kafka configuration, while its metadata is stored in ZooKeeper, so deleting a topic means deleting both the Kafka data files and the corresponding ZooKeeper nodes.

  • Stop the Kafka service
<code>kafka-server-stop.sh</code>
  • Delete the Kafka data files (the directory that log.dirs points to)
<code>rm -rf $KAFKA_HOME/data/</code>
  • Delete the related information stored in ZooKeeper
<code>zkCli.sh -server localhost:2181     # connect to zookeeper

ls /                                # list the zookeeper root
[admin, brokers, cluster, config, consumers, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

# delete the related znodes
[zk: localhost:2181(CONNECTED) 1] deleteall /admin
[zk: localhost:2181(CONNECTED) 2] deleteall /brokers
[zk: localhost:2181(CONNECTED) 3] deleteall /cluster
[zk: localhost:2181(CONNECTED) 4] deleteall /consumers
[zk: localhost:2181(CONNECTED) 5] deleteall /config
[zk: localhost:2181(CONNECTED) 6] deleteall /isr_change_notification
[zk: localhost:2181(CONNECTED) 7] deleteall /latest_producer_id_block
[zk: localhost:2181(CONNECTED) 8] deleteall /log_dir_event_notification
[zk: localhost:2181(CONNECTED) 9] deleteall /zookeeper</code>

