没事干买了一块板子实现了人体检测加天气播报。拿出来给大家分享,高手勿喷。
用到的硬件有:
1. ESP32 wroom
2.人体红外检测模块
3.MAX98357A+5v喇叭
4.跳线
这里是借助micropython实现客户端代码。
因为在连接wifi遇到了一些问题,代码里加了一些重连和检测的逻辑。
from machine import Pin, PWM, I2S
import time
import gc
import machine
import network
import urequests
print("=== 超稳定人体检测系统 ===")
# 🛡️ 安全引脚配置 - 避免SPI Flash冲突
SAFE_PIN_CONFIG = {
'i2s_sck': 25, # 绝对安全 - 原GPIO12
'i2s_ws': 26, # 绝对安全 - 原GPIO14
'i2s_sd': 27, # 绝对安全 - 原GPIO13
'sensor': 4, # 绝对安全 - 替代GPIO15
'led': 2 # 绝对安全 - 板载LED
}
# 系统状态
system_state = {
'human_detected': False,
'sensor_current_state': False,
'sensor_last_state': False,
'audio_playing': False,
'wifi_connected': False,
'wifi_connecting': False,
'detection_count': 0,
'last_detection_time': 0,
'sensor_error_count': 0,
'wifi_error_count': 0,
'boot_count': 0
}
# 硬件对象
audio_out = None
human_detect = None
led_pwm = None
wlan = None
# 配置参数
CONFIG = {
'ssid': "",
'password': "",
'audio_url': "http://192.168.1.5:5000/weather?city=123456",
'debounce_time': 1000, # 增加防抖时间(ms)
'max_audio_duration': 20000, # 减少最大音频时长(ms)
'wifi_timeout': 8, # 减少WiFi连接超时(秒)
'min_memory': 80000, # 增加最小内存阈值
'sensor_check_interval': 2000, # 增加传感器状态检查间隔(ms)
'max_wifi_retries': 2, # 最大WiFi重试次数
'wifi_retry_delay': 10 # WiFi重试延迟(秒)
}
def load_boot_count():
"""尝试从文件加载启动计数"""
try:
with open('boot_count.txt', 'r') as f:
system_state['boot_count'] = int(f.read().strip()) + 1
except:
system_state['boot_count'] = 1
try:
with open('boot_count.txt', 'w') as f:
f.write(str(system_state['boot_count']))
except:
pass
def safe_hardware_init():
"""安全初始化所有硬件组件"""
global audio_out, human_detect, led_pwm
print("初始化硬件组件...")
# 先初始化简单的组件
try:
# LED PWM输出 - 最先初始化,用于状态指示
led_pwm = PWM(Pin(SAFE_PIN_CONFIG['led']), freq=1000, duty=0)
print("✅ LED PWM初始化成功")
# 立即点亮LED表示系统开始启动
led_pwm.duty(300)
except Exception as e:
print(f"❌ LED PWM失败: {e}")
led_pwm = None
try:
# 人体检测传感器 - 添加上拉电阻
human_detect = Pin(SAFE_PIN_CONFIG['sensor'], Pin.IN, Pin.PULL_UP)
# 读取初始状态
system_state['sensor_current_state'] = human_detect.value()
system_state['sensor_last_state'] = system_state['sensor_current_state']
print("✅ 人体传感器初始化成功")
print(f" 初始状态: {'高电平(检测到)' if system_state['sensor_current_state'] else '低电平(未检测)'}")
except Exception as e:
print(f"❌ 人体传感器失败: {e}")
human_detect = None
# I2S音频最后初始化,因为它最复杂
try:
# I2S音频输出 - 使用安全引脚和保守配置
audio_out = I2S(
0,
sck=Pin(SAFE_PIN_CONFIG['i2s_sck']),
ws=Pin(SAFE_PIN_CONFIG['i2s_ws']),
sd=Pin(SAFE_PIN_CONFIG['i2s_sd']),
mode=I2S.TX,
bits=16,
format=I2S.MONO,
rate=8000, # 降低采样率
ibuf=4000 # 进一步减小缓冲区
)
print("✅ I2S音频初始化成功")
except Exception as e:
print(f"❌ I2S初始化失败: {e}")
audio_out = None
# LED恢复呼吸效果
if led_pwm:
led_pwm.duty(0)
return True # 即使部分硬件失败也继续运行
def ultra_safe_wifi_connect():
"""超安全WiFi连接,避免任何可能导致重启的操作"""
global wlan, system_state
if system_state['wifi_connecting']:
print("⚠️ WiFi连接正在进行中,跳过重复连接")
return False
system_state['wifi_connecting'] = True
system_state['wifi_error_count'] += 1
print(f"尝试第{system_state['wifi_error_count']}次WiFi连接...")
# 如果错误次数太多,暂时禁用WiFi
if system_state['wifi_error_count'] > CONFIG['max_wifi_retries']:
print(f"❌ WiFi连接失败次数过多,暂时禁用WiFi功能")
system_state['wifi_connecting'] = False
return False
try:
# 完全重置网络状态
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
time.sleep(2) # 给硬件足够时间重置
wlan.active(True)
time.sleep(1)
if not wlan.isconnected():
print(f'连接到 {CONFIG["ssid"]}...')
# 非常保守的连接尝试
wlan.connect(CONFIG["ssid"], CONFIG["password"])
# 短暂等待,避免长时间阻塞
for i in range(CONFIG['wifi_timeout']):
if wlan.isconnected():
break
time.sleep(1)
print('.', end='')
gc.collect()
if wlan.isconnected():
system_state['wifi_connected'] = True
system_state['wifi_error_count'] = 0
print('\n✅ WiFi连接成功!')
print(f'IP地址: {wlan.ifconfig()[0]}')
system_state['wifi_connecting'] = False
return True
else:
print('\n❌ WiFi连接超时')
# 不立即重试,等待主循环中的重试机制
wlan.active(False)
except Exception as e:
print(f'\n❌ WiFi连接异常: {e}')
# 不打印详细异常信息,避免可能的内存问题
system_state['wifi_connecting'] = False
return False
def handle_human_detected(pin):
"""人体检测中断处理 - 极简版本"""
current_time = time.ticks_ms()
# 防抖处理:防止短时间内重复触发
if time.ticks_diff(current_time, system_state['last_detection_time']) > CONFIG['debounce_time']:
system_state['human_detected'] = True
system_state['last_detection_time'] = current_time
system_state['detection_count'] += 1
# 读取当前传感器状态
try:
system_state['sensor_current_state'] = human_detect.value()
print(f"👤 人体检测 #{system_state['detection_count']} - 传感器状态: {'高电平' if system_state['sensor_current_state'] else '低电平'}")
except Exception as e:
print(f"⚠️ 读取传感器状态失败: {e}")
system_state['sensor_error_count'] += 1
def check_sensor_status():
"""定期检查传感器状态"""
if not human_detect:
return
try:
current_state = human_detect.value()
# 状态变化检测
if current_state != system_state['sensor_last_state']:
system_state['sensor_last_state'] = current_state
state_desc = "高电平(检测到)" if current_state else "低电平(未检测)"
print(f"🔍 传感器状态变化: {state_desc}")
system_state['sensor_current_state'] = current_state
except Exception as e:
print(f"⚠️ 传感器状态检查失败: {e}")
system_state['sensor_error_count'] += 1
def simple_led_effect():
"""简单的LED闪烁效果,避免复杂计算"""
led_state = False
counter = 0
def update():
nonlocal led_state, counter
counter += 1
if counter >= 25: # 每25个周期切换一次(约500ms)
led_state = not led_state
if led_pwm:
led_pwm.duty(200 if led_state else 0)
counter = 0
return update
def safe_audio_playback():
"""安全的音频播放 - 极度保守"""
global system_state
if system_state['audio_playing']:
return False
if not system_state['wifi_connected'] or not audio_out:
return False
system_state['audio_playing'] = True
print("🎵 尝试播放音频...")
try:
# 极度保守的音频播放
response = urequests.get(CONFIG['audio_url'], stream=True, timeout=5)
response.raw.read(44) # 跳过头部
start_time = time.ticks_ms()
bytes_played = 0
max_bytes = 16000 # 限制最大播放数据量
while bytes_played < max_bytes:
# 严格的超时和内存检查
if (time.ticks_diff(time.ticks_ms(), start_time) > CONFIG['max_audio_duration'] or
gc.mem_free() < CONFIG['min_memory']):
break
chunk = response.raw.read(128) # 非常小的读取块
if len(chunk) == 0:
break
audio_out.write(chunk)
bytes_played += len(chunk)
# 频繁的垃圾回收
if bytes_played % 1024 == 0:
gc.collect()
response.close()
print(f"✅ 音频播放完成: {bytes_played} bytes")
return True
except Exception as e:
print(f"❌ 音频播放失败: {e}")
return False
finally:
system_state['audio_playing'] = False
def play_local_feedback():
"""本地反馈(无网络时使用)"""
print("🔊 播放本地提示")
if not led_pwm:
return
# 超简单的LED闪烁反馈
for i in range(2): # 减少闪烁次数
led_pwm.duty(500)
time.sleep(0.3)
led_pwm.duty(0)
time.sleep(0.2)
def get_sensor_status_text():
"""获取传感器状态文本描述"""
if not human_detect:
return "传感器未初始化"
try:
current_state = human_detect.value()
status_text = "高电平(检测到)" if current_state else "低电平(未检测)"
if system_state['sensor_error_count'] > 0:
status_text += f" [错误:{system_state['sensor_error_count']}]"
return status_text
except:
return "传感器读取失败"
def print_system_status():
"""打印系统状态 - 简化版本"""
free_memory = gc.mem_free()
print(f"\n📊 系统状态 [启动#{system_state['boot_count']}]")
print(f"💾 内存: {free_memory} bytes")
print(f"👤 检测: {system_state['detection_count']}次")
print(f"📡 WiFi: {'已连接' if system_state['wifi_connected'] else '未连接'}")
print(f"🔍 传感器: {get_sensor_status_text()}")
print(f"🎵 音频: {'播放中' if system_state['audio_playing'] else '空闲'}")
# 内存警告
if free_memory < CONFIG['min_memory']:
print("🧹 内存紧张,进行垃圾回收...")
gc.collect()
def main_loop():
"""主循环 - 极度保守版本"""
print("进入主循环...")
# 初始化LED效果
update_led = simple_led_effect()
# 计时器
status_timer = time.ticks_ms()
sensor_check_timer = time.ticks_ms()
wifi_retry_timer = time.ticks_ms()
loop_count = 0
while True:
current_time = time.ticks_ms()
loop_count += 1
# 1. 更新LED
update_led()
# 2. 定期检查传感器状态(减少频率)
if time.ticks_diff(current_time, sensor_check_timer) > CONFIG['sensor_check_interval']:
check_sensor_status()
sensor_check_timer = current_time
# 3. 每60秒报告系统状态(减少频率)
if time.ticks_diff(current_time, status_timer) > 60000:
print_system_status()
status_timer = current_time
# 强制垃圾回收
gc.collect()
# 4. WiFi重连机制(保守策略)
if (not system_state['wifi_connected'] and
not system_state['wifi_connecting'] and
time.ticks_diff(current_time, wifi_retry_timer) > (CONFIG['wifi_retry_delay'] * 1000)):
print("🔄 尝试WiFi重连...")
ultra_safe_wifi_connect()
wifi_retry_timer = current_time
# 5. 处理人体检测(简化处理)
if system_state['human_detected'] and not system_state['audio_playing']:
print(f"🎯 处理检测事件 #{system_state['detection_count']}")
system_state['human_detected'] = False
# 极度保守的响应策略
if system_state['wifi_connected']:
safe_audio_playback() # 不关心返回值
else:
play_local_feedback()
# 6. 防止循环计数溢出
if loop_count > 1000000:
loop_count = 0
# 7. 适当延时
time.sleep_ms(50) # 增加延时,减少CPU负载
def emergency_recovery():
"""紧急恢复机制"""
print("执行紧急恢复检查...")
# 显示复位信息
reset_cause = machine.reset_cause()
reset_reasons = {
0: "电源开机", 1: "硬件复位", 2: "看门狗复位",
3: "深度睡眠", 4: "软复位"
}
res=reset_reasons.get(reset_cause, f'未知({reset_cause})')
print(f"复位原因: {res}")
# 强制垃圾回收
gc.collect()
free_mem = gc.mem_free()
print(f"可用内存: {free_mem} bytes")
# 如果频繁重启,进入安全模式
if system_state['boot_count'] > 3:
print("⚠️ 检测到频繁重启,进入安全模式")
return False
return free_mem > 40000 # 内存阈值检查
def main():
"""主程序"""
print("启动超稳定人体检测系统...")
# 加载启动计数
load_boot_count()
# 显示系统信息
print(f"启动次数: #{system_state['boot_count']}")
print(f"初始内存: {gc.mem_free()} bytes")
# 紧急恢复检查
if not emergency_recovery():
print("🔄 系统状态不佳,延迟启动...")
time.sleep(5)
# 初始化硬件
safe_hardware_init()
# 设置人体检测中断
if human_detect:
human_detect.irq(trigger=Pin.IRQ_RISING, handler=handle_human_detected)
print("✅ 人体检测中断已设置")
else:
print("⚠️ 无法设置人体检测中断")
# 显示运行模式
print("🎯 系统模式: 保守运行模式")
print("💡 WiFi将自动重连,失败不影响基本功能")
print("🚀 系统启动完成,开始运行...")
# 进入主循环
try:
main_loop()
except Exception as e:
print(f"💥 主循环异常: {e}")
print("🔄 系统将在10秒后重启...")
time.sleep(10)
machine.reset()
if __name__ == "__main__":
main()
客户端代码跑在ESP32上
服务端代码跑在本地电脑,用flask实现。使用到了高德的天气API,腾讯的TTS,都有免费的额度可以申请。代码也极其简单,参考如下:
from flask import Flask, request, jsonify, Response
import os
import hashlib
import hmac
import json
import time
import base64
import logging
from datetime import datetime
from http.client import HTTPSConnection, HTTPConnection
import sys
import urllib.parse
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)
# API配置
TENCENT_SECRET_ID = "YOUR_TENCENT_SECRET_ID" # 替换为您的腾讯云SecretId
TENCENT_SECRET_KEY = "YOUR_TENCENT_SECRET_KEY" # 替换为您的腾讯云SecretKey
AMAP_API_KEY = "" # 替换为您的高德地图API Key
class TTSManager:
def __init__(self, secret_id, secret_key):
self.secret_id = secret_id
self.secret_key = secret_key
self.service = "tts"
self.host = "tts.tencentcloudapi.com"
self.version = "2019-08-23"
self.action = "TextToVoice"
# 验证密钥
self.validate_credentials()
def validate_credentials(self):
"""验证腾讯云密钥是否有效"""
if not self.secret_id or self.secret_id == "YOUR_TENCENT_SECRET_ID":
raise ValueError("TENCENT_SECRET_ID 未设置,请修改代码中的 TENCENT_SECRET_ID")
if not self.secret_key or self.secret_key == "YOUR_TENCENT_SECRET_KEY":
raise ValueError("TENCENT_SECRET_KEY 未设置,请修改代码中的 TENCENT_SECRET_KEY")
logger.info("腾讯云凭证验证通过")
def sign(self, key, msg):
"""HMAC签名"""
if isinstance(msg, str):
msg = msg.encode('utf-8')
return hmac.new(key, msg, hashlib.sha256).digest()
def generate_signature(self, date, string_to_sign):
"""生成签名"""
try:
secret_date = self.sign(("TC3" + self.secret_key).encode("utf-8"), date)
secret_service = self.sign(secret_date, self.service)
secret_signing = self.sign(secret_service, "tc3_request")
signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
return signature
except Exception as e:
logger.error(f"生成签名失败: {e}")
return None
def text_to_speech(self, text, speed=-1, volume=1.0, voice_type=601015):
"""调用腾讯云TTS API将文本转换为语音"""
try:
# 验证输入参数
if not text or not isinstance(text, str):
logger.error("文本参数无效")
return None
# 构造请求payload
payload = {
"Text": text,
"SessionId": f"session-{int(time.time())}",
"Volume": volume,
"Speed": speed,
"ProjectId": 0,
"ModelType": 1,
"VoiceType": voice_type,
"Codec": "wav",
"EmotionIntensity": 100
}
# 使用ensure_ascii=False确保中文字符正确编码
payload_str = json.dumps(payload, ensure_ascii=False)
logger.info(f"TTS请求文本: {text}")
# 生成签名所需参数
timestamp = int(time.time())
date_str = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
algorithm = "TC3-HMAC-SHA256"
# 步骤1:拼接规范请求串
http_request_method = "POST"
canonical_uri = "/"
canonical_querystring = ""
ct = "application/json; charset=utf-8"
canonical_headers = f"content-type:{ct}\nhost:{self.host}\nx-tc-action:{self.action.lower()}\n"
signed_headers = "content-type;host;x-tc-action"
# 确保payload使用UTF-8编码
hashed_request_payload = hashlib.sha256(payload_str.encode("utf-8")).hexdigest()
canonical_request = (http_request_method + "\n" +
canonical_uri + "\n" +
canonical_querystring + "\n" +
canonical_headers + "\n" +
signed_headers + "\n" +
hashed_request_payload)
# 步骤2:拼接待签名字符串
credential_scope = date_str + "/" + self.service + "/" + "tc3_request"
hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
string_to_sign = (algorithm + "\n" +
str(timestamp) + "\n" +
credential_scope + "\n" +
hashed_canonical_request)
# 步骤3:计算签名
signature = self.generate_signature(date_str, string_to_sign)
if not signature:
logger.error("签名生成失败")
return None
# 步骤4:拼接Authorization
authorization = (algorithm + " " +
"Credential=" + self.secret_id + "/" + credential_scope + ", " +
"SignedHeaders=" + signed_headers + ", " +
"Signature=" + signature)
# 步骤5:构造请求头
headers = {
"Authorization": authorization,
"Content-Type": "application/json; charset=utf-8",
"Host": self.host,
"X-TC-Action": self.action,
"X-TC-Timestamp": str(timestamp),
"X-TC-Version": self.version
}
# 发送请求
logger.info("发送请求到腾讯云TTS API...")
conn = HTTPSConnection(self.host)
conn.request("POST", "/", headers=headers, body=payload_str.encode("utf-8"))
response = conn.getresponse()
response_data = response.read()
conn.close()
# 处理响应
if response.status == 200:
try:
response_json = json.loads(response_data.decode('utf-8'))
except Exception as e:
logger.error(f"响应JSON解析失败: {e}")
return None
if 'Response' in response_json and 'Audio' in response_json['Response']:
audio_base64 = response_json['Response']['Audio']
# Base64解码音频数据
try:
audio_binary = base64.b64decode(audio_base64)
logger.info(f"TTS转换成功: {len(audio_binary)}字节音频")
return audio_binary
except Exception as e:
logger.error(f"Base64解码失败: {e}")
return None
else:
error_info = response_json.get('Response', {}).get('Error', {})
error_msg = f"API错误: {error_info.get('Code', 'Unknown')} - {error_info.get('Message', 'Unknown error')}"
logger.error(error_msg)
return None
else:
logger.error(f"HTTP请求失败: {response.status} - {response.reason}")
try:
error_response = json.loads(response_data.decode('utf-8'))
logger.error(f"错误详情: {error_response}")
except:
logger.error(f"原始响应: {response_data}")
return None
except Exception as e:
logger.error(f"TTS转换异常: {str(e)}")
return None
class WeatherManager:
def __init__(self, api_key):
self.api_key = api_key
self.host = "restapi.amap.com"
self.base_url = "/v3/weather/weatherInfo"
# 验证API Key
self.validate_api_key()
def validate_api_key(self):
"""验证高德API Key"""
if not self.api_key or self.api_key == "YOUR_AMAP_API_KEY":
raise ValueError("AMAP_API_KEY 未设置,请修改代码中的 AMAP_API_KEY")
logger.info("高德地图API凭证验证通过")
def get_weather_data(self, city_code="110000"):
"""获取天气数据"""
try:
# 构建查询参数
params = {
"city": city_code,
"key": self.api_key,
"output": "JSON"
}
query_string = urllib.parse.urlencode(params)
url = f"{self.base_url}?{query_string}"
logger.info(f"请求天气数据: 城市代码 {city_code}")
# 发送HTTP请求
conn = HTTPConnection(self.host)
conn.request("GET", url)
response = conn.getresponse()
response_data = response.read().decode('utf-8')
conn.close()
# 解析响应
weather_data = json.loads(response_data)
if weather_data.get("status") == "1" and weather_data.get("count") != "0":
lives = weather_data.get("lives", [])
if lives:
live_data = lives[0]
logger.info(f"天气数据获取成功: {live_data.get('city')} - {live_data.get('weather')}")
return live_data
else:
logger.error("天气数据为空")
return None
else:
error_msg = weather_data.get("info", "未知错误")
logger.error(f"天气API请求失败: {error_msg}")
return None
except Exception as e:
logger.error(f"获取天气数据异常: {str(e)}")
return None
def format_weather_report(self, weather_data):
"""格式化天气报告文本"""
if not weather_data:
return "无法获取天气信息"
try:
province = weather_data.get("province", "")
city = weather_data.get("city", "")
weather = weather_data.get("weather", "")
temperature = weather_data.get("temperature", "")
wind_direction = weather_data.get("winddirection", "")
wind_power = weather_data.get("windpower", "")
humidity = weather_data.get("humidity", "")
# 构建自然语言天气报告
report_parts = []
if province and city:
if province == city:
report_parts.append(f"您好!您现在所在{city}")
else:
report_parts.append(f"您好!您现在所在{province}{city}")
if weather:
report_parts.append(f"天气{weather}")
if temperature:
report_parts.append(f"温度{temperature}度")
if wind_direction and wind_power:
# 处理风力等级描述
if wind_power == "≤3":
wind_desc = "微风"
else:
wind_desc = f"{wind_power}级"
report_parts.append(f"{wind_direction}风{wind_desc}")
if humidity:
report_parts.append(f"湿度{humidity}%")
if report_parts:
weather_text = ",".join(report_parts) + "。"
logger.info(f"天气报告文本: {weather_text}")
return weather_text
else:
return "天气信息不完整"
except Exception as e:
logger.error(f"格式化天气报告异常: {str(e)}")
return "天气信息处理错误"
# 创建管理器实例
try:
tts_manager = TTSManager(TENCENT_SECRET_ID, TENCENT_SECRET_KEY)
weather_manager = WeatherManager(AMAP_API_KEY)
logger.info("所有服务管理器初始化成功")
except ValueError as e:
logger.error(f"服务初始化失败: {e}")
print(f"❌ 错误: {e}")
print("请修改代码中的 API 密钥配置")
sys.exit(1)
except Exception as e:
logger.error(f"服务初始化异常: {e}")
sys.exit(1)
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查端点"""
return jsonify({
"status": "healthy",
"service": "Weather TTS Server",
"tencent_tts_configured": bool(TENCENT_SECRET_ID and TENCENT_SECRET_ID != "YOUR_TENCENT_SECRET_ID"),
"amap_weather_configured": bool(AMAP_API_KEY and AMAP_API_KEY != "YOUR_AMAP_API_KEY")
})
@app.route('/weather', methods=['GET'])
def get_weather_audio():
"""获取天气语音报告"""
try:
# 获取城市代码参数,默认为北京
city_code = request.args.get('city', '110000')
logger.info(f"请求天气语音报告,城市代码: {city_code}")
# 获取天气数据
weather_data = weather_manager.get_weather_data(city_code)
if not weather_data:
return jsonify({"error": "获取天气数据失败"}), 500
# 格式化天气报告
weather_text = weather_manager.format_weather_report(weather_data)
# 转换为语音
audio_data = tts_manager.text_to_speech(weather_text)
if not audio_data:
return jsonify({"error": "语音合成失败"}), 500
# 返回音频数据 - 修复编码问题
response = Response(
audio_data,
mimetype='audio/wav'
)
# 设置安全的响应头,避免中文字符
response.headers['Content-Length'] = str(len(audio_data))
response.headers['Content-Type'] = 'audio/wav'
# 使用Base64编码传递中文字符,或者直接省略
# 如果需要传递天气文本,使用Base64编码
import base64
weather_text_encoded = base64.b64encode(weather_text.encode('utf-8')).decode('ascii')
response.headers['X-Weather-Text-Base64'] = weather_text_encoded
return response
except Exception as e:
logger.error(f"天气语音端点异常: {str(e)}")
return jsonify({"error": f"服务器内部错误: {str(e)}"}), 500
@app.route('/weather/text', methods=['GET'])
def get_weather_text():
"""获取天气文本报告(不合成语音)"""
try:
city_code = request.args.get('city', '110000')
logger.info(f"请求天气文本报告,城市代码: {city_code}")
# 获取天气数据
weather_data = weather_manager.get_weather_data(city_code)
if not weather_data:
return jsonify({"error": "获取天气数据失败"}), 500
# 格式化天气报告
weather_text = weather_manager.format_weather_report(weather_data)
return jsonify({
"city_code": city_code,
"weather_data": weather_data,
"weather_report": weather_text,
"timestamp": int(time.time())
})
except Exception as e:
logger.error(f"天气文本端点异常: {str(e)}")
return jsonify({"error": f"服务器内部错误: {str(e)}"}), 500
@app.route('/cities', methods=['GET'])
def get_supported_cities():
"""获取支持的城市列表"""
cities = {
"北京": "110000",
"上海": "310000",
"广州": "440100",
"深圳": "440300",
"杭州": "330100",
"南京": "320100",
"武汉": "420100",
"成都": "510100",
"西安": "610100",
"郑州": "410100"
}
return jsonify(cities)
@app.route('/example', methods=['GET'])
def example_requests():
"""提供请求示例"""
import socket
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
examples = {
"health_check": f'curl http://{local_ip}:5000/health',
"weather_audio_beijing": f'curl http://{local_ip}:5000/weather?city=110000 --output beijing_weather.wav',
"weather_audio_shanghai": f'curl http://{local_ip}:5000/weather?city=310000 --output shanghai_weather.wav',
"weather_text": f'curl http://{local_ip}:5000/weather/text?city=110000',
"supported_cities": f'curl http://{local_ip}:5000/cities'
}
return jsonify(examples)
if __name__ == '__main__':
# 设置环境编码为UTF-8,解决latin-1编码问题
import locale
if hasattr(locale, 'LC_ALL'):
try:
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
except:
try:
locale.setlocale(locale.LC_ALL, 'C.UTF-8')
except:
pass
# 获取本机IP地址
import socket
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
print("=" * 60)
print("🌤️ 天气TTS服务器启动信息:")
print(f"本地访问: http://localhost:5000")
print(f"网络访问: http://{local_ip}:5000")
print("=" * 60)
print("📋 可用端点:")
print(" GET /health - 健康检查")
print(" GET /example - 获取请求示例")
print(" GET /cities - 获取支持的城市列表")
print(" GET /weather - 获取天气语音报告(WAV音频)")
print(" GET /weather/text - 获取天气文本报告(JSON)")
print("=" * 60)
print("🏙️ 支持的城市代码示例:")
print(" 北京: 110000, 上海: 310000, 广州: 440100")
print(" 深圳: 440300, 杭州: 330100, 成都: 510100")
print("=" * 60)
# 检查API密钥配置
if TENCENT_SECRET_ID == "YOUR_TENCENT_SECRET_ID":
print("⚠️ 警告: 请设置 TENCENT_SECRET_ID")
if TENCENT_SECRET_KEY == "YOUR_TENCENT_SECRET_KEY":
print("⚠️ 警告: 请设置 TENCENT_SECRET_KEY")
if AMAP_API_KEY == "YOUR_AMAP_API_KEY":
print("⚠️ 警告: 请设置 AMAP_API_KEY")
# 启动Flask服务器
try:
# 使用更兼容的服务器设置
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
except Exception as e:
print(f"❌ 服务器启动失败: {e}")
sys.exit(1)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。