SASL/SCRAM验证可以动态新增用户并分配权限。
SASL/SCRAM 通过将认证用户信息保存在 ZooKeeper 的方式,避免了动态修改需要重启 Broker 的弊端。在实际使用过程中,可以使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群。因此,如果打算使用 SASL/PLAIN,不妨改用 SASL/SCRAM 试试。不过要注意的是,后者是 0.10.2 版本引入的。
kafka官方文档:
https://kafka.apache.org/documentation/#security_sasl_scram
下面命令创建了一个证书:tly 密码是:123456
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456] --entity-type users --entity-name tly
创建admin证书,密码也是admin
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin] --entity-type users --entity-name admin
查看指定证书加上–entity-name tly 不加则查看所有证书
kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name tly
kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config SCRAM-SHA-512 --delete-config SCRAM-SHA-256 --entity-type users --entity-name tly
在kafka的config文件夹下创建kafka-server-jaas.conf文件
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin"
user_admin="admin"
password="admin";
};
分发脚本
scp kafka-server-jaas.conf root@kafka2-82373:/opt/kafka_2.13-2.6.0/config
scp kafka-server-jaas.conf root@kafka3-82373:/opt/kafka_2.13-2.6.0/config
admin是用来broker之间相互认证的
producer是用来做生产者的
consumer是用来做消费者的
kafka-configs.sh --zookeeper kafka1-33529:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
kafka-configs.sh --zookeeper kafka1-33529:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name producer
kafka-configs.sh --zookeeper kafka1-33529:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name consumer
查看证书清单:
kafka-configs.sh --zookeeper kafka1-33529:2181 --describe --entity-type users
这个配置文件是用来在broker之间相互认证用的,用户名密码对应的就是上一步创建admin证书时指定的用户名密码。
vim kafka_server_jaas.conf
# 添加如下内容
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
分发此配置文件到另外两个节点上,由于我是直接在/root路径下创建的,因此也同步到对应的路径下;如果实在config路径下创建的,则同步的时候也同步到对应的路径下
scp kafka_server_jaas.conf root@kafka2-33529:/root/kafka_server_jaas.conf
scp kafka_server_jaas.conf root@kafka3-33529:/root/kafka_server_jaas.conf
编辑kafka-server-start.sh 脚本,将我们的jaas文件添加到对应的环境中;
# 编辑启动脚本
vim /opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh
# 注释掉这一行(最后一行)
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
# 添加下面的代码
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/root/kafka_server_jaas.conf kafka.Kafka "$@"
分发编辑好的脚本到其它节点:
scp /opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh root@kafka2-33529:/opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh
scp /opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh root@kafka3-33529:/opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh
修改server.conf配置文件,启用认证并指定认证方式,指定超级管理员,启用acl控制权限等设置
# 编辑配置文件
vim /opt/kafka_2.13-2.6.0/config/server.properties
# 添加下面的额内容,注意:每台节点标记的时候对应的kafkaIP需要变更
listeners=SASL_PLAINTEXT://kafka3-33529:9092
advertised.listeners=SASL_PLAINTEXT://kafka3-33529:9092
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
每个节点都要重启
# 重启zk
zkServer.sh restart
# 重启kafka
kafka-server-stop.sh
kafka-server-start.sh -daemon /opt/kafka_2.13-2.6.0/config/server.properties
重启后多次执行jps命令,确保kafka服务没有挂掉
修改生产者和消费者配置文件,增加权限认证
# 修改生产者配置文件
vim /opt/kafka_2.13-2.6.0/config/producer.properties
# 添加以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
# 分发配置文件到其它节点
scp /opt/kafka_2.13-2.6.0/config/producer.properties root@kafka2-33529:/opt/kafka_2.13-2.6.0/config/producer.properties
scp /opt/kafka_2.13-2.6.0/config/producer.properties root@kafka3-33529:/opt/kafka_2.13-2.6.0/config/producer.properties
# 修改消费者配置文件
vim /opt/kafka_2.13-2.6.0/config/consumer.properties
# 添加以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
# 分发配置文件到其它节点
scp /opt/kafka_2.13-2.6.0/config/consumer.properties root@kafka2-33529:/opt/kafka_2.13-2.6.0/config/consumer.properties
scp /opt/kafka_2.13-2.6.0/config/consumer.properties root@kafka3-33529:/opt/kafka_2.13-2.6.0/config/consumer.properties
# 新建生产者jaas文件
vim kafka_producer_jaas.conf
# 增加以下内容,用户名密码为第一步在创建证书时创建的用户名密码
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="123456";
};
# 分发到其它节点
scp kafka_producer_jaas.conf root@kafka2-33529:/root/kafka_producer_jaas.conf
scp kafka_producer_jaas.conf root@kafka3-33529:/root/kafka_producer_jaas.conf
# 新建生产者jaas文件
vim kafka_consumer_jaas.conf
# 增加以下内容,用户名密码为第一步在创建证书时创建的用户名密码
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="123456";
};
# 分发到其它节点
scp kafka_consumer_jaas.conf root@kafka2-33529:/root/kafka_consumer_jaas.conf
scp kafka_consumer_jaas.conf root@kafka3-33529:/root/kafka_consumer_jaas.conf
配置生产者jaas文件到启动脚本中
# 编辑配置文件
vim /opt/kafka_2.13-2.6.0/bin/kafka-console-producer.sh
# 注释掉下面内容
# exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
# 新增下面内容
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/kafka_producer_jaas.conf kafka.tools.ConsoleProducer "$@"
配置消费者jaas文件到启动脚本中
# 编辑配置文件
vim /opt/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh
# 注释掉下面内容
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
# 新增下面内容
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/kafka_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
创建名为test1的topic用来测试
kafka-topics.sh -create --zookeeper kafka1-33529:2181 -replication-factor 3 --partitions 3 --topic test1
生产消息
kafka-console-producer.sh --bootstrap-server kafka1-33529:9092 --topic test1 --producer.config /opt/kafka_2.13-2.6.0/config/producer.properties
提示没有权限对topic1进行操作:
对生产者证书进行acl授权:
kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=kafka1-33529:2181 \
--add --allow-principal User:producer --operation Write --topic test1
再次生产消息,可以正常生产消息了:
消费消息
kafka-console-consumer.sh --bootstrap-server kafka1-33529:9092 --topic test1 --from-beginning --consumer.config /opt/kafka_2.13-2.6.0/config/consumer.properties
提示没权限消费消息:
对消费者证书进行acl授权,一定要增加–group,否则当前消费者组没有权限,会报Not authorized to access group: test-consumer-group错误
kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka1-33529:2181 \
--add --allow-principal User:consumer --operation Read --topic test1 --group test-consumer-group
再次消费消息即可获取到消息: