当前位置: 首页 > ds >正文

RGW层Op的组织

1 引言

在Radosgw处理来自客户端请求的时候,最终需要将请求发往OSD进行落盘,通常情况下,一个客户端请求所对应的发往OSD的请求远不止1个,比如对象的写入要判断acl、写入元数据等等;如果每个RGW层的Op都对应发往OSD的一次IO,那么集群的网络IO将会非常高。因此在RGW层Op是以一个Op列表的形式下发到OSD,本文就来介绍这一机制。

2 一个例子ObjectWriteOperation

阅读RGW层的代码可以经常发现以ObjectWriteOperation类定义的op,并且会执行这个类的成员函数,以我们以RGWRados::Object::Write::_do_write_meta为例:

int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,map<string, bufferlist>& attrs,bool assume_noent, bool modify_tail,void *_index_op)
{RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);RGWRados *store = target->get_store();ObjectWriteOperation op;...if (meta.manifest) {/* remove existing manifest attr */iter = attrs.find(RGW_ATTR_MANIFEST);if (iter != attrs.end())attrs.erase(iter);bufferlist bl;encode(*meta.manifest, bl);op.setxattr(RGW_ATTR_MANIFEST, bl);}...

在op被初始化之后,第一个可能执行的函数是setxattr,也就是设置xattr元数据,来看这个函数的调用栈:
ObjectWriteOperation::setxattr()
|–::ObjectOperation::setxattr()
|\quad|–add_xattr()
来看add_xattr:

void add_xattr(int op, const char *name, const bufferlist& data) {OSDOp& osd_op = add_op(op);osd_op.op.xattr.name_len = (name ? strlen(name) : 0);osd_op.op.xattr.value_len = data.length();if (name)osd_op.indata.append(name, osd_op.op.xattr.name_len);osd_op.indata.append(data);}

可以发现最终的操作很简单,首先执行add_op():

OSDOp& add_op(int op) {int s = ops.size();ops.resize(s+1);ops[s].op.op = op;out_bl.resize(s+1);out_bl[s] = NULL;out_handler.resize(s+1);out_handler[s] = NULL;out_rval.resize(s+1);out_rval[s] = NULL;return ops[s];}

有一个名为ops的vector<OSDOp>,用于存储所有要发往OSD的操作,当有一个新Op到来的时候就resize这个vector并将这个Op的操作数添加到vector末尾的OSDOp中,例如add_xattr要添加的操作数就是CEPH_OSD_OP_SETXATTR。操作数都是enum类型,可以根据宏定义__CEPH_FORALL_OSD_OPS找到对应的操作数。
可以发现,针对OSD的操作都是存储在列表中,等待一次性发送下去。ObjectWriteOperation的很多成员函数都是相似的处理过程,例如write_full、rm_xattr等,只是不同的成员函数处理的OSDOp的成员不同罢了。

3 bufferlist的encode

3.1 encode的函数定义

还有一个值得注意的点是所有ObjectWriteOperation的成员函数处理的都是bufferlist数据结构,在之前处理manifest的代码中,manifest也是转化为bufferlist再交给op进行处理的。

if (meta.manifest) {/* remove existing manifest attr */iter = attrs.find(RGW_ATTR_MANIFEST);if (iter != attrs.end())attrs.erase(iter);bufferlist bl;encode(*meta.manifest, bl);op.setxattr(RGW_ATTR_MANIFEST, bl);}

接下来就来详细看一下manifest是如何转化到bufferlist。
首先来看encode(*meta.manifest, bl);这行是如何实现的:

#define WRITE_CLASS_ENCODER(cl)						\inline void encode(const cl &c, ::ceph::bufferlist &bl, uint64_t features=0) { \ENCODE_DUMP_PRE(); c.encode(bl); ENCODE_DUMP_POST(cl); }		\inline void decode(cl &c, ::ceph::bufferlist::iterator &p) { c.decode(p); }

这个宏定义在编译的时候就会展开为:

inline void encode(const RGWObjManifest &c, ::ceph::bufferlist &bl, uint64_t features=0) { c.encode(bl); 
}

实际上这是为了encode有一个统一入口,实际上调用的还是RGWObjManifest自身的encode函数。因为bufferlist实际上是ceph特有的内存管理方式,不同的类将其转化为bufferlist可识别的形式显然必须要自定义自己的encode函数。掌握了一个类的encode方式其他的类也可以举一反三。
来看RGWObjManifest::encode():

