欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 焦点 > 缓存设计模式

缓存设计模式

2025/3/31 9:53:06 来源:https://blog.csdn.net/m0_50920058/article/details/146455891  浏览:    关键词:缓存设计模式

缓存设计模式(Cache Design Pattern)是一种用于存储和管理频繁访问数据的技术,旨在提高系统性能、降低数据库或后端服务的负载,并减少数据访问延迟。以下是几种常见的缓存设计模式,并用 Python + Redis 进行示例代码实现:


📌 1. Cache Aside(旁路缓存)

🌟 适用场景:

  • 适用于读多写少的场景,如商品详情、用户资料等。
  • 应用先从缓存中读取数据,缓存未命中时再查询数据库,并将数据写入缓存。

📝 逻辑流程:

  1. 先查询缓存,如果命中,则直接返回数据。
  2. 如果缓存未命中,则查询数据库,并将查询结果写入缓存。
  3. 返回数据库查询的数据。

🚀 代码示例

import redis
import timeredis_client = redis.StrictRedis(host="localhost", port=6379, decode_responses=True)def get_data_cache_aside(key):cache_key = f"cache:{key}"# Step 1: 先查询缓存data = redis_client.get(cache_key)if data:return data  # 直接返回缓存数据# Step 2: 缓存未命中,查询数据库data = query_database(key)# Step 3: 回填缓存if data:redis_client.setex(cache_key, 60, data)  # 缓存 60sreturn datadef query_database(key):"""模拟数据库查询"""time.sleep(1)  # 模拟查询时间return f"data_for_{key}"# 测试
print(get_data_cache_aside("hot_item"))

存在的问题:旁路缓存(Cache Aside)如果数据库中的数据为空(例如,数据确实不存在或者被删除了),那么每次查询都会缓存未命中,导致系统一直查数据库,这就是缓存穿透(Cache Penetration)问题。

🎯 解决方案

方案 1:缓存空值

  • 思路:如果数据库查询结果为空,则缓存一个“空值”(如 null 或特殊占位符),并设置较短的 TTL(如 5~10 秒)。
  • 好处:防止短时间内同一个 Key 反复查询数据库。
🔹 改进的旁路缓存代码
import redis
import timeredis_client = redis.StrictRedis(host="localhost", port=6379, decode_responses=True)def get_data_cache_aside(key):cache_key = f"cache:{key}"# Step 1: 查询缓存data = redis_client.get(cache_key)if data is not None:  # 即使数据为空(缓存的空值),也不会去数据库查询return None if data == "NULL" else data# Step 2: 查询数据库data = query_database(key)# Step 3: 数据为空,缓存“空值”并设置较短过期时间if data is None:redis_client.setex(cache_key, 10, "NULL")  # 10秒缓存空值,避免短时间重复查库return None# Step 4: 数据有效,回填缓存redis_client.setex(cache_key, 60, data)  # 60秒缓存数据return datadef query_database(key):"""模拟数据库查询"""time.sleep(1)  # 模拟数据库查询时间return None  # 假设数据不存在# 测试
print(get_data_cache_aside("hot_item"))  # 第一次查数据库
print(get_data_cache_aside("hot_item"))  # 直接命中缓存(返回 None,不查数据库)

效果

  • 第一次查询,数据库返回 None,缓存 "NULL" 并设置 10 秒过期。
  • 10 秒内相同的 Key 再次查询时,直接返回 None不会重复访问数据库
  • 10 秒后,缓存过期,才会重新查询数据库。

方案 2:布隆过滤器(Bloom Filter)

  • 思路:使用布隆过滤器(Bloom Filter)提前判断数据是否存在,不存在的 Key 直接返回 None,不查询数据库。
  • 适用场景:大规模 Key 查询,如用户 ID、商品 ID
