创建一个 gateway 示例:
// main.go
package main
import (
"flag"
"fmt"
"gateway/middleware"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/gateway"
)
var configFile = flag.String("f", "etc/gateway.yaml", "the config file")
// main parses the command-line flags, loads the gateway configuration from the
// file named by -f, builds the gateway server from it and starts serving.
func main() {
	flag.Parse()

	// Load the gateway configuration. Per the original notes: if the
	// configuration is invalid, a FATAL log is written and the process exits.
	var cfg gateway.GatewayConf
	conf.MustLoad(*configFile, &cfg)

	// Instantiate the gateway. Per the original notes: on failure a FATAL log
	// is written and the process exits. Possible failures include:
	//  1) logx initialization fails (a log file cannot be created). There are
	//     five kinds of log files:
	//       info-level log:   infoLog
	//       error-level log:  errorLog
	//       severe-level log: severeLog
	//       slow-query log:   slowLog
	//       statistics log:   statLog
	//     The stack log (stackLog) is written together with errorLog, and the
	//     access log is written together with infoLog.
	gw := gateway.MustNewServer(cfg)
	defer gw.Stop()

	fmt.Printf("Starting gateway at %s:%d...\n", cfg.Host, cfg.Port)
	gw.Start()
}
// gateway/server.go
// MustNewServer creates a new gateway server.
//
// The returned Server keeps c.Upstreams and wraps the rest server built by
// rest.MustNewServer(c.RestConf). Every option in opts is then applied to it,
// in the order given.
func MustNewServer(c GatewayConf, opts ...Option) *Server {
	gw := new(Server)
	gw.upstreams = c.Upstreams
	gw.Server = rest.MustNewServer(c.RestConf)

	for i := range opts {
		opts[i](gw)
	}

	return gw
}
// rest/server.go
// MustNewServer builds a server from the config c and the options in opts by
// delegating to NewServer.
// When several RunOptions write the same option, a later one might overwrite
// an earlier one.
// A non-nil error from NewServer is handed to logx.Must; the process will
// exit if an error occurs.
func MustNewServer(c RestConf, opts ...RunOption) *Server {
	srv, err := NewServer(c, opts...)
	if err == nil {
		return srv
	}

	logx.Must(err)
	return srv
}
gateway.MustNewServer 调用了 rest.MustNewServer,并在 rest.MustNewServer 的基础上增加了 upstreams 的初始化。upstreams 源自于 gateway.GatewayConf,对应的配置如下:
Upstreams: # 网关上游的配置列表
  - Grpc: # 网关上游只能为 grpc 服务,不支持 http 等其它服务
      Etcd: # 服务发现用的 Etcd 配置
        Hosts: # Etcd 的服务地址列表
          - 127.0.0.1:2379
        Key: login.rpc # 服务注册在 Etcd 的 key
    ProtoSets: # 服务的 pb 文件列表(使用工具 protoc 根据 proto 生成 pb 文件:protoc --descriptor_set_out=login.pb login.proto)
      - proto/login.pb
    Mappings: # Mappings can also be written in proto options;定义 http 路径到 rpc 路径的映射列表
      - Method: get
        Path: /v1/login
        RpcPath: login.Login/login # 格式:包名.服务名/方法名
