前面说过,proto中定义消息结构体的关键字是message,同样,定义服务的关键字是service。
service Route {
// ...
}
一元RPC(Unary RPC)的客户端就像调用本地函数一样地向服务端发送请求,并且等待服务端的返回。
message SimpleRequest {
string data = 1;
}
message SimpleResponse {
int32 code = 1;
string data = 2;
}
service Route {
rpc SimpleRoute(SimpleRequest) returns (SimpleResponse);
}
客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息,比如数据下载的场景。
message SimpleRequest {
string data = 1;
}
message StreamResponse{
string stream_value = 1;
}
service Route {
rpc ListValue(SimpleRequest) returns (stream StreamResponse){};
}
相反,客户端会不断地向服务器发送数据流,比如客户端进行数据上传的场景。
message SimpleResponse {
int32 code = 1;
string data = 2;
}
message StreamRequest{
string stream_data = 1;
}
service Route {
rpc RouteList(stream StreamRequest) returns (SimpleResponse){};
}
客户端和服务端同时使用读写流去发送消息序列,两个流独立操作,可以同时发送和接收,比如双方会话的场景。
message StreamResponse{
string stream_value = 1;
}
message StreamRequest{
string stream_data = 1;
}
service Route {
rpc DoubleSideStream(stream StreamRequest) returns (stream StreamResponse){};
}
以下指令可以生成xx.pb.go和xx_grpc.pb.go,至于protoc工具的安装和使用等,可以参照前面的教程。
protoc -I./proto \
--go_out=./genproto --go_opt paths=source_relative \
--go-grpc_out=./genproto --go-grpc_opt paths=source_relative \
./proto/*/*.proto
package main
import (
"context"
"log"
"net"
"github.com/IguoChan/proto-prj/genproto/simplepb"
"google.golang.org/grpc"
)
func NewRouteServer() *Server {
return &Server{}
}
type Server struct {
simplepb.UnimplementedRouteServer
}
func (r *Server) SimpleRoute(ctx context.Context, request *simplepb.SimpleRequest) (*simplepb.SimpleResponse, error) {
//TODO implement me
panic("implement me")
}
func (r *Server) ListValue(request *simplepb.SimpleRequest, server simplepb.Route_ListValueServer) error {
//TODO implement me
panic("implement me")
}
func (r *Server) RouteList(server simplepb.Route_RouteListServer) error {
//TODO implement me
panic("implement me")
}
func (r *Server) DoubleSideStream(server simplepb.Route_DoubleSideStreamServer) error {
//TODO implement me
panic("implement me")
}
const (
Address string = ":8080"
Network string = "tcp"
)
func main() {
// 监听本地端口
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
log.Println(Address + " net.Listing...")
// 新建gRPC的服务端实例
grpcServer := grpc.NewServer()
// 在gRPC服务器中注册我们的服务
simplepb.RegisterRouteServer(grpcServer, NewRouteServer())
// 起服务,阻塞等待
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
代码如上所示,其基本可以分为以下几个部分:
package main
import (
"context"
"io"
"log"
"strconv"
"github.com/IguoChan/proto-prj/genproto/simplepb"
"google.golang.org/grpc"
)
const (
// Address 连接地址
Address string = ":8080"
)
var grpcClient simplepb.RouteClient
func main() {
// 连接服务器
conn, err := grpc.Dial(Address, grpc.WithInsecure())
if err != nil {
log.Fatalf("net.Connect err: %v", err)
}
defer conn.Close()
// 建立gRPC连接
grpcClient = simplepb.NewRouteClient(conn)
simpleRoute() // 一元rpc
listValue() // 服务端流式rpc
routeList() // 客户端流式rpc
doubleSideStream() // 双向流式rpc
}
...
Client端的代码如上:
接下来我们就具体看一下四种grpc调用方式的实现。
func (r *Server) SimpleRoute(ctx context.Context, request *simplepb.SimpleRequest) (*simplepb.SimpleResponse, error) {
res := simplepb.SimpleResponse{
Code: 200,
Data: "hello " + request.Data + " " + time.Now().String(),
}
return &res, nil
}
服务端实现代码如上,运行后打印:
$ go run server.go
2023/01/04 23:13:59 :8080 net.Listing...
客户端实现代码为:
func simpleRoute() {
// invoke the service with grpcClient
res, err := grpcClient.SimpleRoute(context.Background(), &simplepb.SimpleRequest{Data: "I am iguochan"})
if err != nil {
log.Fatalf("SimpleRoute err: %+v", err)
}
log.Println(res)
}
运行后打印:
$ go run client.go
2023/01/04 23:15:44 code:200 data:"hello I am iguochan 2023-01-04 23:15:44.158069 +0800 CST m=+104.656427709"
可以发现,很容易就实现了grpc的简单调用。
服务端代码如下:
func (r *Server) ListValue(request *simplepb.SimpleRequest, server simplepb.Route_ListValueServer) error {
for n := 0; n < 5; n++ {
// 向流中发送消息, 默认每次send送消息最大长度为`math.MaxInt32`bytes
err := server.Send(&simplepb.StreamResponse{
StreamValue: request.Data + strconv.Itoa(n),
})
if err != nil {
return err
}
}
return nil
}
客户端代码如下:
func listValue() {
// 创建发送结构体
req := simplepb.SimpleRequest{
Data: "stream server grpc ",
}
// 调用我们的服务(ListValue方法)
stream, err := grpcClient.ListValue(context.Background(), &req)
if err != nil {
log.Fatalf("Call ListStr err: %v", err)
}
for {
//Recv() 方法接收服务端消息,默认每次Recv()最大消息长度为`1024*1024*4`bytes(4M)
res, err := stream.Recv()
// 判断消息流是否已经结束
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListStr get stream err: %v", err)
}
// 打印返回值
log.Println(res.StreamValue)
}
}
运行如下
$ go run client.go
2023/01/04 23:36:49 stream server grpc 0
2023/01/04 23:36:49 stream server grpc 1
2023/01/04 23:36:49 stream server grpc 2
2023/01/04 23:36:49 stream server grpc 3
2023/01/04 23:36:49 stream server grpc 4
值得注意的是,当客户端想结束客户端的发送时,可以使用stream.CloseSend()方法暂停服务端的发送,但是如果后来又调用了stream.Recv(),服务端又会继续发送数据。
func (r *Server) RouteList(server simplepb.Route_RouteListServer) error {
for {
//从流中获取消息
res, err := server.Recv()
if err == io.EOF {
//发送结果,并关闭
return server.SendAndClose(&simplepb.SimpleResponse{
Code: 200,
Data: "hello " + time.Now().String(),
})
}
if err != nil {
return err
}
log.Println(res.StreamData)
}
}
服务端的代码如上,客户端代码如下:
func routeList() {
//调用服务端RouteList方法,获流
stream, err := grpcClient.RouteList(context.Background())
if err != nil {
log.Fatalf("Upload list err: %v", err)
}
for n := 0; n < 5; n++ {
//向流中发送消息
err := stream.Send(&simplepb.StreamRequest{StreamData: "stream client rpc " + strconv.Itoa(n)})
if err != nil {
log.Fatalf("stream request err: %v", err)
}
}
//关闭流并获取返回的消息
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("RouteList get response err: %v", err)
}
log.Println(res)
}
服务端的打印如下:
$ go run server.go
2023/01/04 23:43:07 :8080 net.Listing...
2023/01/04 23:43:28 stream client rpc 0
2023/01/04 23:43:28 stream client rpc 1
2023/01/04 23:43:28 stream client rpc 2
2023/01/04 23:43:28 stream client rpc 3
2023/01/04 23:43:28 stream client rpc 4
客户端的打印:
$ go run client.go
2023/01/04 23:43:28 code:200 data:"hello 2023-01-04 23:43:28.970596 +0800 CST m=+21.749100126"
func (r *Server) DoubleSideStream(server simplepb.Route_DoubleSideStreamServer) error {
n := 1
for {
req, err := server.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
err = server.Send(&simplepb.StreamResponse{
StreamValue: "from stream server answer: the " + strconv.Itoa(n) + " question is " + req.StreamData,
})
if err != nil {
return err
}
n++
log.Printf("from stream client question: %s", req.StreamData)
}
}
服务端代码如上,客户端代码如下:
func doubleSideStream() {
//调用服务端的Conversations方法,获取流
stream, err := grpcClient.DoubleSideStream(context.Background())
if err != nil {
log.Fatalf("get conversations stream err: %v", err)
}
for n := 0; n < 5; n++ {
err := stream.Send(&simplepb.StreamRequest{StreamData: "stream client rpc " + strconv.Itoa(n)})
if err != nil {
log.Fatalf("stream request err: %v", err)
}
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Conversations get stream err: %v", err)
}
// 打印返回值
log.Println(res.StreamValue)
}
//最后关闭流
err = stream.CloseSend()
if err != nil {
log.Fatalf("Conversations close stream err: %v", err)
}
}
服务端的打印如下:
$ go run server.go
2023/01/04 23:52:07 :8080 net.Listing...
2023/01/04 23:52:11 from stream client question: stream client rpc 0
2023/01/04 23:52:11 from stream client question: stream client rpc 1
2023/01/04 23:52:11 from stream client question: stream client rpc 2
2023/01/04 23:52:11 from stream client question: stream client rpc 3
2023/01/04 23:52:11 from stream client question: stream client rpc 4
客户端的打印:
go run client.go 1 ?
2023/01/04 23:52:11 from stream server answer: the 1 question is stream client rpc 0
2023/01/04 23:52:11 from stream server answer: the 2 question is stream client rpc 1
2023/01/04 23:52:11 from stream server answer: the 3 question is stream client rpc 2
2023/01/04 23:52:11 from stream server answer: the 4 question is stream client rpc 3
2023/01/04 23:52:11 from stream server answer: the 5 question is stream client rpc 4