🔹 代码示例
from bloom_filter import BloomFilter  # 需要安装 pip install bloom-filter2# 初始化布隆过滤器(假设有 10 万个 Key,误判率 0.01)
bloom = BloomFilter(max_elements=100000, error_rate=0.01)# 预先加入一些存在的 Key
bloom.add("valid_key_1")
bloom.add("valid_key_2")def get_data_bloom_filter(key):cache_key = f"cache:{key}"# Step 1: 先检查布隆过滤器,数据可能不存在if key not in bloom:return None  # 直接返回,不查数据库# Step 2: 查询缓存data = redis_client.get(cache_key)if data is not None:return None if data == "NULL" else data# Step 3: 查询数据库data = query_database(key)# Step 4: 缓存数据或空值if data is None:redis_client.setex(cache_key, 10, "NULL")return Noneredis_client.setex(cache_key, 60, data)return data# 测试
print(get_data_bloom_filter("invalid_key"))  # 布隆过滤器拦截,直接返回 None
print(get_data_bloom_filter("valid_key_1"))  # 查询缓存或数据库

效果

  • 布隆过滤器拦截不存在的数据,避免数据库查询
  • 误判率极低(0.01%),但不会产生误漏。

方案 3:限流 & 黑名单

  • 思路:针对异常高频查询某个 Key 的请求,可以加入限流机制黑名单,防止恶意攻击导致数据库压力过大。
🔹 代码示例
from collections import defaultdictrequest_count = defaultdict(int)def get_data_with_rate_limit(key):cache_key = f"cache:{key}"# Step 1: 统计 Key 查询次数request_count[key] += 1if request_count[key] > 100:  # 限制单个 Key 短时间内查询次数return "Too Many Requests"  # 直接拒绝请求# Step 2: 查询缓存data = redis_client.get(cache_key)if data is not None:return None if data == "NULL" else data# Step 3: 查询数据库data = query_database(key)# Step 4: 缓存数据或空值if data is None:redis_client.setex(cache_key, 10, "NULL")return Noneredis_client.setex(cache_key, 60, data)return data

效果

  • 限制某个 Key 频繁查询,避免恶意攻击导致数据库崩溃。
  • 结合 IP 级别限流,进一步防护。

📌 总结

方案适用场景优势缺点
缓存空值任何缓存穿透情况简单高效,防止短时间重复查询额外占用 Redis 空间
布隆过滤器大量 Key 查询,如用户 ID、商品 ID高效过滤无效 Key,减少数据库压力需要额外存储布隆过滤器
限流 & 黑名单恶意攻击、异常高频请求防止恶意攻击,减少数据库压力需要额外维护限流规则

🚀 推荐方案组合

  • 一般情况:✅ 缓存空值
  • 海量 Key 级别:✅ 布隆过滤器 + 缓存空值
  • 防恶意攻击:✅ 限流 & 黑名单

这样可以有效防止缓存穿透,保护数据库不被高并发打崩!🔥

📌 2. Read-Through(读穿透)

🌟 适用场景:

  • 适用于需要自动填充缓存的情况,例如 CDN、NoSQL 代理等。
  • 读请求永远只访问缓存,如果缓存未命中,由缓存层自动加载数据。

📝 逻辑流程:

  1. 读取数据时,应用程序只访问缓存。
  2. 如果缓存未命中,则缓存层自动从数据库加载数据并缓存
  3. 数据返回给用户。

🚀 代码示例

class Cache:def __init__(self):self.redis_client = redis.StrictRedis(host="localhost", port=6379, decode_responses=True)def get(self, key):"""从缓存读取数据"""data = self.redis_client.get(key)if not data:data = self.load_from_db(key)  # 由缓存层自动加载self.redis_client.setex(key, 60, data)  # 缓存 60sreturn datadef load_from_db(self, key):"""模拟数据库查询"""time.sleep(1)  # 模拟查询时间return f"data_for_{key}"# 测试
cache = Cache()
print(cache.get("hot_item"))

存在的问题:读穿透(Read-Through)同样可能会遇到类似的缓存穿透问题。如果缓存未命中且查询结果为空(例如,数据库中没有该数据),那么每次查询都会去数据库查询,造成重复查询数据库的情况。

🎯 解决方案

与旁路缓存的解决方案一致


📌 3. Write-Through(写穿透)

🌟 适用场景:

  • 适用于写多读少的情况,如用户状态、计数器等。
  • 所有写操作都会先更新缓存,然后再更新数据库,确保缓存和数据库的一致性。

📝 逻辑流程:

  1. 当数据写入时,先更新缓存,再更新数据库。
  2. 读请求仍然直接从缓存读取数据。

🚀 代码示例

