AI 三分钟写出来的代码,和你能放心上线的代码之间,有一条很多人忽略的沟。
用 AI 编程助手几个月,我形成一个判断:AI 给的代码默认是 Demo 级的。
它能跑。测试能过。功能看起来正常。但如果你直接部署到生产环境,大概率会在某个时刻出问题——而且那个时刻你往往不在电脑前。
这不是 AI 模型的问题。这是训练数据的结构性问题:AI 从海量代码中学到的,是"怎么实现功能",不是"怎么让功能稳定运行三个月"。前者是教程和 demo 的内容,后者是运维日志和事故报告的内容。后者的样本量远远少于前者。
这篇文章梳理了 Demo 到生产之间最常被跳过的五道坎。
def process_order(order_id):
order = db.get_order(order_id)
inventory = db.get_inventory(order.sku)
if inventory.quantity >= order.quantity:
db.decrement_inventory(order.sku, order.quantity)
db.update_order_status(order_id, "confirmed")
send_confirmation_email(order.user_email)能跑吗?能。对吗?对——当一切正常的时候。
def process_order(order_id):
try:
order = db.get_order(order_id)
if not order:
return {"status": "error", "reason": "order_not_found"}
inventory = db.get_inventory(order.sku)
if not inventory:
return {"status": "error", "reason": "sku_not_found"}
if inventory.quantity < order.quantity:
return {"status": "error", "reason": "insufficient_inventory"}
# 扣库存和更新订单应该是原子的
with db.transaction():
db.decrement_inventory(order.sku, order.quantity)
db.update_order_status(order_id, "confirmed")
# 邮件发送失败不应该影响订单确认
try:
send_confirmation_email(order.user_email)
except EmailException:
log.error(f"邮件发送失败,订单 {order_id} 已确认", exc_info=True)
# 加入重试队列,不阻塞主流程
return {"status": "ok", "order_id": order_id}
except DatabaseException as e:
log.error(f"数据库异常,订单 {order_id}", exc_info=True)
return {"status": "error", "reason": "database_error", "detail": str(e)}Demo 和生产的差距不在"有没有 try",在"每个异常都有明确的业务含义和回退路径"。
# Demo 级的"错误处理"——异常被吞了,没人知道
try:
do_something()
except:
pass
# 生产级的错误处理——每个分支都有日志、有状态、有回退
try:
do_something()
except TimeoutError:
log.warning("超时,使用缓存数据")
return get_cached_result()
except ConnectionError:
log.error("连接失败,通知监控")
alert_monitoring("service_down")
raise # 让上层决定重试还是降级# 一个全局计数器
total_requests = 0
def handle_request():
global total_requests
total_requests += 1
# ... 处理逻辑 ...单线程测试永远没问题。并发来了,计数器就疯了。
import threading
_lock = threading.Lock()
total_requests = 0
def handle_request():
with _lock:
global total_requests
total_requests += 1
# 锁外做业务逻辑,锁内只做数据修改
process()但更关键的是共享状态的保护:
# ❌ AI 给的:读和写不在同一保护域
config = json.load(open("config.json")) # 读
# 另一个线程可能已经写入了
config["last_update"] = now() # 写(基于过期数据)
json.dump(config, open("config.json", "w"))
# ✅ 生产级:读改写原子化
with _config_lock:
config = json.load(open("config.json"))
config["last_update"] = now()
json.dump(config, open("config.json", "w"))AI 不会主动提醒你并发问题,因为大多数训练样本是单线程代码。 你需要自己养成一个习惯:看到全局变量、共享文件、缓存字典——立刻问"多个线程同时访问会怎样"。
def read_large_file(path):
data = open(path).read()
return process(data)文件不会被显式关闭。在 CPython 中,小脚本跑完进程退出,GC 会处理。但长时间运行的服务里,文件句柄泄漏是慢性死亡。
# ✅ 上下文管理器是唯一安全的写法
def read_large_file(path):
with open(path) as f:
data = f.read()
return process(data)
# ✅ 不只文件——连接池、线程池、锁都需要生命周期管理
class DatabasePool:
def __init__(self, max_connections=10):
self.pool = Queue(maxsize=max_connections)
def __enter__(self):
for _ in range(self.pool.maxsize):
self.pool.put(create_connection())
return self
def __exit__(self, *args):
while not self.pool.empty():
conn = self.pool.get()
conn.close()资源泄漏的 bug 在测试环境测不出来。 测试跑 5 秒,句柄还没泄漏够。生产跑 5 天,操作系统说"文件打开数超限"。
def process_payment(amount, user_id):
result = payment_gateway.charge(amount, user_id)
return result出了问题,你只知道"支付失败了"。不知道是网络超时、余额不足、网关限流还是参数错误。
import logging
logger = logging.getLogger(__name__)
def process_payment(amount, user_id):
logger.info(f"开始处理支付: user={user_id}, amount={amount}")
start = time.time()
try:
result = payment_gateway.charge(amount, user_id, timeout=30)
elapsed = time.time() - start
logger.info(
f"支付成功: user={user_id}, amount={amount}, "
f"耗时={elapsed:.2f}s, txn_id={result.transaction_id}"
)
return result
except TimeoutError:
elapsed = time.time() - start
logger.error(
f"支付超时: user={user_id}, amount={amount}, "
f"耗时={elapsed:.2f}s(超过30s阈值)"
)
return {"status": "timeout", "user_id": user_id}
except PaymentDeclinedError as e:
logger.warning(
f"支付被拒绝: user={user_id}, reason={e.reason}, code={e.code}"
)
return {"status": "declined", "reason": e.reason}日志要回答三个问题:发生了什么、为什么发生、影响了什么。 Demo 代码只回答"发生了什么"("失败"),生产代码必须在三秒内能定位到具体原因。
def get_user_profile(user_id):
response = requests.get(f"https://api.service.com/users/{user_id}")
return response.json()假设:API 永远在线、永远返回 200、返回数据格式永远一致。
import requests
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def get_user_profile(user_id):
try:
response = requests.get(
f"https://api.service.com/users/{user_id}",
timeout=10
)
if response.status_code == 404:
return None # 用户不存在,不重试
if response.status_code == 429:
# 被限流,等更久再重试
raise RateLimitError()
response.raise_for_status()
return response.json()
except requests.Timeout:
logger.warning(f"获取用户信息超时: user={user_id}")
return get_cached_profile(user_id) # 回退到缓存
except requests.ConnectionError:
logger.error(f"服务不可达: user={user_id}")
raise # 让上层决定是否熔断五个生产必备的外部依赖策略:
# 1. 超时
timeout=10 # 永远不要默认无超时
# 2. 重试(带退避)
@retry(stop=stop_after_attempt(3), wait=wait_exponential())
# 3. 熔断
circuit_breaker.record_failure()
if circuit_breaker.is_open():
return fallback()
# 4. 降级
try:
result = primary_service.call()
except:
result = fallback_service.call()
# 5. 限流
rate_limiter.acquire() # 保护自己也保护对方在把 AI 写的代码推上线之前,过一遍这个清单:
[ ] 每个异常分支有明确的业务含义?(不是 bare except)
[ ] 共享状态有并发保护?(锁覆盖了读+改+写全流程)
[ ] 文件/连接/锁都有明确的生命周期?(with / __exit__)
[ ] 关键路径有结构化日志?(包含上下文、耗时、结果)
[ ] 外部调用有超时+重试+降级?(不是假设对方永远正常)五道坎,五道 YES,才算从 Demo 走到了生产。
本文所有示例均已脱敏处理。你推 AI 代码上线前检查什么?欢迎评论区补充你的清单。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。