基于Python脚本实现Flink on YARN任务批量触发Savepoint的实践指南
一、背景与价值
在流计算生产环境中,Flink on YARN的部署方式凭借其资源管理优势被广泛采用。Savepoint作为Flink任务状态的一致性快照,承载着故障恢复、版本升级、作业暂停等重要场景的核心保障。当集群中运行数十个Flink作业时,手动逐个触发Savepoint耗时且易出错。本文提出一种基于Python脚本的批量化操作方案,可显著提升运维效率。
二、技术实现原理
2.1 核心流程设计
- 作业发现机制:通过YARN REST API或
yarn application
命令获取RUNNING状态作业列表 - 元数据解析:提取Application ID和Job ID(需注意Per-Job模式与Session模式的区别)
- 并发控制:采用队列机制管理并行触发请求,避免集群资源过载
- 状态反馈:实时捕获
flink cancel -s [targetDirectory]
命令输出,记录成功/失败状态
2.2 关键实现步骤
#******************************************************************#
##author: david.zhou
##create time: 2025-03-27 16:56:34
#******************************************************************#
#!/usr/bin/env python3
# -*- coding: utf-8 -*-#"""
#获取并为指定的Flink任务创建savepoint的脚本
#"""import os
import re
import json
import time
import subprocess
import requests
from typing import List, Dict, Tuple, Optional# 设置不同版本Flink的路径
FLINK_PATHS = {"1.14": "/opt/flink-1.14/bin/flink","1.15": "/opt/flink-1.15/bin/flink","1.16": "/opt/flink-1.16/bin/flink","1.17": "/opt/flink-1.17/bin/flink","1.18": "/opt/flink-1.18/bin/flink"
}
DEFAULT_FLINK_PATH = "flink" # 默认使用环境变量中的flink命令# 直接在脚本中定义过滤参数(可以根据需要修改)
# 留空列表表示处理所有任务,添加元素表示只处理匹配的任务
FILTERS = ["aaaa"]
# 如果要处理所有任务,请使用:FILTERS = []
# FILTERS = []def run_command(command: str) -> Tuple[str, int]:#"""执行shell命令并返回输出和状态码"""# 检查Python版本,决定使用哪个参数import sysif sys.version_info >= (3, 7):process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)else:process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)stdout, stderr = process.communicate()return stdout + stderr, process.returncodedef get_running_flink_jobs() -> List[Dict]:#"""获取所有运行中的Flink任务"""print("正在获取所有运行中的Flink任务...")output, _ = run_command("yarn application -list -appStates RUNNING")running_jobs = []for line in output.splitlines():if "Apache Flink" in line:# 提取应用IDapp_id_match = re.search(r"(application_\d+_\d+)", line)if app_id_match:app_id = app_id_match.group(1)# 提取任务名称job_name_match = re.search(r"\s+([^\s]+)\s+Apache\s+Flink", line)job_name = job_name_match.group(1) if job_name_match else app_idrunning_jobs.append({"app_id": app_id,"info": line,"name": job_name})print(f"- {line}")return running_jobsdef filter_jobs(jobs: List[Dict], filters: List[str]) -> List[Dict]:#"""根据过滤条件筛选任务"""if not filters:return jobsprint(f"\n应用过滤条件: {filters}")filtered_jobs = []for job in jobs:app_id = job["app_id"]app_info = job["info"]app_name = job["name"]matched = Falsefor filter_str in filters:if filter_str == app_id or filter_str in app_info:matched = Truebreakif matched:filtered_jobs.append(job)print(f"匹配任务: {app_id} ({app_name})")return filtered_jobsdef get_tracking_url(app_id: str) -> Optional[str]:#"""获取应用的Tracking URL"""print(f" 获取Tracking URL...")output, _ = run_command(f"yarn application -status {app_id}")tracking_url_match = re.search(r"Tracking-URL\s*:\s*(http://[^\s]+)", output, re.IGNORECASE)if tracking_url_match:tracking_url = tracking_url_match.group(1)print(f" 找到Tracking URL: {tracking_url}")return tracking_urlreturn Nonedef extract_host_from_url(url: str) -> Optional[str]:#"""从URL中提取主机名"""host_match = re.search(r"http://([^/]+)", url)if host_match:host = host_match.group(1)print(f" Flink主机: {host}")return hostreturn Nonedef get_flink_version(host: str) -> str:#"""获取Flink版本"""print(" 获取Flink版本...")config_api_url = f"http://{host}/config/"print(f" 访问配置API: {config_api_url}")try:response = requests.get(config_api_url, timeout=10)config_json = response.textprint(f" 配置API返回: {config_json}")# 尝试解析JSONtry:config_data = json.loads(config_json)flink_version = config_data.get("flink-version")if flink_version:print(f" Flink版本: {flink_version}")return flink_versionexcept json.JSONDecodeError:pass# 使用正则表达式提取版本version_match = re.search(r'"flink-version":"([0-9\.]+)"', config_json)if version_match:flink_version = version_match.group(1)print(f" Flink版本: {flink_version}")return flink_versionexcept Exception as e:print(f" 获取Flink版本出错: {e}")# 默认版本print(" 无法获取Flink版本,假设为1.14版本")return "1.14"def get_flink_job_id(host: str) -> Optional[str]:#"""获取Flink JobID"""print(" 通过REST API获取JobID...")jobs_api_url = f"http://{host}/jobs"print(f" 访问作业API: {jobs_api_url}")try:response = requests.get(jobs_api_url, timeout=10)jobs_json = response.textprint(f" 作业API返回: {jobs_json}")# 尝试解析JSONtry:jobs_data = json.loads(jobs_json)for job in jobs_data.get("jobs", []):if job.get("status") == "RUNNING":job_id = job.get("id")if job_id:print(f" 找到运行中的Flink JobID: {job_id}")return job_idexcept json.JSONDecodeError:pass# 使用正则表达式提取JobIDjob_id_match = re.search(r'"id":"([a-z0-9]+)","status":"RUNNING"', jobs_json)if job_id_match:job_id = job_id_match.group(1)print(f" 找到运行中的Flink JobID: {job_id}")return job_idexcept Exception as e:print(f" 获取Flink JobID出错: {e}")print(" 无法从JSON响应中提取JobID")return Nonedef get_flink_command(version: str) -> str:#"""根据Flink版本选择对应的命令路径"""major_version, minor_version = version.split(".")[:2]version_key = f"{major_version}.{minor_version}"print(f" Flink主版本: {major_version}, 次版本: {minor_version}")if major_version == "1":flink_path = FLINK_PATHS.get(version_key)if flink_path and os.path.isfile(flink_path):print(f" 使用Flink {version_key}路径: {flink_path}")return flink_pathelse:print(f" Flink {version_key}路径不存在,使用默认路径")# 对于1.14版本,使用特定的默认路径if version_key == "1.14":return "/opt/flink-1.14/bin/flink"else:print(f" 未知的Flink主版本: {major_version},使用默认路径")return DEFAULT_FLINK_PATHdef check_savepoint_status(host: str, job_id: str, timeout: int = 3600) -> bool:#"""检查savepoint状态,直到完成或超时"""print(" Savepoint操作已触发,开始检查执行状态...")start_time = time.time()completed = Falsesavepoint_path = None#超时时间可以自定义,默认 1 小时while time.time() - start_time < timeout:checkpoints_api_url = f"http://{host}/jobs/{job_id}/checkpoints"print(f" 检查savepoint状态: {checkpoints_api_url}")try:response = requests.get(checkpoints_api_url, timeout=10)checkpoints_json = response.text# 尝试解析JSONin_progress = Falsetry:checkpoints_data = json.loads(checkpoints_json)# 检查历史记录中的savepointhistory = checkpoints_data.get("history", [])for checkpoint in history:if checkpoint.get("is_savepoint") == True:status = checkpoint.get("status")print(f" 历史记录中最新savepoint状态: {status}")if status == "IN_PROGRESS":in_progress = Trueprint(" Savepoint操作正在进行中")elif status == "COMPLETED":completed = Truesavepoint_path = checkpoint.get("external_path")if savepoint_path:print(f" Savepoint保存路径: {savepoint_path}")return Truebreak# 如果历史记录中没有找到,检查最新的savepointlatest_savepoints = checkpoints_data.get("latest", {}).get("savepoints", [])if latest_savepoints:latest = latest_savepoints[0]status = latest.get("status")is_savepoint = latest.get("is_savepoint")print(f" 最新savepoint状态: {status}, 是否为savepoint: {is_savepoint}")if status == "COMPLETED" and is_savepoint == True:completed = Truesavepoint_path = latest.get("external_path")if savepoint_path:print(f" Savepoint保存路径: {savepoint_path}")return Trueelif status == "IN_PROGRESS" and is_savepoint == True:in_progress = Trueprint(" Savepoint操作正在进行中")except json.JSONDecodeError:# 使用正则表达式检查if '"status":"COMPLETED"' in checkpoints_json and '"is_savepoint":true' in checkpoints_json:completed = Trueprint(" 检测到savepoint已完成")path_match = re.search(r'"external_path":"([^"]+)"', checkpoints_json)if path_match:savepoint_path = path_match.group(1)print(f" Savepoint保存路径: {savepoint_path}")return Trueelif '"status":"IN_PROGRESS"' in checkpoints_json and '"is_savepoint":true' in checkpoints_json:in_progress = Trueprint(" 检测到savepoint正在进行中")if in_progress:print(" Savepoint操作正在进行中,等待30秒后再次检查...")else:print(" 未检测到正在进行的savepoint操作,等待30秒后再次检查...")# 等待30秒后再次检查time.sleep(30)except Exception as e:print(f" 检查savepoint状态出错: {e}")time.sleep(30)# 超时elapsed = int(time.time() - start_time)print(f" Savepoint操作超时或失败!已等待 {elapsed} 秒,超过最大等待时间 {timeout} 秒")return Falsedef create_savepoint(job: Dict, savepoint_dir: str) -> None:#"""为指定任务创建savepoint"""app_id = job["app_id"]app_name = job["name"]# 创建包含任务名称的savepoint路径task_savepoint_dir = f"{savepoint_dir}/{app_name}"print(f"\n处理任务: {app_id} ({app_name})")# 获取Tracking URLtracking_url = get_tracking_url(app_id)if not tracking_url:print(" 无法获取Tracking URL")return# 从Tracking URL提取主机host = extract_host_from_url(tracking_url)if not host:print(" 无法从Tracking URL提取主机信息")return# 获取Flink版本flink_version = get_flink_version(host)# 获取JobIDjob_id = get_flink_job_id(host)if not job_id:print(" 无法获取Flink JobID")return# 提取主版本号和次版本号major_version, minor_version = flink_version.split(".")[:2]# 针对 Flink 1.18 版本跳过 savepoint 操作#if major_version == "1" and minor_version == "18":# print(" 检测到 Flink 1.18 版本任务,根据配置跳过 savepoint 操作")# return# 获取对应版本的Flink命令flink_cmd = get_flink_command(flink_version)# 构建savepoint命令savepoint_cmd = f"{flink_cmd} savepoint {job_id} {task_savepoint_dir} -yid {app_id}"print(f" 使用Flink SAVEPOINT_CMD 的命令格式")print(f" 执行命令: {savepoint_cmd}")# 执行savepoint命令output, status = run_command(savepoint_cmd)print(f" 命令输出: {output}")print(f" 命令返回状态: {status}")# 检查savepoint状态success = check_savepoint_status(host, job_id)if success:print(f" Savepoint创建成功!保存在: {task_savepoint_dir}")else:print(f" 任务: {app_id} ({app_name}):Savepoint操作失败!")def main():#"""主函数"""# 获取所有运行中的Flink任务running_jobs = get_running_flink_jobs()if not running_jobs:print("未找到正在运行的Flink任务。")return# 过滤任务filtered_jobs = filter_jobs(running_jobs, FILTERS)if not filtered_jobs:print("没有匹配的任务,退出操作。")returnprint(f"\n开始为匹配的 {len(filtered_jobs)} 个任务创建savepoint...")# 请在此处设置savepoint目录savepoint_dir = "hdfs:///flink-savepoints" # hdfs # 为每个任务创建savepointfor job in filtered_jobs:create_savepoint(job, savepoint_dir)print("\n所有匹配任务的savepoint操作已完成!")if __name__ == "__main__":main()
2.3 异常处理策略
• 超时重试机制:对于比较大的状态的作业,默认会检查是完成,超时1小时
• 状态验证:完成后检查HDFS目录是否存在_metadata
文件
• 报警集成:告警可以按需添加即可
三、生产环境最佳实践
3.1 安全增强建议
- 权限隔离:为Savepoint目录设置专属HDFS账户,避免作业间相互覆盖
- 存储限额:通过HDFS Quota限制目录容量,防止磁盘写满
- 生命周期管理:添加定期清理脚本,保留最近3次Savepoint
3.2 性能优化方案
四、运维监控方案
通过Prometheus+Grafana实现可视化监控:
- 统计成功率指标
- 记录单次触发耗时
- 跟踪HDFS存储用量增长趋势