Netty应用(七) ----MQTT编解码器

发布时间:2023年12月19日

0.前言

这里梳理下netty中对mqtt协议的编码和解码的处理。一方面对mqtt协议的结构再巩固些,另一方面就是学习下netty中对字节的处理。对于MQTT协议,可以参考前一篇文章MQTT协议详解

1. MqttEncoder–编码器

1.1 构造方法

对于编码器,构造方法是私有的。我们可以通过其提供的静态常量INSTANCE访问。

    public static final MqttEncoder INSTANCE = new MqttEncoder();

    private MqttEncoder() { }

1.2 encodeConnectMessage – 连接消息

编码器,我们重点看一下doEncode方法。通过固定报头中的消息类型来对不同类型的消息进行特定编码。

    static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {

        switch (message.fixedHeader().messageType()) {
            case CONNECT:
                return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);

            case CONNACK:
                return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);

            case PUBLISH:
                return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);

            case SUBSCRIBE:
                return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);

            case UNSUBSCRIBE:
                return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);

            case SUBACK:
                return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);

            case UNSUBACK:
            case PUBACK:
            case PUBREC:
            case PUBREL:
            case PUBCOMP:
                return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);

            case PINGREQ:
            case PINGRESP:
            case DISCONNECT:
                return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);

            default:
                throw new IllegalArgumentException(
                        "Unknown message type: " + message.fixedHeader().messageType().value());
        }
    }

