欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 旅游 > 基于DeepSeek/AI的资产测绘与威胁图谱构建

基于DeepSeek/AI的资产测绘与威胁图谱构建

2025/4/22 22:41:45 来源:https://blog.csdn.net/qq_37865996/article/details/147400637  浏览:    关键词:基于DeepSeek/AI的资产测绘与威胁图谱构建

引言:

在网络安全攻防实践中,资产测绘是红队作战与蓝队安全运营的第一步,其本质都是通过系统性信息采集实现攻击面管理。

图片

当前普遍存在的痛点在于,当企业级资产规模呈指数级增长时,传统基于规则引擎的低效批量处理方式导致信息价值密度显著下降。

这种情况下,红队人员往往陷入"线索过载"困境——在繁冗的开放数据与私有资产中定位关键攻击向量,因为涉及多维度风险评估(包括资产暴露度、漏洞可利用性、业务关联性等要素)及攻击路径成本核算,其时间成本与人力投入常呈非线性增长。

而对于企业安全运营而言,单纯的资产发现已无法满足风险管理需求。

总的来说,典型挑战体现在三个维度:

  • 威胁情报融合:需建立多源异构数据的关联分析机制

  • 风险量化评估:构建包含CVSS评分、业务影响因子、修复优先级的动态评估模型

  • 防御体系映射:通过攻击链逆向推导实现纵深防御体系的闭环验证

这篇文章所提出的解决方案,是基于AI大模型(以DeepSeek为例)构建智能信息处理引擎,结合Neo4j图数据库实现威胁情报的知识图谱化构建。主要分为两个层面的处理机制:

• 前端:通过AI大模型的上下文感知能力,自动提取与资产属性标签,根据专业经验预测可行的APT攻击模式特征

• 后端:基于Neo4j的图遍历算法生成攻击面拓扑视图,从而支持攻击路径模拟与防御策略推演。

接下来,我们逐步阐明思路和方法。

【注意】

本文所述技术方案均为网络安全领域的学术探讨,旨在促进防御体系的技术演进,严禁任何形式的非法利用。作者及研究团队:

1. 不提供、不支持、不鼓励将文中方法用于未授权测试或攻击行为

2. 不承担因技术误用导致的任何法律及道德责任

3. 不公开任何可能降低攻击门槛的模型细节(如exploit生成模块的奖励函数设计)相关技术实施应严格遵守《网络安全法》《数据安全法》及所在国法律法规,建议在隔离测试环境中验证学术猜想。

4. 技术锋芒的指向应是加固系统而非突破防线——这是所有安全研究者不可逾越的伦理基线。

01

图片

研究背景与价值定位

图片

传统资产测绘的痛点

紧接上文,传统的资产测绘方法面临着诸多挑战,这些挑战严重影响了安全团队的工作效率和效果。具体来说,主要痛点包括:

  • 多源数据孤岛难以形成攻击面全景视图:通过对企业内外部的资产进行勘测,多类工具、多种手段往往带来繁荣的数据;另一方面,企业内部通常存在多个不同的数据源,如网络设备日志、应用日志、安全设备日志等。以上的数据往往分散在不同的系统中,形成了数据孤岛,使得安全团队难以整合这些数据,构建一个全面的攻击面视图。

  • 人工分析效率低下:在渗透测试项目中,安全团队需要处理大量的资产节点,人工分析不仅耗时费力,还容易出现遗漏和错误,降低了整体工作效率。

本文希望达成的测绘体系的核心价值

为了解决上述痛点,本文探索DeepSeek或AI加持下,帮助红队人员或企业安全团队实现更智能、直观的测绘能力。核心价值主要体现在以下几个方面:

  • 红队作战视角:

    • 攻击路径预构建:利用知识图谱技术,测绘体系能够自动推导攻击链可能性。这有助于红队提前识别潜在的攻击路径,制定更为有效的防御策略。

    • 目标优先级判定:基于动态权重算法,识别出最关键的突破点。这种优先级判定机制使得红队能够集中资源和精力,对最具威胁的目标进行重点防护。

    • 成果可视化呈现:系统自动生成符合MITRE ATT&CK框架的战术路径图,直观展示攻击路径和关键节点。这不仅提高了分析结果的可读性,还便于红队成员之间的沟通和协作。

  • 成本效益分析:

    • 扫描耗时降低:通过目标优先级划分与筛选,动态频率调整和协议识别技术,在保证扫描质量的前提下,大幅减少扫描时间。这对于快速响应和处置安全事件具有重要意义。

    • 漏洞验证效率提升:借助AI辅助PoC生成技术,系统能够自动化生成漏洞验证代码,助力漏洞验证的效果提升。

    • 报告编制辅助:通过资产测绘知识图谱的提供,使用代码或原生指令,快速生成详细的渗透测试逻辑路径,减少人工编写工作。

02

图片

隐侠具体实践

图片

接下来,简单打样,调通数据逻辑,简单体现DeepSeek或其他AI大模型赋能的可能性和效果。整体架构为:

graph TD 
A[探测层] -->|存活资产| B(知识图谱构建) 
A -->|流量特征| C(动态资产捕获) 
B --> D{风险计算} 
C --> D 
D --> E[可视化平台]

在下面的实验环境中,环境:Python 3.11 + Neo4j 5.26.1

步骤一:资产扫描

简单来说,调用nmap库扫描IP资产即可,不管是直接运行nmap还是相关工具或代码,建议都将命令行切到root权限。

