- 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
- 📕系列专栏:Spring源码、JUC源码、Kafka原理、分布式技术原理、数据库技术
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
- 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀
在高并发的场景中,消息积压问题,可以说如影随形,真的没办法从根本上解决。表面上看,已经解决了,但后面不知道什么时候,就会冒出一次,比如这次:
有天下午,产品过来说:有几个商户投诉过来了,他们说菜品有延迟,快查一下原因。
这次问题出现得有点奇怪。
为什么这么说?
首先这个时间点就有点奇怪,平常出问题,不都是中午或者晚上用餐高峰期吗?怎么这次问题出现在下午?
根据以往积累的经验,我直接看了kafka
的topic
的数据,果然上面消息有积压,但这次每个partition
都积压了十几万的消息没有消费,比以往加压的消息数量增加了几百倍。这次消息积压得极不寻常。
我赶紧查服务监控看看消费者挂了没,还好没挂。又查服务日志没有发现异常。这时我有点迷茫,碰运气问了问订单组下午发生了什么事情没?他们说下午有个促销活动,跑了一个JOB
批量更新过有些商户的订单信息。
这时,我一下子如梦初醒,是他们在JOB中批量发消息导致的问题。怎么没有通知我们呢?实在太坑了。
虽说知道问题的原因了,倒是眼前积压的这十几万的消息该如何处理呢?
此时,如果直接调大partition
数量是不行的,历史消息已经存储到4
个固定的partition,只有新增的消息才会到新的partition。我们重点需要处理的是已有的partition。
事实上,遇到这种问题恐怕我们的第一反应都是扩容,但是扩容是否真的能够解决问题呢?
是否能够解决所有的问题呢?
根据问题描述,历史消息已经被存储到4个固定的分区中,只有新增的消息才会到新的分区中,所以我们的重点是处理已有的分区,那么回到扩容问题,因为现有分区已经和消费者组里面的消费者一对一了,所以哪怕我们增加分区,或者消费者组里面的消费者,会触发消费再均衡,但是因为突发情况导致消费者消费能力不够,所以哪怕扩容,也不会很快的改善这个堆积问题。
采用多线程的方式进行消费呢?
表面上来看目前来说是一个很好的办法,但是仍然有几个比较关键的问题还需要去解决。
该问题其实需要和下游进行对其,因为现在急需解决消息堆积的问题,如果你贸然使用了多线程来处理,但是没有和下游处理端进行对其,那么他们可能也会面临突增的流量,可能会造成一定的后果。
这个其实涉及到一个问题就是:
先提交offset再消费还是
先消费再提交offset
这个情况会造成数据丢失,假如说因为某种原因出现了异常,导致没有消息成功,那么相当于offset已经提交,但是问题并没有消费掉,这个可以记录下消费出现异常的消息,等到后面再重新跑一遍
这个问题其实和上面是反着来得,需要考虑幂等性的问题,因为我们追求的是精准发送,所以幂等性的问题也需要去考虑
本质上可以再经过一次hash,路由到固定的线程中去,从而实现了本批次中顺序消费,至于提交offset的时机,不需要等到消费完成后就可以提交,然后将消费失败的记录下来,因为我们必须要承认的是,失败并不是一件经常的事情,至于消费失败的问题,等到将来再重新消费即可。
当然,offset的提交实际这个,如果等到消费完成再提交,必须要考虑幂等性的问题。
银行里数据是比较大的,每天生产的数据大概有几个T,回到前面提到加机器的方案,加机器是没用的,行里的真实环境是开发环境给5台kafka,每台kafka服务器是48T,256G的配置,topic初始化的时候是三个topic。
为什么加机器解决不了堆积的问题呢?
场景是将一个数据库中的数据迁移到高斯(国产数据库中)
简单来说就是从数据库读出来,经过kafka,然后插入到高斯中去,但是为什么说kafka不行呢?
中间件kafka能够承受,但是高斯不行,其cpu和内存顶不住,而且目标服务器也不是相加就能加的
其本质是插入出现了问题,也就是相当于消费能力有问题。
如果我们在生产端进行限制呢?
比如在生产端做一些逻辑在,比如限流,或者瓶颈限制?
但是这样做相当于使用软件限制了硬件的消费瓶颈,这样对于行方来说,也不好向上管理,不好像领导交代
前面说的一个消费者对应一个分区,这个的前提是同一个消费者组
但是如果再开一个消费者组的话,本身分区的offset是可以自己设定的
如果说让另一个消费者组的消费的分区的偏移量从一个中段去取的话
这个想法是一个很美好的想法,但是感觉有点冒险
该解决办法就是面对突发办法的一个解决问题
将 分区_offset 存储到redis或者mysql中,redis当中有set结构,每一次生产者生成或者消费者要消费的时候,去查这个redis有没有这个key,如果存在,就代表已经消费过了。
这个方案相当于,kafka中维护了一套offset,同时在redis中也维护了一套,相当于兜底的方案。
相当于为了保证每一条消息都有一个记录。
比如流量突然来了,在极短的响应速度内感知到了以后直接进行分流?