欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 国际 > 项目组件框架介绍[bRPC]

项目组件框架介绍[bRPC]

2024/12/22 2:21:38 来源:https://blog.csdn.net/2301_77838258/article/details/144385099  浏览:    关键词:项目组件框架介绍[bRPC]

文章目录

  • 前言
  • bRPC安装
  • bRPC的简单使用
    • protobuf简单使用
    • Echo服务
    • 远程调用Echo服务
  • 与etcd关联


前言

    bRPC是百度开源的一款工业级RPC框架,功能强大, 常用于搜索、存储、机器学习、广告、推荐等高性能系统。

bRPC安装

    使用源码安装即可, 在安装前要确认依赖

sudo apt-get install -y libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev

直接克隆仓库, 也可以从官网下载

git clone https://github.com/apache/brpc.git
cd brpc
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6
make
sudo make install

bRPC的简单使用

    官网给出了一些测试代码供我们了解使用方法, 比如最简单的一个echo服务

protobuf简单使用

syntax="proto2";
//设置命名空间名
package example;option cc_generic_services = true;
//请求结构
message EchoRequest 
{required string message = 1;
};
//响应结构
message EchoResponse 
{required string message = 1;
};
//EchoService将提供一个Echo方法
service EchoService 
{rpc Echo(EchoRequest) returns (EchoResponse);
};

通过这个protobuf文件生成对应的*.pb.cc和*.pb.h文件, 后面我们编写服务和调用都要包含这文件

protoc --cpp_out=. echo.proto 

在echo.pb.h文件中可以看到这部分

class EchoService : public ::PROTOBUF_NAMESPACE_ID::Service 
{...
virtual void Echo(::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::example::EchoRequest* request,::example::EchoResponse* response, ::google::protobuf::Closure* done);...
};

这个EchoService类刚好就是proto文件中设定的service , 而Echo方法就需要我们自己重写
再来看这个类

class EchoService_Stub : public EchoService
{...
void Echo(::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::example::EchoRequest* request,::example::EchoResponse* response, ::google::protobuf::Closure* done);...
};

这个就是我们要远程调用Echo服务时使用的, 使用就直接构造对象调用Echo方法即可, 就能远程调用到Echo服务的Echo方法

Echo服务

    现在我们来看搭建一个Echo服务, 这里我简化了官网的代码, 方便快速上手搭建
首先是EchoServer

class EchoServiceImpl : public EchoService 
{
public:EchoServiceImpl() {}virtual ~EchoServiceImpl() {}virtual void Echo(google::protobuf::RpcController* cntl_base,const EchoRequest* request,EchoResponse* response,google::protobuf::Closure* done) {//这个对象可以 RAII 样式调用 done->Run()brpc::ClosureGuard done_guard(done);//可以选择设置一个回调函数, 将在响应后调用, 肯定是在这些参数销毁之前调用//cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));//设置响应response->set_message(request->message());//可以通过设置 Controller 来压缩响应,当然, 这种压缩是有代价的// cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);}
};
int main(int argc, char* argv[]) {//这个就是Srever对象brpc::Server server;//这个是我们刚写的服务EchoServiceImpl echo_service_impl;//然后将这个服务添加到Server中//如果你创建的这个EchoServiceImpl对象在栈上, 那么就用brpc::SERVER_DOESNT_OWN_SERVICE//如果在堆上创建的, 比如new出来的, 就用brpc::SERVER_OWNS_SERVICEif (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {LOG(ERROR) << "Fail to add service";return -1;}//设置监听地址butil::EndPoint point;if (butil::str2endpoint(/*监听地址, 加上端口, 比如 127.0.0.1:8848 */, &point) < 0) {LOG(ERROR) << "Invalid listen address:";return -1;}//或者使用端口, butil::IP_ANY表示任意, 相当于0.0.0.0point = butil::EndPoint(butil::IP_ANY, FLAGS_port);//设置配置启动即可brpc::ServerOptions options;options.idle_timeout_sec =/*设置空闲超时时间*/;if (server.Start(point, &options) != 0) {LOG(ERROR) << "Fail to start EchoServer";return -1;}//开跑server.RunUntilAskedToQuit();return 0;
}

远程调用Echo服务

DEFINE_string(attachment, "", "Carry this along with requests");
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8000", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
int main(int argc, char* argv[]) 
{GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);//我们需要一个brpc::Channel对象, 这个就是和Server的信道。并且是线程安全的brpc::Channel channel;//可以自己设置配置, 使用空指针就是默认配置brpc::ChannelOptions options;options.protocol = FLAGS_protocol;options.connection_type = FLAGS_connection_type;options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;options.max_retry = FLAGS_max_retry;//初始化信道if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {LOG(ERROR) << "Fail to initialize channel";return -1;}//使用之前提到的这个东西来调用Echo方法, 传入一个信道, 这个也是线程安全的example::EchoService_Stub stub(&channel);//每 1 秒发送一次请求并等待响应。while (!brpc::IsAskedToQuit()) {example::EchoRequest request;example::EchoResponse response;brpc::Controller cntl;request.set_message("hello world");//因为 'done'(最后一个参数)为 NULL,所以此函数会一直等待,直到响应返回或发生错误(包括 timedout)。stub.Echo(&cntl, &request, &response, NULL);if (!cntl.Failed()) {LOG(INFO) << "Received response from " << cntl.remote_side()<< " to " << cntl.local_side()<< ": " << response.message() << " (attached="<< cntl.response_attachment() << ")"<< " latency=" << cntl.latency_us() << "us";} else {LOG(WARNING) << cntl.ErrorText();}//打印一下返回的响应std::cout<<response.message()<<std::endl;usleep(FLAGS_interval_ms * 1000L);}LOG(INFO) << "EchoClient is going to quit";return 0;
}

