EMQX是大规模分布式MQTT消息服务器,可以高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网与云应用。EMQX 作为物联网应用开发和物联网平台搭建必须用到的基础设施软件,主要在边缘和云端实现物联网设备互联与设备上云,提供物联网设备接入、协议处理、消息路由、数据存储、流数据处理等核心能力。
访问官网下载安装包:下载 EMQX
解压zip文件得到软件目录
运行EMQX,打开cmd命令窗口,进入软件bin目录,输入emqx start
命令启动软件
登录emqx控制台,访问http://127.0.0.1:18083/,默认用户名、密码是admin、public。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class App {
public static void main(String[] args) {
String subTopic = "testtopic/#";
String pubTopic = "testtopic/1";
String content = "Hello World";
int qos = 2;
String broker = "tcp://127.0.0.1:1883";
String clientId = "emqx_test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("用户名");
connOpts.setPassword("密码".toCharArray());
// 保留会话
connOpts.setCleanSession(true);
MqttCallback callback = new OnMessageCallback();
// 设置回调
client.setCallback(callback);
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
// 订阅主题
client.subscribe(subTopic);
// 消息发布所需参数
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// 发布消息
client.publish(pubTopic, message);
System.out.println("Message published");
// client.disconnect();
// System.out.println("Disconnected");
// client.close();
// System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
<!--mqtt-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.1</version>
</dependency>
<!-- fastJSON -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
/**
* 发布的bean名称
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
/**
* 客户端与服务器之间的连接意外中断,服务器将发布客户端的"遗嘱"消息
*/
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
private static final String username = "admin";
private static final String password = "DCDremote@997";
private static final String url = "tcp://127.0.0.1:1883";
private static final String clientId = "honeywell-server1";
private static final String defaultTopic = "default";
// @Value("${mqtt.username}")
// private String username;
//
// @Value("${mqtt.password}")
// private String password;
//
// @Value("${mqtt.url}")
// private String url;
//
// @Value("${mqtt.sender.clientId}")
// private String clientId;
//
// @Value("${mqtt.sender.topic}")
// private String defaultTopic;
@Bean
public MqttConnectOptions getMqttConnectOption(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{url});
mqttConnectOptions.setKeepAliveInterval(30);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientsFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOption());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientsFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setDefaultQos(1);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
package org.jianying.emqxstudy.mqtt;
import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@Configuration
public class MqttReceiverConfig {
final static Logger logger = LoggerFactory.getLogger(MqttReceiverConfig.class);
/**
* 订阅的bean名称
*/
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
private static final String username = "admin";
private static final String password = "DCDremote@997";
private static final String url = "tcp://127.0.0.1:1883";
// 接收消息的客户端id
private static final String clientId = "test-server";
// 接收的消息主题, $SYS/brokers 表示发送的是系统主题
private static final String defaultTopic = "$SYS/brokers/+/clients/#,hello/info/faceid/#,hello/server/result/#,info_topic";
// @Value("${mqtt.username}")
// private String username;
//
// @Value("${mqtt.password}")
// private String password;
//
// @Value("${mqtt.url}")
// private String url;
//
// @Value("${mqtt.receiver.clientId}")
// private String clientId;
//
// @Value("${mqtt.receiver.topic}")
// private String defaultTopic;
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{url});
mqttConnectOptions.setKeepAliveInterval(60);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
String[] topics = new String[topicList.size()];
topicList.toArray(topics);
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),
topics);
adapter.setCompletionTimeout(10000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
logger.info(("收到消息" + message.getHeaders().get("mqtt_receivedTopic") + message.getPayload()));
// 主题
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// 消息体
Map maps = (Map) JSON.parse(message.getPayload().toString().trim());
// 判断设备状态
if (topic.contains("$SYS/brokers") && !topic.contains("faceid-server") && !topic.contains("faceid-mqtt-server")) {
if (maps.get("clientid").toString().contains("uniwin-mqtt-client")) {
}
} else if (topic.contains("uniwin/server/result/faceid")) { //结果返回
if (maps.get("type") != null && !maps.get("type").equals("")) {
}
} else {
System.out.println("info...");
if (maps.get("type") != null && !maps.get("type").equals("")) {
String type = maps.get("type").toString();
// 设备心跳检测
if (type.equals("heart")) {
}
// 上传打卡记录
if (type.equals("note")) {
}
// 上传设备参数
if (type.equals("param_upload")) {
}
}
}
}
};
}
}
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送信息到MQTT服务器
*
* @param data 发送的文本
*/
void sendToMqtt(String data);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
String payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}