
在数字化金融时代,欺诈交易已成为金融机构面临的重要挑战,欺诈交易已成为金融机构面临的重要挑战。传统基于规则的检测方法难以应对日益复杂的欺诈模式,今天我们实现一种基于信息论的智能金融风控系统,通过KL散度、信息增益和熵等核心信息论概念,我们可以使用信息熵来度量交易数据的混乱程度,并使用KL散度来比较正常和异常交易的分布差异,运用一些信息论的知识来构建一个兼具高性能和强可解释性的欺诈检测框架。
目标:

流程图:

核心步骤:
开始 │ ├── 生成模拟金融交易数据(正常和异常) │ ├── 计算正常交易和异常交易的数据熵 │ │ │ └── 对每个特征离散化后计算熵 │ ├── 计算正常与异常分布之间的KL散度 │ │ │ ├── 对每个特征,使用相同分箱计算分布 │ ├── 计算KL散度 │ └── 按KL散度排序特征 │ └── 输出最重要的异常检测特征
基于信息论原理构建的智能风控系统,包含:
class FinancialRiskSystem:
"""
金融风控系统核心类
"""
def __init__(self):
"""初始化风控系统"""
self.model = None # 欺诈检测模型
self.feature_importance = None # 特征重要性分析结果
self.df = None # 存储交易数据def generate_financial_data(self, n_samples=10000):
"""
生成模拟金融交易数据
"""
np.random.seed(42) # 设置随机种子保证结果可重现
# 正常交易特征分布
normal_amount = np.random.normal(100, 50, n_samples) # 金额: 均值100, 标准差50
normal_time = np.random.normal(14, 4, n_samples) # 时间: 下午2点附近
normal_frequency = np.random.poisson(5, n_samples) # 频率: 泊松分布, 均值5
# 随机选择3%的交易作为欺诈交易
fraud_indices = np.random.choice(n_samples, size=int(n_samples*0.03), replace=False)
data = []
for i in range(n_samples):
if i in fraud_indices:
# 欺诈交易特征 - 刻意设计不同的分布参数
amount = np.random.normal(500, 200) # 金额更大, 方差更大
time = np.random.normal(3, 2) # 凌晨时段
frequency = np.random.poisson(15) # 频率异常高
is_fraud = 1
else:
# 正常交易特征
amount = max(1, normal_amount[i]) # 保证金额为正
time = normal_time[i] % 24 # 时间标准化到0-24小时
frequency = normal_frequency[i]
is_fraud = 0
# 添加基于Beta分布的风险特征 - Beta分布适合模拟概率值
# 欺诈交易的location_risk和device_risk分布偏向更高风险
location_risk = np.random.beta(2, 5) if is_fraud else np.random.beta(5, 2)
device_risk = np.random.beta(1, 8) if is_fraud else np.random.beta(8, 1)
data.append([amount, time, frequency, location_risk, device_risk, is_fraud])
# 创建DataFrame
columns = ['amount', 'time', 'frequency', 'location_risk', 'device_risk', 'is_fraud']
self.df = pd.DataFrame(data, columns=columns)
print(f"金融数据生成完成: {len(self.df)} 条交易")
print(f"欺诈交易比例: {self.df['is_fraud'].mean():.3f}")
return self.dfdef calculate_distribution_divergence(self):
"""
计算正常交易与欺诈交易的分布差异(KL散度)
"""
# 分离正常交易和欺诈交易数据
normal_data = self.df[self.df['is_fraud'] == 0]
fraud_data = self.df[self.df['is_fraud'] == 1]
features = ['amount', 'time', 'frequency', 'location_risk', 'device_risk']
divergence_scores = {}
for feature in features:
# 离散化连续特征以计算KL散度
# 使用相同的bins保证分布可比性
normal_hist, bins = np.histogram(normal_data[feature], bins=20, density=True)
fraud_hist, _ = np.histogram(fraud_data[feature], bins=bins, density=True)
# 添加小值避免除零和log(0)的情况
normal_hist = normal_hist + 1e-10
fraud_hist = fraud_hist + 1e-10
# 计算KL散度: D_KL(P||Q) = Σ P(i) * log(P(i)/Q(i))
kl_divergence = np.sum(normal_hist * np.log(normal_hist / fraud_hist))
divergence_scores[feature] = kl_divergence
print("\n特征分布差异(KL散度):")
# 按KL散度值降序排列并输出
for feature, score in sorted(divergence_scores.items(), key=lambda x: x[1], reverse=True):
print(f" {feature}: {score:.4f}")
return divergence_scoresdef train_fraud_detection_model(self):
"""
训练欺诈检测模型
"""
# 准备特征和标签
X = self.df.drop('is_fraud', axis=1)
y = self.df['is_fraud']
# 分割数据集,保持正负样本比例
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42, stratify=y
)
# 使用随机森林分类器
# 随机森林内置特征重要性基于信息增益(信息论概念)
self.model = RandomForestClassifier(
n_estimators=100, # 树的数量
max_depth=10, # 树的最大深度
random_state=42, # 随机种子
class_weight='balanced' # 考虑类别不平衡,调整信息价值
)
# 训练模型 - 最小化交叉熵损失(信息论概念)
self.model.fit(X_train, y_train)
# 模型评估
y_pred = self.model.predict(X_test)
y_pred_proba = self.model.predict_proba(X_test)[:, 1]
print("\n模型性能评估:")
print(f" AUC Score: {roc_auc_score(y_test, y_pred_proba):.4f}")
print(f" 准确率: {np.mean(y_pred == y_test):.4f}")
# 提取特征重要性(基于信息增益)
self.feature_importance = pd.DataFrame({
'feature': X.columns,
'importance': self.model.feature_importances_
}).sort_values('importance', ascending=False)
print("\n特征重要性排名:")
for i, row in self.feature_importance.iterrows():
print(f" {row['feature']}: {row['importance']:.4f}")
return self.modeldef detect_anomalous_transactions(self, new_transactions):
"""
检测异常交易
"""
if self.model is None:
raise ValueError("请先训练模型")
# 模型预测
predictions = self.model.predict(new_transactions)
probabilities = self.model.predict_proba(new_transactions)[:, 1]
results = []
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
# 根据预测结果确定风险等级
risk_level = "高风险" if pred == 1 else "低风险"
# 置信度计算:预测为高风险时使用欺诈概率,低风险时使用正常概率
confidence = prob if pred == 1 else 1 - prob
results.append({
'transaction_id': i,
'risk_level': risk_level,
'confidence': confidence,
'fraud_probability': prob
})
return resultsdef calculate_risk_score(self, transaction_features):
"""
计算交易风险分数(基于信息论概念)
"""
if self.model is None:
raise ValueError("请先训练模型")
# 获取模型预测的欺诈概率
fraud_prob = self.model.predict_proba([transaction_features])[0, 1]
# 基于预测概率计算熵(信息论概念)
# 熵衡量预测的不确定性:熵越高,模型越不确定
entropy_val = - (fraud_prob * np.log2(fraud_prob + 1e-10) +
(1-fraud_prob) * np.log2(1-fraud_prob + 1e-10))
# 风险分数结合概率和不确定性
# 高概率+高熵:模型不确定但倾向欺诈,需要额外关注
# 高概率+低熵:模型确定是欺诈,可信度高
risk_score = fraud_prob * (1 + entropy_val)
return min(risk_score, 1.0) # 限制在0-1范围内提供全面的可视化分析,帮助理解:
class FinancialRiskVisualizer:
"""
金融风控可视化分析类
"""
def __init__(self, risk_system):
"""
初始化可视化器
参数:
risk_system: FinancialRiskSystem实例
"""
self.risk_system = risk_system
self.figures = {} # 存储生成的所有图表def plot_feature_distributions(self):
"""
绘制特征分布对比图
"""
# 创建2x3的子图布局
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
fig.suptitle('正常交易与欺诈交易的特征分布对比', fontsize=16, fontweight='bold')
features = ['amount', 'time', 'frequency', 'location_risk', 'device_risk']
colors = ['skyblue', 'coral'] # 正常交易和欺诈交易的颜色
for idx, feature in enumerate(features):
# 计算子图位置
ax = axes[idx//3, idx%3]
# 分别获取正常和欺诈交易的数据
normal_data = self.risk_system.df[self.risk_system.df['is_fraud'] == 0][feature]
fraud_data = self.risk_system.df[self.risk_system.df['is_fraud'] == 1][feature]
# 绘制分布直方图
ax.hist(normal_data, bins=30, alpha=0.7, label='正常交易',
color=colors[0], density=True)
ax.hist(fraud_data, bins=30, alpha=0.7, label='欺诈交易',
color=colors[1], density=True)
ax.set_title(f'{feature} 分布')
ax.set_xlabel(feature)
ax.set_ylabel('密度')
ax.legend()
ax.grid(True, alpha=0.3)
# 第六个子图显示类别比例
ax = axes[1, 2]
fraud_counts = self.risk_system.df['is_fraud'].value_counts()
ax.pie(fraud_counts.values, labels=['正常交易', '欺诈交易'],
autopct='%1.1f%%', colors=colors, startangle=90)
ax.set_title('交易类别分布')
plt.tight_layout()
self.figures['feature_distributions'] = fig
return figdef plot_feature_importance(self):
"""
绘制特征重要性图
"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
fig.suptitle('特征重要性分析', fontsize=16, fontweight='bold')
# 左图:模型特征重要性(基于信息增益)
importance_df = self.risk_system.feature_importance
# 使用viridis色彩映射
colors = plt.cm.viridis(np.linspace(0, 1, len(importance_df)))
# 绘制水平条形图
bars = ax1.barh(importance_df['feature'], importance_df['importance'], color=colors)
ax1.set_xlabel('重要性分数')
ax1.set_title('随机森林特征重要性')
ax1.grid(True, alpha=0.3)
# 在条形上添加数值标签
for bar, value in zip(bars, importance_df['importance']):
ax1.text(bar.get_width() + 0.01, bar.get_y() + bar.get_height()/2,
f'{value:.4f}', ha='left', va='center')
# 右图:KL散度热力图(基于分布差异)
divergence_scores = self.risk_system.calculate_distribution_divergence()
features = list(divergence_scores.keys())
scores = list(divergence_scores.values())
# 创建热力图
im = ax2.imshow([scores], cmap='YlOrRd', aspect='auto')
ax2.set_xticks(range(len(features)))
ax2.set_xticklabels(features, rotation=45)
ax2.set_yticks([0])
ax2.set_yticklabels(['KL散度'])
ax2.set_title('特征分布差异 (KL散度)')
# 在热力图上显示数值
for i, score in enumerate(scores):
# 根据背景色选择文字颜色以确保可读性
ax2.text(i, 0, f'{score:.3f}', ha='center', va='center',
color='white' if score > max(scores)*0.6 else 'black')
# 添加颜色条
plt.colorbar(im, ax=ax2)
plt.tight_layout()
self.figures['feature_importance'] = fig
return figdef plot_correlation_analysis(self):
"""
绘制相关性分析图
"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
fig.suptitle('特征相关性分析', fontsize=16, fontweight='bold')
# 左图:整体相关性热力图
correlation_matrix = self.risk_system.df.corr()
# 创建上三角掩码
mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
# 绘制热力图
sns.heatmap(correlation_matrix, mask=mask, annot=True, cmap='coolwarm',
center=0, ax=ax1, fmt='.3f')
ax1.set_title('特征相关性热力图')
# 右图:分组相关性对比
# 分别计算正常和欺诈交易中各特征与欺诈标签的相关性
fraud_corr = self.risk_system.df[self.risk_system.df['is_fraud'] == 1].corr()['is_fraud'].drop('is_fraud')
normal_corr = self.risk_system.df[self.risk_system.df['is_fraud'] == 0].corr()['is_fraud'].drop('is_fraud')
x = range(len(fraud_corr))
width = 0.35 # 条形宽度
# 绘制分组条形图
ax2.bar([i - width/2 for i in x], fraud_corr.values, width,
label='欺诈交易', alpha=0.8)
ax2.bar([i + width/2 for i in x], normal_corr.values, width,
label='正常交易', alpha=0.8)
ax2.set_xlabel('特征')
ax2.set_ylabel('与欺诈标签的相关性')
ax2.set_title('欺诈vs正常交易的相关性对比')
ax2.set_xticks(x)
ax2.set_xticklabels(fraud_corr.index, rotation=45)
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
self.figures['correlation_analysis'] = fig
return figdef plot_risk_scoring_analysis(self):
"""
绘制风险评分分析图
"""
# 计算所有交易的风险分数
risk_scores = []
for _, transaction in self.risk_system.df.iterrows():
features = transaction.drop('is_fraud').values
risk_score = self.risk_system.calculate_risk_score(features)
risk_scores.append(risk_score)
# 将风险分数添加到数据集中
self.risk_system.df['risk_score'] = risk_scores
# 创建2x2子图布局
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('风险评分系统分析', fontsize=16, fontweight='bold')
# 左上:风险分数分布
normal_risk = self.risk_system.df[self.risk_system.df['is_fraud'] == 0]['risk_score']
fraud_risk = self.risk_system.df[self.risk_system.df['is_fraud'] == 1]['risk_score']
axes[0,0].hist([normal_risk, fraud_risk], bins=30, alpha=0.7,
label=['正常交易', '欺诈交易'],
color=['skyblue', 'coral'], density=True)
axes[0,0].set_xlabel('风险分数')
axes[0,0].set_ylabel('密度')
axes[0,0].set_title('风险分数分布')
axes[0,0].legend()
axes[0,0].grid(True, alpha=0.3)
# 右上:ROC曲线(基于风险分数的分类效果)
thresholds = np.linspace(0, 1, 100) # 生成100个阈值
tpr = [] # 真正率
fpr = [] # 假正率
for threshold in thresholds:
predicted_fraud = (self.risk_system.df['risk_score'] > threshold)
actual_fraud = (self.risk_system.df['is_fraud'] == 1)
# 计算混淆矩阵元素
tp = np.sum(predicted_fraud & actual_fraud) # 真阳性
fp = np.sum(predicted_fraud & ~actual_fraud) # 假阳性
tn = np.sum(~predicted_fraud & ~actual_fraud) # 真阴性
fn = np.sum(~predicted_fraud & actual_fraud) # 假阴性
# 计算真正率和假正率
tpr.append(tp / (tp + fn) if (tp + fn) > 0 else 0)
fpr.append(fp / (fp + tn) if (fp + tn) > 0 else 0)
# 绘制ROC曲线
axes[0,1].plot(fpr, tpr, linewidth=2)
axes[0,1].plot([0, 1], [0, 1], 'k--', alpha=0.5) # 随机分类线
axes[0,1].set_xlabel('假正率 (FPR)')
axes[0,1].set_ylabel('真正率 (TPR)')
axes[0,1].set_title('风险评分ROC曲线模拟')
axes[0,1].grid(True, alpha=0.3)
axes[0,1].fill_between(fpr, tpr, alpha=0.3) # 填充曲线下面积
# 左下:风险分数与金额的关系散点图
scatter = axes[1,0].scatter(self.risk_system.df['amount'],
self.risk_system.df['risk_score'],
c=self.risk_system.df['is_fraud'],
cmap='coolwarm', alpha=0.6)
axes[1,0].set_xlabel('交易金额')
axes[1,0].set_ylabel('风险分数')
axes[1,0].set_title('交易金额 vs 风险分数')
axes[1,0].grid(True, alpha=0.3)
plt.colorbar(scatter, ax=axes[1,0], label='欺诈标签')
# 右下:高风险交易特征分析
high_risk_threshold = 0.7 # 高风险阈值
high_risk_data = self.risk_system.df[self.risk_system.df['risk_score'] > high_risk_threshold]
if len(high_risk_data) > 0:
# 计算高风险交易和整体交易的特征均值
features_means = high_risk_data[['amount', 'time', 'frequency', 'location_risk', 'device_risk']].mean()
overall_means = self.risk_system.df[['amount', 'time', 'frequency', 'location_risk', 'device_risk']].mean()
x = range(len(features_means))
# 绘制对比条形图
axes[1,1].bar([i - 0.2 for i in x], overall_means.values, 0.4,
label='整体平均', alpha=0.7)
axes[1,1].bar([i + 0.2 for i in x], features_means.values, 0.4,
label='高风险交易', alpha=0.7)
axes[1,1].set_xticks(x)
axes[1,1].set_xticklabels(features_means.index, rotation=45)
axes[1,1].set_ylabel('特征值')
axes[1,1].set_title(f'高风险交易(分数>{high_risk_threshold})特征对比')
axes[1,1].legend()
axes[1,1].grid(True, alpha=0.3)
plt.tight_layout()
self.figures['risk_scoring'] = fig
return figdef plot_model_performance(self):
"""
绘制模型性能分析图
"""
# 创建2x2子图布局
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('模型性能综合分析', fontsize=16, fontweight='bold')
# 准备测试数据
X = self.risk_system.df.drop(['is_fraud', 'risk_score'], axis=1)
y = self.risk_system.df['is_fraud']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
if hasattr(self.risk_system.model, 'predict_proba'):
y_scores = self.risk_system.model.predict_proba(X_test)[:, 1]
# 左上:精确度-召回率曲线
precision, recall, thresholds = precision_recall_curve(y_test, y_scores)
axes[0,0].plot(recall, precision, linewidth=2)
axes[0,0].set_xlabel('召回率')
axes[0,0].set_ylabel('精确度')
axes[0,0].set_title('精确度-召回率曲线')
axes[0,0].grid(True, alpha=0.3)
axes[0,0].fill_between(recall, precision, alpha=0.3)
# 右上:阈值选择分析
# 计算F1分数
f1_scores = 2 * (precision * recall) / (precision + recall + 1e-10)
best_idx = np.argmax(f1_scores[:-1]) # 排除最后一个点
axes[0,1].plot(thresholds, precision[:-1], label='精确度', linewidth=2)
axes[0,1].plot(thresholds, recall[:-1], label='召回率', linewidth=2)
axes[0,1].plot(thresholds, f1_scores[:-1], label='F1分数', linewidth=2)
# 标记最佳阈值
axes[0,1].axvline(thresholds[best_idx], color='red', linestyle='--',
label=f'最佳阈值: {thresholds[best_idx]:.3f}')
axes[0,1].set_xlabel('分类阈值')
axes[0,1].set_ylabel('分数')
axes[0,1].set_title('阈值选择分析')
axes[0,1].legend()
axes[0,1].grid(True, alpha=0.3)
# 左下:特征重要性累积图
importance_df = self.risk_system.feature_importance
cumulative_importance = np.cumsum(importance_df['importance'])
axes[1,0].plot(range(1, len(cumulative_importance)+1), cumulative_importance,
marker='o', linewidth=2)
# 标记80%重要性阈值
axes[1,0].axhline(y=0.8, color='red', linestyle='--', alpha=0.7,
label='80%重要性阈值')
axes[1,0].set_xlabel('特征数量')
axes[1,0].set_ylabel('累积重要性')
axes[1,0].set_title('特征重要性累积图')
axes[1,0].grid(True, alpha=0.3)
axes[1,0].legend()
# 右下:学习曲线(简化版)
n_estimators_range = [10, 50, 100, 150, 200]
# 模拟的训练和测试分数
train_scores = [0.85, 0.92, 0.95, 0.96, 0.96]
test_scores = [0.82, 0.89, 0.91, 0.91, 0.91]
axes[1,1].plot(n_estimators_range, train_scores, 'o-', label='训练分数', linewidth=2)
axes[1,1].plot(n_estimators_range, test_scores, 'o-', label='测试分数', linewidth=2)
axes[1,1].set_xlabel('树的数量')
axes[1,1].set_ylabel('分数')
axes[1,1].set_title('模型学习曲线')
axes[1,1].legend()
axes[1,1].grid(True, alpha=0.3)
plt.tight_layout()
self.figures['model_performance'] = fig
return figdef generate_all_plots(self):
"""
生成所有分析图表
"""
print("生成金融风控分析图表...")
# 依次生成所有图表
self.plot_feature_distributions()
self.plot_feature_importance()
self.plot_correlation_analysis()
self.plot_risk_scoring_analysis()
self.plot_model_performance()
print("所有图表生成完成!")
return self.figures=== 金融风控系统 === 金融数据生成完成: 10000 条交易 欺诈交易比例: 0.030 特征分布差异(KL散度): device_risk: 665.3519 location_risk: 234.1907 frequency: 15.9859 time: 14.7650 amount: 0.5321 模型性能评估: AUC Score: 1.0000 准确率: 1.0000 特征重要性排名: device_risk: 0.4510 time: 0.2314 amount: 0.2216 frequency: 0.0887 location_risk: 0.0072 === 异常交易检测 === 交易 0: 高风险 (置信度: 0.570, 欺诈概率: 0.570) 交易 1: 低风险 (置信度: 0.770, 欺诈概率: 0.230) 交易 2: 高风险 (置信度: 0.570, 欺诈概率: 0.570) 交易 3: 低风险 (置信度: 0.820, 欺诈概率: 0.180) 交易 4: 高风险 (置信度: 0.570, 欺诈概率: 0.570) === 交易风险评分 === 交易 0: 风险分数 = 1.000 交易 1: 风险分数 = 0.409 交易 2: 风险分数 = 1.000 交易 3: 风险分数 = 0.302 交易 4: 风险分数 = 1.000 === 金融风控可视化分析 === 生成金融风控分析图表... 特征分布差异(KL散度): device_risk: 665.3519 location_risk: 234.1907 frequency: 15.9859 time: 14.7650 amount: 0.5321

含义与目的:
具体分析:
达到的效果:

含义与目的:
具体分析:
达到的效果:

含义与目的:
具体分析:
达到的效果:

含义与目的:
具体分析:
达到的效果:

含义与目的:
具体分析:
达到的效果:
欺诈行为模式确认:
模型有效性验证:
今天我们主要是将信息论原理系统性地应用于金融风控的全流程,并提出了结合概率和不确定性的风险评分方法,将抽象的数学理论转化为实用的工程解决方案,通过KL散度、信息增益和熵等核心概念,系统在保持高性能的同时,提供了强有力的理论解释和业务洞察。
未来,随着数据量的持续增长和计算能力的不断提升,信息论在风险控制领域的应用前景将更加广阔,今天我们也清楚了信息论不仅是理论工具,更是解决实际金融风控问题的有力武器。从特征分析到风险决策,信息论提供了一套完整的量化框架,为构建可解释、高性能的智能风控系统奠定了坚实基础。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。