欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 文化 > brpc之baidu_protocol

brpc之baidu_protocol

2025/1/19 13:16:07 来源:https://blog.csdn.net/wuli2496/article/details/145097060  浏览:    关键词:brpc之baidu_protocol

简介

是brpc默认使用的协议

初始化

Protocol baidu_protocol = { ParseRpcMessage,SerializeRequestDefault, PackRpcRequest,ProcessRpcRequest, ProcessRpcResponse,VerifyRpcRequest, NULL, NULL,CONNECTION_TYPE_ALL, "baidu_std" };

协议定义

定义在baidu_rpc_meta.proto文件中

message RpcMeta {optional RpcRequestMeta request = 1;optional RpcResponseMeta response = 2;optional int32 compress_type = 3;optional int64 correlation_id = 4;optional int32 attachment_size = 5;optional ChunkInfo chunk_info = 6;optional bytes authentication_data = 7;optional StreamSettings stream_settings = 8;map<string, string> user_fields = 9;
}message RpcRequestMeta {required string service_name = 1;required string method_name = 2;optional int64 log_id = 3;optional int64 trace_id = 4;optional int64 span_id = 5;optional int64 parent_span_id = 6;optional string request_id = 7; // correspond to x-request-id in http headeroptional int32 timeout_ms = 8;  // client's timeout setting for current call
}message RpcResponseMeta {optional int32 error_code = 1;optional string error_text = 2;
}

处理

ProcessRpcRequest是服务端收到一个完整包后的处理,其依赖ServerPrivateAccessor(获取Server对应的服务名或者方法名)和ControllerPrivateAccessor

