
-- 普通视图:每次查询都执行SQL,结果不存储
-- 虚拟视图:每次访问都重新计算
CREATE VIEW user_features_realtime AS
SELECT
user_id,
AVG(session_duration) as avg_session,
COUNT(DISTINCT login_date) as active_days
FROM user_behavior
WHERE event_date >= CURRENT_DATE - 30
GROUP BY user_id;
-- 问题:每次SELECT都扫描user_behavior表,重复计算
SELECT * FROM user_features_realtime WHERE user_id = 12345; -- 耗时2.1秒
SELECT * FROM user_features_realtime WHERE user_id = 67890; -- 再次耗时2.1秒

物化视图:查询结果物理存储为表
-- 物化视图:数据物理存储,查询时直接读取
CREATE MATERIALIZED VIEW user_features_cached AS
SELECT
user_id,
AVG(session_duration) as avg_session,
COUNT(DISTINCT login_date) as active_days
FROM user_behavior
WHERE event_date >= CURRENT_DATE - 30
GROUP BY user_id;
-- 首次创建需全量计算(耗时但仅一次)
-- 后续查询从物化表读取
SELECT * FROM user_features_cached WHERE user_id = 12345; -- 耗时12毫秒
-- 物化视图自动创建物理表
\d user_features_cached
-- 输出:物化表,有数据文件存储

性能对比表:
指标 | 普通视图 | 物化视图 | 差异 | 适用场景 |
|---|---|---|---|---|
首次查询 | 2.1秒 | 2.1秒(创建时) | 0% | 都需计算 |
重复查询 | 2.1秒 | 12毫秒 | 17500% | 物化绝对优势 |
存储成本 | 0 | 8MB(100万用户) | 新增 | 可接受 |
更新延迟 | 实时 | 依赖刷新策略 | 延迟 | 需权衡 |
CPU消耗 | 100% | 5% | 95%↓ | 物化绝对优势 |
PostgreSQL物化视图本质上是一个带元数据的普通表:
# 查看物化视图物理存储
psql -c "
SELECT
schemaname,
matviewname,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||matviewname)) as size,
relfilenode
FROM pg_matviews mv
JOIN pg_class c ON mv.matviewname = c.relname;
"
# 输出:
# schemaname | matviewname | size | relfilenode
# -----------+--------------------+---------+-------------
# public | user_features_cached | 8192 kB | 12345
# 查看数据文件
ls -lh /data/pgsql/data/base/16384/12345
# -rw------- 1 postgres postgres 8.0M Jan 15 10:23 12345

与普通表的区别:数据只能通过REFRESH命令刷新

-- 查看物化视图依赖链
SELECT
c.relname as matview_name,
pg_get_viewdef(c.oid) as definition,
array_agg(distinct p.relname) as depends_on_tables
FROM pg_class c
JOIN pg_depend d ON c.oid = d.objid
JOIN pg_class p ON d.refobjid = p.oid
WHERE c.relkind = 'm'
GROUP BY c.oid, c.relname;
-- 输出:
# matview_name | definition | depends_on_tables
# ---------------------+-------------------------------------+-------------------
# user_features_cached | SELECT user_id,avg(...) FROM ... | {user_behavior}

依赖变更处理:
-- 如果基表user_behavior结构变更(如删除列)
ALTER TABLE user_behavior DROP COLUMN session_duration;
-- 物化视图会失效:
SELECT * FROM user_features_cached;
-- ERROR: materialized view "user_features_cached" is not synchronized with master table
-- 需要重建
DROP MATERIALIZED VIEW user_features_cached;
CREATE MATERIALIZED VIEW user_features_cached AS ...;
-- 基础语法
CREATE MATERIALIZED VIEW view_name AS
SELECT ... FROM ...;
-- 带索引的物化视图(重要:物化表可建索引)
CREATE MATERIALIZED VIEW user_features_cached AS
SELECT
user_id,
AVG(session_duration) as avg_session,
COUNT(DISTINCT login_date) as active_days
FROM user_behavior
WHERE event_date >= CURRENT_DATE - 30
GROUP BY user_id;
-- 添加唯一索引(支持REFRESH CONCURRENTLY)
CREATE UNIQUE INDEX ON user_features_cached (user_id);
-- 添加普通索引加速查询
CREATE INDEX idx_avg_session ON user_features_cached (avg_session DESC);

场景:为用户推荐模型准备实时特征
I. 创建基表
-- 用户行为日志表(时序)
CREATE TABLE user_behavior (
event_id BIGSERIAL,
user_id BIGINT NOT NULL,
event_type TEXT, -- 'login', 'purchase', 'click'
event_value DOUBLE PRECISION,
event_timestamp TIMESTAMPTZ DEFAULT NOW(),
session_id UUID
) PARTITION BY RANGE (event_timestamp);
-- 创建分区(自动)
CREATE TABLE user_behavior_20240101 PARTITION OF user_behavior
FOR VALUES FROM ('2024-01-01') TO ('2024-01-02');
-- ...创建更多分区
-- 添加索引
CREATE INDEX idx_behavior_user_time ON user_behavior (user_id, event_timestamp DESC);
CREATE INDEX idx_behavior_type ON user_behavior (event_type, event_timestamp);

