本文简介
书接上文。本文以CreateBucket为例,详细讲述其设计理念以及接口变化趋势。
1、接收请求和协议处理请求
rgw_asio_frontend.cc
主要功能:回调函数注册和请求处理
void handle_connection(boost::asio::io_context& context,RGWProcessEnv& env, Stream& stream,timeout_timer& timeout, size_t header_limit,parse_buffer& buffer, bool is_ssl,SharedMutex& pause_mutex,rgw::dmclock::Scheduler *scheduler,const std::string& uri_prefix,boost::system::error_code& ec,spawn::yield_context yield)
{// don't impose a limit on the body, since we read it in piecesstatic constexpr size_t body_limit = std::numeric_limits<size_t>::max();auto cct = env.driver->ctx();// read messages from the stream until eoffor (;;) {// configure the parserrgw::asio::parser_type parser;parser.header_limit(header_limit);parser.body_limit(body_limit);timeout.start();// parse the headerhttp::async_read_header(stream, buffer, parser, yield[ec]);timeout.cancel();if (ec == boost::asio::error::connection_reset ||ec == boost::asio::error::bad_descriptor ||ec == boost::asio::error::operation_aborted ||
#ifdef WITH_RADOSGW_BEAST_OPENSSLec == ssl::error::stream_truncated ||
#endifec == http::error::end_of_stream) {ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;return;}auto& message = parser.get();bool expect_continue = (message[http::field::expect] == "100-continue");{// process the requestRGWRequest req{env.driver->get_new_req_id()};StreamIO real_client{cct, stream, timeout, parser, yield, buffer,is_ssl, local_endpoint, remote_endpoint};auto real_client_io = rgw::io::add_reordering(rgw::io::add_buffering(cct,rgw::io::add_chunking(rgw::io::add_conlen_controlling(&real_client))));RGWRestfulIO client(cct, &real_client_io);optional_yield y = null_yield;process_request(env, &req, uri_prefix, &client, y,scheduler, &user, &latency, &http_ret);}if (!parser.keep_alive()) {return;}// if we failed before reading the entire message, discard any remaining// bytes before reading the nextwhile (!expect_continue && !parser.is_done()) {static std::array<char, 1024> discard_buffer;auto& body = parser.get().body();body.size = discard_buffer.size();body.data = discard_buffer.data();timeout.start();http::async_read_some(stream, buffer, parser, yield[ec]);timeout.cancel();if (ec == http::error::need_buffer) {continue;}if (ec == boost::asio::error::connection_reset) {return;}if (ec) {ldout(cct, 5) << "failed to discard unread message: "<< ec.message() << dendl;return;}}}
}
rgw_process.cc
主要功能:请求处理,包括身份认证、OP的调度与执行、请求完成与结果返回等。
int process_request(const RGWProcessEnv& penv,RGWRequest* const req,const std::string& frontend_prefix,RGWRestfulIO* const client_io,optional_yield yield,rgw::dmclock::Scheduler *scheduler,string* user,ceph::coarse_real_clock::duration* latency,int* http_ret)
{int ret = client_io->init(g_ceph_context);dout(1) << "====== starting new request req=" << hex << req << dec<< " =====" << dendl;perfcounter->inc(l_rgw_req);RGWEnv& rgw_env = client_io->get_env();req_state rstate(g_ceph_context, penv, &rgw_env, req->id);req_state *s = &rstate;rgw::sal::Driver* driver = penv.driver;RGWHandler_REST *handler = rest->get_handler(driver, s,*penv.auth_registry,frontend_prefix,client_io, &mgr, &init_error);ldpp_dout(s, 2) << "getting op " << s->op << dendl;op = handler->get_op();std::tie(ret,c) = schedule_request(scheduler, s, op);req->op = op;ldpp_dout(op, 10) << "op=" << typeid(*op).name() << dendl;s->op_type = op->get_type();try {ldpp_dout(op, 2) << "verifying requester" << dendl;ret = op->verify_requester(*penv.auth_registry, yield);ldpp_dout(op, 2) << "normalizing buckets and tenants" << dendl;ret = handler->postauth_init(yield);ret = rgw_process_authenticated(handler, op, req, s, yield, driver);} catch (const ceph::crypto::DigestException& e) {dout(0) << "authentication failed" << e.what() << dendl;abort_early(s, op, -ERR_INVALID_SECRET_KEY, handler, yield);}done:try {client_io->complete_request();} catch (rgw::io::Exception& e) {dout(0) << "ERROR: client_io->complete_request() returned "<< e.what() << dendl;}if (handler)handler->put_op(op);rest->put_handler(handler);const auto lat = s->time_elapsed();if (latency) {*latency = lat;}dout(1) << "====== req done req=" << hex << req << dec<< " op status=" << op_ret<< " http_status=" << s->err.http_ret<< " latency=" << lat<< " ======"<< dendl;return (ret < 0 ? ret : s->err.ret);
} /* process_request */
在rgw_process_authenticated函数中进行OP的详细处理,包括权限初始化与校验、参数校验、pre-exec、exec、complete等步骤。
/**
 * Run an already-authenticated op through its processing stages, in order:
 * init permissions, init processing, op mask check, permission check,
 * parameter check, execute, complete.
 *
 * Each stage that returns a status is checked, and the first negative status
 * is returned without running later stages, so execute() is never reached
 * after a failed check.
 *
 * `req`, `driver` and `skip_retarget` are not used in the part of the
 * function shown in this excerpt.
 *
 * @param handler  REST handler; supplies init_permissions()
 * @param op       the op to run
 * @param s        request state; its trace is swapped around traced stages
 * @param y        forwarded to the stage calls that accept it
 * @return 0 when all stages ran, otherwise the first negative stage status
 */
