func main(){
//1. 建立连接
client, err := rpc.Dial("tcp", "localhost:1234")
//2.调用调用指定的RPC方法
var reply string //string有默认值
err = client.Call("HelloService.Hello", "hi", &reply) //即是一次请求
}
对?net/rpc
?而言,一个函数需要能够被远程调用,它必须满足一定的条件,否则其会被忽略。
这些条件是:
一个输出方法的格式如下:?
func (t *T) MethodName(argType T1, replyType *T2) error
这个方法的第一个参数代表调用者(client)提供的参数,第二个参数代表要返回给调用者的计算结果。
封装结构体 Call 来承载一次 RPC 调用所需要的信息。
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Receives *Call when Go is complete.
Seq uint64
}
func (call *Call) done() {
call.Done <- call
}
请求内容至少需要包括:
为了支持异步调用,Call 结构体中添加了一个字段 Done,Done 的类型是?chan *Call
,当调用结束时,会调用?call.done()
?通知调用方。?
type Client struct {
code codec.Codec
opt *Option
sending sync.Mutex
header codec.Header
mutex sync.Mutex //保护下面的变量
seq uint64
pending map[uint64]*Call
closing bool //user has called Close
shutdown bool // server has told us to stop
}
var ErrShutdown = errors.New("connection is shut down")
func (client *Client) Close() error {
client.mutex.Lock()
defer client.mutex.Unlock()
if client.closing {
return ErrShutdown
}
client.closing = true
return client.code.Close()
}
func (client *Client) IsAvailable() bool {
client.mutex.Lock()
defer client.mutex.Unlock()
return !client.closing && !client.shutdown
}
Close
?方法,而 shutdown 置为 true 一般是有错误发生。?需要存储未完成的请求,可以想象,一个用户发出10个不同的请求,要是客户端不存储这些请求,那收到回复的时候,就难知道如何处理了。
所以在发起请求的时候,需要注册这个请求(往pending中添加),得到回复后需要删除(从pending中delete)。
由此,需要实现和Call相关的注册和删除方法。
而terminateCalls方法是服务端或客户端发生错误时调用,将 shutdown 设置为 true,且将错误信息通知所有 pending 状态的 call。该方法需要都获得sending锁和mutex锁。该方法的使用地方后面会讲到的。
func (client *Client) RegisterCall(call *Call) (uint64, error) {
client.mutex.Lock()
defer client.mutex.Unlock()
if client.closing || client.shutdown {
return 0, ErrShutdown
}
call.Seq = client.seq //设置Call的序号
client.pending[call.Seq] = call
client.seq++
return call.Seq, nil
}
func (client *Client) removeCall(seq uint64) *Call {
client.mutex.Lock()
defer client.mutex.Unlock()
call := client.pending[seq]
delete(client.pending, seq)
return call
}
func (client *Client) terminateCalls(err error) {
client.sending.Lock()
defer client.sending.Unlock()
client.mutex.Lock()
defer client.mutex.Unlock()
client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
}
}
按照前面的例子,创建客户端就
client, err := rpc.Dial("tcp", "localhost:1234")
那我们也按照这样来。
Dail函数通过?...*Option
?将 Option 实现为可选参数(...表示可以0个参数或多个参数),可以不填写opts参数,使用默认的option(即是gob编解码)
//使用例子 client, err := rpc.Dial("tcp", "localhost:1234")
func Dail(network, address string, opts ...*Option) (client *Client, err error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn, opt)
}
parseOption函数就是解析Option,判断其Option是否符合要求等。
NewClient函数,创建 Client 实例,首先需要完成一开始的协议交换,即发送?Option
?信息给服务端,协商好消息的编解码方式。
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
// send options with server
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
conn.Close()
return nil, err
}
f := codec.NewCodeFuncMap[opt.CodecType]
if f == nil { //没有符合条件的编解码器
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
return &Client{
seq: 1, //序号从1开始,序号0表示可以表示错误
code: f(conn),
opt: opt,
pending: make(map[uint64]*Call),
}, nil
}
func parseOptions(opts ...*Option) (*Option, error) {
if len(opts) == 0 || opts[0] == nil {
return DefaultOption, nil
}
if len(opts) != 1 {
return nil, errors.New("number of options is more than 1")
}
opt := opts[0]
opt.MagicNumber = DefaultOption.MagicNumber
if opt.CodecType == "" {
opt.CodecType = DefaultOption.CodecType
}
if _, ok := codec.NewCodeFuncMap[opt.CodecType]; !ok {
return nil, fmt.Errorf("invalid codec type %s", opt.CodecType)
}
return opt, nil
}
请求和创建客户端完成后,那就是到关键的接收和发送请求了。
那先来看看发送请求。
var reply string //string有默认值
err = client.Call("HelloService.Hello", "hi", &reply)
?先实现个send方法,其参数是*Call。内容是注册该Call,进行编码并发送给服务端。
func (client *Client) send(call *Call) {
// make sure that the client will send a complete request
client.sending.Lock()
defer client.sending.Unlock()
//注册,添加到pending中
seq, err := client.RegisterCall(call)
if err != nil {
call.Error = err
call.done()
return
}
//复用同一个header
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""
// encode and send the request
if err := client.code.WriteResponse(&client.header, call.Args); err != nil {
call := client.removeCall(seq)
if call != nil {
call.Error = err
call.done()
}
}
}
代码中经常出现call.done(),done方法是为了支持异步调用的,当调用结束时,会调用?call.done()
?通知调用方。?那就会有个异步调用的Go方法。
异步调用的Go方法中,会先判断chan是否符合条件,之后根据函数参数来创建Call,之后调用send方法。
func (client *Client) Go(serviceMethod string, args, reply any, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10) //10或1或其他的也可以的,大于0即可
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
call := &Call{
ServiceMethod: serviceMethod,
Args: args,
Reply: reply,
Done: done,
}
client.send(call)
return call
}
func (client *Client) Call(serviceMethod string, args, reply any) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
而Call方法中,其是对?Go
?的封装,阻塞 call.Done,等待响应返回,是一个同步接口。
发送解决后,如何进行接收信息呢?
调用Call方法,这是个同步接口,会一直阻塞在call := <-client.Go(...).Done这里,之后当使用call.done()时候,才会解除阻塞。但是按照目前的正常情况,是不会调用call.done()的。这时我们可以新启一个协程去接收信息,处理完信息后就调用call.done()即可。
接收功能,接收到的响应有三种情况:
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.code.ReadHeader(&h); err != nil {
break
}
call := client.removeCall(h.Seq)
switch {
case call == nil:
err = client.code.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.code.ReadBody(nil)
call.done()
default:
err = client.code.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
client.terminateCalls(err)
}
在recieve中就使用了terminateCalls方法。在读取Header失败break,就执行该方法。
那么这个新的协程在哪里开启好呢?那可以在创建客户端的时候就开启这个协程。
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
//......
f := codec.NewCodeFuncMap[opt.CodecType]
//前面代码没有变化,就下面封装成一个函数,其内部就使用go client.receive()
return newClientCodec(f(conn), opt), nil
}
func newClientCodec(code codec.Codec, opt *Option) *Client {
client := &Client{
seq: 1,
code: code,
opt: opt,
pending: make(map[uint64]*Call),
}
go client.receive()
return client
}
这样,接收和发送也都处理好了。至此,一个支持异步和并发的 GeeRPC 客户端已经完成。
上一章节只实现了服务端,我们在 main 函数中手动模拟了整个通信过程。因此,这一章节我们就将 main 函数中通信部分替换为今天的客户端。
startServer 没有发生变化。
func main() {
addr := make(chan string)
go startServer(addr)
// in fact, following code is like a simple geerpc client
client, _ := geerpc.Dail("tcp", <-addr) //上一节是使用net.Dail
defer client.Close()
time.Sleep(time.Second * 1)
num := 3
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func(i int) {
defer wg.Done()
args := uint64(i)
var reply string
if err := client.Call("foo.sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Println("reply: ", reply)
}(i)
}
wg.Wait()
}
func startServer(addr chan string) {
l, err := net.Listen("tcp", "localhost:10000")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}
完整代码:https://github.com/liwook/Go-projects/tree/main/geerpc/2-client