# nmap扫描模块
def nmap_scan(target):nm = nmap.PortScanner()# 采用混合扫描策略(SYN+ACK+UDP)nm.scan(hosts=target, arguments='-sS -sU -T4 --min-rate 100')assets = []for host in nm.all_hosts():asset = {"ip": host,"mac": nm[host]['addresses'].get('mac', 'Unknown'),"os": nm[host].get('osmatch', [{}])[0].get('name', 'Unknown'),"ports": [],"last_seen": datetime.now().isoformat()}for proto in nm[host].all_protocols():ports = nm[host][proto].keys()for port in ports:service = nm[host][proto][port].get('name', 'unknown')asset["ports"].append({"port": port,"protocol": proto,"service": service,"version": nm[host][proto][port].get('version', '')})assets.append(asset)return assets

扫描结果:

当然,这里扫描的范围后续根据需要可以扩展到IPv6、域名、主域名等,需要类型判断和处理,这里简单提供示例代码:

import re
import ipaddress
from idna import encode as idna_encode
from typing import Dict, UnionclassInvalidTargetException(ValueError):"""用于标识非法目标格式 """def__init__(self, message: str):super().__init__(f"Invalid target format: {message}")defvalidate_ipv4(addr: str) -> bool:"""验证IPv4地址合法性(支持CIDR)"""try:ipaddress.IPv4Network(addr, strict=False)returnTrueexcept ValueError:returnFalsedefvalidate_ipv6(addr: str) -> bool:"""验证IPv6地址合法性(支持缩写格式)"""try:ipaddress.IPv6Network(addr, strict=False)returnTrueexcept ValueError:returnFalsedefvalidate_domain(domain: str) -> bool:"""验证域名合法性(支持国际化域名IDNA)遵循RFC 5891规范:- 单个标签长度1~63字符- 总长度不超过253字符- 允许字母、数字、连字符(不能以连字符开头/结尾)"""try:# 转换为Punycode格式进行规范验证encoded = idna_encode(domain).decode('ascii')except UnicodeError:returnFalsepattern = r"^([a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z0-9]{2,63}$"return re.fullmatch(pattern, encoded) isnotNonedeftarget_adapter(target: str) -> Dict[str, Union[str, None]]:"""param target: 输入目标(IPv4/IPv6/域名)"""# 去除首尾空白及端口号(如果存在)target = target.strip().split(":", 1)[0]# 类型判断逻辑if validate_ipv4(target):return {"type": "ipv4","value": str(ipaddress.IPv4Network(target, strict=False).network_address)}elif validate_ipv6(target):# IPv6地址规范化(扩展缩写形式)normalized = ipaddress.IPv6Address(target).explodedreturn {"type": "ipv6","value": normalized}elif validate_domain(target):# 国际化域名编码转换try:encoded_domain = idna_encode(target).decode('ascii')return {"type": "domain","value": encoded_domain.lower() # 域名统一小写处理}except UnicodeError as e:raise InvalidTargetException(f"IDNA encoding failed: {e}")else:raise InvalidTargetException(f"Unsupported target type: {target}")# 测试用例
if __name__ == "__main__":test_cases = ["192.168.1.1","2001:db8::1","example.com","例子.中国", # 国际化域名"invalid..domain","123.456.789.000"]for case in test_cases:try:print(f"Input: {case}\nOutput: {target_adapter(case)}\n")except InvalidTargetException as e:print(f"Input: {case}\nError: {e}\n")

通过步骤一我们已经实现了对特定目标进行信息搜集,从结果图片就可以看到,已经有大量IP、端口、服务结果。

步骤二:基于DeepSeek/AI做资产风险识别

基于步骤一结果,这边我们基于DeepSeek/AI做资产风险识别,该部分也是本文的核心。

那本部分的核心,就是prompt的设置了,规定数据的输入和输出格式,以便AI大模型更好的进行数据分析。这部分我写的细一些,把思考的过程展示一下。

初始的时候,只考虑了对资产进行分析和风险估算,因此主要内容为:

# 构建结构化提示词prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析:
## 资产特征
- IP地址:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口数:{len(asset_data['ports'])}
- 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))}
- 最后活动时间:{asset_data['last_seen']}## 分析要求
1. 漏洞风险评估(CVSS评分+漏洞类型分类)
2. 建议的ATT&CK攻击路径(包含战术编号)
3. 资产关键性评分(1-10分制,需说明评分依据)"""

这里的资产特征,主要是来源于python使用nmap库扫描完成后的结果输出。

而在AI大模型输出结果后,发现格式不一,因为只给了分析要求,没给结果模板。于是通过反复调试和对接(过程中步骤三已经在尝试了),最后的prompt为:

# 结构化输出模板(强制API返回格式)
ANALYSIS_TEMPLATE = """请严格按照以下Markdown模板生成分析报告:### 网络安全分析报告#### 一、漏洞风险评估
{% for vuln in vulnerabilities %}
**{{ loop.index }}. {{ vuln.service }} (端口{{ vuln.port }})**
- 风险等级: {{ vuln.risk_level }}
- CVSS评分: {{ vuln.cvss }} ({{ vuln.vector }})
- 漏洞类型: {{ vuln.types|join('、') }}
{% endfor %}#### 二、建议的ATT&CK攻击路径
{% for tactic in tactics %}
**战术编号**: {{ tactic.id }} - {{ tactic.name }}
- 描述: {{ tactic.description }}
{% endfor %}#### 三、资产关键性评分
**评分**: {{ criticality }}/10**评分依据**:
{% for reason in criticality_reasons %}
{{ loop.index }}. {{ reason }}
{% endfor %}"""prompt = f"""作为网络安全分析师,请严格按模板分析以下数据:### 输入数据
- IP:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口:{', '.join(f"{p['port']}/{p['protocol']}({p['service']})"for p in asset_data['ports'])}
- 最后活动时间:{asset_data['last_seen']}### 分析要求
{ANALYSIS_TEMPLATE}"""

在AI大模型的选择方面,其实DeepSeek、ChatGPT还是混元大模型等等,没什么太大的区别,都是通用AI大模型,关键还是咱上面prompt设置的好。

调用DeepSeek的api,进行分析:

# DeepSeek连接
defdeepseek_analysis(asset_data) -> str:# 初始化官方SDK客户端client = OpenAI(api_key=DEEPSEEK_API_KEY,base_url="https://api.deepseek.com/v1",timeout=30)# 构建结构化提示词prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析:
## 资产特征
- IP地址:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口数:{len(asset_data['ports'])}
- 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))}
- 最后活动时间:{asset_data['last_seen']}## 分析要求
1. 漏洞风险评估(CVSS评分+漏洞类型分类)
2. 建议的ATT&CK攻击路径(包含战术编号)
3. 资产关键性评分(1-10分制,需说明评分依据)"""#print(prompt)try:response = client.chat.completions.create(model="deepseek-chat", messages=[{"role": "system", "content": "你是一名经验丰富的网络安全分析师,擅长漏洞挖掘和攻击路径分析"},{"role": "user", "content": prompt}],temperature=0.2,max_tokens=1500,stream=False)# 增强响应解析健壮性if response.choices and response.choices[0].message:return response.choices[0].message.contentelse:raise ValueError("Empty response from API")except Exception as e:# 异常处理error_msg = f"DeepSeek API调用失败: {str(e)}"print(error_msg)return error_msg

