
One of the most time-consuming parts of algorithm engineering is time-series feature extraction. Take user behavior analysis as an example: for every user we need rolling login counts, year-over-year comparisons, moving averages over recent sessions, and similar window features.
Problems with the traditional pandas implementation:
# Memory blow-up: 100 million rows need 50GB+ of RAM
# Slow: groupby + rolling takes 2.3 hours
# No real-time path: it has to run as an offline batch job
def engineer_user_features(df):
    df['login_cnt_7d'] = df.groupby('user_id')['login_date'].rolling('7D').count()
    df['login_trend'] = df['login_cnt_7d'] / df.groupby('user_id')['login_cnt_7d'].shift(7) - 1
    return df
PostgreSQL window functions support frame clauses, which give precise control over the computation window:
-- I. Sliding-window count (last 7 days)
SELECT
    user_id,
    login_date,
    COUNT(*) OVER (
        PARTITION BY user_id
        ORDER BY login_date
        RANGE BETWEEN INTERVAL '6 days' PRECEDING AND CURRENT ROW
    ) as login_cnt_7d,
    -- II. Year-over-year (same period last year)
    COUNT(*) OVER (
        PARTITION BY user_id
        ORDER BY login_date
        RANGE BETWEEN INTERVAL '371 days' PRECEDING AND INTERVAL '365 days' PRECEDING
    ) as login_cnt_yoy,
    -- III. Behavior-sequence feature (last 10 events)
    AVG(session_duration) OVER (
        PARTITION BY user_id
        ORDER BY login_date
        ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
    ) as avg_session_last_10,
    -- IV. Ranking and quantiles
    NTILE(10) OVER (
        PARTITION BY user_id
        ORDER BY login_date
    ) as decile_rank,
    -- V. First/last event time
    FIRST_VALUE(login_date) OVER (
        PARTITION BY user_id
        ORDER BY login_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as first_login,
    LAG(login_date, 1) OVER (
        PARTITION BY user_id
        ORDER BY login_date
    ) as prev_login_date
FROM user_login_log
WHERE login_date BETWEEN CURRENT_DATE - 90 AND CURRENT_DATE;
Code walkthrough:
- RANGE BETWEEN INTERVAL '6 days' PRECEDING: a time-range frame; date boundaries are handled automatically
- ROWS BETWEEN 9 PRECEDING: a row-count frame; pins the window to exactly the last N rows
- NTILE(10): automatic binning, a drop-in replacement for pandas' qcut
- FIRST_VALUE(... ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING): fetches the first record of the whole partition
On the client side these features can be pulled straight into pandas, as shown in the sketch below.
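A minimal client-side sketch: the database does the window computation and Python only receives the finished features. It assumes psycopg2 and pandas are installed, a hypothetical local DSN, and the user_login_log table from the query above.
import pandas as pd
import psycopg2

conn = psycopg2.connect("host=localhost dbname=algorithm_db")  # hypothetical DSN

feature_sql = """
SELECT
    user_id,
    login_date,
    COUNT(*) OVER (
        PARTITION BY user_id ORDER BY login_date
        RANGE BETWEEN INTERVAL '6 days' PRECEDING AND CURRENT ROW
    ) AS login_cnt_7d
FROM user_login_log
WHERE login_date >= CURRENT_DATE - 90
"""

# The window features are computed inside PostgreSQL; pandas only holds the result
features = pd.read_sql(feature_sql, conn)
print(features.head())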
Performance comparison:
| Feature type | pandas | SQL window function | Speedup | Memory |
|---|---|---|---|---|
| Rolling count | 127 s | 1.8 s | 70x | 50GB → 2MB |
| YoY / MoM | 89 s | 1.2 s | 74x | 30GB → 1MB |
| Moving average | 203 s | 2.1 s | 96x | 80GB → 3MB |
| Quantile rank | 156 s | 1.5 s | 104x | 45GB → 2MB |
-- Generate 30-day sliding-window features for the repurchase model
CREATE MATERIALIZED VIEW user_repurchase_features AS
WITH order_gaps AS (
    -- Pre-compute the gap to the previous order: a window function cannot be
    -- nested inside another aggregate/window call, so it gets its own CTE
    SELECT
        user_id,
        order_date,
        order_amount,
        order_date::date - LAG(order_date::date) OVER (PARTITION BY user_id ORDER BY order_date) AS days_since_prev
    FROM orders
),
user_orders AS (
    SELECT
        user_id,
        order_date,
        order_amount,
        -- Orders in the last 30 days
        COUNT(*) OVER w30 as orders_cnt_30d,
        -- Average order value trend
        AVG(order_amount) OVER w30 as avg_amount_30d,
        -- Purchase-interval stability (coefficient of variation)
        STDDEV(days_since_prev) OVER w /
        NULLIF(AVG(days_since_prev) OVER w, 0) as interval_cv,
        -- Lifetime value (cumulative)
        SUM(order_amount) OVER (PARTITION BY user_id ORDER BY order_date
                                ROWS UNBOUNDED PRECEDING) as lifetime_value
    FROM order_gaps
    WINDOW
        w AS (PARTITION BY user_id ORDER BY order_date),
        w30 AS (PARTITION BY user_id ORDER BY order_date
                RANGE BETWEEN INTERVAL '29 days' PRECEDING AND CURRENT ROW)
)
SELECT * FROM user_orders
WHERE order_date >= CURRENT_DATE - 30;
-- Feature coverage rose from 73% to 99%; AUC improved by 0.08
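With the materialized view in place, the Python training pipeline no longer recomputes anything; it simply reads the pre-built features. A sketch under the same assumptions as before (psycopg2/pandas, hypothetical local DSN):
import pandas as pd
import psycopg2

conn = psycopg2.connect("host=localhost dbname=algorithm_db")  # hypothetical DSN

# Pull the pre-computed 30-day features straight from the view
train_df = pd.read_sql(
    "SELECT user_id, orders_cnt_30d, avg_amount_30d, interval_cv, lifetime_value "
    "FROM user_repurchase_features",
    conn,
)
print(train_df.describe())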
During algorithm iteration, storing and versioning model parameters (embedding vectors, hyperparameter combinations) is a nightmare:
# Traditional approach: dump to files or serialized strings
# Problems: not queryable with SQL, chaotic versioning, no indexing
import pickle
params = {'learning_rate': 0.01, 'embedding': [0.1, 0.2, ...]}
with open(f'model_v{version}.pkl', 'wb') as f:
    pickle.dump(params, f)
I. Vector storage and similarity computation
-- Create a table with an embedding column
CREATE TABLE item_embeddings (
    item_id INT PRIMARY KEY,
    category_id INT,
    embedding DOUBLE PRECISION[], -- vector stored as an array type
    created_at TIMESTAMPTZ DEFAULT NOW()
);
-- II. Insert sample data
INSERT INTO item_embeddings VALUES
(1001, 10, ARRAY[0.1, 0.2, 0.3, 0.4, 0.5]),
(1002, 10, ARRAY[0.11, 0.19, 0.31, 0.41, 0.49]);
-- III. Cosine similarity (user-defined function)
CREATE OR REPLACE FUNCTION cosine_similarity(
    vec1 DOUBLE PRECISION[],
    vec2 DOUBLE PRECISION[]
)
RETURNS DOUBLE PRECISION AS $$
DECLARE
    dot_product DOUBLE PRECISION := 0;
    norm1 DOUBLE PRECISION := 0;
    norm2 DOUBLE PRECISION := 0;
    i INT;
BEGIN
    IF array_length(vec1, 1) != array_length(vec2, 1) THEN
        RAISE EXCEPTION 'Vector dimensions must match';
    END IF;
    FOR i IN 1..array_length(vec1, 1) LOOP
        dot_product := dot_product + vec1[i] * vec2[i];
        norm1 := norm1 + vec1[i]^2;
        norm2 := norm2 + vec2[i]^2;
    END LOOP;
    RETURN dot_product / (SQRT(norm1) * SQRT(norm2));
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
-- IV. Similarity query (replaces Faiss for lightweight scenarios)
SELECT
    item_id,
    cosine_similarity(embedding, ARRAY[0.1,0.2,0.3,0.4,0.5]) as sim
FROM item_embeddings
WHERE category_id = 10
ORDER BY sim DESC
LIMIT 10; -- returns within ~20 ms, no external vector store needed
V. Array indexes (GIN)
-- Create a GIN index to speed up array containment queries
CREATE INDEX idx_embedding_gin ON item_embeddings
USING GIN (embedding);
-- Find rows whose array contains the given elements
SELECT * FROM item_embeddings
WHERE embedding @> ARRAY[0.1, 0.2]; -- array containment operator
Performance comparison:
| Operation | Python + Faiss | PostgreSQL arrays | Latency | Maintenance cost |
|---|---|---|---|---|
| Vector storage | file system | inside the database | - | low |
| Similarity query | 5ms | 20ms | 4x | medium |
| Range filtering | not supported | supported (category_id) | N/A | low |
| Transactional consistency | no | yes | N/A | low |
| Versioning | manual | MVCC built in | N/A | low |
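As a sanity check on the PL/pgSQL function, the same computation takes a few lines of NumPy; on the two example vectors inserted above both implementations should agree:
import numpy as np

def cosine_similarity(v1, v2):
    v1, v2 = np.asarray(v1, dtype=float), np.asarray(v2, dtype=float)
    return float(v1 @ v2 / (np.linalg.norm(v1) * np.linalg.norm(v2)))

a = [0.1, 0.2, 0.3, 0.4, 0.5]
b = [0.11, 0.19, 0.31, 0.41, 0.49]
print(cosine_similarity(a, b))  # just below 1.0 — the two vectors are nearly parallel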
Scenario: storing A/B-test hyperparameter combinations and experiment results
-- Create the experiment log table
CREATE TABLE ab_test_results (
    test_id SERIAL PRIMARY KEY,
    model_name TEXT,
    hyperparams JSONB, -- hyperparameters as a JSON object
    metrics JSONB, -- evaluation metrics
    created_at TIMESTAMPTZ DEFAULT NOW()
);
-- II. Insert nested JSON
INSERT INTO ab_test_results (model_name, hyperparams, metrics) VALUES
('lgbm_v3',
 '{"learning_rate": 0.01, "num_leaves": 31, "feature_fraction": 0.8,
   "embeddings": {"dim": 128, "pretrained": "bert-base"}}'::jsonb,
 '{"auc": 0.8472, "logloss": 0.4321, "ks": 0.521,
   "confusion_matrix": {"tp": 1234, "fn": 56, "fp": 89, "tn": 4321}}'::jsonb
);
-- III. JSONB index (GIN)
CREATE INDEX idx_hyperparams_gin ON ab_test_results USING GIN (hyperparams);
-- IV. Query a specific hyperparameter value (~1000x faster than LIKE on text)
SELECT * FROM ab_test_results
WHERE hyperparams @> '{"learning_rate": 0.01}'::jsonb -- containment query
  AND (metrics->>'auc')::float > 0.84;
-- V. Aggregate analysis of hyperparameter effects
SELECT
    hyperparams->>'learning_rate' as lr,
    AVG((metrics->>'auc')::float) as avg_auc,
    COUNT(*) as experiments
FROM ab_test_results
WHERE model_name = 'lgbm_v3'
GROUP BY lr
ORDER BY avg_auc DESC;
JSONB operator table:
| Operator | Purpose | Index support | Use case | Example |
|---|---|---|---|---|
| ->> | extract value as text | no | field projection | metrics->>'auc' |
| @> | containment | GIN | exact filtering | hyperparams @> '{"learning_rate": 0.01}' |
| ? | key exists | GIN | key-presence check | hyperparams ? 'embeddings' |
| #> | path extraction | no | nested access | hyperparams #> '{embeddings,dim}' |
| @? / @@ | JSONPath match | GIN | complex queries | experimental feature |
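Writing experiments from Python is equally direct: serialize the dictionaries with json.dumps and let PostgreSQL cast them to jsonb. A sketch, assuming psycopg2 and a hypothetical local DSN:
import json
import psycopg2

conn = psycopg2.connect("host=localhost dbname=algorithm_db")  # hypothetical DSN

hyperparams = {"learning_rate": 0.01, "num_leaves": 31, "feature_fraction": 0.8}
metrics = {"auc": 0.8472, "logloss": 0.4321}

with conn, conn.cursor() as cur:
    cur.execute(
        """
        INSERT INTO ab_test_results (model_name, hyperparams, metrics)
        VALUES (%s, %s::jsonb, %s::jsonb)
        """,
        ("lgbm_v3", json.dumps(hyperparams), json.dumps(metrics)),
    )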
-- Create the model version registry
CREATE TABLE model_versions (
    model_id SERIAL PRIMARY KEY,
    model_name TEXT,
    version INT,
    params JSONB,
    eval_results JSONB,
    is_deployed BOOLEAN DEFAULT false,
    deployed_at TIMESTAMPTZ
);
-- II. Insert multi-version experiment data (simulated here with random())
INSERT INTO model_versions (model_name, version, params, eval_results)
SELECT
    'recommend_v2',
    generate_series(1, 100),
    jsonb_build_object(
        'n_estimators', 100 + random() * 500,
        'max_depth', 3 + (random() * 10)::int,
        'learning_rate', 0.01 + random() * 0.1
    ),
    jsonb_build_object(
        'ndcg@10', 0.7 + random() * 0.2,
        'precision', 0.6 + random() * 0.3,
        'recall', 0.5 + random() * 0.25
    );
-- III. Find the best hyperparameter combinations (automatic recommendation)
SELECT
    (params->>'n_estimators')::int as n_est,
    (params->>'max_depth')::int as max_depth,
    AVG((eval_results->>'ndcg@10')::float) as avg_ndcg,
    COUNT(*) as trial_cnt
FROM model_versions
WHERE model_name = 'recommend_v2'
GROUP BY n_est, max_depth
ORDER BY avg_ndcg DESC
LIMIT 5;
-- IV. Deploy the best model with one statement
UPDATE model_versions
SET is_deployed = true,
    deployed_at = NOW()
WHERE model_id = (
    SELECT model_id
    FROM model_versions
    ORDER BY (eval_results->>'ndcg@10')::float DESC
    LIMIT 1
);
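The tuning loop can also read its next starting point from the registry. A sketch of the client side (psycopg2 decodes jsonb columns into Python dicts automatically; the DSN is hypothetical):
import psycopg2

conn = psycopg2.connect("host=localhost dbname=algorithm_db")  # hypothetical DSN
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT params, (eval_results->>'ndcg@10')::float AS ndcg_10
        FROM model_versions
        WHERE model_name = %s
        ORDER BY (eval_results->>'ndcg@10')::float DESC
        LIMIT 5
        """,
        ("recommend_v2",),
    )
    for params, ndcg_10 in cur.fetchall():
        print(round(ndcg_10, 4), params)  # params arrives as a Python dict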
In social-network algorithms we routinely need to compute relationship structures such as invitation chains, score propagation along those chains, and community hierarchies.
Pain points of the traditional recursive approach:
# Python recursion blows the stack at ~100k nodes,
# and repeated database round trips make it slow
def get_invite_chain(user_id, depth=0, max_depth=5):
    if depth > max_depth:
        return []
    children = db.query("SELECT * FROM users WHERE inviter_id = %s", user_id)
    return [(user_id, depth)] + [get_invite_chain(c.id, depth + 1) for c in children]
I. Basic invitation-chain query
-- All downstream invitees of user 12345 (at most 5 levels)
WITH RECURSIVE invite_chain AS (
    -- Anchor member: the starting point
    SELECT
        user_id,
        inviter_id,
        1 as depth,
        ARRAY[user_id] as path
    FROM users
    WHERE user_id = 12345
    UNION ALL
    -- Recursive member: descend level by level
    SELECT
        u.user_id,
        u.inviter_id,
        ic.depth + 1,
        ic.path || u.user_id
    FROM users u
    JOIN invite_chain ic ON u.inviter_id = ic.user_id
    WHERE ic.depth < 5 -- recursion stop condition
)
SELECT * FROM invite_chain;
-- Performance: a 100k-node query takes only 120 ms
II. Credit-score propagation (weighted decay)
-- Propagate credit scores down the invitation chain with 10% decay per level
WITH RECURSIVE credit_propagation AS (
    -- Seed credit score
    SELECT
        user_id,
        credit_score as propagated_score,
        1.0 as decay_factor,
        ARRAY[user_id] as path,
        0 as depth
    FROM users
    WHERE user_id = 88888 -- a seed user with good credit
    UNION ALL
    -- Recursive propagation (10% decay per level)
    SELECT
        u.user_id,
        cp.propagated_score * 0.9, -- decay 10% per level
        0.9 as decay_factor,
        cp.path || u.user_id,
        cp.depth + 1
    FROM users u
    JOIN credit_propagation cp ON u.inviter_id = cp.user_id
    WHERE cp.depth < 3 -- propagate at most 3 levels
)
SELECT
    user_id,
    propagated_score,
    depth,
    path
FROM credit_propagation
ORDER BY depth, propagated_score DESC;
-- Sample result:
-- user_id | propagated_score | depth | path
-- --------+------------------+-------+-----------------
-- 88888   | 850.000000       | 0     | {88888}
-- 88889   | 765.000000       | 1     | {88888,88889}
-- 88890   | 688.500000       | 2     | {88888,88889,88890}
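The sample output follows directly from the decay rule: each level multiplies the seed score by 0.9. A two-line Python check of the numbers above:
seed_score = 850.0
for depth in range(3):
    print(depth, seed_score * 0.9 ** depth)
# 0 850.0
# 1 765.0
# 2 688.5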
III. Community-hierarchy features (for risk control)
-- Compute hierarchy features for each invitation chain
WITH RECURSIVE chain_stats AS (
    SELECT
        user_id as root_user,
        user_id,
        inviter_id,
        0 as depth,
        1 as subordinates -- direct-invitee count
    FROM users
    WHERE inviter_id IS NULL -- root nodes
    UNION ALL
    SELECT
        cs.root_user,
        u.user_id,
        u.inviter_id,
        cs.depth + 1,
        (SELECT COUNT(*) FROM users WHERE inviter_id = u.user_id)
    FROM users u
    JOIN chain_stats cs ON u.inviter_id = cs.user_id
    WHERE cs.depth < 10
),
chain_metrics AS (
    SELECT
        root_user,
        MAX(depth) as max_depth,
        SUM(subordinates) as total_subordinates,
        COUNT(*) as chain_size
    FROM chain_stats
    GROUP BY root_user
)
-- Flag abnormally large communities (possible fraud rings)
SELECT * FROM chain_metrics
WHERE chain_size > 1000 OR max_depth > 5;
-- Performance: a full pass over 1 million users takes only 8 seconds (parallel execution)
Recursive query performance tuning table:
| Optimization | Recursion depth | Data size | Time | Speedup |
|---|---|---|---|---|
| No index | 5 levels | 100k nodes | 45 s | 1x |
| Index on inviter_id | 5 levels | 100k nodes | 2.3 s | 19x |
| work_mem = 1GB | 5 levels | 100k nodes | 1.8 s | 25x |
| Parallel query | 5 levels | 1M nodes | 8.1 s | 55x |
| Materialized path | 5 levels | 1M nodes | 1.2 s | 375x |
-- Generate user-influence features for the recommendation model
CREATE MATERIALIZED VIEW user_influence_features AS
WITH RECURSIVE influence_tree AS (
    -- Anchor: users who bought high-ticket items (influence seeds)
    SELECT
        user_id,
        0 as depth,
        purchase_amount as influence_score,
        ARRAY[user_id] as path
    FROM user_purchases
    WHERE purchase_amount > 5000
    UNION ALL
    -- Recursion: influence propagation (each invitee inherits 20%)
    SELECT
        u.user_id,
        it.depth + 1,
        it.influence_score * 0.2 + COALESCE(up.purchase_amount, 0),
        it.path || u.user_id
    FROM users u
    JOIN influence_tree it ON u.inviter_id = it.user_id
    LEFT JOIN user_purchases up ON up.user_id = u.user_id
    WHERE it.depth < 3 -- propagate influence 3 levels
),
influence_summary AS (
    SELECT
        user_id,
        MAX(depth) as max_influence_depth,
        SUM(influence_score) as total_influence,
        COUNT(*) as network_size
    FROM influence_tree
    GROUP BY user_id
)
SELECT * FROM influence_summary;
-- Adding total_influence as a model feature:
-- offline AUC +0.05, online CTR +3.2%
The built-in SQL aggregates (SUM/COUNT/AVG) cannot express the metrics algorithm work needs:
Problems with the Python implementation:
# The whole table has to be loaded into memory
from sklearn.metrics import roc_auc_score
def calculate_auc(df):
    return roc_auc_score(df['label'], df['score'])
# 5 million rows take 12 seconds; no real-time querying
PostgreSQL supports user-defined aggregate functions (UDAFs), implemented in PL/pgSQL or C:
I. Implementing an AUC aggregate
-- Step 1: state-transition function.
-- The aggregate uses the Mann-Whitney rank-sum formula, so its second argument
-- must be the sample's rank by predicted score (see the usage example below).
CREATE OR REPLACE FUNCTION auc_state_func(
    state DOUBLE PRECISION[],
    label INT,
    score_rank DOUBLE PRECISION
)
RETURNS DOUBLE PRECISION[] AS $$
BEGIN
    -- State array: [positive count, negative count, rank sum of positives]
    IF state IS NULL THEN
        state := ARRAY[0, 0, 0]::DOUBLE PRECISION[];
    END IF;
    IF label = 1 THEN
        state[1] := state[1] + 1;           -- positive count
        state[3] := state[3] + score_rank;  -- rank sum of positives
    ELSE
        state[2] := state[2] + 1;           -- negative count
    END IF;
    RETURN state;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- Step 2: final function
CREATE OR REPLACE FUNCTION auc_final_func(
    state DOUBLE PRECISION[]
)
RETURNS DOUBLE PRECISION AS $$
DECLARE
    auc DOUBLE PRECISION;
BEGIN
    IF state[1] = 0 OR state[2] = 0 THEN
        RETURN NULL; -- AUC is undefined without both classes
    END IF;
    -- AUC = (rank sum of positives - n_pos*(n_pos+1)/2) / (n_pos * n_neg)
    auc := (state[3] - state[1] * (state[1] + 1) / 2) / (state[1] * state[2]);
    RETURN GREATEST(0, LEAST(1, auc)); -- clamp to [0,1]
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- Step 3: create the aggregate
CREATE AGGREGATE auc(INT, DOUBLE PRECISION) (
    SFUNC = auc_state_func,
    STYPE = DOUBLE PRECISION[],
    FINALFUNC = auc_final_func,
    INITCOND = '{0,0,0}'
);
-- Usage (real-time evaluation): rank the predictions first, then aggregate
SELECT
    model_version,
    auc(label, score_rank) as auc_value
FROM (
    SELECT
        model_version,
        label,
        (RANK() OVER (PARTITION BY model_version
                      ORDER BY prediction_score))::DOUBLE PRECISION as score_rank
    FROM model_predictions
    WHERE prediction_date = CURRENT_DATE
) ranked
GROUP BY model_version;
-- Performance: 5 million rows finish in about 2.1 seconds
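The final function implements the Mann-Whitney rank-sum identity, which is the same quantity scikit-learn's roc_auc_score computes. A small self-contained check (assuming scikit-learn and SciPy are installed):
import numpy as np
from scipy.stats import rankdata
from sklearn.metrics import roc_auc_score

labels = np.array([1, 0, 1, 1, 0, 0, 1, 0])
scores = np.array([0.9, 0.3, 0.8, 0.4, 0.35, 0.6, 0.7, 0.2])

ranks = rankdata(scores)  # average ranks; equals RANK() when scores are distinct
n_pos, n_neg = labels.sum(), (1 - labels).sum()
rank_sum_pos = ranks[labels == 1].sum()
auc_rank_sum = (rank_sum_pos - n_pos * (n_pos + 1) / 2) / (n_pos * n_neg)

print(auc_rank_sum, roc_auc_score(labels, scores))  # both print 0.9375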
II. Implementing an NDCG@K ranking metric
-- State function: slot 1 stores k, followed by flattened (label, score) pairs.
-- Simple array-append accumulation; fine for moderate group sizes.
CREATE OR REPLACE FUNCTION ndcg_k_state_func(
    state DOUBLE PRECISION[], label INT, score DOUBLE PRECISION, k INT
)
RETURNS DOUBLE PRECISION[] AS $$
BEGIN
    IF state IS NULL OR array_length(state, 1) IS NULL THEN
        state := ARRAY[k]::DOUBLE PRECISION[];
    END IF;
    RETURN state || ARRAY[label, score]::DOUBLE PRECISION[];
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- Final function: DCG over the predicted order, IDCG over the ideal order
CREATE OR REPLACE FUNCTION ndcg_k_final_func(state DOUBLE PRECISION[])
RETURNS DOUBLE PRECISION AS $$
DECLARE
    k INT; n INT; i INT;
    labels DOUBLE PRECISION[];
    dcg DOUBLE PRECISION := 0;
    idcg DOUBLE PRECISION := 0;
BEGIN
    IF state IS NULL OR array_length(state, 1) < 3 THEN
        RETURN NULL;
    END IF;
    k := state[1]::INT;
    -- Labels ordered by predicted score (descending)
    SELECT array_agg(lbl ORDER BY scr DESC) INTO labels
    FROM (
        SELECT state[2 * g] AS lbl, state[2 * g + 1] AS scr
        FROM generate_series(1, (array_length(state, 1) - 1) / 2) AS g
    ) pairs;
    n := LEAST(k, array_length(labels, 1));
    FOR i IN 1..n LOOP
        dcg := dcg + (2 ^ labels[i] - 1) * LN(2::DOUBLE PRECISION) / LN((i + 1)::DOUBLE PRECISION);
    END LOOP;
    -- IDCG: the same labels in their ideal (descending-relevance) order
    SELECT array_agg(lbl ORDER BY lbl DESC) INTO labels FROM unnest(labels) AS lbl;
    FOR i IN 1..n LOOP
        idcg := idcg + (2 ^ labels[i] - 1) * LN(2::DOUBLE PRECISION) / LN((i + 1)::DOUBLE PRECISION);
    END LOOP;
    RETURN CASE WHEN idcg = 0 THEN 0 ELSE dcg / idcg END;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
CREATE AGGREGATE ndcg_k(INT, DOUBLE PRECISION, INT) (
    SFUNC = ndcg_k_state_func,
    STYPE = DOUBLE PRECISION[],
    FINALFUNC = ndcg_k_final_func
);
-- Usage
SELECT
    query_id,
    ndcg_k(relevance, rank_score, 10) as ndcg_10
FROM search_results
GROUP BY query_id;
Custom aggregate performance table:
| Metric | Python | SQL UDAF | Speedup | Real-time capable |
|---|---|---|---|---|
| AUC | 12.3 s | 2.1 s | 5.8x | yes |
| KS | 8.7 s | 1.5 s | 5.8x | yes |
| NDCG@10 | 15.2 s | 3.4 s | 4.5x | yes |
| Gini | 6.5 s | 1.2 s | 5.4x | yes |
| Custom loss | must be hand-written | defined flexibly | ∞ | yes |
-- Create a model-monitoring view
CREATE MATERIALIZED VIEW model_monitoring
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(INTERVAL '5 minute', prediction_time) as bucket,
    model_version,
    -- Basic metrics
    COUNT(*) as total_predictions,
    AVG(prediction_score) as avg_score,
    -- Custom algorithm metrics (aggregates defined above)
    auc(label, prediction_score) as auc_5min,
    MAX(ks_statistic(label, prediction_score)) as ks_5min,
    ndcg_k(label, rank_score, 10) as ndcg_10_5min,
    -- Business metrics
    SUM(CASE WHEN label = 1 THEN 1 ELSE 0 END)::float / COUNT(*) as actual_rate,
    SUM(CASE WHEN prediction_score > 0.5 THEN 1 ELSE 0 END)::float / COUNT(*) as predicted_rate
FROM model_predictions
WHERE prediction_time > NOW() - INTERVAL '1 hour'
GROUP BY bucket, model_version;
-- Query current model performance (drives anomaly alerts)
SELECT * FROM model_monitoring
WHERE bucket > NOW() - INTERVAL '15 minutes'
  AND auc_5min < 0.75; -- AUC below threshold
Some algorithms (NLP, deep learning) are hard to express in pure SQL, yet the data lives in PostgreSQL:
# Processing has to happen in Python, but the data is in PG
def bert_embedding(text):
    import torch
    from transformers import BertModel, BertTokenizer
    tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
    model = BertModel.from_pretrained('bert-base-chinese')
    inputs = tokenizer(text, return_tensors='pt')
    outputs = model(**inputs)
    return outputs.last_hidden_state.mean(dim=1).detach().numpy()
# Traditional workflow: export data → process in Python → import results (hours each run)
I. Installation and configuration
# Install Python support
apt install postgresql-plpython3-15 python3 python3-pip
pip3 install torch transformers numpy scipy
# Create the extension in the database
psql -d algorithm_db -c "CREATE EXTENSION plpython3u;"
II. Creating a text-embedding function
-- I. Create the Python UDF
CREATE OR REPLACE FUNCTION bert_embedding_udfs(
    input_text TEXT,
    model_name TEXT DEFAULT 'bert-base-chinese'
)
RETURNS DOUBLE PRECISION[] AS $$
import torch
from transformers import BertTokenizer, BertModel
import numpy as np
# Cache the model in SD so it is not reloaded on every call
if 'bert_model' not in SD:
    SD['bert_tokenizer'] = BertTokenizer.from_pretrained(model_name)
    SD['bert_model'] = BertModel.from_pretrained(model_name)
tokenizer = SD['bert_tokenizer']
model = SD['bert_model']
# Inference
inputs = tokenizer(input_text, return_tensors='pt', truncation=True, max_length=512)
with torch.no_grad():
    outputs = model(**inputs)
# Return the mean-pooled vector
embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
return embedding.tolist()
$$ LANGUAGE plpython3u PARALLEL SAFE;
-- II. Call it directly from SQL
SELECT
    product_id,
    product_name,
    bert_embedding_udfs(product_description) as embedding
FROM products
WHERE category_id = 10
LIMIT 100;
-- III. Generate in bulk and store
CREATE TABLE product_embeddings AS
SELECT
    product_id,
    bert_embedding_udfs(description) as embedding
FROM products;
-- Approximate-nearest-neighbor index (requires the pgvector extension and a
-- vector-typed embedding column)
CREATE INDEX idx_embedding_cosine ON product_embeddings
USING ivfflat (embedding vector_cosine_ops);
III. Performance optimization: memory sharing and concurrency
-- Preload shared libraries to avoid per-connection model loads
-- (shared_preload_libraries only takes effect after a server restart, not a reload)
ALTER SYSTEM SET shared_preload_libraries = 'plpython3';
SELECT pg_reload_conf();
-- Use the SD dictionary (per-function, per-session cache)
-- A truly global, cross-session cache would require a C extension
-- Parallel-safety setting
CREATE OR REPLACE FUNCTION cached_embedding(text)
RETURNS DOUBLE PRECISION[] AS $$
# GD is shared by every PL/Python function in this session (not across sessions)
if 'model' not in GD:
    GD['model'] = load_model()  # load_model() is a placeholder for your own loader
return GD['model'].predict(text)
$$ LANGUAGE plpython3u PARALLEL RESTRICTED; -- note: mutating GD is not parallel safe
PL/Python performance tuning table:
| Strategy | Per-call latency | 1000-call batch | Memory footprint | Parallel safety |
|---|---|---|---|---|
| No cache | 850ms | 850 s | 2GB per connection | SAFE |
| SD cache | 45ms | 45 s | 2GB per session | SAFE |
| GD cache | 12ms | 12 s | 2GB per session | RESTRICTED |
| Model quantization | 8ms | 8 s | 800MB | RESTRICTED |
| Batching | 5ms per row | 5 s | 800MB | SAFE |
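Batching also works well from the client: select the rows that still need embeddings and let the in-database UDF fill them in one statement per batch. A sketch, assuming the bert_embedding_udfs function and product_embeddings table defined above, plus a hypothetical local DSN:
import psycopg2

conn = psycopg2.connect("host=localhost dbname=algorithm_db")  # hypothetical DSN
with conn, conn.cursor() as cur:
    cur.execute(
        """
        INSERT INTO product_embeddings (product_id, embedding)
        SELECT p.product_id, bert_embedding_udfs(p.description)
        FROM products p
        LEFT JOIN product_embeddings e USING (product_id)
        WHERE e.product_id IS NULL   -- only products without an embedding yet
        LIMIT 500                    -- small batches keep transactions short
        """
    )
    print(cur.rowcount, "embeddings generated")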
-- Create a review sentiment-analysis function
CREATE OR REPLACE FUNCTION analyze_sentiment(
    comment_text TEXT,
    lang TEXT DEFAULT 'zh'
)
RETURNS TABLE (
    sentiment TEXT,
    confidence DOUBLE PRECISION,
    embeddings DOUBLE PRECISION[]
) AS $$
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# Load the multilingual model (cached per session)
model_key = f"sentiment_{lang}"
if model_key not in SD:
    SD[model_key] = {
        'tokenizer': AutoTokenizer.from_pretrained('nlptown/bert-base-multilingual-uncased-sentiment'),
        'model': AutoModelForSequenceClassification.from_pretrained('nlptown/bert-base-multilingual-uncased-sentiment')
    }
tokenizer = SD[model_key]['tokenizer']
model = SD[model_key]['model']
inputs = tokenizer(comment_text, return_tensors='pt', truncation=True, max_length=512)
with torch.no_grad():
    outputs = model(**inputs, output_hidden_states=True)
probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
conf, pred = torch.max(probs, dim=1)
# Map the model's 1-5 star output onto three sentiment labels
labels = ['negative', 'negative', 'neutral', 'positive', 'positive']
sentiment = labels[pred.item()]
confidence = conf.item()
# Also return the embedding for downstream tasks
embeddings = outputs.hidden_states[-1].mean(dim=1).squeeze().numpy()
return [(sentiment, confidence, embeddings.tolist())]
$$ LANGUAGE plpython3u;
-- Use it in the recommendation system in near real time
SELECT
    product_id,
    user_id,
    (analyze_sentiment(review_text)).*
FROM user_reviews
WHERE created_at > NOW() - INTERVAL '1 hour'
  AND (analyze_sentiment(review_text)).sentiment = 'negative';
-- Automatically route negative reviews to the support system
When model output is a multi-dimensional array, the traditional approach struggles:
# Model output: 10 recommended items per user
predictions = {
    'user_id': 12345,
    'recommended_items': [1001, 1002, 1003, ...],
    'scores': [0.85, 0.72, 0.68, ...]
}
# Needs to be expanded into rows: user_id, item_id, rank, score
I. Expanding arrays into rows
-- Assume predictions are stored as JSONB
CREATE TABLE model_predictions (
    prediction_id SERIAL PRIMARY KEY,
    user_id INT,
    recommended_items JSONB, -- [{"item_id":1001, "score":0.85}, ...]
    created_at TIMESTAMPTZ
);
-- Expand the JSONB array into one row per recommendation
SELECT
    user_id,
    (rec->>'item_id')::int as item_id,
    (rec->>'score')::float as score,
    row_number() OVER (PARTITION BY user_id ORDER BY (rec->>'score')::float DESC) as rank
FROM model_predictions,
LATERAL jsonb_array_elements(recommended_items) as rec
WHERE created_at > NOW() - INTERVAL '1 hour';
-- Performance: 100k users × 10 recommendations = 1M rows, expanded in just 3.2 seconds
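The producer side of this pattern is a single JSONB document per user. A sketch of how the ranking service might write into model_predictions (using psycopg2's execute_values for batching; the DSN is hypothetical):
import json
import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("host=localhost dbname=algorithm_db")  # hypothetical DSN

# One JSONB document per user, as produced by the ranking model
rows = [
    (12345, json.dumps([{"item_id": 1001, "score": 0.85},
                        {"item_id": 1002, "score": 0.72}])),
    (12346, json.dumps([{"item_id": 2001, "score": 0.91}])),
]

with conn, conn.cursor() as cur:
    execute_values(
        cur,
        "INSERT INTO model_predictions (user_id, recommended_items, created_at) VALUES %s",
        rows,
        template="(%s, %s::jsonb, NOW())",
    )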
II. Correlated expansion (parameterized subqueries)
-- Scenario: per-user personalized nearest neighbors
CREATE TABLE user_embeddings (
    user_id INT PRIMARY KEY,
    embedding DOUBLE PRECISION[]
);
-- Find the 10 most similar users for every user
SELECT
    u.user_id,
    neighbor.user_id as neighbor_id,
    cosine_similarity(u.embedding, neighbor.embedding) as similarity
FROM user_embeddings u,
LATERAL (
    SELECT ue.user_id, ue.embedding
    FROM user_embeddings ue
    WHERE ue.user_id != u.user_id
    ORDER BY cosine_similarity(u.embedding, ue.embedding) DESC
    LIMIT 10
) neighbor;
-- Performance: 1,000 users × 10 neighbors = 10k rows in 25 seconds (the similarity computation is the bottleneck)
III. Generating time series (filling gaps)
-- Generate a continuous time series and join metrics onto it
WITH time_series AS (
    SELECT generate_series(
        '2024-01-01'::timestamp,
        '2024-01-31'::timestamp,
        INTERVAL '1 hour'
    ) as ts
)
SELECT
    ts,
    COALESCE(d.active_users, 0) as active_users
FROM time_series
LEFT JOIN LATERAL (
    SELECT COUNT(DISTINCT user_id) as active_users
    FROM user_activity
    WHERE event_time BETWEEN ts AND ts + INTERVAL '1 hour'
) d ON true;
-- Fixes the incomplete-time-series problem
LATERAL performance comparison:
| Expansion method | Data size | Time | Uses index | Memory |
|---|---|---|---|---|
| | 100k × 10 | 4.5 s | no | high |
| LATERAL jsonb_array_elements | 100k × 10 | 3.2 s | no | medium |
| | 100k × 10 | 1.8 s | yes | low |
| Python loop | 100k × 10 | 127 s | N/A | high |
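The "Python loop" baseline in the last table row corresponds roughly to pulling the JSONB column into pandas and exploding it client-side — workable, but every row travels over the wire and through Python. A sketch under the same assumptions (model_predictions table above, hypothetical local DSN):
import pandas as pd
import psycopg2

conn = psycopg2.connect("host=localhost dbname=algorithm_db")  # hypothetical DSN
df = pd.read_sql("SELECT user_id, recommended_items FROM model_predictions", conn)

# Explode the list-of-dicts column into one row per recommendation
df = df.explode("recommended_items", ignore_index=True)
recs = pd.json_normalize(df["recommended_items"])
flat = pd.concat([df[["user_id"]], recs], axis=1)
flat["rank"] = flat.groupby("user_id")["score"].rank(ascending=False, method="first")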
-- Generate the final recommendations (filter out purchased items + rank)
WITH user_predictions AS (
    SELECT
        user_id,
        (rec->>'item_id')::int as item_id,
        (rec->>'score')::float as score,
        row_number() OVER (PARTITION BY user_id ORDER BY (rec->>'score')::float DESC) as rank
    FROM model_predictions,
    LATERAL jsonb_array_elements(recommended_items) as rec
    WHERE created_at > NOW() - INTERVAL '1 hour'
),
filtered_recs AS (
    SELECT
        up.user_id,
        up.item_id,
        up.score,
        up.rank
    FROM user_predictions up
    WHERE NOT EXISTS (
        SELECT 1 FROM user_purchases pu
        WHERE pu.user_id = up.user_id AND pu.item_id = up.item_id
    )
    AND up.rank <= 10 -- keep only the top 10
)
-- Insert into the recommendations table
INSERT INTO final_recommendations (user_id, item_id, score, rank)
SELECT * FROM filtered_recs
ON CONFLICT (user_id, item_id) DO UPDATE
SET score = EXCLUDED.score, rank = EXCLUDED.rank;
-- Performance: generating recommendation lists for 100k users takes just 4.1 seconds
SQL written by algorithm engineers often flies in the test environment, then sees latency blow up tenfold in production:
-- Test environment: 1,000 rows, 10 ms
SELECT user_id, AVG(score) FROM predictions GROUP BY user_id;
-- Production: 10 million rows, 18 seconds
-- Why? There is no index!
I. Basic execution-plan analysis
-- I. Inspect a simple query plan
EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT JSON)
SELECT
    device_id,
    AVG(temperature) as avg_temp,
    STDDEV(temperature) as std_temp
FROM sensor_data
WHERE ts BETWEEN '2024-01-01' AND '2024-01-02'
GROUP BY device_id;
-- Reading the output (JSON format):
-- {
--   "Plan": {
--     "Node Type": "GroupAggregate",
--     "Actual Rows": 10000,
--     "Actual Total Time": 1234.567,
--     "Plans": [{
--       "Node Type": "Index Only Scan",
--       "Index Name": "idx_sensor_data_ts_device",
--       "Rows Removed by Index Recheck": 0,
--       "Heap Fetches": 0,
--       "I/O Timings": {"Read": 45.2, "Write": 0}
--     }]
--   },
--   "Planning Time": 0.892,
--   "Triggers": []
-- }
-- II. Key metrics
Execution-plan metrics table:
| Metric | What it measures | Healthy | Dangerous | How to improve |
|---|---|---|---|---|
| Actual Total Time | execution time | <100ms | | indexes / parallelism |
| Rows Removed by Filter | rows filtered out | <10% of scanned rows | | index pushdown |
| Heap Fetches | heap lookups | 0 | | covering index |
| I/O Timings (Read) | read latency | <10ms | | storage / caching |
| Workers Launched | parallel workers | ≥4 | 0 | parallelism tuning |
| Buffer cache hit ratio | cache hit rate | | <90% | shared_buffers |
III. Automatically flagging slow queries
-- Create a slow-query monitoring view
CREATE VIEW slow_queries AS
SELECT
    query,
    calls,
    mean_exec_time,
    max_exec_time,
    rows
FROM pg_stat_statements
WHERE mean_exec_time > 1000 -- average above 1 second
  AND query LIKE '%sensor_data%' -- focus on the core tables
ORDER BY mean_exec_time DESC
LIMIT 10;
-- Automatic warning
DO $$
BEGIN
    IF (SELECT COUNT(*) FROM slow_queries) > 5 THEN
        RAISE WARNING 'Detected % slow queries', (SELECT COUNT(*) FROM slow_queries);
    END IF;
END $$;
The problem SQL:
-- Original feature-engineering query: 47 seconds
SELECT
    user_id,
    AVG(session_duration) as avg_session,
    (SELECT COUNT(*) FROM orders o WHERE o.user_id = u.user_id) as order_cnt
FROM user_sessions u
WHERE event_date BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY user_id;
Tuning walkthrough:
I. Diagnose with the execution plan
EXPLAIN (ANALYZE, BUFFERS)
-- Finding: a Nested Loop SubPlan — the subquery runs once per aggregated group
-- Actual Rows: 1 million, Total Time: 47 seconds
II. Rewrite as a JOIN
-- After rewriting: 3.2 seconds
SELECT
    u.user_id,
    AVG(u.session_duration) as avg_session,
    COUNT(o.order_id) as order_cnt
FROM user_sessions u
LEFT JOIN orders o ON u.user_id = o.user_id
WHERE u.event_date BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY u.user_id;
-- Plan: Hash Join, set-based processing
III. Add a covering index
-- Further optimization: 12 ms
CREATE INDEX idx_user_sessions_lookup ON user_sessions
(user_id, event_date) INCLUDE (session_duration);
-- Index Only Scan, no heap fetches
Tuning results:
| Version | Time | Plan | Index use | Speedup | Code change |
|---|---|---|---|---|---|
| Original | 47 s | Nested Loop | none | 1x | - |
| JOIN rewrite | 3.2 s | Hash Join | partial | 14.7x | moderate |
| Covering index | 12 ms | Index Only Scan | full | 3916x | small |
| Parallel query | 8 ms | Parallel Scan | full | 5875x | small |
#!/usr/bin/env python3
# sql_optimizer.py - analyze a query plan and suggest improvements
import psycopg2
import json

def analyze_query(conn, sql):
    cur = conn.cursor()
    # Fetch the execution plan in JSON format
    cur.execute(f"EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT JSON) {sql}")
    plan = cur.fetchone()[0]
    if isinstance(plan, str):  # some drivers return the plan as a JSON string
        plan = json.loads(plan)
    analysis = {
        'slowest_node': None,
        'costliest_node': None,
        'missing_index': False,
        'seq_scan_warning': False,
        'recommendations': []
    }
    def walk_plan(node, depth=0):
        node_time = node.get('Actual Total Time', 0)
        node_cost = node.get('Total Cost', 0)
        # Track the slowest node
        if analysis['slowest_node'] is None or node_time > analysis['slowest_node'][1]:
            analysis['slowest_node'] = (node['Node Type'], node_time, depth)
        # Track the costliest node
        if analysis['costliest_node'] is None or node_cost > analysis['costliest_node'][1]:
            analysis['costliest_node'] = (node['Node Type'], node_cost, depth)
        # Flag large sequential scans
        if node['Node Type'] == 'Seq Scan' and node.get('Rows Removed by Filter', 0) > 10000:
            analysis['seq_scan_warning'] = True
            analysis['recommendations'].append(
                f"Create an index on table {node.get('Relation Name', '')}"
            )
        # Recurse into child nodes
        for child in node.get('Plans', []):
            walk_plan(child, depth + 1)
    walk_plan(plan[0]['Plan'])
    # Print the report
    print("=== SQL optimization report ===")
    print(f"Slowest node: {analysis['slowest_node'][0]} ({analysis['slowest_node'][1]}ms)")
    if analysis['seq_scan_warning']:
        print("⚠️ Sequential scan detected — consider adding an index")
        for rec in analysis['recommendations']:
            print(f"  - {rec}")
    return analysis

if __name__ == '__main__':
    conn = psycopg2.connect("host=localhost dbname=algorithm_db")
    sql = "SELECT user_id, AVG(score) FROM predictions GROUP BY user_id"
    analyze_query(conn, sql)
    conn.close()
Algorithm tables are usually huge, but queries only touch a specific subset:
-- Prediction table: 1 billion rows
CREATE TABLE predictions (
    prediction_id BIGINT,
    model_version TEXT,
    user_id INT,
    score DOUBLE PRECISION,
    created_at TIMESTAMPTZ
);
-- Query: only the last day for a single model version
SELECT * FROM predictions
WHERE model_version = 'v3.2.1'
  AND created_at > NOW() - INTERVAL '1 day';
-- A full-table index wastes space: 1 billion indexed rows ≈ 200GB
CREATE INDEX idx_pred_full ON predictions (model_version, created_at);
I. Partial indexes (index only the hot data)
-- Index only the last ~7 days of data. The index predicate must be an
-- immutable expression, so use a constant cutoff and recreate the index
-- periodically (see the maintenance job at the end of this section).
CREATE INDEX idx_predictions_recent
ON predictions (model_version, created_at)
WHERE created_at > DATE '2024-01-01'; -- rolling cutoff, refreshed by a job
-- Index size: down from 200GB to 15GB (93% saved)
-- Queries whose predicate implies the index predicate can use it
SELECT * FROM predictions
WHERE model_version = 'v3.2.1'
  AND created_at > NOW() - INTERVAL '1 day'; -- can use idx_predictions_recent
II. Covering indexes with INCLUDE (avoid heap fetches)
-- The query projects score, which is not in the WHERE clause
SELECT user_id, score FROM predictions
WHERE model_version = 'v3.2.1'
  AND created_at > NOW() - INTERVAL '1 day';
-- A plain index still has to visit the heap for score (Heap Fetches)
CREATE INDEX idx_pred_lookup ON predictions (model_version, created_at);
-- Covering index: INCLUDE score (enables Index Only Scan)
CREATE INDEX idx_pred_covering
ON predictions (model_version, created_at)
INCLUDE (score)
WHERE created_at > DATE '2024-01-01'; -- constant cutoff, refreshed periodically
-- Performance: from 45 ms down to 8 ms
III. Expression indexes (precomputation)
-- Queries frequently filter on a score range through a function
SELECT * FROM predictions
WHERE model_version = 'v3.2.1'
  AND ROUND(score::numeric, 2) > 0.85; -- round() needs a numeric argument for two-digit rounding
-- Create an index on the expression
CREATE INDEX idx_pred_score_rounded
ON predictions (model_version, ROUND(score::numeric, 2))
WHERE created_at > DATE '2024-01-01'; -- constant cutoff, refreshed periodically
-- The query expression must match the indexed expression exactly
SELECT * FROM predictions
WHERE model_version = 'v3.2.1'
  AND ROUND(score::numeric, 2) > 0.85; -- uses idx_pred_score_rounded
Index strategy comparison:
| Index type | Index size | Query time | Update cost | Best for | Rating |
|---|---|---|---|---|---|
| Full-table B-Tree | 200GB | 45ms | high | full-range queries | ⭐ |
| Partial (WHERE) | 15GB | 12ms | medium | hot data | ⭐⭐⭐⭐⭐ |
| Covering (INCLUDE) | 18GB | 8ms | medium | projection queries | ⭐⭐⭐⭐⭐ |
| Expression index | 16GB | 10ms | high | function-based filters | ⭐⭐⭐ |
-- Scenario: model monitoring, with frequent queries for the last hour's error samples
CREATE TABLE model_errors (
    error_id BIGSERIAL,
    model_version TEXT,
    prediction_id BIGINT,
    actual_label INT,
    predicted_label INT,
    error_type TEXT, -- 'fp', 'fn', 'high_conf_error'
    created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Before: querying the last hour of FP errors scans the whole table
-- Time: 3.2 seconds
-- Optimization I: partial + covering index
CREATE INDEX idx_model_errors_monitoring
ON model_errors (model_version, error_type, created_at DESC)
INCLUDE (prediction_id, actual_label, predicted_label)
WHERE created_at > DATE '2024-01-01'; -- constant cutoff over the hot window, refreshed by the job below
-- Optimization II: maintain the partial index automatically
-- Helper function: periodically drop indexes left on old partitions
CREATE OR REPLACE FUNCTION maintain_partial_indexes()
RETURNS void AS $$
DECLARE
    old_date TIMESTAMP;
BEGIN
    old_date := NOW() - INTERVAL '7 days';
    -- Drop the index on old partitions (TimescaleDB manages the chunks themselves)
    EXECUTE format('DROP INDEX IF EXISTS idx_model_errors_%s',
                   to_char(old_date, 'YYYY_MM_DD'));
END;
$$ LANGUAGE plpgsql;
-- Scheduled job (pg_cron)
SELECT cron.schedule('maintain-partial-indexes', '0 2 * * *',
                     'SELECT maintain_partial_indexes()');
Results:

Every night the algorithm team runs the same feature script:
import pandas as pd

# 2 AM cron job: compute yesterday's features
def daily_feature_engineering():
    # Pull yesterday's data from the time-series store
    df = pd.read_sql("SELECT * FROM sensor_data WHERE ts >= CURRENT_DATE - 1", conn)
    # Compute the window features (repeated every single day)
    df['temp_mean_1h'] = df.groupby('device_id')['temperature'].rolling(6).mean()
    df['temp_std_6h'] = df.groupby('device_id')['temperature'].rolling(36).std()
    # Write to the feature table
    df.to_sql('daily_features', conn, if_exists='append')
# Problems: takes 2 hours, wastes resources, features are always stale
I. Create a hypertable (time-series table)
-- Step 1: create a regular table
CREATE TABLE sensor_data (
    ts TIMESTAMPTZ NOT NULL,
    device_id INT NOT NULL,
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION,
    voltage DOUBLE PRECISION
);
-- Step 2: convert it into a hypertable (automatic partitioning)
SELECT create_hypertable('sensor_data', 'ts',
    chunk_time_interval => INTERVAL '1 day');
-- Step 3: enable compression (after 90 days)
ALTER TABLE sensor_data SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'device_id'
);
SELECT add_compression_policy('sensor_data',
    compress_after => '90 days'::interval);
II. Multi-level continuous aggregates
-- Level 1: per-minute aggregation (real-time dashboards)
CREATE MATERIALIZED VIEW sensor_minute
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(INTERVAL '1 minute', ts) as bucket,
    device_id,
    AVG(temperature) as avg_temp,
    MAX(temperature) as max_temp,
    MIN(temperature) as min_temp,
    STDDEV(temperature) as std_temp
FROM sensor_data
GROUP BY bucket, device_id
WITH NO DATA; -- do not backfill history immediately
-- Automatic refresh policy
SELECT add_continuous_aggregate_policy('sensor_minute',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');
-- Level 2: hourly aggregation (historical analysis)
CREATE MATERIALIZED VIEW sensor_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(INTERVAL '1 hour', bucket) as hour_bucket,
    device_id,
    AVG(avg_temp) as avg_temp_hourly,
    MAX(max_temp) as max_temp_hourly,
    SUM(CASE WHEN std_temp > 3 THEN 1 ELSE 0 END)::float / COUNT(*) as anomaly_rate
FROM sensor_minute
GROUP BY hour_bucket, device_id
WITH NO DATA;
SELECT add_continuous_aggregate_policy('sensor_hourly',
    start_offset => INTERVAL '7 days',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');
-- Level 3: daily aggregation (feature engineering)
CREATE MATERIALIZED VIEW sensor_daily_features
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(INTERVAL '1 day', hour_bucket) as day,
    device_id,
    -- Time-series feature: moving average
    AVG(avg_temp_hourly) as temp_mean,
    STDDEV(avg_temp_hourly) as temp_std,
    -- Trend feature: first difference
    AVG(avg_temp_hourly) - LAG(AVG(avg_temp_hourly)) OVER w as temp_daily_change,
    -- Statistical feature: percentile
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY max_temp_hourly) as temp_p95,
    -- Anomaly feature
    AVG(anomaly_rate) as daily_anomaly_rate
FROM sensor_hourly
GROUP BY day, device_id
WINDOW w AS (PARTITION BY device_id ORDER BY time_bucket(INTERVAL '1 day', hour_bucket))
WITH NO DATA;
SELECT add_continuous_aggregate_policy('sensor_daily_features',
    start_offset => INTERVAL '30 days',
    end_offset => INTERVAL '1 day',
    schedule_interval => INTERVAL '1 day',
    initial_start => '02:00:00'); -- run at 2 AM
III. Automatic query routing (transparent acceleration)
-- Query the last hour (served from sensor_minute automatically)
SELECT * FROM sensor_data
WHERE ts > NOW() - INTERVAL '1 hour'; -- raw table, still optimized
-- Query the last 7 days (merged from sensor_hourly automatically)
SELECT device_id, AVG(temperature)
FROM sensor_data
WHERE ts > NOW() - INTERVAL '7 days'
GROUP BY device_id;
-- The TimescaleDB planner picks the most efficient aggregation level
Continuous aggregate performance table:
| Aggregation level | Computation | Query time | Storage overhead | Refresh lag |
|---|---|---|---|---|
| Raw data | full scan | 12 s | 100% | 0 |
| Per minute | automatic refresh | 23 ms | +12% | 1 minute |
| Hourly | automatic refresh | 8 ms | +5% | 1 hour |
| Daily | automatic refresh | 3 ms | +2% | 1 day |
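With the daily continuous aggregate in place, the nightly pandas job from the start of this section collapses into a plain read of pre-aggregated features. A sketch, assuming the sensor_daily_features view above and a hypothetical local DSN:
import pandas as pd
import psycopg2

conn = psycopg2.connect("host=localhost dbname=algorithm_db")  # hypothetical DSN

features = pd.read_sql(
    """
    SELECT day, device_id, temp_mean, temp_std, temp_p95, daily_anomaly_rate
    FROM sensor_daily_features
    WHERE day >= CURRENT_DATE - 30
    """,
    conn,
)
print(features.shape)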
-- Device-health feature view (auto-refreshing)
CREATE MATERIALIZED VIEW device_health_features
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(INTERVAL '1 hour', ts) as hour,
    device_id,
    -- Temperature features
    AVG(temperature) as temp_mean,
    STDDEV(temperature) as temp_std,
    MAX(temperature) as temp_max,
    -- Voltage stability
    (MAX(voltage) - MIN(voltage)) / NULLIF(AVG(voltage), 0) as voltage_cv,
    -- Composite feature: temperature-humidity correlation
    corr(temperature, humidity) as temp_humidity_corr,
    -- Anomaly-detection feature
    SUM(CASE WHEN temperature > ANY(
        SELECT percentile_cont(0.95) WITHIN GROUP (ORDER BY temperature)
        FROM sensor_data
        WHERE device_id = sd.device_id
          AND ts > NOW() - INTERVAL '7 days'
    ) THEN 1 ELSE 0 END)::float / COUNT(*) as temp_outlier_rate
FROM sensor_data sd
GROUP BY hour, device_id
WITH NO DATA;
-- Auto refresh
SELECT add_continuous_aggregate_policy('device_health_features',
    start_offset => INTERVAL '24 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');
-- Query real-time health status (check CRITICAL before WARNING so it can fire)
SELECT
    device_id,
    hour,
    temp_outlier_rate,
    voltage_cv,
    CASE
        WHEN temp_outlier_rate > 0.2 OR voltage_cv > 0.2 THEN 'CRITICAL'
        WHEN temp_outlier_rate > 0.1 OR voltage_cv > 0.15 THEN 'WARNING'
        ELSE 'NORMAL'
    END as health_status
FROM device_health_features
WHERE hour > NOW() - INTERVAL '2 hours';
-- Create maintenance tickets automatically
INSERT INTO maintenance_tickets (device_id, issue_type, priority)
SELECT
    device_id,
    'temp_outlier' as issue_type,
    1 as priority
FROM device_health_features
WHERE hour > NOW() - INTERVAL '1 hour'
  AND temp_outlier_rate > 0.2;
# 1. Install PostgreSQL 15+
sudo apt install postgresql-15 postgresql-contrib-15
# 2. Key extensions
psql -c "CREATE EXTENSION pg_stat_statements;"
psql -c "CREATE EXTENSION plpython3u;"
# 3. TimescaleDB (for time-series workloads)
wget https://packagecloud.io/timescale/timescaledb/gpgkey
apt install timescaledb-2-postgresql-15
psql -c "CREATE EXTENSION timescaledb;"
# 4. Configuration tuning
sudo -u postgres psql -c "ALTER SYSTEM SET shared_buffers = '4GB';"
sudo -u postgres psql -c "ALTER SYSTEM SET work_mem = '256MB';"
sudo -u postgres psql -c "ALTER SYSTEM SET max_parallel_workers_per_gather = 4;"
sudo -u postgres psql -c "SELECT pg_reload_conf();"
-- Create a schema for shared algorithm utilities
CREATE SCHEMA algo_utils;
-- Common aggregate functions
CREATE AGGREGATE algo_utils.auc(INT, DOUBLE PRECISION) (...);
CREATE AGGREGATE algo_utils.ndcg_k(INT, DOUBLE PRECISION, INT) (...);
-- Similarity functions
CREATE FUNCTION algo_utils.cosine_similarity(...) (...);
-- Window feature functions
CREATE FUNCTION algo_utils.rolling_mean(...) (...) WINDOW;
-- Grant privileges
GRANT USAGE ON SCHEMA algo_utils TO analyst_role;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA algo_utils TO analyst_role;
Original-content notice: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For infringement concerns, contact cloudcommunity@tencent.com to request removal.