II. 创建物化视图(特征工程)
-- 推荐特征物化视图
-- Materialized view of per-user recommendation features over the last 30 days.
--
-- NOTE(review): as written this will not execute on stock PostgreSQL:
--   * LAG(event_value, 1) OVER user_window appears in a GROUP BY query where
--     event_value is neither grouped nor aggregated; window functions in a
--     grouped SELECT may only reference grouped/aggregated values, so the lag
--     feature needs to be computed in a pre-aggregation subquery instead.
--   * user_category_pref selects a "category" column that the user_behavior
--     DDL shown elsewhere in this file does not define -- verify the schema.
CREATE MATERIALIZED VIEW user_recommend_features AS
WITH user_30d_stats AS (
SELECT
user_id,
-- activity feature: number of logins in the window
COUNT(*) FILTER (WHERE event_type = 'login') as login_cnt_30d,
-- spending-power features
SUM(event_value) FILTER (WHERE event_type = 'purchase') as purchase_amount_30d,
AVG(event_value) FILTER (WHERE event_type = 'purchase') as avg_purchase_30d,
-- behavior-sequence (lag) feature -- see NOTE above: invalid in this position
LAG(event_value, 1) OVER user_window as last_event_value,
-- exponentially time-decayed engagement score (86400 s = 1 day time constant)
SUM(event_value * EXP(-EXTRACT(EPOCH FROM (NOW() - event_timestamp)) / 86400))
as decayed_score_30d
FROM user_behavior
WHERE event_timestamp >= NOW() - INTERVAL '30 days'
GROUP BY user_id
WINDOW user_window AS (PARTITION BY user_id ORDER BY event_timestamp)
),
user_category_pref AS (
-- per-user click counts by category, folded into one JSONB object
SELECT
user_id,
jsonb_object_agg(category, cnt) as category_pref
FROM (
SELECT user_id, category, COUNT(*) as cnt
FROM user_behavior
WHERE event_type = 'click'
GROUP BY user_id, category
) sub
GROUP BY user_id
)
SELECT
s.user_id,
s.login_cnt_30d,
s.purchase_amount_30d,
s.avg_purchase_30d,
s.last_event_value,
s.decayed_score_30d,
c.category_pref,
-- derived segment label from 30-day spend
CASE
WHEN s.purchase_amount_30d > 1000 THEN 'high_value'
WHEN s.purchase_amount_30d > 100 THEN 'medium_value'
ELSE 'low_value'
END as user_segment
FROM user_30d_stats s
LEFT JOIN user_category_pref c ON s.user_id = c.user_id;
-- 创建必要索引
CREATE UNIQUE INDEX ON user_recommend_features (user_id);
CREATE INDEX idx_segment ON user_recommend_features (user_segment);III. 填充数据(首次)
-- 首次创建需全量刷新(耗时但必需)
REFRESH MATERIALIZED VIEW user_recommend_features;
-- 查看进度(大表)
SELECT * FROM pg_stat_activity
WHERE query LIKE '%REFRESH MATERIALIZED VIEW%';
-- 或查看系统负载I. 大表物化策略
-- 问题:全量刷新10亿行数据,锁表10分钟
REFRESH MATERIALIZED VIEW user_recommend_features; -- 锁表,阻塞查询
-- 解决方案1:CONCURRENTLY(无锁,但需唯一索引)
REFRESH MATERIALIZED VIEW CONCURRENTLY user_recommend_features;
-- 限制:需要unique index on user_id
-- 优势:允许查询旧数据,刷新不阻塞
-- 解决方案2:增量分区刷新(TimescaleDB)
-- 注意:下面的 REFRESH ... WHERE 写法仅为示意,PostgreSQL 并不支持带 WHERE 的
-- REFRESH;TimescaleDB 中应使用 refresh_continuous_aggregate() 按时间窗口刷新
-- 只刷新最近7天的分区
REFRESH MATERIALIZED VIEW user_recommend_features
WHERE user_id IN (
SELECT DISTINCT user_id FROM user_behavior
WHERE event_timestamp >= NOW() - INTERVAL '7 days'
);II. 资源控制
-- 在业务低峰期刷新
ALTER MATERIALIZED VIEW user_recommend_features
SET (timescaledb.materialized_only = true);
-- 设置刷新超时
SET statement_timeout = '10min';
REFRESH MATERIALIZED VIEW user_recommend_features;
-- 使用资源组限制(需cgroup)
CREATE RESOURCE GROUP refresh_group WITH (CPU_RATE_LIMIT=200);
SET cgroup = 'refresh_group';
REFRESH MATERIALIZED VIEW user_recommend_features;III. 版本管理
-- 为不同模型版本创建独立物化视图
CREATE MATERIALIZED VIEW user_features_v1 AS ...;
CREATE MATERIALIZED VIEW user_features_v2 AS ...;
-- 灰度切换
ALTER MATERIALIZED VIEW user_features_v1 RENAME TO user_features_old;
ALTER MATERIALIZED VIEW user_features_v2 RENAME TO user_features_current;
-- 保留旧版本用于回溯

步骤 | 命令/操作 | 检查项 | 耗时预估 | 风险等级 |
|---|---|---|---|---|
I. 基表设计 | 创建分区表+索引 | 分区策略是否合理 | 30分钟 | 低 |
II. 物化SQL编写 | 验证SELECT性能 | 单次查询<30秒 | 2小时 | 中 |
III. 首次刷新 | REFRESH | 磁盘空间是否充足 | 1-10小时 | 高 |
IV. 索引创建 | CREATE INDEX | 是否创建唯一索引 | 30分钟 | 中 |
V. 并发测试 | CONCURRENTLY | 查询是否阻塞 | 10分钟 | 低 |
VI. 调度配置 | cron/pgAgent | 刷新周期合理 | 20分钟 | 中 |
VII. 监控告警 | 创建视图 | 失败告警 | 30分钟 | 低 |
创建时间预估:1000万行为例

