当前位置: 首页 > ops >正文

Redis C++ 实现笔记(F篇)

Implementing Redis in C++ : F

Redis C++ 实现笔记(F篇)

前言

本章代码及思路均来自Build Your Own Redis with C/C++

本文章只阐述我的理解想法,以及需要注意的地方。

本文章为续<<Implementing Redis in C++ : E>>所以本文章不再完整讲解全部代码,只讲解其不同的地方

Redis-like命令

在原作者接下来的步骤中,原作者新增了zaddzremzscorezquery的命令,这些命令都是与redis的命令类似,我们简单的说一下这几个命令:

  1. zadd:将一个元素加入到有序集合中,如果这个元素已经存在,则更新这个元素的分数。

使用方法:zadd key score member,如果数据不存在,添加成功后,返回 (int) 1,如果数据已经存在,则更新数据,返回 (int) 0

  1. zrem:从有序集合中删除一个元素。

使用方法:zrem key member,删除成功后,返回 (int) 1,如果数据不存在,则返回 (int) 0

  1. zscore:获取有序集合中指定成员的分数。

使用方法:zscore key member,返回 (double) score,如果数据不存在,则返回 (nil)

  1. zquery:获取有序集合中 >=(score, name) 区间的成员。

使用方法:zquery zset score name skip count,返回 (array),包含有序集合中 >=(score, name) 区间的成员。其中skip是跳过的数量,count是返回的成员数量。

主体思路

为了快速理解作者的代码的思路,我将定义的结构体之间的关系以以下图示描述:

g_data
└── db : HMap├── newer : HTab   (正在使用)│   ├── tab[0] ──> HNode(Entry1) ──> HNode(Entry2) ──> ...│   │                                 ││   │                                 └── Entry│   │                                     ├── key  = "foo"│   │                                     ├── type = STR│   │                                     ├── str  = "hello"│   │                                     └── zset = (unused if type=STR)│   ││   ├── tab[1] ──> HNode(Entry3) ──> ...│   │                     ││   │                     └── Entry│   │                         ├── key  = "myzset"│   │                         ├── type = ZSET│   │                         ├── str  = ""│   │                         └── zset : ZSet│   │                                ├── root (AVLNode*)│   │                                │       ┌── ZNode::tree(score=1.0, name="a")│   │                                │ root ─┼── ZNode::tree(score=2.0, name="b")│   │                                │       └── ZNode::tree(score=3.0, name="c")│   │                                ││   │                                └── hmap : HMap│   │                                     ├── newer : HTab│   │                                     │     ├── tab[hash("a")] ──> HNode(ZNode "a")│   │                                     │     ├── tab[hash("b")] ──> HNode(ZNode "b")│   │                                     │     └── tab[hash("c")] ──> HNode(ZNode "c")│   │                                     ├── older : HTab (迁移用)│   │                                     └── migrate : szie_t (迁移用)│   ││   ├── mask : size_t   (hash 索引掩码, 形如 2^k-1)│   └── size : size_t   (元素数量)│├── older : HTab        (rehash 迁移时的旧表)│   ├── tab : HNode*[]│   ├── mask│   └── size│└── migrate_pos : size_t  (迁移进度)g_data└── db : HMapHMap (HashMap)├── newer : HTab       ---> 正在使用的新表├── older : HTab       ---> 迁移中的旧表└── migrate_pos : size_t  ---> 迁移进度HTab (节点表)├── tab  : HNode**     ---> 数组,每个元素是链表头指针│          tab[0] ---> HNode ---> HNode ---> ...│          tab[1] ---> HNode ---> ......├── mask : size_t      ---> 用于计算索引 (hcode & mask)└── size : size_t      ---> 表中元素个数HNode (链表节点)├── next : HNode*     ---> 指向下一个 HNode└── hcode : uint64_t  ---> 哈希值Entry (用户数据)├── node: HNode  <--- 侵入式节点,链接到 HTab 的 tab[i] 链表├── key: std::string <--- 用户原始数据的 键├── type: uint32_t  <--- 数据类型├── str: std::string <--- 用户原始数据的 值└── zset: ZSet  <--- 存放有序集合数据ZSet (有序集合)├── root: AVLNode*   ---> 根据(socre, name)进行排序的 AVL 树根节点└── hmap: HMap  ---> 根据 name 索引的哈希表ZNode (有序集合节点)├── tree: AVLNode├── hmap: HNode├── score: double├── len: size_t└── name: char*AVLNode (AVL 树节点)
├── parent: AVLNode*
├── left: AVLNode*
├── right: AVLNode*
├── height: uint32_t
└── cnt: uint32_t