对于消息的编码,我们可以对照着协议来看,这样会更清晰。
固定报头
在这里插入图片描述
可变报头
在这里插入图片描述
encodeConnectMessage方法

    private static ByteBuf encodeConnectMessage(
            ByteBufAllocator byteBufAllocator,
            MqttConnectMessage message) {
        // 1. 有效载荷初始大小设为0字节
        int payloadBufferSize = 0;

        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttConnectVariableHeader variableHeader = message.variableHeader();
        MqttConnectPayload payload = message.payload();
        // 2.public enum MqttVersion {
        //    MQTT_3_1("MQIsdp", (byte) 3),
        //    MQTT_3_1_1("MQTT", (byte) 4);
        // }  对mqtt版本做校验,枚举中只有两种,如果名称和版本不匹配,则报错
        MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
                (byte) variableHeader.version());

        // 3.如果可变报头中,没有用户名称但是有用户密码,则报错
        if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
            throw new DecoderException("Without a username, the password MUST be not set");
        }

        // 4. 校验有效载荷中的clientId,版本MQTT_3_1中clinetId编码字节长度必须为1到23,
        // 在v3.1.1中允许超过23字节或者长度为0的clientId,所以不为null即可。
        String clientIdentifier = payload.clientIdentifier();
        if (!isValidClientId(mqttVersion, clientIdentifier)) {
            throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
        }
        byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
        // 4.1 connect消息的有效载荷,如果都包含的话必须按这个顺序出现:客户端标识符,遗嘱主题,遗嘱消息,用户名,密码
        // 而这几部分的结构,都是由一个两字节的长度和对应的载荷消息的组成,所以这里都是 2字节 + 消息的字节长度
        payloadBufferSize += 2 + clientIdentifierBytes.length;

        // 5.校验有效载荷中的遗嘱主题和遗嘱消息,如果可变报头中的遗嘱标志为1,则表示这两项存在
        String willTopic = payload.willTopic();
        byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES;
        byte[] willMessage = payload.willMessageInBytes();
        byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
        if (variableHeader.isWillFlag()) {
            payloadBufferSize += 2 + willTopicBytes.length;
            payloadBufferSize += 2 + willMessageBytes.length;
        }

        // 6.校验有效载荷中的用户和密码,如果可变报头中的用户名/密码标识为1,则表示用户名/密码存在
        String userName = payload.userName();
        byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES;
        if (variableHeader.hasUserName()) {
            payloadBufferSize += 2 + userNameBytes.length;
        }
        byte[] password = payload.passwordInBytes();
        byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
        if (variableHeader.hasPassword()) {
            payloadBufferSize += 2 + passwordBytes.length;
        }

        byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
        // 7. 可变报头的长度 = 2字节的协议名的长度(v3.1为3,v3.1.1值为4) + 4字节的协议名(MQTT) + 1字段协议级别 + 1字节连接标志 + 2字节保持时间
        int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        // 8.这里计算的是固定头的长度 = 1字节的报文类型(包括类型标志位) + 剩余长度字段,剩余长度=可变报头 + 有效载荷
        // 由于剩余长度字段中每字节的最高位为进制位,所以每个字节表示的最大值为128(0-127),所以(可变报头+有效载荷)/128即剩余长度所占的字字节
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);

        // 9.创建一个这Mqtt消息长度的byteBuf,开始写入消息
        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);

        // 9.1 向byteBuf中写入固定报头中的第一个字节,详细介绍见下段代码
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        // 9.2 向byteBuf中写入固定报头中的剩余长度字段,详细介绍见下段代码
        writeVariableLengthInt(buf, variablePartSize);

        // 9.3 写入两字节的协议名的长度,再写入协议名
        buf.writeShort(protocolNameBytes.length);
        buf.writeBytes(protocolNameBytes);

        // 9.4 1字节协议版本,1字节连接标志,2字节连接时间
        buf.writeByte(variableHeader.version());
        buf.writeByte(getConnVariableHeaderFlag(variableHeader));
        buf.writeShort(variableHeader.keepAliveTimeSeconds());

        // 9.5 最后写入有效载荷
        buf.writeShort(clientIdentifierBytes.length);
        buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
        if (variableHeader.isWillFlag()) {
            buf.writeShort(willTopicBytes.length);
            buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
            buf.writeShort(willMessageBytes.length);
            buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
        }
        if (variableHeader.hasUserName()) {
            buf.writeShort(userNameBytes.length);
            buf.writeBytes(userNameBytes, 0, userNameBytes.length);
        }
        if (variableHeader.hasPassword()) {
            buf.writeShort(passwordBytes.length);
            buf.writeBytes(passwordBytes, 0, passwordBytes.length);
        }
        return buf;
    }
    /**
     * 计算固定报头第一字节
     *
     * @param header
     * @return
     */
    private static int getFixedHeaderByte1(MqttFixedHeader header) {
        int ret = 0;
        // 1. 将消息类型左移四位,因为第一字节的0-3位为类型对应的标志位
        ret |= header.messageType().value() << 4;

        // 2. 如果是重发报文,则重发标志为1,第一字节的第3位
        if (header.isDup()) {
            // 采用或运算,将第一字节的第三位这只为1
            ret |= 0x08;
        }
        // 3. Qos等级为第一字节的第1、2位
        ret |= header.qosLevel().value() << 1;

        // 4.如果要保留消息,则保留标志为1,第一字节的第0位
        if (header.isRetain()) {
            ret |= 0x01;
        }
        return ret;
    }

    /**
     * 写入固定报头中的剩余长度字段
     *
     * @param buf
     * @param num
     */
    private static void writeVariableLengthInt(ByteBuf buf, int num) {
        do {
            // 1. %求余,取前一个字节所表示的数值大小,
            // 比如num表示200字节,第一次这里digit表示72
            // 由于第二次,num=1,大于0成立,这里1 % 128 = 1
            int digit = num % 128;
            // 2. 第一除以128,即去除前一个字节,num表示1,第二次等于0了,表示没有字节了
            num /= 128;
            if (num > 0) {
                // 3.如果还有字节,则最高位进制位设置为1
                digit |= 0x80;
            }
            // 4. 第一字节写入72,第二字节写入1
            buf.writeByte(digit);
        } while (num > 0);
    }

	/**
	* 通过或运算计算连接标志
	*/
    private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
       int flagByte = 0;
       if (variableHeader.hasUserName()) {
           flagByte |= 0x80;
       }
       if (variableHeader.hasPassword()) {
           flagByte |= 0x40;
       }
       if (variableHeader.isWillRetain()) {
           flagByte |= 0x20;
       }
       flagByte |= (variableHeader.willQos() & 0x03) << 3;
       if (variableHeader.isWillFlag()) {
           flagByte |= 0x04;
       }
       if (variableHeader.isCleanSession()) {
           flagByte |= 0x02;
       }
       return flagByte;
   }

1.3 encodeConnAckMessage - 确认连接

可变报头
在这里插入图片描述
encodeConnAckMessage

    private static ByteBuf encodeConnAckMessage(
            ByteBufAllocator byteBufAllocator,
            MqttConnAckMessage message) {
        // 一共4个字节,两字节的固定头,两字节的可变头,无载荷
        ByteBuf buf = byteBufAllocator.buffer(4);
        buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
        // 剩余长度字段固定为2,表示可变报头+载荷=2字节
        buf.writeByte(2);
        // 服务端如果保存了会话,则置为1,如果没有保存,则置为0,
        buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
        buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
        return buf;
    }

1.4 encodePublishMessage – 发布消息

固定报头
在这里插入图片描述