int rgw_process_authenticated(RGWHandler_REST * const handler,
                              RGWOp *& op,
                              RGWRequest * const req,
                              req_state * const s,
                              optional_yield y,
                              rgw::sal::Driver* driver,
                              const bool skip_retarget)
{
  ldpp_dout(op, 2) << "init permissions" << dendl;
  int ret = handler->init_permissions(op, y);
  if (ret < 0) {
    return ret;
  }

  ldpp_dout(op, 2) << "init op" << dendl;
  ret = op->init_processing(y);
  if (ret < 0) {
    return ret;
  }

  ldpp_dout(op, 2) << "verifying op mask" << dendl;
  ret = op->verify_op_mask();
  if (ret < 0) {
    return ret;
  }

  ldpp_dout(op, 2) << "verifying op permissions" << dendl;
  {
    // make the new span the current trace for the duration of the call,
    // then swap the previous trace back
    auto span = tracing::rgw::tracer.add_span("verify_permission", s->trace);
    std::swap(span, s->trace);
    ret = op->verify_permission(y);
    std::swap(span, s->trace);
  }
  if (ret < 0) {
    // NOTE(review): this fails closed. The full implementation may allow
    // system/admin requests to override a permission failure; confirm.
    return ret;
  }

  ldpp_dout(op, 2) << "verifying op params" << dendl;
  ret = op->verify_params();
  if (ret < 0) {
    return ret;
  }

  ldpp_dout(op, 2) << "executing" << dendl;
  {
    auto span = tracing::rgw::tracer.add_span("execute", s->trace);
    std::swap(span, s->trace);
    op->execute(y);
    std::swap(span, s->trace);
  }

  ldpp_dout(op, 2) << "completing" << dendl;
  op->complete();

  return 0;
}
rgw_op.cc
此处忽略rest或者swift中的协议处理过程,直接进入RGWCreateBucket::execute()中。
/**
 * Execute the CreateBucket op: load any existing bucket, fill in the owner
 * and the ACL/CORS attributes, then ask the bucket object to create itself.
 * The outcome is left in `op_ret`.
 *
 * This is an abridged excerpt; steps 1 and 5 and part of steps 2 and 3 are
 * omitted. `period` and `my_zonegroup` are only used by the omitted step 1.
 *
 * @param y  forwarded to the driver and bucket calls
 */
