安装环境:
系统:
CentOS release 6.8 (Final)
服务器:
120.77.32.220
kafka版本:
kafka_2.11-2.0.0.tgz
zookeeper版本:
zookeeper-3.4.13.tar.gz
1、简介
kafka (官网地址:http://kafka.apache.org)是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。
2、下载软件包
iptables -F
setenforce 0
cd /opt/software
wget
jdk下载地址
3、安装java
3.1、解压jdk
cd /opt/software
tar -xzf jdk-8u65-linux-x64.tar.gz -C /usr/local/
mv /usr/local/jdk1.8.0_65/ /usr/local/java
3.2、配置环境变量
cat /etc/profile
# jdk
JAVA_HOME=/usr/local/java
JRE_HOME=/usr/local/java/jre
CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
source /etc/profile
3.3、查看JAVA是否安装成功
java -version
[root@localhost local]# java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
4、kafka安装
tar -xzf kafka_2.11-2.0.0.tgz -C /usr/local/
mv /usr/local/kafka_2.11-2.0.0/ /usr/local/kafka
5、配置环境变量
cat /etc/profile
# kafka
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
6、配置kakfa
mkdir -p /usr/local/kafka/log/kafka #创建kafka日志目录
cd /usr/local/kafka/config #进入配置目录
vi server.properties #编辑修改相应的参数
broker.id=0
port=9092 #端口号
host.name=192.168.0.13 #服务器IP地址,修改为自己的服务器IP
log.dirs=/usr/local/kafka/log/kafka #日志存放路径,上面创建的目录
zookeeper.connect=192.168.0.13:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
7、配置zookeeper
mkdir /usr/local/kafka/zookeeper #创建zookeeper目录
mkdir /usr/local/kafka/log/zookeeper #创建zookeeper日志目录
cd /usr/local/kafka/config #进入配置目录
vi zookeeper.properties #编辑修改相应的参数
dataDir=/usr/local/kafka/zookeeper #zookeeper数据目录
dataLogDir=/usr/local/kafka/log/zookeeper #zookeeper日志目录
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
8、创建启动、关闭kafka脚本
cd /usr/local/kafka
#创建启动脚本
cat kafkastart.sh
#!/bin/sh
#启动zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
sleep 3
#启动kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
#创建关闭脚本
cat kafkastop.sh
#!/bin/bash
#关闭kafka
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
sleep 3
#关闭zookeeper
/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
#添加脚本执行权限
chmod +x kafkastart.sh
chmod +x kafkastop.sh
9、设置脚本开机自动执行
vi /etc/rc.d/rc.local
sh /usr/local/kafka/kafkastart.sh &
10、启动Kafka 、zookeeper服务
#启动kafka
sh /usr/local/kafka/kafkastart.sh
#关闭kafka
sh /usr/local/kafka/kafkastop.sh
netstat -nptl |grep 2181
netstat -nptl |grep 9092
查看zookeeper版本:
echo stat|nc localhost 2181
11、测试生产者和消费者
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>This is a message
>This is another message
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
12、脚本定期清理logs下的日志文件
cat clean_kafkalog.sh
#!/bin/bash
#Description:This>
#History: 2018-09-04 First release.
# log file dir.
logDir=/usr/local/kafka/logs
# Reserved 7 files.
COUNT=7
ls -t $logDir/server.log* | tail -n +$[$COUNT+1] | xargs rm -f
ls -t $logDir/controller.log* | tail -n +$[$COUNT+1] | xargs rm -f
ls -t $logDir/state-change.log* | tail -n +$[$COUNT+1] | xargs rm -f
ls -t $logDir/log-cleaner.log* | tail -n +$[$COUNT+1] | xargs rm -f
chmod +x clean_kafkalog.sh
每周日的0点0分去执行这个脚本:
crontab -e
0 0 * * 0 /usr/local/kafka/clean_kafkalog.sh
13、配置SASL用户名密码认证
13.1、zookeeper配置
13.1.1修改zookeeper.properties配置文件
vim /usr/local/kafka/config/zookeeper.properties
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
13.1.2、创建JAAS配置文件
cat /usr/local/kafka/config/zk_server_jaas.conf
ZKServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
13.1.3、启动zookeeper前设置环境变量
cat /usr/local/kafka/bin/zookeeper-server-start.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=$KAFKA_HOME/config/zk_server_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer"
13.1.4、启动zookeeper,编写启动脚本测试配置是否正确
cat /usr/local/kafka/zookeeper_start.sh
#!/bin/bash
export KAFKA_OPTS=" -Djava.security.auth.login.config=$KAFKA_HOME/config/zk_server_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer "
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties >> /usr/local/kafka/log/zookeeper/zookeeper.log 2>&1 &
chmod +x zookeeper_start.sh
./zookeeper_start.sh
13.2、kafka配置
13.2.1、伪分布式Kafka Broker配置
配置server.properties:
vim /usr/local/kafka/config/server.properties #文件最后加入
# 监听的地址,使用的schema是SASL_PLAINTEXT,即最简单的用户名密码认证,不需要加密。
listeners=SASL_PLAINTEXT://192.168.0.13:9092
# 使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
#allow.everyone.if.no.acl.found=true
super.users=User:admin;User:alice
13.2.2、创建JAAS配置文件
vim /usr/local/kafka/config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="geting"
password="geting"
user_geting="geting"
user_alice="geting";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
13.2.3、启动kafka前设置环境变量
vim /usr/local/kafka/bin/kafka-server-start.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf "
cat /usr/local/kafka/bin/kafka-run-class.sh
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
13.1.4、启动kafka,编写启动脚本测试配置是否正确
vim /usr/local/kafka/kafka_start.sh
#!/bin/bash
export KAFKA_OPTS=" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties >>/usr/local/kafka/log/kafka/kafka.log 2>&1 &
chmod +x kafka_start.sh
./kafka_start.sh
./kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.0.15:2181 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security
./kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.0.15:2181 --list --topic sean-security
15、Consumer & Provider端配置
15.1、创建kafka_client_jaas.conf文件
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice2";
};
15.2、让程序加载这个配置文件
System.setProperty("java.security.auth.login.config",
"$KAFKA_HOME/config/kafka_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
閱讀更多 愛踢人生 的文章