-- 问题:每天全量刷新800万用户特征
REFRESH MATERIALIZED VIEW user_recommend_features;
-- 痛点:
-- 1. 锁表:阻塞查询30分钟
-- 2. 资源浪费:95%数据未变化
-- 3. 延迟:特征更新滞后
-- 4. 存储:temp文件占用50GB

I. 基于时间戳的增量
-- 方案:只刷新最近7天有行为的用户
-- Incrementally rebuild features for users active in the last 7 days.
--
-- NOTE(review): stock PostgreSQL does not allow DELETE/INSERT against a
-- MATERIALIZED VIEW; this pattern requires user_recommend_features to be an
-- ordinary table maintained by this job -- confirm the object type.
CREATE OR REPLACE FUNCTION refresh_user_features_incremental()
RETURNS void AS $$
DECLARE
    affected_users BIGINT[];
BEGIN
    -- Step 1: collect users with any behavior in the last 7 days.
    SELECT array_agg(DISTINCT user_id) INTO affected_users
    FROM user_behavior
    WHERE event_timestamp >= NOW() - INTERVAL '7 days';

    -- Nothing to refresh: array_agg over zero rows yields NULL, and
    -- calculate_user_features(NULL) would do pointless work downstream.
    IF affected_users IS NULL THEN
        RETURN;
    END IF;

    -- Step 2: drop the stale feature rows for those users.
    DELETE FROM user_recommend_features
    WHERE user_id = ANY(affected_users);

    -- Step 3: insert the freshly computed feature rows.
    INSERT INTO user_recommend_features
    SELECT * FROM calculate_user_features(affected_users);
END;
$$ LANGUAGE plpgsql;
-- 创建辅助函数
CREATE OR REPLACE FUNCTION calculate_user_features(user_ids BIGINT[])
RETURNS TABLE (...) AS $$
SELECT
user_id,
AVG(session_duration) as avg_session,
...
FROM user_behavior
WHERE user_id = ANY(user_ids)
AND event_timestamp >= NOW() - INTERVAL '30 days'
GROUP BY user_id;
$$ LANGUAGE SQL;

II. 基于触发器的实时更新
-- 创建触发器函数
-- Trigger function: recompute the cached feature row for the single user
-- touched by an INSERT or UPDATE on user_behavior.
CREATE OR REPLACE FUNCTION update_user_features()
RETURNS TRIGGER AS $$
BEGIN
    -- Replace-style refresh: delete the old feature row ...
    DELETE FROM user_recommend_features WHERE user_id = NEW.user_id;
    -- ... then insert a freshly computed one for just this user.
    INSERT INTO user_recommend_features
    SELECT * FROM calculate_user_features(ARRAY[NEW.user_id]);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 在行为表上创建触发器
CREATE TRIGGER trg_user_behavior
AFTER INSERT OR UPDATE ON user_behavior
FOR EACH ROW
EXECUTE FUNCTION update_user_features();
-- 问题:高频写入时,触发器成为性能瓶颈
-- 优化:改为 AFTER INSERT OR UPDATE ... FOR EACH STATEMENT 语句级触发器

III. 基于TimescaleDB的chunk级刷新
-- TimescaleDB自动管理分区
-- 只刷新最近2个chunk(最近2天)
-- Refresh features for users seen in the most recent TimescaleDB chunks
-- (roughly the last 2 days of data).
CREATE OR REPLACE FUNCTION refresh_recent_chunks()
RETURNS void AS $$
DECLARE
    -- Renamed from chunk_name: the original variable shadowed the
    -- chunk_name column selected in the FOR query, breaking the lookup.
    v_chunk_qualname TEXT;