余额不足,报错:

图片

暂时无法充值:

图片

那么,这里转而使用腾讯混元大模型的api,用的是OpenAI的SDK:

defhunyuan_analysis(asset_data) -> str:# 初始化OpenAI兼容客户端client = OpenAI(api_key=HUNYUAN_API_KEY,base_url="https://api.hunyuan.cloud.tencent.com/v1", # 官方API端点timeout=30# 网络超时保护)# 结构化提示词prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析:
## 资产特征
- IP地址:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口数:{len(asset_data['ports'])}
- 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))}
- 最后活动时间:{asset_data['last_seen']}## 分析要求
1. 漏洞风险评估(CVSS评分+漏洞类型分类)
2. 建议的ATT&CK攻击路径(包含战术编号)
3. 资产关键性评分(1-10分制,需说明评分依据)"""try:response = client.chat.completions.create(model="hunyuan-pro", # 付费模型(免费可用hunyuan-lite[1](@ref))messages=[{"role": "system", "content": "你是一名经验丰富的网络安全分析师,擅长漏洞挖掘和攻击路径分析"},{"role": "user", "content": prompt}],temperature=0.3, # 平衡严谨性与创造性max_tokens=1200,extra_body={ # 腾讯增强参数"enable_enhancement": True, # 启用知识检索增强"enable_speed_search": False# 关闭极速模式保证准确性})# 增强型响应解析if response.choices and response.choices[0].message:return response.choices[0].message.contentelse:raise ValueError("API返回空响应")except Exception as e:

在使用最初的prompt时,得到如下响应:

分析报告:### 网络安全分析报告#### 一、漏洞风险评估**1.1 MySQL (开放端口)**
- **风险等级**: 高
- **CVSS评分**: 8.0 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H)
- **漏洞类型**: 未授权访问、SQL注入**1.2 HTTPS (开放端口)**
- **风险等级**: 中
- **CVSS评分**: 6.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:L/I:L/A:N)
- **漏洞类型**: 中间人攻击(MITM)、弱加密**1.3 HTTP (开放端口)**
- **风险等级**: 高
- **CVSS评分**: 7.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:N)
- **漏洞类型**: 跨站脚本攻击(XSS)、跨站请求伪造(CSRF)**1.4 Intermapper (开放端口)**
- **风险等级**: 低
- **CVSS评分**: 4.0 (CVSS:3.0/AV:N/AC:M/PR:N/UI:N/S:U/C:N/I:N/A:N)
- **漏洞类型**: 信息泄露**1.5 HTTP-Proxy (开放端口)**
- **风险等级**: 中
- **CVSS评分**: 6.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:L/I:L/A:N)
- **漏洞类型**: 代理绕过、未授权访问**1.6 FTP (开放端口)**
- **风险等级**: 高
- **CVSS评分**: 8.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H)
- **漏洞类型**: 未授权访问、弱密码#### 二、建议的ATT&CK攻击路径**T1190 - 利用目标共享**
- 攻击者通过未授权访问FTP服务获取敏感文件。**T1203 - Exploit Public-Facing Application**
- 利用HTTP/HTTPS服务的漏洞(如XSS、CSRF)进行攻击。**T1192 - 利用目标协议漏洞**
- 利用Intermapper或HTTP-Proxy的协议漏洞进行信息收集或未授权访问。**T1206 - Exploit SQL Database**
- 通过MySQL的未授权访问或SQL注入漏洞获取数据库中的敏感数据。**T1021.002 - 远程桌面协议(RDP)**
- 虽然未直接提及RDP,但攻击者可能利用其他服务的漏洞获取RDP访问权限。#### 三、资产关键性评分**评分**: 8分**评分依据**:
1. **开放端口多**: 共有6个开放端口,提供了多个潜在的攻击面。
2. **高风险服务多**: 包括MySQL、HTTP、FTP等高风险服务,这些服务历史上常见多种漏洞。
3. **敏感服务暴露**: HTTPS和HTTP服务的暴露增加了数据传输过程中的风险。
4. **未知操作系统**: 操作系统的不确定性增加了评估难度,可能隐藏更多未知风险。
5. **最后活动时间较近**: 资产在2025年仍有活动,表明其仍在使用中,攻击价值较高。### 总结该资产存在多个高风险漏洞,建议立即进行详细的漏洞扫描和渗透测试,并采取相应的安全措施,如修补已知漏洞、加强访问控制、定期更新系统和软件等。同时,建议对资产的关键性进行持续监控和评估,确保网络安全。
sh-3.2#

获得响应后,对markdown文档进行解析,解析代码为:

def_parse_hunyuan_response(text: str) -> Dict:"""解析结构化响应"""# 漏洞解析vuln_pattern = (r"\*\*(\d+)\.\s*(.+?)\s*$端口(\d+)$\*\*\s*\n"# 允许空格和换行差异r"-+\s*风险等级\s*[:-]+\s*(\S+)\s*\n"            # 兼容中英文冒号r"-+\s*CVSS评分\s*[:-]+\s*([\d.]+)\s*$([^)]+)$\s*\n"r"-+\s*漏洞类型\s*[:-]+\s*(.+?)\s*(\n|$)")print("[DEBUG] 原始分析报告:\n", text)matches = re.findall(vuln_pattern, text)print("[DEBUG] 匹配到的漏洞条目:", matches)vulnerabilities = [{"service": match[1],"port": int(match[2]),"risk_level": match[3],"cvss": float(match[4]),"vector": match[5],"types": match[6].split("、")} for match in re.findall(vuln_pattern, text)]# 攻击路径解析tactic_pattern = r"\*\*战术编号\*\*: (T\d+) - (.+?)\n- 描述: (.+)"tactics = [{"id": match[0],"name": match[1],"description": match[2]} for match in re.findall(tactic_pattern, text)]# 关键性评分解析criticality_match = re.search(r"评分: (\d+)/10", text)criticality = int(criticality_match.group(1)) if criticality_match else5return {"vulnerabilities": vulnerabilities,"tactics": tactics,"criticality": criticality}

最终得到结构化的分析数据:

{'vulnerabilities': [], 'tactics': [{'id': 'T1190', 'name': '利用FTP', 'description': '攻击者通过未授权访问或弱密码获取FTP服务器的控制权,进而进行数据窃取或上传恶意文件。'}, {'id': 'T1192', 'name': '利用HTTP', 'description': '攻击者通过SQL注入、XSS等漏洞获取敏感信息或执行恶意代码。'}, {'id': 'T1195', 'name': '利用HTTPS', 'description': '攻击者通过中间人攻击或证书伪造获取加密数据的明文内容。'}, {'id': 'T1203', 'name': '利用MySQL', 'description': '攻击者通过未授权访问或SQL注入获取数据库中的敏感信息。'}, {'id': 'T1190', 'name': '利用Intermapper', 'description': '攻击者通过信息泄露获取系统配置或网络拓扑信息,为进一步攻击做准备。'}], 'criticality': 5}

因为nmap所扫描的漏洞比较局限,对于最新关注的漏洞的扫描能力是缺失的,因此上方结果中漏洞内容为空,只有可能能产生危害的攻击策略或者风险的防护策略,即tactics。

步骤三:数据存储与展示

当我们已经通过AI大模型得到分析后,即可根据现有结构化数据,做数据输入。

这里简单介绍Neo4j的使用,如果在本地实验,下载Neo4j Desktop,起一个本地数据库即可:

图片

检查Neo4j连通性:

import nmap
import requests
from neo4j import GraphDatabase
from datetime import datetime# 配置参数
#DEEPSEEK_API_KEY = "your_deepseek_api_key"
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "your_password"# 测试neo4j连通性
deftest_neo4j_connection():try:driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))with driver.session() as session:# 版本监测version_data = session.execute_read(lambda tx: list(tx.run("CALL dbms.components() YIELD versions RETURN versions[0] AS version")))node_data = session.execute_read(lambda tx: list(tx.run("MATCH (n) RETURN count(n) AS count")))# 分析数据db_version = version_data[0]["version"]node_count = node_data[0]["count"]print("\n=== Neo4j连接状态 ===")print(f"连接成功!服务状态:正常") print(f"数据库版本: {db_version}")print(f"当前节点总数: {node_count}")print("====================")except Exception as e:print(f"\n!!! 连接异常: {str(e)}")if"Unable to connect"in str(e):print("⇒ 检查Neo4j服务是否启动(neo4j.bat console)")elif"authentication failure"in str(e):print("⇒ 密码错误,尝试重置: ALTER USER neo4j SET PASSWORD 'newpassword'")test_neo4j_connection()

正常情况下,输出结果为:

图片

接着我们只需要设计节点类型和关系类型,目前根据我们有的数据,可以创建ip(资产)、端口服务、漏洞、攻击策略等类型节点,响应就可创建“ip拥有端口服务”、“端口服务存在漏洞”和“针对该ip有哪些攻击策略”的关系。

坦言之,我好久没碰Neo4j和正则了,上文的代码和下面的代码我整了两天。。

我有点没眼看了,少侠们根据我的注释即可了解代码逻辑。

classThreatIntelligenceGraph:def__init__(self):self.driver = GraphDatabase.driver(NEO4J_URI,auth=(NEO4J_USER, NEO4J_PASSWORD),encrypted=False,max_connection_pool_size=20)self._clean_legacy_indexes()self._init_constraints()def_clean_legacy_indexes(self):"""清理与约束冲突的遗留索引"""with self.driver.session() as session:# 合规查询result = session.run("""SHOW INDEXES YIELD name, type, labelsOrTypes, propertiesWHERE labelsOrTypes IN ['Asset', 'Port', 'Vulnerability']AND (properties = ['ip'] OR properties = ['number', 'protocol']OR properties = ['cve_id'])""")for record in result:print(f"删除遗留索引: {record['name']}")session.run(f"DROP INDEX {record['name']}")def_init_constraints(self):"""创建带名称的约束"""with self.driver.session() as session:session.run("""CREATE CONSTRAINT asset_ip_unique IF NOT EXISTSFOR (a:Asset) REQUIRE a.ip IS UNIQUE""")session.run("""CREATE CONSTRAINT port_identifier IF NOT EXISTSFOR (p:Port) REQUIRE (p.number, p.protocol) IS UNIQUE""")session.run("""CREATE CONSTRAINT vuln_cve_unique IF NOT EXISTSFOR (v:Vulnerability) REQUIRE v.cve_id IS UNIQUE""")session.run("""CREATE CONSTRAINT tactic_id_unique IF NOT EXISTSFOR (t:AttackTactic) REQUIRE t.id IS UNIQUE""")defstore_analysis(self, asset: Dict, analysis: Dict):"""全量存储入口"""with self.driver.session() as session:# 资产节点session.execute_write(self._merge_asset,asset["ip"],asset["os"],analysis["criticality"],asset["last_seen"])# 端口及漏洞for port in asset["ports"]:session.execute_write(self._link_port,asset["ip"],port["port"],port["protocol"],port["service"])for vuln in analysis.get("vulnerabilities", []):if vuln["port"] == port["port"]:session.execute_write(self._link_vulnerability,port["port"],port["protocol"],vuln)for tactic in analysis.get("tactics", []):session.execute_write(self._link_tactic,asset['ip'],tactic)# 攻击路径for tactic in analysis.get("tactics", []):session.execute_write(self._link_attack_tactic,tactic,[v["types"] for v in analysis["vulnerabilities"]])@staticmethoddef_merge_asset(tx, ip: str, os: str, criticality: int, last_seen: str):tx.run("""MERGE (a:Asset {ip: $ip})SET a.os = $os, a.criticality = $criticality,a.last_seen = datetime($last_seen),a.updated = datetime()""", ip=ip, os=os, criticality=criticality, last_seen=last_seen)@staticmethoddef_link_port(tx, ip: str, port: int, protocol: str, service: str):tx.run("""MATCH (a:Asset {ip: $ip})MERGE (p:Port {number: $port, protocol: $protocol})SET p.service = $serviceMERGE (a)-[r:HAS_PORT]->(p)""", ip=ip, port=port, protocol=protocol, service=service)@staticmethod def_link_vulnerability(tx, port: int, protocol: str, vuln: Dict):"""漏洞关系创建"""try:# 设置IDcve_id = f"DFYX-VUL-{datetime.now().year}-{port}-{protocol.upper()}"tx.run("""// 确保Port存在MERGE (p:Port {number: $port, protocol: $protocol})// 创建或更新漏洞MERGE (v:Vulnerability {cve_id: $cve_id})ON CREATE SET v.risk_level = $risk,v.cvss_score = $cvss,v.vector = $vector,v.types = $typesON MATCH SET v.last_updated = datetime()// 创建关系MERGE (p)-[r:HAS_VULNERABILITY]->(v)""",port=port,protocol=protocol,cve_id=cve_id,risk=vuln["risk_level"],cvss=vuln["cvss"],vector=vuln["vector"],types=vuln["types"])print(f"成功创建漏洞关系: {port}/{protocol} → {cve_id}")except Exception as e:print(f"关系创建失败: {str(e)}")raise@staticmethod def_link_tactic(tx, ip: int, tactic: Dict):"""漏洞关系创建"""try:tx.run("""// 确保ip存在MERGE (a:Asset {ip: $ip})// 创建或更新策略MERGE (t:AttackTactic {id: $id})ON CREATE SET t.name = $name,t.description = $descriptionON MATCH SET t.last_updated = datetime()// 创建关系MERGE (a)-[r:HAS_TACTIC]->(t)""",ip=ip,id=tactic["id"],name=tactic["name"],description=tactic["description"])#print("成功创建TACTIC关系")except Exception as e:print(f"关系创建失败: {str(e)}")raise# 正则表达式def_parse_hunyuan_response(text: str) -> Dict:# 漏洞解析vuln_pattern = r"\*\*(\d+)\. (.+?) $端口(\d+)$\*\*\n- 风险等级: (.+?)\n- CVSS评分: (.+?) $(.+?)$\n- 漏洞类型: (.+)"vulnerabilities = [{"service": match[1],"port": int(match[2]),"risk_level": match[3],"cvss": float(match[4]),"vector": match[5],"types": match[6].split("、")} for match in re.findall(vuln_pattern, text)]@staticmethoddef_link_attack_tactic(tx, tactic: Dict, vuln_types: List[List[str]]):tx.run("""MERGE (t:AttackTactic {id: $id})SET t.name = $name,t.description = $desc,t.last_observed = datetime()WITH tUNWIND $vuln_types AS typesMATCH (v:Vulnerability)WHERE ANY(type IN v.types WHERE type IN types)MERGE (t)-[r:EXPLOITS]->(v)""",id=tactic["id"],name=tactic["name"],desc=tactic["description"],vuln_types=vuln_types)

