以下是一个简单的MQTT连接库文件,其中包含了连接、断开、订阅主题、发送数据和接收数据等函数。请注意,这只是一个示例,你可能需要根据自己的实际需求进行修改。
#include <iostream>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
const std::string SERVER_ADDRESS = "mqtt.server.com";
const int SERVER_PORT = 1883;
// MQTT固定报头结构
struct MqttFixedHeader {
uint8_t controlPacketType;
uint8_t remainingLength;
};
// MQTT连接报文结构
struct MqttConnectPacket {
MqttFixedHeader fixedHeader;
uint8_t variableHeader[10];
uint8_t payload[20];
};
// MQTT订阅报文结构
struct MqttSubscribePacket {
MqttFixedHeader fixedHeader;
uint16_t packetIdentifier;
uint8_t topic[20];
uint8_t qos;
};
// MQTT发布报文结构
struct MqttPublishPacket {
MqttFixedHeader fixedHeader;
uint16_t topicLength;
uint8_t topic[20];
uint8_t payload[100];
};
class MqttClient {
public:
MqttClient() : sockfd(-1), connected(false) {}
~MqttClient() {
if (connected) {
disconnect();
}
}
bool connect(const std::string& clientId) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
std::cerr << "Failed to create socket" << std::endl;
return false;
}
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(SERVER_PORT);
if (inet_pton(AF_INET, SERVER_ADDRESS.c_str(), &(serv_addr.sin_addr)) <= 0) {
std::cerr << "Failed to set server address" << std::endl;
return false;
}
if (connect(sockfd, reinterpret_cast<struct sockaddr*>(&serv_addr), sizeof(serv_addr)) < 0) {
std::cerr << "Connection failed" << std::endl;
return false;
}
// 构建MQTT连接报文
MqttConnectPacket connectPacket;
connectPacket.fixedHeader.controlPacketType = 0x10; // 连接请求
connectPacket.fixedHeader.remainingLength = 0x0C; // 可变报头长度为12字节
memcpy(connectPacket.variableHeader, "MQTT", 4);
connectPacket.variableHeader[4] = 0x04; // MQTT协议版本号(4)
connectPacket.variableHeader[5] = 0x02; // 连接标志
connectPacket.variableHeader[6] = 0x00; // 保持连接时间的最高8位
connectPacket.variableHeader[7] = 0x3C; // 保持连接时间的最低8位
connectPacket.variableHeader[8] = 0x00; // 清理会话位为0
connectPacket.variableHeader[9] = 0x00; // 预留位为0
memcpy(connectPacket.payload, clientId.c_str(), clientId.length());
// 发送MQTT连接报文
if (send(sockfd, &connectPacket, sizeof(connectPacket), 0) < 0) {
std::cerr << "Failed to send connect packet" << std::endl;
return false;
}
// 接收MQTT服务器的响应
uint8_t response[1024];
ssize_t bytesRead = recv(sockfd, response, sizeof(response), 0);
if (bytesRead <= 0) {
std::cerr << "Failed to receive response" << std::endl;
return false;
}
// 处理MQTT服务器的响应
connected = true;
return true;
}
void disconnect() {
if (connected) {
close(sockfd);
sockfd = -1;
connected = false;
}
}
bool subscribe(const std::string& topic, uint8_t qos) {
if (!connected) {
std::cerr << "Not connected" << std::endl;
return false;
}
// 构建MQTT订阅报文
MqttSubscribePacket subscribePacket;
subscribePacket.fixedHeader.controlPacketType = 0x82; // 订阅请求
subscribePacket.fixedHeader.remainingLength = 0x0E; // 可变报头长度为14字节
subscribePacket.packetIdentifier = 0x1234; // 包标识符
memcpy(subscribePacket.topic, topic.c_str(), topic.length());
subscribePacket.qos = qos; // QoS等级
// 发送MQTT订阅报文
if (send(sockfd, &subscribePacket, sizeof(subscribePacket), 0) < 0) {
std::cerr << "Failed to send subscribe packet" << std::endl;
return false;
}
// 接收MQTT服务器的响应
uint8_t response[1024];
ssize_t bytesRead = recv(sockfd, response, sizeof(response), 0);
if (bytesRead <= 0) {
std::cerr << "Failed to receive response" << std::endl;
return false;
}
// 处理MQTT服务器的响应
return true;
}
bool publish(const std::string& topic, const std::string& message, uint8_t qos) {
if (!connected) {
std::cerr << "Not connected" << std::endl;
return false;
}
// 构建MQTT发布报文
MqttPublishPacket publishPacket;
publishPacket.fixedHeader.controlPacketType = 0x30; // 发布消息
publishPacket.fixedHeader.remainingLength = 0x2D; // 可变报头长度为45字节
publishPacket.topicLength = topic.length();
memcpy(publishPacket.topic, topic.c_str(), topic.length());
memcpy(publishPacket.payload, message.c_str(), message.length());
// 发送MQTT发布报文
if (send(sockfd, &publishPacket, sizeof(publishPacket), 0) < 0) {
std::cerr << "Failed to send publish packet" << std::endl;
return false;
}
// 处理MQTT服务器的响应
return true;
}
ssize_t receive(uint8_t* buffer, size_t bufferSize) {
if (!connected) {
std::cerr << "Not connected" << std::endl;
return -1;
}
return recv(sockfd, buffer, bufferSize, 0);
}
private:
int sockfd;
bool connected;
};
在上述代码中,我们将MQTT连接功能封装到了一个名为MqttClient
的类中,并提供了连接、断开、订阅主题、发送数据和接收数据等函数。你可以根据自己的实际需求调用这些函数。
例如,要连接到MQTT服务器,请使用以下代码:
MqttClient client;
if (client.connect("client_id")) {
// 连接成功
} else {
// 连接失败
}
要订阅主题,请使用以下代码:
if (client.subscribe("topic", 0x01)) {
// 订阅成功
} else {
// 订阅失败
}
要发布消息,请使用以下代码:
if (client.publish("topic", "message", 0x01)) {
// 发布成功
} else {
// 发布失败
}
要接收消息,请使用以下代码:
uint8_t buffer[1024];
ssize_t bytesRead = client.receive(buffer, sizeof(buffer));
if (bytesRead >= 0) {
// 处理接收到的数据
} else {
// 接收数据失败
}
下面是一个完整的MQTT连接示例,包括从用户输入地址和端口到订阅主题和发送消息的全部过程:
#include <iostream>
#include <string>
// 导入上述的MQTT连接库文件
int main() {
std::string serverAddress;
int serverPort;
std::string clientId;
std::string topic;
// 获取用户输入的MQTT服务器地址和端口
std::cout << "Enter MQTT server address: ";
std::cin >> serverAddress;
std::cout << "Enter MQTT server port: ";
std::cin >> serverPort;
std::cout << "Enter client ID: ";
std::cin >> clientId;
MqttClient client;
// 连接到MQTT服务器
if (client.connect(clientId)) {
std::cout << "Connected to MQTT server" << std::endl;
// 订阅主题
std::cout << "Enter topic to subscribe: ";
std::cin >> topic;
if (client.subscribe(topic, 0x01)) {
std::cout << "Subscribed to topic: " << topic << std::endl;
} else {
std::cerr << "Failed to subscribe to topic" << std::endl;
return -1;
}
// 发布消息
std::string message;
std::cout << "Enter message to publish: ";
std::cin.ignore(); // 忽略之前的换行符
std::getline(std::cin, message);
if (client.publish(topic, message, 0x01)) {
std::cout << "Published message: " << message << std::endl;
} else {
std::cerr << "Failed to publish message" << std::endl;
return -1;
}
// 接收消息
uint8_t buffer[1024];
ssize_t bytesRead = client.receive(buffer, sizeof(buffer));
if (bytesRead >= 0) {
std::string receivedMessage(reinterpret_cast<char*>(buffer), bytesRead);
std::cout << "Received message: " << receivedMessage << std::endl;
} else {
std::cerr << "Failed to receive message" << std::endl;
return -1;
}
client.disconnect();
std::cout << "Disconnected from MQTT server" << std::endl;
} else {
std::cerr << "Failed to connect to MQTT server" << std::endl;
return -1;
}
return 0;
}
在这个示例中,我们使用std::cin
从用户那里获取了MQTT服务器的地址、端口、客户端ID以及要订阅和发布的主题和消息。然后,我们通过调用相应的函数来进行连接、订阅、发布和接收。