帶安全認證的Kafka 集群安裝配置(SASL+ACL)

 原文地址:https://blog.csdn.net/z11220857/article/details/77049613
帶安全認證的Kafka 集群安裝配置(SASL+ACL)

使用SASL機制的KAFKA集群的安裝

背景介紹

本文檔是以北京聯通最近佈置的kafka集群為樣本,結合一些教程和資料編寫。之前北京聯通使用的一直是0.8.2的版本,由於局方要求給實時數據交換平臺添加安全認證分權分域的來發送和接收消息,故升級到0.10.2的最新版本。由於是內網傳輸,所以我們採用了主機之間SASL安全認證而未採用ssl傳輸過程加密,且kafka的broker同zookeeper之間也未採用SASL安全認證。

名詞解釋

SASL

全稱Simple Authentication and Security Layer,是一種用來擴充C/S模式驗證能力的機制。

Topic

Kafka將消息種子(Feed)分門別類,每一類的消息稱之為一個主題(Topic).

Producer

發佈消息的對象稱之為主題生產者(Kafka topic producer)

Consumer

訂閱消息並處理髮布的消息的種子的對象稱之為主題消費者(consumers)

Broker

已發佈的消息保存在一組服務器中,稱之為Kafka集群。集群中的每一個服務器都是一個代理(Broker). 消費者可以訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發佈的消息。

環境配置

3臺已經安裝zookeeper的CentOS 6.5虛擬機

Hostname

IP地址

節點一 node01 192.168.40.11

節點二 node02 192.168.40.12

節點三 node03 192.168.40.13

本次安裝主機防火牆均關閉,三臺機器之間做過RSA免密鑰

(zookeeper安裝和免密鑰後續會再出一個文檔)

下面我們進入安裝過程

下載最新版本

目前kafka的最新版本為0.10.2.1,我們上官網下載一個壓縮包,由於本機使用的sacala版本為2.10,故我們下載了scala 2.10 - kafka_2.10-0.10.2.1.tgz (asc, md5) ,地址:

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz

· 上傳節點,解壓到自定義目錄/home/,修改集群目錄名字為/home/kafka_0.10

修改broker配置文件

· 進入/home/kafaka_0.10/config目錄,編輯server.properties

· 具體的性能參數要根據業務來修改,我們主要修改影響集群搭建的幾項。

broker.id=0

此項配置因為有三臺主機,所以每個主機分別配置了從0到2的主機id。

delete.topic.enable=true

是否能夠刪除消息主題topic

listeners=SASL_PLAINTEXT://node01:6667

監聽的主機地址端口,填寫本機地址即可,例如node01填寫listeners=SASL_PLAINTEXT://node01:6667(必須在三臺機器都配置主機名列表),也可以直接寫IP地址,即外部服務器連接kafka主機的地址和端口,SASL_PLAINTEXT為使用的安全機制名稱。

log.dirs=/home/kafka/logs

此處填寫kafka的log存放地址,=後面的地址文件夾必須啟動集群前使用前創建。

num.partitions=3 

partition個數設置,如果在創建topic的時候沒有指定partition的數量,則使用這個值來設置。

zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper集群的地址和端口
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

這幾項對應的就是SASL的設置,最後一行是設定超級用戶admin,這個用戶一會會在jaas配置文件裡設置用戶名和密碼.

· node02和node03除了id、監聽的主機和端口配置不一樣,別的配置都一樣。

· 下面創建kafka_server_jaas.conf

vikafka_server_jaas.conf

文件內容如下:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};

此處配置了兩個用戶,一個admin,一個是alice。=後面為用戶的密碼,這裡我們設置成和用戶名一樣的了。每個主機都要配置,這個是broker通信必須的配置。admin用戶必須設置,即user_admin=”admin”

· 最後需要為 Kafka 添加 java.security.auth.login.config 環境變量。在 bin/kafka-run-class.sh 中添加以下內容:

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/home/kafka_0.10/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

· 紅色標記部分為添加內容,該段內容位於kafka-run-class.sh腳本的最後。