void encode(bufferlist& bl) const {ENCODE_START(7, 6, bl);encode(obj_size, bl);...encode(head_placement_rule, bl);encode(tail_placement.placement_rule, bl);ENCODE_FINISH(bl);}

比较重要的是ENCODE_START和ENCODE_FINISH这两个宏定义:

/*** start encoding block** @param v current (code) version of the encoding* @param compat oldest code version that can decode it* @param bl bufferlist to encode to*/
#define ENCODE_START(v, compat, bl)			     \using ::ceph::encode;					     \__u8 struct_v = v, struct_compat = compat;		     \// 写入当前版本号(1字节)encode(struct_v, (bl));				     \// 写入兼容版本号(1字节)encode(struct_compat, (bl));			     \// 指向刚写入的 compat 字节位置::ceph::buffer::list::iterator struct_compat_it = (bl).end();	\struct_compat_it.advance(-1);				     \ceph_le32 struct_len;				             \struct_len = 0;                                            \encode(struct_len, (bl));				     \::ceph::buffer::list::iterator struct_len_it = (bl).end(); \// 指向刚写入的长度字段位置struct_len_it.advance(-4);				     \do {/*** finish encoding block** @param bl bufferlist we were encoding to* @param new_struct_compat struct-compat value to use*/
#define ENCODE_FINISH_NEW_COMPAT(bl, new_struct_compat)			\} while (false);							\// 当前 bufferlist 总长度,get_off()长度字段的位置,struct_len长度字段本身占用的4字节// 这句代码实际上是计算实际编码的数据长度,不包括长度字段本身struct_len = (bl).length() - struct_len_it.get_off() - sizeof(struct_len); \// 回填真实长度struct_len_it.copy_in(4, (char *)&struct_len);			\if (new_struct_compat) {						\struct_compat = new_struct_compat;					\struct_compat_it.copy_in(1, (char *)&struct_compat);		\}#define ENCODE_FINISH(bl) ENCODE_FINISH_NEW_COMPAT(bl, 0)

很明显中间的do-while是留给其他类型的encode函数的,自定义的encode都会插入到ENCODE_START和ENCODE_FINISH_NEW_COMPAT中间的do-while中。
ENCODE_START的三个参数v、compact和bl,bl自不必说,就是数据结构需要encode的bufferlist。v和compact则都服务于bufferlist的版本。在写代码的过程中免不了向数据结构中添加一些成员,这样就导致新旧版本存储数据不一致,当新添加一个成员的时候,就要给v加1,compact则表示能兼容的最老版本,鉴于此,在我们添加新的成员时最好在encode函数中的末尾添加这个成员的encode函数,这样其之前的encode函数均能正常解析,也不必给compact加1了。

3.2 bufferlist具体encode过程

实际上,ceph复杂数据结构的encode过程就是不断的迭代,直到encode到基本类型,哪怕是class嵌套着class,只要不断的调用成员函数的encode函数,最终就能将整个复杂数据结构编码为bufferlist形式,下面将从一个基本类型的encode例子进行讲解。
我们用encode(obj_size, bl);这条语句来详细讲解encode的过程,obj_size是uint64_t类型,该类型的encode函数为:

#define WRITE_INTTYPE_ENCODER(type, etype)				\inline void encode(type v, ::ceph::bufferlist& bl, uint64_t features=0) { \ceph_##etype e;					                \e = v;                                                              \::ceph::encode_raw(e, bl);						\}									\inline void decode(type &v, ::ceph::bufferlist::iterator& p) {	\ceph_##etype e;							\::ceph::decode_raw(e, p);						\v = e;								\}WRITE_INTTYPE_ENCODER(uint64_t, le64)
WRITE_INTTYPE_ENCODER(int64_t, le64)
WRITE_INTTYPE_ENCODER(uint32_t, le32)
WRITE_INTTYPE_ENCODER(int32_t, le32)
WRITE_INTTYPE_ENCODER(uint16_t, le16)
WRITE_INTTYPE_ENCODER(int16_t, le16)

那么实际处理的函数就是::ceph::encode_raw(),来看encode_raw函数,非常简单:

template<class T>
inline void encode_raw(const T& t, bufferlist& bl)
{bl.append((char*)&t, sizeof(t));
}

