
生成式 AI 正在重塑软件工程版图:从代码辅助生成到安全分析与运维自动化,AI 工具的普及让企业具备前所未有的效率与洞察能力。信息安全领域尤为如此:AI 能在毫秒级处理海量日志、识别异常模式并给出缓解建议,这超越了传统人工与规则系统的能力边界。另一方面,企业对合规、知识产权与数据泄露的担忧使“AI 的安全使用方式”成为关键议题。
本文提出并论证一种“内嵌式 + 受限 RAG + Harmony 响应格式”的企业级安全 AI 方案,目标是提供可直接落地的原型与工程模块,帮助企业在保障生产安全的同时充分释放 AI 价值。
```mermaid
graph LR
  A[业务需求/问题陈述] --> B[风险识别与术语定义]
  B --> C[Safeguard 模板与策略制定]
  C --> D[受限 RAG 索引清洗与注入]
  D --> E[模型推理与小审计器]
  E --> F[Harmony 输出封装]
  F --> G[审计日志与SIEM联动]
  G --> H[复盘与策略迭代]
  H -->|评估指标(准确率/延迟/合规)| C
  subgraph 组织治理
    B
    C
    G
    H
  end
```

```python
# quickstart_demo.py
"""
演示如何加载策略配置、生成 Prompt、执行策略决策与 RAG 清洗,最终输出 Harmony 结构。
运行方法:python quickstart_demo.py
依赖:标准库;若从本仓库运行,确保 src/ 为包目录。
"""
import sys, json, pathlib

# 将仓库根路径加入 sys.path 以便导入本项目模块
repo_root = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(repo_root))

from src.engine.prompt_generator import PromptGenerator
from src.engine.policy_engine import PolicyEngine
from src.engine.rag_injector import RagInjector


def main():
    user_query = "请给出SQL注入防护建议,并说明日志审计配置要点"
    context_notes = ["仅公开资料", "禁止输出PoC", "优先提供配置清单"]

    pg = PromptGenerator.default()
    prompt = pg.build(user_query, context_notes)

    pe = PolicyEngine()
    decision, reason = pe.decide(user_query)

    # 模拟 RAG 检索结果
    rag_results = [
        {"index": "kb_public_sop", "content": "建议采用参数化查询并启用WAF。"},
        {"index": "kb_prod_secrets", "content": "password=Prod123"},
    ]
    injector = RagInjector(allowed_indices={"kb_public_sop", "kb_desensitized_docs"})
    cleaned = injector.filter(rag_results)

    harmony = {
        "content": "请启用参数化查询、防火墙规则,并在数据库启用审计日志与告警阈值。",
        "risk_label": "low" if decision == "allow" else "medium",
        "explanation": f"策略决策: {decision}. 原因: {reason}.",
        "provenance": [r.get("index", "kb:unknown") for r in cleaned],
        "policy_decision": decision,
        "allowed_actions": ["view_only"] if decision != "block" else [],
    }

    print("Prompt:\n", prompt)
    print("\nDecision:", decision, "Reason:", reason)
    print("\nHarmony JSON:\n", json.dumps(harmony, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
```

案例与数据支持:

补充说明:企业在制定 AI 使用红线与白名单时,往往会优先选择“只读 + 审计可回溯”的最小功能集,以降低引入风险。在实践中,这意味着将模型的输出限定为结构化的建议与非可执行指令,并统一通过 SIEM 汇总审计记录,便于事后追溯与度量。对外部大模型的访问则需通过网关进行前置筛查与元数据强制,确保敏感信息不会被意外上传。

示例代码:对比批量日志处理的并发能力(Python,异步示例)
```python
import asyncio, random, time

async def analyze_event(evt_id: int) -> dict:
    # 模拟耗时分析
    await asyncio.sleep(random.uniform(0.001, 0.005))
    return {"id": evt_id, "risk": random.choice(["low", "medium", "high"])}

async def main(n: int = 10000):
    t0 = time.time()
    tasks = [analyze_event(i) for i in range(n)]
    results = await asyncio.gather(*tasks)
    t1 = time.time()
    print(f"Processed {n} events in {t1 - t0:.3f}s; sample: ", results[:3])

if __name__ == "__main__":
    asyncio.run(main())
```

实践经验:采用并发分析与流式处理可显著降低端到端延迟;在常见的 SIEM 接入场景中,异步处理有助于对峰值流量进行弹性扩展,配合分层策略可在不牺牲安全性的前提下维持响应速度。
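上文提到对峰值流量的弹性扩展;在无界并发(一次性 `gather` 全部任务)之外,常见做法是用 `asyncio.Semaphore` 为并发分析设上限,形成背压保护下游。以下为示意(并发上限与任务数均为假设参数):

```python
import asyncio, random

async def analyze_event(evt_id: int, sem: asyncio.Semaphore) -> dict:
    # 信号量限制同时进行的分析任务数,避免峰值流量压垮下游服务
    async with sem:
        await asyncio.sleep(random.uniform(0.001, 0.005))
        return {"id": evt_id, "risk": "low"}

async def run(n: int = 1000, max_concurrency: int = 100) -> list:
    sem = asyncio.Semaphore(max_concurrency)
    # gather 保持结果与提交顺序一致
    return await asyncio.gather(*(analyze_event(i, sem) for i in range(n)))

results = asyncio.run(run())
print(len(results))
```

并发上限可依据下游(如 SIEM 写入接口)的压测结果设定,峰值期再配合队列削峰。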

应用案例:在跨区域 SOC(Security Operations Center)场景中,采用流式处理框架对日志进行分区分析,并根据策略将高风险事件(例如异常登录、凭证滥用)快速分派到人工复核通道。此类异常在 MITRE ATT&CK 框架中通常对应 T1078(Valid Accounts,有效账户滥用),实时检测与分流可显著降低平均检测时间(MTTD)。
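上述“分区分析 + 高风险分流”的路由逻辑可以用一个极简函数示意(事件结构、区域与队列名均为假设,真实场景由流式处理框架驱动):

```python
from collections import defaultdict

# 假设的事件样例:真实场景中来自流式平台的分区数据
events = [
    {"id": 1, "region": "ap", "type": "login_anomaly", "risk": "high"},
    {"id": 2, "region": "eu", "type": "config_read", "risk": "low"},
    {"id": 3, "region": "ap", "type": "credential_abuse", "risk": "high"},
]

def dispatch(events):
    # 按区域分区;高风险事件(异常登录/凭证滥用,对应 ATT&CK T1078)进入人工复核通道
    queues = defaultdict(list)
    for evt in events:
        lane = "human_review" if evt["risk"] == "high" else "auto_triage"
        queues[(evt["region"], lane)].append(evt)
    return dict(queues)

routed = dispatch(events)
print(sorted(routed))
```

真实部署中,`dispatch` 的判定应由策略引擎输出驱动,而非硬编码 risk 字段。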

引用参考:
说明:以上数值用于示例阐述规模化与效率差异,实际值需依据企业环境与数据质量评估。可参考 Gartner 对 AIOps 的行业研究(2023),指出在可观测性管线优化与事件降噪方面,AI 引入能带来数量级的提升;企业在落地中应以试点数据进行本地化基准测试(baseline)。
AI 不仅是提高效率的工具,更是信息安全治理的核心引擎。拥抱 AI 是企业在现代威胁环境中保持韧性与竞争力的必要选择。
最佳实践:建议以“只读 + 审计可回溯”的最小可用闭环作为起点,逐步引入受限 RAG 与策略引擎。在每个阶段定义清晰的验收指标(如 MTTD/MTTR、误报率、策略命中率),以数据驱动的方式持续迭代。
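上述验收指标可以先用极简脚本在试点数据上度量;以下为假设数据上的示意计算(字段、样本与指标口径均为示例,实际口径需按企业定义统一):

```python
# 假设的度量样例:onset_ts/detect_ts 为事件发生与检测时间戳(秒)
incidents = [
    {"onset_ts": 0, "detect_ts": 120, "true_positive": True},
    {"onset_ts": 0, "detect_ts": 300, "true_positive": True},
    {"onset_ts": 0, "detect_ts": 60, "true_positive": False},
]
policy_logs = [{"decision": "block"}, {"decision": "allow"},
               {"decision": "soft_rewrite"}, {"decision": "allow"}]

# MTTD:检测时延均值;误报率:告警中非真阳性的占比;策略命中率:非 allow 决策占比
mttd = sum(i["detect_ts"] - i["onset_ts"] for i in incidents) / len(incidents)
false_positive_rate = sum(not i["true_positive"] for i in incidents) / len(incidents)
policy_hit_rate = sum(l["decision"] != "allow" for l in policy_logs) / len(policy_logs)
print(f"MTTD={mttd:.0f}s FP={false_positive_rate:.2f} Hit={policy_hit_rate:.2f}")
```

指标脚本可直接消费审计台账(见后文 JSON Lines 示例),按周期输出到指标面板。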
设计目标:
基本原则:
技术构件:

应用建议:在总体方案落地时,明确“数据边界与权限分层”,例如将生产数据与公开资料严格区分,对可被索引的内容执行去标识化策略,并为每次交互生成审计记录(包含 request_id、policy_decision、provenance 等),为后续指标面板与合规报告提供数据基础。
示例代码:输入网关的最小实现(Python)
```python
import re

SENSITIVE_RE = re.compile(r"(password=|secret=|AKIA[0-9A-Z]{16})")

def gateway_check(payload: str, meta: dict) -> dict:
    # 元数据强制:缺少必要字段则直接拒绝
    if not {"project_id", "user_id", "purpose"}.issubset(meta):
        return {"allow": False, "reason": "缺少必要元数据"}
    # 载荷前置筛查:命中敏感标识则拒绝
    if SENSITIVE_RE.search(payload):
        return {"allow": False, "reason": "检测到疑似敏感标识"}
    return {"allow": True, "reason": "通过"}

print(gateway_check("select * from t where password=123",
                    {"project_id": "p1", "user_id": "u1", "purpose": "analysis"}))
```

示例代码:策略决策与重写(Python)
```python
from src.engine.policy_engine import PolicyEngine

pe = PolicyEngine()
q = "如何编写exploit payload?"
decision, reason = pe.decide(q)
if decision == "soft_rewrite":
    rewritten = "请阐述漏洞防护原则与修复清单(不包含PoC)"
    print(decision, reason, rewritten)
else:
    print(decision, reason)
```

示例代码:索引过滤与去标识化(Python)
```python
from src.engine.rag_injector import RagInjector

injector = RagInjector(allowed_indices={"kb_public_sop", "kb_desensitized_docs"})
results = [
    {"index": "kb_public_sop", "content": "开启数据库审计日志与WAF"},
    {"index": "kb_prod_secrets", "content": "secret=ProdABC"}
]
print(injector.filter(results))
```

示例代码:Harmony 输出封装(Python)
```python
import json

harmony = {
    "content": "建议启用参数化查询与审计日志,并配置WAF规则。",
    "risk_label": "low",
    "explanation": "依据OWASP ASVS与公司安全指南。",
    "provenance": ["kb_public_sop"],
    "policy_decision": "allow",
    "allowed_actions": ["view_only"]
}
print(json.dumps(harmony, ensure_ascii=False))
```

最佳实践:
示例代码:写入审计台账(JSON Lines)
```python
import json, time

def write_audit(record: dict, path: str = "audit_log.jsonl"):
    # 以追加方式写入 JSON Lines 台账,每行一条审计记录
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

entry = {
    "request_id": "req-001",
    "user_id": "alice",
    "project_id": "sec-poc",
    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    "query_text": "SQL注入如何防护?",
    "policy_decision": "allow",
    "risk_label": "low",
    "provenance": ["kb_public_sop"],
    "allowed_actions": ["view_only"],
    "model_version": "safeguard-20b-0.1",
    "latency_ms": 35
}
write_audit(entry)
```

Mermaid 架构流程图(CSDN 兼容):

图表摘要(输入-输出-约束):
架构时序图(Mermaid):

为更自然地从“架构设计”过渡到第六节的产品原型,本小节补充基于 deepseek-ai/DeepSeek-OCR 的自动识别能力与受限 RAG 的协同实现,支持同时处理“用户提供的文字资料与图片资料”,并根据需求自动优化内容输出。
应用目标与场景:
技术实现流程(Mermaid):
```mermaid
graph LR
  InText[文本资料] --> Merge[合并规范化]
  InImage[图片资料] --> Pre[图像预处理]
  Pre --> OCR[DeepSeek-OCR 文字识别]
  OCR --> Blocks[文本块/版块结构]
  Blocks --> Merge
  Merge --> Analyze[内容分析]
  Analyze --> Match[需求匹配/意图识别]
  Match --> Query[RAG 查询构建]
  Query --> Ret[受限检索(RAGInjector)]
  Ret --> Gen[生成(Model + Safeguard)]
  Gen --> Opt[输出优化(Harmony)]
  Opt --> Out[结构化响应]
  subgraph 约束
    Policy[Policy Engine]
    Safeguard[模板]
  end
  Match --> Policy
  Gen --> Policy
  Gen --> Safeguard
```

识别与分析的关键点:
示例代码(Python,可运行的替代实现;DeepSeek-OCR 调用以占位函数表示,请按实际 API 替换):
```python
# ocr_rag_pipeline_demo.py
"""
演示:图片 + 文本 输入,经 OCR 与内容分析,构建受限 RAG 查询并输出 Harmony。
说明:
- DeepSeek-OCR 的具体调用接口以 deepseek-ai/DeepSeek-OCR 仓库版本为准,本文用占位函数 `deepseek_ocr_infer` 表示。
- 为可运行演示,提供 pytesseract 作为替代;若环境无 tesseract,请将 OCR 部分替换为实际 DeepSeek-OCR 推理。
"""
import json, re
from typing import Any, List, Dict, Tuple

try:
    import cv2
    import numpy as np
except Exception:
    cv2 = None
    np = None

try:
    import pytesseract
    from PIL import Image
except Exception:
    pytesseract = None
    Image = None

from src.engine.policy_engine import PolicyEngine
from src.engine.prompt_generator import PromptGenerator
from src.engine.rag_injector import RagInjector


def preprocess_image(path: str) -> Tuple[Any, Dict]:
    meta = {"path": path, "preprocess": []}
    if cv2 is None:
        meta["preprocess"].append("opencv_unavailable")
        return path, meta  # 直接返回路径,OCR 函数自行打开
    img = cv2.imread(path)
    if img is None:
        return path, {"path": path, "preprocess": ["read_failed"]}
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    meta["preprocess"].append("grayscale")
    # 自适应阈值 + 去噪
    thr = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C,
                                cv2.THRESH_BINARY, 31, 5)
    meta["preprocess"].append("adaptive_threshold")
    denoise = cv2.medianBlur(thr, 3)
    meta["preprocess"].append("denoise_median")
    return denoise, meta


def deepseek_ocr_infer(image_or_path) -> str:
    """占位:调用 DeepSeek-OCR 返回识别文本。请替换为实际 API。
    若不可用则尝试 pytesseract 作为演示。
    """
    if pytesseract is None:
        return ""  # 无 OCR 可用
    if isinstance(image_or_path, str):
        img = Image.open(image_or_path)
    else:
        # numpy 数组 -> PIL Image
        img = Image.fromarray(image_or_path) if Image else image_or_path
    return pytesseract.image_to_string(img, lang="eng+chi_sim")


def analyze_content(texts: List[str]) -> Dict:
    merged = "\n".join([t for t in texts if t])
    # 简单关键词抽取(示例):英文标识符与中文词
    keywords = sorted(set(re.findall(r"[A-Za-z0-9_\-]{3,}|[\u4e00-\u9fa5]{2,}", merged)))
    # 需求意图识别(示例规则):
    intent = "security_advice" if re.search(r"WAF|审计|加固|注入|防护", merged) else "general_explain"
    return {"merged": merged, "keywords": keywords[:50], "intent": intent}


def build_safe_query(analysis: Dict, user_need: str) -> str:
    # 将用户需求与分析关键词拼接为受限查询(不包含 PoC/敏感标识)
    safe_kw = [kw for kw in analysis["keywords"] if not re.search(r"password|secret|token", kw, re.I)]
    return f"需求:{user_need}; 主题:{analysis['intent']}; 关键词:{', '.join(safe_kw[:10])}"


def pipeline(user_text: str, image_paths: List[str], user_meta: Dict) -> Dict:
    # 1) OCR
    ocr_texts = []
    ocr_meta = []
    for p in image_paths:
        pre_img, m = preprocess_image(p)
        txt = deepseek_ocr_infer(pre_img if cv2 is not None else p)
        ocr_texts.append(txt)
        ocr_meta.append(m)

    # 2) 内容分析与需求匹配
    analysis = analyze_content([user_text] + ocr_texts)
    user_need = user_meta.get("purpose", "说明性建议")
    query = build_safe_query(analysis, user_need)

    # 3) 受限 RAG 检索与生成
    injector = RagInjector(allowed_indices={"kb_public_sop", "kb_desensitized_docs"})
    # 示例检索结果(真实场景由搜索引擎返回)
    results = [
        {"index": "kb_public_sop", "content": "启用WAF并配置审计日志,使用参数化查询。"},
        {"index": "kb_prod_secrets", "content": "password=ProdABC"}
    ]
    cleaned = injector.filter(results)

    pg = PromptGenerator.default()
    prompt = pg.build(query, [user_need, "只读输出", "不包含PoC"])
    pe = PolicyEngine()
    decision, reason = pe.decide(query)

    # 4) 输出优化与 Harmony 封装(示例)
    harmony = {
        "content": "根据识别的关键词与受限检索结果,建议启用WAF与审计日志,采用参数化查询,避免敏感信息泄露。",
        "risk_label": "low" if decision == "allow" else "medium",
        "explanation": f"Policy: {decision}; {reason}. OCR预处理: {ocr_meta}",
        "provenance": [r.get("index", "kb:unknown") for r in cleaned],
        "policy_decision": decision,
        "allowed_actions": ["view_only"] if decision != "block" else [],
        "intent": analysis["intent"],
    }
    return {"prompt": prompt, "harmony": harmony, "ocr_texts": ocr_texts, "analysis": analysis}


if __name__ == "__main__":
    demo = pipeline(
        user_text="请根据截图和文档,给出数据库审计与WAF配置建议",
        image_paths=["sample_conf.png"],
        user_meta={"project_id": "p1", "user_id": "alice", "purpose": "安全加固"}
    )
    print("Prompt:\n", demo["prompt"])
    print("\nHarmony:\n", json.dumps(demo["harmony"], ensure_ascii=False, indent=2))
```

输出内容优化策略(结合 Harmony 与策略引擎):
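作为该策略的一种假设性实现示意:可按 `risk_label` 对 Harmony 的 `content` 做分级降级,例如中风险时剥离行内配置片段、高风险时只保留高层建议(函数名与降级规则均为示例,实际应由策略引擎配置驱动):

```python
import re

def optimize_output(harmony: dict) -> dict:
    # 按风险标签分级降级:示意规则,非本项目模块
    out = dict(harmony)
    if out.get("risk_label") == "medium":
        # 中风险:移除行内配置示例(key=value 形式),保留说明性文字
        out["content"] = re.sub(r"\b\w+=\S+", "[配置已省略]", out["content"])
    elif out.get("risk_label") == "high":
        # 高风险:内容整体降级为高层建议,并收回可用操作
        out["content"] = "仅提供高层防护建议,详情请走人工复核流程。"
        out["allowed_actions"] = []
    return out

sample = {"content": "启用WAF;示例配置 audit_log=on",
          "risk_label": "medium", "allowed_actions": ["view_only"]}
print(optimize_output(sample)["content"])
```

降级动作应与审计台账联动,记录被省略的内容类别,便于复盘策略是否过严或过松。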
过渡说明:本节的 OCR+RAG 能力可直接支撑第六节的四类产品原型(只读安全分析、受控代码审查、沙箱化红队、企业 RAG 问答)。其中图片识别与文字合并分析在日常咨询与合规报告自动化中尤为关键,可作为“只读 + 可审计”的起步能力上线。
应用场景与案例:面向安全运营人员与开发团队的日常咨询需求(如“SQL 注入如何防护”“如何配置 WAF 规则”),只读助手严格限制为说明性与配置清单输出,不返回可执行脚本或 PoC。某制造企业在试点中将内部知识库清洗为“公开 SOP + 去标识化文档”,上线只读助手后,平均响应时间缩短 60%,安全建议覆盖度提升,同时零泄露事件。
合规时序(Mermaid):

可运行示例(Python):
```python
from src.engine.prompt_generator import PromptGenerator
from src.engine.policy_engine import PolicyEngine
from src.engine.rag_injector import RagInjector

def readonly_advice(query: str):
    pg = PromptGenerator.default()
    prompt = pg.build(query, ["仅公开资料", "禁止输出PoC"])
    decision, reason = PolicyEngine().decide(query)
    if decision == "block":
        return {"content": "请求被拒绝:请改写为高层防护建议。", "policy_decision": "block"}
    cleaned = RagInjector({"kb_public_sop", "kb_desensitized_docs"}).filter([
        {"index": "kb_public_sop", "content": "启用参数化查询并配置WAF"}
    ])
    return {
        "content": "建议启用参数化查询、防火墙规则,并开启数据库审计日志。",
        "risk_label": "low",
        "explanation": f"{reason}",
        "provenance": [r["index"] for r in cleaned],
        "policy_decision": decision,
        "allowed_actions": ["view_only"],
        "prompt_preview": prompt[:160]
    }

print(readonly_advice("SQL注入如何防护?"))
```

说明:在本地仓库(CI 前置或 pre-commit 钩子)运行,结合策略引擎与规则集识别危险 API、硬编码凭证与不安全配置,输出结构化修复建议。该助手仅给出说明性变更与参考代码片段,不生成攻击性内容。
示例(Python):
```python
import re, pathlib
from src.engine.policy_engine import PolicyEngine

RISK_PATTERNS = {
    "hardcoded_password": re.compile(r"password\s*=\s*['\"]?\w+"),
    "aws_key": re.compile(r"AKIA[0-9A-Z]{16}")
}

def scan_repo(repo: str):
    findings = []
    for p in pathlib.Path(repo).rglob("*.py"):
        text = p.read_text(encoding="utf-8", errors="ignore")
        for label, pat in RISK_PATTERNS.items():
            if pat.search(text):
                findings.append({"file": str(p), "rule": label})
    return findings

def review(findings):
    pe = PolicyEngine()
    advice = []
    for f in findings:
        decision, _ = pe.decide("代码安全修复建议")
        advice.append({
            "file": f["file"],
            "rule": f["rule"],
            "policy_decision": decision,
            "suggestion": "使用环境变量或密钥管理服务代替硬编码;审计提交历史并轮换凭证。"
        })
    return advice

res = review(scan_repo("."))
print(res[:3])
```

安全约束:输出必须包含 simulation=true 标签与高层原则说明;禁止给出具体攻击脚本、PoC、绕过步骤。建议在教学平台或闭合网络内运行,并记录审计日志供复盘。
示例(Python):
```python
import json

def red_team_sim(query: str) -> str:
    harmony = {
        "content": "模拟演练:识别弱口令风险,建议启用复杂度策略与锁定策略,配合日志审计。",
        "risk_label": "medium",
        "explanation": "教育用抽象化原则说明,不含PoC。",
        "provenance": ["kb_training_materials"],
        "policy_decision": "allow",
        "allowed_actions": ["view_only"],
        "simulation": True
    }
    return json.dumps(harmony, ensure_ascii=False)

print(red_team_sim("弱口令演练"))
```

教学流程(Mermaid):

平台架构(Mermaid):

示例(Python,角色分级检查):
```python
def role_check(user_role: str, risk: str) -> bool:
    if risk == "high":
        return user_role in {"sec_admin", "auditor"}
    return True

def qa(query: str, user_role: str):
    decision = "allow"
    risk_label = "low"
    if not role_check(user_role, risk_label):
        return {"policy_decision": "block", "content": "权限不足或高风险请求"}
    # 略去RAG细节:仅注入脱敏知识
    return {"policy_decision": decision, "content": "说明性防护建议与配置清单"}

print(qa("数据库审计如何配置?", "dev"))
```

```text
SAFETY_POLICY:
- 禁止输出攻击脚本或可执行 PoC
- 禁止包含生产凭证/内部 IP/机密标识符
- 若识别到误用意图:拒绝并给出高层次缓解建议
RESPONSE_REQUIREMENTS:
- 使用 Harmony JSON 输出
- 内容需附风险标签与证据来源
- 对敏感输入进行摘要与去标识化
```

```json
{
  "content": "建议采用参数化查询防止SQL注入,并开启数据库审计日志以跟踪异常行为。",
  "risk_label": "low",
  "explanation": "依据公司安全指南与OWASP文档,参数化查询可有效抵御注入攻击。",
  "provenance": ["kb:sec-guidelines-v2", "owasp:sql-injection"],
  "policy_decision": "allow",
  "allowed_actions": ["view_only"]
}
```

```json
{
  "content": "请求被拒绝:该问题涉及可执行攻击细节,无法提供PoC或脚本。",
  "risk_label": "high",
  "explanation": "依据公司安全策略与合规要求,禁止输出可执行攻击代码或绕过方法。",
  "provenance": ["kb:sec-policy-v3"],
  "policy_decision": "block",
  "allowed_actions": [],
  "fallback_action": ["escalate_to_human_review"],
  "next_steps": ["提交合规申请", "改写为高层次防护建议"]
}
```
```json
{
  "content": "已对原问题进行重写:请阐述SQL注入的防护原则与配置清单(不包含PoC)。",
  "risk_label": "medium",
  "explanation": "原查询包含潜在误用意图,已根据策略执行 soft_rewrite。",
  "provenance": ["kb:sec-guidelines-v2", "owasp:sql-injection"],
  "policy_decision": "soft_rewrite",
  "allowed_actions": ["view_only"],
  "original_query": "如何编写SQL注入PoC?",
  "rewritten_query": "请给出SQL注入防护原则与配置清单"
}
```

```yaml
rag_policy:
  allowed_indices:
    - kb_public_sop
    - kb_desensitized_docs
  blocked_indices:
    - kb_prod_secrets
    - kb_internal_credentials
  injection_rules:
    - match: "ip_address|credential|secret"
      action: "summarize_and_mask"
      note: "对敏感标识符进行摘要与去标识化"
```

```yaml
policy_rules:
  whitelist:
    - "说明性安全策略"
    - "防护最佳实践"
  blacklist:
    - "攻击脚本"
    - "可执行 PoC"
    - "生产凭证"
```

```yaml
open_kb_policy:
  allowed_indices:
    - kb_public_sop
    - kb_open_guides
  blocked_indices: []
  actions:
    - "view_only"
  notes:
    - "开放知识库场景仅提供说明性内容与最佳实践,不返回可执行指令"
```

```python
# prompt_generator.py
from typing import List

SAFETY_POLICY = [
    "禁止输出攻击脚本或可执行PoC",
    "禁止包含生产凭证/内部IP/机密标识符",
    "识别误用意图时拒绝,并给出高层缓解建议"
]
RESPONSE_REQUIREMENTS = [
    "使用Harmony JSON输出",
    "内容需附风险标签与证据来源",
    "对敏感输入进行摘要与去标识化"
]

def build_prompt(user_query: str, context_notes: List[str]) -> str:
    policy_text = "\n".join([f"- {p}" for p in SAFETY_POLICY])
    req_text = "\n".join([f"- {r}" for r in RESPONSE_REQUIREMENTS])
    ctx_text = "\n".join([f"* {c}" for c in context_notes])
    return (
        "SAFETY_POLICY:\n" + policy_text + "\n\n" +
        "RESPONSE_REQUIREMENTS:\n" + req_text + "\n\n" +
        "USER_QUERY:\n" + user_query + "\n\n" +
        "CONTEXT_NOTES:\n" + ctx_text
    )

if __name__ == "__main__":
    print(build_prompt("请给出SQL注入防护建议", ["仅公开资料", "禁止输出PoC"]))
```

```python
# policy_engine.py
import re
from typing import Literal, Tuple

Decision = Literal["allow", "soft_rewrite", "block"]
BLACKLIST_KEYWORDS = ["exploit", "poc", "payload", "privilege escalation"]
# 敏感标识以正则匹配(若按字符串做子串比较,AKIA 模式将永远无法命中)
SENSITIVE_RE = re.compile(r"AKIA[0-9A-Z]{16}|password=|secret=")

def decide(query: str) -> Tuple[Decision, str]:
    ql = query.lower()
    if any(k in ql for k in BLACKLIST_KEYWORDS):
        return "soft_rewrite", "命中攻击性关键词,建议改写为防护性问题"
    # 简化示例:若疑似包含敏感标识(凭证/密钥),则阻断
    if SENSITIVE_RE.search(query):
        return "block", "检测到疑似敏感标识"
    return "allow", "未命中黑名单与敏感模式"

class PolicyEngine:
    """供本文各示例使用的轻量封装:decide 返回 (决策, 原因) 二元组。"""
    def decide(self, query: str) -> Tuple[Decision, str]:
        return decide(query)

if __name__ == "__main__":
    print(decide("如何编写exploit payload?"))   # soft_rewrite
    print(decide("我的password=123怎么处理?"))  # block
    print(decide("SQL注入如何防护?"))           # allow
```

```python
# rag_injector.py
import re
from typing import Iterable, List, Dict

ALLOWED_INDICES = {"kb_public_sop", "kb_desensitized_docs"}
BLOCKED_INDICES = {"kb_prod_secrets", "kb_internal_credentials"}

def filter_results(results: List[Dict], allowed: Iterable[str] = ALLOWED_INDICES) -> List[Dict]:
    """仅保留允许索引,并对敏感字段进行摘要与去标识化"""
    filtered = []
    for r in results:
        if r.get("index") in allowed:
            content = r.get("content", "")
            # 对凭证型标识做整体掩码(仅在等号后插入星号会让原值继续泄露)
            content = re.sub(r"(password|secret)=\S*", r"\1=******", content)
            r["content"] = content
            filtered.append(r)
    return filtered

class RagInjector:
    """供本文各示例使用的轻量封装,可自定义白名单索引。"""
    def __init__(self, allowed_indices: Iterable[str] = None):
        self.allowed = set(allowed_indices) if allowed_indices is not None else ALLOWED_INDICES

    def filter(self, results: List[Dict]) -> List[Dict]:
        return filter_results(results, self.allowed)

if __name__ == "__main__":
    sample = [
        {"index": "kb_public_sop", "content": "使用参数化查询"},
        {"index": "kb_prod_secrets", "content": "password=123"}
    ]
    print(filter_results(sample))
```

```python
# harmony_validator.py
import json

REQUIRED_KEYS = {"content", "risk_label", "explanation", "provenance", "policy_decision", "allowed_actions"}

def validate(harmony_json: str) -> bool:
    try:
        data = json.loads(harmony_json)
    except json.JSONDecodeError:
        return False
    # 顶层必须是对象,且包含全部必需字段
    if not isinstance(data, dict):
        return False
    return REQUIRED_KEYS.issubset(set(data.keys()))

if __name__ == "__main__":
    example = json.dumps({
        "content": "...",
        "risk_label": "low",
        "explanation": "...",
        "provenance": ["kb:..."],
        "policy_decision": "allow",
        "allowed_actions": ["view_only"]
    })
    print(validate(example))
```
参考模板库:
字段字典(最小集合):
示例 Schema(YAML):
```yaml
audit_log_schema:
  fields:
    - {name: request_id, type: string, required: true}
    - {name: user_id, type: string, required: true}
    - {name: project_id, type: string, required: true}
    - {name: timestamp, type: datetime, required: true}
    - {name: query_text, type: string, required: true}
    - {name: policy_decision, type: enum[allow|soft_rewrite|block], required: true}
    - {name: risk_label, type: enum[low|medium|high], required: true}
    - {name: provenance, type: array[string], required: true}
    - {name: allowed_actions, type: array[string], required: true}
    - {name: fallback_action, type: array[string], required: false}
    - {name: rewritten_query, type: string, required: false}
    - {name: model_version, type: string, required: true}
    - {name: latency_ms, type: number, required: true}
    - {name: audit_tags, type: array[string], required: false}
    - {name: source_ip, type: string, required: false}
    - {name: gateway_result, type: json, required: false}
    - {name: rag_indices, type: array[string], required: false}
    - {name: hash_fingerprint, type: string, required: false}
```

SQL 查询示例(跨库语法可能不同,以下为通用思路):
```sql
-- 过去24小时的 block 决策统计
SELECT policy_decision, COUNT(*) AS cnt
FROM audit_log
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
  AND policy_decision = 'block'
GROUP BY policy_decision;

-- 高风险请求 TOP 10(按时间倒序)
SELECT request_id, user_id, risk_label, query_text, timestamp
FROM audit_log
WHERE risk_label = 'high'
ORDER BY timestamp DESC
LIMIT 10;

-- 平均时延与决策分布
SELECT policy_decision, AVG(latency_ms) AS avg_latency, COUNT(*) AS cnt
FROM audit_log
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY policy_decision;
```

Elasticsearch DSL 示例:
```json
{
  "query": {
    "bool": {
      "filter": [
        {"term": {"policy_decision": "block"}},
        {"range": {"timestamp": {"gte": "now-24h"}}}
      ]
    }
  },
  "aggs": {
    "by_risk": {"terms": {"field": "risk_label"}},
    "avg_latency": {"avg": {"field": "latency_ms"}}
  },
  "size": 0
}
```

```json
{
  "query": {"term": {"risk_label": "high"}},
  "sort": [{"timestamp": {"order": "desc"}}],
  "_source": ["request_id", "user_id", "risk_label", "query_text", "timestamp"],
  "size": 10
}
```

说明:以上示例用于建立最小可观测性基线,支持审计台账的统计、检索与风险监控;可根据企业数据库或日志平台进行字段映射与索引优化。
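在尚未接入数据库或 Elasticsearch 时,同样的基线统计也可直接在 JSON Lines 审计台账上完成;以下为假设样本上的示意(字段沿用前文 audit_log_schema):

```python
import json
from collections import Counter

# 假设的审计台账行(JSON Lines),真实数据来自 audit_log.jsonl
lines = [
    '{"policy_decision": "allow", "latency_ms": 30, "risk_label": "low"}',
    '{"policy_decision": "block", "latency_ms": 12, "risk_label": "high"}',
    '{"policy_decision": "allow", "latency_ms": 50, "risk_label": "low"}',
]

records = [json.loads(l) for l in lines]
# 决策分布与平均时延:与前文 SQL/ES 聚合语义一致
dist = Counter(r["policy_decision"] for r in records)
avg_latency = sum(r["latency_ms"] for r in records) / len(records)
print(dict(dist), f"avg_latency={avg_latency:.1f}ms")
```

该脚本适合试点阶段的每日巡检,正式接入日志平台后再迁移到上文的查询模板。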

术语表(示例):
| 术语 | 英文 | 定义 | 参考 |
|---|---|---|---|
| RAG | Retrieval-Augmented Generation | 通过检索外部知识增强生成质量,受限场景对索引与注入进行严格控制 | Facebook AI(2020);企业内部实践 |
| Harmony | Harmony JSON | 将模型输出结构化,包含风险标签与策略决策字段,便于审计与解析 | 项目规范(本手册) |
| SIEM | Security Information and Event Management | 汇总日志与安全事件,支持告警、审计与合规报表 | 各厂商文档(Splunk/Elastic/Azure Sentinel/QRadar) |
| Policy Engine | 策略引擎 | 根据查询内容做 allow/soft_rewrite/block 决策,并提供理由 | 本项目 policy_engine 模块 |
| Safeguard 模板 | 安全提示策略 | 用于约束模型输出范围的强制性提示集合 | 项目规范(本手册) |
```python
# check_codeblocks.py
import re, sys, pathlib

ALLOWED = {"mermaid", "json", "yaml", "python", "text", "js", "tsx", "java", "sql", "diff", "bash", "powershell"}

# 从命令行参数读取待检查的 Markdown 文件路径
path = pathlib.Path(sys.argv[1])
content = path.read_text(encoding="utf-8")
lines = content.splitlines()

in_block = False
issues = []
for i, line in enumerate(lines, start=1):
    stripped = line.strip()
    if not in_block and stripped.startswith("```"):
        lang = stripped[3:].strip()
        # 识别为代码块起始:要求有合法语言标注
        if not lang or lang not in ALLOWED:
            issues.append(f"Line {i}: start fence without valid language: '{lang}'")
        in_block = True
    elif in_block and stripped == "```":
        in_block = False

if issues:
    print("Found issues:")
    for s in issues:
        print(" -", s)
    sys.exit(1)
else:
    print("Code block language annotations OK.")
```

使用方式:在仓库根目录执行 `python check_codeblocks.py <markdown文件路径>`;若存在未标注语言的代码块围栏,脚本将列出对应行号并以非零状态退出,可直接接入 CI。
```python
# check_toc_anchors.py
import re, sys, pathlib

# 从命令行参数读取待检查的 Markdown 文件路径
path = pathlib.Path(sys.argv[1])
text = path.read_text(encoding="utf-8")

# 目录链接(](#id))与锚点(<a id="...">)的 id 集合
toc_ids = set(re.findall(r"\]\(#([^)]+)\)", text))
anchor_ids = set(re.findall(r"<a\s+id=\"([^\"]+)\"\s*>", text))

missing = sorted(toc_ids - anchor_ids)
extra = sorted(anchor_ids - toc_ids)
print(f"TOC: {len(toc_ids)}, Anchors: {len(anchor_ids)}")
if missing:
    print("Missing anchors:", ", ".join(missing))
else:
    print("Anchors cover all TOC links.")
if extra:
    print("Extra anchors:", ", ".join(extra))
else:
    print("No extra anchors.")
```

参考资料与引文: