简介
baidu_std是brpc默认使用的协议
初始化
// Registration entry for the "baidu_std" protocol. The initializer is
// positional, so each entry lands in the corresponding field of `Protocol`
// in declaration order; the two NULL entries leave their fields unset.
Protocol baidu_protocol = {
    ParseRpcMessage,
    SerializeRequestDefault,
    PackRpcRequest,
    ProcessRpcRequest,
    ProcessRpcResponse,
    VerifyRpcRequest,
    NULL,
    NULL,
    CONNECTION_TYPE_ALL,
    "baidu_std"
};
协议定义
定义在baidu_rpc_meta.proto文件中
// Top-level meta of a baidu_std message: carries the request-side or
// response-side meta plus the fields shared by both directions.
message RpcMeta {
    optional RpcRequestMeta request = 1;
    optional RpcResponseMeta response = 2;
    optional int32 compress_type = 3;
    optional int64 correlation_id = 4;
    optional int32 attachment_size = 5;
    optional ChunkInfo chunk_info = 6;
    optional bytes authentication_data = 7;
    optional StreamSettings stream_settings = 8;
    map<string, string> user_fields = 9;
}

// Request-side meta: identifies the target service/method and carries
// logging, tracing and timeout information.
message RpcRequestMeta {
    required string service_name = 1;
    required string method_name = 2;
    optional int64 log_id = 3;
    optional int64 trace_id = 4;
    optional int64 span_id = 5;
    optional int64 parent_span_id = 6;
    optional string request_id = 7;  // correspond to x-request-id in http header
    optional int32 timeout_ms = 8;   // client's timeout setting for current call
}