BEGIN
    FOR v_chunk_qualname IN
        -- Build a properly quoted, schema-qualified chunk identifier.
        -- (The original format('_timescaledb_internal.%I', schema||'.'||name)
        -- quoted "schema.name" as a single identifier, which is invalid.)
        SELECT format('%I.%I', c.chunk_schema, c.chunk_name)
        FROM timescaledb_information.chunks c
        WHERE c.hypertable_name = 'user_behavior'
          AND c.range_end > NOW() - INTERVAL '2 days'
    LOOP
        -- NOTE(review): "REFRESH MATERIALIZED VIEW ... WHERE" is not valid
        -- PostgreSQL syntax; this statement is illustrative pseudo-SQL. A
        -- working version would collect the chunk's user_ids and call
        -- refresh_user_features_incremental(), or use TimescaleDB's
        -- refresh_continuous_aggregate() over the chunk's time range.
        EXECUTE format('REFRESH MATERIALIZED VIEW user_recommend_features
        WHERE user_id IN (
        SELECT DISTINCT user_id FROM %s
        )', v_chunk_qualname);
    END LOOP;
END;
$$ LANGUAGE plpgsql;

策略 | 实现复杂度 | 刷新延迟 | 资源消耗 | 适用场景 | 推荐度 |
|---|---|---|---|---|---|
全量刷新 | 极低 | 高(30min) | 极高 | 小表(<100万) | ⭐ |
时间戳增量 | 中 | 中(5min) | 低 | 时间明确 | ⭐⭐⭐ |
触发器实时 | 高 | 极低(实时) | 高(高频写入) | 低频更新 | ⭐⭐ |
Chunk级刷新 | 中 | 低(1min) | 极低 | 时序表 | ⭐⭐⭐⭐⭐ |
CONCURRENTLY | 低 | 中(10min) | 中 | 通用 | ⭐⭐⭐⭐⭐ |
-- 场景:用户点击行为实时更新推荐特征
-- Step 1: 创建行为缓冲表(最近1分钟)
CREATE TABLE user_behavior_buffer (
LIKE user_behavior
) WITH (autovacuum_enabled = false); -- 关闭自动清理,手动管理
-- Step 2: 每分钟合并缓冲到主表
-- Atomically drain the write buffer into the main table, then refresh the
-- affected users' features.
CREATE OR REPLACE FUNCTION merge_buffer()
RETURNS void AS $$
BEGIN
    -- Move rows in a single statement: DELETE ... RETURNING feeds the
    -- INSERT, so rows written to the buffer while this runs are neither
    -- lost nor double-moved. The original INSERT-then-TRUNCATE silently
    -- discarded any row that arrived between the two statements (TRUNCATE
    -- empties the whole table, not just the rows that were copied).
    WITH moved AS (
        DELETE FROM user_behavior_buffer
        RETURNING *
    )
    INSERT INTO user_behavior
    SELECT * FROM moved;
    -- Incrementally refresh features for the users just merged.
    PERFORM refresh_user_features_incremental();
END;
$$ LANGUAGE plpgsql;
-- Step 3: 定时调度(每分钟)
SELECT cron.schedule('merge-buffer', '* * * * *', 'SELECT merge_buffer()');
-- Step 4: 应用端双写
# Python应用代码
def log_user_behavior(user_id, event_type, event_value):
    """Record one behavior event: synchronously into the fast buffer table,
    asynchronously into the main table.

    NOTE(review): user_behavior_buffer is created with LIKE user_behavior
    (6 columns), so the original bare positional VALUES list of 4 values did
    not match the table; an explicit column list is required.
    """
    # Fast path: write the buffer table (drained into the main table by the
    # minutely merge_buffer() job).
    cursor.execute(
        "INSERT INTO user_behavior_buffer"
        " (user_id, event_type, event_value, event_timestamp)"
        " VALUES (%s, %s, %s, NOW())",
        (user_id, event_type, event_value),
    )
    # Slow path: hand the main-table write to a background worker.
    background_queue.enqueue(insert_to_main_table, user_id, event_type, event_value)
-- 效果:特征更新延迟从30分钟降至1分钟
-- 资源消耗:下降85%(只计算增量)
I. 物化视图上创建索引
-- 唯一索引(支持CONCURRENTLY刷新)
CREATE UNIQUE INDEX ON user_recommend_features (user_id);
-- 普通索引加速查询
CREATE INDEX idx_avg_session ON user_recommend_features (avg_session DESC);
CREATE INDEX idx_user_segment ON user_recommend_features (user_segment, avg_session DESC);
-- 部分索引(热点数据)
CREATE INDEX idx_high_value ON user_recommend_features (user_id)
WHERE user_segment = 'high_value';
-- 表达式索引(加速计算)
CREATE INDEX idx_decayed_score ON user_recommend_features (decayed_score_30d);II. 索引对刷新性能的影响
-- 测试:刷新100万用户特征
-- 场景1: 无索引
REFRESH MATERIALIZED VIEW user_recommend_features; -- 180秒
-- 场景2: 只有唯一索引
CREATE UNIQUE INDEX ON user_recommend_features (user_id);
REFRESH MATERIALIZED VIEW user_recommend_features; -- 195秒(+8%)
-- 场景3: 多个索引
CREATE INDEX idx_avg_session ON user_recommend_features (avg_session);
CREATE INDEX idx_segment ON user_recommend_features (user_segment);
REFRESH MATERIALIZED VIEW user_recommend_features; -- 240秒(+33%)
-- 结论:索引会增加刷新时间,但大幅提升查询性能
-- 权衡:在刷新和查询之间平衡索引策略对比表:
索引类型 | 刷新时间增加 | 查询加速 | 适用场景 | 建议 |
|---|---|---|---|---|
无索引 | 0% | 1x(Seq Scan) | 数据仓库 | ❌ |
唯一索引 | 8% | 100x(Index Scan) | 支持增量刷新 | ⭐⭐⭐⭐⭐ |
普通索引 | 15% | 50x | 高频过滤 | ⭐⭐⭐⭐ |
部分索引 | 5% | 80x | 热点数据 | ⭐⭐⭐ |
表达式索引 | 20% | 30x | 计算列 | ⭐⭐ |
场景:物化视图本身也达到千万级
-- 创建"分区物化视图"(注意:PostgreSQL(含14+)并不原生支持对物化视图声明
-- PARTITION BY,以下写法仅为示意;实际可用普通分区表加定时写入来模拟)
CREATE MATERIALIZED VIEW user_features_partitioned
PARTITION BY RANGE (user_segment) AS
SELECT
user_id,
user_segment,
avg_session,
purchase_amount_30d
FROM user_recommend_features;
-- 创建分区
CREATE MATERIALIZED VIEW user_features_high_value
PARTITION OF user_features_partitioned
FOR VALUES IN ('high_value');
CREATE MATERIALIZED VIEW user_features_medium_value
PARTITION OF user_features_partitioned
FOR VALUES IN ('medium_value');
CREATE MATERIALIZED VIEW user_features_low_value
PARTITION OF user_features_partitioned
FOR VALUES IN ('low_value');
-- 优势:
-- 1. 刷新单个分区(如只刷新高价值用户)
REFRESH MATERIALIZED VIEW user_features_high_value;
-- 2. 查询分区裁剪
SELECT * FROM user_features_partitioned
WHERE user_segment = 'high_value'; -- 只扫描对应分区
-- 性能:刷新时间从180秒降至45秒(高价值用户仅200万)-- 利用并行查询加速刷新
SET max_parallel_workers_per_gather = 8;
SET parallel_setup_cost = 100;
SET parallel_tuple_cost = 0.01;
-- 刷新时自动并行
REFRESH MATERIALIZED VIEW user_recommend_features;
-- 执行计划:
-- -> Finalize GroupAggregate (cost=12345.67..56789.01)
-- -> Gather (cost=12345.67..45678.90)
-- Workers Planned: 8
-- -> Partial HashAggregate
-- -> Parallel Index Only Scan
-- 性能:180秒 → 45秒(4x加速)并行参数调优表:
参数 | 默认值 | 刷新推荐值 | 作用 | 风险 |
|---|---|---|---|---|
| 2 | 8 | 并行Worker数 | 高CPU占用 |
| 1000 | 100 | 降低并行门槛 | 小查询也并行 |
| 0.1 | 0.01 | 鼓励并行传输 | 无 |
| 4MB | 256MB | 哈希聚合内存 | OOM风险 |
-- 为物化视图设置专用表空间(SSD)
CREATE TABLESPACE fast_ssd LOCATION '/data/ssd';
-- 移动物化视图到高速存储
ALTER MATERIALIZED VIEW user_recommend_features SET TABLESPACE fast_ssd;
-- 调整填充因子(预留更新空间)
ALTER MATERIALIZED VIEW user_recommend_features SET (fillfactor = 85);
-- 禁用自动vacuum(手动管理)
ALTER MATERIALIZED VIEW user_recommend_features SET (
autovacuum_enabled = false,
toast.autovacuum_enabled = false
);存储优化效果表:
优化项 | 刷新时间 | 查询时间 | 存储空间 | 适用场景 |
|---|---|---|---|---|
默认配置 | 180秒 | 12ms | 8MB | 通用 |
SSD表空间 | 165秒 | 8ms | 8MB | I/O瓶颈 |
fillfactor=85 | 170秒 | 12ms | 9MB | 频繁更新 |
关闭autovacuum | 175秒 | 12ms | 10MB | 手动维护 |
并行+SSD+索引 | 45秒 | 8ms | 9MB | 生产推荐 |

场景:为不同模型版本提供独立特征
-- 创建模型版本化的物化视图
CREATE MATERIALIZED VIEW user_features_v1 AS
SELECT * FROM user_recommend_features;
CREATE MATERIALIZED VIEW user_features_v2 AS
SELECT
user_id,
avg_session * 1.2 as avg_session_scaled, -- v2特征调整
purchase_amount_30d,
decayed_score_30d,
category_pref || '{"new_feature": 1}'::jsonb as category_pref_enhanced
FROM user_recommend_features;
-- 版本切换(蓝绿部署)
-- 正在服务的版本
ALTER MATERIALIZED VIEW user_features_v1 RENAME TO user_features_serving;
-- 新训练版本
REFRESH MATERIALIZED VIEW user_features_v2;
ALTER MATERIALIZED VIEW user_features_v2 RENAME TO user_features_serving_new;
-- 原子切换
BEGIN;
ALTER MATERIALIZED VIEW user_features_serving RENAME TO user_features_v1_old;
ALTER MATERIALIZED VIEW user_features_serving_new RENAME TO user_features_serving;
COMMIT;特征版本对比表:
版本 | 特征数 | 计算逻辑 | 物化耗时 | 查询耗时 | 适用模型 |
|---|---|---|---|---|---|
v1 | 153 | 基础统计 | 45秒 | 8ms | 老模型 |
v2 | 187 | 增加滞后特征 | 68秒 | 12ms | 新模型 |
v3(开发) | 201 | 滞后+embedding | 120秒 | 18ms | 实验模型 |
场景:API服务直接查询物化视图
# Flask/FastAPI推理服务
from flask import Flask, jsonify
import psycopg2
app = Flask(__name__)
@app.route('/predict/<int:user_id>')
def predict(user_id):
    """Inference API: fetch features from the materialized view, run the model.

    Fix over the original: the connection and cursor are now always released
    via try/finally -- the early 404 return used to leak both.
    NOTE(review): opening a new connection per request is expensive; a
    connection pool (e.g. psycopg2.pool) would be preferable in production.
    """
    conn = psycopg2.connect("host=pg-db dbname=ml_features")
    try:
        cursor = conn.cursor()
        try:
            # 1. Millisecond-level feature lookup from the materialized view.
            cursor.execute("""
            SELECT avg_session, purchase_amount_30d, category_pref, user_segment
            FROM user_recommend_features
            WHERE user_id = %s
            """, (user_id,))
            feature_row = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()
    if not feature_row:
        return jsonify({'error': 'User not found'}), 404
    # 2. Shape the row into the model's feature dict.
    features = {
        'avg_session': feature_row[0],
        'purchase_amount_30d': feature_row[1],
        'category_pref': feature_row[2],
        'user_segment': feature_row[3]
    }
    # 3. Model invocation (TensorFlow Serving / ONNX) would go here:
    # model.predict([...])
    return jsonify({
        'user_id': user_id,
        'features': features,
        'prediction': 0.85,
        'latency_ms': 45  # feature prep only ~45 ms (was 4.2 s on the base table)
    })
# 性能对比
# 原方案:查询基表 4.2秒
# 新方案:查询物化视图 45ms + 模型推理 15ms = 60ms-- 创建特征统计物化视图(监控数据分布)
CREATE MATERIALIZED VIEW feature_drift_monitor AS
SELECT
CURRENT_DATE as refresh_date,
-- 特征分布统计
AVG(avg_session) as avg_session_mean,
STDDEV(avg_session) as avg_session_std,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY avg_session) as avg_session_median,
-- 类别特征基数
jsonb_object_agg(user_segment, cnt) as segment_distribution
FROM (
SELECT
*,
COUNT(*) OVER (PARTITION BY user_segment) as cnt
FROM user_recommend_features
) sub;
-- 定时刷新(注意:add_continuous_aggregate_policy 仅适用于 TimescaleDB 连续聚合;
-- 普通物化视图应改用 cron 定时执行 REFRESH MATERIALIZED VIEW)
SELECT add_continuous_aggregate_policy('feature_drift_monitor',
start_offset => INTERVAL '7 days',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');
-- 漂移告警
SELECT refresh_date, avg_session_mean
FROM feature_drift_monitor
WHERE avg_session_mean < (SELECT avg(avg_session_mean) FROM feature_drift_monitor) * 0.8;特征漂移监控表:
日期 | avg_session均值 | 标准差 | 分位数 | 用户分布 | 漂移状态 |
|---|---|---|---|---|---|
2024-01-15 | 125.3 | 89.2 | 98 | high:30% | 正常 |
2024-01-16 | 118.7 | 85.1 | 95 | high:28% | 正常 |
2024-01-17 | 89.4 | 112.3 | 72 | high:22% | ⚠️ 漂移 |

-- 创建监控视图
CREATE VIEW mv_health_monitor AS
SELECT
schemaname,
matviewname,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||matviewname)) as size,
-- 最后刷新时间
CASE
WHEN schemaname = 'public' THEN
(SELECT last_refresh FROM pg_matviews WHERE matviewname = mv.matviewname)
ELSE NULL
END as last_refresh,
-- 行数
(SELECT n_live_tup FROM pg_stat_user_tables
WHERE schemaname = mv.schemaname AND relname = mv.matviewname) as row_count,
-- 死元组率
ROUND(100.0 * (SELECT n_dead_tup FROM pg_stat_user_tables
WHERE schemaname = mv.schemaname AND relname = mv.matviewname)
/ NULLIF((SELECT n_live_tup FROM pg_stat_user_tables
WHERE schemaname = mv.schemaname AND relname = mv.matviewname), 0), 2) as dead_tuple_ratio
FROM pg_matviews mv;
-- 查询健康状态
SELECT * FROM mv_health_monitor
WHERE dead_tuple_ratio > 5; -- 死元组率超过5%需清理监控告警脚本:
#!/usr/bin/env python3
# mv_monitor.py
import psycopg2
import sys
def check_mv_health(conn):
    """Report materialized-view health problems (staleness, dead tuples).

    Prints every problem found to stderr and exits with status 1 if any
    exist. The original version called sys.exit(1) on the first problem,
    hiding all subsequent ones, and never closed its cursor.
    """
    cur = conn.cursor()
    problems = []
    try:
        # Views that have not been refreshed for more than 2 hours.
        # NOTE(review): stock PostgreSQL's pg_matviews has no last_refresh
        # column; this assumes a custom catalog view supplies it -- verify.
        cur.execute("""
        SELECT matviewname,
        EXTRACT(EPOCH FROM (NOW() - last_refresh))/3600 as hours_since_refresh
        FROM pg_matviews
        WHERE last_refresh < NOW() - INTERVAL '2 hours'
        """)
        for name, hours in cur.fetchall():
            problems.append(f"⚠️ {name} 未刷新 {hours:.1f} 小时")
        # Views whose dead-tuple ratio exceeds 5%.
        cur.execute("""
        SELECT matviewname, dead_tuple_ratio
        FROM mv_health_monitor
        WHERE dead_tuple_ratio > 5
        """)
        for name, ratio in cur.fetchall():
            problems.append(f"🔴 {name} 死元组率 {ratio}%")
    finally:
        cur.close()
    if problems:
        for message in problems:
            print(message, file=sys.stderr)
        sys.exit(1)
    print("✅ 物化视图健康状态正常")