ServerPrivateAccessor
- const Server* _server
ControllerPrivateAccessor
- Controller* _cntl
void ProcessRpcRequest(InputMessageBase* msg_base) {const int64_t start_parse_us = butil::cpuwide_time_us();DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));SocketUniquePtr socket_guard(msg->ReleaseSocket());Socket* socket = socket_guard.get();const Server* server = static_cast<const Server*>(msg_base->arg());ScopedNonServiceError non_service_error(server);RpcMeta meta;if (!ParsePbFromIOBuf(&meta, msg->meta)) {LOG(WARNING) << "Fail to parse RpcMeta from " << *socket;socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s",socket->description().c_str());return;}const RpcRequestMeta &request_meta = meta.request();SampledRequest* sample = AskToBeSampled();if (sample) {sample->meta.set_service_name(request_meta.service_name());sample->meta.set_method_name(request_meta.method_name());sample->meta.set_compress_type((CompressType)meta.compress_type());sample->meta.set_protocol_type(PROTOCOL_BAIDU_STD);sample->meta.set_attachment_size(meta.attachment_size());sample->meta.set_authentication_data(meta.authentication_data());sample->request = msg->payload;sample->submit(start_parse_us);}std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);if (NULL == cntl.get()) {LOG(WARNING) << "Fail to new Controller";return;}std::unique_ptr<google::protobuf::Message> req;std::unique_ptr<google::protobuf::Message> res;ServerPrivateAccessor server_accessor(server);ControllerPrivateAccessor accessor(cntl.get());const bool security_mode = server->options().security_mode() &&socket->user() == server_accessor.acceptor();if (request_meta.has_log_id()) {cntl->set_log_id(request_meta.log_id());}if (request_meta.has_request_id()) {cntl->set_request_id(request_meta.request_id());}if (request_meta.has_timeout_ms()) {cntl->set_timeout_ms(request_meta.timeout_ms());}cntl->set_request_compress_type((CompressType)meta.compress_type());accessor.set_server(server).set_security_mode(security_mode).set_peer_id(socket->id()).set_remote_side(socket->remote_side()).set_local_side(socket->local_side()).set_auth_context(socket->auth_context()).set_request_protocol(PROTOCOL_BAIDU_STD).set_begin_time_us(msg->received_us()).move_in_server_receiving_sock(socket_guard);if (meta.has_stream_settings()) {accessor.set_remote_stream_settings(meta.release_stream_settings());}if (!meta.user_fields().empty()) {for (const auto& it : meta.user_fields()) {(*cntl->request_user_fields())[it.first] = it.second;}}// Tag the bthread with this server's key for thread_local_data().if (server->thread_local_options().thread_local_data_factory) {bthread_assign_data((void*)&server->thread_local_options());}Span* span = NULL;if (IsTraceable(request_meta.has_trace_id())) {span = Span::CreateServerSpan(request_meta.trace_id(), request_meta.span_id(),request_meta.parent_span_id(), msg->base_real_us());accessor.set_span(span);span->set_log_id(request_meta.log_id());span->set_remote_side(cntl->remote_side());span->set_protocol(PROTOCOL_BAIDU_STD);span->set_received_us(msg->received_us());span->set_start_parse_us(start_parse_us);span->set_request_size(msg->payload.size() + msg->meta.size() + 12);}MethodStatus* method_status = NULL;do {if (!server->IsRunning()) {cntl->SetFailed(ELOGOFF, "Server is stopping");break;}if (socket->is_overcrowded()) {cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",butil::endpoint2str(socket->remote_side()).c_str());break;}if (!server_accessor.AddConcurrency(cntl.get())) {cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",server->options().max_concurrency);break;}if (FLAGS_usercode_in_pthread && TooManyUserCode()) {cntl->SetFailed(ELIMIT, "Too many user code to run when"" -usercode_in_pthread is on");break;}// NOTE(gejun): jprotobuf sends service names without packages. So the// name should be changed to full when it's not.butil::StringPiece svc_name(request_meta.service_name());if (svc_name.find('.') == butil::StringPiece::npos) {const Server::ServiceProperty* sp =server_accessor.FindServicePropertyByName(svc_name);if (NULL == sp) {cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",request_meta.service_name().c_str());break;}svc_name = sp->service->GetDescriptor()->full_name();}const Server::MethodProperty* mp =server_accessor.FindMethodPropertyByFullName(svc_name, request_meta.method_name());if (NULL == mp) {cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",request_meta.service_name().c_str(),request_meta.method_name().c_str());break;} else if (mp->service->GetDescriptor()== BadMethodService::descriptor()) {BadMethodRequest breq;BadMethodResponse bres;breq.set_service_name(request_meta.service_name());mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);break;}// Switch to service-specific error.non_service_error.release();method_status = mp->status;if (method_status) {int rejected_cc = 0;if (!method_status->OnRequested(&rejected_cc, cntl.get())) {cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",mp->method->full_name().c_str(), rejected_cc);break;}}google::protobuf::Service* svc = mp->service;const google::protobuf::MethodDescriptor* method = mp->method;accessor.set_method(method);if (!server->AcceptRequest(cntl.get())) {break;}if (span) {span->ResetServerSpanName(method->full_name());}const int req_size = static_cast<int>(msg->payload.size());butil::IOBuf req_buf;butil::IOBuf* req_buf_ptr = &msg->payload;if (meta.has_attachment_size()) {if (req_size < meta.attachment_size()) {cntl->SetFailed(EREQUEST,"attachment_size=%d is larger than request_size=%d",meta.attachment_size(), req_size);break;}int body_without_attachment_size = req_size - meta.attachment_size();msg->payload.cutn(&req_buf, body_without_attachment_size);req_buf_ptr = &req_buf;cntl->request_attachment().swap(msg->payload);}CompressType req_cmp_type = (CompressType)meta.compress_type();req.reset(svc->GetRequestPrototype(method).New());if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {cntl->SetFailed(EREQUEST, "Fail to parse request message, ""CompressType=%s, request_size=%d", CompressTypeToCStr(req_cmp_type), req_size);break;}res.reset(svc->GetResponsePrototype(method).New());// `socket' will be held until response has been sentgoogle::protobuf::Closure* done = ::brpc::NewCallback<int64_t, Controller*, const google::protobuf::Message*,const google::protobuf::Message*, const Server*,MethodStatus*, int64_t>(&SendRpcResponse, meta.correlation_id(), cntl.get(), req.get(), res.get(), server,method_status, msg->received_us());// optional, just release resource ASAPmsg.reset();req_buf.clear();if (span) {span->set_start_callback_us(butil::cpuwide_time_us());span->AsParent();}if (!FLAGS_usercode_in_pthread) {return svc->CallMethod(method, cntl.release(), req.release(), res.release(), done);}if (BeginRunningUserCode()) {svc->CallMethod(method, cntl.release(), req.release(), res.release(), done);return EndRunningUserCodeInPlace();} else {return EndRunningCallMethodInPool(svc, method, cntl.release(),req.release(), res.release(), done);}} while (false);// `cntl', `req' and `res' will be deleted inside `SendRpcResponse'// `socket' will be held until response has been sentSendRpcResponse(meta.correlation_id(), cntl.release(), req.release(), res.release(), server,method_status, msg->received_us());
}

对于开启了usercode_in_pthread,如果计数超过限制,会放到UserCodeBackupPool线程池中来执行