在上面的整体思路中,我们拓展了Entry中存储数据的结构,让Entry中既可以存储str也可以存储zset,同时仍然保持使用Entry进行快速的查找,同时因为我们使用了ZSet
ZSet中使用AVL树来存储(score, name)的数据,也使用HMap来索引name

与之前的修改相比,现在的主要修改是使用ZNode中的hmap来接入HMap,并使用ZNode中的tree来接入AVL树

因为在先前,我们已经实现了AVL树HMap,而实现Redis-like 命令,就可以使用我们先去实现的这两个算法,来高效的查找数据。

所以接下来我们就要实现上面的命令和逻辑。

ZSet{}, ZNode{}, znode_new()

struct ZSet{AVLNode* root = nullptr; // index by (score, name)HMap hmap; // index by name
};struct ZNode{AVLNode tree;HNode hmap;double score = 0;size_t len = 0;char name[0]; // flexible array
};static ZNode* znode_new(const char* name, size_t len, double score){ZNode* node = (ZNode*)malloc(sizeof(ZNode)+len);assert(node);avl_init(&node->tree);node->hmap.next = nullptr;node->hmap.hcode = str_hash((uint8_t*)name, len);node->score = score;node->len = len;memcpy(&node->name[0], name, len);return node;
}

具体的ZSet{}ZNode{}结构体的关系我们已经在上面的图示中说明了,这里不再赘述。

其中我们需要注意的是ZNode{}中的name[0],这是定义了一个灵活数组,用来保存name,我们这样定义struct的方式,并不能使用new来创建结构体,因为new只会创建静态结构体,也就是不会计算我们后面增加的内容的大小,而malloc()可以创建动态结构体,可以灵活的分配内存大小。

因为我们的namechar类型,所以我们直接使用sizeof(ZNode)+len也是可以的。

要是换成其他的数据,记得要进行计算后面的大小。

zadd()

zadd()命令,用于添加一个元素到ZSet中,示例:zadd zset 1 name1,也就是向Entry中存储键为key = zsetAVLNode Entry::zset::rootAVL树中插入一个(score, name),同时,使用哈希插入到Entry中存储键为key = zsetHTab Entry::zset::hmap::newer中,将name作为keyscore作为value

所以,zadd()命令也就是分成了三个步骤:

  1. 检查是否已经存在
  2. 存在:更新
  3. 不存在:插入
bool zset_insert(ZSet* zset, const char* name, size_t len, double score){ZNode* node = zset_lookup(zset, name, len); // check the node existif(node){zset_update(zset, node, score);return false; // update existing node}else{node = znode_new(name, len, score);hm_insert(&zset->hmap, &node->hmap);tree_insert(zset, node);return true;}
}

我们一步步解释其中的函数。

// a helper structure for the hashtable lookup
struct HKey{HNode node;const char* name = nullptr;size_t len = 0;
};static bool hcmp(HNode* node, HNode* key){ZNode* znode = container_of(node, ZNode, hmap);HKey* hkey = container_of(key, HKey, node);if(znode->len != hkey->len)return false;return 0 == memcmp(znode->name, hkey->name, znode->len);
}// lookup by name
ZNode* zset_lookup(ZSet* zset, const char* name, size_t len){if(!zset->root)return nullptr;HKey key;key.node.hcode = str_hash((uint8_t*)name, len);key.name = name;key.len = len;HNode* found = hm_lookup(&zset->hmap, &key.node, &hcmp);return found ? container_of(found, ZNode, hmap) : nullptr ;
}