if __name__ == '__main__':
conn = psycopg2.connect("host=localhost dbname=ml_features")
check_mv_health(conn)
conn.close()

-- 创建刷新日志表
CREATE TABLE mv_refresh_log (
log_id SERIAL PRIMARY KEY,
matviewname TEXT,
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
status TEXT, -- 'success', 'failed', 'timeout'
error_message TEXT,
duration_ms INT
);
-- 包装REFRESH函数(带错误处理)
-- Refresh a materialized view with error handling and audit logging; both
-- successes and failures are recorded in mv_refresh_log.
CREATE OR REPLACE FUNCTION safe_refresh_mv(view_name TEXT)
RETURNS void AS $$
DECLARE
    start_time TIMESTAMPTZ;
    end_time TIMESTAMPTZ;
    error_msg TEXT;
BEGIN
    start_time := clock_timestamp();
    BEGIN
        -- CONCURRENTLY avoids blocking readers (requires a unique index on
        -- the view). %I safely quotes the caller-supplied identifier.
        EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY %I', view_name);
        end_time := clock_timestamp();
        INSERT INTO mv_refresh_log (matviewname, started_at, finished_at, status, duration_ms)
        VALUES (view_name, start_time, end_time, 'success',
                -- Cast the whole product to int. The original
                -- "EXTRACT(...)*1000::int" bound the cast to the literal
                -- 1000 only, leaving a fractional numeric value.
                (EXTRACT(EPOCH FROM (end_time - start_time)) * 1000)::int);
    EXCEPTION WHEN OTHERS THEN
        GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
        INSERT INTO mv_refresh_log (matviewname, started_at, finished_at, status, error_message, duration_ms)
        VALUES (view_name, start_time, clock_timestamp(), 'failed', error_msg, 0);
        RAISE WARNING '刷新 % 失败: %', view_name, error_msg;
    END;
