使用MQTT实现普通/周期发布消息

发布时间:2023年12月20日

系列文章目录

1.SpringBoot整合RabbitMQ并实现消息发送与接收
2. 解析JSON格式参数 & 修改对象的key
3. VUE整合Echarts实现简单的数据可视化
4. List<HashMap<String,String>>实现自定义字符串排序(key排序、Value排序)
5. 使用AOP切面实现日志记录功能
6. SpringBoot整合RabbitMQ中交换机的使用(完成消息的发送和接收案例)

更多该系列文章可以看我主页哦



前言

??????相信大家点开这篇文章,说明对于消息队列这方面还是较感兴趣。前两篇消息队列博文介绍了RabbitMQ的使用,包括消息队列是什么,为什么使用消息队列以及消息队列的优点。今天我们接着聊聊另一种消息队列:MQTT(Message Queuing Telemetry Transport) 是一种轻量级的通信协议,设计初衷是为了连接远程设备和传感器,以实现物联网(IoT)应用。MQTT 协议基于发布/订阅模式,使用 TCP/IP 协议进行通讯,具有低带宽、低开销、易扩展等特点,因此在物联网领域得到广泛应用。
??????在 MQTT 中,设备可以发布消息到一个特定的主题(topic),也可以订阅一个或多个主题以接收消息。这种灵活的发布/订阅机制使得 MQTT 协议非常适合于大规模的设备通信,而且能够有效地减少网络流量和能耗。


一、MQTT的简介及优点

1.1 优点

??????MQTT(Message Queuing Telemetry Transport)是一种轻量级的通讯协议,它是为了解决物联网应用中的低带宽、不稳定的网络环境以及物联网设备数量庞大等问题而设计的。MQTT 协议采用发布/订阅机制,使用 TCP/IP 协议进行通信,具有以下优点:

  • 轻量级:MQTT 协议非常轻量,它的通讯报文只有少量的字节,使得其在低带宽、不稳定的网络环境中也能正常工作,同时也减少了通讯成本。
  • 灵活性高:MQTT 提供了灵活的发布/订阅机制,支持多个客户端同时订阅同一个主题,也可以支持一个客户端订阅多个主题,从而适应不同的应用场景。
  • 可靠性高:MQTT 支持 QoS(Quality of Service)服务质量等级,可以根据应用需求选择适当的 QoS 等级,保证消息传输的可靠性。
  • 易于开发和扩展:MQTT 的协议规范非常简单,易于开发者理解和实现,同时也支持插件式的扩展,可以根据需要添加新的功能。
  • 跨平台支持:MQTT 支持多种编程语言和操作系统平台,使得开发者在不同的环境中都能使用 MQTT 进行通讯。

由于其轻量级、灵活性高、可靠性高等特点,MQTT 在物联网领域得到了广泛应用,已成为物联网设备之间通讯的首选协议。

1.2 简介

以下是与 MQTT 相关的一些重要知识点的介绍:注:如果只是入门,重点理解发布/订阅模式、主题(Topic)即可

  • 发布/订阅模式:MQTT 使用发布/订阅模式,其中发布者(publisher)将消息发布到特定的主题(topic),而订阅者(subscriber)则可以订阅这些主题以接收消息。这种模式使得设备之间的通讯变得灵活和高效。
  • QoS(Quality of Service)等级:MQTT 支持三种不同的 QoS 等级,分别是 0、1 和 2。这些等级允许发送者和接收者在消息传递过程中选择适当的服务质量,以确保消息的可靠性和传递顺序。
  • 主题(Topic):在 MQTT 中,主题用于对消息进行分类和过滤。发布者发布消息到一个或多个主题,而订阅者可以选择订阅感兴趣的主题,从而只接收他们关心的消息。
  • 保留消息(Retained Messages):MQTT 支持保留消息,这意味着发布者可以发布一个保留消息到主题,而订阅者在订阅该主题时会立即收到最新的保留消息,无论它们何时连接到代理服务器。
  • 遗嘱消息(Last Will and Testament):MQTT 允许客户端指定遗嘱消息,即在客户端异常断开连接时,代理服务器可以发布预定义的遗嘱消息到指定的主题,通知其他订阅者该客户端的离线状态。
  • 持久会话(Persistent Session):MQTT 支持持久会话,允许客户端在重新连接后接收之前未接收的消息,并且保持其订阅状态。
  • 安全机制:MQTT 支持基于用户名密码的认证机制,同时也可以通过 TLS/SSL 加密通信数据,确保通讯的安全性。

以上是一些与 MQTT 相关的重要知识点,有助于理解和应用 MQTT 协议在物联网领域中的作用和特点。

二、使用MQTT实现消息发送和接收