// Response-side meta: the error code and text of the call.
message RpcResponseMeta {
    optional int32 error_code = 1;
    optional string error_text = 2;
}
处理
ProcessRpcRequest是服务端收到一个完整请求包后的处理函数,其依赖ServerPrivateAccessor(用于按服务名或方法名查找Server中注册的服务和方法)和ControllerPrivateAccessor(用于设置Controller的内部字段)
// Handles one complete baidu_std request on the server side.
//
// Flow, as implemented below:
//   1. Parse RpcMeta from msg->meta. On failure the socket is failed and the
//      function returns without sending any response.
//   2. If the sampler asks for it, submit a copy of the request as a sample.
//   3. Create a Controller and fill it from the meta and the socket.
//   4. Run admission checks and look up the target method. Every failure
//      inside the do/while sets an error on the controller and `break`s, so
//      that the error is reported by the SendRpcResponse call at the bottom.
//   5. Split the attachment from the payload, parse the request message,
//      build the `done` closure and call the service's CallMethod, either
//      directly or through the -usercode_in_pthread paths.
//
// msg_base: the received message. It is wrapped in a DestroyingPtr here, so
//           callers must not use it after this call (presumably it is
//           destroyed when `msg` is reset or goes out of scope — confirm
//           against DestroyingPtr).
void ProcessRpcRequest(InputMessageBase* msg_base) {
    const int64_t start_parse_us = butil::cpuwide_time_us();
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    SocketUniquePtr socket_guard(msg->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg_base->arg());
    ScopedNonServiceError non_service_error(server);

    RpcMeta meta;
    if (!ParsePbFromIOBuf(&meta, msg->meta)) {
        LOG(WARNING) << "Fail to parse RpcMeta from " << *socket;
        socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s",
                          socket->description().c_str());
        return;
    }
    const RpcRequestMeta &request_meta = meta.request();

    SampledRequest* sample = AskToBeSampled();
    if (sample) {
        sample->meta.set_service_name(request_meta.service_name());
        sample->meta.set_method_name(request_meta.method_name());
        sample->meta.set_compress_type((CompressType)meta.compress_type());
        sample->meta.set_protocol_type(PROTOCOL_BAIDU_STD);
        sample->meta.set_attachment_size(meta.attachment_size());
        sample->meta.set_authentication_data(meta.authentication_data());
        sample->request = msg->payload;
        sample->submit(start_parse_us);
    }

    std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);
    if (NULL == cntl.get()) {
        LOG(WARNING) << "Fail to new Controller";
        return;
    }
    std::unique_ptr<google::protobuf::Message> req;
    std::unique_ptr<google::protobuf::Message> res;

    ServerPrivateAccessor server_accessor(server);
    ControllerPrivateAccessor accessor(cntl.get());
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    if (request_meta.has_log_id()) {
        cntl->set_log_id(request_meta.log_id());
    }
    if (request_meta.has_request_id()) {
        cntl->set_request_id(request_meta.request_id());
    }
    if (request_meta.has_timeout_ms()) {
        cntl->set_timeout_ms(request_meta.timeout_ms());
    }
    cntl->set_request_compress_type((CompressType)meta.compress_type());
    // After this chain the socket reference is moved out of `socket_guard`;
    // the raw `socket` pointer is still used below.
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(socket->remote_side())
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(PROTOCOL_BAIDU_STD)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);

    if (meta.has_stream_settings()) {
        accessor.set_remote_stream_settings(meta.release_stream_settings());
    }

    if (!meta.user_fields().empty()) {
        for (const auto& it : meta.user_fields()) {
            (*cntl->request_user_fields())[it.first] = it.second;
        }
    }

    // Tag the bthread with this server's key for thread_local_data().
    if (server->thread_local_options().thread_local_data_factory) {
        bthread_assign_data((void*)&server->thread_local_options());
    }

    Span* span = NULL;
    if (IsTraceable(request_meta.has_trace_id())) {
        span = Span::CreateServerSpan(
            request_meta.trace_id(), request_meta.span_id(),
            request_meta.parent_span_id(), msg->base_real_us());
        accessor.set_span(span);
        span->set_log_id(request_meta.log_id());
        span->set_remote_side(cntl->remote_side());
        span->set_protocol(PROTOCOL_BAIDU_STD);
        span->set_received_us(msg->received_us());
        span->set_start_parse_us(start_parse_us);
        // NOTE(review): the constant 12 is presumably the size of the fixed
        // baidu_std header preceding meta and payload — confirm.
        span->set_request_size(msg->payload.size() + msg->meta.size() + 12);
    }

    MethodStatus* method_status = NULL;
    do {
        if (!server->IsRunning()) {
            cntl->SetFailed(ELOGOFF, "Server is stopping");
            break;
        }

        if (socket->is_overcrowded()) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            break;
        }

        if (!server_accessor.AddConcurrency(cntl.get())) {
            cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
                            server->options().max_concurrency);
            break;
        }

        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            break;
        }

        // NOTE(gejun): jprotobuf sends service names without packages. So the
        // name should be changed to full when it's not.
        butil::StringPiece svc_name(request_meta.service_name());
        if (svc_name.find('.') == butil::StringPiece::npos) {
            const Server::ServiceProperty* sp =
                server_accessor.FindServicePropertyByName(svc_name);
            if (NULL == sp) {
                cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",
                                request_meta.service_name().c_str());
                break;
            }
            svc_name = sp->service->GetDescriptor()->full_name();
        }
        const Server::MethodProperty* mp =
            server_accessor.FindMethodPropertyByFullName(
                svc_name, request_meta.method_name());
        if (NULL == mp) {
            cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",
                            request_meta.service_name().c_str(),
                            request_meta.method_name().c_str());
            break;
        } else if (mp->service->GetDescriptor()
                   == BadMethodService::descriptor()) {
            // The lookup resolved to BadMethodService: call it with a NULL
            // `done` and let the response be sent by the code after the loop.
            BadMethodRequest breq;
            BadMethodResponse bres;
            breq.set_service_name(request_meta.service_name());
            mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
            break;
        }
        // Switch to service-specific error.
        non_service_error.release();
        method_status = mp->status;
        if (method_status) {
            int rejected_cc = 0;
            if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
                cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                                mp->method->full_name().c_str(), rejected_cc);
                break;
            }
        }
        google::protobuf::Service* svc = mp->service;
        const google::protobuf::MethodDescriptor* method = mp->method;
        accessor.set_method(method);

        if (!server->AcceptRequest(cntl.get())) {
            break;
        }

        if (span) {
            span->ResetServerSpanName(method->full_name());
        }
        const int req_size = static_cast<int>(msg->payload.size());
        butil::IOBuf req_buf;
        butil::IOBuf* req_buf_ptr = &msg->payload;
        if (meta.has_attachment_size()) {
            // NOTE(review): only attachment_size > req_size is rejected here;
            // a negative attachment_size from the peer is not checked and
            // makes the subtraction below exceed req_size — confirm whether
            // that needs an explicit guard.
            if (req_size < meta.attachment_size()) {
                cntl->SetFailed(EREQUEST,
                    "attachment_size=%d is larger than request_size=%d",
                     meta.attachment_size(), req_size);
                break;
            }
            // The leading bytes of the payload are the serialized request;
            // whatever remains after the cut becomes the attachment.
            int body_without_attachment_size = req_size - meta.attachment_size();
            msg->payload.cutn(&req_buf, body_without_attachment_size);
            req_buf_ptr = &req_buf;
            cntl->request_attachment().swap(msg->payload);
        }

        CompressType req_cmp_type = (CompressType)meta.compress_type();
        req.reset(svc->GetRequestPrototype(method).New());
        if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {
            cntl->SetFailed(EREQUEST, "Fail to parse request message, "
                            "CompressType=%s, request_size=%d",
                            CompressTypeToCStr(req_cmp_type), req_size);
            break;
        }

        res.reset(svc->GetResponsePrototype(method).New());
        // `socket' will be held until response has been sent
        google::protobuf::Closure* done = ::brpc::NewCallback<
            int64_t, Controller*, const google::protobuf::Message*,
            const google::protobuf::Message*, const Server*,
            MethodStatus*, int64_t>(
                &SendRpcResponse, meta.correlation_id(), cntl.get(),
                req.get(), res.get(), server,
                method_status, msg->received_us());

        // optional, just release resource ASAP
        msg.reset();
        req_buf.clear();

        if (span) {
            span->set_start_callback_us(butil::cpuwide_time_us());
            span->AsParent();
        }
        // From here on the unique_ptrs are released on every path, so the
        // objects they held are no longer deleted by this function.
        if (!FLAGS_usercode_in_pthread) {
            return svc->CallMethod(method, cntl.release(),
                                   req.release(), res.release(), done);
        }
        if (BeginRunningUserCode()) {
            svc->CallMethod(method, cntl.release(),
                            req.release(), res.release(), done);
            return EndRunningUserCodeInPlace();
        } else {
            return EndRunningCallMethodInPool(
                svc, method, cntl.release(),
                req.release(), res.release(), done);
        }
    } while (false);

    // `cntl', `req' and `res' will be deleted inside `SendRpcResponse'
    // `socket' will be held until response has been sent
    SendRpcResponse(meta.correlation_id(), cntl.release(),
                    req.release(), res.release(), server,
                    method_status, msg->received_us());
}
对于开启了usercode_in_pthread的情况,如果BeginRunningUserCode()返回false(即正在运行的用户代码计数超过限制),用户代码会被放到UserCodeBackupPool线程池中来执行
int UserCodeBackupPool::Init() {// Like bthread workers, these threads never quit (to avoid potential hang// during termination of program).for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {pthread_t th;if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {LOG(ERROR) << "Fail to create UserCodeRunner";return -1;}}return 0;
}static void* UserCodeRunner(void* args) {butil::PlatformThread::SetName("brpc_user_code_runner");static_cast<UserCodeBackupPool*>(args)->UserCodeRunningLoop();return NULL;
}// Entry of backup thread for running user code.
void UserCodeBackupPool::UserCodeRunningLoop() {bthread::run_worker_startfn();
#ifdef BAIDU_INTERNALlogging::ComlogInitializer comlog_initializer;
#endifint64_t last_time = butil::cpuwide_time_us();while (true) {bool blocked = false;UserCode usercode = { NULL, NULL };{BAIDU_SCOPED_LOCK(s_usercode_mutex);while (queue.empty()) {pthread_cond_wait(&s_usercode_cond, &s_usercode_mutex);blocked = true;}usercode = queue.front();queue.pop_front();if (g_too_many_usercode &&(int)queue.size() <= FLAGS_usercode_backup_threads) {g_too_many_usercode = false;}}const int64_t begin_time = (blocked ? butil::cpuwide_time_us() : last_time);usercode.fn(usercode.arg);const int64_t end_time = butil::cpuwide_time_us();inpool_count << 1;inpool_elapse_us << (end_time - begin_time);last_time = end_time;}
}
放入队列逻辑为
void EndRunningCallMethodInPool(::google::protobuf::Service* service,const ::google::protobuf::MethodDescriptor* method,::google::protobuf::RpcController* controller,const ::google::protobuf::Message* request,::google::protobuf::Message* response,::google::protobuf::Closure* done) {CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;args->service = service;args->method = method;args->controller = controller;args->request = request;args->response = response;args->done = done;return EndRunningUserCodeInPool(CallMethodInBackupThread, args);
};void EndRunningUserCodeInPool(void (*fn)(void*), void* arg) {InitUserCodeBackupPoolOnceOrDie();g_usercode_inplace.fetch_sub(1, butil::memory_order_relaxed);// Not enough idle workers, run the code in backup threads to prevent// all workers from being blocked and no responses will be processed// anymore (deadlocked).const UserCode usercode = { fn, arg };pthread_mutex_lock(&s_usercode_mutex);s_usercode_pool->queue.push_back(usercode);// If the queue has too many items, we can't drop the user code// directly which often must be run, for example: client-side done.// The solution is that we set a mark which is not cleared before// queue becomes short again. RPC code checks the mark before// submitting tasks that may generate more user code.if ((int)s_usercode_pool->queue.size() >=(FLAGS_usercode_backup_threads *FLAGS_max_pending_in_each_backup_thread)) {g_too_many_usercode = true;}pthread_mutex_unlock(&s_usercode_mutex);pthread_cond_signal(&s_usercode_cond);
}
闭包
在处理协议时,最终提供给Server的闭包为
// Excerpt from ProcessRpcRequest: builds the closure handed to the service as
// `done`. Running it calls SendRpcResponse with the seven bound arguments.
google::protobuf::Closure* done = ::brpc::NewCallback<
    int64_t, Controller*, const google::protobuf::Message*,
    const google::protobuf::Message*, const Server*,
    MethodStatus*, int64_t>(
        &SendRpcResponse, meta.correlation_id(), cntl.get(),
        req.get(), res.get(), server,
        method_status, msg->received_us());

