Kafka是一个分布式数据流平台,可以运行在单台服务器上,也可以在多台服务器上部署形成集群。它提供了发布和订阅功能,使用者可以发送数据到Kafka中,也可以从Kafka中读取数据(以便进行后续的处理)。Kafka具有高吞吐、低延迟、高容错等特点。下面介绍一下Kafka中常用的基本概念:
默认端口: zk:2181 kafka:9092
下载与解压
wget https://dlcdn.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar zxvf kafka_2.13-2.8.0.tgz
设置环境变量?vim /etc/profiles
# --- --- --- --- --- --- ---
# 让环境变量生效 source /etc/profile
KAFKA_HOME=/opt/kafka_2.13-2.8.0
PATH=$PATH:$KAFKA_HOME/BIN
修改config/zookeeper.properties配置
#12181
clientPort=12181
修改配置 server.properties
broker.id=0
#默认9092
port=19093
#log.dir: meta.properties中有ClusterId与logs/server.log提示不一致,会报错
log.dirs=/tmp/kafka-logs
advertised.listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:12181
启动
cd kafka_2.13-2.8.0
# 启动zk,如端口冲突,请修改
bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 &
# 启动kafka
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
# 下载
wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
# conf/application.conf
kafka-manager.zkhosts="localhost:12181"
kafka-manager.zkhosts=${?ZK_HOSTS}
cmak.zkhosts="localhost:2181"
cmak.zkhosts=${?ZK_HOSTS}
basicAuthentication.enabled=true #默认为false,改为true
bin/cmak -Dconfig.file=conf/application.conf -Dhttp.port=18080
# 启动,需要jdk11
bin/cmak -Dconfig.file=conf/application.conf -Dhttp.port=18080 -java-home /opt/jdk-11.0.11
浏览访问:
http://...18080/
- 需要手工添加ip与端口
创建Topic:kafka-topics.sh --create --zookeeper localhost:12181 --replication-factor 1 --partitions 1 --topic pk1
# 模拟生产者发消息
kafka-console-producer.sh --broker-list localhost:9092 --topic pk1
#模拟消费者收消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pk1
broker.id
申明当前kafka服务器在集群中的唯一ID