之前我们介绍了用特殊分隔符来分割每个报文,但是如果传输的数据中恰好包含这个特殊分隔符,报文就会被错误地拆分成多个。为了避免这个问题,还有一种解决方案是在两端的 ChannelPipeline
中加入定长解码器,用一个固定长度来划分报文,这样也可以解决粘包半包的问题
package splicing.fixed;
import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import java.net.InetSocketAddress;
/**
 * Server side of the fixed-length framing demo.
 *
 * <p>Every accepted connection gets a {@link FixedLengthFrameDecoder} sized to the
 * client's request, followed by a {@link FixedLengthServerHandler} that answers each
 * frame with {@link #RESPONSE}.
 */
public class FixedLengthEchoServer {
    /** Fixed reply written back for every request frame. */
    public static final String RESPONSE = "Welcome to Netty";

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        FixedLengthEchoServer fixedLengthEchoServer = new FixedLengthEchoServer();
        System.out.println("服务器即将启动");
        fixedLengthEchoServer.start();
    }

    /**
     * Binds to {@code Constant.DEFAULT_PORT}, then waits for the server channel to close.
     * The event loop group is always shut down on exit.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class) // use NIO transport
                    .localAddress(new InetSocketAddress(Constant.DEFAULT_PORT)) // listening port
                    // Each accepted connection becomes a child channel; this initializer
                    // installs the handlers on that child channel.
                    .childHandler(new ChannelInitializerImp());
            // bind() is asynchronous; sync() blocks until it completes.
            ChannelFuture f = b.bind().sync();
            System.out.println("服务器启动完成,等待客户端的连接和数据.....");
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    /** Installs the frame decoder and a fresh business handler on every new channel. */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // The decoder's frame length is measured in bytes, so size it from the encoded
            // request rather than from the number of chars in the string.
            int requestFrameLength = FixedLengthEchoClient.REQUEST
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
            ch.pipeline().addLast(new FixedLengthFrameDecoder(requestFrameLength));
            // A new handler per channel: the handler keeps a per-instance counter.
            ch.pipeline().addLast(new FixedLengthServerHandler());
        }
    }
}
package splicing.fixed;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Handles one decoded request frame at a time: prints it together with a running
 * count and writes {@link FixedLengthEchoServer#RESPONSE} back to the peer.
 */
public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter {
    /** Number of frames this handler instance has received. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Logs the received frame and replies with the fixed response.
     *
     * @param ctx the handler context used to write the reply
     * @param msg the inbound message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            String request = in.toString(CharsetUtil.UTF_8);
            System.out.println("Server Accept[" + request
                    + " ] and the counter is :" + counter.incrementAndGet());
            // Encode with an explicit charset so the reply's byte length does not depend
            // on the platform default encoding.
            ctx.writeAndFlush(Unpooled.copiedBuffer(FixedLengthEchoServer.RESPONSE, CharsetUtil.UTF_8));
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages, and the
            // message is not passed further down the pipeline, so release it here.
            in.release();
        }
    }

    /** Prints the stack trace and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
package splicing.fixed;
import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import java.net.InetSocketAddress;
/**
 * Client side of the fixed-length framing demo.
 *
 * <p>Connects to {@code host} on {@code Constant.DEFAULT_PORT} and installs a
 * {@link FixedLengthFrameDecoder} sized to the server's response, followed by a
 * {@link FixedLengthClientHandler}.
 */
public class FixedLengthEchoClient {
    /** Request payload sent by the client handler. */
    public static final String REQUEST = "Mark,zhuge,zhouyu,fox,loulan";

    private final String host;

    /**
     * @param host server host name or IP address to connect to
     */
    public FixedLengthEchoClient(String host) {
        this.host = host;
    }

    /**
     * Connects to the server and blocks until the channel is closed.
     * The event loop group is always shut down on exit.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class) // use NIO transport
                    .remoteAddress(new InetSocketAddress(host, Constant.DEFAULT_PORT))
                    .handler(new ChannelInitializerImp());
            // connect() is asynchronous; sync() blocks until it completes.
            ChannelFuture f = b.connect().sync();
            System.out.println("已连接到服务器....");
            // Block until the channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    /** Installs the frame decoder and the business handler on the client channel. */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // The decoder's frame length is measured in bytes, so size it from the encoded
            // response rather than from the number of chars in the string.
            int responseFrameLength = FixedLengthEchoServer.RESPONSE
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
            ch.pipeline().addLast(new FixedLengthFrameDecoder(responseFrameLength));
            ch.pipeline().addLast(new FixedLengthClientHandler());
        }
    }

    /**
     * Starts a client against {@code Constant.DEFAULT_SERVER_IP}.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        new FixedLengthEchoClient(Constant.DEFAULT_SERVER_IP).start();
    }
}
package splicing.fixed;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Client-side handler: sends the request 100 times once the channel becomes active
 * and prints every response frame together with a running count.
 */
public class FixedLengthClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /** Number of response frames this handler instance has received. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Prints one received frame and the updated counter.
     *
     * @param ctx the handler context (unused)
     * @param msg the received frame, decoded as UTF-8 for printing
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept [" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is :" + counter.incrementAndGet());
    }

    /**
     * Writes {@link FixedLengthEchoClient#REQUEST} to the channel 100 times, flushing
     * after each write.
     *
     * @param ctx the handler context used to write the requests
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Encode once with an explicit charset: the payload is loop-invariant, and its
        // byte length must not depend on the platform default encoding.
        byte[] payload = FixedLengthEchoClient.REQUEST.getBytes(CharsetUtil.UTF_8);
        for (int i = 0; i < 100; i++) {
            // A separate buffer per write; each one is handed off to the pipeline.
            ByteBuf msg = Unpooled.buffer(payload.length);
            msg.writeBytes(payload);
            ctx.writeAndFlush(msg);
        }
    }

    /** Prints the stack trace and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
这种方案的限制在于要提前知道客户端和服务端要发送的报文长度,并且这个长度是固定的,不能变;如果变了,还是会产生粘包半包的问题。那么有的人可能会想,有没有更好的解决方案?别着急,下一期我们接着分析