前提:确保kafka环境
我使用的方案是docker
我使用的镜像为:wurstmeister/kafka
我使用的镜像为:wurstmeister/zookeeper
docker安装kafka和zk教程:点这里手把手教你使用Docker搭建kafka【详细教程】
使用kafka前,要确保你对kafka有一个基本的了解,如topic(可以看成是rabbitmq的队列),分区,offset,生产者,消费者,消费者组等
此教程源码地址:https://gitee.com/jackXUYY/springboot-example.git
拉下代码之后,改一下kafka的地址为自己的kafka地址,然后把事务配置先注释掉
配置文件里的transaction-id-prefix属性
检查项目能否正常启动
发送消息
消费者
打印结果
发送和消费结果
生产者事务发送:需配置transaction-id-prefix开启事务
结果
结果
我们把这个类的注释打开,同时配置文件里的,partitioner.class配置开启,把消费者组group-id改为mykafka2,再把事务的配置也注释掉
把上面的方法注释掉,把此方法的注释打开
发送一个符合分区的消息内容
因为此时没有1号分区,消息会发送超时
我们可以新建配置类,声明分区数
然后在发送消息试一下
后续,刚才我们是手动ack,提交偏移量
我们也可以指定偏移量去消费,方法代码里都有的,这里就截图了
批量消费
过滤消息内容再进行消费,异常处理
消息转发
定时启动,停止监听器
生产者如何提高吞吐量
参考博客:https://blog.51cto.com/u_14452299/6019881
思考,如果其他中间件或框架,想要自定义配置的话,那可能也会继承它的接口,重写方法,或者声明自己的配置类啥的