客戶端的配置

1. 配置客戶端安全認證文件kafka_client_jaas.conf

· kafka_client_jaas.conf此文件主要供jar包客戶端使用。

· 我們創建/home/conf文件夾,然後在文件夾下創建kafka_client_jaas.conf 文件

[root@node02 ~]# cd /home
[root@node02 home]# mkdir conf
[root@node02 home]# vi kafka_client_jaas.conf

文件內容如下:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice";
};

此處alice即為kafka_server_jaas.conf文件中設置的用戶,作為登錄用戶名和密碼供客戶端登陸使用。

2. 修改console-producer和console-consumer的配置

· 然後在producer.porperties添加環境變量和配置

[root@node01 home]# cd kafka_0.10/config
[root@node01 home]#vi producer.properties

文件內容修改如下:

bootstrap.servers=node01:6667,node02:6667,node03:6667
bootstrap.servers=後的內容即為kafka集群主機地址和端口。
最後在文件末尾加上:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

· 修改consumer.porperties添加環境變量和配置

zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connnect=後面填寫zookeeper集群的主機地址和端口。
group.id=test1

這裡是消費者組群填寫,這裡我設置的是test1,這個主要是為了console口的consumer來使用的。

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

以上配置加在文件的末尾。

· 到此我們就完成了全部需要配置的文件的修改,下面我們準備啟動kafka集群。

集群啟動

· 在啟動kafka集群前,首先確保zookeeper集群的啟動

· 啟動zookeeper後,進入/home/kafka_0.10/bin目錄

[root@node02 ~]# cd /home/kafka_0.10/bin
[root@node02 ~]#./kafka-server-start.sh config/server.properties &

· 接著我們要創建主題topic

[root@node02 ~]#./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 1 --partitions 1 --topic test1

我們通過此命令創建了一個叫做test1的主題topic,備份因子一個,partition一個。

· 然後我們通過list命令來查看相關topic的列表

[root@node02 ~]./kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 test

· 我創建了kafka_ss和test1兩個topic可見以下內容:

[root@node01 bin]# ./kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 test
[2017-06-12 13:25:52,280] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS confi
guration section named 'Client' was found in specified JAAS configuration file: '/home/kafka_0.10/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn) __consumer_offsets
kafka_ss
test1

可以看到一共有三個topic:__consumer_offsets、kafka_ss、test1。__consumer_offsets為默認的topic。上面出現的報警信息是正常的,因為kafka brokers與zookeeper的主機之間未使用sasl認證,可以忽略。

設置acl權限

· 在/home/kafka_0.10/bin目錄下:

添加alice作為主題kafka_ss的消費者,用消費者組為test1,我們只用 --consumer 選項,zookeeper.connnect後面寫zookeeper的主機和地址。

./kafka-acls.sh --authorizer-properties zookeeper.connect=node01:2181,node02:2181,node03:2181 --add --allow-principal User:alice --consumer --topic kafka_ss --group test1

這個設置可以讓alice用戶在console-cosumer上面用test1消費組消費kafka_ss的消息。

· 為了可以遠程用程序來實現生產和消費消息,我們繼續設置alice用戶的權限。

./kafka-acls.sh --authorizer-properties zookeeper.connect=node01:2181,node02:2181,node03:2181 --add --allow-principal User:alice --allow-host * --operation Read --operation Write --topic kafka_ss

此命令可以設置允許alice用戶從所有IP地址讀寫topic kafka_ss,zookeeper.connnect後面寫zookeeper的主機和地址。

· 如果要限制IP地址,可以採用下面的命令:

./kafka-acls.sh --authorizer-properties zookeeper.connect= node01:2181,node02:2181,node03:2181 --add --allow-principal User:alice --allow-host 192.168.40.11 --allow-host 192.168.40.12 --operation Read --operation Write --topic kafka_ss

以上命令可以讓alice用戶從192.168.40.11和192.168.40.12讀寫topickafka_ss。

· 查看對應能操作topic的用戶列表

