欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > note-Redis实战4 核心-构建支持程序

note-Redis实战4 核心-构建支持程序

2025/4/20 5:17:37 来源:https://blog.csdn.net/qq_42783188/article/details/142147292  浏览:    关键词:note-Redis实战4 核心-构建支持程序
助记提要
  1. Linux系统记录日志的方式 2个
  2. 使用Redis记录最新日志和常见日志
  3. 计数器的用处
  4. Redis实现计数器,更新、获取和清理
  5. Redis存储统计数据,更新和获取
  6. 统计数据的简化处理
  7. 实现IP所属地查找 3步(IP转换+两个映射)
  8. 用Redis存储配置信息
  9. 用装饰器实现自动Redis连接管理

5章 构建支持程序

  • Redis实现日志、计数器、IP所属地查找程序、服务配置程序

5.1 使用Redis来记录日志

日志的作用:诊断系统问题;发现潜在的问题;挖掘用户信息;

Linux记录日志的两种方法
  1. 文件记录
    随时间把日志添加到文件里。一段时间后创建新的日志文件。

每个服务创建不同的日志,这些服务轮换日志的机制不同。缺少能方便地聚合所有日志进行处理的方式。

  1. syslog服务
    syslog服务接受其它程序发来的日志,然后将这些消息存在硬盘的各个日志文件里。
    syslog还负责旧日志的轮换和删除工作。
    通过配置,syslog能把日志消息转发给其他服务来做进一步处理。

可以把系统的syslog守护进程替换为syslog-ng,因为它的配置语言更简单易用。

用Redis记录最新日志
# 日志的安全级别映射为简单的字符串
SEVERITY = {logging.DEBUG: 'debug',logging.INFO: 'info',logging.WARNING: 'warning',logging.ERROR: 'error',logging.CRITICAL: 'critical',
}
SEVERITY.update((name, name) for name in SEVERITY.values())def log_recent(conn, name, message, severity=logging.INFO, pipe=None):severity = str(SEVERITY.get(severity, severity)).lower()# 创建键,把不同级别的消息存在不同的列表里destination = 'recent:%s:%s' % (name, severity)# 把当前时间记录在消息前面message = time.asctime() + ' ' + messagepipe = pipe or conn.pipeline()# 消息加到日志列表最前面pipe.lpush(destination, message)# 日志列表只保存最新的100条消息pipe.ltrim(destination, 0, 99)pipe.execute()

之后可以使用LRANGE命令查看记录的日志消息。

用Redis记录常见日志

仅仅记录消息,无法得知哪些消息是重要的。
可以记录特定消息的频率,并根据频率来决定消息的排列顺序。

具体做法。把消息当做成员存到有序集合里面,消息出现的次数是成员的分值。程序每个小时轮换一次。在轮换日志时对上一个小时常见日志的有序集合改名,然后更新记录当前小时数的键,保留上一个小时的常见日志。

def log_common(conn, name, message, severity=logging.INFO, timeout=5):severity = str(SEVERITY.get(severity, severity)).lower()# 创建键,用来存储近期的常见日志消息destination = 'common:%s:%s' % (name, severity)# 每小时需要轮换一次日志,用键来记录常见日志列表记录的小时数start_key = destination + ':start'pipe = conn.pipeline()end = time.time() + timeoutwhile time.time() < end:try:# 监视当前小时数,确保轮换操作正常执行pipe.watch(start_key)# 当前时间now = datetime.utcnow().timetuple()# 当前小时数hour_start = datetime(*now[:4]).isoformat()existing = pipe.get(start_key)pipe.multi()# 如果常见日志列表存的是上一个小时的日志if existing and existing < hour_start:# 归档旧的常见日志数据,更新小时数记录pipe.rename(destination, destination + ':last')pipe.rename(start_key, destination + ':pstart')pipe.set(start_key, hour_start)elif not existing:pipe.set(start_key, hour_start)# 对记录日志出现次数的计数器执行自增pipe.zincrby(destination, message)# 记录最新日志并执行execute函数log_recent(pipe, name, message, severity, pipe)returnexcept redis.exceptions.WacthError:continue

5.2 实现计数器

计数的作用
  1. 通过访问计数,决定如何缓存页面
  2. 统计整体流量增量状况,判断是否需要升级服务器
  3. 监控程序访问量,了解运营行为对流量的影响
