文章目录
- 4.1.2 Redis协议与异步方式
- 1. redis pipeline
- 2. redis事务
- 1. MULTI
- 2. EXEC
- 3. DISCARD
- 4. WATCH
- 3. lua脚本
- 1. lua基础语法
- 2. Lua 脚本中访问 Redis 的方式
- 3. Lua 脚本中的 KEYS 和 ARGV
- 4、返回值
- 5、错误处理
- EVALSHA 来代替 EVAL
- 4. ACID特性分析
- 5. redis发布订阅
- 1. 工作原理
- 2. 命令
- 6. redis异步连接
- 1、思想
- 2. 代码
- 1. static int redisAttach(reactor_t *r, redisAsyncContext *ac)
- 2. ac->ev.addRead = redisAddRead;ac->ev.addRead = redisAddWrite;
- 3. redisEventUpdate
- 4. 使用逻辑
- 7. redis自定义实现
- 1. Redis 协议简介(RESP)
- 2. 示例代码(发送命令 + 接收 RESP 响应)
4.1.2 Redis协议与异步方式
1. redis pipeline
- 一次性发送多个指令,减少网络传输时间
就是不用请求应答,直接一股脑发,redis整完,一股脑回 - 事务性(事务性特征)是指一组操作要么全部成功执行,要么全部失败回滚,中间不能只执行一部分。这种特性通常用于数据库或数据操作中,保证数据的一致性和可靠性,pipeline不具备
2. redis事务
用户定义一系列数据库操作,定义为一个完整的处理单元,一起执行
锁总线,其他核心看到执行结束或者还没执行,中间过程看不到
也是一种原子操作,数据库的操作,某个事务由多条命令构成,不允许其他跟redis建立的连接不允许碰
什么时候探讨事务?多条并发连接
什么时候探讨原子操作?多核
1. MULTI
开启事务,事务执行过程中,单个命令是入队列操作,直到调用EXEC一起执行
2. EXEC
提交事务
3. DISCARD
取消事务
4. WATCH
检测key的变动,若在事务执行过程中,key变动则取消事务(就比如同时有其他人操作了这个变量,就发现数值变动,就会执行失败neil)
乐观锁(redis):
“我假设你不会碰我操作的变量,冲突很少,出问题我再处理。”(用版本号实现,操作时候发现版本号不对,说明被改了,再去重试)
悲观锁(mysql):
“我假设你一定会碰,所以我提前上锁,防止你碰。”
对比项 | 乐观锁(Optimistic Lock) | 悲观锁(Pessimistic Lock) |
---|---|---|
核心假设 | 不会发生冲突 | 一定会发生冲突 |
操作方式 | 不加锁,操作前后做数据校验 | 操作前加锁,阻塞其他操作 |
并发性能 | 性能好,冲突少时效率高 | 并发低,阻塞多时效率低 |
实现方式 | 版本号(version)、时间戳等 | 数据库 select … for update、行锁等 |
失败处理 | 操作失败后重试 | 先加锁,保证别人不能动 |
典型场景 | 数据读多写少,如缓存、排行榜更新 | 数据写多读多,如账户转账、订单处理 |
3. lua脚本
redis嵌入了lua虚拟机:可以同时执行多个命令
工作中,通常使用lua脚本
1. lua基础语法
- 变量
local x = 5
- 表(类似数组、字典)
local arr = {1, 2, 3}
local map = {key = "value", age = 18}
- 条件判断
if x > 10 thenreturn "大于10"
elsereturn "小于等于10"
end
- 循环
for i = 1, 5 doredis.call("INCR", "counter")
end
- 函数
local function add(a, b)return a + b
end
2. Lua 脚本中访问 Redis 的方式
在 Lua 脚本中操作 Redis,要通过内置的 redis.call 或 redis.pcall 函数。
用法格式:
redis.call("命令", key1, key2, ..., arg1, arg2, ...)
call:命令失败抛异常(一般用这个)
pcall:命令失败不抛异常,返回错误对象(控制容错时用)
示例:
redis.call("SET", "foo", "bar")
local val = redis.call("GET", "foo")
3. Lua 脚本中的 KEYS 和 ARGV
Redis Lua 脚本只能通过传参的方式操作数据:
EVAL "return redis.call('GET', KEYS[1])" 1 mykey
KEYS[1]、KEYS[2]…:用于传递 key 名
ARGV[1]、ARGV[2]…:用于传递 参数值
示例:
local stock = tonumber(redis.call("GET", KEYS[1])) --tonumber把字符串转换成数字类型
if stock > 0 thenredis.call("DECR", KEYS[1])return 1
elsereturn 0
end
4、返回值
可以返回任意类型的值(字符串、数字、数组等):
return 1 -- 数字
return "ok" -- 字符串
return {1, 2, 3} -- 数组
Redis 会自动把 Lua 值转成客户端可识别的响应
5、错误处理
return redis.pcall("GET", KEYS[1])
使用 pcall 可以捕获错误,避免脚本中断。
EVALSHA 来代替 EVAL
EVAL 是 Redis 执行一段 Lua 脚本的命令,支持多 key、多参数传入,原子执行
使用 EVALSHA 来代替 EVAL 时,相当于只传递一个 hash(SHA1)值 到 Redis,而不传整段 Lua 脚本:
- 网络传输更小
- 安全(不暴露源码)
- 快速查找(Redis 内部用哈希表管理脚本)
Redis 内部维护了一个结构类似于:
unordered_map<string /* sha1 */, LuaScript>
Redis 内部维护了一张“哈希表”,这样可以根据 hash 快速定位并执行脚本,EVALSHA 时 Redis 会:
- 查找哈希是否存在
- 找到原始脚本
- 执行脚本(在 Lua 虚拟机中)
4. ACID特性分析
A:原子性:
其他核心要么看到还没有执行,要不然就是执行完了,不可能看到中间的结果,这里还有要求就是要不然操作全部成功,要不全部失败(要增加回滚操作,要是失败,需要恢复到事务开始之前的状态)
C:一致性:
执行事务后,数据从一个一致状态过渡到另一个一致状态(满足约束、逻辑正确)
一个是逻辑上一致性,一个是数据库一致性(完整约束)
I:隔离性
多个事务互不干扰,执行结果与串行执行一致
各个事务之间的影响程度,redis单线程执行,天然具有隔离性
D:持久性
一旦事务执行成功,结果会被永久保存,即使系统崩溃也能恢复
redis只有在aof持久化策略时候,才具备持久性
lua脚本满足原子性和隔离性,不满足一致性和持久性
5. redis发布订阅
消息推送、实时通信、微服务解耦、事件驱动等场景。
1. 工作原理
发布者 使用 PUBLISH 向某个频道发送消息;
订阅者 使用 SUBSCRIBE、PSUBSCRIBE 订阅一个或多个频道;
一旦有新消息,所有订阅者会立即收到消息;
是即时通信模型,不做持久化,也不记录历史消息。
2. 命令
命令 | 说明 |
---|---|
SUBSCRIBE | 订阅一个或多个频道 |
UNSUBSCRIBE | 取消订阅 |
PUBLISH | 向频道发布消息 |
PSUBSCRIBE | 模糊订阅(支持通配符) |
PUNSUBSCRIBE | 取消模糊订阅 |
PUBSUB | 查看订阅状态、频道数量等元信息 |
6. redis异步连接
1、思想
hiredis 异步客户端(redisAsyncContext)接入自定义的 reactor 事件驱动系统的适配器,核心作用是桥接 Redis 的异步事件与你的事件循环机制,相当于实现了一套 hiredis 的 I/O 多路复用抽象接口
2. 代码
1. static int redisAttach(reactor_t *r, redisAsyncContext *ac)
核心的“接口对接点”,它做了:
- 创建一个 redis_event_t 对象(包含 event_t 和 Redis 上下文);
- 设置该对象的 addRead、delRead、addWrite、delWrite、cleanup 函数;
- 将这些函数注册进 redisAsyncContext 的 ev 成员;
从此,Redis 就能通过调用这些函数来操作你的事件模型。
static int redisAttach(reactor_t *r, redisAsyncContext *ac) {redisContext *c = &(ac->c);redis_event_t *re;/* Nothing should be attached when something is already attached */if (ac->ev.data != NULL)return REDIS_ERR;/* Create container for ctx and r/w events */re = (redis_event_t*)hi_malloc(sizeof(*re));if (re == NULL)return REDIS_ERR;re->ctx = ac;re->e.fd = c->fd;re->e.r = r;// dont use event buffer, using hiredis's bufferre->e.in = NULL;re->e.out = NULL;re->mask = 0;//这些是 hiredis 要求你实现的“注册函数”。每当 Redis 需要监听某个 FD 的读/写事件,就会调用这些函数ac->ev.addRead = redisAddRead;ac->ev.delRead = redisDelRead;ac->ev.addRead = redisAddWrite;ac->ev.delWrite = redisDelWrite;ac->ev.cleanup = redisCleanup;//清理函数必不可少ac->ev.data = re;return REDIS_OK;
}
2. ac->ev.addRead = redisAddRead;ac->ev.addRead = redisAddWrite;
这两个去调用redisReadHandler / redisWriteHandler,去使用hiredis提供的读写处理
3. redisEventUpdate
这是事件变化的调度器,核心逻辑是根据 mask 来:
- 新增事件:调用 add_event()
- 删除事件:调用 del_event()
- 修改事件:调用 enable_event() 切换读写状态
4. 使用逻辑
int main() {// 1. 创建 event loopreactor_t *r = reactor_create();// 2. 连接 Redis 异步客户端redisAsyncContext *ac = redisAsyncConnect("127.0.0.1", 6379);if (ac->err) {printf("Redis error: %s\n", ac->errstr);return -1;}// 3. 接入自己的 reactorif (redisAttach(r, ac) != REDIS_OK) {printf("Failed to attach redis context to reactor\n");return -1;}// 4. 设置连接成功/断开回调(可选)redisAsyncSetConnectCallback(ac, connectCallback);redisAsyncSetDisconnectCallback(ac, disconnectCallback);// 5. 发送异步命令redisAsyncCommand(ac, commandCallback, NULL, "SET foo bar");// 6. 启动事件循环reactor_run(r);return 0;
}
hiredis实现了协议解析、读写事件、缓冲区操作、协议加密
我们适配文件实现了:适配事件对象以及事件操作函数
7. redis自定义实现
有时候,用户除了需要与项目网络层兼容,同时需要考虑与项目中数据结构契合;这个时候可以考虑自己实现 redis 协议,从解析协议开始转换成项目中的数据结构
1. Redis 协议简介(RESP)
Redis 协议其实非常简单,叫做 RESP(REdis Serialization Protocol),它是一个 文本协议,只要解析这些前缀符号就能实现读写。
类型 | 开头 | 示例 |
---|---|---|
简单字符串 | + | +OK\r\n |
错误 | - | -Error message\r\n |
整数 | : | :1000\r\n |
批量字符串 | $ | $3\r\nfoo\r\n |
多条批量字符串(数组) | * | *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n |
比如我们想执行这个命令:
SET name ChatGPT
手写协议是这样的:
*3\r\n
$3\r\n
SET\r\n
$4\r\n
name\r\n
$7\r\n
ChatGPT\r\n
只要把这个拼好(用 sprintf 或 writev),直接发给 Redis 服务端,它就能处理。
2. 示例代码(发送命令 + 接收 RESP 响应)
int fd = socket(AF_INET, SOCK_STREAM, 0);
connect(fd, ...);// 拼接 Redis 协议
char buf[256];
int len = snprintf(buf, sizeof(buf),"*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$7\r\nChatGPT\r\n");
write(fd, buf, len);// 读取返回
char resp[128];
int n = read(fd, resp, sizeof(resp));
resp[n] = 0;printf("Redis response: %s\n", resp); // 应该打印:+OK