IO 有阻塞和非阻塞两种模式,在阻塞IO下,我们需要耗费一个线程去阻塞在read操作下,去等待有足够多的数据可读并返回。在非阻塞IO下,不停对所有fd集合进行轮询,筛选出所有可读fd进行处理。
阻塞IO浪费线程(会占用内存和上下文切换开销),非阻塞IO会浪费CPU做大量无效操作。而基于IO多路复用系统调用实现的poll的意义在于将可读/可写状态通知和实际文件操作分开,并支持多个文件描述符通过一个系统调用监听以提升性能。
网络库的核心功能就是去同时监听大量的文件描述符的状态变化(通过操作系统调用),并对于不同状态变更,高效,安全地进行对应的文件操作。
对于一个高效的网络库而言,它的设计需要考虑以下几个场景:
这类场景下,对 listener fd 的压?很?,监听 listener fd 的系统调?会被频繁唤醒。??个 fd 只能被?个线程处理,这样的话创建连接的压?只能由单个 CPU 承担?法充分利?多核。Linux 后?增加SO_REUSEPORT 功能,可以对同?个 bind ip+port 创建多个 listener fd,内核提供负载均衡分发,这样来实现多核处理连接创建密集型场景。需要注意的是,即便是那些?连接场景下,如果遇到?些特殊业务场景(例如准点秒杀)也会出现瞬间创建?量连接的情况。
同时由于网络库不仅要管理监听文件事件,还需要管理用户业务逻辑层handler的执行 ,因此一个设计优秀的网络库,还应当具备以下指标:
QPS 要?意味着要让单请求开销低(即平均 latency 低),?先要保证充分利?CPU ,其次是要让 CPU 尽可能少执?内存拷?等和我们?要?作?关的代码。简??之,CPU处于满负荷?作,且做有效的?作的状态。?效?作是指那些和业务?关的事情,例如GC,线程&& 协程上下?切换开销,锁竞争等。在 Golang 中,G 依赖 P 运?,? P ??有调度逻辑,所以需要尽可能充分利? P,不让 P 空转
P99 ? Avg ?的根因是在运?中间遇到?些原因导致 CPU 腾出去进?了其他的?作,或是整个?作循环被暂停了(如 GC stop the world,或是 Goroutine 陷? syscall导致的暂时卡顿,?或是锁竞争)。
在正式开始讲解NetPoll源码前,我们先来快速复习一下多路复用API实现,本文基于Linux系统进行展开,所有此处多路复用器实现基于epoll展开:
typedef union epoll_data {
int fd;
//...
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
op(ADD/DEL/...)
,并声明关?什么events(EPOLLIN/EPOLLOUT/...)
。Epoll 在使?上有两种模式:边缘触发(ET)和?平触发(LT)
golang 原生网络库基于epoll et模式开发,基本架构如下图所示:
golang原生网络库的特点就是:
type Conn interface {
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
Read(b []byte) (n int, err error)
// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
Write(b []byte) (n int, err error)
}
由于go net采用ET模式,所以只会在数据就绪时通知一次,用户在自己的线程中调用read api不断读取数据,直到返回的n等于0,说明数据全部读取完毕了。
golang 原生网络库的优势如下:
但是有利必有弊,golang原生网络库的设计会导致如下问题:
conn.Read(b []byte)
后,完成?次内核缓冲区到??缓冲区的复制。拿到 []byte
后再传递给上层进?协议解析(反序列化)时,往往还需要再进??次拷?根本性原因还是协议反序列化时拿到的内存和内核缓冲区复制到??缓冲区的内存不是同?块导致的
Netpoll 主要由两?部分构成:
这里官方提供了一幅图画的很好:
整个流程要分为三部分来看:
netpoll 初始化:
server 端:
client 端:
netpoll 实现思路和 golang 原生网络库的区别如下:
ET模式在高并发下调度压力比较大,因为 EventLoop 本?只是监听事件,真正的读写操作都在????的 Goroutine 函数中执?,不由?络库控制;因此每次 EventLoop监听到事件发生后,都需要唤醒对应的线程去读写数据,这里存在上下文切换开销。
而 LT 单线程轮询对 cache/计算类业务更友好,因为 Cache 的特点是业务逻辑执?的?常快,所以在 readv 完了后可以?刻执? handler 同时执?write,整个过程都不需要进?线程调度。对于计算类任务??,越少协程切换能够让 CPU 尽可能少的做?效?作。
此处只列举核心的几个对象:
type server struct {
operator FDOperator // ?来根据不同事件进?不同操作
ln Listener.
opts *options // 配置相关回调接口
onQuit func(err error) // 退出回调
connections sync.Map // 记录当前server上accept得到的活跃客户端连接: key=fd, value=connection
}
// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
// epoll 监听的 fd
FD int
// 监听到读写事件后的回调函数
OnRead func(p Poll) error // accept 事件回调
OnWrite func(p Poll) error // 客户端 socket 写回调
OnHup func(p Poll) error
// linkbuffer 与 socket 缓冲区之间的读写API
Inputs func(vs [][]byte) (rs [][]byte)
InputAck func(n int) (err error)
Outputs func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
OutputAck func(n int) (err error)
// epoll 对象
poll Poll
...
}
type Poll interface {
Wait() error
Close() error
Trigger() error
Control(operator *FDOperator, event PollEvent) error
Alloc() (operator *FDOperator)
Free(operator *FDOperator)
}
type defaultPoll struct {
pollArgs
fd int // 监听的fd
wop *FDOperator // eventfd(轻量级进程通信机制), wake epoll_wait -- 用于唤醒wait上阻塞的线程
...
Handler func(events []epollevent) (closed bool) // 发生感兴趣事件时,回调该接口处理这些事件
}
type pollArgs struct {
...
events []epollevent // 发送/接收感兴趣事件
barriers []barrier。 // 用于实现分散读/集中写的向量缓冲区
}
type epollevent struct {
events uint32 // 事件位图
_ int32
data [8]byte // 注册感兴趣事件时,可以携带用户数据的指针
}
netpoll使用pollmanager维护着一组epoll对象池,以此来实现对象复用,每次有客户端新连接被Accept时,都会从epoll池中按照对应的负载均衡策略,pick出一个空闲的epoll对象来监听客户端连接上后续的读写事件。
netpoll多路复用池初始化的流程图如下所示:
具体源码如下:
// poll_manager.go
var pollmanager *manager // 多路复用器池子管理器
func init() {
var loops = runtime.GOMAXPROCS(0)/20 + 1
pollmanager = &manager{}
// 设置负载均衡器,默认采用轮询策略从epoll池中挑选空闲epoll
pollmanager.SetLoadBalance(RoundRobin)
// 设置epoll池的大小,同时会初始化池中的epoll对象
pollmanager.SetNumLoops(loops)
...
}
golang 程序启动时,会去自动调用每个go文件的init方法,所以pollmanager会在程序启动时被初始化。
真正初始化epoll池中epoll对象的逻辑是在设置eventLoopNum时完成的:
// poll_manager.go
func (m *manager) SetNumLoops(numLoops int) error {
..
// netpoll支持运行时动态调整epoll池大小,所以此处存在该分支
// 如果我们打算缩小epoll池大小,则进入下面这个分支
if numLoops < m.NumLoops {
// 创建一个新的epoll池
var polls = make([]Poll, numLoops)
for idx := 0; idx < m.NumLoops; idx++ {
// 对于无需缩减的部分,直接重新指向即可
if idx < numLoops {
polls[idx] = m.polls[idx]
} else {
// 对于需要缩减的部分,直接Close关闭该多路复用器
if err := m.polls[idx].Close(); err != nil {
logger.Printf("NETPOLL: poller close failed: %v\n", err)
}
}
}
// 更新多路复用池管理器的相关状态
m.NumLoops = numLoops
m.polls = polls
m.balance.Rebalance(m.polls)
// 如果是动态缩容,缩容完毕后,直接返回
return nil
}
// 进入初始化或者扩容逻辑
m.NumLoops = numLoops
return m.Run()
}
从上面代码可以看出,netpoll支持在运行时动态调整池子的大小,下面我们看看初始化和扩容逻辑是如何完成的:
// poll_manager.go
// 扩容或者初始化epoll池
func (m *manager) Run() (err error) {
defer func() {
if err != nil {
_ = m.Close()
}
}()
// 如果是初始化epoll池,此处的polls大小应该为0
// 如果时扩容逻辑,此处的polls大小为当前池中已有的多路复用器个数
for idx := len(m.polls); idx < m.NumLoops; idx++ {
var poll Poll
// 创建一个新的多路复用器
poll, err = openPoll()
if err != nil {
return
}
// 新创建的多路复用器追加到polls集合
m.polls = append(m.polls, poll)
// 每个多路复用器绑定一个协程,不断轮询注册到该epoll上的fd事件
go poll.Wait()
}
// 更新多路复用池管理器的相关状态
m.balance.Rebalance(m.polls)
return nil
}
初始化epoll池时,首先是将池中每个epoll对象创建出来:
// poll_default_linux.go
// 打开多路复用器
func openPoll() (Poll, error) {
return openDefaultPoll()
}
func openDefaultPoll() (*defaultPoll, error) {
var poll = new(defaultPoll)
poll.buf = make([]byte, 8)
// 创建Epoll对象
var p, err = EpollCreate(0)
...
// 保存epoll的fd
poll.fd = p
// eventfd是一种进程/线程通信的机制,他类似信号,不过eventfd只是一种通知机制
// 无法承载数据(eventfd承载的数据是8个字节),他的好处是简单并且只消耗一个fd
// 进程间通信机制: https://zhuanlan.zhihu.com/p/383395277
var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
...
// TODO: 这几个回调接口干啥的 ?
poll.Reset = poll.reset
// 处理当前epoll fd上所发生的感兴趣的事件
poll.Handler = poll.handler
// eventFd 通信机制
poll.wop = &FDOperator{FD: int(r0)}
// 在epoll上注册并监听eventFd的可读事件 -- 监听r0上的可读事件
if err = poll.Control(poll.wop, PollReadable); err != nil {
_ = syscall.Close(poll.wop.FD)
_ = syscall.Close(poll.fd)
return nil, err
}
// 初始化FDOperator缓存
poll.opcache = newOperatorCache()
return poll, nil
}
这里使用defaultPoll保存多路复用器上下文信息,同时还为每个多路复用器创建出了一个eventFD用于实现进程间通信,同时在当前epoll上注册监听eventFD的可读事件。
此处使用eventFD是为了epoll池关闭的时候,通知那些阻塞在epoll_wait系统调用上的线程可以醒过来,然后结束自己。
当创建出来多路复用器后,下一步便是将其加入epoll池中,最后为每个多路复用器绑定一个协程,然后不断轮询注册到该epoll上的fd事件:
// poll_default_linux.go
func (p *defaultPoll) Wait() (err error) {
// init
var caps, msec, n = barriercap, -1, 0
p.Reset(128, caps)
// wait
for {
if n == p.size && p.size < 128*1024 {
p.Reset(p.size<<1, caps)
}
// p.fd 就是 epoll fd
// events 就是挂载到epoll tree上的epoll item
// mesc 用于指定阻塞时间,是永久阻塞,还是阻塞一段时间,还是非阻塞IO
// 等待当前epoll上发生感兴趣的事件
n, err = EpollWait(p.fd, p.events, msec)
if err != nil && err != syscall.EINTR {
return err
}
// 如果没有发生感兴趣的事件,则将msec设置为-1,表示下一次采用永久阻塞策略来等待感兴趣的事件发生
// 然后调用Gosched完成协程调度
if n <= 0 {
msec = -1
runtime.Gosched()
continue
}
msec = 0
// 处理感兴趣的事件
if p.Handler(p.events[:n]) {
return nil
}
// we can make sure that there is no op remaining if Handler finished
p.opcache.free()
}
}
defaultPoll的Handler回调接口是在openDefaultPoll函数中被赋值的,实际调用的是poll_default_linux.go
文件中的handler函数:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
// epollevent.data保存的是与之关联的FDOperator对象
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
if operator == nil || !operator.do() {
continue
}
var totalRead int
// 判断当前发生了什么事件
evt := events[i].events
triggerRead = evt&syscall.EPOLLIN != 0
triggerWrite = evt&syscall.EPOLLOUT != 0
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
triggerError = evt&syscall.EPOLLERR != 0
// trigger or exit gracefully
// 是否是eventFD可读事件发生了
if operator.FD == p.wop.FD {
// must clean trigger first
// 从eventFD中读取数据到buf中
syscall.Read(p.wop.FD, p.buf)
atomic.StoreUint32(&p.trigger, 0)
// if closed & exit
// 说明接收到了关闭信号,那么就关闭当前epoll
if p.buf[0] > 0 {
// 关闭eventFD
syscall.Close(p.wop.FD)
// 关闭epoll fd
syscall.Close(p.fd)
operator.done()
return true
}
operator.done()
continue
}
// 发生了可读事件
if triggerRead {
// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
if operator.OnRead != nil {
// 调用OnRead来接收并处理客户端连接
operator.OnRead(p)
// 否则说明发生的是某个客户端连接上的可读事件
} else if operator.Inputs != nil {
// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
// 此处是从LinkBuffer中分配出一块空闲内存
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
// 读取数据到bs缓存区中
var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
operator.InputAck(n)
...
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
// 其他感兴趣事件的触发此处暂时不展开
...
}
本节介绍一下netpoll为Linux下的epoll系统调用封装的API接口:
Linux 底层的epoll系统调用由红黑树实现,netpoll 给红黑树上每个节点都关联一个epollevent类型,该类型由一个事件位图和用户数据指针组成:
// sys_epoll_linux_arm64.go
type epollevent struct {
events uint32 // events:表示要监听的事件类型,如可读、可写等。这是一个位掩码,可以设置多个事件类型,例如 EPOLLIN 表示可读事件,EPOLLOUT 表示可写事件。
_ int32
data [8]byte // 可以携带用户数据。这里的用户数据通常是一个指针,指向与文件描述符关联的对象或其他相关数据。
}
netpoll 还提供了对epoll对象创建,感兴趣事件监听,等待感兴趣事件发生等操作的API封装:
func EpollCreate(flag int) (fd int, err error) {
var r0 uintptr
// 执行epoll_create系统调用
r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0)
if err == syscall.Errno(0) {
err = nil
}
return int(r0), err
}
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
_, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
if err == syscall.Errno(0) {
err = nil
}
return err
}
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
var r0 uintptr
var _p0 = unsafe.Pointer(&events[0])
if msec == 0 {
r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
} else {
r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
}
if err == syscall.Errno(0) {
err = nil
}
return int(r0), err
}
关于注册感兴趣的事件,netpoll在此基础之上又封装了一层:
// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
var op int
// TODO: evt.data = operator
var evt epollevent
// 将epollevent对象的data指针指向传入的FDOperator对象
p.setOperator(unsafe.Pointer(&evt.data), operator)
// 根据监听的事件类型,更新事件位图
switch event {
case PollReadable: // server accept a new connection and wait read
operator.inuse()
op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollWritable: // client create a new connection and wait connect finished
operator.inuse()
op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollDetach: // deregister
p.delOperator(operator)
op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollR2RW: // connection wait read/write
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollRW2R: // connection wait read
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
}
// 完成监听事件信息注册
return EpollCtl(p.fd, op, operator.FD, &evt)
}
从Control函数中可以看出来,netpoll会在epollevent的data字段中保存监听的fd对象信息,这里fd对象是netpoll经过封装后的FDOperator对象。
FDOperator对象中又保存了对当前fd对象的读写API封装。
可读事件有两类,一种是accept事件,另一种是readable事件:
accept事件针对的是server端
下面给出整个读事件处理的流程图,大家可以时不时回看本图:
服务提供方server在启动时,会创建一个新的server端套接字,然后在该套接字上打开并监听对应的端口,随后向poll manager获取一个空闲poller对象 , 并在该对象上监听server端套接字的可读事件,这里实际是客户端的accept事件:
netpoll server 方代码模版写法如下:
// 1. OnRequest: 有可读数据时回调该接口
var OnRequest = func(ctx context.Context, connection netpoll.Connection) error { return nil }
// 2. OnPrepare: 客户端连接建立完毕后,回调该接口
var OnPrepare = func(connection netpoll.Connection) context.Context { return nil }
func main() {
// 1. 建立连接
listen, err := net.Listen("tcp", ":1234")
if err != nil {
return
}
// 2. 创建eventLoop
eventLoop, _ := netpoll.NewEventLoop(OnRequest, netpoll.WithOnPrepare(OnPrepare), netpoll.WithReadTimeout(time.Second))
// 3. 启动服务
eventLoop.Serve(listen)
}
eventLoop的Serve方法会创建一个新的Server对象,并启动netpoll服务端:
// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
// 将原生的listener对象转换为netpoll包装后的Listener对象
npln, err := ConvertListener(ln)
if err != nil {
return err
}
evl.Lock()
// 创建新的server对象
evl.svr = newServer(npln,
// opts对象保存了相关事件回调
evl.opts,
// 退出事件回调监听函数
evl.quit)
// 启动回调
evl.svr.Run()
evl.Unlock()
// 监听到停止信号后,从此处返回
err = evl.waitQuit()
// ensure evl will not be finalized until Serve returns
runtime.SetFinalizer(evl, nil)
return err
}
创建完server对象后,会调用server对象的Run方法启动服务:
// Run this server.
func (s *server) Run() (err error) {
// 当前FDOperator对象封装的是server socket套接字对象
s.operator = FDOperator{
FD: s.ln.Fd(), // 服务端Socket监听器
OnRead: s.OnRead, // 可读事件发生
OnHup: s.OnHup, // 挂断事件发生
}
// 挑选一个空闲的多路复用器
s.operator.poll = pollmanager.Pick()
// 监听服务端套接字上的可读事件
err = s.operator.Control(PollReadable)
if err != nil {
// 错误退出时,回调该方法
s.onQuit(err)
}
return err
}
netpoll 在对server socket执行事件注册时,会设置FDOperator的OnRead接口,用于处理服务端套接字上的可读事件。
netpoll 也是通过FDOperator的OnRead接口是否为nil来判断当前发生的事件是accept还是readable事件。
只有server端启动时才会对服务端套接字设置OnRead回调接口,client端是不会设置的。
在defaultPoll的handler函数中,我们暂时只关心读事件是如何被处理的,而关于可读事件,本节我们来看看客户端accept事件是如何处理的:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
// epollevent.data保存的是与之关联的FDOperator对象
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
...
var totalRead int
// 判断当前发生了什么事件
evt := events[i].events
triggerRead = evt&syscall.EPOLLIN != 0
triggerWrite = evt&syscall.EPOLLOUT != 0
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
triggerError = evt&syscall.EPOLLERR != 0
...
// 发生了可读事件
if triggerRead {
// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
if operator.OnRead != nil {
// 调用OnRead来接收并处理客户端连接
operator.OnRead(p)
// 否则说明发生的是某个客户端连接上的可读事件
} else if operator.Inputs != nil {
// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
// 此处是从LinkBuffer中分配出一块空闲内存
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
// 读取数据到bs缓存区中
var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
operator.InputAck(n)
...
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
// 其他感兴趣事件的触发此处暂时不展开
...
}
回顾上面给出的handler函数可知,netpoll会依次遍历感兴趣的事件集合中每个事件,然后获取与当前事件绑定的FDOperator对象;首先判断当前发生的是否死可读事件,再根据FDOperator的OnRead接口是否为空,来判断发生的是accept事件,还是readable事件。
在server启动一节我们已经知道了,如果FDOperator的OnRead接口不为空,那么说明发生的是客户端的accept事件,此时会调用FDOperator的OnRead回调来处理客户端的连接事件;此处实际调用的是server的OnRead的方法;
// OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {
// 获取客户端连接
conn, err := s.ln.Accept()
...
// 包装一下原生的conn连接
var connection = &connection{}
// 初始化一下连接
connection.init(conn.(Conn),
// 初始化完毕后,回调用户注册进来的prepare接口
s.opts)
// 连接不活跃,直接返回
if !connection.IsActive() {
return nil
}
// 返回客户端连接套接字对应的文件描述符
var fd = conn.(Conn).Fd()
// 添加关闭回调接口 --- netpoll回调接口这里采用的是回调链的形式,可以添加多个回调接口
connection.AddCloseCallback(func(connection Connection) error {
// 当前连接关闭时,将自己从server连接集合中移除
s.connections.Delete(fd)
return nil
})
// 在server对象中保存 < fd , 已打开连接 >
s.connections.Store(fd, connection)
// 调用连接建立接口
connection.onConnect()
return nil
}
处理客户端accept事件的过程主要分为三步:
server.OnRead函数中调用的connection.init函数主要是用来为当前连接初始化相关数据结构,回调接口,以及在poll上注册对当前connection可读事件的监听
// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
// 初始化LinkBuffer相关数据结构
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier) // 用于聚集读写的缓冲区
// 初始化
c.initNetFD(conn) // 确保conn是被netpoll包装后的netFD类型
c.initFDOperator() // 初始化FDOperator
c.initFinalizer() // 添加close回调函数
// 将客户端连接套接字设置为非阻塞模式
syscall.SetNonblock(c.fd, true)
// enable TCP_NODELAY by default
switch c.network {
case "tcp", "tcp4", "tcp6":
// 禁用 Nagle 算法
setTCPNoDelay(c.fd, true)
}
// 启用零拷贝传输的 TCP Socket 选项 和 阻塞超时时间
if setZeroCopy(c.fd) == nil && setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) == nil {
c.supportZeroCopy = true
}
// connection initialized and prepare options
// 设置相关回调接口,在poll上注册对当前connection可读事件的监听
return c.onPrepare(opts)
}
netpoll 里面为原生的Listener,Connection,Epoll,Fd等对象都进行了一层自己的封装,initNetFD函数便是对原生客户端套接字文件描述符的封装:
func (c *connection) initNetFD(conn Conn) {
if nfd, ok := conn.(*netFD); ok {
c.netFD = *nfd
return
}
c.netFD = netFD{
fd: conn.Fd(),
localAddr: conn.LocalAddr(),
remoteAddr: conn.RemoteAddr(),
}
}
FDOperator 是对需要注册在epoll上进行监听的fd的封装,其是netpoll中的一个核心对象,内部持有被监听的fd和poll对象,同时对外提供fd数据读写回调接口 , 当fd上发生可读可写事件时,便会回调FDOperator上注册好的回调接口进行处理:
func (c *connection) initFDOperator() {
// 通过负载均衡器挑选一个可用的poll
poll := pollmanager.Pick()
// 从opcache中分配一个可用的poll对象
op := poll.Alloc()
// 拿到当前客户端连接对应的socket文件描述符
op.FD = c.fd
// 回调接口初始化 -- 注意OnRead回调被设置为了nil
op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck
c.operator = op
}
connection 的 onPrepare 函数主要用来将用户提供的相关回调接口设置到当前connection对象上,以及相关读写超时参数等;如果用户提供了OnPrepare接口,此处会进行回调通知。
该函数最后会在当前poll上注册对当前客户端connection的读事件监听:
// onPrepare supports close connection, but not read/write data.
// connection will be registered by this call after preparing.
func (c *connection) onPrepare(opts *options) (err error) {
if opts != nil {
// 将用户通过options设置的回调接口都赋值给当前accept得到的客户端连接
c.SetOnConnect(opts.onConnect)
c.SetOnRequest(opts.onRequest)
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)
// calling prepare first and then register.
// 如果我们指定了onPrepare回调,此处会执行回调
if opts.onPrepare != nil {
c.ctx = opts.onPrepare(c)
}
}
// 初始化连接上下文
if c.ctx == nil {
c.ctx = context.Background()
}
// prepare may close the connection.
if c.IsActive() {
// 在当前poll上注册对当前客户端connection的读事件监听
return c.register()
}
return nil
}
connection的register函数负责在poll上注册对当前客户端connection的读事件监听:
// register only use for connection register into poll.
func (c *connection) register() (err error) {
err = c.operator.Control(PollReadable)
...
return nil
}
当accept得到的客户端连接初始化完毕后,会调用onConnect函数对客户端连接进行任务包装,然后提交到协程池执行任务:
// onConnect is responsible for executing onRequest if there is new data coming after onConnect callback finished.
func (c *connection) onConnect() {
// 获取用户设置的OnConnect回调和OnRequest回调接口 --- 如果没有设置OnConnect回调,此处直接返回
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
if onConnect == nil {
return
}
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
var connected int32
c.onProcess(
// 第一个回调函数用于判断当前是否连接此刻是否可被处理
func(c *connection) bool {
// 在当前客户端连接初始化完毕后,会在onConnect函数中回调一次客户端提供的OnConnect接口
// 此处通过标记确保只会调用一次OnConnect函数
if atomic.LoadInt32(&connected) == 0 {
return true
}
// check for onRequest
return onRequest != nil &&
// 存在可读数据
c.Reader().Len() > 0
},
// 第二个回调函数会在第一个回调函数返回true的前提下,进行处理
func(c *connection) {
// 回调OnConnect函数
if atomic.CompareAndSwapInt32(&connected, 0, 1) {
c.ctx = onConnect(c.ctx, c)
return
}
// 处理可读数据,回调用户提供的回调函数
if onRequest != nil {
_ = onRequest(c.ctx, c)
}
},
)
}
onProcess 函数内部负责实现一套模版方法,用于不断轮询连接状态,如果可处理,则调用执行处理,直到接收到停止信号或连接不可处理时,才会退出循环:
// onProcess is responsible for executing the process function serially,
// and make sure the connection has been closed correctly if user call c.Close() in process function.
func (c *connection) onProcess(isProcessable func(c *connection) bool, process func(c *connection)) (processed bool) {
...
// 准备任务
var task = func() {
// 如果当前任务可执行,确保至少被执行过一次
if isProcessable(c) {
process(c)
}
// 死循环处理任务,直到接收到关闭信号或者任务不再可处理
var closedBy who
for {
closedBy = c.status(closing)
// close by user or no processable
if closedBy == user || !isProcessable(c) {
break
}
process(c)
}
...
return
}
// 异步跑这个任务 --- gopool.CtxGo 字节开源的协程池
runTask(c.ctx, task)
return true
}
但是这里要注意的是,如果连接上一段时间都没有可读数据,那么与当前连接绑定的协程在发现无数据可读时,会退出返回,也就是说当前协程就与当前连接解绑,并重新放回了协程池中。
大家要注意此处netpoll的实现思路:
netpoll 通过一个单独的协程来监听fd上的可读可写事件,当监听到可读可写事件时,不是在当前协程内进行同步处理,而是将可读可写事件包装为一个任务,然后从协程池中取出一个空闲协程进行处理,这是典型的Reactor模式实现思路。
当netpoll accept到一个连接后,会从poller池中挑选一个空闲poll,然后在当前poll上执行对当前conn可读事件的监听。
后续当conn上发生可读事件时,便会被与该conn绑定的poll感知到,然后通过判断FDOperator的OnRead接口为nil,知道当前发生的是可读事件,而非accept事件。
此时我们再来回看defaultPoll的handler,看看当发生可读事件时,netpoll是如何处理的:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
// epollevent.data保存的是与之关联的FDOperator对象
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
...
var totalRead int
// 判断当前发生了什么事件
evt := events[i].events
triggerRead = evt&syscall.EPOLLIN != 0
triggerWrite = evt&syscall.EPOLLOUT != 0
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
triggerError = evt&syscall.EPOLLERR != 0
...
// 发生了可读事件
if triggerRead {
// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
if operator.OnRead != nil {
// 调用OnRead来接收并处理客户端连接
operator.OnRead(p)
// 否则说明发生的是某个客户端连接上的可读事件
} else if operator.Inputs != nil {
// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
// 此处是从LinkBuffer中分配出一块空闲内存
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
// 读取数据到bs缓存区中
var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
operator.InputAck(n)
...
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
// 其他感兴趣事件的触发此处暂时不展开
...
}
关于LinkBuffer的源码解析本文就不过多展开了,感兴趣的小伙伴可以阅读我之前写的这篇文章:
FDOperator的Inputs和InputAck回调接口都是在客户端连接初始化时,在initFDOperator方法中被设置的:
func (c *connection) initFDOperator() {
...
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck
...
}
connection的inputs函数就是调用linkbuffer提供的book方法预定一块内存用于接收socket缓冲区中的可读数据:
// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
return vs[:1]
}
inputAck则复杂一些,首先会调用linkbuffer的bookAck函数完成预留内存的提交,这样已经从socket缓冲区写入linkbuffer的数据就对用户可见了:
// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
...
// 提交预留内存,提交后,用户便可以读取这部分内存数据了
length, _ := c.inputBuffer.bookAck(n)
...
var needTrigger = true
// 从协程池中取出一个空闲协程来处理当前连接上的可读数据
if length == n { // first start onRequest
needTrigger = c.onRequest() // 返回值表示是否读取完毕了所有需要的数据, 如果返回false,说明读完了,否则说明没有读完
}
// 单开协程处理客户端连接上的可读数据时,可能在回调用户OnRequest接口时,调用读数据接口从而阻塞等待数据准备就绪
// 此处当数据就绪时,会唤醒对应的协程
if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
c.triggerRead(nil)
}
return nil
}
此处调用connection的onRequest方法,并非直接就是调用的用户提供的回调接口,而是和OnConnect方法一样,创建一个读数据任务去处理当前连接上的可读数据:
// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool) {
// 加载用户设置的回调接口
var onRequest, ok = c.onRequestCallback.Load().(OnRequest)
if !ok {
return true
}
// 处理请求
processed := c.onProcess(
// 第一个回调函数用于判断当前连接是否活跃并且还有未读取数据
func(c *connection) bool {
return c.Reader().Len() > 0
},
// 第二个回调才是真正将请求交给用户回调来处理
func(c *connection) {
_ = onRequest(c.ctx, c)
},
)
// if not processed, should trigger read
return !processed
}
connection的onProcess方法上文已经说过,就是从协程池中捞取一个空闲协程来处理当前连接上的可读数据;
如果当前连接上一直有数据可读,便会一直处理,如果当前协程上没有数据可读了,协程便会被释放,重新返回池中。
上文说到,当poll线程监听到可读可写数据的时候,会单开一个线程去处理当前连接上的可读可写数据;如果此时发生的是可读事件,那么最终会回调到用户提供的OnRequest接口。
而用户可以在OnRequest接口中去调用connection相关读API去读取数据:
// ReadString implements Connection.
func (c *connection) ReadString(n int) (s string, err error) {
if err = c.waitRead(n); err != nil {
return s, err
}
return c.inputBuffer.ReadString(n)
}
// ReadBinary implements Connection.
func (c *connection) ReadBinary(n int) (p []byte, err error) {
if err = c.waitRead(n); err != nil {
return p, err
}
return c.inputBuffer.ReadBinary(n)
}
// Next implements Connection.
func (c *connection) Next(n int) (p []byte, err error) {
if err = c.waitRead(n); err != nil {
return p, err
}
return c.inputBuffer.Next(n)
}
这些读API在方法开头都会调用waitRead等待所读数据量就绪后或者读超时后,才会进行数据读取或者超时返回:
// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
// 如果当前可读数据大于需要的了,直接返回,无需等待
if n <= c.inputBuffer.Len() {
return nil
}
// 存储自己希望读取的数据量
atomic.StoreInt64(&c.waitReadSize, int64(n))
// 返回时,清空变量
defer atomic.StoreInt64(&c.waitReadSize, 0)
// 如果设置了读超时属性,就有限期等待,直到数据就绪
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
}
// wait full n
// 否则无限期等待,直到数据准备就绪
for c.inputBuffer.Len() < n {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
default:
// 等待接收数据就绪的信号
err = <-c.readTrigger
if err != nil {
return err
}
}
}
return nil
}
// waitReadWithTimeout will wait full n bytes or until timeout.
// 有限期等待
func (c *connection) waitReadWithTimeout(n int) (err error) {
// set read timeout
if c.readTimer == nil {
c.readTimer = time.NewTimer(c.readTimeout)
} else {
c.readTimer.Reset(c.readTimeout)
}
for c.inputBuffer.Len() < n {
switch c.status(closing) {
case poller:
// cannot return directly, stop timer first!
err = Exception(ErrEOF, "wait read")
goto RET
case user:
// cannot return directly, stop timer first!
err = Exception(ErrConnClosed, "wait read")
goto RET
default:
select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case err = <-c.readTrigger:
if err != nil {
return err
}
continue
}
}
}
RET:
// clean timer.C
if !c.readTimer.Stop() {
<-c.readTimer.C
}
return err
}
可写事件有两类,一种是client端socket套接字可写事件,另一种是server端socket套接字可写事件:
注意区分server socket和socket套接字的区别 , 前者是server端启动绑定并监听的套接字,用于accept客户端连接,后者是accept得到的客户端socket连接套接字 和 客户端connect 服务端成功后得到的 socket套接字。
下面还是给出一幅写数据流程图:
客户端代码典型写法如下:
func main() {
// 1. 建立连接
dialer := netpoll.NewDialer()
conn, _ := dialer.DialConnection("tcp", ":1234", time.Second)
var reader, writer = conn.Reader(), conn.Writer()
// 2. 写数据
write_data := []byte("hello world")
alloc, _ := writer.Malloc(len(write_data))
copy(alloc, write_data) // write data
writer.Flush()
// 3. 读数据
buf, _ := reader.Next(reader.Len())
fmt.Println("服务端响应的数据:" + string(buf))
reader.Release()
}
我们下面来看一下客户端启动过程:
func (d *dialer) DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {
...
switch network {
case "tcp", "tcp4", "tcp6":
// 走tcp连接
return d.dialTCP(ctx, network, address)
...
}
}
func (d *dialer) dialTCP(ctx context.Context, network, address string) (connection *TCPConnection, err error) {
...
connection, err = DialTCP(ctx, "tcp", nil, tcpAddr)
...
return nil, firstErr
}
func DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error) {
...
sd := &sysDialer{network: network, address: raddr.String()}
c, err := sd.dialTCP(ctx, laddr, raddr)
...
return c, nil
}
func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConnection, error) {
conn, err := internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
...
return newTCPConnection(conn)
}
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (conn *netFD, err error) {
...
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
}
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (netfd *netFD, err error) {
// syscall.Socket & set socket options
var fd int
// 创建客户端socket对象,同时设置为非阻塞模式
fd, err = sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
...
// 包装客户端socket fd 为netFD
netfd = newNetFD(fd, family, sotype, net)
// 建立与server的连接
err = netfd.dial(ctx, laddr, raddr)
...
return netfd, nil
}
func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) {
...
// 连接server
if crsa, err = c.connect(ctx, lsa, rsa); err != nil {
return err
}
...
return nil
}
func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) {
// 系统调用connect,连接server
syscall.Connect(c.fd, ra)
...
// 为当前client socket创建一个新的FDOperator
c.pd = newPollDesc(c.fd)
for {
// 等待直到当前client socket可写为止
if err := c.pd.WaitWrite(ctx); err != nil {
return nil, err
}
switch err := syscall.Errno(nerr); err {
...
// 如果没有错误发生,直接返回Sockaddr
case syscall.Errno(0):
// The runtime poller can wake us up spuriously;
// see issues 14548 and 19289. Check that we are
// really connected; if not, wait again.
if rsa, err := syscall.Getpeername(c.fd); err == nil {
return rsa, nil
}
...
}
}
}
func newPollDesc(fd int) *pollDesc {
pd := &pollDesc{}
poll := pollmanager.Pick()
pd.operator = &FDOperator{
poll: poll,
FD: fd,
OnWrite: pd.onwrite, // 设置OnWrite回调接口
OnHup: pd.onhup,
}
pd.writeTrigger = make(chan struct{})
pd.closeTrigger = make(chan struct{})
return pd
}
// WaitWrite .
func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) {
if pd.operator.isUnused() {
// add ET|Write|Hup
// 在当前连接绑定的poll上注册等待可写事件
if err = pd.operator.Control(PollWritable); err != nil {
logger.Printf("NETPOLL: pollDesc register operator failed: %v", err)
return err
}
}
// 等待直到接收到终止信号或者可写事件(当前client socket缓冲区为空)
select {
case <-pd.closeTrigger: // triggered by poller
// no need to detach, since poller has done it in OnHup.
return Exception(ErrConnClosed, "by peer")
case <-pd.writeTrigger: // triggered by poller
err = nil
case <-ctx.Done(): // triggered by ctx
pd.detach()
pd.operator.unused()
err = mapErr(ctx.Err())
}
// double check close trigger
// 如果没有接收到停止信号,此处就直接返回
select {
case <-pd.closeTrigger:
return Exception(ErrConnClosed, "by peer")
default:
return err
}
}
当我们需要写数据时,通常都会先调用connection的malloc方法分配一块写缓冲区:
func (c *connection) Malloc(n int) (buf []byte, err error) {
return c.outputBuffer.Malloc(n)
}
然后调用connection的flush方法刷新写缓冲区中的数据:
func (c *connection) Flush() error {
...
// 刷新写缓冲区中的数据,让其对外可见
c.outputBuffer.Flush()
// 将数据写入内核socket缓冲区
return c.flush()
}
最终调用connection的flush方法,将linkbuffer中的数据写入socket内核缓冲区中:
func (c *connection) flush() error {
...
// netpoll采用聚集写,所以第一步是将写缓冲区中的数据都读取到写缓冲区向量数组中
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
// 将数据都写入内核socket缓冲区中
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
...
// 如果写入了部分数据,则释放掉这部分内存中间
if n > 0 {
err = c.outputBuffer.Skip(n)
c.outputBuffer.Release()
if err != nil {
return Exception(err, "when flush")
}
}
// 如果所有数据都已经成功写入内核socket缓冲区中,则直接返回
if c.outputBuffer.IsEmpty() {
return nil
}
// 可能是因为socket缓冲区满了,导致还有一部分数据没写完
// 此处注册对可写事件的监听
err = c.operator.Control(PollR2RW)
...
// 等待直到可写才会返回
return c.waitFlush()
}
如果socket内核缓冲区被写满了,则进行等待,具体是进行无限期等待,还是有限期等待,取决于我们是否设置了写超时时间:
func (c *connection) waitFlush() (err error) {
// 如果我们没有设置写超时事件,则进行无限期等待
if c.writeTimeout == 0 {
select {
case err = <-c.writeTrigger:
}
return err
}
// 如果我们设置了写超时事件,则执行有限期等待
if c.writeTimer == nil {
c.writeTimer = time.NewTimer(c.writeTimeout)
} else {
c.writeTimer.Reset(c.writeTimeout)
}
select {
case err = <-c.writeTrigger:
if !c.writeTimer.Stop() { // clean timer
<-c.writeTimer.C
}
return err
case <-c.writeTimer.C:
select {
// try fetch writeTrigger if both cases fires
case err = <-c.writeTrigger:
return err
default:
}
// if timeout, remove write event from poller
// we cannot flush it again, since we don't if the poller is still process outputBuffer
c.operator.Control(PollRW2R)
return Exception(ErrWriteTimeout, c.remoteAddr.String())
}
}
可写事件分为两类,一类是客户端socket可写,一类是服务端socket可写,本节我们来分别看看这两类可写事件都是如何处理的:
// 当感兴趣事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
...
// 触发写事件
if triggerWrite {
// 处理client socket可写事件
if operator.OnWrite != nil {
operator.OnWrite(p)
} else if operator.Outputs != nil {
// 处理服务端socket可写事件
var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
if len(bs) > 0 {
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
operator.OutputAck(n)
if err != nil {
p.appendHup(operator)
continue
}
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
operator.done()
}
...
}
当客户端socket可写事件发生时,也就是客户端socket内核缓冲区有空闲空间可写时,会调用FDOperator的onwrite回调方法进行处理。
onwrite回调中会向writeTrigger通道写入消息,唤醒阻塞等待可写事件的线程:
func (pd *pollDesc) onwrite(p Poll) error {
select {
case <-pd.writeTrigger:
default:
pd.detach()
close(pd.writeTrigger)
}
return nil
}
当服务端socket可写事件发生时,也就是在server accept到客户端连接后,发现客户端连接对应的socket可写时,会经历下面三步:
// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
// 如果写缓冲区为空
if c.outputBuffer.IsEmpty() {
// 移除对当前fd上可写事件的监听
c.rw2r()
return rs, c.supportZeroCopy
}
// 读取数据到rs中
rs = c.outputBuffer.GetBytes(vs)
return rs, c.supportZeroCopy
}
// 不再监听当前FD上的可写事件
func (c *connection) rw2r() {
c.operator.Control(PollRW2R)
c.triggerWrite(nil) // 唤醒等到可写事件的线程
}
var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
3,释放掉已经用完的写缓冲区空间,同时移除对当前fd上可写事件的监听
func (c *connection) outputAck(n int) (err error) {
// 将已经用完的部分内存回收掉
if n > 0 {
c.outputBuffer.Skip(n)
c.outputBuffer.Release()
}
// 如果此时发现所有待写入数据都写入完毕了,那么就移除对当前fd上可写事件的监听
if c.outputBuffer.IsEmpty() {
c.rw2r()
}
return nil
}