从上述内容可以看出,go-zero 的 gateway 在 rest 基础上增加了 upstreams。当然不仅这些,在 gateway 启动时也增加了特有的东西:
// Start starts the gateway server.
//
// It first runs s.build and passes the result to logx.Must, then starts the
// embedded rest server.
func (s *Server) Start() {
	// build is the step the gateway adds on top of the plain rest server.
	err := s.build()
	logx.Must(err)

	s.Server.Start()
}
上述 s.build() 的源代码如下:
// gateway/server.go
// build turns the configured upstreams into rest routes and registers them on
// s.Server.
//
// It first calls s.ensureUpstreamNames and returns its error unchanged. It
// then runs mr.MapReduceVoid with three stages: a generate stage that emits
// every entry of s.upstreams, a map stage that converts one upstream into
// rest.Route values, and a reduce stage that adds each route to s.Server.
// The error returned by mr.MapReduceVoid is returned as is.
func (s *Server) build() error {
	if err := s.ensureUpstreamNames(); err != nil {
		return err
	}

	// Generate stage: feed every upstream to the map stage.
	generate := func(upstreams chan<- Upstream) {
		for _, upstream := range s.upstreams {
			upstreams <- upstream
		}
	}

	// Map stage: runs once per upstream and writes that upstream's routes.
	mapper := func(up Upstream, writer mr.Writer[rest.Route], cancel func(error)) {
		// Create the gRPC client for this upstream: the custom dialer when
		// one is set, zrpc.MustNewClient otherwise.
		var client zrpc.Client
		switch {
		case s.dialer != nil:
			client = s.dialer(up.Grpc)
		default:
			client = zrpc.MustNewClient(up.Grpc)
		}

		// The descriptor source provides the upstream's gRPC metadata.
		descriptors, err := s.createDescriptorSource(client, up)
		if err != nil {
			cancel(fmt.Errorf("%s: %w", up.Name, err))
			return
		}

		// List the methods described by the descriptor source.
		rpcMethods, err := internal.GetMethods(descriptors)
		if err != nil {
			cancel(fmt.Errorf("%s: %w", up.Name, err))
			return
		}

		anyResolver := grpcurl.AnyResolverFromDescriptorSource(descriptors)

		// One pass over the methods: remember every rpc path for the mapping
		// check below, and emit a route for each method that carries both an
		// HTTP method and an HTTP path.
		known := make(map[string]struct{})
		for _, method := range rpcMethods {
			known[method.RpcPath] = struct{}{}

			if len(method.HttpMethod) == 0 || len(method.HttpPath) == 0 {
				continue
			}

			writer.Write(rest.Route{
				Method: method.HttpMethod,
				Path:   method.HttpPath,
				// The handler converts the HTTP call into an rpc call.
				Handler: s.buildHandler(descriptors, anyResolver, client, method.RpcPath),
			})
		}

		// Explicit mappings from the upstream configuration. A mapping that
		// names an unknown rpc method cancels the whole run with an error.
		for _, mapping := range up.Mappings {
			if _, found := known[mapping.RpcPath]; !found {
				cancel(fmt.Errorf("%s: rpc method %s not found", up.Name, mapping.RpcPath))
				return
			}

			writer.Write(rest.Route{
				Method:  strings.ToUpper(mapping.Method),
				Path:    mapping.Path,
				Handler: s.buildHandler(descriptors, anyResolver, client, mapping.RpcPath),
			})
		}
	}

	// Reduce stage: register every produced route on the HTTP server.
	reducer := func(routes <-chan rest.Route, cancel func(error)) {
		for route := range routes {
			s.Server.AddRoute(route)
		}
	}

	return mr.MapReduceVoid(generate, mapper, reducer)
}
这个函数的主要目的是将 gRPC 服务的方法映射到 HTTP RESTful API,并将生成的 API 添加到 HTTP 服务器中。通过这种方式,可以在 gRPC 服务的基础上提供一个 RESTful API,使得客户端可以使用 HTTP 调用 gRPC 服务。
以下为 mr.MapReduceVoid 的源代码:
// core/mr/mapreduce.go
// MapReduceVoid maps all elements produced by generate with mapper and hands
// the mapped elements to reducer, which produces no output value.
//
// The reducer is adapted to the shape MapReduce expects; the adapter never
// uses the writer. An ErrReduceNoOutput result from MapReduce is reported as
// success (nil); any other error is returned unchanged.
func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer VoidReducerFunc[U], opts ...Option) error {
	adapted := func(input <-chan U, _ Writer[any], cancel func(error)) {
		reducer(input, cancel)
	}

	if _, err := MapReduce(generate, mapper, adapted, opts...); !errors.Is(err, ErrReduceNoOutput) {
		return err
	}

	return nil
}
// MapReduce maps all elements produced by the generate func with mapper and
// reduces the mapped elements with reducer.
//
// It creates a fresh onceChan, builds the source from generate with it, and
// delegates to mapReduceWithPanicChan, returning that call's result.
func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V], opts ...Option) (V, error) {
	pc := &onceChan{channel: make(chan any)}
	return mapReduceWithPanicChan(buildSource(generate, pc), pc, mapper, reducer, opts...)
}
// gateway/server.go
// buildHandler returns the HTTP handler that forwards a request to the gRPC
// method identified by rpcPath.
//
// source and resolver describe the upstream's protobuf types, and cli provides
// the connection used for the call. The returned handler:
//  1. builds a request parser from the HTTP request; on failure it writes the
//     error response and stops;
//  2. sets the JSON content type and invokes the RPC through
//     grpcurl.InvokeRPC; on failure it writes the error response and stops;
//  3. writes an error response when the status recorded by the event handler
//     is not codes.OK.
func (s *Server) buildHandler(source grpcurl.DescriptorSource, resolver jsonpb.AnyResolver,
	cli zrpc.Client, rpcPath string) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		parser, err := internal.NewRequestParser(r, resolver)
		if err != nil {
			httpx.ErrorCtx(r.Context(), w, err)
			return
		}

		w.Header().Set(httpx.ContentType, httpx.JsonContentType)
		handler := internal.NewEventHandler(w, resolver)
		// This is where the HTTP call is converted into a gRPC call.
		if err := grpcurl.InvokeRPC(r.Context(), source, cli.Conn(), rpcPath, s.prepareMetadata(r.Header),
			handler, parser.Next); err != nil {
			httpx.ErrorCtx(r.Context(), w, err)
			// Stop here: an error response has already been written, so the
			// status check below must not write a second one.
			return
		}

		if st := handler.Status; st.Code() != codes.OK {
			httpx.ErrorCtx(r.Context(), w, st.Err())
		}
	}
}
http 调用转 grpc 调用过程复杂,最终调用了 grpc-go 的 Invoke:
// https://github.com/grpc/grpc-go/blob/master/clientconn.go
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
//
// Note from the article: the *ClientConn implementation of this interface is
// spread over two files, clientconn.go and call.go.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
中间还用到了 grpcdynamic 的 Stub.InvokeRpc:
// https://github.com/jhump/protoreflect/blob/main/dynamic/grpcdynamic/stub.go
// InvokeRpc sends a unary RPC and returns the response. Use this for unary
// methods only.
//
// It returns an error when method is client- or server-streaming, when
// checkMessageType rejects request for the method's input type, or when the
// underlying channel's Invoke fails. On success the returned message is the
// one created for the method's output type and filled in by Invoke.
func (s Stub) InvokeRpc(ctx context.Context, method *desc.MethodDescriptor, request proto.Message, opts ...grpc.CallOption) (proto.Message, error) {
	streaming := method.IsClientStreaming() || method.IsServerStreaming()
	if streaming {
		return nil, fmt.Errorf("InvokeRpc is for unary methods; %q is %s", method.GetFullyQualifiedName(), methodType(method))
	}

	err := checkMessageType(method.GetInputType(), request)
	if err != nil {
		return nil, err
	}

	reply := s.mf.NewMessage(method.GetOutputType())
	err = s.channel.Invoke(ctx, requestMethod(method), request, reply, opts...)
	if err != nil {
		return nil, err
	}

	return reply, nil
}
// https://github.com/grpc/grpc-go/blob/master/call.go
package grpc
import (
"context"
)
// Invoke sends the RPC request on the wire and returns after the response is
// received. This is typically called by generated code.
//
// When a unary interceptor is configured in the dial options, the call is
// routed through it, with invoke as the invoker; otherwise invoke is called
// directly.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
	// Merge the default call options from the dial options with the per-call
	// options so that an interceptor sees every applicable call option.
	opts = combine(cc.dopts.callOptions, opts)

	interceptor := cc.dopts.unaryInt
	if interceptor == nil {
		return invoke(ctx, method, args, reply, cc, opts...)
	}

	return interceptor(ctx, method, args, reply, cc, invoke, opts...)
}
// invoke performs one unary call on cc: it opens a client stream for method
// using unaryStreamDesc, sends req on it, and then receives the response into
// reply. The first error encountered is returned unchanged.
func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
	// Note from the article: the struct clientStream implements the
	// ClientStream interface — presumably that is the concrete type returned
	// here; confirm against newClientStream.
	stream, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}

	// Send the request.
	if sendErr := stream.SendMsg(req); sendErr != nil {
		return sendErr
	}

	// Receive the response.
	return stream.RecvMsg(reply)
}
// ClientStream defines the client-side behavior of a streaming RPC.
//
// Its methods cover reading the header and trailer metadata, closing the send
// direction, obtaining the stream's context, and sending and receiving
// messages.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read. If the metadata
	// is nil and the error is also nil, then the stream was terminated without
	// headers, and the status can be discovered by calling RecvMsg.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m any) error
}
调用路径归纳总结:
grpcurl/grpcurl.InvokeRPC()/invoke.go
-> grpcdynamic/Stub.InvokeRpc()/stub.go
-> grpc-go/grpc.ClientConn.Invoke()/clientconn.go|call.go // ClientConn 是一个 struct,实现了接口 ClientConnInterface
-> grpc-go/grpc.invoke()/call.go // invoke 是 grpc 下的全局私有函数
-> grpc-go/grpc.clientStream::SendMsg()/stream.go // clientStream 是一个 struct,实现了接口 ClientStream
-> grpc-go/grpc.csAttempt::sendMsg()/stream.go // csAttempt 是一个 struct,它持有(而不是实现)接口 ClientTransport,并通过该接口发送数据
-> grpc-go/grpc.ClientTransport::Write()/internal/transport/transport.go // ClientTransport 是一个接口,结构体 http2Client 实现了 ClientTransport
-> grpc-go/grpc.http2Client::Write()/internal/transport/http2_client.go // 结构体 http2Client 实现了 ClientTransport,将数据写入 http2Client.controlBuf 中
http2Client::Write 将数据写入 http2Client.controlBuf 后返回,数据的发送由另外的协程 loopyWriter.run() 负责:
// https://github.com/grpc/grpc-go/blob/master/internal/transport/controlbuf.go
//
// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list. This results in writing of HTTP2 frames into an
// underlying write buffer. When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer. As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes and closes the underlying connection. Otherwise, the connection is
// left open to allow the I/O error to be encountered by the reader instead.
//
// The returned error is the first error reported by cbuf.get, handle or
// processData; the loop has no other exit.
func (l *loopyWriter) run() (err error) {
	// Exit path: reads the named result err, so it sees the error that ended
	// the loop.
	defer func() {
		if l.logger.V(logLevel) {
			l.logger.Infof("loopyWriter exiting with error: %v", err)
		}
		// Flush and close only when the exit was not caused by an I/O error.
		if !isIOError(err) {
			l.framer.writer.Flush()
			l.conn.Close()
		}
		l.cbuf.finish()
	}()
	for {
		// NOTE(review): get(true) presumably blocks until a control item is
		// available — confirm against controlBuf.get.
		// err is shadowed inside the loop; each `return err` assigns the
		// named result before the deferred function runs.
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		// gosched allows at most one yield per batch (see the doc comment).
		gosched := true
	hasdata:
		for {
			// NOTE(review): get(false) appears to be the non-blocking form;
			// a nil item is treated below as "nothing queued" — confirm.
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			// No control item queued: keep draining stream data.
			// Note from the article: at the lowest level this ends up calling
			// io.Writer's Write from the Go standard library (Writer is an
			// interface).
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				// Batch still below minBatchSize: yield once so that stream
				// goroutines get a chance to add more data before the flush.
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			// Nothing left to process: flush the write buffer and go back to
			// the outer loop for the next control item.
			l.framer.writer.Flush()
			break hasdata
		}
	}
}