Springboot整合kafka基本使用

发布时间:2023年12月20日

项目搭建

同样的,需要我们搭建一个maven工程,整合非常的简单,需要用到:

<dependency>
????<groupId>org.springframework.kafka</groupId>
????<artifactId>spring-kafka</artifactId>
</dependency>

来一起看下完整的pom.xml:

<?xml?version="1.0"?encoding="UTF-8"?>
<project?xmlns="http://maven.apache.org/POM/4.0.0"
?????????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0</modelVersion>

????<groupId>org.example</groupId>
????<artifactId>springboot-kafka-all</artifactId>
????<version>1.0-SNAPSHOT</version>

????<properties>
????????<java.version>1.8</java.version>
????</properties>

????<parent>
????????<groupId>org.springframework.boot</groupId>
????????<artifactId>spring-boot-starter-parent</artifactId>
????????<version>2.1.3.RELEASE</version>
????</parent>

????<dependencies>
????????<!--web-->
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-web</artifactId>
????????</dependency>
????????<!--test-->
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-test</artifactId>
????????</dependency>
????????<!--?kafka-->
????????<dependency>
????????????<groupId>org.springframework.kafka</groupId>
????????????<artifactId>spring-kafka</artifactId>
????????</dependency>
????????<!--Hutool依赖-->
????????<dependency>
????????????<groupId>cn.hutool</groupId>
????????????<artifactId>hutool-all</artifactId>
????????????<version>5.8.4</version>
????????</dependency>
????????<!--fast-json-->
????????<dependency>
????????????<groupId>com.alibaba</groupId>
????????????<artifactId>fastjson</artifactId>
????????????<version>1.2.58</version>
????????</dependency>

????????<dependency>
????????????<groupId>?org.slf4j?</groupId>
????????????<artifactId>?slf4j-api?</artifactId>
????????????<version>?1.6.4?</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.slf4j</groupId>
????????????<artifactId>slf4j-simple</artifactId>
????????????<version>1.7.25</version>
????????????<scope>compile</scope>
????????</dependency>
????????<dependency>
????????????<groupId>org.projectlombok</groupId>
????????????<artifactId>lombok</artifactId>
????????</dependency>
????</dependencies>

????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.springframework.boot</groupId>
????????????????<artifactId>spring-boot-maven-plugin</artifactId>
????????????????<version>2.1.3.RELEASE</version>
????????????</plugin>
????????</plugins>
????</build>

</project>

配置也很简单?application.yml

server:
??port:?8081

spring:
??kafka:
????producer:
??????bootstrap-servers:?127.0.0.1:9092

然后新建一个启动类,看下控制台是否成功链接了Kafka,在启动之前别忘了开启Kafka集群

基本使用

先从一个简单的例子,来快速体验一下Kafka,新建HelloController

@Slf4j
@RestController
public?class?HelloController?{

????private?static?final?String?topic?=?"test";

????@Autowired
????private?KafkaTemplate<Object,?Object>?kafkaTemplate;

????//?接收消息
????@KafkaListener(id?=?"helloGroup",?topics?=?topic)
????public?void?listen(String?msg)?{
????????log.info("hello?receive?value:?{}"?,?msg);
????????//?hello?receive?value:?hello?kafka
????}

????@GetMapping("/hello")
????public?String?hello()?{
????????//?发送消息
????????kafkaTemplate.send(topic,?"hello?kafka");
????????return?"hello";
????}
}

我们通过KafkaTemplate进行消息的发送, 通过@KafkaListener进行消息的消费,我们可以指定消费者ID以及监听的topic,请求localhost:8081/hello观察控制台的变化。请求后,发现消息发送和接收的非常快,我们也可以观察UI后台的消息详情,同步对比

topic创建

之前我们的topic是在UI后台创建的,那么在SpringBoot中如何创建呢? 下面我们试着发送一个不存在的topic

?//?当topic不存在时?会默认创建一个topic
????//?num.partitions?=?1?#默认Topic分区数
????//?num.replica.fetchers?=?1?#默认副本数
????@GetMapping("/hello1")
????public?String?hello1()?{
????????//?发送消息
????????kafkaTemplate.send("hello1",?"hello1");
????????return?"hello1";
????}

????//?接收消息
????@KafkaListener(id?=?"hello1Group",?topics?=?"hello1")
????public?void?listen1(String?msg)?{
????????log.info("hello1?receive?value:?{}"?,?msg);
????????//?hello1?receive?value:?hello1
????}

请求之后,观察控制台以及管理后台,发现并没有报错,并且给我们自动创建了一个topic,在自动创建下,默认的参数是:

??num.partitions?=?1?#默认Topic分区数
??num.replica.fetchers?=?1?#默认副本数

如果我想手动创建呢?我们可以通过NewTopic来手动创建:

