JMS
特性
Kafka:http://kafka.apache.org/
* 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统(严格意义上是不属于队列产品,是一个流处理平台),它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者
* 类似MQ,功能较为简单,主要支持常规的MQ功能
* 它提供了类似于JMS的特性,但是在设计实现上完全不同,它并不是JMS规范的实现
* 缺点:运维难度大,文档比较少, 需要掌握Scala
* RocketMQ:http://rocketmq.apache.org/
* 阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域
* 基于JMS Provider的实现
* 缺点:社区相对不活跃,更新比较快,纯java支持
* RabbitMQ:http://www.rabbitmq.com/
* 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错
* 缺点:使用Erlang开发,阅读和修改源码难度大
* 快速开始:http://kafka.apache.org/quickstart
* 快速认识概念
* Broker
* Kafka的服务端程序,可以认为一个mq节点就是一个broker
* broker存储topic的数据
* Producer生产者
* 创建消息Message,然后发布到MQ中
* 该角色将消息发布到Kafka的topic中
* Consumer消费者:
* 消费队列里面的消息
* 需要的软件和环境版本说明
* kafka-xx-yy
* xx 是scala版本,yy是kafka版本(scala是基于jdk开发,需要安装jdk环境)
* 下载地址:http://kafka.apache.org/downloads
* zookeeper
* Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册
* 下载地址:https://zookeeper.apache.org/releases.html
* jdk1.8
* 步骤
* 上传安装包(zk、jdk、kafka)
* 安装jdk
* 解压:
tar -zxvf jdk-8u181-linux-x64.tar.gz
# 重命名
mv jdk1.8.0_181 jdk1.8
# 重置环境变量
vim /etc/profile
JAVA_HOME=/usr/local/app/jdk1.8
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
环境变量立刻生效
source /etc/profile
查看安装情况
java -version
解压和重命名
* 安装Zookeeper (默认2181端口)
进入conf文件夹
# 复制一份配置文件
cp zoo_sample.cfg zoo.cfg
默认配置文件 zoo.cfg
启动zk
进入bin目录
./zkServer.sh start
# 查看端口占用情况
lsof -i:2181
若没有lsof命令则安装
yum install -y lsof
* 安装Kafka (默认 9092端口)
* config目录下 server.properties
#标识broker编号,集群中有多个broker,则每个broker的编号需要设置不同
broker.id=0
#修改下面两个配置 ( listeners 配置的ip和advertised.listeners相同时启动kafka会报错)
listeners(内网Ip)
advertised.listeners(公网ip)
#修改zk地址,默认地址【这里填写zk的地址,由于在同一台机器所以用 localhost】
zookeeper.connection=localhost:2181
# 启动
./kafka-server-start.sh ../config/server.properties &
# 停止
./kafka-server-stop.sh
# 创建topic
./kafka-topics.sh --create --zookeeper 39.105.17.127:2181 --replication-factor 1 --partitions 1 --topic xdclass-topic
# 查看topic
./kafka-topics.sh --list --zookeeper 39.105.17.127:2181
Linux环境下daemon守护进程运行Kafka
* kafka如果直接启动信息会打印在控制台,如果关闭窗口,kafka随之关闭
* 守护进程方式启动
./kafka-server-start.sh -daemon ../config/server.properties &
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
# always restart
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- 9092:9092
# host ip
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# always restart
restart: always
volumes:
- ./docker.sock:/var/run/docker.sock
验证
通过容器名称进入到kafka容器中:
docker exec -it kafka /bin/bash
创建一个名称为test的topic:
kafka-topics.sh --create --topic test \
--zookeeper zookeeper:2181 --replication-factor 1 \
--partitions 1
查看刚刚创建的topic信息:
kafka-topics.sh --zookeeper zookeeper:2181 \
--describe --topic test
打开生产者发送若干条消息:
kafka-console-producer.sh --topic=test \
--broker-list kafka:9092
开发消费者接收消息:
kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning --topic test
【zookeeper 改为 ip】
查看topic
kafka-topics.sh --list --zookeeper zookeeper:2181
查看broker节点topic状态信息
kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test
如果可以成功接收到消息,则说明kafka已经启动成功了,可以进行本地开发以及调试工作了
Kafka数据存储流程、原理、LEO+HW讲解
topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
是以文件夹的形式存储在具体Broker本机上
Kafka的producer生产者发送到Broker分区策略
* 生产者发送到broker里面的流程是怎样的呢,一个 topic 有多个 partition分区,每个分区又有多个副本
* 如果指定Partition ID,则PR被发送至指定Partition (ProducerRecord)
* 如果未指定Partition ID,但指定了Key, PR会按照hash(key)发送至对应Partition
* 如果未指定Partition ID也没指定Key,PR会按照默认 round-robin轮训模式发送到每个Partition
* 消费者消费partition分区默认是range模式
* 如果同时指定了Partition ID和Key, PR只会发送到指定的Partition (Key不起作用,代码逻辑决定)
* 注意:Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互
* 生产者到broker发送流程
* Kafka的客户端发送数据到服务器,不是来一条就发一条,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集到的Batch里面,再一次性发送到Broker上去的,这样性能才可能题高
* 生产者常见配置
* 官方文档 http://kafka.apache.org/documentation/#producerconfigs
#kafka地址,即broker地址
bootstrap.servers
?
#当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
acks
?
#请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
retries
?
#每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务端,默认值是16KB
batch.size
?
# 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满,如果想减少请求的数量,可以设置 linger.ms 大于#0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
# 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送 减少请求
#如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
linger.ms
?
# buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
# 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
# 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
# buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整
buffer.memory
?
# key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使
#消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将#key序列化成字节数组。
key.serializer
value.serializer
-- Topic (名字)
-- PartitionID (可选)
-- Key(可选)
-- Value
key默认是null,大多数应用程序会用到key
自定义分区算法,让消息始终在同一个topic的同一个partition分区,这样保存顺序消费
producer生产者发送指定分区实战
* 创建topic,配置5个分区,1个副本
* 发送代码
默认分区器:
org.apache.kafka.clients.producer.internals.DefaultPartitioner
自定义分区规则
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
* round-robin (RoundRobinAssignor非默认策略)轮训
* 【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者
* topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
* c-1: topic-p0/topic-p2/topic-p4/topic-p6
* c-2:topic-p1/topic-p3/topic-p5
* 弊端
* 如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
* 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
* t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
* 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2
* range (RangeAssignor默认策略)范围
* 【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
* topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
* c-1: topic-p0/topic-p1/topic-p2/topic-p3
* c-2:topic-p4/topic-p5/topic-p6
* 弊端
* 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
* 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降
消费者配置
#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id
?
#为true则自动提交偏移量
enable.auto.commit
?
#自动提交offset周期
auto.commit.interval.ms
?
#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset
?
#序列化器
key.deserializer
Consumer手工提交offset配置和从头消费配置
* Kafka 采取了分片和索引机制,将每个partition分为多个segment,每个segment对应2个文件 log 和 index
* 新增备注
index文件中并没有为每一条message建立索引,采用了稀疏存储的方式
每隔一定字节的数据建立一条索引,避免了索引文件占用过多的空间和资源,
从而可以将索引文件保留到内存中缺点是没有建立索引的数据在查询的过程中需要小范围内的顺序扫描操作。
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 默认是1G,当log数据文件大于1g后,会创建一个新的log文件(即segment,包括index和log)
log.segment.bytes=1073741824
#分段一
00000000000000000000.index 00000000000000000000.log
#分段二 数字 1234指的是当前文件的最小偏移量offset,即上个文件的最后一个消息的offset+1
00000000000000001234.index 00000000000000001234.log
#分段三
00000000000000088888.index 00000000000000088888.log
CA: 如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的
?
CP: 如果不要求A(可用),每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统
?
AP:要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。
Partition什么时间发送ack确认机制(要追求高吞吐量,那么就要放弃可靠性)
Kafka数据可靠性保证原理之ISR机制讲解
* 什么是ISR (in-sync replica set )
* leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个leader partition都有一个ISR,leader动态维护, 要保证kafka不丢失message,就要保证ISR这组集合存活(至少有一个存活),并且消息commit成功
* Partition leader 保持同步的 Partition Follower 集合, 当 ISR 中的Partition Follower 完成数据的同步之后,就会给 leader 发送 ack
* 如果Partition follower长时间(replica.lag.time.max.ms) 未向leader同步数据,则该Partition Follower将被踢出ISR
* Partition Leader 发生故障之后,就会从 ISR 中选举新的 Partition Leader。
* OSR (out-of-sync-replica set)
* 与leader副本分区 同步滞后过多的副本集合
* AR(Assign Replicas)
* 分区中所有副本统称为AR
Kafka的HighWatermark的作用你知道多少
* 背景 broker故障后
* ACK保障了【生产者】的投递可靠性
* ** partition的多副本保障了【消息存储】的可靠性**
* hw的作用是啥?
* 备注:重复消费问题需要消费者自己处理
*** HW作用:保证消费数据的一致性和副本数据的一致性**
假设没有HW,消费者消费leader到15,下面消费者应该消费16。
?
此时leader挂掉,选下面某个follower为leader,此时消费者找新leader消费数据,发现新Leader没有16数据,报错。
?
HW(High Watermark)是所有副本中最小的LEO
Kafka高可用集群搭建节点需求规划
* 注意
* 没那么多机器,采用伪集群方式搭建(端口号区分)
* zookeeper部署3个节点
* 2181
* 2182
* 2183
* kafka部署3个节点
* 9092
* 9093
* 9094
* 网络安全组记得开放端口
zookeeper节点端口
* 2181
* 2182
* 2183
* cp -r 复制zk节点
* 修改配置zoo.cfg
#客户端端口
clientPort=2181
?
#数据存储路径
dataDir=/tmp/zookeeper/2181 2182 / 2183
?
#修改AdminServer的端口:
admin.serverPort=8881 8882 / 8883
cd /tmp/zookeeper/2181 2182 / 2183
echo 1 > myid 2 / 3
服务器id 为之前配置的 myid中的内容
# server.服务器id=服务器IP地址:服务器直接通信端口:服务器之间选举投票端口
?
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883
#启动zk
./zkServer.sh start
?
#查看节点状态
./zkServer.sh status
?
#停止节点
./zkServer.sh stop
Kafka高可用集群搭建-环境准备
* 伪集群搭建,3个节点同个机器端口区分
* 9092
* 9093
* 9094
* 配置
#内网中使用,内网部署 kafka 集群只需要用到 listeners,内外网需要作区分时 才需要用到advertised.listeners
listeners=PLAINTEXT://172.18.123.229:9092
?
advertised.listeners=PLAINTEXT://112.74.55.160:9092
?
#每个节点编号1、2、3
broker.id=1
?
#端口
port=9092
?
#配置3个
log.dirs=/tmp/kafka-logs-1
?
#zk地址
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
Kafka高可用集群之多个Kafka节点搭建实战
* 启动Kafka实战
./kafka-server-start.sh -daemon ../config/server.properties &
?
./kafka-server-start.sh ../config/server.properties &
./kafka-topics.sh --create --zookeeper 112.74.55.160:2181,112.74.55.160:2182,112.74.55.160:2183 --replication-factor 3 --partitions 6 --topic xdclass-cluster-topic
SpringBoot项目测试
Kafka高可用集群搭建实战-守护进程方式启动
* 守护进程的方式启动 kafka
./kafka-server-start.sh -daemon ../config/server.properties &
* Kafka将数据持久化到了硬盘上,为了控制磁盘容量,需要对过去的消息进行清理
* 问题:如果让你去设计这个日志删除策略,你会怎么设计?【原理思想】很重要的体现,下面是kafka答案
* 内部有个定时任务检测删除日志,默认是5分钟 log.retention.check.interval.ms
* 支持配置策略对数据清理
* 根据segment单位进行定期清理
#清理超过指定时间的消息,默认是168小时,7天,
#还有log.retention.ms, log.retention.minutes, log.retention.hours,优先级高到低
log.retention.hours=168
?
#超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没限制
log.retention.bytes=1073741824
?
还有基于日志起始位移(log start offset),未来社区还有更多
基于【时间删除】 日志说明
配置了7天后删除,那7天如何确定呢?
?
每个日志段文件都维护一个最大时间戳字段,每次日志段写入新的消息时,都会更新该字段
?
一个日志段segment写满了被切分之后,就不再接收任何新的消息,最大时间戳字段的值也将保持不变
?
kafka通过将当前时间与该最大时间戳字段进行比较,从而来判定是否过期
假设日志段大小是500MB,当前分区共有4个日志段文件,大小分别是500MB,500MB,500MB和10MB
?
10MB那个文件就是active日志段。
?
此时该分区总的日志大小是3*500MB+10MB=1500MB+10MB
?
如果阈值设置为1500MB,那么超出阈值的部分就是10MB,小于日志段大小500MB,故Kafka不会执行任何删除操作,即使总大小已经超过了阈值;
?
如果阈值设置为1000MB,那么超过阈值的部分就是500MB+10MB > 500MB,此时Kafka会删除最老的那个日志段文件
?
注意:超过阈值的部分必须要大于一个日志段的大小
零拷贝ZeroCopy(SendFile)
kafka高性能
方式一:
<!--kafka客户端-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
方式二:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
spring:
kafka:
bootstrap-servers: 112.74.55.160:9092,112.74.55.160:9093,112.74.55.160:9094
producer:
# 消息重发的次数。
retries: 0
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
Springboot项目整合spring-kafka监听消费消息
* 配置文件修改增加消费者信息
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
#手工ack,调用ack后立刻提交offset
ack-mode: manual_immediate
#容器运行的线程数
concurrency: 4
Kafka事务消息-整合SpringBoot实战
* Kafka 从 0.11 版本开始引入了事务支持
* 事务可以保证对多个分区写入操作的原子性
* 操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能
* 配置
spring:
kafka:
bootstrap-servers: 112.74.55.160:9092,112.74.55.160:9093,112.74.55.160:9094
producer:
# 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
#retries: 1
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
acks: all
?
#事务id
transaction-id-prefix: xdclass-tran
?
?
?
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
?
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
auto-offset-reset: earliest
?
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
?
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
?
listener:
# 在侦听器容器中运行的线程数。
concurrency: 4
#listner负责ack,手动调用Acknowledgment.acknowledge()后立即提交
ack-mode: manual_immediate
#避免出现主题未创建报错
missing-topics-fatal: false
Kafka很多内容,但是不一定都要学,看自己的需求,有些功能是比较鸡肋的