数据结构
说明数据结构名称内容备注
计数信息散列count:计数器精度:计数类型键是每个时间段的开始时间戳,值是数量点击量、销量、查库次数等
计数器信息有序集合known:成员是计数器的精度和名字;值是0为了清理计数器的旧数据而做的记录

有序集合的分值相等时,会按照成员名称排序。

更新和取数据

更新计数器

# 计数器时间精度
PRECISION = [1, 5, 60, 300, 3600, 18000, 86400]def update_counter(conn, name, count=1, now=None):now = now or time.time()pipe = conn.pipeline()# 每一种精度都对应一个计数器for prec in PRECISION:# 当前时间片的开始时间pnow = int(now / prec) * prechash = '%s:%s' % (prec, name)pipe.zadd('known:', hash, 0)# 更新给定名字和精度的计数器pipe.zincrby('count:' + hash, pnow, count)pipe.execute()

指定精度和名字取计数器里的数据

def get_counter(conn, name, precision):hash = "%s:%s" % (precision, name)data = conn.hgetall('count:' + hash)to_return = []for key, value in data.iteritems():# 转换为整型to_return.append((int(key), int(value)))# 排序,按时间把旧的数据样本排前面to_return.sort()return to_return
清理旧计数器

所有计数器都记录在有序集合里面,因此只要遍历有序集合,将旧计数器删掉就行。

需要注意的点:

  • 任何时候都可能有新的计数器添加进来
  • 同时存在多个清理操作的情况
  • 更新频率极低的计数器不需要频繁清理
  • 空的计数器不需要清理
# 需要是守护进程
def clean_counters(conn):pipe = conn.pipeline()# 清理操作执行的次数passes = 0while not QUIT:# 清理操作开始执行的时间,用于计算执行清理的时长strat = time.time()index = 0while index < conn.zcard('known:'):hash = conn.zrange('known:', index, index)index += 1if not hash:breakhash = hash[0]# 取计数器精度,prec = int(hash.partition(':')[0])# 按更新频率确定需不需要清理# 1分钟内会有更新的,每分钟清理一次;# 更新频率低于每分钟1次的,清理频率等于更新频率bprec = int(prec // 60) or 1if passes % bprec:continuehkey = 'count:' + hash# 根据精度和保留的样本数量,计算需要保留哪个时间点之后的数据cutoff = time.time() - SAMPLE_COUNT * prec# 所有样本的记录时间samples = map(int, conn.hkeys(hkey))samples.sort()# 需要删除的样本数量remove = bisect.bisect_right(samples, cutoff)if remove:# 删除旧数据conn.hdel(hkey, *samples[:remove])# 计数器可能被清空if remove == len(samples):try:pipe.watch(hkey)# 计数器确定是空的,就删掉该计数器的记录if not pipe.hlen(hkey):pipe.multi()pipe.zrem('known:', hash)pipe.execute()# 删掉计数器的话,索引不需要变index -= 1else:pipe.unwatch()except redis.exceptions.WacthError:passpasses += 1duration = min(int(time.time() - start) + 1, 60)# 如果执行循环不够60s,差的时间就休眠;超过60s,则休眠1stime.sleep(max(60 - duration, 1))

5.3 存储统计数据

页面的点击量计数,可以用来判断是否需要对页面进行缓存。
综合统计数据可以用来判断哪些地方需要优化。

数据存储
说明数据结构名称内容备注
统计数据有序集合stats:页面:统计类型成员是聚合统计项,min、max、sumsq、sum、count,值为统计值使用有序集合是为了方便和其他有序集合做并集计算,筛选元素
更新统计数据
def update_stats(conn, context, type, value, timeout=5):# context表示被统计的页面;type是统计类型,点击数、访问数等destination = 'stats:%s:%s' % (context, type)start_key = destination + ':start'pipe = conn.pipeline()end = time.time() + timeoutwhile time.time() < end:try:# 处理上一个小时的数据pipe.watch(start_key)now = datetime.utcnow().timetuple()hour_start = datetime(*now[:4]).isoformat()existing = pipe.get(start_key)pipe.multi()if existing and existing < hour_start:pipe.rename(destination, destination + ':last')pipe.rename(start_key, destination + ':pstart')pipe.set(start_key, hour_start)# 使用临时键记录值tkey1 = str(uuid.uuid4())tkey2 = str(uuid.uuid4())pipe.zadd(tkey1, 'min', value)pipe.zadd(tkey2, 'max', value)pipe.zunionstore(destination, [destination, tkey1], aggregate='min')pipe.zunionstore(destination, [destination, tkey2], aggregate='max')pipe.delete(tkey1, tkey2)pipe.zincrby(destination, 'count')pipe.zincrby(destination, 'sum', value)pipe.zincrby(destination, 'sumsq', value*value)return pipe.execute()[-3:]except redis.exceptions.WacthError:continue
取数据
def get_stats(conn, context, type):key = 'stats:%s:%s' % (context, type)data = dict(conn.zrange(key, 0, -1, withscoress=True))data['average'] = data['sum'] / data['count']# 计算标准差numerator = data['sumsq'] - data['sum'] **2 / data['count']data['stddev'] = (numerator / (data['count'] - 1 or 1)) ** .5return data
简化统计数据的记录和发现

