IM.Login.proto
syntax = "proto3";
// 命名空间一般和文件名保持一致
package IM.Login;
// Service definition: the RPCs exposed for account registration and login.
service ImLogin {
// RPC methods; each takes one request message and returns one response message.
rpc Regist (IMRegistReq) returns (IMRegistRes) {}
rpc Login (IMLoginReq) returns (IMLoginRes){}
}
// Account registration: request message of the Regist RPC.
message IMRegistReq{
string user_name = 1; // user name
string password = 2; // password
}
// Response message of the Regist RPC.
message IMRegistRes{
string user_name = 1;
uint32 user_id = 2;
uint32 result_code = 3; // 0 means the registration succeeded
}
// Account login: request message of the Login RPC.
message IMLoginReq{
string user_name = 1;
string password = 2;
}
// Response message of the Login RPC.
message IMLoginRes{
uint32 user_id = 1;
uint32 result_code = 2; // 0 means the login succeeded
}
在 .proto 文件中定义了数据结构，这些数据结构面向开发者和业务程序，并不面向存储和传输。当需要存储或传输这些数据时，就需要对这些结构数据进行序列化、反序列化以及读写。通过 protoc 这个编译器，ProtoBuf 会为我们生成相应的接口代码。可通过如下命令生成相应的接口代码：
// $SRC_DIR: .proto 所在的源目录
// --cpp_out: 生成 c++ 代码
// $DST_DIR: 生成代码的目标目录
// xxx.proto: 要针对哪个 proto 文件生成接口代码
protoc -I=$SRC_DIR --cpp_out=$DST_DIR $SRC_DIR/xxx.proto
// 如:
protoc -I ./ --cpp_out=. simple.proto
$SRC_DIR
指定在解析导入指令时查找 .proto 文件的目录。如果省略，则使用当前目录。
// 执行下面命令后,将在当前目录下生成 simple.grpc.pb.h 和 simple.grpc.pb.cc 文件
protoc -I ./ --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` simple.proto
// 上面的 `which grpc_cpp_plugin` 也可以替换为 grpc_cpp_plugin 程序的路径(如果不在系统PATH下)
// 生成的 gRPC 代码依赖第一步生成的序列化代码(*.pb.h / *.pb.cc),因此编译使用时两者都必须存在(生成时互不依赖)
这里我们生成对应要使用的文件:
protoc --cpp_out=. IM.Login.proto
protoc --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=/usr/local/bin/grpc_cpp_plugin IM.Login.proto
或者:
protoc -I . --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` *.proto
生成C++代码IM.Login.pb.h和IM.Login.pb.cc, IM.Login.grpc.pb.h和IM.Login.grpc.pb.cc。
记住首先一定要开放命名空间
// 命名空间
// grpc
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
定义服务端的类,继承 proto 文件定义的 grpc 服务 ImLogin ,重写 grpc 服务定义的方法
class IMLoginServiceImpl : public ImLogin::Service {
// 注册
virtual Status Regist(ServerContext* context, const IMRegistReq* request,IMRegistRes* response) override {
std::cout << "Regist user_name: " << request->user_name() << std::endl;
response->set_user_name(request->user_name());
response->set_user_id(10);
response->set_result_code(0);
return Status::OK;
}
// 登录
virtual Status Login(ServerContext* context, const IMLoginReq* request,IMLoginRes* response) override {
std::cout << "Login user_name: " << request->user_name() << std::endl;
response->set_user_id(10);
response->set_result_code(0);
return Status::OK;
}
};
std::string server_address("0.0.0.0:50051");
/* 定义重写的服务类 */
ImLoginServiceImpl service;
/* 创建工厂类 */
ServerBuilder builder;
/* 监听端口和地址 */
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 5000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
/* 注册服务 */
builder.RegisterService(&service);
/** 创建和启动一个RPC服务器*/
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
/* 进入服务事件循环 */
server->Wait();
#include <iostream>
#include <string>
// grpc头文件
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
// 包含我们自己proto文件生成的.h
#include "IM.Login.pb.h"
#include "IM.Login.grpc.pb.h"
// 命名空间
// grpc
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
class IMLoginServiceImpl : public ImLogin::Service {
// 注册
virtual Status Regist(ServerContext* context, const IMRegistReq* request, IMRegistRes* response) override {
std::cout << "Regist user_name: " << request->user_name() << std::endl;
response->set_user_name(request->user_name());
response->set_user_id(10);
response->set_result_code(0);
return Status::OK;
}
// 登录
virtual Status Login(ServerContext* context, const IMLoginReq* request, IMLoginRes* response) override {
std::cout << "Login user_name: " << request->user_name() << std::endl;
response->set_user_id(10);
response->set_result_code(0);
return Status::OK;
}
};
// Builds the synchronous ImLogin server on 0.0.0.0:50051 (insecure
// credentials, keepalive enabled) and blocks serving requests.
// Returns early, after logging to stderr, if the server cannot be started.
void RunServer()
{
  const std::string server_addr("0.0.0.0:50051");
  // The service instance must outlive the server that dispatches to it.
  IMLoginServiceImpl service;

  ServerBuilder builder;
  builder.AddListeningPort(server_addr, grpc::InsecureServerCredentials());
  // Keepalive tuning: ping interval, ping-ack timeout, and pings allowed
  // without active calls.
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 5000);
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
  builder.RegisterService(&service);

  // Create and start the server. BuildAndStart() yields nullptr on failure
  // (for example when the port is already in use), so check before use.
  std::unique_ptr<Server> server(builder.BuildAndStart());
  if (!server) {
    std::cerr << "Failed to start server on " << server_addr << std::endl;
    return;
  }
  std::cout << "Server listening on " << server_addr << std::endl;

  // Enter the service loop; blocks until the server is shut down.
  server->Wait();
}
// 怎么编译?
// 手动编译
// 通过cmake的方式
// Entry point of the synchronous server; command-line arguments are unused.
int main(int argc, const char** argv)
{
  (void)argc;
  (void)argv;

  RunServer();
  return 0;
}
记住首先一定要开放命名空间
// 命名空间
// grpc
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
定义客户端的类,实现两个方法用来发送grpc请求以及接收grpc响应
class ImLoginClient
{
public:
ImLoginClient(std::shared_ptr<Channel> channel)
:stub_(ImLogin::NewStub(channel)) {
}
void Regist(const std::string &user_name, const std::string &password) {
IMRegistReq request;
request.set_user_name(user_name);
request.set_password(password);
IMRegistRes response;
ClientContext context;
std::cout << "-> Regist req" << std::endl;
Status status = stub_->Regist(&context, request, &response);
if(status.ok()) {
std::cout << "user_name:" << response.user_name() << ", user_id:" << response.user_id() << std::endl;
} else {
std::cout << "user_name:" << response.user_name() << "Regist failed: " << response.result_code()<< std::endl;
}
}
void Login(const std::string &user_name, const std::string &password) {
IMLoginReq request;
request.set_user_name(user_name);
request.set_password(password);
IMLoginRes response;
ClientContext context;
std::cout << "-> Login req" << std::endl;
Status status = stub_->Login(&context, request, &response);
if(status.ok()) {
std::cout << "user_id:" << response.user_id() << " login ok" << std::endl;
} else {
std::cout << "user_name:" << request.user_name() << "Login failed: " << response.result_code()<< std::endl;
}
}
private:
std::unique_ptr<ImLogin::Stub> stub_;
};
#include <iostream>
#include <memory>
#include <string>
// /usr/local/include/grpcpp/grpcpp.h
#include <grpcpp/grpcpp.h>
// 包含我们自己proto文件生成的.h
#include "IM.Login.pb.h"
#include "IM.Login.grpc.pb.h"
// 命名空间
// grpc
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
class ImLoginClient
{
public:
ImLoginClient(std::shared_ptr<Channel> channel)
:stub_(ImLogin::NewStub(channel)) {
}
void Regist(const std::string &user_name, const std::string &password) {
IMRegistReq request;
request.set_user_name(user_name);
request.set_password(password);
IMRegistRes response;
ClientContext context;
std::cout << "-> Regist req" << std::endl;
Status status = stub_->Regist(&context, request, &response);
if(status.ok()) {
std::cout << "user_name:" << response.user_name() << ", user_id:" << response.user_id() << std::endl;
} else {
std::cout << "user_name:" << response.user_name() << "Regist failed: " << response.result_code()<< std::endl;
}
}
void Login(const std::string &user_name, const std::string &password) {
IMLoginReq request;
request.set_user_name(user_name);
request.set_password(password);
IMLoginRes response;
ClientContext context;
std::cout << "-> Login req" << std::endl;
Status status = stub_->Login(&context, request, &response);
if(status.ok()) {
std::cout << "user_id:" << response.user_id() << " login ok" << std::endl;
} else {
std::cout << "user_name:" << request.user_name() << "Login failed: " << response.result_code()<< std::endl;
}
}
private:
std::unique_ptr<ImLogin::Stub> stub_;
};
// 照葫芦画瓢
int main()
{
// 服务器的地址
std::string server_addr = "localhost:50051";
ImLoginClient im_login_client(
grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials())
);
std::string user_name = "darren";
std::string password = "123456";
im_login_client.Regist(user_name, password);
im_login_client.Login(user_name, password);
return 0;
}
//异步多类调用,实际项目中使用多类的比一个类多函数的要多,因为一个类多函数new的效率低,
//并且还可能产生无用数据
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include "IM.Login.grpc.pb.h"
#include "IM.Login.pb.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
// There is no shutdown handling in this code.
void Run() { // 启动
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
private:
// Class encompasing the state and logic needed to serve a request.
class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), status_(CREATE) {
std::cout << "CallData constructing, this: " << this
<< std::endl; // darren 增加
// Invoke the serving logic right away.
Proceed();
}
virtual ~CallData(){}
virtual void Proceed() {
// std::cout << "CallData Prceed" << std::endl//;
return;
}
// 通用的
// The means of communication with the gRPC runtime for an asynchronous
// server.
ImLogin::AsyncService* service_;
// The producer-consumer queue where for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use
// of compression, authentication, as well as to send metadata back to the
// client.
ServerContext ctx_;
// 有差异的
// What we get from the client.
// IMRegistReq request_;
// // What we send back to the client.
// IMRegistRes reply_;
// // The means to get back to the client.
// ServerAsyncResponseWriter<IMRegistRes> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; // The current serving state.
};
class RegistCallData : public CallData {
public:
RegistCallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq)
:CallData(service, cq), responder_(&ctx_) {
Proceed();
}
~RegistCallData() {}
void Proceed() override {
// std::cout << "RegistCallData Prceed" << std::endl//;
std::cout << "this: " << this
<< " RegistCallData Proceed(), status: " << status_
<< std::endl; // darren 增加
if (status_ == CREATE) { // 0
std::cout << "this: " << this << " RegistCallData Proceed(), status: "
<< "CREATE" << std::endl;
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestRegist(&ctx_, &request_, &responder_, cq_, cq_, this);
} else if (status_ == PROCESS) { // 1
std::cout << "this: " << this << " RegistCallData Proceed(), status: "
<< "PROCESS" << std::endl;
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new RegistCallData(service_, cq_); // 1. 创建处理逻辑
reply_.set_user_name(request_.user_name());
reply_.set_user_id(10);
reply_.set_result_code(0);
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
std::cout << "this: " << this << " RegistCallData Proceed(), status: "
<< "FINISH" << std::endl;
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (RegistCallData).
delete this;
}
}
private:
IMRegistReq request_;
IMRegistRes reply_;
ServerAsyncResponseWriter<IMRegistRes> responder_;
};
class LoginCallData : public CallData {
public:
LoginCallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq)
:CallData(service, cq), responder_(&ctx_) {
Proceed();
}
~LoginCallData() {}
void Proceed() override {
// std::cout << "LoginCallData Prceed" << std::endl//;
std::cout << "this: " << this
<< " LoginCallData Proceed(), status: " << status_
<< std::endl; // darren 增加
if (status_ == CREATE) { // 0
std::cout << "this: " << this << " LoginCallData Proceed(), status: "
<< "CREATE" << std::endl;
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestLogin(&ctx_, &request_, &responder_, cq_, cq_, this);
} else if (status_ == PROCESS) { // 1
std::cout << "this: " << this << " LoginCallData Proceed(), status: "
<< "PROCESS" << std::endl;
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new LoginCallData(service_, cq_); // 1. 创建处理逻辑
reply_.set_user_id(10);
reply_.set_result_code(0);
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
std::cout << "this: " << this << " LoginCallData Proceed(), status: "
<< "FINISH" << std::endl;
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (LoginCallData).
delete this;
}
}
private:
IMLoginReq request_;
IMLoginRes reply_;
ServerAsyncResponseWriter<IMLoginRes> responder_;
};
// This can be run in multiple threads if needed.
void HandleRpcs() { // 可以运行在多线程
// Spawn a new CallData instance to serve new clients.
new RegistCallData(&service_, cq_.get()); //
new LoginCallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
std::cout << "before cq_->Next "
<< std::endl; // 1. 等待消息事件 darren 增加
GPR_ASSERT(cq_->Next(&tag, &ok));
std::cout << "after cq_->Next " << std::endl; // darren 增加
GPR_ASSERT(ok);
std::cout << "before static_cast" << std::endl; // darren 增加
static_cast<CallData*>(tag)->Proceed();
std::cout << "after static_cast" << std::endl; // darren 增加
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
ImLogin::AsyncService service_;
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
ServerImpl server;
server.Run();
return 0;
}