// Seven-argument overload of NewCallback: heap-allocates a FunctionClosure7
// that stores `function` together with copies of arg1..arg7.
// NOTE(review): the literal `true` is presumably the self-deleting flag of
// FunctionClosure7 — confirm against its constructor.
template <typename Arg1, typename Arg2, typename Arg3, typename Arg4,
          typename Arg5, typename Arg6, typename Arg7>
inline ::google::protobuf::Closure* NewCallback(
    void (*function)(Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7),
    Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4,
    Arg5 arg5, Arg6 arg6, Arg7 arg7) {
    return new internal::FunctionClosure7<
        Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7>(
            function, true, arg1, arg2, arg3, arg4, arg5, arg6, arg7);
}
FunctionClosure7闭包类的各成员与上述实参的对应关系为
function_
:为发送响应函数SendRpcResponse
arg1_
:为meta.correlation_id()
arg2_
:为cntl.get()
,类型为Controller*
arg3_
:为req.get()
,类型为const google::protobuf::Message*
,表示请求消息
arg4_
:为res.get()
,类型为const google::protobuf::Message*
,表示用于填充响应的消息
arg5_
:为server
, 类型为const Server*
,表示处理该请求的Server实例
arg6_
:为method_status
,类型为MethodStatus*
,表示方法状态
arg7_
:为msg->received_us()
,类型为int64_t
,表示消息的接收时间