测试一下
在这里插入图片描述

与etcd关联

    我们可以将brpc::Channel对象和对应的服务相关联, 因为我们每次使用bRPC远程调用都需要brpc::Channel对象, 那么我们可以在服务发现后返回一个brpc::Channel对象

namespace MindbniM
{using ChannelPtr=std::shared_ptr<brpc::Channel>;//这是对一个服务的brpc信道管理class ServiceChannel{public:using ptr=std::shared_ptr<ServiceChannel>;ServiceChannel(const std::string& ServiceName):_name(ServiceName){}bool append(const std::string& host){ChannelPtr ch=std::make_shared<brpc::Channel>();brpc::ChannelOptions op;//这里为了方便就固定参数了op.connect_timeout_ms=-1;op.timeout_ms=-1;op.max_retry=3;op.protocol="baidu_std";int ret=ch->Init(host.c_str(),&op);if(ret<0){LOG_ROOT_ERROR<<"初始化信道失败 host: "<<host;return false;}std::unique_lock<std::mutex> lock(_mutex);_channels.emplace_back(ch);_map[host]=ch;return true;}bool remove(const std::string& host){std::unique_lock<std::mutex> lock(_mutex);auto it=_map.find(host);if(it==_map.end()){LOG_ROOT_WARNING<<"没有找到该要删除的信道信息 host: "<<host;return false;}auto cit=_channels.begin();while(cit!=_channels.end()){if(*cit==it->second){_channels.erase(cit);break;}++cit;}_map.erase(host);return true;}ChannelPtr choose(){//直接简单轮转std::unique_lock<std::mutex> lock(_mutex);_index%=_channels.size();return _channels[_index++];}private:std::mutex _mutex;std::string _name;int _index=0;//信道储存std::vector<ChannelPtr> _channels;//主机和信道的映射std::unordered_map<std::string,ChannelPtr> _map;};//总信道管理class ServiceManager{public:using ptr=std::shared_ptr<ServiceManager>;ChannelPtr choose(const std::string& ServiceName)     {std::unique_lock<std::mutex> lock(_mutex);if(!_services.count(ServiceName)){LOG_ROOT_ERROR<<ServiceName<<" 没有能提供该服务的主机";return nullptr;}//返回对应Service的信道return _services[ServiceName]->choose();}//添加关心的服务, 只有关心的服务才会被添加void add_concern(const std::string& ServiceName){std::unique_lock<std::mutex> lock(_mutex);_concern.insert(ServiceName);}//服务上线回调void onServiceOnline(const std::string& ServiceInstance,const std::string& Host){std::string ServiceName=getServiceName(ServiceInstance);std::unique_lock<std::mutex> lock(_mutex);if(!_concern.count(ServiceName)){LOG_ROOT_DEBUG<<ServiceName<<"服务上线 Host:"<<Host<<"(不关心)";return ;       }if(!_services.count(ServiceName)){_services[ServiceName]=std::make_shared<ServiceChannel>(ServiceName);}LOG_ROOT_DEBUG<<ServiceName<<"服务上线 Host:"<<Host;_services[ServiceName]->append(Host);}//服务下线回调void onServiceOffline(const std::string& ServiceInstance,const std::string& Host){std::string ServiceName=getServiceName(ServiceInstance);std::unique_lock<std::mutex> lock(_mutex);if(!_concern.count(ServiceName)){LOG_ROOT_DEBUG<<ServiceName<<"服务下线 Host:"<<Host<<"(不关心)";return ;       }if(!_services.count(ServiceName)){LOG_ROOT_WARNING<<ServiceName<<"要删除的服务不存在";}_services[ServiceName]->remove(Host);}private:std::string getServiceName(const std::string& ServiceInstance){auto pos=ServiceInstance.find_last_of('/');if(pos==std::string::npos) return ServiceInstance;return ServiceInstance.substr(0,pos);}std::mutex _mutex;std::unordered_set<std::string> _concern;std::unordered_map<std::string,ServiceChannel::ptr> _services;};}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com