在开发金融数据可视化应用时,一个常见的需求是通过API获取标准化的K线数据,并在前端通过专业的K线图表插件进行展示。本文将完整展示从API对接、数据处理到图表集成的全流程实战。
后端API处理: Python + Flask/FastAPI
前端图表: Lightweight Charts / ECharts
数据格式: JSON
通信方式: RESTful API# api\_client.py - 基础API客户端
import requests
import pandas as pd
from datetime import datetime, timedelta
import time
class KlineDataAPI:
"""K线数据API客户端"""
def \_\_init\_\_(self, base\_url, api\_key=None):
self.base\_url = base\_url
self.api\_key = api\_key
self.session = requests.Session()
if api\_key:
self.session.headers.update({
'Authorization': f'Bearer {api\_key}',
'Content-Type': 'application/json'
})
def get\_historical\_kline(self, symbol, interval, start\_time, end\_time, limit=1000):
"""
获取历史K线数据
参数:
symbol: 交易对符号 (如: BTC\_USDT)
interval: K线周期 (1m, 5m, 15m, 1h, 4h, 1d)
start\_time: 开始时间 (时间戳或ISO格式字符串)
end\_time: 结束时间
limit: 数据条数限制
"""
endpoint = f"{self.base\_url}/api/v1/klines"
# 构建请求参数
params = {
'symbol': symbol,
'interval': interval,
'startTime': self.\_format\_time(start\_time),
'endTime': self.\_format\_time(end\_time),
'limit': limit
}
try:
response = self.session.get(endpoint, params=params, timeout=10)
response.raise\_for\_status()
return self.\_parse\_kline\_data(response.json())
except requests.RequestException as e:
print(f"API请求失败: {e}")
return None
def \_format\_time(self, time\_input):
"""格式化时间参数"""
if isinstance(time\_input, (int, float)):
return int(time\_input)
elif isinstance(time\_input, str):
return int(pd.Timestamp(time\_input).timestamp() \* 1000)
elif isinstance(time\_input, datetime):
return int(time\_input.timestamp() \* 1000)
return time\_input
def \_parse\_kline\_data(self, raw\_data):
"""解析API返回的K线数据"""
klines = []
for item in raw\_data.get('data', []):
kline = {
'timestamp': item[0], # 时间戳
'open': float(item[1]),
'high': float(item[2]),
'low': float(item[3]),
'close': float(item[4]),
'volume': float(item[5]),
'time': pd.to\_datetime(item[0], unit='ms').strftime('%Y-%m-%d %H:%M:%S')
}
klines.append(kline)
return pd.DataFrame(klines)
def get\_realtime\_kline(self, symbol, interval, callback):
"""
获取实时K线数据(WebSocket)
参数:
symbol: 交易对符号
interval: K线周期
callback: 数据回调函数
"""
import websocket
import json
ws\_url = f"wss://{self.base\_url.replace('https://', '').replace('http://', '')}/ws"
def on\_message(ws, message):
data = json.loads(message)
if data.get('e') == 'kline':
kline\_data = self.\_parse\_ws\_kline(data)
callback(kline\_data)
def on\_error(ws, error):
print(f"WebSocket错误: {error}")
def on\_close(ws, close\_status\_code, close\_msg):
print("WebSocket连接关闭")
def on\_open(ws):
# 订阅K线频道
subscribe\_msg = {
"method": "SUBSCRIBE",
"params": [f"{symbol.lower()}@kline\_{interval}"],
"id": 1
}
ws.send(json.dumps(subscribe\_msg))
ws = websocket.WebSocketApp(
ws\_url,
on\_open=on\_open,
on\_message=on\_message,
on\_error=on\_error,
on\_close=on\_close
)
return ws
def \_parse\_ws\_kline(self, ws\_data):
"""解析WebSocket K线数据"""
kline = ws\_data['k']
return {
'timestamp': kline['t'],
'open': float(kline['o']),
'high': float(kline['h']),
'low': float(kline['l']),
'close': float(kline['c']),
'volume': float(kline['v']),
'is\_closed': kline['x'],
'time': pd.to\_datetime(kline['t'], unit='ms').isoformat()
}# data\_processor.py - 数据处理模块
import pandas as pd
import numpy as np
from typing import List, Dict, Any
class KlineDataProcessor:
"""K线数据处理工具"""
@staticmethod
def normalize\_kline\_data(df: pd.DataFrame) -> pd.DataFrame:
"""
标准化K线数据格式
"""
required\_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
# 确保列名标准化
column\_mapping = {
'time': 'timestamp',
'date': 'timestamp',
'amount': 'volume',
'vol': 'volume'
}
df = df.rename(columns=column\_mapping)
# 确保数据类型正确
df['timestamp'] = pd.to\_datetime(df['timestamp'], unit='ms', errors='coerce')
df['open'] = pd.to\_numeric(df['open'], errors='coerce')
df['high'] = pd.to\_numeric(df['high'], errors='coerce')
df['low'] = pd.to\_numeric(df['low'], errors='coerce')
df['close'] = pd.to\_numeric(df['close'], errors='coerce')
df['volume'] = pd.to\_numeric(df['volume'], errors='coerce')
# 按时间排序
df = df.sort\_values('timestamp').reset\_index(drop=True)
return df
@staticmethod
def calculate\_technical\_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
计算技术指标
"""
# 移动平均线
df['ma5'] = df['close'].rolling(window=5).mean()
df['ma10'] = df['close'].rolling(window=10).mean()
df['ma20'] = df['close'].rolling(window=20).mean()
# 布林带
df['bb\_middle'] = df['close'].rolling(window=20).mean()
bb\_std = df['close'].rolling(window=20).std()
df['bb\_upper'] = df['bb\_middle'] + 2 \* bb\_std
df['bb\_lower'] = df['bb\_middle'] - 2 \* bb\_std
# RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
return df
@staticmethod
def resample\_data(df: pd.DataFrame, interval: str) -> pd.DataFrame:
"""
重采样K线数据到不同周期
"""
if df.empty:
return df
df = df.copy()
df.set\_index('timestamp', inplace=True)
# 定义重采样规则
ohlc\_dict = {
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}
# 执行重采样
resampled = df.resample(interval).agg(ohlc\_dict).dropna()
resampled.reset\_index(inplace=True)
return resampled
@staticmethod
def format\_for\_frontend(df: pd.DataFrame) -> List[Dict]:
"""
格式化为前端需要的JSON格式
"""
data\_list = []
for \_, row in df.iterrows():
item = {
'time': row['timestamp'].strftime('%Y-%m-%d') if hasattr(row['timestamp'], 'strftime')
else str(row['timestamp']),
'open': float(row['open']),
'high': float(row['high']),
'low': float(row['low']),
'close': float(row['close']),
'volume': float(row['volume'])
}
# 添加技术指标
for indicator in ['ma5', 'ma10', 'ma20', 'bb\_upper', 'bb\_lower', 'rsi']:
if indicator in df.columns and not pd.isna(row[indicator]):
if indicator not in item:
item[indicator] = {}
item[indicator] = float(row[indicator])
data\_list.append(item)
return data\_list# main.py - FastAPI后端服务
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List
import uvicorn
from datetime import datetime, timedelta
from api\_client import KlineDataAPI
from data\_processor import KlineDataProcessor
app = FastAPI(title="K线数据API服务", version="1.0.0")
# 配置CORS
app.add\_middleware(
CORSMiddleware,
allow\_origins=["\*"],
allow\_credentials=True,
allow\_methods=["\*"],
allow\_headers=["\*"],
)
# 初始化API客户端
api\_client = KlineDataAPI(
base\_url="https://api.data-service.com", # 数据服务地址
api\_key="your\_api\_key\_here"
)
class KlineRequest(BaseModel):
"""K线数据请求模型"""
symbol: str
interval: str = "1h"
start\_time: Optional[str] = None
end\_time: Optional[str] = None
limit: int = 1000
indicators: List[str] = []
class KlineResponse(BaseModel):
"""K线数据响应模型"""
code: int
message: str
data: List[dict]
symbol: str
interval: str
count: int
timestamp: int
@app.get("/")
async def root():
"""API根端点"""
return {
"service": "K线数据API服务",
"version": "1.0.0",
"endpoints": {
"获取K线数据": "/api/v1/klines",
"获取实时数据": "/api/v1/klines/ws",
"获取交易对列表": "/api/v1/symbols"
}
}
@app.post("/api/v1/klines", response\_model=KlineResponse)
async def get\_kline\_data(request: KlineRequest):
"""
获取K线数据接口
支持RESTful API调用,返回标准化的K线数据
"""
try:
# 设置默认时间范围
if not request.start\_time:
request.end\_time = datetime.now()
request.start\_time = request.end\_time - timedelta(days=30)
# 调用API获取原始数据
raw\_df = api\_client.get\_historical\_kline(
symbol=request.symbol,
interval=request.interval,
start\_time=request.start\_time,
end\_time=request.end\_time,
limit=request.limit
)
if raw\_df is None or raw\_df.empty:
raise HTTPException(status\_code=404, detail="未找到K线数据")
# 数据处理
processed\_df = KlineDataProcessor.normalize\_kline\_data(raw\_df)
# 计算技术指标
if request.indicators:
processed\_df = KlineDataProcessor.calculate\_technical\_indicators(processed\_df)
# 格式化为前端所需格式
frontend\_data = KlineDataProcessor.format\_for\_frontend(processed\_df)
return KlineResponse(
code=200,
message="success",
data=frontend\_data,
symbol=request.symbol,
interval=request.interval,
count=len(frontend\_data),
timestamp=int(datetime.now().timestamp() \* 1000)
)
except Exception as e:
raise HTTPException(status\_code=500, detail=f"服务器内部错误: {str(e)}")
@app.get("/api/v1/klines/ws")
async def websocket\_endpoint():
"""WebSocket端点用于实时数据"""
from fastapi import WebSocket
async def websocket\_handler(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收客户端消息
data = await websocket.receive\_json()
if data.get("action") == "subscribe":
symbol = data.get("symbol")
interval = data.get("interval", "1m")
# 这里可以实现WebSocket数据推送逻辑
# 实际项目中可能需要集成消息队列
await websocket.send\_json({
"type": "subscribed",
"symbol": symbol,
"interval": interval
})
elif data.get("action") == "unsubscribe":
await websocket.send\_json({"type": "unsubscribed"})
except Exception as e:
print(f"WebSocket错误: {e}")
finally:
await websocket.close()
return websocket\_handler
@app.get("/api/v1/symbols")
async def get\_symbols():
"""获取支持的交易对列表"""
# 这里可以从配置或数据库获取
symbols = [
{"symbol": "BTC\_USDT", "name": "比特币/泰达币"},
{"symbol": "ETH\_USDT", "name": "以太坊/泰达币"},
{"symbol": "BNB\_USDT", "name": "币安币/泰达币"}
]
return {
"code": 200,
"data": symbols,
"count": len(symbols)
}
if \_\_name\_\_ == "\_\_main\_\_":
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
reload=True
)# cache\_manager.py - 数据缓存管理
import redis
import json
from datetime import datetime, timedelta
from functools import wraps
import hashlib
class CacheManager:
"""Redis缓存管理器"""
def \_\_init\_\_(self, host='localhost', port=6379, db=0):
self.redis\_client = redis.Redis(
host=host,
port=port,
db=db,
decode\_responses=True
)
self.default\_ttl = 300 # 默认5分钟缓存
def generate\_cache\_key(self, func\_name, \*args, \*\*kwargs):
"""生成缓存键"""
key\_str = f"{func\_name}:{str(args)}:{str(kwargs)}"
return hashlib.md5(key\_str.encode()).hexdigest()
def cache\_data(self, ttl=None):
"""缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(\*args, \*\*kwargs):
cache\_key = self.generate\_cache\_key(func.\_\_name\_\_, \*args, \*\*kwargs)
# 尝试从缓存获取
cached\_data = self.redis\_client.get(cache\_key)
if cached\_data:
return json.loads(cached\_data)
# 执行函数获取数据
result = func(\*args, \*\*kwargs)
# 缓存结果
if result is not None:
self.redis\_client.setex(
cache\_key,
ttl or self.default\_ttl,
json.dumps(result, default=str)
)
return result
return wrapper
return decorator
def invalidate\_pattern(self, pattern):
"""批量删除匹配模式的缓存"""
keys = self.redis\_client.keys(pattern)
if keys:
self.redis\_client.delete(\*keys)
def get\_cache\_stats(self):
"""获取缓存统计信息"""
info = self.redis\_client.info('memory')
return {
'used\_memory': info['used\_memory\_human'],
'key\_count': self.redis\_client.dbsize(),
'hit\_rate': 0.95 # 这里可以添加实际的命中率计算
}<!-- kline-chart.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>K线图表展示</title>
<script src="https://unpkg.com/lightweight-charts@3.8.0/dist/lightweight-charts.standalone.production.js"></script>
<style>
\* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #f5f5f5;
color: #333;
}
.container {
max-width: 1400px;
margin: 0 auto;
padding: 20px;
}
.header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 20px;
padding: 20px;
background: white;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.symbol-selector {
display: flex;
gap: 10px;
align-items: center;
}
.controls {
display: flex;
gap: 10px;
align-items: center;
}
select, button, input {
padding: 8px 12px;
border: 1px solid #ddd;
border-radius: 6px;
font-size: 14px;
}
button {
background: #1890ff;
color: white;
border: none;
cursor: pointer;
transition: background 0.3s;
}
button:hover {
background: #40a9ff;
}
.chart-container {
background: white;
border-radius: 10px;
padding: 20px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
#chart {
width: 100%;
height: 600px;
}
.indicators {
display: flex;
gap: 10px;
margin-top: 20px;
flex-wrap: wrap;
}
.indicator-tag {
padding: 6px 12px;
background: #f0f0f0;
border-radius: 20px;
font-size: 12px;
cursor: pointer;
transition: all 0.3s;
}
.indicator-tag.active {
background: #1890ff;
color: white;
}
.time-controls {
display: flex;
gap: 5px;
margin-left: 20px;
}
.time-btn {
padding: 6px 12px;
font-size: 12px;
}
.loading {
display: none;
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(255,255,255,0.8);
justify-content: center;
align-items: center;
z-index: 1000;
}
.loading.show {
display: flex;
}
.loader {
width: 50px;
height: 50px;
border: 3px solid #f3f3f3;
border-top: 3px solid #1890ff;
border-radius: 50%;
animation: spin 1s linear infinite;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.error-toast {
position: fixed;
top: 20px;
right: 20px;
padding: 12px 20px;
background: #ff4d4f;
color: white;
border-radius: 6px;
display: none;
z-index: 1001;
}
.error-toast.show {
display: block;
animation: slideIn 0.3s;
}
@keyframes slideIn {
from {
transform: translateX(100%);
opacity: 0;
}
to {
transform: translateX(0);
opacity: 1;
}
}
</style>
</head>
<body>
<div class="loading" id="loading">
<div class="loader"></div>
</div>
<div class="error-toast" id="errorToast"></div>
<div class="container">
<div class="header">
<div class="symbol-selector">
<select id="symbolSelect">
<option value="BTC\_USDT">BTC/USDT</option>
<option value="ETH\_USDT">ETH/USDT</option>
<option value="BNB\_USDT">BNB/USDT</option>
</select>
<div class="time-controls">
<button class="time-btn" data-period="1h">1小时</button>
<button class="time-btn" data-period="4h">4小时</button>
<button class="time-btn" data-period="1d" class="active">1天</button>
<button class="time-btn" data-period="1w">1周</button>
</div>
</div>
<div class="controls">
<input type="datetime-local" id="startTime">
<input type="datetime-local" id="endTime">
<button onclick="fetchKlineData()">查询</button>
</div>
</div>
<div class="chart-container">
<div id="chart"></div>
</div>
<div class="indicators">
<span class="indicator-tag" data-indicator="ma5">MA5</span>
<span class="indicator-tag" data-indicator="ma10">MA10</span>
<span class="indicator-tag" data-indicator="ma20">MA20</span>
<span class="indicator-tag" data-indicator="bb">布林带</span>
<span class="indicator-tag" data-indicator="rsi">RSI</span>
</div>
</div>
<script>
// API配置
const API\_BASE\_URL = 'http://localhost:8000';
let chart = null;
let candleSeries = null;
let indicatorSeries = {};
let currentSymbol = 'BTC\_USDT';
let currentInterval = '1d';
let currentIndicators = new Set(['ma5', 'ma10', 'ma20']);
// 初始化图表
function initChart() {
const chartContainer = document.getElementById('chart');
chart = LightweightCharts.createChart(chartContainer, {
width: chartContainer.clientWidth,
height: 600,
layout: {
background: { color: '#ffffff' },
textColor: '#333333',
},
grid: {
vertLines: { color: '#f0f0f0' },
horzLines: { color: '#f0f0f0' },
},
crosshair: {
mode: LightweightCharts.CrosshairMode.Normal,
},
rightPriceScale: {
borderColor: '#d1d4dc',
},
timeScale: {
borderColor: '#d1d4dc',
timeVisible: true,
secondsVisible: false,
},
});
// 创建K线系列
candleSeries = chart.addCandlestickSeries({
upColor: '#ef5350',
downColor: '#26a69a',
borderVisible: false,
wickUpColor: '#ef5350',
wickDownColor: '#26a69a',
});
// 初始化指标系列
indicatorSeries.ma5 = chart.addLineSeries({
color: '#2962FF',
lineWidth: 1,
title: 'MA5',
});
indicatorSeries.ma10 = chart.addLineSeries({
color: '#FF6B6B',
lineWidth: 1,
title: 'MA10',
});
indicatorSeries.ma20 = chart.addLineSeries({
color: '#4CAF50',
lineWidth: 1,
title: 'MA20',
});
// 布林带上轨
indicatorSeries.bbUpper = chart.addLineSeries({
color: '#9C27B0',
lineWidth: 1,
lineStyle: 2, // 虚线
title: 'BB Upper',
});
// 布林带下轨
indicatorSeries.bbLower = chart.addLineSeries({
color: '#9C27B0',
lineWidth: 1,
lineStyle: 2,
title: 'BB Lower',
});
}
// 获取K线数据
async function fetchKlineData() {
showLoading(true);
try {
const symbol = document.getElementById('symbolSelect').value;
const startTime = document.getElementById('startTime').value;
const endTime = document.getElementById('endTime').value;
const params = {
symbol: symbol,
interval: currentInterval,
limit: 1000,
indicators: Array.from(currentIndicators)
};
if (startTime) params.start\_time = startTime;
if (endTime) params.end\_time = endTime;
const response = await fetch(`${API\_BASE\_URL}/api/v1/klines`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(params)
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const result = await response.json();
if (result.code === 200) {
updateChart(result.data);
} else {
showError(result.message);
}
} catch (error) {
console.error('获取数据失败:', error);
showError('数据获取失败: ' + error.message);
} finally {
showLoading(false);
}
}
// 更新图表数据
function updateChart(data) {
if (!data || data.length === 0) {
showError('没有获取到数据');
return;
}
// 更新K线数据
const klineData = data.map(item => ({
time: item.time,
open: item.open,
high: item.high,
low: item.low,
close: item.close,
}));
candleSeries.setData(klineData);
// 更新指标数据
Object.keys(indicatorSeries).forEach(indicator => {
indicatorSeries[indicator].setData([]);
});
// 添加指标数据
currentIndicators.forEach(indicator => {
if (indicatorSeries[indicator]) {
const indicatorData = data
.filter(item => item[indicator] !== undefined)
.map(item => ({
time: item.time,
value: item[indicator]
}));
if (indicatorData.length > 0) {
indicatorSeries[indicator].setData(indicatorData);
}
}
});
// 更新布林带
if (currentIndicators.has('bb')) {
const bbUpperData = data
.filter(item => item.bb\_upper !== undefined)
.map(item => ({
time: item.time,
value: item.bb\_upper
}));
const bbLowerData = data
.filter(item => item.bb\_lower !== undefined)
.map(item => ({
time: item.time,
value: item.bb\_lower
}));
if (bbUpperData.length > 0) {
indicatorSeries.bbUpper.setData(bbUpperData);
}
if (bbLowerData.length > 0) {
indicatorSeries.bbLower.setData(bbLowerData);
}
}
}
// 切换时间周期
document.querySelectorAll('.time-btn').forEach(btn => {
btn.addEventListener('click', function() {
document.querySelectorAll('.time-btn').forEach(b => b.classList.remove('active'));
this.classList.add('active');
currentInterval = this.dataset.period;
fetchKlineData();
});
});
// 切换指标显示
document.querySelectorAll('.indicator-tag').forEach(tag => {
tag.addEventListener('click', function() {
const indicator = this.dataset.indicator;
if (indicator === 'bb') {
// 布林带需要同时显示上下轨
if (currentIndicators.has('bb')) {
currentIndicators.delete('bb');
this.classList.remove('active');
// 隐藏布林带
indicatorSeries.bbUpper.setData([]);
indicatorSeries.bbLower.setData([]);
} else {
currentIndicators.add('bb');
this.classList.add('active');
fetchKlineData(); // 重新获取数据
}
} else {
// 普通指标
if (currentIndicators.has(indicator)) {
currentIndicators.delete(indicator);
this.classList.remove('active');
indicatorSeries[indicator].setData([]);
} else {
currentIndicators.add(indicator);
this.classList.add('active');
fetchKlineData(); // 重新获取数据
}
}
});
});
// 显示/隐藏加载动画
function showLoading(show) {
const loading = document.getElementById('loading');
loading.classList.toggle('show', show);
}
// 显示错误提示
function showError(message) {
const toast = document.getElementById('errorToast');
toast.textContent = message;
toast.classList.add('show');
setTimeout(() => {
toast.classList.remove('show');
}, 3000);
}
// 页面加载完成后初始化
document.addEventListener('DOMContentLoaded', function() {
initChart();
// 设置默认时间范围
const end = new Date();
const start = new Date();
start.setDate(start.getDate() - 30);
document.getElementById('startTime').value = start.toISOString().slice(0, 16);
document.getElementById('endTime').value = end.toISOString().slice(0, 16);
// 激活默认指标
currentIndicators.forEach(indicator => {
const tag = document.querySelector(`[data-indicator="${indicator}"]`);
if (tag) tag.classList.add('active');
});
// 初始加载数据
fetchKlineData();
});
// 窗口大小变化时调整图表
window.addEventListener('resize', function() {
if (chart) {
const chartContainer = document.getElementById('chart');
chart.applyOptions({ width: chartContainer.clientWidth });
}
});
</script>
</body>
</html>// realtime.js - 实时数据更新
class RealtimeKline {
constructor(symbol, interval, onDataCallback) {
this.symbol = symbol;
this.interval = interval;
this.onDataCallback = onDataCallback;
this.ws = null;
this.isConnected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
connect() {
const wsUrl = `ws://localhost:8000/api/v1/klines/ws`;
this.ws = new WebSocket(wsUrl);
this.ws.onopen = () => {
console.log('WebSocket连接已建立');
this.isConnected = true;
this.reconnectAttempts = 0;
// 订阅K线数据
this.subscribe();
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.onDataCallback(data);
};
this.ws.onerror = (error) => {
console.error('WebSocket错误:', error);
};
this.ws.onclose = () => {
console.log('WebSocket连接已关闭');
this.isConnected = false;
this.attemptReconnect();
};
}
subscribe() {
if (this.ws && this.isConnected) {
this.ws.send(JSON.stringify({
action: 'subscribe',
symbol: this.symbol,
interval: this.interval
}));
}
}
unsubscribe() {
if (this.ws && this.isConnected) {
this.ws.send(JSON.stringify({
action: 'unsubscribe'
}));
}
}
attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 \* Math.pow(2, this.reconnectAttempts), 30000);
console.log(`尝试重新连接 (${this.reconnectAttempts}/${this.maxReconnectAttempts})...`);
setTimeout(() => {
this.connect();
}, delay);
} else {
console.error('达到最大重连次数,连接终止');
}
}
disconnect() {
if (this.ws) {
this.unsubscribe();
this.ws.close();
}
}
updateSymbol(symbol) {
this.symbol = symbol;
if (this.isConnected) {
this.unsubscribe();
setTimeout(() => this.subscribe(), 100);
}
}
updateInterval(interval) {
this.interval = interval;
if (this.isConnected) {
this.unsubscribe();
setTimeout(() => this.subscribe(), 100);
}
}
}
// 使用示例
const realtimeKline = new RealtimeKline('BTC\_USDT', '1m', (data) => {
if (data.type === 'kline') {
// 更新K线图表
updateRealtimeKline(data);
}
});
// 开始连接
realtimeKline.connect();# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/\*
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]# docker-compose.yml
version: '3.8'
services:
api-service:
build: .
ports:
- "8000:8000"
environment:
- REDIS\_HOST=redis
- REDIS\_PORT=6379
- API\_BASE\_URL=${API\_BASE\_URL}
- API\_KEY=${API\_KEY}
depends\_on:
- redis
restart: unless-stopped
volumes:
- ./logs:/app/logs
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
command: redis-server --appendonly yes
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends\_on:
- api-service
restart: unless-stopped
volumes:
redis-data:# monitor.py - 服务监控
import time
import psutil
from prometheus\_client import start\_http\_server, Gauge, Counter, Histogram
from datetime import datetime
# 定义监控指标
api\_requests\_total = Counter('api\_requests\_total', 'Total API requests', ['endpoint', 'method', 'status'])
api\_request\_duration = Histogram('api\_request\_duration\_seconds', 'API request duration in seconds', ['endpoint'])
api\_active\_connections = Gauge('api\_active\_connections', 'Active WebSocket connections')
api\_data\_points = Gauge('api\_data\_points', 'Number of data points served')
system\_cpu\_usage = Gauge('system\_cpu\_usage', 'System CPU usage percentage')
system\_memory\_usage = Gauge('system\_memory\_usage', 'System memory usage percentage')
class APIMonitor:
"""API监控器"""
def \_\_init\_\_(self, metrics\_port=9090):
self.metrics\_port = metrics\_port
self.start\_time = datetime.now()
def start(self):
"""启动监控服务"""
start\_http\_server(self.metrics\_port)
print(f"监控服务已启动,端口: {self.metrics\_port}")
def record\_request(self, endpoint, method, status, duration):
"""记录API请求"""
api\_requests\_total.labels(
endpoint=endpoint,
method=method,
status=status
).inc()
api\_request\_duration.labels(endpoint=endpoint).observe(duration)
def update\_system\_metrics(self):
"""更新系统指标"""
system\_cpu\_usage.set(psutil.cpu\_percent())
system\_memory\_usage.set(psutil.virtual\_memory().percent)
def get\_uptime(self):
"""获取服务运行时间"""
return datetime.now() - self.start\_time
def get\_status\_report(self):
"""获取状态报告"""
return {
'uptime': str(self.get\_uptime()),
'cpu\_usage': psutil.cpu\_percent(),
'memory\_usage': psutil.virtual\_memory().percent,
'active\_connections': api\_active\_connections.\_value.get(),
'total\_requests': api\_requests\_total.\_value.get()
}本文详细介绍了通过API获取K线数据并集成K线图表插件的完整流程,包括:
通过这套方案,可以快速构建稳定、高效的K线数据可视化应用。实际部署时,建议根据具体业务需求调整数据缓存策略、监控指标和安全配置。
**注意事项:**
这套方案提供了完整的参考实现,开发者可以根据实际需求进行调整和扩展。对于更复杂的需求,如多时间周期对比、自定义技术指标、回测功能等,可以在现有架构基础上进行功能扩展。
**本文不构成任何投资建议**
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。