Netty通信中的粘包半包问题(四)

发布时间:2024年01月18日

前面我们介绍了特殊分隔符、以及固定长度,今天来介绍一下换行符分割,这种换行符是兼容了Windows和Linux的转义的,前提你的报文中没有换行符或者对换行符做特殊处理

System.getProperty("line.separator")

1.Server

package splicing.linebase;

import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;

import java.net.InetSocketAddress;

public class LineBaseEchoServer {

    public static void main(String[] args) throws InterruptedException {
        LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
        System.out.println("服务器即将启动");
        lineBaseEchoServer.start();
    }
    
    public void start() throws InterruptedException {
        LineBaseServerHandler serverHandler = new LineBaseServerHandler();
        
        EventLoopGroup group = new NioEventLoopGroup();// 线程组
        try {
            // 服务端启动必须
            ServerBootstrap b = new ServerBootstrap();
            b.group(group) // 将线程组传入
                    .channel(NioServerSocketChannel.class) // 指定使用NIO进行网络传输
                    .localAddress(new InetSocketAddress(Constant.DEFAULT_PORT))// 指定服务器监听端口
                    // 服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel
                    // 所以下面这段代码的作用就是为这个子channel增加handler
                    .childHandler(new ChannelInitializerImp());


            // 异步绑定到服务器,sync()会阻塞直到完成
            ChannelFuture f = b.bind().sync();
            // 我们可以为这个ChannelFuture增加监听器,当它有回调结果的时候通知channelFutureListener
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("服务器端绑定已经完成");
                }
            });

            System.out.println("服务器启动完成,等待客户端的连接和数据......");
            // 阻塞直到服务器的channel关闭
            f.channel().closeFuture().sync();

        }finally {
            group.shutdownGracefully().sync();
        }
    }
    
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseServerHandler());
        }
    }
}

package splicing.linebase;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class LineBaseServerHandler extends ChannelInboundHandlerAdapter {
    
    
    private AtomicInteger counter = new AtomicInteger(0);

    
    // 服务端接收到客户端连接时
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端:[" + ctx.channel().remoteAddress() + "] 已连接......");
//        super.channelActive(ctx);
    }

    
    // 服务端读取到网络数据后的处理
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept [" + request + "] and the counter is :" + counter.incrementAndGet());
        
        String resp = "Hello ," + request + ". Welcome to Netty World!" 
                + System.getProperty("line.separator"); 
        
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
//        super.channelRead(ctx, msg);
    }

    // 服务端读取完成网络数据后的处理
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//        super.channelReadComplete(ctx);
        System.out.println("channelReadComplete...............");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
//        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "即将关闭....");
    }
}

2.Client

package splicing.linebase;

import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;

import java.net.InetSocketAddress;

public class LineBaseEchoClient {
    
    private final String host;
    
    public LineBaseEchoClient(String host) {
        this.host = host;
    }
    
    public void start() throws InterruptedException {
        // 线程组
        EventLoopGroup group = new NioEventLoopGroup(); 
        try {
            Bootstrap b = new Bootstrap();
            b.group(group) // 将线程组传入
                    .channel(NioSocketChannel.class) // 指定使用NIO进行网络传输
                    .remoteAddress(new InetSocketAddress(host, Constant.DEFAULT_PORT)) // 配置要连接服务器的
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("operationComplete 已经连接到服务器了....");
                }
            });
            System.out.println("已连接到服务器....");
            f.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully().sync();
        }
    }
    
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new LineBaseEchoClient(Constant.DEFAULT_SERVER_IP).start();
    }
}

package splicing.linebase;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
    
    private AtomicInteger counter = new AtomicInteger(0);
    // 客户端读取到网络数据后的处理
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client Accept [" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is :" + counter.incrementAndGet());
    }

    // 客户端被通知channel活跃后,做事
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "Mark,zhuge,zhouyu,fox,loulan" + System.getProperty("line.separator");
        for (int i = 0; i < 10; i++) {
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    // 发生异常后的处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.结果展示

在这里插入图片描述

在这里插入图片描述
其实我们会发现,到此为止这三种多多少少都有一些限制,如果要想完全避免粘包和半包,在这三种方式里面发现几乎无法实现,因为我们准确计算出客户端的报文结构是怎样的,还有一种消息头+消息体的分包机制,可以完美地实现不粘包、不半包。下一期我们再来分享

文章来源:https://blog.csdn.net/Cover_sky/article/details/135680985
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。