我们首先创建了一个HKey结构体,用来辅助查找。

为什么要新建一个Hkey结构体呢?

我们的hm_lookup函数需要(HMap* hmap,HNode* key,bool (*eq)(HNode* , HNode*))这四个参数,同时我们传入的函数指针hcmp还会使用container_of来获取到上级结构体,拿到上级结构体中的数据再次进行比对,最终确认我们要找的节点,在这同时我们不能单独的传入一个char*类型,我们就需要一个结构体来承担,存储HNode*char*等的作用。

注意:HKey中的HNode代表的是ZNode中的HNode

当我们找到对应的节点后,我们就可以使用container_of来还原回上级结构体ZNode了,并返回。

// compare by the (score, name) tuple
static bool zless(AVLNode* lhs, double score, const char* name, size_t len){ZNode* zl = container_of(lhs, ZNode, tree);if(zl->score != score){return zl->score < score;}int rv = memcmp(zl->name, name, min(zl->len, len));if (rv != 0) {return rv < 0;}return zl->len < len;
}static bool zless(AVLNode* lhs, AVLNode* rhs){ZNode* zr = container_of(rhs, ZNode, tree);return zless(lhs, zr->score, zr->name, zr->len);
}static void tree_insert(ZSet* zset, ZNode* node){AVLNode* parent = nullptr; // insert under this nodeAVLNode* *from = &zset->root; // the incoming pointer to the next nodewhile(*from){ // tree searchparent = *from;from =  zless(&node->tree, parent) ? &parent->left : &parent->right;}*from = &node->tree;  // attach to this nodenode->tree.parent = parent;zset->root = avl_fix(&node->tree);
}

因为我们要使用AVL树来存储(score, name)的信息,所以树在排序的时候也是,先比较score,然后比较name,然后进行存储,所以当我们在搜索准备插入数据的时候,我们也需要使用container_of来获取上级结构体中的(score, name)数据,也就是我们写的zless函数。

tree_insert函数,也是通过while zless找到插入的位置,然后进行插入。

// update the score of an existing node
static void zset_update(ZSet* zset, ZNode* node, double score){if(node->score == score) return; // no changezset->root = avl_del(&node->tree);avl_init(&node->tree);node->score = score;tree_insert(zset, node);
}

更新代码似乎不用多说,需要注意的是,修改AVL树中的数据,可能会破坏树的有序性,所以我们要先删除原来的数据,然后再插入新的数据。

avl_del中删除后会调用avl_fix来修复树,不必担心树被破坏。

static bool str2dbl(const std::string &s, double &out){char* endp = nullptr;out = strtod(s.c_str(), &endp);return endp == s.c_str() + s.size() && !isnan(out);
}static bool str2int(const std::string &s, int64_t &out){char* endp = nullptr;out = strtoll(s.c_str(), &endp, 10);return endp == s.c_str() + s.size();
}// zadd zset score name
static void do_zadd(std::vector<std::string> &cmd, Ring_buf &buf){double score = 0;if(!str2dbl(cmd[2], score)){return out_err(buf, ERR_BAD_ARG, "expect float");}// lookup the zsetLookupKey key;key.key.swap(cmd[1]);key.node.hcode = str_hash((uint8_t*)key.key.data(), key.key.size());HNode* hnode = hm_lookup(&g_data.db, &key.node, &entry_eq);Entry* ent = nullptr;if(!hnode){// insert a new keyent = entry_new(T_ZSET);ent->key.swap(key.key);ent->node.hcode = key.node.hcode;hm_insert(&g_data.db, &ent->node);}else{ent = container_of(hnode, Entry, node);if(ent->type != T_ZSET){return out_err(buf, ERR_BAD_TYP, "expect zset");}}// add or update the tupleconst std::string &name = cmd[3];bool added = zset_insert(&ent->zset, name.data(), name.size(), score);return out_int(buf, (int64_t)added);
}

