整理了一份Grafana项目实战案例教程:连接专有时序数据库。 为有相关需求的技术人员提供一个思路和一个技术指导。希望能给大家带来帮助。
为自研或行业专用时序数据库提供可视化解决方案
[设备/应用] --> [专有TSDB] <--> [Grafana]
▲
└── [适配层](协议转换/数据加工)
场景 | 技术方案 | 适用案例 |
---|---|---|
标准查询接口支持 | 开发Grafana插件 | 数据库提供类SQL接口 |
私有二进制协议 | Telegraf自定义插件 + 中间件 | 工业SCADA系统 |
REST API封装 | Grafana Infinity插件 + 数据转换 | 自研监控平台 |
流数据管道 | Kafka + Flink实时转换 | 金融高频交易数据 |
# 示例:启动自研TSDB容器
docker run -d --name custom_tsdb \
-p 9086:9086 \
-v /data/tsdb:/var/lib/tsdb \
custom-tsdb:2.1 \
--max-series=1000000 \
--auth-token=SECRET_KEY
# 安装Grafana
docker run -d --name=grafana \
-p 3000:3000 \
-v grafana-storage:/var/lib/grafana \
grafana/grafana-enterprise:9.4.3
# 安装通用时序插件(可选)
docker exec grafana grafana-cli \
--pluginUrl https://example.com/plugins/custom-datasource.zip \
plugins install custom-datasource
插件开发步骤:
package main
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type CustomTSDB struct {
Endpoint string `toml:"endpoint"`
// 其他配置字段
}
func (c *CustomTSDB) Gather(acc telegraf.Accumulator) error {
// 实现数据采集逻辑
return nil
}
func init() {
inputs.Add("custom_tsdb", func() telegraf.Input { return &CustomTSDB{} })
}
go build -buildmode=plugin -o custom.so
[[inputs.custom_tsdb]]
endpoint = "http://tsdb:9086"
interval = "10s"
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUX_TOKEN"
-- Grafana查询示例
SELECT
time_bucket('1m', timestamp) AS time,
device_id,
avg(temperature)
FROM sensors
WHERE $__timeFilter(timestamp)
AND status = 'active'
GROUP BY time, device_id
ORDER BY time DESC
# 自定义数据源配置(YAML格式)
apiVersion: 1
datasources:
- name: Custom-TSDB
type: custom-datasource
access: proxy
url: http://tsdb:9086
jsonData:
authType: "token"
timeField: "observation_time"
timeFormat: "2006-01-02T15:04:05Z07:00"
secureJsonData:
token: "dXNlcjpwYXNzd29yZA=="
// 字段转换配置
"transformations": [
{
"type": "rename",
"config": {
"columns": {
"obs_time": "time",
"dev_id": "device"
}
}
}
]
查询模板:
SELECT
time_shift($__timeFrom(), '5m') as start,
$__timeTo() as end,
device_id,
value
FROM raw_metrics
WHERE metric_name = 'vibration'
AND quality = 0 -- 0表示数据有效
高级特性:
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "green", "value": null},
{"color": "red", "value": 10}
]
}
# 使用Grafana API实时推送数据
from grafana_api.grafana_face import GrafanaFace
grafana = GrafanaFace(auth='admin:admin', host='localhost:3000')
data = {
"panelId": 2,
"points": [[time.time()*1000, random.random()*100] for _ in range(1000)]
}
grafana.live.publish_data('trade/metrics', data)
SELECT /*+ TIME_SPLIT(interval='1h') */
avg(value)
FROM metrics
WHERE $__timeFilter(timestamp)
# 数据源配置项
jsonData:
defaultInterval: "1m"
queryTimeout: "300s"
maxDataPoints: 1000
多级缓存配置:
graph LR
A[客户端缓存] --> B[Grafana查询缓存]
B --> C[时序数据库内存表]
C --> D[磁盘冷存储]
# 使用Vault实现动态Token
import hvac
client = hvac.Client(url='http://vault:8200')
secret = client.secrets.kv.v2.read_secret_version(path='grafana/tsdb')
datasource_config = {
"secureJsonData": {
"token": secret['data']['data']['token']
}
}
# 数据库审计配置
[audit]
enable = true
log_path = /var/log/tsdb/audit.log
retention_days = 365
现象 | 排查工具 | 关键命令/检查点 |
---|---|---|
查询超时 | 慢查询日志 + Explain分析 | EXPLAIN ANALYZE <query> |
数据点缺失 | 原始数据校验 + 时区检查 | SELECT * FROM metrics WHERE time BETWEEN ... |
可视化渲染错误 | 浏览器开发者工具 + 数据预览 | 检查JSON数据格式一致性 |
# 使用PyTorch进行异常检测
import torch
import numpy as np
model = torch.load('anomaly_detection.pth')
query_data = np.array([...]) # 从TSDB获取的数据
with torch.no_grad():
pred = model(torch.Tensor(query_data))
anomalies = (pred > 0.5).numpy()
// 使用Three.js插件
const scene = new THREE.Scene();
const loader = new TSDBLoader();
loader.load('http://tsdb/query?metrics=pressure', (geometry) => {
const mesh = new THREE.Mesh(geometry, material);
scene.add(mesh);
});
项目交付物清单:
通过本方案,某能源企业成功将20年历史的SCADA系统数据接入Grafana,查询响应时间从分钟级降至亚秒级。建议生产环境采用双活架构保证高可用性。
本篇的分享就到这里了,感谢观看,如果对你有帮助,别忘了点赞+收藏+关注。