END;
$$ LANGUAGE plpgsql;
-- 使用
SELECT safe_refresh_mv('user_recommend_features');失败自动重试:
# 在cron脚本中实现重试逻辑
import subprocess
import time
def refresh_with_retry(view_name, max_retries=3):
    """Run safe_refresh_mv() via psql, retrying on failure.

    Returns True on success, False after max_retries failed attempts.
    Sleeps 60s, 120s, 180s, ... between attempts (linear backoff -- the
    original comment claimed exponential backoff, which the code never did).
    """
    for attempt in range(max_retries):
        try:
            subprocess.run(
                ['psql', '-c', f"SELECT safe_refresh_mv('{view_name}');"],
                check=True,
                capture_output=True,
                # Decode stdout/stderr to str: without text=True, e.stderr
                # below is bytes and prints as b'...'.
                text=True,
            )
            print(f"✅ {view_name} 刷新成功")
            return True
        except subprocess.CalledProcessError as e:
            print(f"⚠️ 第{attempt+1}次失败: {e.stderr}")
            time.sleep(60 * (attempt + 1))  # linear backoff: 60s, 120s, ...
    print(f"🔴 {view_name} 刷新失败,已达最大重试次数")
    return False
if __name__ == '__main__':
refresh_with_retry('user_recommend_features')