@Configuration
public?class?KafkaConfig?{
????@Bean
????public?KafkaAdmin?admin(KafkaProperties?properties){
????????KafkaAdmin?admin?=?new?KafkaAdmin(properties.buildAdminProperties());
????????//?默认False,在Broker不可用时,如果你觉得Broker不可用影响正常业务需要显示的将这个值设置为True
????????admin.setFatalIfBrokerNotAvailable(true);
????????//?setAutoCreate(false)?:?默认值为True,也就是Kafka实例化后会自动创建已经实例化的NewTopic对象
????????//?initialize():当setAutoCreate为false时,需要我们程序显示的调用admin的initialize()方法来初始化NewTopic对象
????????return?admin;
????}

????/**
?????*?创建指定参数的?topic
?????*?@return
?????*/
????@Bean
????public?NewTopic?topic()?{
????????return?new?NewTopic("hello2",?0,?(short)?0);
????}
}

如果要更新呢?也非常的简单

?/**
?????*?更新?topic
?????*?@return
?????*/
????@Bean
????public?NewTopic?topicUpdate()?{
????????return?new?NewTopic("hello2",?1,?(short)?1);
????}

注意这里的参数只能+不能-

这种方式太简单了,如果我想在代码逻辑中来创建呢?我们可以通过AdminClient来手动创建

??/**
?????*?AdminClient?创建
?????*/
????@Autowired
????private?KafkaProperties?properties;

????@GetMapping("/create/{topicName}")
????public?String?createTopic(@PathVariable?String?topicName)?{
????????AdminClient?client?=?AdminClient.create(properties.buildAdminProperties());
????????if(client?!=null){
????????????try?{
????????????????Collection<NewTopic>?newTopics?=?new?ArrayList<>(1);
????????????????newTopics.add(new?NewTopic(topicName,1,(short)?1));
????????????????client.createTopics(newTopics);
????????????}catch?(Throwable?e){
????????????????e.printStackTrace();
????????????}finally?{
????????????????client.close();
????????????}
????????}
????????return?topicName;
????}

观察下管理后台,发现topic都创建成功了

获取消息发送的结果

有时候我们发送消息不知道是不是发成功了,需要有一个结果通知。有两种方式,一种是同步一种是异步

同步获取结果

/**
?????*?获取通知结果
?????*?@return
?????*/
????@GetMapping("/hello2")
????public?String?hello2()?{
????????//?同步获取结果
????????ListenableFuture<SendResult<Object,Object>>?future?=?kafkaTemplate.send("hello2","hello2");
????????try?{
????????????SendResult<Object,Object>?result?=?future.get();
????????????log.info("success?>>>?{}",?result.getRecordMetadata().topic());?//?success?>>>?hello2
????????}catch?(Throwable?e){
????????????e.printStackTrace();
????????}

????????return?"hello2";
????}

异步获取

/**
?????*?获取通知结果
?????*?@return
?????*/
????@GetMapping("/hello2")
????public?String?hello2()?{
????????//?发送消息?-?异步获取通知结果
????????kafkaTemplate.send("hello2",?"async?hello2").addCallback(new?ListenableFutureCallback<SendResult<Object,?Object>>()?{
????????????@Override
????????????public?void?onFailure(Throwable?throwable)?{
????????????????log.error("fail?>>>>{}",?throwable.getMessage());
????????????}

????????????@Override
????????????public?void?onSuccess(SendResult<Object,?Object>?objectObjectSendResult)?{
????????????????log.info("async?success?>>>?{}",?objectObjectSendResult.getRecordMetadata().topic());?//?async?success?>>>?hello2
????????????}
????????});

????????return?"hello2";
????}

Kafka事务

同样的,消息也会存在事务,如果第一条消息发送成功,再发第二条消息的时候出现异常,那么就会抛出异常并回滚第一条消息,下面通过一个简单的例子体会一下

@GetMapping("/hello3")
public?String?hello3()?{
????kafkaTemplate.executeInTransaction(t?->?{
????????t.send("hello3","msg1");
????????if(true)
????????????throw?new?RuntimeException("failed");
????????t.send("hello3","msg2");
????????return?true;
????});

????return?"hello3";
}

//?接收消息
@KafkaListener(id?=?"hello3Group",?topics?=?"hello3")
public?void?listen3(String?msg)?{
????log.info("hello3?receive?value:?{}"?,?msg);
}

默认情况下,Spring-kafka自动生成的KafkaTemplate实例,是不具有事务消息发送能力的。我们需要添加transaction-id-prefix来激活它

spring:
??kafka:
????producer:
??????bootstrap-servers:?127.0.0.1:9092
??????transaction-id-prefix:?kafka_.

启动之后,观察控制台的变化~ ,除此之外,还可以使用注解的方式@Transactional来开启事务

//?注解方式
????@Transactional(rollbackFor?=?RuntimeException.class)
????@GetMapping("/hello4")
????public?String?hello4()?{
????????kafkaTemplate.send("hello3","msg1");
????????if(true)
????????????throw?new?RuntimeException("failed");
????????kafkaTemplate.send("hello3","msg2");
????????return?"hello4";
????}
文章来源:https://blog.csdn.net/qq_60870118/article/details/135119296
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。