当前位置: 首页 > java >正文

Ceph IO读写流程详解(二)——RADOSGW请求处理

2 RADOSGW请求处理

2.1 Manager、handler与Op之间的关系

这一节,我们将以rgw_process_authenticated函数开始,以分段上传的Op为例进行下一步的讲解。
在Ceph IO读写流程详解(一)——RADOSGW中我们简单介绍了manager和handler,那么rgw是如何根据请求来分辨出究竟是哪个manager以及对应的handler执行对应的任务呢?

process_request()
|-RGWHandler_REST *handler = rest->get_handler()
|-op->get_op(store)
|-op->get_type()

get_handler函数通过url的解析选出对应的manager以及manager对应的handler:

RGWHandler_REST* RGWREST::get_handler(RGWRados * const store,struct req_state* const s,const rgw::auth::StrategyRegistry& auth_registry,const std::string& frontend_prefix,RGWRestfulIO* const rio,RGWRESTMgr** const pmgr,int* const init_error
) {*init_error = preprocess(s, rio);if (*init_error < 0) {return nullptr;}RGWRESTMgr *m = mgr.get_manager(s, frontend_prefix, s->decoded_uri,&s->relative_uri);if (! m) {*init_error = -ERR_NOT_IMPLEMENTED;return nullptr;}if (pmgr) {*pmgr = m;}RGWHandler_REST* handler = m->get_handler(s, auth_registry, frontend_prefix);if (! handler) {*init_error = -ERR_NOT_IMPLEMENTED;return NULL;}*init_error = handler->init(store, s, rio);if (*init_error < 0) {m->put_handler(handler);return nullptr;}return handler;
}

例如S3请求的manager就是RGWRESTMgr_S3,其对应了服务,对象和桶三个handler:

if (s->init_state.url_bucket.empty()) {handler = new RGWHandler_REST_Service_S3(auth_registry);} else if (s->object.empty()) {handler = new RGWHandler_REST_Bucket_S3(auth_registry);} else {handler = new RGWHandler_REST_Obj_S3(auth_registry);}

接着get_op(store)得到op请求的类型:

RGWOp* RGWHandler_REST::get_op(RGWRados* store)
{RGWOp *op;switch (s->op) {case OP_GET:op = op_get();break;case OP_PUT:op = op_put();break;case OP_DELETE:op = op_delete();break;case OP_HEAD:op = op_head();break;case OP_POST:op = op_post();break;case OP_COPY:op = op_copy();break;case OP_OPTIONS:op = op_options();break;default:return NULL;}if (op) {op->init(store, s, this);}return op;
} /* get_op */

并由c++类的多态特性以及url或者其他配置项找到对应的Op:

RGWOp *RGWHandler_REST_Obj_S3::op_get()
{if (is_acl_op()) {return new RGWGetACLs_ObjStore_S3;} else if (s->info.args.exists("uploadId")) {return new RGWListMultipart_ObjStore_S3;} else if (s->info.args.exists("layout")) {return new RGWGetObjLayout_ObjStore_S3;} else if (is_tagging_op()) {return new RGWGetObjTags_ObjStore_S3;} else if (is_obj_retention_op()) {return new RGWGetObjRetention_ObjStore_S3;} else if (is_obj_legal_hold_op()) {return new RGWGetObjLegalHold_ObjStore_S3;} else if (is_symlink_op()) {return new RGWGetSymlink_ObjStore_S3;}return get_obj_op(true);
}

op->get_type()也比较简单,就是获取一个flag标识,用于表示对应的RGWOp,例如class RGWListMultipart : public RGWOp:

RGWOpType get_type() override { return RGW_OP_LIST_MULTIPART; }

现在我们接着来看rgw_process_authenticated函数,首先是:

int ret = handler->init_permissions(op);

这一步是校验user、bucket的acl访问政策的,需要开发相关功能时再关注即可。
接下来是:

if (! skip_retarget) {req->log(s, "recalculating target");ret = handler->retarget(op, &op);...

这个目前只有RGWHandler_REST_S3Website重写了这个函数。下面是:

ret = handler->read_permissions(op);

这部分是读bucket和object的ACL。接下来是:

ret = op->verify_op_mask();

这个是检查user是否具有执行对应RGWOp的权限。然后是:

ret = op->verify_permission();

也是检查相应权限的操作。下一步是检查操作的object或者bucket是否被禁止,检查是否存在RGW_ATTR_BAN标志:

ret = op->verify_op_ban_stat();

下一步是PutObj操作特有的,检查put的长度:

ret = op->verify_params();
...
int RGWPutObj_ObjStore::verify_params()
{if (s->length) {off_t len = atoll(s->length);if (len > (off_t)(s->cct->_conf->rgw_max_put_size)) {return -ERR_TOO_LARGE;}}return 0;
}

在正式开始之前,检查qps的限速:

ret = ThrottleManager::instance().check_qps_limit(s);

2.2 分段上传

接下来我们将以分段上传为例讲解rgw上传对象到下发的全过程:
下面是所有用到的RGWOp
1、分段上传初始化,对应s3api的create-multipart接口

RGWInitMultipart_ObjStore_S3 -> RGWCompleteMultipart_ObjStore -> RGWInitMultipart -> RGWOp

2、上传分段,对应s3api的put-object接口

RGWPutObj_ObjStore_S3-> RGWPutObj_ObjStore -> RGWPutObj -> RGWOp

3、完成分段上传,对应s3api的complete-multipart接口

RGWCompleteMultipart_ObjStore_S3 -> RGWCompleteMultipart_ObjStore -> RGWCompleteMultipart -> RGWOp

在rgw_process_authenticated中,基本上所有的RGWOp都对应了以下三个步骤:

op->pre_exec();
op->execute();
op->complete();

2.2.1 分段上传初始化

分段上传的初始化对应的是RGWInitMultipart这一继承自RGWOp的Op,其pre_exec就是dump一下url:

void RGWInitMultipart::pre_exec()
{rgw_bucket_object_pre_exec(s);
}

接下来看RGWInitMultipart::execute()。
首先是服务端加密的相关功能,主要是使用什么加密方式等,与分段上传的主线关系不大,先忽略:

bufferlist aclbl;
map<string, bufferlist> attrs;
...  
op_ret = prepare_encryption(attrs);

然后是:

op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);

这一步是粗略的检查一遍header的。
接下来是这部分的重头戏:

do {char buf[33];gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1);upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */upload_id.append(buf);// 生成upload idstring tmp_obj_name;RGWMPObj mp(s->object.name, upload_id);tmp_obj_name = mp.get_meta();obj.init_ns(s->bucket, tmp_obj_name, mp_ns); // 初始化命名(包括命名空间)// the meta object will be indexed with 0 size, we cobj.set_in_extra_data(true);obj.index_hash_source = s->object.name;RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);op_target.set_versioning_disabled(true); /* no versioning for multipart meta */RGWRados::Object::Write obj_op(&op_target);obj_op.meta.owner = s->owner.get_id();obj_op.meta.category = RGW_OBJ_CATEGORY_MULTIMETA;obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;op_ret = obj_op.write_meta(0, 0, attrs); // 写入索引池,写meta} while (op_ret == -EEXIST);

其中,tmp_obj_name返回了一个临时对象,其名称是<对象名.upload-id.meta>。
在正式写入meta之前,做了一系列准备,包括名称和命名空间、目标存储桶、meta的种类、标志位等等。
此处我们重点来看obj_op.write_meta(0, 0, attrs)函数:

int RGWRados::Object::Write::write_meta(uint64_t size, uint64_t accounted_size,map<string, bufferlist>& attrs)
{RGWBucketInfo& bucket_info = target->get_bucket_info();RGWRados::Bucket bop(target->get_store(), bucket_info);RGWRados::Bucket::UpdateIndex index_op(&bop, target->get_obj());index_op.set_zones_trace(meta.zones_trace);bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);int r;if (assume_noent) {r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);if (r == -EEXIST) {assume_noent = false;}}if (!assume_noent) {r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);}return r;
}

这里面assume_noent指的是是否假设这是第一次上传(非覆盖写),if_match和if_no_match是一对请求头,表示服务端是否有对应的资源,if_match表示如果有与if_match的字段相同的etag就继续执行。
也就是说,如果这两个字段都为空,那么就会假设不存在这个entry,将其当做第一次上传,进入_do_write_meta函数。

/*** Write/overwrite an object to the bucket storage.* bucket: the bucket to store the object in* obj: the object name/key* data: the object contents/value* size: the amount of data to write (data must be this long)* accounted_size: original size of data before compression, encryption* mtime: if non-NULL, writes the given mtime to the bucket storage* attrs: all the given attrs are written to bucket storage for the given object* exclusive: create object exclusively* Returns: 0 on success, -ERR# otherwise.*/
int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,map<string, bufferlist>& attrs,bool assume_noent, bool modify_tail,void *_index_op)
{
...
ObjectWriteOperation op;

首先关注ObjectWriteOperation这个类,在注释中的介绍如下:

/** ObjectWriteOperation : compound object write operation* Batch multiple object operations into a single request, to be applied* atomically.*/

也就是说,一个ObjectWriteOperation的实例并不是一条指令,而是一系列指令的集合。
接着是,其最终执行的函数是get_obj_state_impl:

int r = target->get_state(&state, false, assume_noent);|-int RGWRados::get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj,RGWObjState **state, bool follow_olh, bool assume_noent){...rgw_raw_obj raw_obj;obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);