因为我们接收到的网络数据是字符串,所以需要将字符串转换成对应的数据。

str2dblstr2int中,首先都先定义了char *end;这是用来在使用strtodstrtoll时,存放解析结束的位置,如果遇到不能解析的符号,就停留在那里,否则,就指向字符串的末尾。

使用strtodstrtoll将字符串转换成对应的数据,stroll中的10就是按照十进制来转换的。

在判断 endp == s.c_str() + s.size() 时,其实就是在验证字符串是否被完全成功解析。

  • s.c_str() 会把 std::string 转换成 C 风格字符串 (const char*),返回一个指向首字符的指针。
  • s.c_str() + s.size() 指向的是字符串的末尾(即最后一个字符的下一个位置,通常是 \0 的位置)。
  • 如果 endp 没有指向这个末尾,就说明解析过程提前停止了,后面还有未能解析的内容,此时就认为解析失败。
  • 只有当 endp 恰好等于字符串末尾时,才说明整个字符串都被正确解析。

举例说明:

  • 输入 "123"endp 会指向字符串末尾,表示解析成功。
  • 输入 "123abc"endp 会停在 'a' 处,说明 "abc" 不是合法数字,因此解析不完整。

在我们的do_add中,我们仍然是使用了一个新的结构体来辅助查询,这个结构体的作用也和我们上面将的一样,hm_lookup需要HNode的参数,同时又需要上级的结构体中的内容来进行比对,也就是我们传入的entry_eq,所以需要我们新建一个LookupKey结构体。

zrem()

// delete by name
void zset_delete(ZSet* zset, ZNode* node){// remove from the hashtableHKey key;key.node.hcode = node->hmap.hcode;key.name = node->name;key.len = node->len;HNode* found = hm_delete(&zset->hmap, &key.node, &hcmp);assert(found);// remove from the AVL treezset->root = avl_del(&node->tree);znode_del(node);
}

对于我们删除的函数HNode* hm_delete(HMap* hmap,HNode* key,bool (*eq)(HNode* , HNode*)),同样也是需要我们的HKey来辅助删除的,hcmp通过比对node上级结构体中的数据,来辅助查找对应的节点。