-- 清理死元组(手动VACUUM)
-- Weekly maintenance for all materialized views in the public schema:
-- VACUUM ANALYZE each one, then rebuild its user_id index.
--
-- NOTE(review): both VACUUM and REINDEX ... CONCURRENTLY refuse to run
-- inside a transaction block, and a plpgsql FUNCTION always executes inside
-- one -- as written, the EXECUTEs below will fail at runtime. Consider a
-- PROCEDURE (PG 11+) invoked via CALL, or driving these commands from an
-- external cron/shell script. Also confirm that every matview actually has
-- an index named idx_<matviewname>_user_id before relying on the REINDEX.
CREATE OR REPLACE FUNCTION mv_maintenance()
RETURNS void AS $$
DECLARE
mv_record RECORD;
BEGIN
FOR mv_record IN
SELECT schemaname, matviewname
FROM pg_matviews
WHERE schemaname = 'public'
LOOP
-- VACUUM: reclaim dead tuples and refresh planner statistics.
EXECUTE format('VACUUM ANALYZE %I.%I',
mv_record.schemaname, mv_record.matviewname);
-- REINDEX: rebuild the (assumed) per-view user_id index when fragmented.
EXECUTE format('REINDEX INDEX CONCURRENTLY %I',
'idx_'||mv_record.matviewname||'_user_id');
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- 每周日执行维护
SELECT cron.schedule('mv-maintenance', '0 2 * * 0', 'SELECT mv_maintenance()');维护操作对比表:
操作 | 频率 | 耗时 | 影响 | 必要性 |
|---|---|---|---|---|
VACUUM | 每日 | 5-30秒 | 清理死元组 | ⭐⭐⭐⭐ |
REINDEX | 每周 | 1-5分钟 | 重建碎片索引 | ⭐⭐⭐ |
ANALYZE | 每日 | 5-10秒 | 更新统计信息 | ⭐⭐⭐⭐ |
REFRESH | 按需 | 45-180秒 | 更新数据 | ⭐⭐⭐⭐⭐ |
日志清理 | 每月 | 1秒 | 防止日志膨胀 | ⭐⭐ |

