同样的,需要我们搭建一个maven
工程,整合非常的简单,需要用到:
<dependency>
????<groupId>org.springframework.kafka</groupId>
????<artifactId>spring-kafka</artifactId>
</dependency>
来一起看下完整的pom.xml
:
<?xml?version="1.0"?encoding="UTF-8"?>
<project?xmlns="http://maven.apache.org/POM/4.0.0"
?????????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0</modelVersion>
????<groupId>org.example</groupId>
????<artifactId>springboot-kafka-all</artifactId>
????<version>1.0-SNAPSHOT</version>
????<properties>
????????<java.version>1.8</java.version>
????</properties>
????<parent>
????????<groupId>org.springframework.boot</groupId>
????????<artifactId>spring-boot-starter-parent</artifactId>
????????<version>2.1.3.RELEASE</version>
????</parent>
????<dependencies>
????????<!--web-->
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-web</artifactId>
????????</dependency>
????????<!--test-->
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-test</artifactId>
????????</dependency>
????????<!--?kafka-->
????????<dependency>
????????????<groupId>org.springframework.kafka</groupId>
????????????<artifactId>spring-kafka</artifactId>
????????</dependency>
????????<!--Hutool依赖-->
????????<dependency>
????????????<groupId>cn.hutool</groupId>
????????????<artifactId>hutool-all</artifactId>
????????????<version>5.8.4</version>
????????</dependency>
????????<!--fast-json-->
????????<dependency>
????????????<groupId>com.alibaba</groupId>
????????????<artifactId>fastjson</artifactId>
????????????<version>1.2.58</version>
????????</dependency>
????????<dependency>
????????????<groupId>?org.slf4j?</groupId>
????????????<artifactId>?slf4j-api?</artifactId>
????????????<version>?1.6.4?</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.slf4j</groupId>
????????????<artifactId>slf4j-simple</artifactId>
????????????<version>1.7.25</version>
????????????<scope>compile</scope>
????????</dependency>
????????<dependency>
????????????<groupId>org.projectlombok</groupId>
????????????<artifactId>lombok</artifactId>
????????</dependency>
????</dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.springframework.boot</groupId>
????????????????<artifactId>spring-boot-maven-plugin</artifactId>
????????????????<version>2.1.3.RELEASE</version>
????????????</plugin>
????????</plugins>
????</build>
</project>
配置也很简单?application.yml
server:
??port:?8081
spring:
??kafka:
????producer:
??????bootstrap-servers:?127.0.0.1:9092
然后新建一个启动类,看下控制台是否成功链接了Kafka
,在启动之前别忘了开启Kafka集群
先从一个简单的例子,来快速体验一下Kafka
,新建HelloController
@Slf4j
@RestController
public?class?HelloController?{
????private?static?final?String?topic?=?"test";
????@Autowired
????private?KafkaTemplate<Object,?Object>?kafkaTemplate;
????//?接收消息
????@KafkaListener(id?=?"helloGroup",?topics?=?topic)
????public?void?listen(String?msg)?{
????????log.info("hello?receive?value:?{}"?,?msg);
????????//?hello?receive?value:?hello?kafka
????}
????@GetMapping("/hello")
????public?String?hello()?{
????????//?发送消息
????????kafkaTemplate.send(topic,?"hello?kafka");
????????return?"hello";
????}
}
我们通过KafkaTemplate
进行消息的发送, 通过@KafkaListener
进行消息的消费,我们可以指定消费者ID
以及监听的topic
,请求localhost:8081/hello
观察控制台的变化。请求后,发现消息发送和接收的非常快,我们也可以观察UI
后台的消息详情,同步对比
之前我们的topic
是在UI
后台创建的,那么在SpringBoot
中如何创建呢? 下面我们试着发送一个不存在的topic
?//?当topic不存在时?会默认创建一个topic
????//?num.partitions?=?1?#默认Topic分区数
????//?num.replica.fetchers?=?1?#默认副本数
????@GetMapping("/hello1")
????public?String?hello1()?{
????????//?发送消息
????????kafkaTemplate.send("hello1",?"hello1");
????????return?"hello1";
????}
????//?接收消息
????@KafkaListener(id?=?"hello1Group",?topics?=?"hello1")
????public?void?listen1(String?msg)?{
????????log.info("hello1?receive?value:?{}"?,?msg);
????????//?hello1?receive?value:?hello1
????}
请求之后,观察控制台以及管理后台,发现并没有报错,并且给我们自动创建了一个topic
,在自动创建下,默认的参数是:
??num.partitions?=?1?#默认Topic分区数
??num.replica.fetchers?=?1?#默认副本数
如果我想手动创建呢?我们可以通过NewTopic
来手动创建:
@Configuration
public?class?KafkaConfig?{
????@Bean
????public?KafkaAdmin?admin(KafkaProperties?properties){
????????KafkaAdmin?admin?=?new?KafkaAdmin(properties.buildAdminProperties());
????????//?默认False,在Broker不可用时,如果你觉得Broker不可用影响正常业务需要显示的将这个值设置为True
????????admin.setFatalIfBrokerNotAvailable(true);
????????//?setAutoCreate(false)?:?默认值为True,也就是Kafka实例化后会自动创建已经实例化的NewTopic对象
????????//?initialize():当setAutoCreate为false时,需要我们程序显示的调用admin的initialize()方法来初始化NewTopic对象
????????return?admin;
????}
????/**
?????*?创建指定参数的?topic
?????*?@return
?????*/
????@Bean
????public?NewTopic?topic()?{
????????return?new?NewTopic("hello2",?0,?(short)?0);
????}
}
如果要更新呢?也非常的简单
?/**
?????*?更新?topic
?????*?@return
?????*/
????@Bean
????public?NewTopic?topicUpdate()?{
????????return?new?NewTopic("hello2",?1,?(short)?1);
????}
注意这里的参数只能+
不能-
这种方式太简单了,如果我想在代码逻辑中来创建呢?我们可以通过AdminClient
来手动创建
??/**
?????*?AdminClient?创建
?????*/
????@Autowired
????private?KafkaProperties?properties;
????@GetMapping("/create/{topicName}")
????public?String?createTopic(@PathVariable?String?topicName)?{
????????AdminClient?client?=?AdminClient.create(properties.buildAdminProperties());
????????if(client?!=null){
????????????try?{
????????????????Collection<NewTopic>?newTopics?=?new?ArrayList<>(1);
????????????????newTopics.add(new?NewTopic(topicName,1,(short)?1));
????????????????client.createTopics(newTopics);
????????????}catch?(Throwable?e){
????????????????e.printStackTrace();
????????????}finally?{
????????????????client.close();
????????????}
????????}
????????return?topicName;
????}
观察下管理后台,发现topic
都创建成功了
有时候我们发送消息不知道是不是发成功了,需要有一个结果通知。有两种方式,一种是同步
一种是异步
/**
?????*?获取通知结果
?????*?@return
?????*/
????@GetMapping("/hello2")
????public?String?hello2()?{
????????//?同步获取结果
????????ListenableFuture<SendResult<Object,Object>>?future?=?kafkaTemplate.send("hello2","hello2");
????????try?{
????????????SendResult<Object,Object>?result?=?future.get();
????????????log.info("success?>>>?{}",?result.getRecordMetadata().topic());?//?success?>>>?hello2
????????}catch?(Throwable?e){
????????????e.printStackTrace();
????????}
????????return?"hello2";
????}
/**
?????*?获取通知结果
?????*?@return
?????*/
????@GetMapping("/hello2")
????public?String?hello2()?{
????????//?发送消息?-?异步获取通知结果
????????kafkaTemplate.send("hello2",?"async?hello2").addCallback(new?ListenableFutureCallback<SendResult<Object,?Object>>()?{
????????????@Override
????????????public?void?onFailure(Throwable?throwable)?{
????????????????log.error("fail?>>>>{}",?throwable.getMessage());
????????????}
????????????@Override
????????????public?void?onSuccess(SendResult<Object,?Object>?objectObjectSendResult)?{
????????????????log.info("async?success?>>>?{}",?objectObjectSendResult.getRecordMetadata().topic());?//?async?success?>>>?hello2
????????????}
????????});
????????return?"hello2";
????}
同样的,消息也会存在事务
,如果第一条消息发送成功,再发第二条消息的时候出现异常,那么就会抛出异常并回滚第一条消息,下面通过一个简单的例子体会一下
@GetMapping("/hello3")
public?String?hello3()?{
????kafkaTemplate.executeInTransaction(t?->?{
????????t.send("hello3","msg1");
????????if(true)
????????????throw?new?RuntimeException("failed");
????????t.send("hello3","msg2");
????????return?true;
????});
????return?"hello3";
}
//?接收消息
@KafkaListener(id?=?"hello3Group",?topics?=?"hello3")
public?void?listen3(String?msg)?{
????log.info("hello3?receive?value:?{}"?,?msg);
}
默认情况下,Spring-kafka
自动生成的KafkaTemplate
实例,是不具有事务消息发送能力的。我们需要添加transaction-id-prefix
来激活它
spring:
??kafka:
????producer:
??????bootstrap-servers:?127.0.0.1:9092
??????transaction-id-prefix:?kafka_.
启动之后,观察控制台的变化~ ,除此之外,还可以使用注解的方式@Transactional
来开启事务
//?注解方式
????@Transactional(rollbackFor?=?RuntimeException.class)
????@GetMapping("/hello4")
????public?String?hello4()?{
????????kafkaTemplate.send("hello3","msg1");
????????if(true)
????????????throw?new?RuntimeException("failed");
????????kafkaTemplate.send("hello3","msg2");
????????return?"hello4";
????}