效果咋样呢,请看:

图片

从图中可以清晰查看当前ip开放了什么端口,ip有哪些攻击策略,因为此时没有漏洞数据所以没有出现漏洞类型节点,另外ip和端口的关系、ip和攻击策略之间的关系也很清晰,所有关系、节点都可以查看详细的属性值。

当然,攻击策略和端口关联会更好,我实在不想去调整prompt和正则了,太痛苦了。

另外,当前模型未建立"攻击战术→漏洞类型"的关系,后续可以补充CYPHER语句:

MATCH (t:AttackTactic), (v:Vulnerability) 
WHEREANY(type IN v.types WHERE type IN t.required_conditions)
MERGE (t)-[r:EXPLOITS]->(v)
 

现有criticality评分未体现在节点属性,也可以在Neo4j中设置节点大小映射:

:paramcriticality_scale: {8: 2.0, 5: 1.5, 3: 1.0}
MATCH (a:Asset) SET a.size = $criticality_scale[a.criticality]
 
步骤四:整体连通

那么整体效果咋样呢,拭目以待,我们清除一下测试数据,正式扫描个C端看看:

图片

最终代码为:

import nmap
import requests
from neo4j import GraphDatabase
from datetime import datetime
from openai import OpenAI
import os
import re
from typing import Dict, Any, List# 配置参数(建议使用环境变量)
HUNYUAN_API_KEY = "API_KEY_value"
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "PASSWORD_value"# nmap扫描模块
def nmap_scan(target):nm = nmap.PortScanner()# 采用混合扫描策略(SYN+ACK+UDP)nm.scan(hosts=target, arguments='-sS -sU -T4 --min-rate 100')assets = []for host in nm.all_hosts():asset = {"ip": host,"mac": nm[host]['addresses'].get('mac', 'Unknown'),"os": nm[host].get('osmatch', [{}])[0].get('name', 'Unknown'),"ports": [],"last_seen": datetime.now().isoformat()}for proto in nm[host].all_protocols():ports = nm[host][proto].keys()for port in ports:service = nm[host][proto][port].get('name', 'unknown')asset["ports"].append({"port": port,"protocol": proto,"service": service,"version": nm[host][proto][port].get('version', '')})assets.append(asset)return assets# 结构化输出模板(强制API返回格式)
ANALYSIS_TEMPLATE = """请严格按照以下Markdown模板生成分析报告:### 网络安全分析报告#### 一、漏洞风险评估
{% for vuln in vulnerabilities %}
**{{ loop.index }}. {{ vuln.service }} (端口{{ vuln.port }})**
- 风险等级: {{ vuln.risk_level }}
- CVSS评分: {{ vuln.cvss }} ({{ vuln.vector }})
- 漏洞类型: {{ vuln.types|join('、') }}
{% endfor %}#### 二、建议的ATT&CK攻击路径
{% for tactic in tactics %}
**战术编号**: {{ tactic.id }} - {{ tactic.name }}
- 描述: {{ tactic.description }}
{% endfor %}#### 三、资产关键性评分
**评分**: {{ criticality }}/10**评分依据**:
{% for reason in criticality_reasons %}
{{ loop.index }}. {{ reason }}
{% endfor %}"""# 腾讯混元API增强调用
def hunyuan_analysis(asset_data: Dict) -> Dict:"""结构化漏洞分析"""client = OpenAI(api_key=HUNYUAN_API_KEY,base_url="https://api.hunyuan.cloud.tencent.com/v1",timeout=30)# 强制结构化提示词prompt = f"""作为网络安全分析师,请严格按模板分析以下数据:### 输入数据
- IP:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口:{', '.join(f"{p['port']}/{p['protocol']}({p['service']})" for p in asset_data['ports'])}
- 最后活动时间:{asset_data['last_seen']}### 分析要求
{ANALYSIS_TEMPLATE}"""try:response = client.chat.completions.create(model="hunyuan-pro",messages=[{"role": "system", "content": "你是一名严格遵循输出格式的网络安全专家"},{"role": "user", "content": prompt}],temperature=0.1, # 降低随机性max_tokens=1500,extra_body={"enable_enhancement": True})return _parse_hunyuan_response(response.choices[0].message.content)except Exception as e:return {"error": str(e)}def _parse_hunyuan_response(text: str) -> Dict:"""解析结构化响应"""# 漏洞解析vuln_pattern = (r"\*\*(\d+)\.\s*(.+?)\s*$端口(\d+)$\*\*\s*\n"  # 允许空格和换行差异r"-+\s*风险等级\s*[:-]+\s*(\S+)\s*\n"            # 兼容中英文冒号r"-+\s*CVSS评分\s*[:-]+\s*([\d.]+)\s*$([^)]+)$\s*\n"r"-+\s*漏洞类型\s*[:-]+\s*(.+?)\s*(\n|$)")#print("[DEBUG] 原始分析报告:\n", text)matches = re.findall(vuln_pattern, text)print("[DEBUG] 匹配到的漏洞条目:", matches)vulnerabilities = [{"service": match[1],"port": int(match[2]),"risk_level": match[3],"cvss": float(match[4]),"vector": match[5],"types": match[6].split("、")} for match in re.findall(vuln_pattern, text)]# 攻击路径解析tactic_pattern = r"\*\*战术编号\*\*: (T\d+) - (.+?)\n- 描述: (.+)"tactics = [{"id": match[0],"name": match[1],"description": match[2],} for match in re.findall(tactic_pattern, text)]# 关键性评分解析criticality_match = re.search(r"评分: (\d+)/10", text)criticality = int(criticality_match.group(1)) if criticality_match else 5return {"vulnerabilities": vulnerabilities,"tactics": tactics,"criticality": criticality}# Neo4j增强存储模块
class ThreatIntelligenceGraph:def __init__(self):self.driver = GraphDatabase.driver(NEO4J_URI,auth=(NEO4J_USER, NEO4J_PASSWORD),encrypted=False,max_connection_pool_size=20)self._clean_legacy_indexes()self._init_constraints()def _clean_legacy_indexes(self):"""清理与约束冲突的遗留索引"""with self.driver.session() as session:# 合规查询result = session.run("""SHOW INDEXES YIELD name, type, labelsOrTypes, propertiesWHERE labelsOrTypes IN ['Asset', 'Port', 'Vulnerability']AND (properties = ['ip'] OR properties = ['number', 'protocol']OR properties = ['cve_id'])""")for record in result:print(f"删除遗留索引: {record['name']}")session.run(f"DROP INDEX {record['name']}")def _init_constraints(self):"""创建带名称的约束"""with self.driver.session() as session:session.run("""CREATE CONSTRAINT asset_ip_unique IF NOT EXISTSFOR (a:Asset) REQUIRE a.ip IS UNIQUE""")session.run("""CREATE CONSTRAINT port_identifier IF NOT EXISTSFOR (p:Port) REQUIRE (p.number, p.protocol) IS UNIQUE""")session.run("""CREATE CONSTRAINT vuln_cve_unique IF NOT EXISTSFOR (v:Vulnerability) REQUIRE v.cve_id IS UNIQUE""")session.run("""CREATE CONSTRAINT tactic_id_unique IF NOT EXISTSFOR (t:AttackTactic) REQUIRE t.id IS UNIQUE""")def store_analysis(self, asset: Dict, analysis: Dict):"""全量存储入口"""with self.driver.session() as session:# 资产节点session.execute_write(self._merge_asset,asset["ip"],asset["os"],analysis["criticality"],asset["last_seen"])# 端口及漏洞for port in asset["ports"]:session.execute_write(self._link_port,asset["ip"],port["port"],port["protocol"],port["service"])for vuln in analysis.get("vulnerabilities", []):if vuln["port"] == port["port"]:session.execute_write(self._link_vulnerability,port["port"],port["protocol"],vuln)for tactic in analysis.get("tactics", []):session.execute_write(self._link_tactic,asset['ip'],tactic)# 攻击路径for tactic in analysis.get("tactics", []):session.execute_write(self._link_attack_tactic,tactic,[v["types"] for v in analysis["vulnerabilities"]])@staticmethoddef _merge_asset(tx, ip: str, os: str, criticality: int, last_seen: str):tx.run("""MERGE (a:Asset {ip: $ip})SET a.os = $os, a.criticality = $criticality,a.last_seen = datetime($last_seen),a.updated = datetime()""", ip=ip, os=os, criticality=criticality, last_seen=last_seen)@staticmethoddef _link_port(tx, ip: str, port: int, protocol: str, service: str):tx.run("""MATCH (a:Asset {ip: $ip})MERGE (p:Port {number: $port, protocol: $protocol})SET p.service = $serviceMERGE (a)-[r:HAS_PORT]->(p)""", ip=ip, port=port, protocol=protocol, service=service)@staticmethod def _link_vulnerability(tx, port: int, protocol: str, vuln: Dict):"""漏洞关系创建"""try:# 生成更合理的ID(加入协议)cve_id = f"DFYX-VUL-{datetime.now().year}-{port}-{protocol.upper()}"tx.run("""// 确保Port存在MERGE (p:Port {number: $port, protocol: $protocol})// 创建或更新漏洞MERGE (v:Vulnerability {cve_id: $cve_id})ON CREATE SET v.risk_level = $risk,v.cvss_score = $cvss,v.vector = $vector,v.types = $typesON MATCH SET v.last_updated = datetime()// 创建关系MERGE (p)-[r:HAS_VULNERABILITY]->(v)""",port=port,protocol=protocol,cve_id=cve_id,risk=vuln["risk_level"],cvss=vuln["cvss"],vector=vuln["vector"],types=vuln["types"])print(f"成功创建漏洞关系: {port}/{protocol} → {cve_id}")except Exception as e:print(f"关系创建失败: {str(e)}")raise@staticmethod def _link_tactic(tx, ip: int, tactic: Dict):"""策略关系创建"""try:tx.run("""// 确保ip存在MERGE (a:Asset {ip: $ip})// 创建或更新策略MERGE (t:AttackTactic {id: $id})ON CREATE SET t.name = $name,t.description = $descriptionON MATCH SET t.last_updated = datetime()// 创建关系MERGE (a)-[r:HAS_TACTIC]->(t)""",ip=ip,id=tactic["id"],name=tactic["name"],description=tactic["description"])#print("成功创建TACTIC关系")except Exception as e:print(f"关系创建失败: {str(e)}")raise# 修正正则表达式def _parse_hunyuan_response(text: str) -> Dict:# 漏洞解析(修正$符号问题)vuln_pattern = r"\*\*(\d+)\. (.+?) $端口(\d+)$\*\*\n- 风险等级: (.+?)\n- CVSS评分: (.+?) $(.+?)$\n- 漏洞类型: (.+)"vulnerabilities = [{"service": match[1],"port": int(match[2]),"risk_level": match[3],"cvss": float(match[4]),"vector": match[5],"types": match[6].split("、")} for match in re.findall(vuln_pattern, text)]@staticmethoddef _link_attack_tactic(tx, tactic: Dict, vuln_types: List[List[str]]):tx.run("""MERGE (t:AttackTactic {id: $id})SET t.name = $name,t.description = $desc,t.last_observed = datetime()WITH tUNWIND $vuln_types AS typesMATCH (v:Vulnerability)WHERE ANY(type IN v.types WHERE type IN types)MERGE (t)-[r:EXPLOITS]->(v)""",id=tactic["id"],name=tactic["name"],desc=tactic["description"],vuln_types=vuln_types)# 主流程
if __name__ == "__main__":# 模拟扫描数据assets = nmap_scan("xx.xx.xx.xx/24")for asset in assets:# 获取分析结果analysis_result = hunyuan_analysis(asset)print(analysis_result)if "error" not in analysis_result:# 存储到知识图谱graph = ThreatIntelligenceGraph()graph.store_analysis(asset, analysis_result)print("数据存储成功!")else:print(f"分析失败:{analysis_result['error']}")

