Netty通信中的粘包半包问题(三)

发布时间:2024年01月17日

之前我们介绍了用特殊分隔符来分割每个报文,但是如果传输的数据中恰好有个特殊分隔符,它将会被拆分成多个,于是,为了进一步避免这个问题,还有一种解决方案是在两端的channelPipeline中用一个固定长度来区分,这样也可以解决粘包半包的问题

1.Server

package splicing.fixed;

import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

import java.net.InetSocketAddress;

public class FixedLengthEchoServer {
    
    
    public static final String RESPONSE = "Welcome to Netty";

    public static void main(String[] args) throws InterruptedException {
        FixedLengthEchoServer fixedLengthEchoServer = new FixedLengthEchoServer();
        System.out.println("服务器即将启动");
        fixedLengthEchoServer.start();
    }
    
    public void start() throws InterruptedException {
        FixedLengthServerHandler serverHandler = new FixedLengthServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();// 线程组
        try {
            // 服务端启动必须
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)// 将线程组传入
                    .channel(NioServerSocketChannel.class) // 指定使用NIO进行网络传输
                    .localAddress(new InetSocketAddress(Constant.DEFAULT_PORT)) // 指定服务器监听端口
                    // 服务端每接收到一个连接请求,就会新启一个Socket通信,也就是channel
                    // 所以下面这段代码的作用就是为这个子channel增加handler
                    .childHandler(new ChannelInitializerImp());

            // 异步绑定到服务器,sync()会阻塞直到完成
            ChannelFuture f = b.bind().sync();
            System.out.println("服务器启动完成,等待客户端的连接和数据.....");
            // 阻塞直到服务器的channel关闭
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
    
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoClient.REQUEST.length()));
            ch.pipeline().addLast(new FixedLengthServerHandler());
        }
    }
}

package splicing.fixed;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter {
    
    private AtomicInteger counter = new AtomicInteger(0);


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept[" + request +
                " ] and the counter is :" + counter.incrementAndGet());
        ctx.writeAndFlush(Unpooled.copiedBuffer(FixedLengthEchoServer.RESPONSE.getBytes()));
//        super.channelRead(ctx, msg);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
//        super.exceptionCaught(ctx, cause);
    }
}

2.Client

package splicing.fixed;

import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

import java.net.InetSocketAddress;

public class FixedLengthEchoClient {
    
    public static final String REQUEST = "Mark,zhuge,zhouyu,fox,loulan";
    
    private final String host;
    
    public FixedLengthEchoClient(String host) {
        this.host = host;
    }
    
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 客户端启动必须
            Bootstrap b = new Bootstrap();
            b.group(group)// 将线程组传入
                    .channel(NioSocketChannel.class) // 指定使用NIO进行网络传输
                    .remoteAddress(new InetSocketAddress(host, Constant.DEFAULT_PORT))
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("已连接到服务器....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
    
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoServer.RESPONSE.length()));
            ch.pipeline().addLast(new FixedLengthClientHandler());
        }
    }


    public static void main(String[] args) throws InterruptedException {
        new FixedLengthEchoClient(Constant.DEFAULT_SERVER_IP).start();
    }
}

package splicing.fixed;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class FixedLengthClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
    private AtomicInteger counter = new AtomicInteger(0);
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept [" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is :" + counter.incrementAndGet());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        for (int i = 0; i < 100; i++) {
            msg = Unpooled.buffer(FixedLengthEchoClient.REQUEST.length());
            msg.writeBytes(FixedLengthEchoClient.REQUEST.getBytes());
            ctx.writeAndFlush(msg);
        }
//        super.channelActive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
//        super.exceptionCaught(ctx, cause);
    }
}

3.结果展示

在这里插入图片描述
在这里插入图片描述

这种方案的限制在于要提前直到客户端和服务端要发送的报文,并且这个长度是固定的,不能变,如果变了,还是会产生粘包半包的问题,那么有的人可能会想,有没有更好的解决方案,别着急,下一期我们接着分析

文章来源:https://blog.csdn.net/Cover_sky/article/details/135655742
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。