2.1 发送数据

注:如果使用java去运行,记得导mqtt的jar包,自行去mqtt官网下载相关版本jar包。如果是javaee,自行导相关依赖。
下述的参数自行更具自己要发送的终端配置相关的信息。发送数据的结构如下述代码:

String ip = "";//发送终端的ip
String macAddresses = "";//mac地址
String brokerUrl = "tcp://{ip}:12345";//拼接的发送终端的url
String topicPrefix = "/xxx/mac/";//订阅的主题
String clientId = "11111";//终端id

try {
        //主题
        String topic = topicPrefix + macAddresses;

        System.out.println("topic=" + topic);
        String mqttBrokerUrl = brokerUrl.replace("{ip}", ip);
        System.out.println("url=" + mqttBrokerUrl);
        // 创建 MQTT 客户端实例
        MqttClient client = new MqttClient(mqttBrokerUrl, clientId);
        MqttConnectOptions options = new MqttConnectOptions();

        // 设置用户名和密码
        options.setUserName("admin");
        options.setPassword("password".toCharArray());

        // 设置连接超时时间为30秒
//                options.setConnectionTimeout(30);

        // 连接到 MQTT 代理服务器
        System.out.println("开始连接服务器");
        client.connect(options);
        System.out.println("连接成功");
        // 发布会议信息到指定主题
        String message = "心态还需努力呀~";

        System.out.println("发布消息");
        MqttMessage mqttMessage = new MqttMessage(message.getBytes("UTF-8"));//记得转UTF-8格式,防止中文乱码
        client.publish(topic, mqttMessage.getPayload(), 0, false);

        // 断开 MQTT 连接
        System.out.println("断开连接");
        client.disconnect();
        client.close();

} catch (Exception e) {
    e.printStackTrace();
}

2.2 订阅数据

订阅哪个主题,去接收订阅主题的消息。里面有些可能会运用到的场景,像接收失败、接收后如何处理等,这个更具实际业务去进行编写。代码如下所示:

 try {
        String broker = "tcp://mqtt.eclipse.org:1883";  // MQTT broker的地址和端口  
        String clientId = "JavaSample";  // 客户端ID  
        MemoryPersistence persistence = new MemoryPersistence();  // 使用内存存储客户端的状态  
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);  // 创建客户端  
        MqttConnectOptions connOpts = new MqttConnectOptions();  // 创建连接选项  
        connOpts.setCleanSession(true);  // 设置是否清除会话,如果为true,将不会保存客户端的订阅信息和QoS消息。  
        System.out.println("Connecting to broker: " + broker);
        sampleClient.connect(connOpts);  // 连接MQTT broker  
        System.out.println("Connected");
        String topic = "test/topic";  // 需要订阅的主题  
        sampleClient.setCallback(new MqttCallback() {  // 设置回调函数,当有消息到达时,会调用该函数  
            public void connectionLost(Throwable cause) {}  // 当连接丢失时调用  
            public void messageArrived(String topic, MqttMessage message) throws Exception {  // 当消息到达时调用  
                System.out.println("Message received:\n\t" + new String(message.getPayload()));
            }
            public void deliveryComplete(IMqttDeliveryToken token) {}  // 当消息发送完成时调用  
        });
        sampleClient.subscribe(topic);  // 订阅主题  
        Thread.sleep(10000);  // 等待10秒,让订阅的消息到达。在实际应用中,可能需要更复杂的方法来处理长时间运行的应用程序。  
        sampleClient.disconnect();  // 断开连接  
    } catch (Exception e) {
        e.printStackTrace();
    } 

2.3 周期发送

周期发送:就是在每隔一段时间发送数据到终端,这里我每隔5秒发送数据的写法如下代码所示:

Timer timer = new Timer();
TimerTask task = new TimerTask() {
    @Override
    public void run() {
        //将2.1的内容写进来
    }
};

// 每隔5秒执行一次任务
timer.schedule(task, 0, 5000);

总结

??????到这里我相信大家已经对MQTT消息队列有了一定的理解,并且能够简单的使用MQTT实现消息发送、订阅,对于如何周期发送,也有了学会如何使用。理解过RabbitMQ的应该理解MQTT会很容易,它们的思想都是一样的,都是消息队列。其实理解一个消息队列,其余的消息队列再使用时简单查询下就可以轻松上手。这个MQTT的学习也是在工作中运用到的时候我才开始查询学习,对于实现功能来说也是比较简单的发送、接收。所以也就偶然间接触到了MQTT,和大家分享出来一起学习!今后大家在项目上如果遇到这样的接口文档发送/接收数据,相信会高效完成任务!!!

文章来源:https://blog.csdn.net/weixin_52258054/article/details/135101550
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。