04

图片

资产与威胁图谱应用

图片

其实这里就是畅想应用场景,以此也能得出后续的针对性补强动作需求。

场景一:基于高危端口清单梳理当前资产暴露面

以22端口、3306端口为例,企业安全管理中一般禁止这样的高危服务端口对公网开放,容易遭受暴力破解、密码泄漏、数据泄露等攻击或损失,因此可以通过筛选条件,直接得到高危服务子图:

图片

场景二:构建动态的渗透攻击序列模型

基于场景一的工作来做也可以,或者按照渗透的思路,比如我现在需要提取所有web资产,因为当前没有漏洞扫描的结果,那就下一步导出结果给nuclei进行扫描:

图片

值得注意的是,具体ip或者端口与Tactic(攻击策略)也有关联,利用策略可以减少无谓的无用功,针对可行方向进行针对性漏洞扫描或者渗透测试,类似于根据指纹匹配对应的PoCs来进行测试。

漏洞扫描完成后,可再次利用AI大模型对识别出的脆弱点进行智能化的攻击路径编排。具体而言,将结合漏洞的CVSS评分、利用可行性、资产权重(如业务系统等级、数据敏感度)及网络拓扑关系,构建动态的渗透攻击序列模型。

通过集成MITRE ATT&CK知识库的攻击模式特征,采用相关机器学习算法对攻击路径进行概率推演,智能生成包含横向移动策略、权限提升路线和载荷注入节点的渗透方案。还可通过prompt指定匹配Metasploit、Cobalt Strike等工具链的攻击模块,预演漏洞组合利用的叠加效应,最终输出包含POC验证脚本、修复优先级建议和应急响应预案的渗透测试报告,实现从脆弱性识别到攻击模拟验证的闭环安全处置流程。

