前面我们介绍了基于特殊分隔符以及固定长度的拆包方式,今天来介绍基于换行符的拆包。LineBasedFrameDecoder 同时兼容 Windows(\r\n)和 Linux(\n)两种换行符,前提是你的报文内容中没有换行符,或者已经对其中的换行符做了特殊处理。发送方可以通过下面的方式获取当前系统的换行符:
System.getProperty("line.separator")
package splicing.linebase;
import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import java.net.InetSocketAddress;
/**
 * Echo server that frames inbound data by line breaks using
 * {@link LineBasedFrameDecoder}.
 *
 * <p>The server binds to {@code Constant.DEFAULT_PORT} and installs, for every
 * accepted connection, a line-based frame decoder followed by a fresh
 * {@code LineBaseServerHandler}.
 */
public class LineBaseEchoServer {

    /**
     * Program entry point: creates the server and starts it.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException if the thread is interrupted while waiting
     *         on bind, channel close or event-loop shutdown
     */
    public static void main(String[] args) throws InterruptedException {
        LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
        System.out.println("服务器即将启动");
        lineBaseEchoServer.start();
    }

    /**
     * Binds the server socket and blocks until the server channel is closed,
     * then shuts the event loop group down.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void start() throws InterruptedException {
        // A single event loop group is used for both accepting and serving connections.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    // Use NIO as the transport.
                    .channel(NioServerSocketChannel.class)
                    // Port the server listens on.
                    .localAddress(new InetSocketAddress(Constant.DEFAULT_PORT))
                    // Each accepted connection gets its own child channel;
                    // this initializer installs the handlers on that child channel.
                    .childHandler(new ChannelInitializerImp());

            // bind() is asynchronous; sync() blocks until the bind has finished.
            ChannelFuture f = b.bind().sync();

            // Listener notified with the outcome of the bind operation.
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("服务器端绑定已经完成");
                }
            });
            System.out.println("服务器启动完成,等待客户端的连接和数据......");

            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    /**
     * Configures the pipeline of every accepted child channel: a line-based
     * frame decoder constructed with a limit of 1024, followed by a
     * per-connection business handler.
     */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            // A new handler per channel, so its counter is per-connection state.
            ch.pipeline().addLast(new LineBaseServerHandler());
        }
    }
}
package splicing.linebase;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Server-side business handler for the line-based echo example.
 *
 * <p>Each inbound message is expected to be a {@link ByteBuf} holding one
 * decoded frame. The handler logs it, counts it, and replies with a greeting
 * terminated by the platform line separator.
 */
public class LineBaseServerHandler extends ChannelInboundHandlerAdapter {

    /** Number of frames received by this handler instance. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Logs the remote address when a client connection becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端:[" + ctx.channel().remoteAddress() + "] 已连接......");
    }

    /**
     * Handles one inbound frame: logs it and writes back a greeting line.
     *
     * @param ctx the handler context used to write the reply
     * @param msg the inbound message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            String request = in.toString(CharsetUtil.UTF_8);
            System.out.println("Server Accept [" + request + "] and the counter is :" + counter.incrementAndGet());
            String resp = "Hello ," + request + ". Welcome to Netty World!"
                    + System.getProperty("line.separator");
            // Encode with the same charset used for decoding instead of the platform default.
            ctx.writeAndFlush(Unpooled.copiedBuffer(resp, CharsetUtil.UTF_8));
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages, and the
            // message is not passed further down the pipeline, so release it here.
            in.release();
        }
    }

    /** Logs a marker once the current batch of reads has been processed. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete...............");
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /** Logs the remote address when the connection becomes inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "即将关闭....");
    }
}
package splicing.linebase;
import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import java.net.InetSocketAddress;
/**
 * Echo client for the line-based framing example.
 *
 * <p>Connects to the given host on {@code Constant.DEFAULT_PORT} and installs a
 * {@link LineBasedFrameDecoder} followed by a {@code LineBaseClientHandler}
 * on the channel.
 */
public class LineBaseEchoClient {

    /** Host name or address of the server to connect to. */
    private final String host;

    /**
     * Creates a client targeting the given server host.
     *
     * @param host host name or address of the server
     */
    public LineBaseEchoClient(String host) {
        this.host = host;
    }

    /**
     * Connects to the server and blocks until the channel is closed, then
     * shuts the event loop group down.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void start() throws InterruptedException {
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workers);
            // Use NIO as the transport.
            bootstrap.channel(NioSocketChannel.class);
            // Address of the server to connect to.
            bootstrap.remoteAddress(new InetSocketAddress(host, Constant.DEFAULT_PORT));
            bootstrap.handler(new ChannelInitializerImp());

            // connect() is asynchronous; sync() blocks until it has finished.
            ChannelFuture connected = bootstrap.connect().sync();
            ChannelFutureListener onConnected = new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("operationComplete 已经连接到服务器了....");
                }
            };
            connected.addListener(onConnected);
            System.out.println("已连接到服务器....");

            // Block until the client channel is closed.
            Channel channel = connected.channel();
            channel.closeFuture().sync();
        } finally {
            workers.shutdownGracefully().sync();
        }
    }

    /**
     * Configures the client pipeline: a line-based frame decoder constructed
     * with a limit of 1024, followed by the client business handler.
     */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new LineBasedFrameDecoder(1024));
            pipeline.addLast(new LineBaseClientHandler());
        }
    }

    /**
     * Program entry point: connects to {@code Constant.DEFAULT_SERVER_IP}.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        LineBaseEchoClient client = new LineBaseEchoClient(Constant.DEFAULT_SERVER_IP);
        client.start();
    }
}
package splicing.linebase;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Client-side business handler for the line-based echo example.
 *
 * <p>When the channel becomes active it sends ten line-terminated requests;
 * every reply frame received is logged and counted.
 */
public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Number of reply frames received by this handler instance. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Logs one reply frame, decoded as UTF-8, together with the running count.
     *
     * @param ctx the handler context
     * @param msg the reply frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client Accept [" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is :" + counter.incrementAndGet());
    }

    /**
     * Sends the same line-terminated request ten times once the channel is active.
     *
     * @param ctx the handler context used to write the requests
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String request = "Mark,zhuge,zhouyu,fox,loulan" + System.getProperty("line.separator");
        // Encode once, with the same charset the handlers use for decoding,
        // rather than re-encoding with the platform default on every iteration.
        byte[] payload = request.getBytes(CharsetUtil.UTF_8);
        for (int i = 0; i < 10; i++) {
            // Each write gets its own buffer, sized by the encoded byte count.
            ByteBuf msg = Unpooled.buffer(payload.length);
            msg.writeBytes(payload);
            ctx.writeAndFlush(msg);
        }
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
其实我们会发现,到此为止这三种方式多多少少都有一些限制,如果想要完全避免粘包和半包,仅靠这三种方式几乎无法实现,因为我们无法准确计算出客户端的报文结构是怎样的。
还有一种消息头+消息体的分包机制,可以完美地实现不粘包、不半包,下一期我们再来分享。