void RGWCreateBucket::execute(optional_yield y)
{
  const rgw::SiteConfig& site = *s->penv.site;
  const std::optional<RGWPeriod>& period = site.get_period();
  const RGWZoneGroup& my_zonegroup = site.get_zonegroup();

  /* Step 1: process the zonegroup information to determine the bucket's
   * placement, storage class, etc., and whether this site is the master
   * (omitted in this excerpt) */

  /* Step 2: read the bucket info; if the bucket exists, do some extra
   * handling (that handling is omitted in this excerpt) */
  // read the bucket info if it exists
  op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
                               &s->bucket, y);
  if (op_ret < 0 && op_ret != -ENOENT) {
    // a failure other than "bucket not found" must not be overwritten below
    return;
  }

  /* Step 3: if the bucket does not exist, initialise its owner and
   * attributes */
  s->bucket_owner.id = s->user->get_id();
  s->bucket_owner.display_name = s->user->get_display_name();
  createparams.owner = s->user->get_id();

  buffer::list aclbl;
  policy.encode(aclbl);
  createparams.attrs[RGW_ATTR_ACL] = std::move(aclbl);

  if (has_cors) {
    buffer::list corsbl;
    cors_config.encode(corsbl);
    createparams.attrs[RGW_ATTR_CORS] = std::move(corsbl);
  }

  /* Step 4: create the bucket */
  ldpp_dout(this, 10) << "user=" << s->user << " bucket=" << s->bucket << dendl;
  op_ret = s->bucket->create(this, createparams, y);

  /* Step 5: on failure, roll back (omitted in this excerpt) */
}
2、store层处理和rados中的处理
int RadosBucket::create(const DoutPrefixProvider* dpp,const CreateParams& params,optional_yield y)
{rgw_bucket key = get_key();key.marker = params.marker;key.bucket_id = params.bucket_id;/*创建桶,此处调用rados.cc中的处理流程*/int ret = store->getRados()->create_bucket(dpp, y, key, params.owner, params.zonegroup_id,params.placement_rule, params.zone_placement, params.attrs,params.obj_lock_enabled, params.swift_ver_location,params.quota, params.creation_time, &bucket_version, info);/*link处理*/ret = link(dpp, params.owner, y, false);if (ret && !existed && ret != -EEXIST) {/* if it exists (or previously existed), don't remove it! */ret = unlink(dpp, params.owner, y);if (ret < 0) {ldpp_dout(dpp, 0) << "WARNING: failed to unlink bucket: ret=" << ret<< dendl;}} else if (ret == -EEXIST || (ret == 0 && existed)) {ret = -ERR_BUCKET_EXISTS;}return ret;
}
int RGWRados::create_bucket(const DoutPrefixProvider* dpp,optional_yield y,const rgw_bucket& bucket,const rgw_user& owner,const std::string& zonegroup_id,const rgw_placement_rule& placement_rule,const RGWZonePlacementInfo* zone_placement,const std::map<std::string, bufferlist>& attrs,bool obj_lock_enabled,const std::optional<std::string>& swift_ver_location,const std::optional<RGWQuotaInfo>& quota,std::optional<ceph::real_time> creation_time,obj_version* pep_objv,RGWBucketInfo& info)
{int ret = 0;#define MAX_CREATE_RETRIES 20 /* need to bound retries */for (int i = 0; i < MAX_CREATE_RETRIES; i++) {/*步骤1:初始化bucket的ver_id和quota、time等初始化信息*//*步骤2:bucket_index 初始化*/if (zone_placement) {ret = svc.bi->init_index(dpp, info, info.layout.current_index);if (ret < 0) {return ret;}}/*步骤3:linkbucket_info信息*/constexpr bool exclusive = true;ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, y);if (ret == -ECANCELED) {ret = -EEXIST;}
:return ret;}/* this is highly unlikely */ldpp_dout(dpp, 0) << "ERROR: could not create bucket, continuously raced with bucket creation and removal" << dendl;return -ENOENT;
}
put_linked_bucket_info函数int RGWRados::put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, real_time mtime, obj_version *pep_objv,const map<string, bufferlist> *pattrs, bool create_entry_point,const DoutPrefixProvider *dpp, optional_yield y)
{bool create_head = !info.has_instance_obj || create_entry_point;/*步骤1:写bucket_instance*/int ret = put_bucket_instance_info(info, exclusive, mtime, pattrs, dpp, y);RGWBucketEntryPoint entry_point;entry_point.bucket = info.bucket;entry_point.owner = info.owner;entry_point.creation_time = info.creation_time;entry_point.linked = true;/*存储bucket_entrypoint实体信息*/ret = ctl.bucket->store_bucket_entrypoint_info(info.bucket, entry_point, y, dpp, RGWBucketCtl::Bucket::PutParams().set_exclusive(exclusive).set_objv_tracker(&ot).set_mtime(mtime));if (ret < 0)return ret;return 0;
}
3、SVC中的处理:bucket index的创建
/**
 * Initialise the index objects of a bucket.
 *
 * Opens the bucket's index pool, derives the index object names from
 * `dir_oid_prefix` plus the bucket id for the shard count and generation in
 * `idx_layout`, and issues the index-init operation on them.
 *
 * @param dpp          log prefix provider
 * @param bucket_info  bucket whose index is initialised
 * @param idx_layout   index layout generation (shard count and generation)
 * @return the negative status of open_bucket_index_pool() on failure,
 *         otherwise the result of the index-init operation
 */
int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp,
                                        RGWBucketInfo& bucket_info,
                                        const rgw::bucket_index_layout_generation& idx_layout)
{
  librados::IoCtx pool_ctx;
  string base_oid = dir_oid_prefix;

  const int rc = open_bucket_index_pool(dpp, bucket_info, &pool_ctx);
  if (rc < 0) {
    return rc;
  }

  // index object names are the prefix followed by the bucket id
  base_oid += bucket_info.bucket.bucket_id;

  map<int, string> shard_oids;
  get_bucket_index_objects(base_oid,
                           idx_layout.layout.normal.num_shards,
                           idx_layout.gen,
                           &shard_oids);

  // build the init operation, then invoke it and hand back its status
  CLSRGWIssueBucketIndexInit issue_init(pool_ctx,
                                        shard_oids,
                                        cct->_conf->rgw_bucket_index_max_aio);
  return issue_init();
}
4、总结
至此,一个bucket创建完毕,其他的op与此类似,整体结构变化不大。下图是rgw_rados.h、rgw_sal_rados.h、rgw_service.h和svc_module***.h之间的关联关系,比较粗糙,仅供参考。