场景:IoT传感器实时特征,每天10亿条,保留90天
问题:
-- 创建超表
CREATE TABLE sensor_raw (
ts TIMESTAMPTZ NOT NULL,
sensor_id INT,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
voltage DOUBLE PRECISION
);
SELECT create_hypertable('sensor_raw', 'ts',
chunk_time_interval => INTERVAL '1 day');
-- 创建1分钟级连续聚合(实时)
CREATE MATERIALIZED VIEW sensor_minute
WITH (timescaledb.continuous) AS
SELECT
time_bucket(INTERVAL '1 minute', ts) as bucket,
sensor_id,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp,
STDDEV(temperature) as std_temp,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY temperature) as p95_temp
FROM sensor_raw
GROUP BY bucket, sensor_id
WITH NO DATA;
-- 自动刷新策略
SELECT add_continuous_aggregate_policy('sensor_minute',
start_offset => INTERVAL '2 hours',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
-- 查询最近1小时(从物化表读,2秒响应)
SELECT * FROM sensor_minute
WHERE bucket > NOW() - INTERVAL '1 hour';
-- 查询原始表(慢300x)
SELECT time_bucket(INTERVAL '1 minute', ts) as bucket, AVG(temperature)
FROM sensor_raw
WHERE ts > NOW() - INTERVAL '1 hour'
GROUP BY bucket, sensor_id; -- 12分钟响应连续聚合性能表:
查询范围 | 原始表查询 | 连续聚合查询 | 加速比 | 存储开销 |
|---|---|---|---|---|
最近1小时 | 12分钟 | 2秒 | 360x | +0.1% |
最近1天 | 2.3小时 | 12秒 | 690x | +0.5% |
最近7天 | 16小时 | 45秒 | 1280x | +1% |
最近30天 | 超时 | 3分钟 | ∞ | +3% |
-- 创建设备健康特征物化视图
CREATE MATERIALIZED VIEW device_health_features
WITH (timescaledb.continuous) AS
SELECT
time_bucket(INTERVAL '1 hour', ts) as hour,
sensor_id as device_id,
-- 温度稳定性
AVG(temperature) as temp_mean,
STDDEV(temperature) as temp_std,
-- 电压波动
(MAX(voltage) - MIN(voltage)) / NULLIF(AVG(voltage), 0) as voltage_cv,
-- 异常计数(3σ原则)
SUM(CASE WHEN temperature > AVG(temperature) + 3 * STDDEV(temperature) THEN 1 ELSE 0 END)::float /
NULLIF(COUNT(*), 0) as anomaly_rate,
-- 趋势特征
AVG(temperature) - LAG(AVG(temperature)) OVER w as temp_change
FROM sensor_raw
GROUP BY hour, device_id
WINDOW w AS (PARTITION BY device_id ORDER BY time_bucket(INTERVAL '1 hour', ts))
WITH NO DATA;
-- 自动刷新
SELECT add_continuous_aggregate_policy('device_health_features',
start_offset => INTERVAL '7 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
-- 实时预警
SELECT device_id, hour, anomaly_rate, temp_change
FROM device_health_features
WHERE hour > NOW() - INTERVAL '2 hours'
AND (anomaly_rate > 0.05 OR temp_change > 10);
-- 触发运维工单
INSERT INTO maintenance_alerts (device_id, issue, severity)
SELECT
device_id,
'temperature_anomaly' as issue,
CASE WHEN anomaly_rate > 0.1 THEN 1 ELSE 2 END as severity
FROM device_health_features
WHERE hour > NOW() - INTERVAL '1 hour'
AND anomaly_rate > 0.05;
-- 估算物化视图大小
SELECT
pg_size_pretty(pg_total_relation_size('user_behavior')) as base_table_size,
pg_size_pretty(pg_total_relation_size('user_recommend_features')) as mv_size,
ROUND(100.0 * pg_total_relation_size('user_recommend_features') /
pg_total_relation_size('user_behavior'), 2) as size_ratio;
-- 经验公式:物化视图大小 ≈ 基表大小的0.5-5%
-- 聚合度越高,比例越小
-- 预留增长空间
ALTER MATERIALIZED VIEW user_recommend_features
SET (fillfactor = 85); -- 预留15%用于增量更新容量规划表:
基表大小 | 物化视图预估 | 索引额外 | 增长预留 | 建议存储 |
|---|---|---|---|---|
100GB | 5GB | +1GB | 20% | 7.2GB |
1TB | 50GB | +10GB | 20% | 72GB |
10TB | 500GB | +100GB | 20% | 720GB |
-- 主备复制配置
-- 主库: postgresql.conf
wal_level = replica
max_wal_senders = 10
wal_keep_size = '1GB'
-- 备库: recovery.conf
primary_conninfo = 'host=master_ip port=5432 user=replica'
recovery_target_timeline = 'latest'
-- 物化视图在备库可读
-- 主库刷新后,通过流复制同步到备库
-- 延迟测试
SELECT
now() - pg_last_xact_replay_timestamp() as replication_lag;
-- 需求:延迟<1秒(保证特征实时性)#!/bin/bash
# mv_maintenance.sh
# 1. 刷新物化视图(凌晨2点)
psql -c "SELECT safe_refresh_mv('user_recommend_features');"
# 2. VACUUM清理死元组(刷新后)
psql -c "VACUUM ANALYZE user_recommend_features;"
# 3. 监控死元组率
dead_ratio=$(psql -t -c "SELECT dead_tuple_ratio FROM mv_health_monitor")
if (( $(echo "$dead_ratio > 5" | bc -l) )); then
echo "死元组率过高: $dead_ratio%" | mail -s "MV告警" dba@company.com
fi
# 4. 检查刷新日志
failed_count=$(psql -t -c "SELECT COUNT(*) FROM mv_refresh_log WHERE status='failed' AND started_at > NOW() - INTERVAL '1 day'")
if [ $failed_count -gt 0 ]; then
echo "昨日刷新失败: $failed_count次" | mail -s "MV失败告警" dba@company.com
fi维护操作时间表:
操作 | 频率 | 时间窗口 | 影响 | 必要性 |
|---|---|---|---|---|
全量REFRESH | 每日 | 02:00-03:00 | 锁表(用CONCURRENTLY) | ⭐⭐⭐⭐⭐ |
增量REFRESH | 每小时 | xx:00-xx:10 | 无锁 | ⭐⭐⭐⭐ |
VACUUM | 每日 | 03:00-03:30 | I/O增加 | ⭐⭐⭐⭐ |
REINDEX | 每周日 | 02:00-03:00 | 锁索引(CONCURRENTLY) | ⭐⭐⭐ |
ANALYZE | 每日 | 03:30-04:00 | CPU增加 | ⭐⭐⭐ |
-- 1. 权限控制:只读访问物化视图
CREATE ROLE ml_service_readonly;
GRANT SELECT ON user_recommend_features TO ml_service_readonly;
-- 2. 行级安全(RLS)
ALTER TABLE user_recommend_features ENABLE ROW LEVEL SECURITY;
CREATE POLICY user_filter ON user_recommend_features
FOR SELECT TO ml_service_readonly
USING (user_segment IN ('high_value', 'medium_value')); -- 只返回高价值用户特征
-- 3. 刷新权限隔离
CREATE ROLE mv_admin;
GRANT ALL ON user_recommend_features TO mv_admin;
GRANT EXECUTE ON FUNCTION refresh_user_features_incremental() TO mv_admin;
-- 4. 审计日志
CREATE EXTENSION pg_audit;
ALTER SYSTEM SET pgaudit.log = 'ddl, write';
SELECT pg_reload_conf();安全策略表:
控制项 | 配置 | 作用 | 风险等级 |
|---|---|---|---|
只读权限 | GRANT SELECT | 防止误删 | 低 |
RLS行级安全 | ENABLE RLS | 数据隔离 | 中 |
刷新权限独立 | ROLE分离 | 职责分离 | 中 |
审计日志 | pg_audit | 追溯变更 | 低 |
网络加密 | ssl=on | 防窃听 | 中 |

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。