class Cache:def __init__(self):self.redis_client = redis.StrictRedis(host="localhost", port=6379, decode_responses=True)def set(self, key, value):"""写入缓存 + 数据库"""self.redis_client.setex(key, 60, value)  # 先写入缓存self.write_to_db(key, value)  # 再写入数据库def get(self, key):"""读取缓存"""return self.redis_client.get(key)def write_to_db(self, key, value):"""模拟数据库写入"""print(f"数据 {key} 已写入数据库: {value}")# 测试
cache = Cache()
cache.set("user:123", "UserData")
print(cache.get("user:123"))

存在问题
一致性问题:缓存和数据库可能在短时间内处于不同步的状态。例如,如果缓存成功写入,但数据库写入失败,数据就不一致了。为了避免这种情况,你可能需要引入一些机制来确保最终一致性,如 异步写入 或 消息队列 来处理数据库更新

错误处理和重试:如果数据库写入失败,如何处理这个问题是一个关键点。可以考虑将数据库写入操作异步化,或者通过定期的任务检查数据一致性并做修复。


📌 4. Write-Behind(异步写回)

🌟 适用场景:

  • 适用于高并发写入,如日志存储、计数器等。
  • 先写入缓存,异步批量更新数据库,提高写入性能。

📝 逻辑流程:

  1. 写操作只写入缓存,不直接更新数据库。
  2. 后台异步线程定期批量同步缓存数据到数据库

🚀 代码示例

import threadingclass Cache:def __init__(self):self.redis_client = redis.StrictRedis(host="localhost", port=6379, decode_responses=True)self.batch_data = {}  # 临时存储待写入的数据def set(self, key, value):"""写入缓存"""self.redis_client.setex(key, 60, value)  # 先写入缓存self.batch_data[key] = value  # 添加到待写入数据库的批量任务def get(self, key):"""读取缓存"""return self.redis_client.get(key)def batch_write_to_db(self):"""批量写入数据库(定期执行)"""while True:if self.batch_data:for key, value in self.batch_data.items():print(f"批量写入数据库: {key} -> {value}")self.batch_data.clear()time.sleep(5)  # 每 5 秒写入一次# 启动异步写线程
cache = Cache()
threading.Thread(target=cache.batch_write_to_db, daemon=True).start()# 测试
cache.set("user:123", "UserData")
cache.set("user:124", "UserData2")
print(cache.get("user:123"))
time.sleep(6)  # 等待异步写入数据库

📌 5. 分布式锁(Mutex + Double Check)

🌟 适用场景:

  • 解决缓存击穿问题,防止多个线程同时查询数据库。
  • 适用于高并发的热点数据,如商品详情、排行榜等。

📝 逻辑流程:

  1. 先查询缓存,如果命中,则直接返回数据。
  2. 如果缓存未命中,尝试获取 Redis 互斥锁
    • 获取锁成功:查询数据库,并回填缓存,最后释放锁。
    • 获取锁失败:等待一段时间后重试,避免并发查询数据库。

🚀 代码示例

import uuiddef get_data_with_mutex(key):cache_key = f"cache:{key}"lock_key = f"lock:{key}"lock_value = str(uuid.uuid4())# 先查询缓存data = redis_client.get(cache_key)if data:return data# 尝试获取锁if redis_client.set(lock_key, lock_value, nx=True, ex=5):  try:# 再次检查缓存data = redis_client.get(cache_key)if data:return data# 查询数据库data = query_database(key)# 回填缓存redis_client.setex(cache_key, 60, data)return datafinally:if redis_client.get(lock_key) == lock_value:redis_client.delete(lock_key)else:# 其他线程等待后重试time.sleep(0.2)return get_data_with_mutex(key)# 测试
print(get_data_with_mutex("hot_item"))

🎯 总结

设计模式适用场景主要特点
Cache Aside读多写少业务代码控制缓存逻辑
Read-Through读写频繁读请求只访问缓存,自动回填
Write-Through写多读少写请求先更新缓存,再写数据库
Write-Behind高并发写先写缓存,异步批量写数据库
分布式锁缓存击穿互斥锁防止多个线程并发查询数据库

不同的缓存模式可以根据实际的业务需求和系统架构进行组合使用。组合的依据主要取决于以下几个因素:

1. 一致性要求

