服务端中肯定会有进行监听的。这里先创建一个空的结构体Server。
其Accept方法是进行监听,并与客户端进行连接后, 开启新协程异步去处理ServeConn。
//server.go文件
type Server struct{}
func NewServer() *Server {
return &Server{}
}
var DefaultServer = NewServer()
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Println("rpc server: accept error:", err)
return
}
// 拿到客户端的连接, 开启新协程异步去处理.
go server.ServeConn(conn)
}
}
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
如果想启动服务,过程是非常简单的,传入 listener 即可,tcp 协议和 unix 协议都支持。
lis, _ := net.Listen("tcp", "localhost:10000")
geerpc.Accept(lis)
监听请求之后就是处理信息。那处理消息中要先了解信息的格式以及信息的编解码方式。
对于消息(request和response)的定义,?我们可以简单定为消息头(header)+内容体(body)。
我们定义头部结构体。
//codec.go文件
type Header struct {
ServiceMethod string // format "Service.Method"
Seq uint64 // sequence number chosen by client
Error string
}
编解码方式就有很多种,Go中默认使用的是gob。(gob编码方式为go语言专用的编码方式,?无法跨语言使用)。而要是使用json来进行编解码呢,所以我们抽象出对消息体进行编解码的接口 Codec,抽象出接口是为了实现不同的 Codec 实例来编解码。
编解码Codec实例中就应该有读取头部,读取body,写回复这三个方法。
//codec.go文件
type Code interface {
ReadHeader(*Header) error
ReadBody(any) error
WriteResponse(*Header, any) error
Close() error //关闭连接
}
实现gob编码方式
定义GobCodec
?结构体,一个编解码器需要有的结构
//gob.go文件
type GobCodec struct {
conn io.ReadWriteCloser
buf *bufio.Writer
dec *gob.Decoder
enc *gob.Encoder
}
//使用与客户端的socket连接初始化编解码器。
//dec: gob.NewDecoder(conn)使得解码时从连接中获取数据;
//enc: gob.NewEncoder(buf)编码器需要缓冲区,且该缓冲区的底层io流应该为与客户端的连接。
func NewGobCodec(conn io.ReadWriteCloser) Codec {
buf := bufio.NewWriter(conn)
return &GobCodec{
conn: conn,
buf: buf,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
}
}
?接着实现?Close方法
和接口的ReadHeader
、ReadBody
、WriteResponse
方法。
ReadHeader和WriteResponse分别是用该实例的解码器进行解码,编码器编码回复。
//gob.go文件
func (c *GobCodec) ReadHeader(h *Header) error {
return c.dec.Decode(h)
}
func (c *GobCodec) ReadBody(body any) error {
return c.dec.Decode(body)
}
func (c *GobCodec) WriteResponse(h *Header, body any) (err error) {
defer func() {
c.buf.Flush()
if err != nil {
c.Close()
}
}()
if err := c.enc.Encode(h); err != nil {
log.Println("rpc codec: gob error encoding header:", err)
return err
}
if err := c.enc.Encode(body); err != nil {
log.Println("rpc codec: gob error encoding body:", err)
return err
}
return nil
}
func (c *GobCodec) Close() error {
return c.conn.Close()
}
那接着我们定义字符串来表示哪个是使用gob编解码的。
//codec.go文件
type CodeType string
const (
GobType CodeType = "application/gob"
JsonType CodeType = "application/json" // not implemented
)
type NewCodecFunc func(io.ReadWriteCloser) Codec
var NewCodeFuncMap map[CodeType]NewCodecFunc
func init() {
NewCodeFuncMap = make(map[CodeType]NewCodecFunc)
NewCodeFuncMap[GobType] = NewGobCodec
}
?对外提供NewCodeFuncMap ,客户端和服务端可以通过 Codec 的?CodeType
得到构造函数,从而创建 Codec 实例
实现了GobCodec后,那目前对于该rpc,需要协商的唯一一项内容是消息的编解码方式。我们将这部分信息,放到结构体?Option
?中承载。
//server.go文件
const MagicNumber = 0x3b3f5c
type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
}
var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
}
一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上简单点,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
?但要注意的是:在一次连接中,Option 固定在报文的最开始位置,Header 和 Body 可以有多个,即报文可能是这样的。
| Option | Header1 | Body1 | Header2 | Body2 | ...
定义好消息格式和编解码方式后,就回到Server的Accpet方法中。其开启新协程去处理客户的请求(ServeConn方法)。
那该方法中肯定就需要先通过客户发送的信息获取到编解码方式,即是使用?json.NewDecoder
?反序列化得到 Option 实例,检查 MagicNumber 和 CodeType 的值是否正确。然后根据 CodeType 得到对应的消息编解码器。
//server.go文件
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer conn.Close()
var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpc server: options error: ", err)
return
}
if opt.MagicNumber != MagicNumber {
log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
return
}
//目前只实现了gob编解码
f := codec.NewCodeFuncMap[opt.CodecType]
if f == nil {
log.Printf("rpc server: invalid codec type %s", opt.CodecType)
return
}
server.servCode(f(conn))
}
通过server.ServeCodec方法处理请求,主要分为解析请求信息(readRequest)和处理请求(handleRequest)两步。
//server.go文件
func (server *Server) servCode(cc codec.Codec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
req, err := server.readRequest(cc) //读取请求
if err != nil {
if req == nil {
break
}
req.h.Error = err.Error()
//发送解析请求信息出错的响应信息 invalidRequest = struct{}{}
server.sendResponse(cc, req.h, invalidRequest, sending)
continue
}
wg.Add(1)
go server.handleRequest(cc, req, sending, wg)//处理请求
}
wg.Wait()
cc.Close()
}
建立的连接是长连接,轮询读取连接中的数据并异步处理。因此这里使用了 for 无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要讲解几点:
?可能会有疑惑,sending,wg变量为什么都是new出来的,不能用栈空间变量吗?首先sync.Mutex,sync.WaitGroup都是值类型,互斥锁是不允许copy的,值传递的话,就会进行拷贝,会出错。WaitGroup也相似,其也是值类型,要是不使用指针传递的话,那函数内部调用的wg根本就不是同一个对象,会导致死锁的。
sync.Mutex和sync.WaitGroup作为函数参数,要想其是同一个对象,那就都需要传递其指针,才不会进行拷贝。
这里再抽象出一个结构体request,其存储请求的所有信息
?readRequest
其主要是先解码header,再解码body。
readRequest方法中的cc.ReadBody(&req.requestData),其参数是指针,是因为gob解码需要是指针。可以试试使用req.requestData去测试运行时有什么错误。
//server.go文件
type request struct {
h *codec.Header
// argv, replyv reflect.Value
requestData uint64 //请求的body数据
replyData string //返回给用户的data
}
func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
var h codec.Header
if err := cc.ReadHeader(&h); err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println("rpc server: read header error:", err)
}
return nil, err
}
return &h, nil
}
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
h, err := server.readRequestHeader(cc)
req := &request{h: h}
// TODO: now we don't know the type of request argv
//这一章节,我们只能处理用户发送过来的uint64类型的数据
cc.ReadBody(&req.requestData)
return req, nil
}
?handleRequest和sendResponse
sendResponse先加锁,之后调用编解码器的WriteResponse方法。
handleRequest就是处理信息,跟着调用sendResponse方法发送信息给客户端。
//server.go文件
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body any, sending *sync.Mutex) {
sending.Lock()
defer sending.Unlock()
if err := cc.WriteResponse(h, body); err != nil {
log.Println("rpc server: write response error:", err)
}
}
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()
log.Println("handleRequest ", req.h, req.requestData)
// req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
// server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
req.replyData = fmt.Sprintf(" ok my resp %d", req.h.Seq)
server.sendResponse(cc, req.h, &req.replyData, sending)
}
目前是还不能判断 body 的类型,后序会实现的。
这要说回结构体request中的一些变量,原教程的是request中是使用了反射类型变量的,但我感觉这一节使用反射,可能读起来会有点疑惑,难理解。所以这里就使用具体的类型来处理,在这一节会好理解点,后序会再添加上反射reflect。
在这里我们就实现了一个消息的编解码器?GobCodec
,并且客户端与服务端实现了简单的协议交换(protocol exchange),即允许客户端使用不同的编码方式。同时实现了服务端的雏形,建立连接,读取、处理并回复客户端的请求。
接下来,我们就在 main 函数中看看如何使用刚实现的 RPC。
func startServer(addr chan string) {
l, err := net.Listen("tcp", "localhost:10000")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}
func main() {
addr := make(chan string)
go startServer(addr)
// in fact, following code is like a simple geerpc client
conn, _ := net.Dial("tcp", <-addr)
defer conn.Close()
// send options
_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
cc := codec.NewGobCodec(conn)
// send request & receive response
for i := 0; i < 3; i++ {
h := &codec.Header{
ServiceMethod: "Foo.Sum",
Seq: uint64(i),
}
cc.WriteResponse(h, h.Seq)
cc.ReadHeader(h)
var reply string
cc.ReadBody(&reply)
log.Println("reply:", reply)
}
}
startServer
?中使用了信道?addr
,确保服务端端口监听成功,客户端再发起请求。Option
?进行协议交换,接下来发送消息头?h := &codec.Header{}
,和消息体?geerpc req ${h.Seq}
。reply
,并打印出来。完整代码:?https://github.com/liwook/Go-projects/tree/main/geerpc/1-codec