需要从一堆统计数据中发现生成速度较慢,或是比以往慢的网页。

记录页面的访问时长,必须在页面被访问时进行统计。python的上下文管理器可以包裹起需要计算并记录访问时长的代码。

# 将这个生成器当做上下文管理器
@contextlib.contextmanager
def access_time(conn, context):start = time.time()# 运行被包裹的代码块yielddelta = time.time() - start# 更新统计数据stats = update_stats(conn, context, 'AccessTime', delta)average = stats[1] / stats[0]pipe = conn.pipeline(True)# 页面的平均访问时长添加到最长访问时间的有序集合pipe.zadd('slowest:AccessTime', context, average)# 仅保留最慢的100条记录pipe.zremrangebyrank('slowest:AccessTime', 0, -101)pipe.execute()

在视图中使用上下文管理器:

# 视图的参数callback是生成页面内容的回调函数
def process_view(conn, callback):# 上下文管理器通过with包围代码块with access_time(conn, request.path):# yield执行之后,该语句会执行return callback()

5.4 查找IP所属城市以及国家

Redis实现IP所属地查找的理由
  1. 运行速度快
  2. 对用户进行定位需要的信息量较大,载入本地查找表会降低应用程序的启动速度。
数据存储

实现IP所属地查询需要用到两个表,第一个表通过IP地址查找对应的城市的ID,第二个表根据城市ID查找对应城市的信息。

https://dev.maxmind.com/geoip 网站提供免费使用的IP所属城市数据库数据。它提供了两个重要的文件:GeoLiteCity-Blocks.csv,记录IP地址段和所属城市ID;GeoLiteCity-Location.csv,记录城市ID与城市名、地区名/州名/省名、国家名等信息。
原书引用的两个文件当前可能过期或更新。

说明数据结构名称内容
城市ID对应IP有序集合ip2cityid:成员是具体城市ID,分值是根据IP地址计算出的整数值
城市ID对应城市信息散列cityid2city:键为城市ID,值为json格式的城市、地区、国家列表
将位置信息导入Redis

创建IP和城市ID的映射,需要先将点分十进制的IP地址转为整数分值。

def ip_to_score(ip_address):# 将IP转换为整数值score = 0for v in ip_address.split('.'):score = score * 256 + int(v, 10)return score

建立城市ID对应IP分值的有序集合。

def import_ips_to_redis(conn, filename):# filename是GeoLiteCity-Blocks.csv文件的路径csv_file = csv.reader(open(filename, 'rb'))for count, row in enumerate(csv_file):start_ip = row[0] if row else ''if 'i' in start_ip.lower():continueif '.' in start_ip:start_ip = ip_to_score(start_ip)elif start_ip.isdigit():start_ip = int(start_ip, 10)else:# 略过文件首行和格式错误的条目continue# 城市ID后面加上已有城市数目,构建唯一的城市ID,允许多个IP地址映射到同一城市city_id = row[2] + '_' + str(count)conn.zadd('ip2cityid:', city_id, start_ip)

把城市ID对应城市信息的映射加到Redis里面

def import_cities_to_redis(conn, filename):for row in csv.reader(open(filename, 'rb')):if len(row) < 4 or not row[0].isdigit():continuerow = [i.decode('latin-1') for i in row]city_id = row[0]country = row[1]region = row[2]city = row[3]conn.hget('cityid2city:', city_id, json.dumps([city, region, country]))
查找IP所属城市
def find_city_by_ip(conn, ip_address):if isinstance(ip_address, str):ip_address = ip_to_score(ip_address)# 查找唯一城市IDcity_id = conn.zrevrangebyscore('ip2cityid:', ip_address, 0, start=0, num=1)if not city_id:return None# 唯一城市ID的后缀去掉city_id = city_id[0].partition('_')[0]return json.loads(conn.hget('cityid2city:', city_id))