int UserCodeBackupPool::Init() {// Like bthread workers, these threads never quit (to avoid potential hang// during termination of program).for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {pthread_t th;if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {LOG(ERROR) << "Fail to create UserCodeRunner";return -1;}}return 0;
}static void* UserCodeRunner(void* args) {butil::PlatformThread::SetName("brpc_user_code_runner");static_cast<UserCodeBackupPool*>(args)->UserCodeRunningLoop();return NULL;
}// Entry of backup thread for running user code.
void UserCodeBackupPool::UserCodeRunningLoop() {bthread::run_worker_startfn();
#ifdef BAIDU_INTERNALlogging::ComlogInitializer comlog_initializer;
#endifint64_t last_time = butil::cpuwide_time_us();while (true) {bool blocked = false;UserCode usercode = { NULL, NULL };{BAIDU_SCOPED_LOCK(s_usercode_mutex);while (queue.empty()) {pthread_cond_wait(&s_usercode_cond, &s_usercode_mutex);blocked = true;}usercode = queue.front();queue.pop_front();if (g_too_many_usercode &&(int)queue.size() <= FLAGS_usercode_backup_threads) {g_too_many_usercode = false;}}const int64_t begin_time = (blocked ? butil::cpuwide_time_us() : last_time);usercode.fn(usercode.arg);const int64_t end_time = butil::cpuwide_time_us();inpool_count << 1;inpool_elapse_us << (end_time - begin_time);last_time = end_time;}
}

放入队列逻辑为

void EndRunningCallMethodInPool(::google::protobuf::Service* service,const ::google::protobuf::MethodDescriptor* method,::google::protobuf::RpcController* controller,const ::google::protobuf::Message* request,::google::protobuf::Message* response,::google::protobuf::Closure* done) {CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;args->service = service;args->method = method;args->controller = controller;args->request = request;args->response = response;args->done = done;return EndRunningUserCodeInPool(CallMethodInBackupThread, args);
};void EndRunningUserCodeInPool(void (*fn)(void*), void* arg) {InitUserCodeBackupPoolOnceOrDie();g_usercode_inplace.fetch_sub(1, butil::memory_order_relaxed);// Not enough idle workers, run the code in backup threads to prevent// all workers from being blocked and no responses will be processed// anymore (deadlocked).const UserCode usercode = { fn, arg };pthread_mutex_lock(&s_usercode_mutex);s_usercode_pool->queue.push_back(usercode);// If the queue has too many items, we can't drop the user code// directly which often must be run, for example: client-side done.// The solution is that we set a mark which is not cleared before// queue becomes short again. RPC code checks the mark before// submitting tasks that may generate more user code.if ((int)s_usercode_pool->queue.size() >=(FLAGS_usercode_backup_threads *FLAGS_max_pending_in_each_backup_thread)) {g_too_many_usercode = true;}pthread_mutex_unlock(&s_usercode_mutex);pthread_cond_signal(&s_usercode_cond);
}

闭包

在处理协议时,最终提供给Server的闭包为

google::protobuf::Closure* done = ::brpc::NewCallback<int64_t, Controller*, const google::protobuf::Message*,const google::protobuf::Message*, const Server*,MethodStatus*, int64_t>(&SendRpcResponse, meta.correlation_id(), cntl.get(), req.get(), res.get(), server,method_status, msg->received_us());template <typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5, typename Arg6, typename Arg7>
inline ::google::protobuf::Closure* NewCallback(void (*function)(Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7),Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5, Arg6 arg6, Arg7 arg7) {return new internal::FunctionClosure7<Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7>(function, true, arg1, arg2, arg3, arg4, arg5, arg6, arg7);
}

FunctionClosure7闭包类关系为

FunctionClosure7<Arg1, Arg2, Arg3, Arg4, Arg5, Arg6 ,Arg7 >
- FunctionType function_
- bool self_deleting_
- Arg1 arg1_
- Arg2 arg2_
- Arg3 arg3_
- Arg4 arg4_
- Arg5 arg5_
- Arg6 arg6_
- Arg7 arg7_
Closure

function_:为发送响应函数SendRpcResponse
arg1_:为meta.correlation_id()
arg2_:为cntl.get(),类型为Controller*
arg3_:为req.get(),类型为const google::protobuf::Message*,表示请求消息
arg4_:为res.get(),类型为const google::protobuf::Message*,表示用于加响应的消息
arg5_:为server, 类型为const Server*,表示具体的服务业务
arg6_ :为method_status,类型为MethodStatus*,表示方法状态
arg7_:为msg->received_us(),类型为int64_t,表示消息的接收时间

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com