Redis协议与异步方式
1.redis pipeline
通过一次发送多次请求命令,为了减少网络传输时间。
注意:pipeline 不具备事务性。
2.redis 事务
事务:用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行, 要么全部不执行,是不可分割的工作单元。
当出现多条并发连接时,我们会想到事务,当出现多核时,我们会想到原子操作。
2.1 MULTI
开启事务,事务执行过程中,单个命令是入队列操作。
2.2 EXEC
提交事务,执行事务。
2.3 DISCARD
取消事务
2.4 WATCH
检测 key 的变动,若在事务执行中, key 变动则取消事务;
在事务开启前调用乐观锁实现。
乐观锁(redis):
“我假设你不会碰我操作的变量,冲突很少,出问题我再处理。”(用版本号实现,操作时候发现版本号不对,说明被改了,再去重试)
悲观锁(mysql):
“我假设你一定会碰,所以我提前上锁,防止你碰。”
对比项 | 乐观锁 | 悲观锁 |
核心假设 | 不会发生冲突 | 一定会发生冲突 |
操作方式 | 不加锁,操作前后做数据校验 | 操作前加锁,阻塞其他操作 |
并发性能 | 性能好,冲突少时效率高 | 并发低,阻塞多时效率低 |
实现方式 | 版本号(version),时间戳 | 数据库 select...for update,行锁等 |
失败处理 | 操作失败后重试 | 先加锁,保证别人不能动 |
典型场景 | 数据读多写少,如缓存 | 数据写多读多,如账户转账,订单处理 |
3.lua 脚本
redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当某 个脚本正在执行的时候,不会有其他命令或者脚本被执行;
调用方式
redis.call("命令",key1,key2,...,arg1,arg2,...)//call:命令失败抛出异常
//pcall:命令失败不抛异常,返回错误对象
EVAL 是 Redis 执行一段 Lua 脚本的命令,支持多 key、多参数传入,原子执行
使用 EVALSHA 来代替 EVAL 时,相当于只传递一个 hash(SHA1)值 到 Redis,而不传整段 Lua 脚本:Redis 内部维护了一张“哈希表”,这样可以根据 hash 快速定位并执行脚本,EVALSHA 时 Redis 会:
- 查找哈希是否存在
- 找到原始脚本
- 执行脚本(在 Lua 虚拟机中)
4.ACID 特性分析
A :原子性;事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis 不支持回滚;即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直 到将事务队列中的所有命令都执行完毕为止。
C :一致性;执行事务后,数据从一个一致状态过渡到另一个一致状态(满足约束、逻辑正确)
一个是逻辑上一致性,一个是数据库一致性(完整约束)
I :隔离性;各个事务之间互相影响的程度;redis 是单线程执行,天然具备隔离性;
D :持久性;一旦事务执行成功,结果会被永久保存,即使系统崩溃也能恢复
redis只有在aof持久化策略时候,才具备持久性,实际项目中几乎不会使用aof 持久化策略;
lua 脚本满足原子性和隔离性;一致性和持久性不满足;
5.发布订阅
原理
发布者 使用 PUBLISH 向某个频道发送消息;
订阅者 使用 SUBSCRIBE、PSUBSCRIBE 订阅一个或多个频道;
一旦有新消息,所有订阅者会立即收到消息;
是即时通信模型,不做持久化,也不记录历史消息。
命令
命令 | 说明 |
SUBSCRIBE | 订阅一个或多个频道 |
UNSUBSCRIBE | 取消订阅 |
PUBLISH | 向频道中发布消息 |
PSUBSCRIBE | 订阅一个或多个频道(支持通配符) |
PUNSUBSCRIBE | 取消订阅 |
PUBSUB | 查看订阅状态,频道数量等信息 |
发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而 pubsub 能收到 redis 主动推送的内容;所以实际项目中如果支持 pubsub 的话,需要另开一条连接 用于处理发布订阅;
注意:redis 停机重启,pubsub 的消息是不会持久化的,所有的消息被直接丢弃;
6.异步连接
借鉴于:小李小李快乐不已 redis异步连接
思想
hiredis 异步客户端接入自定义的 reactor 事件驱动系统的适配器,核心作用是桥接Redis的异步事件自定义事件循环机制,实现了一套hiredis的IO多路复用抽象接口。
代码
static int redisAttach(reactor_t *r, redisAsyncContext *ac)
- 创建一个 redis_event_t 对象(包含 event_t 和 Redis 上下文);
- 设置该对象的 addRead、delRead、addWrite、delWrite、cleanup 函数;
- 将这些函数注册进 redisAsyncContext 的 ev 成员;
static int redisAttach(reactor_t *r, redisAsyncContext *ac) {redisContext *c = &(ac->c);redis_event_t *re;/* Nothing should be attached when something is already attached */if (ac->ev.data != NULL)return REDIS_ERR;/* Create container for ctx and r/w events */re = (redis_event_t*)hi_malloc(sizeof(*re));if (re == NULL)return REDIS_ERR;re->ctx = ac;re->e.fd = c->fd;re->e.r = r;// dont use event buffer, using hiredis's bufferre->e.in = NULL;re->e.out = NULL;re->mask = 0;//这些是 hiredis 要求你实现的“注册函数”。每当 Redis 需要监听某个 FD 的读/写事件,就会调用这些函数ac->ev.addRead = redisAddRead;ac->ev.delRead = redisDelRead;ac->ev.addRead = redisAddWrite;ac->ev.delWrite = redisDelWrite;ac->ev.cleanup = redisCleanup;//清理函数必不可少ac->ev.data = re;return REDIS_OK;
}
ac->ev.addRead = redisAddRead;ac->ev.addRead = redisAddWrite;
这两个去调用redisReadHandler / redisWriteHandler,去使用hiredis提供的读写处理
redisEventUpdate
这是事件变化的调度器,核心逻辑是根据 mask 来:
- 新增事件:调用 add_event()
- 删除事件:调用 del_event()
- 修改事件:调用 enable_event() 切换读写状态
使用逻辑
int main() {// 1. 创建 event loopreactor_t *r = reactor_create();// 2. 连接 Redis 异步客户端redisAsyncContext *ac = redisAsyncConnect("127.0.0.1", 6379);if (ac->err) {printf("Redis error: %s\n", ac->errstr);return -1;}// 3. 接入自己的 reactorif (redisAttach(r, ac) != REDIS_OK) {printf("Failed to attach redis context to reactor\n");return -1;}// 4. 设置连接成功/断开回调(可选)redisAsyncSetConnectCallback(ac, connectCallback);redisAsyncSetDisconnectCallback(ac, disconnectCallback);// 5. 发送异步命令redisAsyncCommand(ac, commandCallback, NULL, "SET foo bar");// 6. 启动事件循环reactor_run(r);return 0;
}
hiredis实现了协议解析、读写事件、缓冲区操作、协议加密
我们适配文件实现了:适配事件对象以及事件操作函数