Hadoop架构设计
论述
kafka 依赖 Zookeeper 管理自身集群(Broker、Offset、Producer、Consumer等),所以先要安装 Zookeeper。
为了达到高可用的目的,Zookeeper 自身也不能是单点,接下来就介绍如何搭建一个最小的 Zookeeper 集群(3个 zk 节点)。
常识科普:Kafka的存储与安装不依赖于hdfs/spark,从下边安装过程你可以得知这个信息。
备注:只在slave1,slave2,slave3三个节店上安装zookeeper,master01节店不安装(其实前边hadoop中master01不作为datanode节店,spark中master01不作为worker节店)。
主机名
IP地址
角色服务
安装
master01
192.168.239.100
Namenode
resourcemanager
slave01
192.168.239.101
Datanode
nodemanager
zookeeper
kafka
zookeeper
slave02
192.168.239.102
Datanode
nodemanager
zookeeper
kafka
zookeeper
slave03
192.168.239.103
Datanode
nodemanager
zookeeper
kafka
zookeeper
配置Hadoop基本环境
设置集群虚拟机网络(四台都要)
四服务器IP地址
master01:192.168.239.100
slave01:192.168.239.101
slave02:192.168.239.102
slave03:192.168.239.103
资源分配情况:默认分配
内存2G,CPU核数1个,磁盘20G。
服务器命名
master01
hostnamectl --static set-hostname master01
hostnamectl status
slave01
hostnamectl --static set-hostname slave01
hostnamectl status
slave02
hostnamectl --static set-hostname slave02
hostnamectl status
slave03
hostnamectl --static set-hostname slave03
hostnamectl status
修改网卡
vi /etc/sysconfig/network-scripts/ifcfg-ens33
master01
BOOTPROTO=static
DNS1=192.168.239.2
DNS2=8.8.8.8
IPADDR=192.168.239.100
PREFIX=24
GATEWAY=192.168.239.2
ONBOOT=yes
slave01
BOOTPROTO=static
DNS1=192.168.239.2
DNS2=8.8.8.8
IPADDR=192.168.239.101
PREFIX=24
GATEWAY=192.168.239.2
ONBOOT=yes
slave02
BOOTPROTO=static
DNS1=192.168.239.2
DNS2=8.8.8.8
IPADDR=192.168.239.102
PREFIX=24
GATEWAY=192.168.239.2
ONBOOT=yes
slave03
BOOTPROTO=static
DNS1=192.168.239.2
DNS2=8.8.8.8
IPADDR=192.168.239.103
PREFIX=24
GATEWAY=192.168.239.2
ONBOOT=yes
测试
ping 192.168.239.2
以及四个虚拟机关闭防火墙互ping
关闭防火墙
关闭防火墙后,可以测试使用xshell连接四台centos服务器。
systemctl stop firewalld.service
systemctl disable firewalld.service
firewall-cmd --state
将IP与域名建立关联数据库,四台服务器同配置。
vi /etc/hosts
192.168.239.100 master01
192.168.239.101 slave01
192.168.239.102 slave02
192.168.239.103 slave03
more /etc/hosts
测试
ping master01
ping slave01
ping slave02
ping slave03
使用shell连接centos配置服务器(四台都要)
习惯用mobaxterm。
生成公钥私钥,配置服务器免密互访
在四台机器上生成公钥私钥,一路回车enter即可。
ssh-keygen -t rsa -P ‘’
互相传输公钥,每台机器三行命令,注意本机的命令行可以不敲。
ssh-copy-id master01
ssh-copy-id slave01
ssh-copy-id slave02
ssh-copy-id slave03
四台主机可以免密互相登陆。
测试
在四台服务器上,输入ssh 主机名
ssh master01
ssh slave01
ssh slave02
ssh slave03
如果失败则考虑权限问题
chmod 600 ~/.ssh/authorized_keys
安装chronyNTP同步服务(四台都要,有差别)
rpm -qa |grep chrony
yum -y install chrony
y
yum install -y gcc vim wget
检查时区
timedatectl
vim /etc/chrony.conf
设置master01为chrony服务器端,slave为客户端,在master01上,注释掉server 0.centos.pool.ntp.org iburst等,添加阿里云ntp服务器。
//注释掉默认的四个ntp服务器,因为该服务器同步时间略慢
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
/**
// Allow NTP client access from local network,配置允许访问的客户端列表,支持CIDR,例如:
allow 192.168/16
// 设置同步,Serve time even if not synchronized to any NTP server.,打开注释即可,即:
local stratum 10
// 重启下服务端chrony服务,
systemctl restart chronyd
在slave01、slave02、slave03上,注释掉server 0.centos.pool.ntp.org iburst等,在添加阿里云ntp服务器基础上,将master01添加为主ntp服务器。
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
server master01 iburst
server ntp1.aliyun.com iburst
server ntp2.aliyun.com iburst
server ntp3.aliyun.com iburst
启动chrony
// 启动chrony服务
systemctl start chronyd
// 设置开机自启
systemctl enable chronyd
// 查看chrony服务状态
systemctl status chronyd
// 手动同步系统时钟
chronyc -a makestep
// 查看时间同步源
chronyc sources -v
// 校准时间服务器
chronyc tracking
安装java 1.8环境(四台都要)
报错:
[root@master01 opt]# hadoop version
ERROR: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64/bin/java is not executable.
看见有些安装教程只有yum install java-1.8.0-openjdk.x86_64,这样安装是不完整的,java -version能够查看版本,但是javac和jps命令会报错command not found。解决方法:再安装一个openjdk-devel
source /etc/profile
yum -y install java-1.8.0-openjdk-devel.x86_64
java - version
已有openJDK所以跳过安装,检查java环境变量
// 可以不操作,已验证
which java
ls -lrt /usr/bin/java
ls -lrt /etc/alternatives/java
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64/jre/bin/java
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64/jre/bin/java
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre/bin/java
slave01
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64/jre/bin/java
确定jdk安装路径 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre/bin/java
vi /etc/profile
在最后面加上环境变量配置
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64
export CLASSPATH=
J
A
V
A
H
O
M
E
/
l
i
b
:
JAVA_HOME/lib:
JAVAH?OME/lib:JRE_HOME/lib:
C
L
A
S
S
P
A
T
H
e
x
p
o
r
t
P
A
T
H
=
CLASSPATH export PATH=
CLASSPATHexportPATH=JAVA_HOME/bin:
J
R
E
H
O
M
E
/
b
i
n
:
JRE_HOME/bin:
JREH?OME/bin:PATH
测试
// 测试配置是否成功,只在该终端生效
source /etc/profile
// 重启生效,建议稍后重启,还有软件需要重启
reboot
java -version
echo $JAVA_HOME
安装Scala(四台都要)
安装Scala
cd /opt
wget --no-check-certificate https://downloads.lightbend.com/scala/2.13.3/scala-2.13.3.tgz
tar -zxvf v2.13.10.tar.gz -C /opt/
mv /opt/scala-2.13.10 /opt/scala
配置Scala环境变量
vi /etc/profile
export SCALA_HOME=/opt/scala
export PATH=
P
A
T
H
:
PATH:
PATH:SCALA_HOME/bin
测试
// 测试配置是否成功,只在该终端生效
source /etc/profile
// 重启生效,建议稍后重启,还有软件需要重启
reboot
scala -version
echo $SCALA_HOME
安装Hadoop(四台一样)
官网下载:
https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz
cd /opt/
wget --no-check-certificate https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz
tar -zxvf hadoop-3.3.5.tar.gz -C /opt/
mv /opt/hadoop-3.3.5 /opt/hadoop
echo ‘export HADOOP_HOME=/opt/hadoop/’ >> /etc/profile
echo ‘export PATH=
P
A
T
H
:
PATH:
PATH:HADOOP_HOME/bin’ >> /etc/profile
echo ‘export PATH=
P
A
T
H
:
PATH:
PATH:HADOOP_HOME/sbin’ >> /etc/profile
export HADOOP_HOME=/opt/hadoop
export PATH=
P
A
T
H
:
PATH:
PATH:HADOOP_HOME/bin
export PATH=
P
A
T
H
:
PATH:
PATH:HADOOP_HOME/sbin
hadoop的集群是基于master01/slave模式。
namenode和jobtracker属于master01,datanode和tasktracker属于slave,master01只有一个,而slave有多个。
分布式存储(hdfs)角度:集群中的节点由一个namenode和多个datanode组成。namenode是中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件访问的引用。集群中的datanode一般是一个节点一个,负责管理它所在节点上的存储。
HDFS暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据库,这些块存储在一组datanode上。
namenode执行文件系统的名字空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体datanode节点的映射。
datanode负责处理文件系统客户端的读写请求。在namenode的统一调度下进行数据块的创建、删除和复制。
分布式应用(mapreduce)角度:集群中的节点有一个jobtracker和多个tasktracker组成。jobtracker负责任务的调度,tasktracker负责并行执行任务。tasktracker必须运行在datanode上,这样便于数据的本地计算,而jobtracker和namenode则必须在同一台机器上。
配置Hadoop配置文件
bin:存放Hadoop、HDFS、YARN和MapReduce运行程序和管理软件
etc:存放Hadoop
include:类似c语言头文件
lib:本地库文件,支持对数据进行压缩和解压
libexe:本地库文件,支持对数据进行压缩和解压
sbin:Hadoop集群启动、停止命令
share:说明文档、案例、依赖jar包
报错:
[root@master01 opt]# hadoop version
ERROR: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64/bin/java is not executable.
看见有些安装教程只有yum install java-1.8.0-openjdk.x86_64,这样安装是不完整的,java -version能够查看版本,但是javac和jps命令会报错command not found。解决方法:再安装一个openjdk-devel
source /etc/profile
yum -y install java-1.8.0-openjdk-devel.x86_64
// 修改hadoop-env.sh
// 刚开始修改为
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64
这样的修改是错的,
// 在主节点cmaster01上格式化主节点命名空间的时候会报错ERROR: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.322.b06-1.el7_9.x86_64 is not executable
解决方法:正确的修改应该是
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre
进入文件cd /opt/hadoop
cd /opt/hadoop/etc/hadoop/
ls
找到7个文件,
hadoop-env.sh
yarn-env.sh
slaves
core-site.xml
hdfs-site.xml
maprd-site.xml
yarn-site.xml
并逐一进行配置
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64
2. 在yarn-env.sh中配置JAVA_HOME
vi yarn-env.sh
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64
3. 在slaves中配置slave节点的ip或者host
vi slaves
master01
#localhost
slave01
slave02
slave03
slave01
4. 修改core-site.xml
vi core-site.xml
fs.defaultFS
hdfs://master01:9000/
hadoop.tmp.dir
file:/opt/hadoop-3.3.5/tmp
io.file.buffer.size
131702
5. 修改hdfs-site.xml
vi hdfs-site.xml
dfs.namenode.secondary.http-address
master01:9001
dfs.namenode.name.dir
file:/opt/hadoop-3.3.5/dfs/name
dfs.datanode.data.dir
file:/opt/hadoop-3.3.5/dfs/data
dfs.replication
3
dfs.webhdfs.enabled
true
dfs.namenode.name.dir----HDFS namenode数据镜像目录。
dfs.datanode.data.dir—HDFS datanode数据镜像存储路径,可以配置多个不同的分区和磁盘中,使用,号分隔。
还可以配置:dfs.namenode.http-address—HDFS Web查看主机和端口号。
可以参考下边这个hdfs-site.xml配置项 :
vim hdfs-site.xml
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/hadoop/hdfs/data</value>
<!-- HDFS datanode数据镜像存储路径,可以配置多个不同的分区和磁盘中,使用,号分隔 -->
<description> </description>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>apollo.hadoop.com:50070</value>
<!-- HDFS Web查看主机和端口号 -->
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>artemis.hadoop.com:50090</value>
<!-- 辅控HDFS Web查看主机和端口 -->
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
<!-- HDFS数据保存份数,通常是3 -->
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>1073741824</value>
<!-- datanode写磁盘会预留1G空间给其它程序使用,而非写满,单位 bytes -->
</property>
<property>
<name>dfs.block.size</name>
<value>134217728</value>
<!-- HDFS数据块大小,当前设置为128M/Blocka -->
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
<!-- HDFS关闭文件权限 -->
</property>
来自于:https://blog.csdn.net/jssg_tzw/article/details/70314184 6. 修改mapred-site.xml scp mapred-site.xml.template mapred-site.xml vi mapred-site.xml mapreduce.framework.name yarn mapreduce.jobtracker.http.address master01:50030 mapreduce.jobhistory.address master01:10020 mapreduce.jobhistory.webapp.address master01:19888 mapred.job.tracker http://master01:9001 7. 修改yarn-site.xml vi yarn-site.xml yarn.nodemanager.aux-services mapreduce_shuffle yarn.nodemanager.aux-services.mapreduce.shuffle.class org.apache.hadoop.mapred.ShuffleHandler yarn.resourcemanager.address master01:8032 yarn.resourcemanager.scheduler.address master01:8030 yarn.resourcemanager.resource-tracker.address master01:8031 yarn.resourcemanager.resource-tracker.address master01:8035 yarn.resourcemanager.admin.address master01:8033 yarn.resourcemanager.webapp.address master01:8088 yarn.resourcemanager.hostname master01 yarn.nodemanager.resource.memory-mb 2048
先分别在四台服务器/opt下创建hadoop-3.3.5 cd /opt mkdir hadoop-3.3.5 chmod 777 hadoop-3.3.5 将配置好的hadoop-3.3.5文件夹分发给所有slaves scp -r /opt/hadoop-3.3.5 root@slave01:/opt/ scp -r /opt/hadoop-3.3.5 root@slave02:/opt/ scp -r /opt/hadoop-3.3.5 root@slave03:/opt/ 在master01上执行以下操作,就可以启动Hadoop了 cd /opt/hadoop #格式化namenode bin/hadoop namenode -format #启动dfs,yarn sbin/start-all.sh sbin/start-yarn.sh cd /opt/hadoop sbin/start-all.sh sbin/start-yarn.sh 一般有六个进程 [图片] 打开浏览器 http://192.168.239.100:8088浏览ResourceManager的Web界面 http://192.168.239.100:50070备注:该端口50070配置项是可以设置在hdfs-site.xml Hadoop集群HA 暂时不搞 安装Spark https://www.apache.org/dyn/closer.lua/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz cd /opt/ wget --no-check-certificate https://www.apache.org/dyn/closer.lua/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz tar -zxvf spark-3.3.2-bin-hadoop3.tgz -C /opt/ mv /opt/spark-3.3.2-bin-hadoop3 /opt/spark 配置spark cd /opt/spark/conf/ ls scp spark-env.sh.template spark-env.sh ls vi spark-env.sh export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath) export SCALA_HOME=/opt/scala export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64 export HADOOP_HOME=/opt/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop SPARK_master01_IP=master01 SPARK_LOCAL_DIRS=/opt/spark SPARK_DRIVER_MEMORY=1G
// 未配置
export SPARK_master01_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/usr/hadoop/etc/hadoop
vi /etc/profile
export HADOOP_OPTS=“
H
A
D
O
O
P
O
P
T
S
?
D
j
a
v
a
.
l
i
b
r
a
r
y
.
p
a
t
h
=
HADOOP_OPTS -Djava.library.path=
HADOOPO?PTS?Djava.library.path=HADOOP_HOME/lib/native”
export SPARK_HOME=/opt/spark
export PATH=
S
P
A
R
K
H
O
M
E
/
b
i
n
:
SPARK_HOME/bin:
SPARKH?OME/bin:PATH
source /etc/profile
注:在设置Worker进程的CPU个数和内存大小,要注意机器的实际硬件条件,如果配置的超过当前Worker节点的硬件条件,Worker进程会启动失败。
vi slaves在slaves文件下填上slave主机名:
scp slaves.template slaves
vi slaves
#localhost
slave01
slave02
slave03
将配置好的spark文件夹分发给所有slaves吧
cd /opt
mkdir spark-3.3.2-bin-hadoop2.7
chmod 777 spark-3.3.2-bin-hadoop2.7
scp -r /opt/spark-3.3.2-bin-hadoop2.7 spark@slave01:/opt/
scp -r /opt/spark-3.3.2-bin-hadoop2.7 spark@slave02:/opt/
scp -r /opt/spark-3.3.2-bin-hadoop2.7 spark@slave03:/opt/
启动spark
cd /opt/spark
bin/run-example SparkPi
sbin/start-all.sh
jps
master01
6608 Worker
6377 master01
6846 Jps
jps
7949 Jps
7328 SecondaryNameNode
7805 master01
7137 NameNode
7475 ResourceManager
slave01、slave02、slave03
$jps
3132 DataNode
3759 Worker
3858 Jps
3231 NodeManager
进入Spark的Web管理页面: http://192.168.239.100:8080
另外也可以进入http://slave01:8042查看slave01的信息:
安装Zookeeper
下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
将下载的.tar.gz的文件解压到/optl文件夹下
cd /opt
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz -C /opt/
mv /opt/apache-zookeeper-3.8.1-bin /opt/zookeeper
sudo chown -R hadoop ./zookeeper # 修改文件权限
cd /opt/zookeeper
mkdir /opt/zookeeper/zkdata
mkdir /opt/zookeeper/zkdatalog
chmod 777 zkdata
chmod 777 zkdatalog
cd /opt/zookeeper/conf
scp zoo_sample.cfg zoo.cfg
vi zoo.cfg
#这个目录是预先创建的
dataDir=/opt/zookeeper/zkdata
#这个目录是预先创建的
dataLogDir=/opt/zookeeper/zkdatalog
环境变量
vi /etc/profile
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$
Z
O
O
K
E
E
P
E
R
H
O
M
E
/
b
i
n
:
ZOOKEEPER_HOME/bin:
ZOOKEEPERH?OME/bin:PATH
export CLASSPATH=.:
J
A
V
A
H
O
M
E
/
l
i
b
/
d
t
.
j
a
r
:
JAVA_HOME/lib/dt.jar:
JAVAH?OME/lib/dt.jar:JAVA_HOME/lib/tools.jar:$ZOOKEEPER_HOME/lib
5)启动zookeeper
cd /opt/zookeeper/bin
sh zkServer.sh start
安装Kafka
cd /opt
wget --no-check-certificate https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -zxvf kafka_2.13-3.4.0.tgz -C /opt/
mv /opt/kafka_2.13-3.4.0 /opt/kafka
cd /opt
tar -zxvf kafka_2.13-3.4.0.tgz -C /opt/
mv /opt/kafka_2.13-3.4.0 /opt/kafka
cd /opt/kafka
mkdir /opt/kafka/kafka_log
cd /opt/kafka/config/
vi server.properties
log.dirs=/opt/kafka/kafka_log
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
// 下面的去掉注释
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
more server.properties
// 应存在的配置
broker.id=0
listeners=PLAINTEXT://192.178.0.111:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/kafka_log
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
vi /etc/profile
export KAFKA_HOME=/opt/kafka
export PATH=
P
A
T
H
:
PATH:
PATH:KAFKA_HOME/bin
启动Kafka
cd /opt/kafka
sh ./bin/kafka-server-start.sh ./config/server.properties &
kafka-server-start.sh …/config/server.properties
netstat -tunlp|egrep “(2181|9092)”
备注:上边是启动失败信息,启动失败原因是我的服务器9092端口未开启
此时,检测2181与9092端口
netstat -tunlp|egrep “(2181|9092)”
tcp6 0 0 :::2181 ::😗 LISTEN 8896/java
[root@localhost opt]# netstat -tunlp|egrep “(2181|9092)”
tcp6 0 0 192.178.0.111:9092 ::😗 LISTEN 10299/java
tcp6 0 0 :::2181 ::😗 LISTEN 8896/java
安装kafka- manager
先安装sbt
cd /opt
wget --no-check-certificate https://github.com/sbt/sbt/releases/download/v1.8.2/sbt-1.8.2.tgz
tar -zxvf sbt-1.8.2.tgz -C /opt/
mv /opt/sbt-1.8.2.tgz /opt/sbt
vi /etc/profile
export SBT_HOME=/opt/sbt
export PATH=
S
B
T
H
O
M
E
/
b
i
n
:
SBT_HOME/bin:
SBTH?OME/bin:PATH
cd /opt
wget --no-check-certificate https://github.com/yahoo/kafka-manager/releases/CMAK-3.0.0.6.tar.gz
tar -zxvf CMAK-3.0.0.6.tar.gz -C /opt/
mv /opt/CMAK-3.0.0.6 /opt/KafkaManager
cd /opt/KafkaManager/conf
vim application.conf
kafka-manager.zkhosts=“${IP}:2181”
./bin/kafka-manager -Dconfig.file=/opt/KafkaManger/kafka-manager-1.3.0.8/conf/application.conf -Dhttp.port=8181 > /dev/null 2>&1 &
新建一个TOPIC
创建topic
/opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic kafkatopic
此时kafaka服务器开启窗口(执行[root@localhost kafka_2.12-1.1.0]# sh ./bin/kafka-server-start.sh ./config/server.properties &的窗口)会有变化:
[图片]
–查看所有topic
/opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
–查看指定topic
/opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic logTopic100
6) 把KAFKA的生产者启动起来:
/opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic
7)另开一个终端,把消费者启动起来:
sh /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
也可以采用:
sh /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.178.0.111:9092 --topic kafkatopic --from-beginning
(–from beginning 是从头开始消费,不加则是消费当前正在发送到该topic的消息)
8)使用
在发送消息的终端输入aaa,则可以在消费消息的终端显示
生产者生产:
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
消费者接收:
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
生产者生产:
[root@localhost ~]# /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
a
b
c
d
消费者接收:
[root@localhost ~]# /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
a
b
c
d
[图片]