RocketMq直接上手(火箭班)

发布时间:2024年01月10日

Apache RocketMQ官方文档:https://rocketmq.apache.org/zh/docs/bestPractice/06FAQ/,这里面涵盖了所有的基本知识、各种搭建环境、基础代码测试…还有各种问题总结,很值得自主学习。

在这里插入图片描述

1.配置依赖:pom.xml文件

可以只截取maven仓库,找合适的版本:https://mvnrepository.com/search?q=rocketMQ

<!-- rocketmq -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

2.配置application.yaml

server:
  port: 8866
  
rocketmq:
  #nameservice服务器地址(多个以英文逗号隔开)
  name-server: ip:port
  #生产者配置
  producer:
    #组名
    group: sentry-producer-group

3.生产者

package com.demo.mq;

import com.demo.api.Result;
import com.demo.entity.DeviceParameter;
import com.demo.service.IDeviceParameterService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@RestController
@RequestMapping(value = "/msgSender")
public class MsgSender {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 主题
    private static final String TOPIC = "mq_text_topic";
    
    /**
     * 
     * @param jsonObject,消费的数据
     * @param req
     * @return
     */
    @PostMapping(value = "/sendMsg")
    public Result<?> sendMsg(@RequestBody JSONObject jsonObject,
                             HttpServletRequest req){
        rocketMQTemplate.asyncSend(TOPIC,
                jsonObject,
                new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // 当消息消费成功后,会回调该方法
                    log.info("send successful");
                }

                @Override
                public void onException(Throwable throwable) {
                   // 当消息消费失败后,会回调该方法
                   log.info("send fail; {}", throwable.getMessage());
                }
        });
        return Result.OK("发送参数");
    }

}

4.消费消息

package com.demo.mq;

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// 注意:消费者消费的主题,需要和生产者设置的主题相同;消费者组,不能重复!!!
@Component
@RocketMQMessageListener(topic = "mq_text_topic", consumerGroup = "${spring.application.name}-group",
        messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MsgConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("-------消费者接收到rocketmq消息:" + message);
    }
}

欢迎关注微信公众号:小红的成长日记,一起学Java!

在这里插入图片描述

文章来源:https://blog.csdn.net/weixin_46462532/article/details/135512977
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。