无论const T是什么类型,都会传该类型的引用t,最终取地址转化为char*,实际上就是将地址转换为字符串。encode_raw调用了buffer::list::append(const char *data, unsigned len)函数:

void buffer::list::append(const char *data, unsigned len){_len += len;const unsigned free_in_last = get_append_buffer_unused_tail_length();const unsigned first_round = std::min(len, free_in_last);if (first_round) {// _buffers and carriage can desynchronize when 1) a new ptr// we don't own has been added into the _buffers 2) _buffers// has been emptied as as a result of std::move or stolen by// claim_append.if (unlikely(_carriage != &_buffers.back())) {auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);_carriage = bptr.get();_buffers.push_back(*bptr.release());_num += 1;}_carriage->append(data, first_round);}const unsigned second_round = len - first_round;if (second_round) {auto& new_back = refill_append_space(second_round);new_back.append(data + first_round, second_round);}}

其中_carriage指向bufferlist链表中的最后一个ptr_node,可以理解为指向最后一块内存空间,因为bufferlist实际上是buffer::raw的链表,而buffer::raw是管理内存的数据结构,所以bufferlist的append函数实际上就是将当前加入的数据存储到最后一块内存中并加入到链表末尾。那么_carriage的作用也不难理解了,因为经常需要操作链表末尾的指针,如果从链表头开始找那么时间复杂度就是O(n)O(n)O(n),直接用_carriage可以大大加快append的速度。
append函数中大部分代码都是处理末尾内存空间不足的情况,实际上是buffer::ptr::append(_carriage的父类buffer::ptr)函数执行最终的加入链表逻辑:

_carriage->append(data, first_round);unsigned buffer::ptr::append(const char *p, unsigned l){ceph_assert(_raw);ceph_assert(l <= unused_tail_length());char* c = _raw->get_data() + _off + _len;maybe_inline_memcpy(c, p, l, 32);_len += l;return _len + _off;}

首先获取raw内存的实际地址c,再将p(也就是之前获取到的需要encode的数据的地址)拷贝过去。
针对bufferlist如何管理内存,将在其他篇章中介绍。

http://www.xdnf.cn/news/19151.html

相关文章:

  • 【大前端】React Native(RN)跨端的原理
  • Day16_【机器学习—模型拟合问题】
  • 【MySQL 为什么默认会给 id 建索引? MySQL 主键索引 = 聚簇索引?】
  • 【实战】连锁商超出口网络割接项目案例分享
  • 从CTFshow-pwn入门-pwn43理解栈溢出到底跳转call还是plt
  • 【Word】用 Python 轻松实现 Word 文档对比并生成可视化 HTML 报告
  • 深入 OpenHarmony 内核:设备待机管理模块的休眠调度与资源节能技术
  • 【SpringBoot 版本升级整合Redis异常解决】Unable to connect to 127.0.0.1:6379
  • 5G核心网的架构和功能详解
  • 浏览器访问 ASP.NET Core wwwroot 目录下静态资源的底层实现
  • 新手向:Python编写简易翻译工具
  • 实时标注+硬件加速 | Bandicam 8.2 屏幕录制软件特色功能
  • 局域网共享访问各种报错全记录:从「能 ping 不能进」到「IP/名称差异」一次说清
  • OpenAI重组受阻:微软“锁链”与生态博弈
  • 从 WPF 到 Avalonia 的迁移系列实战篇3:ResourceDictionary资源与样式的差异与迁移技巧
  • 使用 httpsok 工具全面排查网站安全配置
  • @HAProxy 介绍部署使用
  • Copilot、Cursor、Trae、ChatGPT 的“四件套”场景选择表
  • 5G相对于4G网络的优化对比
  • 卷积神经网络实现mnist手写数字集识别案例
  • 三、计算机网络与分布式系统(上)
  • Linux DNS配置文件resolv.conf简介
  • Centos 8 磁盘扩展xfs文件系统 (LVM)
  • 云计算学习100天-第32天
  • 1-ATSAMV71Q21
  • 大模型后训练——Online-RL实践
  • DistributedLock 实现.Net分布式锁
  • 智能养花谁更优?WebIDE PLOY技术与装置的结合及实践价值 —— 精准养护的赋能路径
  • 北斗导航 | 工信部印发《关于优化业务准入促进卫星通信产业发展的指导意见》解析
  • MySQL数据库精研之旅第十三期:吃透用户与权限管理,筑牢数据库安全第一道防线