./kafka-acls.sh --authorizer-properties zookeeper.connect=node01:2181,node02:2181,node03:2181 --list --topic kafka_ss

可以得到以下內容

[2017-06-12 15:20:10,891] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/home/kafka_0.10/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)Current ACLs for resource `Topic:kafka_ss`:
User:alice has Allow permission for operations: Describe from hosts: *
User:alice has Allow permission for operations: Read from hosts: *
User:alice has Allow permission for operations: Write from hosts: *

可以看到用戶alice可以從任何IP上讀寫和描述kafak_ss的消息。

· 那我們接下來描述一下topic kafka_ss

./kafka-topics.sh --describe --topic kafka_ss --zookeeper node01:2181,node02:2181,node03:2181

得到以下內容:

Topic:kafka_ss PartitionCount:1 ReplicationFactor:1
Configs:Topic: kafka_ss Partition: 0 Leader: -1 Replicas: 0 Isr:

第一行是所有分區的摘要,每一個線提供一個分區信息,因為我們只有一個分區,所有隻有一條線。

· 到這裡我們的生產和消費消息之前的準備已經全部完成,生下來我們就要開始使用console和程序來開始生產和消費消息了。

使用客戶端生產和消費消息

使用console客戶端

· 在node01的/home/kafka_0.10/bin目錄下,運行console-consumer

./kafka-console-consumer.sh --bootstrap-server node01:6667,node02:6667,node03:6667 --topic kafka_ss --from-beginning --consumer.config=../config/consumer.properties

此命令可以開啟console口開始讀取kafka_ss topic 下的消息

顯示以下內容:

[2017-06-12 17:21:20,234] WARN The configuration 'zookeeper.connect' was supplied but isn't a known config.
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2017-06-12 17:21:20,234] WARN The configuration 'zookeeper.connection.timeout.ms' was supplied but isn't a known config.(org.apache.kafka.clients.consumer.ConsumerConfig)

由於目前還沒有producer往topic裡發送消息所以,告警下方還沒有數據,告警可以忽略。

· 在node02的/home/kafka_0.10/bin目錄下,運行console-producer

./kafka-console-producer.sh --broker-list node01:6667,node02:6667,node03:6667 --topic kafka_ss --producer.config=../config/producer.properties

此命令可以開啟console口開始往kafka_ss topic 裡發送消息

啟動完成之後不會有任何信息,直接輸入消息然後回車就發送出去了。

使用java api來使用客戶端

· 使用java api的consumer程序

package kafkademo;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class consumerdemo2 {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "node01:6667,node02:6667,node03:6667");
props.put("group.id", "test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// acl Authorizer
props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.mechanism", "PLAIN");
System.setProperty("java.security.auth.login.config","/Users/book/conf/kafka_client_jaas.conf");
KafkaConsumer<string> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("kafka_ss"));
while (true) {
ConsumerRecords<string> records = consumer.poll(100);
for (ConsumerRecord<string> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
}
/<string>/<string>/<string>

上面方框內的內容,紅色標註的是acl認證增加部分,其餘的部分跟一般消費者程序一致。

· bootstrap.servers對應的value值應該是集群的地址和端口

· group.id對應的值則是分配的消費者組

· 其他配置無需改動

· 使用java api的producer程序

package kafkademo;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class producerdemo2 {
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
props.put("bootstrap.servers", "node01:6667,node02:6667,node03:6667");
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//acl Authorizer
System.setProperty("java.security.auth.login.config", "/Users/book/conf/kafka_client_jaas.conf");

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
Producer<string> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<string>(
"kafka_ss",Integer.toString(i), Integer.toString(i)));
System.out.println("11\n");
producer.close();
}
}
/<string>/<string>

上面方框內的內容,紅色標註的是acl認證增加部分,其餘的部分跟一般生產者程序一致。

· bootstrap.servers對應的value值應該是集群的地址和端口

· 其他配置無需改動

· 先啟動消費者,再啟動生產者,console口會打印出如下內容

到此demo程序運行成功,安裝完成。


分享到:


相關文章: