gRPC的middleware一般指的是gRPC-Go生态中的go-grpc-middleware,middleware主要体现在对拦截器的支持,目前官方实现的主要拦截器有以下几种。
Auth
grpc_auth - 可定制的auth中间件。
Logging
grpc_ctxtags - 将标签映射成上下文的库;
grpc_zap - 集成zap日志处理;
grpc_logrus - 集成logrus日志处理;
grpc_kit - 8集成go-kit/log日志处理;
grpc_grpc_logsettable - grpclog.LoggerV2的包装器,允许在运行时替换记录器(线程安全)。
Monitoring
grpc_prometheus? - 普罗米修斯的拦截器;
otgrpc? - OpenTracing 拦截器;
grpc_opentracing - OpenTracing 拦截器,支持流和处理程序返回的标签;
otelgrpc - OpenTelemetry 拦截器。
Client
grpc_retry - 通用的 gRPC 响应代码重试机制,客户端中间件。
Server
grpc_validator - 用于gRPC的.proto文件的请求参数校验;
grpc_recovery - 将panic转换为gRPC的error;
ratelimit - 自定义的限制器限制gRPC速率。
除了这些middleware,我们还可以实现定制的拦截器,实现自己的功能。下面我将简单介绍一下grpc_zap,然后介绍一下如何实现自己的拦截器。
在服务端,插入grpc_zap的日志插件,基本代码如下:
go复制代码
// 创建一个zap.Logger对象
logger, err := zap.NewDevelopment()
if err != nil {
log.Fatalf("failed to initialize zap logger: %v", err)
}
// 将grpc内部的打印logger替换为以上创建的logger
grpc_zap.ReplaceGrpcLoggerV2(logger)
// 新建gRPC的服务端实例
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_zap.StreamServerInterceptor(logger),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_zap.UnaryServerInterceptor(logger),
)),
)
在新建grpcServer过程中,传入一元和流式RPC的拦截器处理插件,其中集成了grpc_zap的一些插件,其代码如下:
// UnaryServerInterceptor returns a new unary server interceptors that adds zap.Logger to the context.
func UnaryServerInterceptor(logger *zap.Logger, opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateServerOpt(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
newCtx := newLoggerForCall(ctx, logger, info.FullMethod, startTime, o.timestampFormat)
resp, err := handler(newCtx, req)
if !o.shouldLog(info.FullMethod, err) {
return resp, err
}
code := o.codeFunc(err)
level := o.levelFunc(code)
duration := o.durationFunc(time.Since(startTime))
o.messageFunc(newCtx, "finished unary call with code "+code.String(), level, code, err, duration)
return resp, err
}
}
其实以上代码主要完成了两件事,一个是通过newLoggerForCall将传入的logger集成到上下文中,另一个是在请求完成后打印此次请求的一些信息。
func newLoggerForCall(ctx context.Context, logger *zap.Logger, fullMethodString string, start time.Time, timestampFormat string) context.Context {
var f []zapcore.Field
f = append(f, zap.String("grpc.start_time", start.Format(timestampFormat)))
if d, ok := ctx.Deadline(); ok {
f = append(f, zap.String("grpc.request.deadline", d.Format(timestampFormat)))
}
callLog := logger.With(append(f, serverCallFields(fullMethodString)...)...)
return ctxzap.ToContext(ctx, callLog)
}
一般而言,我们在一个服务进程中维持一份日志对象logger,保证日志打印等属性的一致性。
自定义日志
针对gRPC服务端的自定义日志,当我们选择设置拦截器之后,可以使用自己生成的logger直接打印,也可以选择ctxzap.Extract函数将我们传入拦截器的logger提取出来,这二者使用的是同一个对象,但是后者会包含更多的信息,并且更新了grpc_ctxtags信息。
// Extract takes the call-scoped Logger from grpc_zap middleware.
//
// It always returns a Logger that has all the grpc_ctxtags updated.
func Extract(ctx context.Context) *zap.Logger {
l, ok := ctx.Value(ctxMarkerKey).(*ctxLogger)
if !ok || l == nil {
return nullLogger
}
// Add grpc_ctxtags tags metadata until now.
fields := TagsToFields(ctx)
// Add zap fields added until now.
fields = append(fields, l.fields...)
return l.logger.With(fields...)
}
gRPC内部日志
在gRPC的内部,日志会被默认输入到stderr,并且在未设置GRPC_GO_LOG_SEVERITY_LEVEL这个环境变量的情况下,只会输出error级别的日志,如果我们想替换其内部的logger,需要调用SetLoggerV2函数。
// SetLoggerV2 sets logger that is used in grpc to a V2 logger.
// Not mutex-protected, should be called before any gRPC functions.
func SetLoggerV2(l LoggerV2) {
if _, ok := l.(*componentData); ok {
panic("cannot use component logger as grpclog logger")
}
grpclog.Logger = l
grpclog.DepthLogger, _ = l.(grpclog.DepthLoggerV2)
}
而在middleware中,提供了各个不同的日志体系对于SetLoggerV2函数的封装,譬如grpc_zap包提供的ReplaceGrpcLoggerV2,可以使用自定义的logger代替gRPC内部的默认logger。
// ReplaceGrpcLoggerV2 replaces the grpc_log.LoggerV2 with the provided logger.
// It should be called before any gRPC functions.
func ReplaceGrpcLoggerV2(logger *zap.Logger) {
ReplaceGrpcLoggerV2WithVerbosity(logger, 0)
}
// ReplaceGrpcLoggerV2WithVerbosity replaces the grpc_.LoggerV2 with the provided logger and verbosity.
// It should be called before any gRPC functions.
func ReplaceGrpcLoggerV2WithVerbosity(logger *zap.Logger, verbosity int) {
zgl := &zapGrpcLoggerV2{
logger: logger.With(SystemField, zap.Bool("grpc_log", true)),
verbosity: verbosity,
}
grpclog.SetLoggerV2(zgl)
}
打印
2023-02-06T10:26:31.494+0800 INFO zap/grpclogger.go:92 [core][Server #1] Server created {"system": "grpc", "grpc_log": true}
2023-02-06T10:26:31.494+0800 INFO zap/grpclogger.go:92 [core][Server #1 ListenSocket #2] ListenSocket created {"system": "grpc", "grpc_log": true}
2023-02-06T10:26:56.141+0800 INFO zap/grpclogger.go:92 [core]CPU time info is unavailable on non-linux environments. {"system": "grpc", "grpc_log": true}
2023-02-06T10:26:57.153+0800 INFO zap/options.go:212 finished unary call with code OK {"grpc.start_time": "2023-02-06T10:26:56+08:00", "system": "grpc", "span.kind": "server", "grpc.service": "simplepb.Route", "grpc.method": "SimpleRoute", "grpc.code": "OK", "grpc.time_ms": 1001.357}
2023-02-06T10:26:57.158+0800 INFO zap/grpclogger.go:92 [transport]transport: loopyWriter.run returning. connection error: desc = "transport is closing" {"system": "grpc", "grpc_log": true}
以上日志,第四行是拦截器打印的,其他都是gRPC内部的日志打印。
客户端有关grpc_zap的拦截器使用很简单,只需要在连接服务器的时候带上选项即可。
logger, err := zap.NewDevelopment()
if err != nil {
log.Fatalf("failed to initialize zap logger: %v", err)
}
// 连接服务器
conn, err := grpc.Dial(Address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(grpc_zap.UnaryClientInterceptor(logger))),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(grpc_zap.StreamClientInterceptor(logger))),
)
if err != nil {
log.Fatalf("net.Connect err: %v", err)
}
打印如下:
2023-02-06T13:10:21.069+0800 DEBUG zap/options.go:212 finished client unary call {"system": "grpc", "span.kind": "client", "grpc.service": "simplepb.Route", "grpc.method": "SimpleRoute", "grpc.code": "OK", "grpc.time_ms": 1018.059}
自定义拦截器其实很简单,只需要实现gRPC定义的注册函数就好了,我们以服务端的一元RPC请求为例,实现一个拦截器如下:
func ExampleUnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// do something before handler
log.Println("before handler")
resp, err = handler(ctx, req)
if err != nil {
return nil, err
}
// do something after handler
log.Println("after handler")
return resp, nil
}
}
类似于grpc_zap.UnaryServerInterceptor,我们在建立服务的时候设置拦截器。
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_zap.StreamServerInterceptor(logger),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_zap.UnaryServerInterceptor(logger),
ExampleUnaryInterceptor(),
)),
)
在日志中的打印如下:
2023/02/06 14:48:06 before handler
2023-02-06T14:48:07.125+0800 INFO server/server.go:60 handling {"grpc.start_time": "2023-02-06T14:48:06+08:00", "system": "grpc", "span.kind": "server", "grpc.service": "simplepb.Route", "grpc.method": "SimpleRoute"}
2023/02/06 14:48:07 after handler
2023-02-06T14:48:07.125+0800 INFO zap/options.go:212 finished unary call with code OK {"grpc.start_time": "2023-02-06T14:48:06+08:00", "system": "grpc", "span.kind": "server", "grpc.service": "simplepb.Route", "grpc.method": "SimpleRoute", "grpc.code": "OK", "grpc.time_ms": 1001.325}
注意,拦截器的调用顺序是符合出栈入栈规则的,即:
原因是,在ChainUnaryServer函数中,是倒序将注册的拦截器函数一个个包装,最里面的就是核心的处理逻辑,故而会展现出以上的规则。
// ChainUnaryServer creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
// will see context changes of one and two.
func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
n := len(interceptors)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
chainer := func(currentInter grpc.UnaryServerInterceptor, currentHandler grpc.UnaryHandler) grpc.UnaryHandler {
return func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
return currentInter(currentCtx, currentReq, info, currentHandler)
}
}
// 以下逻辑可以实现:
// 处理interceptors[n-1]函数中执行核心处理逻辑(核心服务处理函数作为handler(ctx, req)的handler,也就是此函数传入的handler)
// 处理interceptors[n-2]函数中执行interceptors[n-1](interceptors[n-1]作为handler)
// ......
// 处理interceptors[0]函数后执行interceptors[1](interceptors[1]作为handler)
// 从而实现层层包裹。
chainedHandler := handler
for i := n - 1; i >= 0; i-- {
chainedHandler = chainer(interceptors[i], chainedHandler)
}
return chainedHandler(ctx, req)
}
}
如下,ExampleUnaryInterceptor1和ExampleUnaryInterceptor唯一的区别就是打印时加了1:
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_zap.StreamServerInterceptor(logger),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
ExampleUnaryInterceptor(),
ExampleUnaryInterceptor1(),
)),
)
日志如下:
2023/02/06 14:49:59 before handler
2023/02/06 14:49:59 before handler 1
2023-02-06T14:50:00.393+0800 INFO server/server.go:60 handling {"grpc.start_time": "2023-02-06T14:49:59+08:00", "system": "grpc", "span.kind": "server", "grpc.service": "simplepb.Route", "grpc.method": "SimpleRoute"}
2023/02/06 14:50:00 after handler 1
2023/02/06 14:50:00 after handler
gRPC的生态提供了拦截器的中间件功能,我们既可以使用社区提供的拦截器,也可以自定义这些拦截器,这为我们实现gRPC的一些前置和后置操作提供了方便。