可变报头
只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中
在这里插入图片描述
encodePublishMessage

    private static ByteBuf encodePublishMessage(
            ByteBufAllocator byteBufAllocator,
            MqttPublishMessage message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttPublishVariableHeader variableHeader = message.variableHeader();
        ByteBuf payload = message.payload().duplicate();

        String topicName = variableHeader.topicName();
        byte[] topicNameBytes = encodeStringUtf8(topicName);

        // 1. 可变报头长度 = 2字节长度 + topic长度 + 2字节PacketIdentifier(qos=1或qos=2时存在)
        int variableHeaderBufferSize = 2 + topicNameBytes.length +
                (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
        int payloadBufferSize = payload.readableBytes();
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);

        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);
        buf.writeShort(topicNameBytes.length);
        buf.writeBytes(topicNameBytes);
        if (mqttFixedHeader.qosLevel().value() > 0) {
            buf.writeShort(variableHeader.packetId());
        }
        buf.writeBytes(payload);

        return buf;
    }

1.5 encodeSubscribeMessage - 订阅主题

有效载荷
在这里插入图片描述
encodeSubscribeMessage

    private static ByteBuf encodeSubscribeMessage(
            ByteBufAllocator byteBufAllocator,
            MqttSubscribeMessage message) {
        int variableHeaderBufferSize = 2;
        int payloadBufferSize = 0;

        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMessageIdVariableHeader variableHeader = message.variableHeader();
        MqttSubscribePayload payload = message.payload();

        // 1. 订阅消息的载荷中可以包含多个订阅主题
        for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
            String topicName = topic.topicName();
            byte[] topicNameBytes = encodeStringUtf8(topicName);
            // 2. 每个订阅主题的载荷 = 2字节长度 + topic过滤器的长度 + 1字节的Qos
            payloadBufferSize += 2 + topicNameBytes.length;
            payloadBufferSize += 1;
        }

        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);

        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);

        // 3. 可变头中包含2字节的packageId
        int messageId = variableHeader.messageId();
        buf.writeShort(messageId);

        // Payload
        for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
            String topicName = topic.topicName();
            byte[] topicNameBytes = encodeStringUtf8(topicName);
            buf.writeShort(topicNameBytes.length);
            buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
            buf.writeByte(topic.qualityOfService().value());
        }

        return buf;
    }

1.6 encodeUnsubscribeMessage - 取消订阅

有效载荷示例
在这里插入图片描述
encodeUnsubscribeMessage

    private static ByteBuf encodeUnsubscribeMessage(
            ByteBufAllocator byteBufAllocator,
            MqttUnsubscribeMessage message) {
        int variableHeaderBufferSize = 2;
        int payloadBufferSize = 0;

        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMessageIdVariableHeader variableHeader = message.variableHeader();
        MqttUnsubscribePayload payload = message.payload();
        
        // 1. 要取消的主题列表,每个主题过滤器 = 2字节长度 + 过滤器长度
        for (String topicName : payload.topics()) {
            byte[] topicNameBytes = encodeStringUtf8(topicName);
            payloadBufferSize += 2 + topicNameBytes.length;
        }

        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);

        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);

        // Variable Header
        int messageId = variableHeader.messageId();
        buf.writeShort(messageId);

        // Payload
        for (String topicName : payload.topics()) {
            byte[] topicNameBytes = encodeStringUtf8(topicName);
            buf.writeShort(topicNameBytes.length);
            buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
        }

        return buf;
    }

1.7 encodeSubAckMessage - 订阅应答

可变报头
可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符。
有效载荷
有效载荷包含一个返回码清单。每个返回码对应等待确认的 SUBSCRIBE 报文中的一个主题过滤器。它们指定了 SUBSCRIBE 请求的每个订阅被授予的最大 QoS 等级。
在这里插入图片描述
encodeSubAckMessage

    private static ByteBuf encodeSubAckMessage(
            ByteBufAllocator byteBufAllocator,
            MqttSubAckMessage message) {
        // 1. 可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符,所以固定长度为2
        int variableHeaderBufferSize = 2;
        // 2. SUBSCRIBE报文中的每个过滤器,subAck消息中都要给出对应主题的所能赋予的最大Qos等级
        int payloadBufferSize = message.payload().grantedQoSLevels().size();
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
        buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
        writeVariableLengthInt(buf, variablePartSize);
        buf.writeShort(message.variableHeader().messageId());
        for (int qos : message.payload().grantedQoSLevels()) {
            buf.writeByte(qos);
        }

        return buf;
    }

1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId

对于UNSUBACK、PUBACK、PUBREC、PUBREL、PUBCOMP消息,都是一些确认消息。这类消息中的可变报头需要携带对应消息的packetId,无载荷。

    private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
            ByteBufAllocator byteBufAllocator,
            MqttMessage message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
        int msgId = variableHeader.messageId();
        
        // 对于此方法,适用于报文结构为: 可变头只有个2字节packetId,并且无有效载荷的
        int variableHeaderBufferSize = 2; // variable part only has a message id
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variableHeaderBufferSize);
        buf.writeShort(msgId);

        return buf;
    }

1.9 encodeMessageWithOnlySingleByteFixedHeader

对于PINGREQ、PINGRESP、DISCONNECT消息,既无可变报头,也无有效载荷。

2. MqttDecoder–解码器

2.1 构造方法

对于解码器,netty提供了对外公共的构造方法,无参构造和有参构造。

    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;

    private final int maxBytesInMessage;

    public MqttDecoder() {
      this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }

采用无参构造,默认消息的最大字节为8092,有参构造则可以指定最大字节数。同时,将initialState设置为固定报头,作用是后面解码时从固定头开始,这里作者注释也有说明。

    /**
     * Creates a new instance with the specified initial state.
     */
    protected ReplayingDecoder(S initialState) {
        state = initialState;
    }

    /**
     * States of the decoder.
     * We start at READ_FIXED_HEADER, followed by
     * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
     */
    enum DecoderState {
        READ_FIXED_HEADER,
        READ_VARIABLE_HEADER,
        READ_PAYLOAD,
        BAD_MESSAGE,
    }

2.2 READ_FIXED_HEADER - 固定报头解码

解码器,我们重点看一下decode方法。这里通过state()方法判断是解析固定头、可变头、载荷、错误消息。new对象的时候,初始状态为固定头,从固定头开始,注意固定头和可变头case中没有break,所以解码顺序固定头 -> 可变头 -> 载荷。在这三个过程中有异常,则 -> BAD_MESSAGE
decodeFixedHeader

    private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {
        // 1. 取无符号第一字节,固定头第一字节,7-4位为消息类型,所以右移4位
        short b1 = buffer.readUnsignedByte();
        MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
        // 1.1 重发标志第3位,值为1,标志重发
        boolean dupFlag = (b1 & 0x08) == 0x08;
        // 1.2 Qos占1、2位,采用& 0x06获取到这两位值,然后右移一位即Qos的值
        int qosLevel = (b1 & 0x06) >> 1;
        // 1.3 会话保留标志站0位
        boolean retain = (b1 & 0x01) != 0;

        int remainingLength = 0;
        int multiplier = 1;
        short digit;
        int loops = 0;
        do {
            digit = buffer.readUnsignedByte();
            // 2.由于每个字节的最高位为进制位,所以&127获取除进制位之外的7位
            remainingLength += (digit & 127) * multiplier;
            multiplier *= 128;
            loops++;
        } while ((digit & 128) != 0 && loops < 4);
        // 协议规定,剩余长度字段最大为4个字节
        if (loops == 4 && (digit & 128) != 0) {
            throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
        }
        MqttFixedHeader decodedFixedHeader =
                new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
        return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
    }

validateFixedHeader
协议中规定,PUBREL、SUBSCRIBE、UNSUBSCRIBE中固定头中的Qos等级必须是1。
在这里插入图片描述

    static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {
        switch (mqttFixedHeader.messageType()) {
            case PUBREL:
            case SUBSCRIBE:
            case UNSUBSCRIBE:
                if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {
                    throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");
                }
            default:
                return mqttFixedHeader;
        }
    }

resetUnusedFields
对于协议中规定,固定报头中标志位保留的,一律置为0。

    static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
        switch (mqttFixedHeader.messageType()) {
            case CONNECT:
            case CONNACK:
            case PUBACK:
            case PUBREC:
            case PUBCOMP:
            case SUBACK:
            case UNSUBACK:
            case PINGREQ:
            case PINGRESP:
            case DISCONNECT:
                if (mqttFixedHeader.isDup() ||
                        mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||
                        mqttFixedHeader.isRetain()) {
                    return new MqttFixedHeader(
                            mqttFixedHeader.messageType(),
                            false,
                            MqttQoS.AT_MOST_ONCE,
                            false,
                            mqttFixedHeader.remainingLength());
                }
                return mqttFixedHeader;
            case PUBREL:
            case SUBSCRIBE:
            case UNSUBSCRIBE:
                if (mqttFixedHeader.isRetain()) {
                    return new MqttFixedHeader(
                            mqttFixedHeader.messageType(),
                            mqttFixedHeader.isDup(),
                            mqttFixedHeader.qosLevel(),
                            false,
                            mqttFixedHeader.remainingLength());
                }
                return mqttFixedHeader;
            default:
                return mqttFixedHeader;
        }
    }