这里是生成了一个临时对象,首先调用s3api的create-multipart-upload接口,创建一个名称为testing的key,得到对应的raw_obj的key为:6374a329-150d-4ec1-be50-78c7d8054f12.4984.1__multipart_testing.2~Ccwk9suqwCZfdRilW8DASrYIXHgrQx6.meta,并且其存储池为.rgw.non-ec,是一个临时对象。其他还有压缩、manifest等信息,不过多赘述。

回到_do_write_meta函数,接下来看:

  rgw_obj& obj = target->get_obj();if (obj.get_oid().empty()) {ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): cannot write object with empty name" << dendl;return -EIO;}rgw_rados_ref ref;r = store->get_obj_head_ref(target->get_bucket_info(), obj, &ref);if (r < 0)return r;
......
int RGWRados::get_obj_head_ref(const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_rados_ref *ref)
{get_obj_bucket_and_oid_loc(obj, ref->oid, ref->key);rgw_pool pool;if (!get_obj_data_pool(bucket_info.placement_rule, obj, &pool)) {ldout(cct, 0) << "ERROR: cannot get data pool for obj=" << obj << ", probably misconfiguration" << dendl;return -EIO;}int r = open_pool_ctx(pool, ref->ioctx);if (r < 0) {return r;}ref->ioctx.locator_set_key(ref->key);return 0;
}

获取存储池并且打开ioctx,接下来是:

r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false, modify_tail);if (r < 0)return r;

给ObjectWriteOperation添加一些配置项,接下来的一些操作就是给op添加一些设置xattr的OSDOp,还记得ObjectOperation的成员函数中有一个OSDOp的vector吗,这些操作就是向这个vector中追加OSDOp。
在这里,就务必要插入一段内容,就是各种的op是如何发送给底层的osd的,这涉及到Ceph的messenger模块和osdc模块等等,需要单独进行讲解,详情可见Ceph中的消息通信——Messenger、Message和Connection详解。
下一章节将继续进行multipart-upload和complete-multipart-upload的介绍。

http://www.xdnf.cn/news/2879.html

相关文章:

  • Lightroom 2025手机版:专业编辑,轻松上手
  • 基于 STM32 的智慧图书馆智能控制系统设计与实现
  • DeepSeek破界而来:重构大规模深度检索的算力与边界
  • Java云原生+quarkus
  • 1.1探索 LLaMA-Factory:大模型微调的一站式解决方案
  • Consul安装部署(Windows环境)
  • 链表反转_leedcodeP206
  • 判断图片url损坏无法展示工具类
  • UE5 Set actor Location和 Set World Location 和 Set Relative Location 的区别
  • 关于本地端口启动问题
  • JAVA--- 关键字static
  • 长效住宅IP是什么?如何获取长效住宅IP?
  • 工程管理部绩效考核关键指标与项目评估
  • 选择排序快速排序
  • 国标GB28181视频平台EasyCVR实用方案:如何实现画面拉伸
  • 大厂Java面试深度解析:Dubbo服务治理、WebSocket实时通信、RESTEasy自定义注解与C3P0连接池配置实践
  • 信创开发中的数据库详解:国产替代背景下的技术生态与实践指南
  • 百度「心响」:通用超级智能体,重新定义AI任务执行新范式
  • Linux CentOS 7 安装Apache 部署html页面
  • 前端 AI 开发实战:基于自定义工具类的大语言模型与语音识别调用指南
  • 2025.4.29_STM32_看门狗WDG
  • 通过全局交叉注意力机制和距离感知训练从多模态数据中识别桥本氏甲状腺炎|文献速递-深度学习医疗AI最新文献
  • 前端防护利器:disable-devtool 使用指南 - 保护你的Web应用安全
  • JAVA---集合ArrayList
  • 《从线性到二维:CSS Grid与Flex的布局范式革命与差异解析》
  • Spring中bean的生命周期(笔记)
  • LeetCode热题100--53.最大子数组和--中等
  • 最新的30个Android Kotlin面试题
  • Kafka的Rebalance机制可能引发什么问题?如何优化?怎么减少不必要的Rebalance
  • 第十六届蓝桥杯 2025 C/C++组 密密摆放