static const ZSet k_empty_zset;static ZSet* expect_zset(std::string &s){LookupKey key;key.key.swap(s);key.node.hcode = str_hash((uint8_t*)key.key.data(), key.key.size());HNode* hnode = hm_lookup(&g_data.db, &key.node, &entry_eq);if(!hnode){// a non-existent key is treated as an empty zsetreturn (ZSet*)&k_empty_zset;}Entry* ent = container_of(hnode, Entry, node);return ent->type == T_ZSET ? &ent->zset : nullptr;
}static void do_zrem(std::vector<std::string> &cmd, Ring_buf &buf){ZSet* zset = expect_zset(cmd[1]);if(!zset){return out_err(buf, ERR_BAD_TYP, "expect zset");}const std::string &name = cmd[2];ZNode* znode = zset_lookup(zset, name.data(), name.size());if(znode){zset_delete(zset, znode);}return out_int(buf, znode ? 1 : 0);
}

LookupKey的使用与HKey类似,不过,LookupKey中的HNode代表(查找)的是Entry中的HNode

我们的zrem命令会先查找键,然后删除键中ZSet中的元素,而数据的存储在Entry中,所以我们通过expect_zset来获取键对应的ZSet,然后通过zset_delete来删除键对应的元素。

在这其中,我们还定义了一个static const ZSet k_zset_empty;,用来作为,当key不存在时,返回的ZSet

为什么我们要使用一个自己定义的空对象呢?

返回空对象 k_empty_zsetkey 不存在时等同空集合,并与 Redis 语义一致,让返回nullptr只有一种情况,即key存在但不是ZSet

zscore()

static void do_zscore(std::vector<std::string> &cmd, Ring_buf &buf){ZSet* zset = expect_zset(cmd[1]);if(!zset){return out_err(buf, ERR_BAD_TYP, "expect zset");}const std::string &name = cmd[2];ZNode* znode = zset_lookup(zset, name.data(), name.size());return znode ? out_dbl(buf, znode->score) : out_nil(buf);
}

查找指定有序集(ZSet)中的成员(name)的分数(score)

这个似乎就不用多说了,先使用expect_zset()函数获取有序集,然后使用zset_lookup()函数查找成员,如果找到则返回分数,否则返回nil

zquery()

用来查找指定有序集(ZSet)中 **>=(score, name)**的成员
使用命令:zquery zset score name offset limit
其中offset为查询的起始位置,limit为查询的个数

// find the first (score, name) tuple that is >= key.
ZNode* zset_seekge(ZSet* zset, double score, const char* name, size_t len){AVLNode* found = nullptr;for(AVLNode* node = zset->root; node;){if(zless(node, score, name, len)){node = node->right;}else{found = node;node = node->left;}}return found ? container_of(found, ZNode, tree) : nullptr;
}

首先我们使用zless函数来判断当前节点的分数是否小于给定的分数。如果小于,则说明当前节点的分数小于给定的分数,那么我们就向右移动,同时记录当前大于 (score, name) 的节点,否则向左移动,直到找到第一个大于 (score, name) 的点。

// offset into the succeeding or preceding node
ZNode* znode_offset(ZNode* node, int64_t offset){AVLNode* tnode = node ? avl_offset(&node->tree, offset) : nullptr;return tnode ? container_of(tnode, ZNode, tree) : nullptr;
}

这个代码就是通过offset找到,当前传入节点的前驱后继

下面的代码是具体的实现

AVLNode* avl_offset(AVLNode* node, int64_t offset){int64_t pos = 0; // the rank of difference from the starting nodewhile(offset != pos){if(pos < offset && pos+avl_cnt(node->right) >= offset){// the target is inside the right subtreenode = node->right;pos += avl_cnt(node->left) + 1;}else if(pos > offset && pos - avl_cnt(node->left) <= offset){// the target is inside the left subtreenode = node->left;pos -= avl_cnt(node->right) + 1;}else{// go to the parentAVLNode* parent = node->parent;if(!parent){return nullptr;}if(parent->right == node){pos -= avl_cnt(node->left)+1;}else{pos += avl_cnt(node->right)+1;}node = parent;}}return node;
}
         3/   \2     5/     /1     4

index: 0 1 2 3 4
value: [1] [2] [3] [4] [5]
P --> O
offset如果大于零,那就是要找当前节点的右子树的大小,如果右子树的大小不够,也就是说,P的移动,无法移动到我们要找的节点,所以我们要向上移动,增大我们要找的右子树的范围,让我们的P可以移动到对应的节点
P在向上移动的过程中,如果在移动前的节点是其父节点的右子树,那我们挪上去后,P要相应的减去左子树的大小,也就是向左移动
为什么要向左移动,假设我们在移动前节点的父节点上,想要移动到右子树,也就是更大的值的地方,我们的P是要向右移动的,而向右移动多少?那就是我们移动前的节点的左子树的大小(AVL中序遍历的输出是有序的,所以不必担心左子树的值比右子树的值大)
那么相应的,如果我们在移动前的节点是其父节点的左子树,那我们挪上去后,P要相应的加上右子树的大小,也就是向右移动

也正如上面我们所说的,我们要找到pos == offset的位置
pos < offset时,也就是说,我们的Offset所指的位置,还在P所指位置的右侧,我们要向右移动,如果我们的右子树足够大(avl_cnt),也就是说,P可以向右移动的距离足够大,那就可以直接移动到右子树中,查找对应的offset所指的位置
如果右子树不够大,那就只能向上移动,扩大我们可以移动的范围
同理,当pos > offset时,也就是说,我们的Offset所指的位置,还在P所指位置的左侧,我们要向左移动,如果我们的左子树足够大(avl_cnt),也就是说,P可以向左移动的距离足够大,那就可以直接移动到左子树中,查找对应的offset所指的位置
如果左子树不够大,那就只能向上移动,扩大我们可以移动的范围

从节点 3 出发,offset = +2 (也就是要找 5 节点)
我们发现 3 的右子树的大小是 2 (5 节点和 4 节点),也就是P所指的位置可以向右移动 2 个位置,刚好可以移动到 5 节点
那我们就进入右子树。

// zquery zset score name offset limit 
static void do_zquery(std::vector<std::string> &cmd, Ring_buf &buf){// parse argsdouble score = 0;if(!str2dbl(cmd[2], score)){return out_err(buf, ERR_BAD_ARG, "expect fp number");}const std::string &name = cmd[3];int64_t offset = 0, limit = 0;if(!str2int(cmd[4], offset) || !str2int(cmd[5],limit)){return out_err(buf, ERR_BAD_ARG, "expect int");}// get the zsetZSet* zset = expect_zset(cmd[1]);if(!zset){return out_err(buf, ERR_BAD_TYP, "expect zset");}// seek to the keyif(limit <= 0){return out_arr(buf,0);}ZNode* znode = zset_seekge(zset, score, name.data(), name.size());znode = znode_offset(znode, offset);// output size_t ctx = out_begin_arr(buf);int64_t n = 0;while(znode && n < limit){out_str(buf, znode->name, znode->len);out_dbl(buf, znode->score);znode = znode_offset(znode, 1);n += 2;}out_end_arr(buf, ctx, (uint32_t)n);}

我们将上面的代码进行结合,先获取输入的scorename,再通过except_zset获取到对应的有序表(ZSet),然后通过zset_seekge寻找到第一个大于(score, name)ZNode,然后通过znode_offset计算跳过的个数,最后进行输出。

end

这些就是代码修改的主体,其他的部分改动较小,我们就不再讲述了,鉴于代码放在这里实在太多,我给出我的github地址,大家可以去找study/dev_4的目录进行查看

github地址:https://github.com/AuroBreeze/Implementing-Redis-in-C

http://www.xdnf.cn/news/19852.html

相关文章:

  • C/C++关键字——union
  • Python开篇撬动未来的万能钥匙 从入门到架构的全链路指南
  • 《IC验证必看|semaphore与mailbox的核心区别》
  • [从零开始面试算法] (11/100) LeetCode 226. 反转二叉树:递归的“镜像”魔法
  • RabbitMQ学习笔记
  • 找活招工系统源码 雇员雇主小程序 后端JAVA前端uniapp
  • 《云原生深坑实录:让团队卡壳的不是配置,是底层逻辑盲区》
  • 基于扣子平台构造AutoGen框架的多智能体使用-----封装成FastAPI接口供调用
  • JVM:程序计数器
  • 基于Matlab狭窄空间环境中多无人机自重构V字队形方法研究
  • 《清远市市级政务信息化服务项目立项审批细则(试行)》标准解读
  • Jenkins调用Ansible构建LNMP平台
  • 深入探索 WebSocket:构建实时应用的核心技术
  • DarkHole: 2靶场渗透
  • 用 SPL 编写阿里云 FC2.0 函数
  • AntdesignVue 的月份区间组件用法
  • mysql分页SQL
  • Dubbo(分布式RPC调用和分布式文件储存)
  • 深入解析Django重定向机制
  • 2025React面试题集锦
  • Java 与 Docker 的最佳实践
  • wins中怎么用一个bat文件启动jar包和tomcat等多个服务
  • Linux tail 命令使用说明
  • 【C++详解】C++11(四) 包装器:function、bind、STL中⼀些变化
  • 【AI论文】UI-TARS-2技术报告:借助多轮强化学习推进图形用户界面(GUI)智能体发展
  • 20. 云计算-华为云-云服务
  • Linux Centos7搭建LDAP服务(解决设置密码生成密文添加到配置文件配置后输入密码验证报错)
  • 分享星空投影灯方案
  • 高效菜单管理页面:一键增删改查
  • Word 常用快捷键大全:提升文档处理效率的必备技巧​