io即input/output,输入和输出
输入流、输出流(按数据流向)
字节流(InputStream/OutputStream(细分File/Buffered))、字符流(Reader/Writer(细分File/Buffered/put))(按数据处理方式)
字节缓存流:避免频繁的io操作,缓冲区的大小默认为 8192 字节
bio为同步阻塞模式
nio为同步非阻塞模式,一个线程管理多个输入输出通道,涉及轮询、多路复用(一个线程不断轮询多个socket的状态,当socket有读写事件时调用io事件)
核心:channel(双向)、buffer、selector(监听通道事件)
服务器端(pool)
属性:线程池、选择器selector
- 创建一个PoolServer,
- 初始化,并指定端口
开通渠道ServerSocketChannel
设置非阻塞
绑定端口
开通选择器
将渠道注册到选择器- 监听事件
轮询访问选择器
处理对应的通道事件
如果事件key状态为可接收:注册通道到选择器,设置状态为可读
如果事件key状态为可读:将key对应通道设置为可读,线程池执行key对应的继承Thread的handler方法,重写run方法(通过key拿到通道;分配缓冲区,分配输出流;将通道读取的缓冲区内容写入输出流;将服务端回执写入通道;将通道设置可读;唤醒选择器)
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
public class Client {
public static void main(String[] args) throws IOException {
Socket s = new Socket("127.0.0.1", 8888);
s.getOutputStream().write("HelloServer".getBytes());
s.getOutputStream().flush();
System.out.println("write over, waiting for msg back...");
byte[] bytes = new byte[1024];
int len = s.getInputStream().read(bytes);
System.out.println(new String(bytes, 0, len));
s.close();
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Server {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8888));
ssc.configureBlocking(false);
System.out.println("server started, listening on :" + ssc.getLocalAddress());
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while(it.hasNext()) {
SelectionKey key = it.next();
it.remove();
handle(key);
}
}
}
private static void handle(SelectionKey key) {
if(key.isAcceptable()) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ );
} catch (IOException e) {
e.printStackTrace();
} finally {
}
} else if (key.isReadable()) { //flip
SocketChannel sc = null;
try {
sc = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(512);
buffer.clear();
int len = sc.read(buffer);
if(len != -1) {
System.out.println(new String(buffer.array(), 0, len));
}
ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes());
sc.write(bufferToWrite);
} catch (IOException e) {
e.printStackTrace();
} finally {
if(sc != null) {
try {
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class PoolServer {
ExecutorService pool = Executors.newFixedThreadPool(50);
private Selector selector;
/**
*
* @throws IOException
*/
public static void main(String[] args) throws IOException {
PoolServer server = new PoolServer();
server.initServer(8000);
server.listen();
}
/**
*
* @param port
* @throws IOException
*/
public void initServer(int port) throws IOException {
//
ServerSocketChannel serverChannel = ServerSocketChannel.open();
//
serverChannel.configureBlocking(false);
//
serverChannel.socket().bind(new InetSocketAddress(port));
//
this.selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功!");
}
/**
*
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void listen() throws IOException {
// 轮询访问selector
while (true) {
//
selector.select();
//
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
//
ite.remove();
//
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
//
SocketChannel channel = server.accept();
//
channel.configureBlocking(false);
//
channel.register(this.selector, SelectionKey.OP_READ);
//
} else if (key.isReadable()) {
//
key.interestOps(key.interestOps()&(~SelectionKey.OP_READ));
//
pool.execute(new ThreadHandlerChannel(key));
}
}
}
}
}
/**
*
* @param
* @throws IOException
*/
class ThreadHandlerChannel extends Thread{
private SelectionKey key;
ThreadHandlerChannel(SelectionKey key){
this.key=key;
}
@Override
public void run() {
//
SocketChannel channel = (SocketChannel) key.channel();
//
ByteBuffer buffer = ByteBuffer.allocate(1024);
//
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
int size = 0;
while ((size = channel.read(buffer)) > 0) {
buffer.flip();
baos.write(buffer.array(),0,size);
buffer.clear();
}
baos.close();
//
byte[] content=baos.toByteArray();
ByteBuffer writeBuf = ByteBuffer.allocate(content.length);
writeBuf.put(content);
writeBuf.flip();
channel.write(writeBuf);//
if(size==-1){
channel.close();
}else{
//
key.interestOps(key.interestOps()|SelectionKey.OP_READ);
key.selector().wakeup();
}
}catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
netty是JBoss提供的开源网络编程框架,提供异步的、基于事件驱动的网络应用程序框架和工具。
架构
三层网络架构,Reactor 通信调度层 -> 职责链 PipeLine -> 业务逻辑处理层
创建服务端并指定端口,启动服务端
创建boss和worker事件组
绑定事件组到通道,并指定子处理器,初始化通道,将处理器(继承ChannelHandlerContext,重写读方法(获取信息,将信息写入上下文,关闭上下文)以及异常捕获方法(关闭上下文))加到管道的最后
绑定端口获取future
创建客户端,启动客户端
创建workers事件组
绑定事件组到通道,并指定处理器,初始化通道,将定义的客户端处理器(继承ChannelInboundHandlerAdapter,重写通道激活方法(将信息写入上下文,获取future,添加监听器,当服务端收到信息时输出提示信息)以及读方法(读取信息,最后释放信息))添加到管道的后面
绑定端口,获取future
import com.mashibing.io.aio.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
public class HelloNetty {
public static void main(String[] args) {
new NettyServer(8888).serverStart();
}
}
class NettyServer {
int port = 8888;
public NettyServer(int port) {
this.port = port;
}
public void serverStart() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Handler());
}
});
try {
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
class Handler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//super.channelRead(ctx, msg);
System.out.println("server: channel read");
// ByteBuf是netty的一个字节容器
ByteBuf buf = (ByteBuf)msg;
System.out.println(buf.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(msg);
ctx.close();
//buf.release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;
public class Client {
public static void main(String[] args) {
new Client().clientStart();
}
private void clientStart() {
EventLoopGroup workers = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(workers)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("channel initialized!");
ch.pipeline().addLast(new ClientHandler());
}
});
try {
System.out.println("start to connect...");
ChannelFuture f = b.connect("127.0.0.1", 8888).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workers.shutdownGracefully();
}
}
}
class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel is activated.");
final ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer("HelloNetty".getBytes()));
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("msg send!");
//ctx.close();
}
});
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf)msg;
System.out.println(buf.toString());
} finally {
ReferenceCountUtil.release(msg);
}
}
}
发送的数据出现断开接收或者多个包数据发生粘连
缓冲区剩余空间
大小,将会发生拆包最大报文长度
,TCP在传输前将进行拆包多次写入缓冲区
的数据一次发送出去,将会发生粘包没有及时读取
接收缓冲区中的数据,将发生粘包包首部
,首部中应该至少包含数据包的长度
,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度 了固定长度
,这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开设置边界
,如添加特殊符号,这样接收端通过这个边界就可以将不同的数据包拆分开netty提供了封装的拆包器:
传统拷贝:需要4次数据拷贝和4次上下文切换
磁盘->内核缓冲区的read buffer->用户缓冲区->内核的socket buffer->网卡接口(硬件)的缓冲区
零拷贝:省略中间的2步,不需要CPU的参与
磁盘->内核缓冲区的read buffer->网卡接口(硬件)的缓冲区