
In the wave of digital transformation, real-time monitoring and analysis of e-commerce data has become a key competitive factor. This article shares the complete architecture design and implementation experience of our team's Amazon ranking (best-seller list) monitoring system built on Tencent Cloud; the system has now been running stably in production for six months and supplies data to a number of e-commerce companies.
As a company that provides data services to e-commerce businesses, we faced a number of core challenges around collection reliability, scale, and cost, which led us to the architecture below.

graph TB
subgraph "User Layer"
A[Web Console]
B[Mobile App]
C[API Client]
end
subgraph "Access Layer - Tencent Cloud CLB"
D[Load Balancer]
end
subgraph "Application Layer - Tencent Cloud TKE"
E[API Gateway Service]
F[Business Logic Service]
G[Data Collection Service]
H[Analytics Service]
end
subgraph "Data Layer"
I[Tencent Cloud MongoDB]
J[Tencent Cloud Redis]
K[Tencent Cloud COS]
L[Tencent Cloud ES]
end
subgraph "Message Queue"
M[Tencent Cloud CMQ]
N[Tencent Cloud CKafka]
end
subgraph "Monitoring & Operations"
O[Tencent Cloud Monitor]
P[Tencent Cloud CLS]
Q[Tencent Cloud Alarm]
end
A --> D
B --> D
C --> D
D --> E
E --> F
F --> G
F --> H
G --> I
G --> J
H --> I
H --> L
G --> M
H --> N
F --> O
G --> P
H --> Q

The resource planning on Tencent Cloud covers three areas: compute resources, storage services, and network & security.
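All of the components below are wired together through one shared configuration dictionary. The following is a minimal sketch of its shape; the keys mirror what the classes later in this article actually read, but every value is a placeholder rather than a real credential or endpoint:

# Hypothetical shared configuration; keys mirror the code below, values are placeholders.
config = {
    'secret_id': 'AKIDxxxxxxxx',            # Tencent Cloud API credentials
    'secret_key': 'xxxxxxxx',
    'redis_host': '10.0.0.10',              # Tencent Cloud Redis instance
    'redis_port': 6379,
    'redis_password': '***',
    'cos_region': 'ap-beijing',             # COS bucket for raw HTML
    'cos_bucket': 'raw-html-1250000000',
    'es_host': 'es-xxxxxxxx.tencentelasticsearch.com',
    'es_port': 9200,
    'es_username': 'elastic',
    'es_password': '***',
    'cls_topic_id': 'xxxxxxxx-xxxx-xxxx',   # CLS log topic
    'notice_template_id': 'notice-xxxxxxxx',
    'default_proxies': []
}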
On top of this distributed architecture on Tencent Cloud, we designed an intelligent data-collection system:
import asyncio
import aiohttp
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.mongodb.v20190725 import mongodb_client, models
import redis
import json
import time
from typing import List, Dict, Optional
import random
import logging


class TencentCloudAmazonCollector:
    def __init__(self, config: Dict):
        self.config = config
        self.setup_cloud_services()
        self.setup_proxy_pool()

    def setup_cloud_services(self):
        """Initialize the Tencent Cloud services."""
        # Tencent Cloud API credentials
        cred = credential.Credential(
            self.config['secret_id'],
            self.config['secret_key']
        )
        # MongoDB client
        self.mongodb_client = mongodb_client.MongodbClient(cred, "ap-beijing")
        # Redis connection
        self.redis_client = redis.Redis(
            host=self.config['redis_host'],
            port=self.config['redis_port'],
            password=self.config['redis_password'],
            decode_responses=True
        )
        # COS (object storage) client
        from qcloud_cos import CosConfig, CosS3Client
        cos_config = CosConfig(
            Region=self.config['cos_region'],
            SecretId=self.config['secret_id'],
            SecretKey=self.config['secret_key']
        )
        self.cos_client = CosS3Client(cos_config)

    def setup_proxy_pool(self):
        """Set up the proxy pool."""
        # Load the list of available proxies from Tencent Cloud Redis
        proxy_key = "proxy_pool:available"
        self.proxy_list = self.redis_client.lrange(proxy_key, 0, -1)
        if not self.proxy_list:
            # No proxies in Redis: fall back to the defaults from the config
            self.proxy_list = self.config.get('default_proxies', [])

    async def collect_with_intelligent_retry(self, url: str, max_retries: int = 5) -> Optional[str]:
        """Collect a page with an intelligent retry strategy."""
        retry_delays = [1, 2, 5, 10, 20]  # increasing back-off delays
        for attempt in range(max_retries):
            proxy = None  # ensure proxy is defined for the except branches
            try:
                # Pick a proxy dynamically
                proxy = self.select_optimal_proxy()
                # Build the request headers
                headers = self.generate_request_headers()
                # Timeouts and connection pooling
                timeout = aiohttp.ClientTimeout(total=30, connect=10)
                connector = aiohttp.TCPConnector(
                    limit=100,
                    limit_per_host=20,
                    ttl_dns_cache=300
                )
                async with aiohttp.ClientSession(
                    connector=connector,
                    timeout=timeout,
                    headers=headers
                ) as session:
                    proxy_url = f"http://{proxy}" if proxy else None
                    async with session.get(url, proxy=proxy_url) as response:
                        if response.status == 200:
                            content = await response.text()
                            # Persist the raw HTML to COS
                            await self.save_to_cos(url, content)
                            # Record a success for this proxy
                            self.update_proxy_success_rate(proxy, True)
                            return content
                        elif response.status == 503:
                            # Service temporarily unavailable; the proxy may be throttled
                            self.update_proxy_success_rate(proxy, False)
                            await asyncio.sleep(retry_delays[attempt])
                            continue
                        else:
                            logging.warning(f"HTTP {response.status} for {url}")
            except asyncio.TimeoutError:
                logging.warning(f"Timeout for {url}, attempt {attempt + 1}")
                self.update_proxy_success_rate(proxy, False)
                await asyncio.sleep(retry_delays[attempt])
            except Exception as e:
                logging.error(f"Error fetching {url}: {e}")
                await asyncio.sleep(retry_delays[attempt])
        return None

    def select_optimal_proxy(self) -> Optional[str]:
        """Select the best available proxy."""
        if not self.proxy_list:
            return None
        # Read per-proxy success statistics from Redis
        proxy_stats = {}
        for proxy in self.proxy_list:
            stats_key = f"proxy_stats:{proxy}"
            stats = self.redis_client.hgetall(stats_key)
            if stats:
                success_rate = float(stats.get('success_rate', 0))
                last_used = int(stats.get('last_used', 0))
                proxy_stats[proxy] = {
                    'success_rate': success_rate,
                    'last_used': last_used
                }
        # Prefer proxies with a high success rate that have not been used recently
        current_time = int(time.time())
        best_proxy = None
        best_score = -1
        for proxy in self.proxy_list:
            if proxy in proxy_stats:
                stats = proxy_stats[proxy]
                # Combined score: success rate plus an idle-time weight
                time_weight = min((current_time - stats['last_used']) / 3600, 1.0)  # capped at one hour
                score = stats['success_rate'] * 0.7 + time_weight * 0.3
                if score > best_score:
                    best_score = score
                    best_proxy = proxy
        return best_proxy or random.choice(self.proxy_list)

    def update_proxy_success_rate(self, proxy: str, success: bool):
        """Update the success-rate statistics for a proxy."""
        if not proxy:
            return
        stats_key = f"proxy_stats:{proxy}"
        pipe = self.redis_client.pipeline()
        # Fetch the current statistics
        current_stats = self.redis_client.hgetall(stats_key)
        total_requests = int(current_stats.get('total_requests', 0)) + 1
        successful_requests = int(current_stats.get('successful_requests', 0))
        if success:
            successful_requests += 1
        success_rate = successful_requests / total_requests if total_requests > 0 else 0
        # Write the updated statistics
        pipe.hset(stats_key, mapping={
            'total_requests': total_requests,
            'successful_requests': successful_requests,
            'success_rate': success_rate,
            'last_used': int(time.time())
        })
        # Expire the stats after 7 days
        pipe.expire(stats_key, 7 * 24 * 3600)
        pipe.execute()

    async def save_to_cos(self, url: str, content: str):
        """Save the raw page content to Tencent Cloud COS."""
        try:
            import hashlib
            from datetime import datetime
            # Build the object key
            url_hash = hashlib.md5(url.encode()).hexdigest()
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            object_key = f"raw_html/{timestamp}_{url_hash}.html"
            # Upload to COS
            response = self.cos_client.put_object(
                Bucket=self.config['cos_bucket'],
                Body=content.encode('utf-8'),
                Key=object_key,
                ContentType='text/html'
            )
            logging.info(f"Saved to COS: {object_key}")
        except Exception as e:
            logging.error(f"Failed to save to COS: {e}")

    def generate_request_headers(self) -> Dict[str, str]:
        """Generate request headers."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        ]
        return {
            'User-Agent': random.choice(user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Cache-Control': 'max-age=0'
        }
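A minimal sketch of how a batch of ranking pages might be fetched with the collector above, assuming the shared `config` sketched earlier; the URLs and the gather-based batching are illustrative, not the production scheduler:

# Illustrative driver; URLs are examples, failed fetches are simply skipped.
async def run_collection_batch(urls: List[str]) -> Dict[str, str]:
    collector = TencentCloudAmazonCollector(config)
    pages = await asyncio.gather(
        *(collector.collect_with_intelligent_retry(url) for url in urls)
    )
    return {url: html for url, html in zip(urls, pages) if html}

# Example:
# asyncio.run(run_collection_batch([
#     "https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics"
# ]))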
Leveraging the Tencent Cloud Elasticsearch service, we built the data-analysis capability on top of the collected rankings:

from elasticsearch import Elasticsearch
from tencentcloud.common import credential
from tencentcloud.es.v20180416 import es_client, models
import pandas as pd
import numpy as np
from typing import Dict, List, Any
import json
import logging
from datetime import datetime, timedelta


class TencentCloudAnalysisEngine:
    def __init__(self, config: Dict):
        self.config = config
        self.setup_elasticsearch()

    def setup_elasticsearch(self):
        """Initialize the Tencent Cloud ES connection."""
        self.es_client = Elasticsearch(
            hosts=[{
                'host': self.config['es_host'],
                'port': self.config['es_port']
            }],
            http_auth=(self.config['es_username'], self.config['es_password']),
            use_ssl=True,
            verify_certs=True,
            timeout=30
        )
        # Create the index templates
        self.create_index_templates()

    def create_index_templates(self):
        """Create the ES index templates."""
        # Template for ranking-data indices
        ranking_template = {
            "index_patterns": ["amazon_rankings_*"],
            "template": {
                "settings": {
                    "number_of_shards": 3,
                    "number_of_replicas": 1,
                    "index.refresh_interval": "30s",
                    "index.max_result_window": 50000
                },
                "mappings": {
                    "properties": {
                        "asin": {"type": "keyword"},
                        "rank": {"type": "integer"},
                        "title": {
                            "type": "text",
                            "analyzer": "standard",
                            "fields": {
                                "keyword": {"type": "keyword", "ignore_above": 256}
                            }
                        },
                        "price": {"type": "float"},
                        "rating": {"type": "float"},
                        "review_count": {"type": "integer"},
                        "category": {"type": "keyword"},
                        "marketplace": {"type": "keyword"},
                        "list_type": {"type": "keyword"},
                        "timestamp": {"type": "date"},
                        "collection_date": {"type": "date", "format": "yyyy-MM-dd"},
                        "trend_score": {"type": "float"},
                        "momentum": {"type": "float"},
                        "volatility": {"type": "float"}
                    }
                }
            }
        }
        try:
            self.es_client.indices.put_index_template(
                name="amazon_rankings_template",
                body=ranking_template
            )
            logging.info("Created ES index template successfully")
        except Exception as e:
            logging.error(f"Failed to create ES index template: {e}")

    async def analyze_market_trends(self, category: str, marketplace: str, days: int = 30) -> Dict[str, Any]:
        """Analyze market trends."""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        # Build the ES query
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"category": category}},
                        {"term": {"marketplace": marketplace}},
                        {"range": {
                            "timestamp": {
                                "gte": start_date.isoformat(),
                                "lte": end_date.isoformat()
                            }
                        }}
                    ]
                }
            },
            "aggs": {
                "daily_trends": {
                    "date_histogram": {
                        "field": "timestamp",
                        "calendar_interval": "day"
                    },
                    "aggs": {
                        "avg_rank": {"avg": {"field": "rank"}},
                        "avg_price": {"avg": {"field": "price"}},
                        "avg_rating": {"avg": {"field": "rating"}},
                        "total_products": {"cardinality": {"field": "asin"}},
                        "rank_distribution": {
                            "histogram": {
                                "field": "rank",
                                "interval": 10
                            }
                        }
                    }
                },
                "top_movers": {
                    "terms": {
                        "field": "asin",
                        "size": 50
                    },
                    "aggs": {
                        "rank_stats": {"stats": {"field": "rank"}},
                        "latest_rank": {
                            "top_hits": {
                                "sort": [{"timestamp": {"order": "desc"}}],
                                "size": 1,
                                "_source": ["rank", "title", "timestamp"]
                            }
                        },
                        "earliest_rank": {
                            "top_hits": {
                                "sort": [{"timestamp": {"order": "asc"}}],
                                "size": 1,
                                "_source": ["rank", "timestamp"]
                            }
                        }
                    }
                },
                "category_insights": {
                    "terms": {
                        "field": "list_type",
                        "size": 10
                    },
                    "aggs": {
                        "avg_metrics": {
                            "avg": {"field": "rank"}
                        },
                        "trend_analysis": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "day"
                            },
                            "aggs": {
                                "daily_avg_rank": {"avg": {"field": "rank"}}
                            }
                        }
                    }
                }
            },
            "size": 0
        }
        try:
            index_name = f"amazon_rankings_{marketplace}_{category}"
            response = self.es_client.search(index=index_name, body=query)
            # Post-process the aggregation results
            analysis_result = self.process_trend_analysis(response['aggregations'])
            return {
                "category": category,
                "marketplace": marketplace,
                "analysis_period": f"{start_date.date()} to {end_date.date()}",
                "total_hits": response['hits']['total']['value'],
                "trends": analysis_result,
                "generated_at": datetime.now().isoformat()
            }
        except Exception as e:
            logging.error(f"ES trend analysis failed: {e}")
            return {"error": str(e)}

    def process_trend_analysis(self, aggregations: Dict) -> Dict[str, Any]:
        """Process the trend-analysis aggregations."""
        result = {}
        # Daily trends
        daily_trends = aggregations.get('daily_trends', {}).get('buckets', [])
        result['daily_trends'] = []
        for bucket in daily_trends:
            date = bucket['key_as_string'][:10]  # keep only the date part
            result['daily_trends'].append({
                'date': date,
                'avg_rank': round(bucket['avg_rank']['value'], 2) if bucket['avg_rank']['value'] else None,
                'avg_price': round(bucket['avg_price']['value'], 2) if bucket['avg_price']['value'] else None,
                'avg_rating': round(bucket['avg_rating']['value'], 2) if bucket['avg_rating']['value'] else None,
                'total_products': bucket['total_products']['value'],
                'rank_distribution': [
                    {
                        'rank_range': f"{int(b['key'])}-{int(b['key']) + 9}",
                        'count': b['doc_count']
                    }
                    for b in bucket['rank_distribution']['buckets']
                ]
            })
        # Top movers
        top_movers = aggregations.get('top_movers', {}).get('buckets', [])
        result['top_movers'] = []
        for bucket in top_movers:
            asin = bucket['key']
            rank_stats = bucket['rank_stats']
            latest = bucket['latest_rank']['hits']['hits'][0]['_source']
            earliest = bucket['earliest_rank']['hits']['hits'][0]['_source']
            rank_change = earliest['rank'] - latest['rank']  # positive means the rank improved
            result['top_movers'].append({
                'asin': asin,
                'title': latest['title'],
                'current_rank': latest['rank'],
                'rank_change': rank_change,
                'rank_change_percent': round((rank_change / earliest['rank']) * 100, 2) if earliest['rank'] > 0 else 0,
                'min_rank': rank_stats['min'],
                'max_rank': rank_stats['max'],
                'avg_rank': round(rank_stats['avg'], 2),
                'volatility': round(rank_stats['max'] - rank_stats['min'], 2)
            })
        # Sort by rank change
        result['top_movers'].sort(key=lambda x: x['rank_change'], reverse=True)
        # Category insights
        category_insights = aggregations.get('category_insights', {}).get('buckets', [])
        result['category_insights'] = []
        for bucket in category_insights:
            list_type = bucket['key']
            trend_data = bucket['trend_analysis']['buckets']
            # Estimate the trend slope
            if len(trend_data) >= 2:
                ranks = [b['daily_avg_rank']['value'] for b in trend_data if b['daily_avg_rank']['value']]
                if len(ranks) >= 2:
                    # Simple linear regression over the daily averages
                    x = np.arange(len(ranks))
                    slope = np.polyfit(x, ranks, 1)[0]
                else:
                    slope = 0
            else:
                slope = 0
            result['category_insights'].append({
                'list_type': list_type,
                'avg_rank': round(bucket['avg_metrics']['value'], 2),
                'trend_slope': round(slope, 4),
                'trend_direction': 'rising' if slope < 0 else 'falling' if slope > 0 else 'stable',
                'data_points': len(trend_data)
            })
        return result

    async def detect_market_opportunities(self, category: str, marketplace: str) -> List[Dict[str, Any]]:
        """Detect market opportunities."""
        # Find products whose rank improved quickly over the last 7 days
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"category": category}},
                        {"term": {"marketplace": marketplace}},
                        {"range": {
                            "timestamp": {
                                "gte": "now-7d/d"
                            }
                        }}
                    ]
                }
            },
            "aggs": {
                "products": {
                    "terms": {
                        "field": "asin",
                        "size": 1000
                    },
                    "aggs": {
                        "rank_timeline": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "day",
                                "order": {"_key": "asc"}
                            },
                            "aggs": {
                                "avg_rank": {"avg": {"field": "rank"}},
                                "latest_data": {
                                    "top_hits": {
                                        "sort": [{"timestamp": {"order": "desc"}}],
                                        "size": 1,
                                        "_source": ["title", "price", "rating", "review_count"]
                                    }
                                }
                            }
                        }
                    }
                }
            },
            "size": 0
        }
        try:
            index_name = f"amazon_rankings_{marketplace}_{category}"
            response = self.es_client.search(index=index_name, body=query)
            opportunities = []
            for bucket in response['aggregations']['products']['buckets']:
                asin = bucket['key']
                timeline = bucket['rank_timeline']['buckets']
                if len(timeline) < 3:  # require at least 3 days of data
                    continue
                # Rank trend over time
                ranks = [b['avg_rank']['value'] for b in timeline if b['avg_rank']['value']]
                if len(ranks) < 3:
                    continue
                # Trend indicators
                recent_ranks = ranks[-3:]  # most recent 3 days
                early_ranks = ranks[:3] if len(ranks) >= 6 else ranks[:-3]
                if not early_ranks:
                    continue
                recent_avg = np.mean(recent_ranks)
                early_avg = np.mean(early_ranks)
                # Rank improvement (a smaller rank number is better)
                rank_improvement = early_avg - recent_avg
                improvement_rate = rank_improvement / early_avg if early_avg > 0 else 0
                # Latest product details
                latest_data = timeline[-1]['latest_data']['hits']['hits'][0]['_source']
                # Filter for promising products
                if (rank_improvement > 5 and        # rank improved by at least 5 positions
                        improvement_rate > 0.1 and  # improvement rate above 10%
                        recent_avg <= 100):         # currently within the top 100
                    opportunities.append({
                        'asin': asin,
                        'title': latest_data['title'],
                        'current_avg_rank': round(recent_avg, 1),
                        'rank_improvement': round(rank_improvement, 1),
                        'improvement_rate': round(improvement_rate * 100, 1),
                        'price': latest_data.get('price'),
                        'rating': latest_data.get('rating'),
                        'review_count': latest_data.get('review_count'),
                        'opportunity_score': round(improvement_rate * 100 + (rank_improvement / 10), 2),
                        'data_points': len(ranks)
                    })
            # Sort by opportunity score
            opportunities.sort(key=lambda x: x['opportunity_score'], reverse=True)
            return opportunities[:50]  # return the top 50 opportunities
        except Exception as e:
            logging.error(f"Market opportunity detection failed: {e}")
            return []
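A brief sketch of how these two analysis entry points might be driven, again assuming the shared `config`; the marketplace and category values are illustrative:

# Illustrative call into the analysis engine.
async def build_daily_report(category: str = "electronics", marketplace: str = "US") -> Dict[str, Any]:
    engine = TencentCloudAnalysisEngine(config)
    trends = await engine.analyze_market_trends(category, marketplace, days=30)
    opportunities = await engine.detect_market_opportunities(category, marketplace)
    return {"trends": trends, "opportunities": opportunities}

To keep the pipeline observable, the collection and analysis services push custom metrics, alarms, and structured logs through Tencent Cloud Monitor and CLS: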
from tencentcloud.common import credential
from tencentcloud.monitor.v20180724 import monitor_client, models as monitor_models
from tencentcloud.cls.v20201016 import cls_client, models as cls_models
import json
import time
import logging
from typing import Dict, List


class TencentCloudMonitoringSystem:
    def __init__(self, config: Dict):
        self.config = config
        self.setup_clients()
        self.setup_custom_metrics()

    def setup_clients(self):
        """Initialize the Tencent Cloud clients."""
        cred = credential.Credential(
            self.config['secret_id'],
            self.config['secret_key']
        )
        # Cloud Monitor client
        self.monitor_client = monitor_client.MonitorClient(cred, "ap-beijing")
        # CLS (log service) client
        self.cls_client = cls_client.ClsClient(cred, "ap-beijing")

    def setup_custom_metrics(self):
        """Define the custom monitoring metrics."""
        self.custom_metrics = {
            'collection_success_rate': {
                'namespace': 'QCE/AMAZON_COLLECTOR',
                'metric_name': 'CollectionSuccessRate',
                'unit': 'Percent'
            },
            'data_processing_latency': {
                'namespace': 'QCE/AMAZON_COLLECTOR',
                'metric_name': 'DataProcessingLatency',
                'unit': 'Milliseconds'
            },
            'api_response_time': {
                'namespace': 'QCE/AMAZON_COLLECTOR',
                'metric_name': 'ApiResponseTime',
                'unit': 'Milliseconds'
            },
            'queue_depth': {
                'namespace': 'QCE/AMAZON_COLLECTOR',
                'metric_name': 'QueueDepth',
                'unit': 'Count'
            }
        }

    def report_custom_metric(self, metric_name: str, value: float, dimensions: Dict[str, str] = None):
        """Report a custom metric to Cloud Monitor."""
        try:
            if metric_name not in self.custom_metrics:
                logging.error(f"Unknown metric: {metric_name}")
                return
            metric_config = self.custom_metrics[metric_name]
            # Build the metric datum
            metric_data = monitor_models.MetricDatum()
            metric_data.MetricName = metric_config['metric_name']
            metric_data.Value = value
            metric_data.Timestamp = int(time.time())
            # Attach dimensions
            if dimensions:
                metric_data.Dimensions = []
                for key, val in dimensions.items():
                    dimension = monitor_models.Dimension()
                    dimension.Name = key
                    dimension.Value = val
                    metric_data.Dimensions.append(dimension)
            # Build the request
            req = monitor_models.PutMonitorDataRequest()
            req.Namespace = metric_config['namespace']
            req.MetricData = [metric_data]
            # Send it
            resp = self.monitor_client.PutMonitorData(req)
            logging.info(f"Reported metric {metric_name}: {value}")
        except Exception as e:
            logging.error(f"Failed to report metric {metric_name}: {e}")

    def create_alert_policy(self, policy_name: str, conditions: List[Dict]) -> str:
        """Create an alarm policy."""
        try:
            req = monitor_models.CreateAlarmPolicyRequest()
            req.PolicyName = policy_name
            req.Module = "monitor"
            req.PolicyType = "cvm_device"
            req.Remark = f"Amazon collector alert policy: {policy_name}"
            # Alarm conditions
            req.Conditions = []
            for condition in conditions:
                alarm_condition = monitor_models.AlarmPolicyCondition()
                alarm_condition.MetricName = condition['metric_name']
                alarm_condition.Period = condition.get('period', 300)  # 5 minutes
                alarm_condition.Operator = condition.get('operator', 'gt')  # greater than
                alarm_condition.Value = str(condition['threshold'])
                alarm_condition.ContinuePeriod = condition.get('continue_period', 2)
                req.Conditions.append(alarm_condition)
            # Notification template
            req.NoticeIds = [self.config.get('notice_template_id')]
            resp = self.monitor_client.CreateAlarmPolicy(req)
            policy_id = resp.PolicyId
            logging.info(f"Created alert policy: {policy_name} (ID: {policy_id})")
            return policy_id
        except Exception as e:
            logging.error(f"Failed to create alert policy: {e}")
            return None

    def setup_system_alerts(self):
        """Create the system alert policies."""
        # Alert on collection success rate
        collection_alert = self.create_alert_policy(
            "Amazon Collection Success Rate Alert",
            [{
                'metric_name': 'CollectionSuccessRate',
                'threshold': 90,  # alert when the success rate drops below 90%
                'operator': 'lt',
                'period': 300,
                'continue_period': 2
            }]
        )
        # Alert on API response time
        api_latency_alert = self.create_alert_policy(
            "Amazon API Response Time Alert",
            [{
                'metric_name': 'ApiResponseTime',
                'threshold': 5000,  # alert when the response time exceeds 5 seconds
                'operator': 'gt',
                'period': 300,
                'continue_period': 3
            }]
        )
        # Alert on queue depth
        queue_alert = self.create_alert_policy(
            "Amazon Queue Depth Alert",
            [{
                'metric_name': 'QueueDepth',
                'threshold': 1000,  # alert when the queue depth exceeds 1000
                'operator': 'gt',
                'period': 300,
                'continue_period': 2
            }]
        )
        return {
            'collection_alert_id': collection_alert,
            'api_latency_alert_id': api_latency_alert,
            'queue_alert_id': queue_alert
        }

    def log_system_event(self, event_type: str, message: str, level: str = "INFO", extra_data: Dict = None):
        """Write a system event to Tencent Cloud CLS."""
        try:
            log_data = {
                'timestamp': int(time.time() * 1000),
                'level': level,
                'event_type': event_type,
                'message': message,
                'service': 'amazon_collector',
                'version': self.config.get('version', '1.0.0')
            }
            if extra_data:
                log_data.update(extra_data)
            # Build the upload request
            req = cls_models.UploadLogRequest()
            req.TopicId = self.config['cls_topic_id']
            req.HashKey = f"amazon_collector_{int(time.time())}"
            req.CompressType = ""
            # Log payload
            log_content = cls_models.LogItem()
            log_content.Time = log_data['timestamp']
            log_content.Contents = []
            for key, value in log_data.items():
                content = cls_models.LogContent()
                content.Key = key
                content.Value = str(value)
                log_content.Contents.append(content)
            req.LogItems = [log_content]
            # Ship the log entry
            resp = self.cls_client.UploadLog(req)
        except Exception as e:
logging.error(f"Failed to upload log to CLS: {e}")基于腾讯云TKE的HPA(水平Pod自动扩缩容)配置:
Auto-scaling is handled by an HPA (Horizontal Pod Autoscaler) on Tencent Cloud TKE:

# hpa-config.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: amazon-collector-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: amazon-collector
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: queue_depth
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60

from datetime import datetime, timedelta
from typing import Dict, List

class CostOptimizer:
    def __init__(self, config: Dict):
        self.config = config
        self.setup_cost_monitoring()

    def setup_cost_monitoring(self):
        """Set up cost monitoring."""
        # Monthly cost thresholds per service (CNY)
        self.cost_thresholds = {
            'mongodb': 1000,
            'redis': 500,
            'cos': 200,
            'elasticsearch': 800,
            'cvm': 2000
        }

    def optimize_storage_costs(self):
        """Optimize storage costs."""
        # 1. Data lifecycle management
        self.setup_data_lifecycle()
        # 2. Hot/cold data tiering
        self.implement_data_tiering()
        # 3. Compression and de-duplication
        self.optimize_data_compression()

    def setup_data_lifecycle(self):
        """Configure the data lifecycle."""
        lifecycle_rules = {
            'hot_data': {
                'retention_days': 30,
                'storage_class': 'STANDARD'
            },
            'warm_data': {
                'retention_days': 90,
                'storage_class': 'STANDARD_IA'
            },
            'cold_data': {
                'retention_days': 365,
                'storage_class': 'ARCHIVE'
            }
        }
        # Apply the COS lifecycle rules
        for rule_name, config in lifecycle_rules.items():
            self.create_cos_lifecycle_rule(rule_name, config)

    def implement_data_tiering(self):
        """Implement data tiering."""
        # Hot data: last 7 days, kept in Redis + MongoDB
        # Warm data: 7-30 days, kept in MongoDB
        # Cold data: older than 30 days, kept in COS + ES (index only)
        current_time = datetime.now()
        # Migrate data older than 30 days to COS
        cutoff_date = current_time - timedelta(days=30)
        # Query the documents that need to be migrated
        old_data_query = {
            "timestamp": {"$lt": cutoff_date}
        }
        # Run the migration
        self.migrate_old_data_to_cos(old_data_query)

    def calculate_cost_efficiency(self) -> Dict[str, float]:
        """Calculate cost-efficiency metrics."""
        metrics = {
            'cost_per_request': self.get_cost_per_request(),
            'cost_per_gb_processed': self.get_cost_per_gb(),
            'cost_per_active_user': self.get_cost_per_user(),
            'infrastructure_utilization': self.get_utilization_rate()
        }
        return metrics

    def recommend_optimizations(self) -> List[Dict[str, str]]:
        """Produce optimization recommendations."""
        recommendations = []
        # Analyze resource utilization
        utilization = self.analyze_resource_utilization()
        if utilization['cpu_avg'] < 0.3:
            recommendations.append({
                'type': 'resource_optimization',
                'suggestion': 'Consider lowering the CPU allocation or reducing the number of instances',
                'potential_savings': '20-30%'
            })
        if utilization['storage_growth_rate'] > 0.5:
            recommendations.append({
                'type': 'storage_optimization',
                'suggestion': 'Enable data compression and lifecycle management',
                'potential_savings': '15-25%'
            })
        return recommendations
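A sketch of how the optimizer's output might feed a weekly cost review; it assumes the helper methods referenced above (lifecycle rule creation, migration, and cost queries) are implemented elsewhere:

# Illustrative weekly cost review driven by CostOptimizer.
optimizer = CostOptimizer(config)
optimizer.optimize_storage_costs()
efficiency = optimizer.calculate_cost_efficiency()
for item in optimizer.recommend_optimizations():
    print(f"[{item['type']}] {item['suggestion']} (potential savings: {item['potential_savings']})")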
The monitoring dashboard is built with Tencent Cloud Monitor and Grafana:

{
  "dashboard": {
    "title": "Amazon Ranking Collector Dashboard",
    "panels": [
      {
        "title": "Collection Success Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "avg(collection_success_rate)",
            "legendFormat": "Success Rate"
          }
        ],
        "thresholds": [
          {"color": "red", "value": 0},
          {"color": "yellow", "value": 90},
          {"color": "green", "value": 95}
        ]
      },
      {
        "title": "API Response Time",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, api_response_time_bucket)",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.50, api_response_time_bucket)",
            "legendFormat": "50th percentile"
          }
        ]
      },
      {
        "title": "Queue Depth",
        "type": "graph",
        "targets": [
          {
            "expr": "queue_depth",
            "legendFormat": "Queue Depth"
          }
        ]
      },
      {
        "title": "Error Rate by Category",
        "type": "table",
        "targets": [
          {
            "expr": "sum by (category) (rate(collection_errors_total[5m]))",
            "format": "table"
          }
        ]
      }
    ]
  }
}
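The dashboard JSON can be provisioned through Grafana's HTTP API; a minimal sketch, where the Grafana URL and API key are placeholders:

# Illustrative one-off provisioning of the dashboard above via Grafana's HTTP API.
import requests

def provision_dashboard(dashboard_json: dict, grafana_url: str, api_key: str) -> dict:
    resp = requests.post(
        f"{grafana_url}/api/dashboards/db",
        json={"dashboard": dashboard_json["dashboard"], "overwrite": True},
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=10
    )
    resp.raise_for_status()
    return resp.json()

When an alarm does fire, an incident-management layer classifies the failure and drives the response: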
class IncidentManager:
    def __init__(self, config: Dict):
        self.config = config
        self.alert_channels = self.setup_alert_channels()

    def setup_alert_channels(self):
        """Configure the alert channels."""
        return {
            'wechat': self.config.get('wechat_webhook'),
            'email': self.config.get('email_smtp'),
            'sms': self.config.get('sms_api'),
            'slack': self.config.get('slack_webhook')
        }

    async def handle_collection_failure(self, error_details: Dict):
        """Handle a collection failure."""
        severity = self.assess_failure_severity(error_details)
        if severity == 'critical':
            # Page the operations team immediately
            await self.send_critical_alert(error_details)
            # Kick off automatic recovery
            await self.initiate_auto_recovery(error_details)
        elif severity == 'warning':
            # Log it and keep monitoring
            self.log_warning(error_details)
            # Retry more aggressively
            await self.increase_retry_frequency(error_details)

    def assess_failure_severity(self, error_details: Dict) -> str:
        """Assess how severe a failure is."""
        failure_rate = error_details.get('failure_rate', 0)
        affected_categories = error_details.get('affected_categories', [])
        duration = error_details.get('duration_minutes', 0)
        if (failure_rate > 0.5 or
                len(affected_categories) > 10 or
                duration > 30):
            return 'critical'
        elif (failure_rate > 0.2 or
                len(affected_categories) > 5 or
                duration > 15):
            return 'warning'
        else:
            return 'info'

    async def initiate_auto_recovery(self, error_details: Dict):
        """Start automatic recovery."""
        recovery_actions = [
            self.restart_failed_workers,
            self.switch_to_backup_proxies,
            self.reduce_collection_frequency,
            self.enable_circuit_breaker
        ]
        for action in recovery_actions:
            try:
                await action(error_details)
                logging.info(f"Recovery action completed: {action.__name__}")
            except Exception as e:
logging.error(f"Recovery action failed: {action.__name__}: {e}")经过6个月的生产环境运行,我们的系统取得了显著成效:
After six months of production operation, the system has delivered solid results across performance, cost efficiency, and business value.
During the project we also evaluated managed scraping APIs, taking the Pangolin Scrape API as an example, comparing its advantages against our self-built pipeline and the scenarios each approach suits.
Building this Amazon ranking monitoring system on Tencent Cloud gave us valuable hands-on experience: the cloud-native architecture improved the system's scalability and reliability while significantly reducing operational complexity.
When selecting a data-collection approach, enterprises should weigh the relevant trade-offs holistically; whichever option is chosen, the key is to establish a sound data-governance system that ensures data quality, security, and compliant use.
About the author: a Tencent Cloud solutions architect focused on cloud-native architecture for big data and AI, with extensive experience building e-commerce data platforms.
Technical exchange: follow the Tencent Cloud developer community for more hands-on cloud-native architecture content.
Original statement: this article is published on the Tencent Cloud developer community with the author's authorization and may not be reproduced without permission.
If you believe it infringes your rights, please contact cloudcommunity@tencent.com to request removal.