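Ceph I/O Read and Write Path in Detail (Part 2): RADOSGW Request Handling
2 RADOSGW Request Handling
2.1 How managers, handlers, and Ops relate
This section starts from the rgw_process_authenticated function and uses the multipart upload Ops as the running example.
Part 1 of this series, Ceph I/O Read and Write Path in Detail (Part 1): RADOSGW, briefly introduced managers and handlers. So how does rgw work out from a request which manager, and which of that manager's handlers, should carry out the task? The relevant calls are: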
process_request()
|-RGWHandler_REST *handler = rest->get_handler()
|-op = handler->get_op(store)
|-op->get_type()
get_handler parses the URL to select the manager, and then the handler that belongs to that manager:
RGWHandler_REST* RGWREST::get_handler(
  RGWRados* const store,
  struct req_state* const s,
  const rgw::auth::StrategyRegistry& auth_registry,
  const std::string& frontend_prefix,
  RGWRestfulIO* const rio,
  RGWRESTMgr** const pmgr,
  int* const init_error
) {
  *init_error = preprocess(s, rio);
  if (*init_error < 0) {
    return nullptr;
  }

  RGWRESTMgr *m = mgr.get_manager(s, frontend_prefix, s->decoded_uri,
                                  &s->relative_uri);
  if (! m) {
    *init_error = -ERR_NOT_IMPLEMENTED;
    return nullptr;
  }

  if (pmgr) {
    *pmgr = m;
  }

  RGWHandler_REST* handler = m->get_handler(s, auth_registry, frontend_prefix);
  if (! handler) {
    *init_error = -ERR_NOT_IMPLEMENTED;
    return NULL;
  }

  *init_error = handler->init(store, s, rio);
  if (*init_error < 0) {
    m->put_handler(handler);
    return nullptr;
  }

  return handler;
}
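For example, S3 requests are handled by the manager RGWRESTMgr_S3, which has three handlers: one for the service, one for buckets, and one for objects: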
if (s->init_state.url_bucket.empty()) {
  handler = new RGWHandler_REST_Service_S3(auth_registry);
} else if (s->object.empty()) {
  handler = new RGWHandler_REST_Bucket_S3(auth_registry);
} else {
  handler = new RGWHandler_REST_Obj_S3(auth_registry);
}
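The selection rule above can be modelled in a few lines. This is a simplified sketch rather than rgw code: HandlerKind and pick_handler are hypothetical names, and the two strings stand in for s->init_state.url_bucket and s->object.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the three S3 handler classes.
enum class HandlerKind { Service, Bucket, Object };

// Mirrors the if/else chain: no bucket in the URL means a service-level
// request, a bucket without an object key means a bucket-level request,
// and anything else targets an object.
HandlerKind pick_handler(const std::string& url_bucket,
                         const std::string& object_key) {
  if (url_bucket.empty()) return HandlerKind::Service;
  if (object_key.empty()) return HandlerKind::Bucket;
  return HandlerKind::Object;
}
```

Under these assumptions, a request for /mybucket/mykey maps to HandlerKind::Object, while a request for / maps to HandlerKind::Service.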
Next, get_op(store) selects the Op according to the type of the request:
RGWOp* RGWHandler_REST::get_op(RGWRados* store)
{
  RGWOp *op;
  switch (s->op) {
    case OP_GET:
      op = op_get();
      break;
    case OP_PUT:
      op = op_put();
      break;
    case OP_DELETE:
      op = op_delete();
      break;
    case OP_HEAD:
      op = op_head();
      break;
    case OP_POST:
      op = op_post();
      break;
    case OP_COPY:
      op = op_copy();
      break;
    case OP_OPTIONS:
      op = op_options();
      break;
    default:
      return NULL;
  }

  if (op) {
    op->init(store, s, this);
  }
  return op;
} /* get_op */
C++ polymorphism, combined with the URL and other request settings, then resolves the concrete Op:
RGWOp *RGWHandler_REST_Obj_S3::op_get()
{
  if (is_acl_op()) {
    return new RGWGetACLs_ObjStore_S3;
  } else if (s->info.args.exists("uploadId")) {
    return new RGWListMultipart_ObjStore_S3;
  } else if (s->info.args.exists("layout")) {
    return new RGWGetObjLayout_ObjStore_S3;
  } else if (is_tagging_op()) {
    return new RGWGetObjTags_ObjStore_S3;
  } else if (is_obj_retention_op()) {
    return new RGWGetObjRetention_ObjStore_S3;
  } else if (is_obj_legal_hold_op()) {
    return new RGWGetObjLegalHold_ObjStore_S3;
  } else if (is_symlink_op()) {
    return new RGWGetSymlink_ObjStore_S3;
  }
  return get_obj_op(true);
}
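The two-level dispatch (the base class switches on the HTTP method, the subclass refines the choice from the query arguments) can be illustrated with a toy model. Every name below is hypothetical and much simpler than the rgw classes; the sketch only shows the shape of the mechanism.

```cpp
#include <cassert>
#include <memory>
#include <set>
#include <string>

// Toy model; none of these are rgw classes.
enum class Method { Get, Put };

struct Op {
  virtual ~Op() = default;
  virtual std::string name() const = 0;
};
struct GetObj : Op { std::string name() const override { return "get_obj"; } };
struct ListParts : Op { std::string name() const override { return "list_multipart"; } };
struct PutObj : Op { std::string name() const override { return "put_obj"; } };

struct Handler {
  virtual ~Handler() = default;
  // Step 1: the base class looks at the HTTP method only.
  std::unique_ptr<Op> get_op(Method m, const std::set<std::string>& args) {
    switch (m) {
      case Method::Get: return op_get(args);
      case Method::Put: return op_put(args);
    }
    return nullptr;
  }
 protected:
  // Step 2: subclasses refine the choice; the default means "not supported".
  virtual std::unique_ptr<Op> op_get(const std::set<std::string>&) { return nullptr; }
  virtual std::unique_ptr<Op> op_put(const std::set<std::string>&) { return nullptr; }
};

struct ObjHandler : Handler {
 protected:
  std::unique_ptr<Op> op_get(const std::set<std::string>& args) override {
    // A GET that carries ?uploadId lists the parts of a multipart upload.
    if (args.count("uploadId")) return std::make_unique<ListParts>();
    return std::make_unique<GetObj>();
  }
  std::unique_ptr<Op> op_put(const std::set<std::string>&) override {
    return std::make_unique<PutObj>();
  }
};
```

The caller only ever sees Handler and Op, which is why process_request() can stay generic while each handler decides which Op a request really means.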
op->get_type() is also simple: it returns a flag that identifies the RGWOp. For example, in class RGWListMultipart : public RGWOp:
RGWOpType get_type() override { return RGW_OP_LIST_MULTIPART; }
Now back to rgw_process_authenticated. The first call is:
int ret = handler->init_permissions(op);
This step checks the access policies (ACLs) of the user and the bucket. You only need to look at it closely when developing related features.
Next:
if (! skip_retarget) {
  req->log(s, "recalculating target");
  ret = handler->retarget(op, &op);
  ...
At present only RGWHandler_REST_S3Website overrides this function. Then:
ret = handler->read_permissions(op);
This reads the ACLs of the bucket and the object. Next:
ret = op->verify_op_mask();
This checks whether the user is allowed to run this RGWOp. Then:
ret = op->verify_permission();
This is another permission check. The step after it checks whether the target object or bucket is banned, that is, whether the RGW_ATTR_BAN flag is present:
ret = op->verify_op_ban_stat();
The next step is specific to PutObj and checks the length of the upload:
ret = op->verify_params();
...
int RGWPutObj_ObjStore::verify_params()
{
  if (s->length) {
    off_t len = atoll(s->length);
    if (len > (off_t)(s->cct->_conf->rgw_max_put_size)) {
      return -ERR_TOO_LARGE;
    }
  }

  return 0;
}
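The size check can be tried in isolation with the sketch below. It assumes a plain C string for the Content-Length header and an integer limit in place of the rgw_max_put_size option; check_put_size and kErrTooLarge are hypothetical names, and the error value is a placeholder rather than rgw's ERR_TOO_LARGE.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>

// Placeholder error code for this sketch; rgw defines its own ERR_TOO_LARGE.
constexpr int kErrTooLarge = 1;

// content_length is the raw Content-Length header text (nullptr if absent);
// max_put_size plays the role of the rgw_max_put_size option.
int check_put_size(const char* content_length, std::int64_t max_put_size) {
  if (content_length) {
    long long len = std::atoll(content_length);
    if (len > max_put_size) return -kErrTooLarge;
  }
  return 0;  // a request without a length header is not rejected here
}
```

As in the original function, the check only fires when a length was supplied, so a request without one passes this step.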
Before the real work begins, the QPS rate limit is checked:
ret = ThrottleManager::instance().check_qps_limit(s);
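Taken together, the calls above form a fail-fast pipeline: each step yields a return code, and a negative value ends request processing. The sketch below models that pattern with hypothetical names (Check, run_checks). It assumes each step is followed by the usual ret < 0 test, and it is not the rgw implementation.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// A named check that returns 0 on success or a negative errno-style code.
using Check = std::pair<std::string, std::function<int()>>;

// Runs the checks in order and stops at the first failure.
// failed_step (optional) receives the name of the step that failed.
int run_checks(const std::vector<Check>& checks, std::string* failed_step) {
  for (const auto& check : checks) {
    int ret = check.second();
    if (ret < 0) {
      if (failed_step) *failed_step = check.first;
      return ret;
    }
  }
  return 0;
}
```

Because the order is fixed, a cheap check placed early (for example the op mask) saves the cost of later ones whenever a request is going to be rejected anyway.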
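2.2 Multipart upload
We now use multipart upload as the example to follow the whole path, from rgw receiving the object to the operations being sent down to the lower layers.
These are all the RGWOps involved:
1. Initiating the multipart upload, which corresponds to the s3api create-multipart-upload command
RGWInitMultipart_ObjStore_S3 -> RGWInitMultipart_ObjStore -> RGWInitMultipart -> RGWOp
2. Uploading a part, which corresponds to the s3api upload-part command (rgw serves it with the same Op as put-object)
RGWPutObj_ObjStore_S3 -> RGWPutObj_ObjStore -> RGWPutObj -> RGWOp
3. Completing the multipart upload, which corresponds to the s3api complete-multipart-upload command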
RGWCompleteMultipart_ObjStore_S3 -> RGWCompleteMultipart_ObjStore -> RGWCompleteMultipart -> RGWOp
In rgw_process_authenticated, essentially every RGWOp then goes through the same three steps:
op->pre_exec();
op->execute();
op->complete();
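The three calls work for any Op because they are virtual functions on the base class. A minimal sketch of that lifecycle follows; MiniOp, MiniInitMultipart, and run_op are hypothetical names, and the trace vector exists only to make the call order visible.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical miniature of the RGWOp interface: three overridable phases.
struct MiniOp {
  std::vector<std::string> trace;  // records the order of calls
  virtual ~MiniOp() = default;
  virtual void pre_exec() { trace.push_back("pre_exec"); }
  virtual void execute() = 0;
  virtual void complete() { trace.push_back("complete"); }
};

struct MiniInitMultipart : MiniOp {
  void execute() override { trace.push_back("execute:init_multipart"); }
};

// The driver does not know which concrete Op it is running.
void run_op(MiniOp& op) {
  op.pre_exec();
  op.execute();
  op.complete();
}
```

Each concrete Op overrides only the phases it cares about, which is why the sections below look at pre_exec and execute of each multipart Op in turn.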
2.2.1 Initiating the multipart upload
Initiation is handled by RGWInitMultipart, an Op derived from RGWOp. Its pre_exec simply dumps the URL:
void RGWInitMultipart::pre_exec()
{
  rgw_bucket_object_pre_exec(s);
}
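Next, look at RGWInitMultipart::execute().
It begins with server-side encryption, mainly deciding which encryption method to use. This has little to do with the main multipart path, so we skip it for now: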
bufferlist aclbl;
map<string, bufferlist> attrs;
...
op_ret = prepare_encryption(attrs);
Then:
op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
This step makes a rough pass over the request headers.
Next comes the core of this part:
do {
  char buf[33];
  gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1);
  upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
  upload_id.append(buf);  // generate the upload id

  string tmp_obj_name;
  RGWMPObj mp(s->object.name, upload_id);
  tmp_obj_name = mp.get_meta();

  obj.init_ns(s->bucket, tmp_obj_name, mp_ns);  // set the name, including the namespace
  // the meta object will be indexed with 0 size, we c
  obj.set_in_extra_data(true);
  obj.index_hash_source = s->object.name;

  RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
  op_target.set_versioning_disabled(true); /* no versioning for multipart meta */

  RGWRados::Object::Write obj_op(&op_target);
  obj_op.meta.owner = s->owner.get_id();
  obj_op.meta.category = RGW_OBJ_CATEGORY_MULTIMETA;
  obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;

  op_ret = obj_op.write_meta(0, 0, attrs);  // write to the index pool, write the meta
} while (op_ret == -EEXIST);
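Here tmp_obj_name holds the name of a temporary object, of the form <object name>.<upload-id>.meta.
Before the meta is actually written, a series of preparations is made, including the name and namespace, the target bucket, the category of the meta, the flags, and so on.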
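The retry loop is a standard "generate a random id, create exclusively, retry on collision" pattern. The sketch below reproduces it against an in-memory set. ToyStore, rand_alnum, and init_multipart are hypothetical names. The "2~" prefix and the 31-character length are taken from the sample key that appears later in this article, not from a specification.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <random>
#include <set>
#include <string>

// Prefix as seen in the sample upload id ("2~...") later in the article.
const std::string kUploadIdPrefix = "2~";

// Random alphanumeric string of length n (stand-in for gen_rand_alphanumeric).
std::string rand_alnum(std::mt19937& rng, std::size_t n) {
  static const std::string alphabet =
      "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
  std::uniform_int_distribution<std::size_t> pick(0, alphabet.size() - 1);
  std::string out;
  for (std::size_t i = 0; i < n; ++i) out += alphabet[pick(rng)];
  return out;
}

// Toy store: an exclusive create fails with -EEXIST if the name is taken,
// similar in spirit to a write flagged PUT_OBJ_CREATE_EXCL.
struct ToyStore {
  std::set<std::string> names;
  int create_exclusive(const std::string& name) {
    return names.insert(name).second ? 0 : -EEXIST;
  }
};

// Keeps generating upload ids until the meta object name is free.
std::string init_multipart(ToyStore& store, const std::string& key,
                           std::mt19937& rng) {
  std::string upload_id;
  int ret;
  do {
    upload_id = kUploadIdPrefix + rand_alnum(rng, 31);
    ret = store.create_exclusive(key + "." + upload_id + ".meta");
  } while (ret == -EEXIST);
  return upload_id;
}
```

The exclusive create is what makes the loop safe: two concurrent initiations for the same key can never end up sharing one meta object, because one of them gets -EEXIST and draws a new id.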
Here we focus on the call obj_op.write_meta(0, 0, attrs):
int RGWRados::Object::Write::write_meta(uint64_t size, uint64_t accounted_size,
                                        map<string, bufferlist>& attrs)
{
  RGWBucketInfo& bucket_info = target->get_bucket_info();

  RGWRados::Bucket bop(target->get_store(), bucket_info);
  RGWRados::Bucket::UpdateIndex index_op(&bop, target->get_obj());
  index_op.set_zones_trace(meta.zones_trace);

  bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);
  int r;
  if (assume_noent) {
    r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);
    if (r == -EEXIST) {
      assume_noent = false;
    }
  }
  if (!assume_noent) {
    r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);
  }
  return r;
}
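Here assume_noent says whether to assume that this is a first upload rather than an overwrite. if_match and if_nomatch correspond to a pair of request headers (If-Match and If-None-Match) that make the request conditional on whether the server already holds a matching resource: with If-Match, the request proceeds only if an existing ETag equals the given value.
In other words, when both fields are null, the code assumes that no such entry exists, treats the request as a first upload, and enters _do_write_meta.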
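The control flow of write_meta is an optimistic two-attempt pattern: try the cheap path first and fall back once if the store reports that the object already exists. A minimal sketch follows, where write_with_fallback is a hypothetical name and attempt is a hypothetical stand-in for _do_write_meta.

```cpp
#include <cassert>
#include <cerrno>
#include <functional>

// attempt(assume_noent) stands in for _do_write_meta.
// First try the path that assumes the object does not exist; if the store
// answers -EEXIST, retry once without that assumption.
int write_with_fallback(bool has_conditions,
                        const std::function<int(bool)>& attempt) {
  bool assume_noent = !has_conditions;
  int r = 0;
  if (assume_noent) {
    r = attempt(true);
    if (r == -EEXIST) assume_noent = false;
  }
  if (!assume_noent) r = attempt(false);
  return r;
}
```

When the request carries no conditional headers and the object is new, only one attempt is made; the second attempt runs only for overwrites or conditional requests.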
/**
 * Write/overwrite an object to the bucket storage.
 * bucket: the bucket to store the object in
 * obj: the object name/key
 * data: the object contents/value
 * size: the amount of data to write (data must be this long)
 * accounted_size: original size of data before compression, encryption
 * mtime: if non-NULL, writes the given mtime to the bucket storage
 * attrs: all the given attrs are written to bucket storage for the given object
 * exclusive: create object exclusively
 * Returns: 0 on success, -ERR# otherwise.
 */
int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,
                                            map<string, bufferlist>& attrs,
                                            bool assume_noent, bool modify_tail,
                                            void *_index_op)
{
...
ObjectWriteOperation op;
Start with the ObjectWriteOperation class, which its own comment describes as follows:
/**
 * ObjectWriteOperation : compound object write operation
 * Batch multiple object operations into a single request, to be applied
 * atomically.
 */
In other words, an ObjectWriteOperation instance is not a single instruction but a collection of instructions.
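The "queue several steps, apply them as one unit" idea can be shown with a toy model. ToyWriteOp and ToyObject below are hypothetical and run entirely in memory; the real class queues operations for the OSD to execute. The all-or-nothing behaviour is modelled here by working on a scratch copy, which is an assumption of this sketch and not how the OSD implements it.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Toy object: existence flag plus extended attributes.
struct ToyObject {
  bool exists = false;
  std::map<std::string, std::string> xattrs;
};

// Toy compound write: steps are queued first and applied together later.
class ToyWriteOp {
 public:
  void create_exclusive() {
    steps_.push_back([](ToyObject& o) {
      if (o.exists) return -EEXIST;
      o.exists = true;
      return 0;
    });
  }
  void setxattr(std::string k, std::string v) {
    steps_.push_back([k, v](ToyObject& o) {
      o.xattrs[k] = v;
      return 0;
    });
  }
  std::size_t size() const { return steps_.size(); }

  // All-or-nothing: run on a copy and commit only if every step succeeds.
  int apply(ToyObject& target) const {
    ToyObject scratch = target;
    for (const auto& step : steps_) {
      int r = step(scratch);
      if (r < 0) return r;
    }
    target = scratch;
    return 0;
  }

 private:
  std::vector<std::function<int(ToyObject&)>> steps_;
};
```

In this model, if an exclusive create fails after an earlier setxattr step has run, the object is left untouched, which is the property the word "atomically" in the class comment refers to.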
Next comes the call below; the function that eventually runs is get_obj_state_impl:
int r = target->get_state(&state, false, assume_noent);
|-int RGWRados::get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
                                   RGWObjState **state, bool follow_olh, bool assume_noent)
  {
    ...
    rgw_raw_obj raw_obj;
    obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
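This produces a temporary object. As a test, calling the s3api create-multipart-upload command with a key named testing gives a raw_obj whose key is 6374a329-150d-4ec1-be50-78c7d8054f12.4984.1__multipart_testing.2~Ccwk9suqwCZfdRilW8DASrYIXHgrQx6.meta and whose pool is .rgw.non-ec, so it is a temporary object. There is further information such as compression and the manifest, which we do not go into here.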
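The sample key can be decomposed into bucket marker, namespace, and object name. The sketch below rebuilds it from those parts. The layout is inferred from the sample above, and multipart_meta_name and raw_oid are hypothetical helpers rather than rgw functions.

```cpp
#include <cassert>
#include <string>

// Name of the multipart meta object: <key>.<upload-id>.meta
std::string multipart_meta_name(const std::string& key,
                                const std::string& upload_id) {
  return key + "." + upload_id + ".meta";
}

// Raw object id as seen in the sample: <bucket marker>_ followed by
// _<namespace>_ when a namespace is set, then the object name.
std::string raw_oid(const std::string& bucket_marker, const std::string& ns,
                    const std::string& name) {
  std::string oid = bucket_marker + "_";
  if (!ns.empty()) oid += "_" + ns + "_";
  return oid + name;
}
```

With the marker 6374a329-150d-4ec1-be50-78c7d8054f12.4984.1, the namespace multipart, the key testing, and the upload id 2~Ccwk9suqwCZfdRilW8DASrYIXHgrQx6, the two helpers reproduce the sample key exactly.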
Back in _do_write_meta, the next part is:
rgw_obj& obj = target->get_obj();

if (obj.get_oid().empty()) {
  ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): cannot write object with empty name" << dendl;
  return -EIO;
}

rgw_rados_ref ref;
r = store->get_obj_head_ref(target->get_bucket_info(), obj, &ref);
if (r < 0)
  return r;
......
int RGWRados::get_obj_head_ref(const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_rados_ref *ref)
{
  get_obj_bucket_and_oid_loc(obj, ref->oid, ref->key);

  rgw_pool pool;
  if (!get_obj_data_pool(bucket_info.placement_rule, obj, &pool)) {
    ldout(cct, 0) << "ERROR: cannot get data pool for obj=" << obj << ", probably misconfiguration" << dendl;
    return -EIO;
  }

  int r = open_pool_ctx(pool, ref->ioctx);
  if (r < 0) {
    return r;
  }

  ref->ioctx.locator_set_key(ref->key);

  return 0;
}
This obtains the pool and opens the ioctx. Next:
r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false, modify_tail);
if (r < 0)
  return r;
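This adds some settings to the ObjectWriteOperation. The operations that follow add OSDOps that set xattrs to op. Recall that ObjectOperation has a vector of OSDOp among its members; these calls append OSDOps to that vector.
At this point we need a detour: how the various ops are sent to the underlying OSDs. That involves Ceph's messenger and osdc modules, among others, and needs its own discussion; see the separate article on message passing in Ceph, which covers Messenger, Message, and Connection in detail.
The next part continues with multipart-upload and complete-multipart-upload.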