不同缓存模式的选择和组合通常取决于对数据一致性的要求:

  • 强一致性:如果你需要确保缓存和数据库的严格一致性,可以使用 写缓存(Write-Through Cache),确保每次数据写入时,缓存和数据库同步更新。这可以保证数据库和缓存中的数据始终一致。
  • 最终一致性:如果你可以接受数据暂时的不一致(例如稍微过期或延迟同步),可以考虑使用 写回缓存(Write-Back Cache),将数据先写入缓存,异步地将数据写入数据库,从而减少对数据库的压力。

2. 数据读取频率与缓存容量

  • 高读取频率、低写入频率:如果应用程序主要是读取数据,且数据不经常变化,可以使用 读缓存(Read-Through Cache)旁路缓存(Cache-Aside)。通过将热点数据加载到缓存中,减少对数据库的查询压力。
  • 高写入频率、低读取频率:如果写操作频繁,但读取较少(如日志系统),可以考虑 写回缓存(Write-Back Cache),这样可以减少对数据库的写入频率,提高性能。

3. 缓存失效策略

  • 过期时间(TTL):某些数据可能会在一定时间后过期,可以结合 过期缓存(TTL) 策略使用。例如,你可以在使用 旁路缓存(Cache-Aside) 时设置一个缓存过期时间,确保缓存中不存储过时数据,同时保证不频繁查询数据库。
  • 清除策略:结合 LRU(Least Recently Used) 等缓存淘汰策略,当缓存容量达到上限时,移除不常用的数据。

4. 缓存失效和更新策略

  • 缓存雪崩(Cache Avalanche):当大量缓存同时失效时,可能导致大量请求直接打到数据库上,造成压力。通过 设置不同的过期时间分布式缓存策略,可以避免缓存雪崩现象。组合使用 缓存分片(Sharded Cache)缓存预热 等方式,有助于平衡缓存更新带来的负担。

  • 缓存穿透(Cache Penetration):如果某些数据根本不在缓存中(例如查询不存在的用户信息),则每次都会访问数据库。可以通过 布隆过滤器(Bloom Filter) 或其他方式来过滤这些请求,减少数据库压力。

5. 性能与可扩展性需求

  • 分布式缓存:如果系统需要高可扩展性,可以使用 分布式缓存(Sharded Cache),将数据分片存储到多个缓存节点中,保证高并发时的缓存命中率,同时避免单个缓存节点的负载过高。
  • 异步缓存更新:可以使用消息队列或后台任务来异步更新缓存和数据库,避免在请求处理过程中进行阻塞操作。

常见的缓存模式组合

组合 1:读缓存(Read-Through Cache) + 过期缓存(TTL)
  • 适用场景:读取频繁、数据变化不大。
  • 组合原因:缓存数据可以在一段时间后过期,从而确保缓存不存储过时的数据,同时不需要每次都查询数据库。
def read_through_with_ttl(key):value = cache.get(key)if value is None:value = db.get(key)cache.set(key, value, ex=3600)  # 1 小时后过期return value
组合 2:旁路缓存(Cache-Aside) + LRU 淘汰策略
  • 适用场景:数据访问较不频繁,但需要缓存热点数据。
  • 组合原因:通过 LRU 策略确保缓存中存储的始终是最近访问的数据,而不需要每次都清理整个缓存。
def cache_aside_lru(key):value = cache.get(key)if value is None:value = db.get(key)cache.set(key, value)return value
组合 3:写回缓存(Write-Back Cache) + 过期缓存(TTL)
  • 适用场景:写入频繁,且数据库更新不需要立刻完成。
  • 组合原因:缓存首先写入,而后台异步更新数据库,同时使用缓存过期时间,确保不存储过时的数据。
def write_back_with_ttl(key, value):cache.set(key, value)  # 先写缓存write_queue.append((key, value))  # 异步写数据库cache.expire(key, 3600)  # 设置过期时间
组合 4:分片缓存(Sharded Cache) + 写回缓存(Write-Back Cache)
  • 适用场景:数据量巨大,且需要分布式缓存支持。
  • 组合原因:将缓存数据分布到不同的节点,并且使用写回缓存减少数据库访问频率,提高系统性能。

总结

缓存模式组合的依据主要取决于你的应用场景,特别是数据一致性要求、性能需求、读取/写入频率、以及缓存过期策略等因素。在实际开发中,可以根据具体的需求灵活选择或组合这些模式,以达到最佳的系统性能和数据一致性。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com