官方文档:
https://kafka.apache.org/documentation/#security
在官方文档中,kafka有五种加密认证方式,分别如下:
几种认证方式的详细比较
SASL/PLAIN是基于用户名密码的认证方式,是比较常用的一种认证,通常与TLS一起用于加密以实现安全认证。
vim /opt/kafka_2.13-2.6.0/config/server.properties
# 增加以下配置,每台节点都要配置
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 分发配置,需要注意用户名和路径
scp /opt/kafka_2.13-2.6.0/config/server.properties root@kafka2-73085:/opt/kafka_2.13-2.6.0/config/
scp /opt/kafka_2.13-2.6.0/config/server.properties root@kafka3-73085:/opt/kafka_2.13-2.6.0/config/
# 编辑配置文件
vim /opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf
# 增加以下配置
# 增加了两个用户 admin用户和tly用户,配置文件中间不能有注释
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_tly="123456";
};
# 分发配置文件
scp kafka_server_jaas.conf root@kafka2-73085:/opt/kafka_2.13-2.6.0/config/
scp kafka_server_jaas.conf root@kafka3-73085:/opt/kafka_2.13-2.6.0/config/
上面配置是新增了两个用户,admin和tly,这两个用户都是普通用户,KafkaServer中的username、password配置的用户和密码,是用来broker和broker连接认证。在本例中,admin是代理broker间通信的用户。user_userName配置为连接到broker的所有用户定义密码,broker使用这些验证所有客户端连接,包括来自其他broker的连接。
# 编辑 kafka-run-class.sh
vim /opt/kafka_2.13-2.6.0/bin/kafka-run-class.sh
# 更改第219行
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf"
# 分发脚本
scp kafka-run-class.sh root@kafka2-73085:/opt/kafka_2.13-2.6.0/bin/kafka-run-class.sh
scp kafka-run-class.sh root@kafka2-73085:/opt/kafka_2.13-2.6.0/bin/kafka-run-class.sh
kafka-server-start.sh -daemon /opt/kafka_2.13-2.6.0/config/server.properties
这个时候集群启动完毕已经带有权限
查看topic列表
kafka-topics.sh --bootstrap-server kafka1-73085:9092 --list
迟迟不出结果,卡住了,实际上是没有权限
查看日志如下:
# 编辑权限文件
vim /root/auth.conf
# 输入如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="tly" password="123456";
username和password必须是kafka_server_jaas.conf中配置的。
kafka-topics.sh --bootstrap-server kafka1-73085:9092 --list --command-config /root/auth.conf
已经可以正常返回了,说明有权限认证通过了:
kafka-topics.sh --bootstrap-server kafka1-73085:9092 --create --topic topic_1 --partitions 3 --replication-factor 3 --command-config /root/auth.conf
kafka-console-producer.sh --bootstrap-server \
kafka1-73085:9092 --topic topic_1 --producer.config /root/auth.conf
kafka-console-consumer.sh --bootstrap-server kafka1-73085:9092 \
--topic topic_1 --consumer.config /root/auth.conf
# 修改server.properties 增加如下配置
# 指定通过ACL来控制权限
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 指定超级管理员
super.users=User:admin
使用权限配置并且超级用户是admin,这个用户和我们上面配置的列表中的admin用户相互映射,每个机器都需要配置然后重启所有节点
# 配置用户认证文件信息
vim /root/admin.conf
# 输入如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
# 重启所有节点,每个节点都要执行
# 停止节点
kafka-server-stop.sh
# 启动节点
kafka-server-start.sh -daemon /opt/kafka_2.13-2.6.0/config/server.properties
此时每个节点都有了权限认证
# 配置用户认证文件
vim /root/admin.conf
# 输入以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
这个时候使用auth.conf已经不能创建topic了,使用admin.conf可以创建,因为admin账号和配置的超级管理员一致;而auth.conf配置的是tly账号;
kafka-topics.sh --bootstrap-server kafka1-73085:9092 --create --topic topic_2 --partitions 3 --replication-factor 3 --command-conf /root/auth.conf
不使用认证创建会一直卡主不动,其实也是没权限的,时间久了就会报timeout异常
kafka-topics.sh --bootstrap-server kafka1-73085:9092 --create --topic topic_3 --partitions 3 --replication-factor 3
命令为:kafka-acls.sh
参数如下:
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --list --command-config /root/admin.conf
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --topic topic_1 --remove --command-config /root/admin.conf
给之前的用户tly授权
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --add --allow-principal User:tly --topic topic_1 --operation all --command-config /root/admin.conf
使用tly用户查看当前topic
kafka-topics.sh --bootstrap-server kafka1-73085:9092 --list --command-config /root/auth.conf
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --remove --allow-principal user:tly --topic topic_1 --operation all --command-config /root/admin.conf
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --add --deny-principal User:tly --topic topic_1 --operation all --deny-host 127.0.0.1 --command-config admin.conf
# 删除所有权限信息
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --topic topic_1 --remove --command-config /root/admin.conf
# 添加生产者权限
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --add --allow-principal User:tly --operation Write --topic topic_1 --command-config /root/admin.conf
# 添加消费权限
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --add --allow-principal User:tly --operation Read --topic topic_1 --command-config /root/admin.conf
# 消费消息
kafka-console-consumer.sh --bootstrap-server kafka1-73085:9092 --topic topic_1 --from-beginning --consumer.config /root/auth.conf
# 发现消费不到,原因是消费者组没有权限
# 增加消费者组权限
kafka-acls.sh --bootstrap-server kafka1-73085:9092 --add --allow-principal User:tly --operation Read --topic topic_1 --group group_1 --command-config /root/admin.conf
# 再次执行消费消息命令并增加分组信息即可以消费到数据
kafka-console-consumer.sh --bootstrap-server kafka1-73085:9092 --topic topic_1 --from-beginning --consumer.config /root/auth.conf --group group_1