zrevrangebyscore表示按分值逆序排序,取ip_address到0之间,从第start位开始的num个值。

5.5 服务的发现与配置

使用更多的Redis和其他服务后,存储和管理这些服务器的各种配置信息会很麻烦。
可以把配置信息放到Redis里面,使应用程序自己完成大部分配置工作。

Redis存储配置信息的优点

每次更新配置,都需要将新的配置文件推送到所有的服务器上去。收到更新的服务器可能需要重新载入配置,甚至重启应用程序服务。

与其为不断增多的服务写入和维护配置文件,不如直接把配置写入Redis里面,并编写应用程序获取这些信息。这样不需要推送配置信息,服务器也不需要通过重载配置文件的方式来更新配置信息了。

实例:在Redis存一个表示服务器正在维护的配置信息。
写一个函数插入应用程序对应位置上,检测到服务器正在维护是,应用程序向用户显示维护页面;没在维护,就正常展示。

# 设为全局变量,便于之后进行写入和使用
LAST_CHECKED = None
IS_UNDER_MAINTENANCE = Falsedef is_under_maintenance(conn):global LAST_CHECKED, IS_UNDER_MAINTENANCE# 距离上次检查超过1s才做更新,避免用户不停刷新导致负载加大if LAST_CHECKED < time.time() - 1:# 更新检查时间LAST_CHECKED = time.time()# 检查是否在维护IS_UNDER_MAINTENANCE = bool(conn.get('is-under-maintenance'))return IS_UNDER_MAINTENANCE
为每个独立的部分配置Redis服务

应用程序发展到后期,只使用一台Redis将不能满足需求。
为了便于从单台服务器过渡到多台服务器,最好为应用程序的每个独立部分都分别运行一个Redis服务器,如一个专门记录日志、一个专门统计、一个专门存cookie等。或者这些功能分别使用不同的库。

这些对应不同功能的Redis服务器的配置信息,可以由一个专门的Redis进行管理。
通过这个服务器存储的配置信息,可以连接不同组件和为其提供数据的Redis服务器。

在Redis服务器设置配置值:

def set_config(conn, type, component, config):# config:服务的类型:使用该服务器的应用程序conn.set('config:%s:%s' % (type, component), json.dumps(config)

获取某个组件的配置信息

CONFIGS = {}
CHECKED = {}def get_config(conn, type, component, wait=1):key = 'config:%s:%s' % (type, component)# 检查配置信息是否需要更新if CHECKED.get(key) < time.time() - wait:# 记录更新时间CHECKED[key] = time.time()config = json.loads(conn.get(key) or '{}')config = dict((str(k), config[k]) for k in config)# 当前正在使用的配置old_config = CONFIGS.get(key)# 配置出现变化就更新if config != old_config:CONFIGS[key] = configreturn CONFIGS.get(key)
自动Redis连接管理

应用程序连接不同的Redis服务需要获取配置、创建连接,使用完后再关闭连接。

创建一个装饰器,负责连接除了配置服务器之外的所有Redis服务器。

REDIS_CONNECTIONS = {}# component是应用程序组件的名字
def redis_connection(component, wait=1):key = 'config:redis:' + componentdef wrapper(function):# 被包裹函数的元数据复制给配置处理器# call函数具体管理连接信息@functools.wraps(function):def call(*args, **kwargs):# 获取旧配置old_config = CONFIGS.get(key, object())# 获取新配置,config_connection用于连接到配置服务器_config = get_config(config_connection, 'redis', component, wait)config = {}for k, v in _config.iteritems():config[k.encode('utf-8')] = v# 使用可用的配置创建Redis连接if config != old_config:REDIS_CONNECTIONS[key] = redis.Redis(**config)# 把对应的连接传给被包裹函数,并执行return function(REDIS_CONNECTIONS.get(key), *args, **kwargs)return callreturn wrapper

使用装饰器

# 原函数不需要发生变化
@redis_connection('logs')
def log_recent(conn, app, message):...# 使用时不需要手动传递Redis日志服务器的连接了
log_recent('main', 'Uesr 235 logged in')

Redis管理配置信息、Redis自动连接也能用到其他服务器和服务上。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词