2.3 READ_VARIABLE_HEADER - 可变报头解码

解码和编码是相对应的,这里不针对每种都进行说明。讲一下连接消息,其他的都比较简单。
decodeConnectionVariableHeader

    private static MqttDecoder.Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
        // 1. decodeString方法的作用,通过前两个长度字节(MLB,SLB)指定的字节长度来获取数据并装成string
        // Connect可变头最开始的两个长度字节用来表示协议名称的长度,所以这里通过此方法获取的就是协议名MQTT
        final MqttDecoder.Result<String> protoString = decodeString(buffer);
        // 2.注意这里的numberOfBytesConsumed字段,用于表示已经读取的字节个数,等解码完有效载荷后,
        // 会比较固定头里的剩余长度字段和此字段是否完全相等
        int numberOfBytesConsumed = protoString.numberOfBytesConsumed;

        final byte protocolLevel = buffer.readByte();
        numberOfBytesConsumed += 1;

        final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);

        // 3.取出连接标志字节,采用&进行解码
        final int b1 = buffer.readUnsignedByte();
        numberOfBytesConsumed += 1;

        final MqttDecoder.Result<Integer> keepAlive = decodeMsbLsb(buffer);
        numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;
        final boolean hasUserName = (b1 & 0x80) == 0x80;
        final boolean hasPassword = (b1 & 0x40) == 0x40;
        final boolean willRetain = (b1 & 0x20) == 0x20;
        final int willQos = (b1 & 0x18) >> 3;
        final boolean willFlag = (b1 & 0x04) == 0x04;
        final boolean cleanSession = (b1 & 0x02) == 0x02;
        if (mqttVersion == MqttVersion.MQTT_3_1_1) {
            final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
            if (!zeroReservedFlag) {
                throw new DecoderException("non-zero reserved flag");
            }
        }

        final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
                mqttVersion.protocolName(),
                mqttVersion.protocolLevel(),
                hasUserName,
                hasPassword,
                willRetain,
                willQos,
                willFlag,
                cleanSession,
                keepAlive.value);
        return new MqttDecoder.Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);
    }

decodeString
在这里插入图片描述
在这里插入图片描述

2.4 READ_PAYLOAD- 有效载荷解码

有效载荷也不针对每种都进行说明。讲一下连接消息的有效载荷解码。
decodeConnectionPayload

    private static MqttDecoder.Result<MqttConnectPayload> decodeConnectionPayload(
            ByteBuf buffer,
            MqttConnectVariableHeader mqttConnectVariableHeader) {
        // 1.解码clientId,对于connect消息,载荷里的clientId是必须有的。
        final MqttDecoder.Result<String> decodedClientId = decodeString(buffer);
        final String decodedClientIdValue = decodedClientId.value;
        final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
                (byte) mqttConnectVariableHeader.version());
        // 1.1 这里前面说过,如果是v3.1.1版本的mqtt,是允许clientId是空字符串或者大于23字节的,但是v3.1的不支持
        if (!isValidClientId(mqttVersion, decodedClientIdValue)) {
            throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
        }
        int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;

        // 2.对于遗嘱topic和遗嘱消息、用户名、密码这几部分需要根据可变头中的标志位是否为1来判断是否存在
        MqttDecoder.Result<String> decodedWillTopic = null;
        MqttDecoder.Result<byte[]> decodedWillMessage = null;
        if (mqttConnectVariableHeader.isWillFlag()) {
            decodedWillTopic = decodeString(buffer, 0, 32767);
            numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
            decodedWillMessage = decodeByteArray(buffer);
            numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
        }
        MqttDecoder.Result<String> decodedUserName = null;
        MqttDecoder.Result<byte[]> decodedPassword = null;
        if (mqttConnectVariableHeader.hasUserName()) {
            decodedUserName = decodeString(buffer);
            numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
        }
        if (mqttConnectVariableHeader.hasPassword()) {
            decodedPassword = decodeByteArray(buffer);
            numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
        }

        final MqttConnectPayload mqttConnectPayload =
                new MqttConnectPayload(
                        decodedClientId.value,
                        decodedWillTopic != null ? decodedWillTopic.value : null,
                        decodedWillMessage != null ? decodedWillMessage.value : null,
                        decodedUserName != null ? decodedUserName.value : null,
                        decodedPassword != null ? decodedPassword.value : null);
        return new MqttDecoder.Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);
    }
文章来源:https://blog.csdn.net/qq_45268591/article/details/135061825
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。