文章目录
- 助记提要
- 11章 Lua脚本编程
- 11.1 不编写C的情况下添加新功能
- Lua脚本载入Redis
- Python对Lua脚本返回值的转换
- Lua脚本是原子操作
- 使用Lua脚本创建新的状态消息
- 11.2 用Lua重写锁和信号量
- 使用Lua实现锁的原因
- Lua重写锁
- Lua实现计数信号量
- 11.3 移除WATCH/MULTI/EXEC事务
- 改进自动补全程序
- 改进商品买卖程序
- 11.4 使用Lua对列表分片
- 分片列表的构成
- 元素推入
- 元素弹出
- 阻塞式弹出
助记提要
- Lua脚本载入的参数;
- Lua脚本返回值的转换;
- Lua脚本是原子操作;
- Lua脚本可以提升锁性能的原理;
- Lua脚本替代事务的原因;
- 分片列表的构成;
- 分片列表的推入、弹出操作;
11章 Lua脚本编程
Lua脚本扩展Redis + 提升Redis性能
11.1 不编写C的情况下添加新功能
当需要添加Redis没有的高级功能时,只能通过客户端编写代码,或修改Redis的C源代码实现。
Lua脚本载入Redis
脚本载入Redis需要用到SCRIPT LOAD
命令。该命令接收字符串格式的Lua脚本为,然后将脚本存储,返回被存储脚本的SHA1校验和。
之后只要调用EVALSHA
命令,输入脚本的SHA1校验和,就能调用存储的脚本。
def script_load(script):# 已载入脚本的SHA1校验和,便于后续调用sha = [None]def call(conn, keys=[], args=[], force_eval=False):if not force_eval:if not sha[0]:# 如果未载入,就载入脚本sha[0] = conn.execute_command("SCRIPT", "LOAD", script, parse="LOAD")try:# 使用缓存的SHA1校验和执行命令return conn.execute_command("EVALSHA", sha[0], len(keys), *(keys+args))except redis.exceptions.ResponseError as msg:# 抛出脚本缺失之外的异常if not msg.args[0].startswith("NOSCRIPT"):raise# 脚本错误或要求强制执行时,使用EVAL命令执行指定脚本。# EVAl也会把脚本缓存起来,并且产生的SHA1校验和跟EVALSHA相同return conn.execute_command("EVAL", script, len(keys), *(keys+args))# call函数在被调用时会自动载入和执行脚本return call
-
call的参数
keys表示脚本可能会读取或写入的键。软件层在有需要的时候,可能会检查这些键是否位于同一分片里。有自动分片功能的Redis集群在执行脚本前,也会对脚本将要访问的键进行检查,如果它们不是位于同一个服务器里面,Redis就会返回错误。脚本在尝试对不可用的键进行读取或写入时,Redis集群可以拒绝。
args是脚本内部使用的其他参数组成的列表。
force_eval当需要在流水线和事务中执行脚本时,会很有用。 -
脚本缺失
脚本缺失是指函数存了SHA1校验和,但是服务器却没有存储对应的脚本。
在服务器重启或使用SCRIPT FLUSH
命令清空脚本缓存,或提供不同的Redis服务器连接时都会出现脚本缺失。
Python的redis-py项目已经提供了脚本载入程序,也是类似的实现。但是自制的脚本载入程序处理分片网络连接的时候更灵活。
Python对Lua脚本返回值的转换
Lua脚本有些数据类型在传给Python后会做相应的修改:
Lua值 | 转换为Python值 |
---|---|
true | 1 |
false | None |
nil | 不转换,使脚本停止返回Lua表格中的剩余值 |
1.5或其他浮点数 | 舍弃小数部分,转换为整数 |
1e30或其他巨大的浮点数 | 转换为当前Python的最小整数 |
strings | 不变 |
1或介于±2^53-1的整数 | 不变 |
这种转换可能会导致脚本返回模糊不清的结果,因此最好显式的返回字符串,然后手动分析。 |
Lua脚本是原子操作
Redis一次只会执行一个命令,每个单独的命令都是原子的。
MULTI/EXEC命令组成的事务是原子的,EVAL和EVALSHA两个命令也是原子的,执行时不会受到其他命令的干扰。
- 停止正在运行的Lua脚本
EVAL和EVALSHA执行的Lua脚本可能永远不会返回值,导致其他客户端无法正常执行命令。
对于不执行写命令的脚本,用户可以在脚本运行时间超过lua-time-limit
配置项指定的时间后,执行SCRIPT KILL
命令杀死正在运行的脚本。
但是已经执行了写入的Lua脚本,杀死脚本可能会造成数据不一致的状态。用户只能使用SHUTDOWN NOSAVE
命令停止Redis服务器,Redis丢掉最近一次创建快照后的变化。
因此,Lua脚本必须测试后才可以进入生产环境。
使用Lua脚本创建新的状态消息
之前实现的状态发布操作:
def create_status(conn, uid, message, **data):pipeline = conn.pipeline(True)pipeline.hget('user:%s' % uid, 'login')# 新建消息idpipeline.incr('status:id:')login, id = pipeline.execute()# 验证账号存在if not login:return Nonedata.update({'message': message, 'posted': time.time(),'id': id, 'uid': uid, 'login': login,})pipeline.hmset('status:%s', % id, data)pipeline.hincrby('user:%s' % uid, 'posts')pipeline.execute()return id
使用Lua脚本改写之前的创建消息的函数:
def create_status(conn, uid, message, **data):args = ['message': message,'posted': time.time(),'uid': uid,]for key, value in data.iteritems():args.append(key)args.append(value)return create_status_lua(conn, ['user:%s' % uid, 'status:id:'], args)create_status_lua = script_load("""-- 根据用户ID获取用户名。Lua的表格(列表)索引是从1开始的。local login = redis.call('hget', keys[1], 'login')-- 未登录不执行后续操作if not login thenreturn falseend-- 获取新的状态消息IDlocal id = redis.call('incr', KEYS[2])local key = string.format('status:%s', id)-- 设置状态消息的数据redis.call('hmset', key, 'login', login, 'id', id, unpack(ARGV))redis.call('hincrby', KEYS[1], 'posts', 1)return id
""")
新的消息发布操作第一次执行前需要载入Lua脚本,之后直接调用载入的脚本即可。
新的函数和之前操作是完全一样的,但是每次发布状态的通信次数从两次变为1次。对于大多程序来说,多次通信会花费不必要的时间,甚至造成WATCH/MULTI/EXEC事务冲突。
上面的Lua脚本中,对一个没被传入keys参数的散列进行了写入,这种行为使脚本无法兼容Redis集群。
11.2 用Lua重写锁和信号量
高流量场景下,使用锁和信号量可以减少WATCH/MULTI/EXEC事务带来的竞争问题。
但是获取和释放锁至少会做两次的通信,并且锁本身也可能出现冲突。
使用Lua实现锁的原因
- 脚本如果对未包含在KEYS参数中的键进行了读取或写入,可能会在程序迁移到Redis集群时不兼容。
- 处理Redis数据时,程序可能会需要一些无法在最开始的调用中取到的数据。不加锁的话,多个同时取数到Redis的操作会有更多额外消耗,还可能导致新数据被旧数据覆盖。
Lua重写锁
之前的获取锁的操作。粗腰处理各种失败和重试的情况。
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):identifier = str(uuid.uuid4())lockname = 'lock:' + locknamelock_timeout = int(math.cell(lock_timeout))end = time.time() + acquire_timeoutwhile time.time() < end:if conn.setnx(lockname, identifier):# 获取锁并设置过期时间conn.expire(lockname, lock_timeout)return identifierelif not conn.ttl(lockname):# 检查过期时间,有需要就更新conn.expire(lockname, lock_timeout)time.sleep(.001)return False
改写获取锁的操作
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):identifier = str(uuid.uuid4())lockname = 'lock:' + locknamelock_timeout = int(math.cell(lock_timeout))acquired = Falseend = time.time() + acquire_timeoutwhile time.time() < end and not acquired:# 实际的锁获取操作acquired = acquire_lock_with_timeout_lua(conn, [lockname], [lock_timeout, identifier]) == 'OK'time.sleep(.001 * (not acquired))return acquired and identifieracquire_lock_with_timeout_lua = script_load("""
-- 检测锁是否已经存在
if redis.call('exists', KEYS[1]) == 0 then-- 使用过期时间和标识符设置键return redis.call('setex', KEYS[1], unpack(ARGV))
end
""")
之前的获取锁使用SETNX命令,这里用SETEX,确保客户端获取的锁总有过期时间。
def release_lock(conn, lockname, identifier):lockname = 'lock:' + locknamereturn release_lock_lua(conn, [lockname], [identifier])release_lock_lua = script_load("""
-- 检查锁是否匹配
if redis.call('get', KEYS[1]) == ARGV[1] then-- 删除锁,并确保脚本总是返回真值return redis.call('del', KEYS[1]) or True
end
""")
Lua版本的加锁和释放锁的操作不需要执行WATCH/MULTI/EXEC步骤,并且减少了通信往返次数,因此实际性能比原版的要好。并且代码更加简洁。
Lua实现计数信号量
之前实现的获取信号量的函数
def acquire_semaphore(conn, semname, limit, timeout=10):identifier = str(uuid,uuid4())now - time.time()pipeline = conn.pipeline(True)# 清理过期信号量pipeline.zremrangebyscore(semname, '-inf', now-timeout)# 尝试获取信号量pipeline.zadd(semname, identifier, now)# 检查是否成功获取pipeline.zrank(semname, identifier)if pipeline.execute()[-1] < limit:return identifier# 获取失败,删除添加的标识符conn.zrem(semname, identifier)return None
Lua重写获取信号量的函数
def acquire_semaphore(conn, semname, limit, timeout=10):# 取当前时间,用来处理超时信号量now = time.time()return acquire_semaphore_lua(conn, [semname], [now-timeout, limit, now, str(uuid.uuid4())])acquire_semaphore_lua = script_load("""
-- 清除过期的信号量
redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1])-- 检查是否有剩余信号量可用
if redis.call('zcard', KEYS[1] < tonumber(ARGV[2])) then-- 时间戳记录在超时有序集合redis.call('zadd', KEYS[1], ARGV[3], ARGV[4])return ARGV[4]
end
""")
Lua实现的信号量获取操作,不需要计数器和信号量拥有者集合,因为第一个执行脚本的客户端就是获取信号量的客户端。也不需要使用锁、ZINTERSTORE、ZRANGEBYRANK,所以运行速度变快了很多。
Lua实现信号量刷新函数
def refresh_semaphore(conn, semname, identifier):# 信号量没被刷新,Lua脚本返回空值,Python会将它转为Nonereturn refresh_semaphore_lua(conn, [semaname], [identifier, time.time()]) != Nonerefresh_semaphore_lua = script_load("""
-- 信号量存在的话,更新时间戳
if redis.call('zscore', KEYS[1], ARGV[1]) thenreturn redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) or true
end
""")
11.3 移除WATCH/MULTI/EXEC事务
由WATCH、MULTI和EXEC组成的事务,在只有少数客户端尝试修改被WATCH监视的数据时,事务可以没有冲突或重试的情况下完成。
但是如果操作需要多次通信往返、或冲突几率高,或网络延迟影响的话,客户端可能要重试很多次。
改进自动补全程序
之前的自动补全程序需要使用大量的代码来处理重试的情况
def autocomplete_on_prefix(conn, guild, prefix):start, end = find_prefix_range(prefix)identifier = str(uuid.uuid4())# 计算查找范围的起点和终点start += identifierend += identifierzset_name = 'members:' + guild# 范围的起始元素和结束元素加到有序集合conn.zadd(zset_name, start, 0, end, 0)pipeline = conn.pipeline(True)while 1:try:pipeline.watch(zset_name)# 获取被插入元素在有序集合的排名sindex = pipeline.zrank(zset_name, start)eindex = pipeline.zrank(zset_name, end)erange = min(sindex + 9, eindex - 2)pipeline.multi()# 获取范围内的值,删除之前插入的起始元素和结束元素pipeline.zrem(zset_name, start, end)pipeline.zrange(zset_name, sindex, erange)items = pipeline.execute()[-1]breakexcept redis.exceptions.WatchError:# 自动补全集合被别的客户端修改了,重试continue# 如果其他自动补全操作正在执行,就从获取到的元素里面移除起始元素和结束元素return [item for item in items if '{' not in item]
使用Lua脚本实现自动补全。减少通信往返次数,并且无须担心竞争引发的WATCH错误,更适应高并发的场景。
def autocomplete_on_prefix(conn, guild, prefix):start, end = find_prefix_range(prefix)identifier = str(uuid.uuid4())items = autocomplete_on_prefix_lua(conn, ['members:' + guild], [start + identifier, end + identifier])return [item for item in items if '{' not in item]autocomplete_on_prefix_lua = script_load("""
-- 标记起点终点的标识符加到有序集合
redis.call('zadd', KEYS[1], 0, ARGV[1], 0, ARGV[2])
-- 在有序集合找范围元素的位置
local sindex = redis.call('zrank', KEYS[1], ARGV[1])
local eindex = redis.call('zrank', KEYS[1], ARGV[2])
-- 计算想获取的元素所处的范围
eindex = math.min(sindex + 9, eindex - 2)
-- 移除范围元素
redis.call('zrem', KEYS[1], unpack(ARGV))
return redis.call('zrange', KEYS[1], sindex, eindex)
""")
改进商品买卖程序
之前的商品买卖程序,使用了锁代替事务,并且通过调整锁的粒度减少冲突。
def purchase_item_with_lock(conn, buyerid, itemid, sellerid):buyer = "users:%s" % buyeridseller = "users:%s" % selleriditem = "%s.%s" % (itemid, sellerid)inventory = "inventory:%s" % buyeridlocked = acquire_lock(conn, 'market:')if not locked:return Falsepipe = conn.pipeline(True)try:# 检查商品是否还在,买家是否钱够pipe.zscore("market:", item)pipe.hget(buyer, 'funds')price, funds = pipe.execute()if price is None or price > funds:return None# 买家付钱拿货,卖家收钱pipe.hincrby(seller, 'funds', int(price))pipe.hincrby(buyer, 'funds', int(-price))pipe.sadd(inventory, itemid)pipe.zrem("market:", item)pipe.execute()return Truefinally:# 释放锁release_lock(conn, 'market:', locked)
使用Lua脚本实现商品买卖,不需要锁,降低了通信次数。
def purchase_item(conn, buyerid, itemid, sellerid):buyer = "users:%s" % buyeridseller = "users:%s" % selleriditem = "%s.%s" % (itemid, sellerid)inventory = "inventory:%s" % buyeridreturn purchase_item_lua(conn, ['market:', buyer, seller, inventory], [item, itemid])purchase_item_lua = script_load("""
-- 商品价格和买家钱数
local price = tonumber(redis.call('zscore', KEYS[1], ARGV[1]))
local funds = tonumber(redis.call('hget', KEYS[2], 'funds'))-- 商品在售,且买家钱够
if price and funds and funds >= price thenredis.call('hincrby', KEYS[3], 'funds', price)redis.call('hincrby', KEYS[2], 'funds', -price)redis.call('sadd', KEYS[4], ARGV[2])redis.call('zrem', KEYS[1], ARGV[1])return true
end
""")
Lua脚本可以带来性能提升,但是只能访问脚本之内或Redis数据库内的数据。而锁或WATCH/MULTI/EXEC事务没有这个限制。
11.4 使用Lua对列表分片
散列、集合和字符串做分片可以降低内存占用(分片后可以使用短结构)。
有序集合做分片能扩展搜索索引的大小,提升搜索操作的性能。
列表可以通过Lua脚本实现分片。分片列表支持两端的推入操作,和阻塞、非阻塞的弹出操作。
分片列表的所有操作命令都能使用WATCH/MULTI/EXEC事务实现,但是由于这些列表操作不仅需要同时对多个键进行处理,还需要对一些事务相关的结构进行处理。所以不适合在事务冲突较多出现的情况。这种情况加锁只能一定程度上减轻问题,只有Lua才能显著提升性能。
分片列表的构成
说明 | 数据结构 | 键名 | 内容 |
---|---|---|---|
记录第一个分片ID | 字符串 | 列表名:first | 第一个分片的ID |
记录最后一个分片ID | 字符串 | 列表名:last | 最后一个分片的ID |
列表的分片 | 列表 | 列表名:分片ID | 列表内的数据 |
为了能对分片列表的两端执行推入操作和弹出操作,除了需要存储各个分片外,还需要记录第一个分片和最后一个分片的ID。分片列表为空时,这两个字符串存储的ID是相同的。
分片ID按顺序进行分配。总是左边小,右边大。如果是右进左出,新的分片ID会越来越大,如果是左进右出,新的分片ID会越来越小。
位于分片列表两端的列表可能是未被填满的,但是位于两端之间的列表总是被填满的。这样可以快速地计算分片列表的总长度。
元素推入
def sharded_push_helper(conn, key, *items, **kwargs):# 元素转列表items = list(items)total = 0# 存在元素需要推入,则使用Lua脚本把元素推入分片列表while items:# 每次推入64个元素,可以根据压缩列表的最大长度调整这个值pushed = sharded_push_lua(conn, [key+':', key+':first', key+':last'], [kwargs['cmd']] + items[:64])# 统计被推入的元素数量total += pushed# 移除已推入的元素del items[:pushed]return totaldef sharded_lpush(conn, key, *items):return sharded_push_helper(conn, key, *items, cmd='lpush')def sharded_rpush(conn, key, *items):return sharded_push_helper(conn, key, *items, cmd='rpush')sharded_push_lua = script_load("""
-- 确定每个列表分片的最大长度
local max = tonumber(redis.call('config', 'get', 'list-max-ziplist-entries')[2])
-- 没有元素需要推入或压缩列表的最大程度太小
if #ARGV < 2 or max < 2 then return 0 endlocal skey = ARGV[1] == 'lpush' and KEYS[2] or KEYS[3]
local shard = redis.call('get', skey) or '0'while 1 do-- 取分片的当前长度local current = tonumber(redis.call('llen', KEYS[1]..shard))-- 不超过上限的情况下,这个分片允许推入的元素数量-- 减1,是保留一个节点的空间,方便后续的阻塞弹出操作local topush = math.min(#ARGV - 1, max - current - 1)if topush > 0 then-- 满足限制条件时,向列表推入尽可能多的元素redis.call(ARGV[1], KEYS[1]..shard, UNPACK(ARGV, 2, topush+1))return topushend-- 当前分片已满,生成新分片,继续推入shard = redis.call(ARGV[1] == 'lpush' and 'decr' or 'incr', skey)
end
""")
元素推入的时候,并不清楚是否有客户端在进行阻塞弹出操作,因此推入大量数据时,需要将数据分拆,然后进行多次推入。可以根据自身的压缩列表最大长度来调整每次推入元素的数量。
由于Lua脚本不能提前知道元素会被推入到哪个分片,所以无法在KEYS中记录修改的键。因此上述实现只能在单个Redis服务器上使用。
Lua脚本中的While循环,最多会进行两次,第一次发现分片被填满,第二次把元素推到新的分片里面。
元素弹出
Lua脚本实现元素弹出,需要处理弹出端分片为空的情况,先判断是当前端的分片为空还是整个列表都为空。如果是当前端为空,需要调整弹出分片的位置。
def sharded_lpop(conn, key):return sharded_list_pop_lua(conn, [key+':', key+':first', key+':last'], ['lpop'])def sharded_rpop(conn, key):return sharded_list_pop_lua(conn, [key+':', key+':first', key+':last'], ['rpop'])sharded_list_pop_lua = script_load("""
-- 需要执行弹出操作的分片
local skey = ARGV[1] == 'lpop' and KEYS[2] or KEYS[3]
-- 不需要执行弹出的分片
local okey = ARGV[1] == 'lpop' and KEYS[2] or KEYS[3]
-- 需要执行弹出操作的分片ID
local shard = redis.call('get', skey) or '0'-- 弹出一个元素
local ret = redis.call(ARGV[1], KEYS[1]..shard)
-- 空分片没有弹出元素,或者弹出后分片变空
if not ret or redis.call('llen', KEYS[1]...shard) == '0' then-- 不需要执行弹出的分片IDlocal oshard = redis.call('get', okey) or '0'-- 分片列表的两端ID相同,说明整个列表是空的if shard == oshard thenreturn retend-- 根据弹出元素来自左端还是右端,确定分片的ID该增加还是减少local cmd = ARGV[1] == 'lpop' and 'incr' or 'decr'-- 调整分片端点shard = redis.call(cmd, skey)-- 没有取出元素,则对新分片进行弹出if not ret thenret = redis.call(ARGV[1], KEYS[1]..shard)end
end
return ret
""")
阻塞式弹出
阻塞式弹出表示一直重试直到获取到元素。
-
程序流程
先通过非阻塞弹出操作获取元素,如果成功获取,就完成操作。如果获取失败,就循环执行一些步骤,直到成功取得元素或达到用户指定的时限为止。由于Lua脚本和WATCH/MULTI/EXEC事务可能产生不正确的数据,所以在不需要实际阻塞客户端等待请求的情况下,程序应该尽可能地使用分片列表的非阻塞操作。
-
端点发生变化的处理
由于通信往返存在延迟,程序获取分片列表的两个端点,到程序尝试执行弹出操作期间,列表的端点可能已经变化。
为了处理这个问题,在阻塞弹出之前,先在流水线里进行一次EVAL脚本调用。这个脚本会检查程序是否在从正确的分片中弹出元素。如果是错误的分片,脚本会向那个列表推入一个额外的伪元素,它会被之后的阻塞弹出操作弹出。 -
处理竞争条件
Lua脚本被执行后,阻塞弹出操作被执行之前,如果有另一个客户端也处于这个状态,并同一个分片做了推入或弹出操作,程序就可能获取不正确的数据(新推入),或者阻塞在错误的分片上(刚变空取不到)。为了防止程序被阻塞在错误的分片上,最好限定最大阻塞时间。
对于阻塞弹出取到的数据并非来自列表两端分片的问题,程序会基于这个假设进行操作:数据在两个非事务流水线调用之间到达,就当其是正确的。为了实现阻塞弹出操作,不能使用WATCH/MULTI/EXEC事务来消除竞争。因为阻塞的BLPOP或BRPOP在遇上空列表的时候,会因为事务而不允许其他客户端添加元素,导致服务器一直被阻塞。
# 预先定义的伪元素
DUMMY = str(uuid.uuid4())# 负责循环尝试获取元素的辅助函数
def sharded_bpop_helper(conn, key, timeout, pop, bpop, endp, push):# 流水线对象和超时信息pipe = conn.pipeline(False)timeout = max(timeout, 0) or 2**64end = time.time() + timeoutwhile time.time() < end:# 执行一次非阻塞式弹出,如果成功取得弹出值并且这个值不是伪元素,就返回result = pop(conn, key)if result not in (None, DUMMY):return result# 需要执行弹出操作的分片shard = conn.get(key + endp) or '0'# Lua脚本会在程序尝试从错误的分片里弹出元素时,把伪元素推入这个分片sharded_bpop_helper_lua(pipe, [key+':', key + endp], [shard, push, DUMMY], force_eval=True)# 使用用户传入的BLPOP或BRPOP,执行阻塞式弹出操作getattr(pipe, bpop)(key + ':' + shard, 1)# 返回元素则执行完毕,否则重试result = (pipe.execute()[-1] or [None])[-1]if result not in (None, DUMMY):return result# 用户实际使用的API
def sharded_blpop(conn, key, timeout=0):return sharded_bpop_helper(conn, key, timeout, sharded_lpop, 'blpop', ':first', 'lpush')def sharded_brpop(conn, key, timeout=0):return sharded_bpop_helper(conn, key, timeout, sharded_lpop, 'brpop', ':first', 'rpush')# 处理阻塞
sharded_bpop_helper_lua = script_load("""
-- 找到需要执行弹出操作的列表端,取得这一端的分片
local shard = redis.call('get', KEYS[2]) or '0'
-- 如果程序传入的分片不是当前的端点分片
if shard ~= ARGV[1] then-- 把伪元素推入这个分片redis.call(ARGV[2], KEYS[1]..ARGV[1], ARGV[3])
end
""")
由于不能在流水线里执行可能会失败的EVALSHA调用。所以这里需要使
force_eval=True
确保使用的是EVAL。