MQTT基于发布/订阅范式的消息协议,工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅消息协议,是一个基于客户端-服务端的消息发布/订阅传输协议。
npm install mqtt --save
# or
yarn add mqtt
主要包含连接服务、订阅主题、取消订阅主题、给主题发布消息、断开连接服务
另外还有几个监听事件:监听连接、监听重连、监听连接错误、**监听订阅主题的消息**
设置客户端ID、用户名及密码,客户端ID应当具有唯一性
const clientId = 'mqtt_client_' + Math.random().toString(16).substring(2, 8);
const clientId = 'mqtt_client_' + new Date().getTime();
const client = mqtt.connect('ws://domainName:port/mqtt', {
clientId,
username,
password,
// ...other options
});
这种方式与2.1方式的区别:建立连接时的协议
const client = mqtt.connect('mqtt://domainName:port', {
clientId,
username,
password,
// ...other options
});
handleSubscribe = (topic, qos) => {
if (client) {
// subscribe topic
client.subscribe(topic, { qos }, (error) => {
if (error) {
console.log('Subscribe to topics error', error)
return
}
console.log(`Subscribe to topics: ${topic}`)
})
}
}
handleUnsub = (topic, qos) => {
if (client) {
client.unsubscribe(topic, { qos }, (error) => {
if (error) {
console.log('Unsubscribe error', error)
return
}
console.log(`unsubscribed topic: ${topic}`)
})
}
}
handlePublish = (pubRecord) => {
if (client) {
const { topic, qos, payload } = pubRecord
client.publish(topic, payload, { qos }, (error) => {
if (error) {
console.log('Publish error: ', error)
}
})
}
}
handleDisconnect = () => {
if (client) {
try {
client.end(false, () => {
console.log('disconnected successfully')
})
} catch (error) {
console.log('disconnect error:', error)
}
}
}
import React from 'react'
import mqtt from 'mqtt'
class ClassMqtt extends React.Component {
constructor(props) {
super(props)
this.state = {
client: null,
connectStatus: 'Connect',
isSubed: false,
messages: [],
}
}
handleConnect = (host, mqttOptions) => {
this.setState({ connectStatus: 'Connecting' })
this.client = mqtt.connect(host, mqttOptions)
if (this.client) {
this.client.on('connect', () => {
this.setState({ connectStatus: 'Connected' })
console.log('connection successful')
})
this.client.on('error', (err) => {
console.error('Connection error: ', err)
this.client.end()
})
this.client.on('reconnect', () => {
this.setState({ connectStatus: 'Reconnecting' })
})
this.client.on('message', (topic, message) => {
const payload = { topic, message: message.toString() }
const { messages } = this.state
if (payload.topic) {
const changedMessages = messages.concat([payload])
this.setState({ messages: changedMessages })
}
console.log(`received message: ${message} from topic: ${topic}`)
})
}
}
handleSubscribe = (topic, qos) => {
if (this.client) {
// subscribe topic
this.client.subscribe(topic, { qos }, (error) => {
if (error) {
console.log('Subscribe to topics error', error)
return
}
console.log(`Subscribe to topics: ${topic}`)
this.setState({ isSubed: true })
})
}
}
// unsubscribe topic
handleUnsub = (topic, qos) => {
if (this.client) {
this.client.unsubscribe(topic, { qos }, (error) => {
if (error) {
console.log('Unsubscribe error', error)
return
}
console.log(`unsubscribed topic: ${topic}`)
this.setState({ isSubed: false })
})
}
}
// publish message
handlePublish = (pubRecord) => {
if (this.client) {
const { topic, qos, payload } = pubRecord
this.client.publish(topic, payload, { qos }, (error) => {
if (error) {
console.log('Publish error: ', error)
}
})
}
}
// disconnect
handleDisconnect = () => {
if (this.client) {
try {
this.client.end(false, () => {
this.setState({ connectStatus: 'Connect' })
this.setState({ client: null })
console.log('disconnected successfully')
})
} catch (error) {
this.setState({ connectStatus: 'Connect' })
console.log('disconnect error:', error)
}
}
}
render() {
return (
{/* jsx代码 */}
)
}
}
export default ClassMqtt