通过一些公开的渗透文章为例,可以有如下应用案例:

实例1:Web应用漏洞链式攻击

漏洞扫描结果发现电商系统存在:

  1. Apache Struts2 RCE(CVE-2023-XXXX,CVSS 9.8)

  2. 后台管理弱口令(admin/admin)

  3. PostgreSQL数据库未授权访问(CVE-2022-4567,CVSS 8.5)

智能化编排过程

  1. 优先级判定

    • 根据CVSS评分和资产权重(电商系统属于核心业务),优先利用Struts2 RCE漏洞

    • 结合MITRE ATT&CK T1190(漏洞利用)和T1059(命令执行)生成攻击链

  2. 攻击路径推演

# 计算攻击成功率
if 漏洞可利用性 > 90% and 目标在DMZ区:  生成Payload: msfvenom反向Shell → 植入Cobalt Strike Beacon  
if 获取Web服务器权限:  探测内网PostgreSQL数据库(T1046网络发现)  利用弱口令横向移动(T1078合法凭证)
3. 工具链自动化
    • Metasploit调用exploit/multi/http/struts2_code_exec模块

    • 通过数据库未授权访问执行pg_read_file('/etc/passwd')提取敏感信息

    • 输出POC验证脚本:自动生成Struts2漏洞的HTTP请求包和数据库爆破脚本

实例2:内网横向移动渗透

初始入口通过边界打印机服务漏洞(CVE-2022-XXXX,CVSS 7.2)获取内网据点

智能化决策过程

  1. 网络拓扑分析

    • 识别目标为研发部门子网(资产权重高)

    • 检测到子网间存在防火墙隔离(仅开放SMB端口)

  2. 攻击链生成

攻击序列:  
1. 打印机漏洞获取立足点(Initial Access)  
2. 使用PsExec横向移动(T1570 Lateral Tool Transfer)  
3. 窃取研发服务器RDP凭据(T1552 Credential Access)  
4. 通过Mimikatz注入域控(T1484 Domain Policy Modification)

3.组合漏洞利用

    • 结合打印机服务的低危漏洞(CVSS 7.2)与域控服务器的Kerberos协议漏洞(CVSS 8.8)

    • 自动化生成Kerberoasting攻击脚本

实例3:供应链攻击模拟

漏洞环境某企业OA系统存在:

  1. 第三方组件漏洞(CVE-2023-XXXX,CVSS 9.0)

  2. 文件上传功能未鉴权(CVSS 8.0)

  3. 开发环境与生产环境未隔离

渗透方案生成

  1. ATT&CK战术映射

    • 初始访问(T1195):通过供应链组件漏洞植入WebShell

    • 权限提升(T1068):利用文件上传漏洞获取系统权限

    • 横向移动(T1210):通过开发环境JumpServer跳板攻击代码仓库

  2. 自动化工具联动

    • 调用Nuclei验证组件漏洞:nuclei -t cve-2023-5678.yaml

    • 使用Chisel建立穿透隧道绕过网络隔离

    • 生成修复建议

04

图片

总结

图片

本文主要介绍了DeepSeek或其他AI大模型可以在资产测绘、渗透路径规划方面的赋能方向,并通过一些基础代码做了基础常识,给出了较为基础的prompt案例,希望能助力师傅们往技术更深处思考。

值得一提,企业的安全体系建设可以遵循"攻防同步进化,边作战边建设"的原则:在持续完善IT基础设施(如自动化资产发现平台、威胁情报管道)的同时,配套开展安全团队能力建设(包括ATT&CK框架适配训练、红蓝对抗演习机制),在实战化对抗中实现技术优势向防御效能的有效转化。

否则,业内的这项看家本领,还真可能被AI替代。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词