1.SpringBoot整合RabbitMQ并实现消息发送与接收
2. 解析JSON格式参数 & 修改对象的key
3. VUE整合Echarts实现简单的数据可视化
4. List<HashMap<String,String>>实现自定义字符串排序(key排序、Value排序)
5. 使用AOP切面实现日志记录功能
6. SpringBoot整合RabbitMQ中交换机的使用(完成消息的发送和接收案例)
更多该系列文章可以看我主页哦
??????相信大家点开这篇文章,说明对于消息队列这方面还是较感兴趣。前两篇消息队列博文介绍了RabbitMQ的使用,包括消息队列是什么,为什么使用消息队列以及消息队列的优点。今天我们接着聊聊另一种消息队列:MQTT(Message Queuing Telemetry Transport) 是一种轻量级的通信协议,设计初衷是为了连接远程设备和传感器,以实现物联网(IoT)应用。MQTT 协议基于发布/订阅模式,使用 TCP/IP 协议进行通讯,具有低带宽、低开销、易扩展等特点,因此在物联网领域得到广泛应用。
??????在 MQTT 中,设备可以发布消息到一个特定的主题(topic),也可以订阅一个或多个主题以接收消息。这种灵活的发布/订阅机制使得 MQTT 协议非常适合于大规模的设备通信,而且能够有效地减少网络流量和能耗。
??????MQTT(Message Queuing Telemetry Transport)是一种轻量级的通讯协议,它是为了解决物联网应用中的低带宽、不稳定的网络环境以及物联网设备数量庞大等问题而设计的。MQTT 协议采用发布/订阅机制,使用 TCP/IP 协议进行通信,具有以下优点:
由于其轻量级、灵活性高、可靠性高等特点,MQTT 在物联网领域得到了广泛应用,已成为物联网设备之间通讯的首选协议。
以下是与 MQTT 相关的一些重要知识点的介绍:注:如果只是入门,重点理解发布/订阅模式、主题(Topic)即可
以上是一些与 MQTT 相关的重要知识点,有助于理解和应用 MQTT 协议在物联网领域中的作用和特点。
注:如果使用java去运行,记得导mqtt的jar包,自行去mqtt官网下载相关版本jar包。如果是javaee,自行导相关依赖。
下述的参数自行更具自己要发送的终端配置相关的信息。发送数据的结构如下述代码:
String ip = "";//发送终端的ip
String macAddresses = "";//mac地址
String brokerUrl = "tcp://{ip}:12345";//拼接的发送终端的url
String topicPrefix = "/xxx/mac/";//订阅的主题
String clientId = "11111";//终端id
try {
//主题
String topic = topicPrefix + macAddresses;
System.out.println("topic=" + topic);
String mqttBrokerUrl = brokerUrl.replace("{ip}", ip);
System.out.println("url=" + mqttBrokerUrl);
// 创建 MQTT 客户端实例
MqttClient client = new MqttClient(mqttBrokerUrl, clientId);
MqttConnectOptions options = new MqttConnectOptions();
// 设置用户名和密码
options.setUserName("admin");
options.setPassword("password".toCharArray());
// 设置连接超时时间为30秒
// options.setConnectionTimeout(30);
// 连接到 MQTT 代理服务器
System.out.println("开始连接服务器");
client.connect(options);
System.out.println("连接成功");
// 发布会议信息到指定主题
String message = "心态还需努力呀~";
System.out.println("发布消息");
MqttMessage mqttMessage = new MqttMessage(message.getBytes("UTF-8"));//记得转UTF-8格式,防止中文乱码
client.publish(topic, mqttMessage.getPayload(), 0, false);
// 断开 MQTT 连接
System.out.println("断开连接");
client.disconnect();
client.close();
} catch (Exception e) {
e.printStackTrace();
}
订阅哪个主题,去接收订阅主题的消息。里面有些可能会运用到的场景,像接收失败、接收后如何处理等,这个更具实际业务去进行编写。代码如下所示:
try {
String broker = "tcp://mqtt.eclipse.org:1883"; // MQTT broker的地址和端口
String clientId = "JavaSample"; // 客户端ID
MemoryPersistence persistence = new MemoryPersistence(); // 使用内存存储客户端的状态
MqttClient sampleClient = new MqttClient(broker, clientId, persistence); // 创建客户端
MqttConnectOptions connOpts = new MqttConnectOptions(); // 创建连接选项
connOpts.setCleanSession(true); // 设置是否清除会话,如果为true,将不会保存客户端的订阅信息和QoS消息。
System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts); // 连接MQTT broker
System.out.println("Connected");
String topic = "test/topic"; // 需要订阅的主题
sampleClient.setCallback(new MqttCallback() { // 设置回调函数,当有消息到达时,会调用该函数
public void connectionLost(Throwable cause) {} // 当连接丢失时调用
public void messageArrived(String topic, MqttMessage message) throws Exception { // 当消息到达时调用
System.out.println("Message received:\n\t" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {} // 当消息发送完成时调用
});
sampleClient.subscribe(topic); // 订阅主题
Thread.sleep(10000); // 等待10秒,让订阅的消息到达。在实际应用中,可能需要更复杂的方法来处理长时间运行的应用程序。
sampleClient.disconnect(); // 断开连接
} catch (Exception e) {
e.printStackTrace();
}
周期发送:就是在每隔一段时间发送数据到终端,这里我每隔5秒发送数据的写法如下代码所示:
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
//将2.1的内容写进来
}
};
// 每隔5秒执行一次任务
timer.schedule(task, 0, 5000);
??????到这里我相信大家已经对MQTT消息队列有了一定的理解,并且能够简单的使用MQTT实现消息发送、订阅,对于如何周期发送,也有了学会如何使用。理解过RabbitMQ的应该理解MQTT会很容易,它们的思想都是一样的,都是消息队列。其实理解一个消息队列,其余的消息队列再使用时简单查询下就可以轻松上手。这个MQTT的学习也是在工作中运用到的时候我才开始查询学习,对于实现功能来说也是比较简单的发送、接收。所以也就偶然间接触到了MQTT,和大家分享出来一起学习!今后大家在项目上如果遇到这样的接口文档发送/接收数据,相信会高效完成任务!!!