
最近有好几个同学问我可转债的问题,其实我个人不做可转债的。 既然问的人多了,要不写个演示例子。
这里注意,这里写的仅仅是演示例子。 演示可转债数据如何获取,怎么计算溢价, 不是让直接用它。
你可能也听说过可转债套利这回事:当转债价格比它的“转股价值”还低的时候,买入转债,转成股票,再卖掉股票,中间就能赚个差价。说白了就是折价套利——市场偶尔会犯迷糊,给你送个小红包。
但问题是,全市场几百只可转债,你不可能一只一只盯着看溢价率。而且折价机会往往转瞬即逝,等你手工算完,价格可能已经涨上去了。
之前写过通达信的TQ接口(就是那个tqcenter),可以写Python脚本直接调行情和交易。趁着五一放假,我用AI简单写了个可转债套利策略,全自动扫描、计算溢价率、发现机会还能推送到自选股。
先简单复习一下(老司机可跳过):
当溢价率 < 0 时,就是折价:转债比它对应的股票便宜,买入转债 → 申请转股 → 次日卖出正股,理论上白捡差价。
但实际操作有两个坑:① 转股需要T+1才能卖股票,所以要承担隔夜波动风险;② 不是所有转债都在转股期内,刚上市那几个月不能转。
所以策略里必须检查转股起始日期,别踩雷。
简单说,就是持续监控全市场可转债,实时计算溢价率,当折价超过你设定的阈值(比如-1%),就发出套利信号。
具体功能:
tq.get_kzz_info()拿到转股价、正股现价、转债现价、转股价值、溢价率。这里需要注意:

文末附上代码:
# -*- coding: utf-8 -*-
"""
可转债套利策略 —— 基于转股溢价率监控与套利执行
策略逻辑:
1. 获取全市场可转债信息(转股价、正股代码等)
2. 计算每只可转债的 转股价值 和 转股溢价率
3. 当溢价率低于阈值时触发套利信号: 买入可转债 -> 转股 -> 卖出正股(或直接卖出可转债获利)
4. 也可监控负溢价机会:折价套利
使用方式:
1. 在通达信中运行此脚本
2. 脚本自动扫描可转债,将套利机会推送到自选股板块
3. 也可开启自动交易模式执行套利
注意:
- 可转债转股通常有 T+1 延迟,卖出正股也是 T+1
- 需要有可转债和正股的对应持仓/资金
- 实际执行前请充分测试
"""
import sys
import os
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
# 确保 tqcenter 可被导入
sys.path.insert(0, r"D:\lwj\new_tdx_mock\PYPlugins\user")
from tqcenter import tq, tqconst
# ============================================================
# 配置参数
# ============================================================
@dataclass
class ArbitrageConfig:
"""套利策略配置"""
account: str = '' # 交易账户 (留空使用默认)
account_type: str = 'stock' # 账户类型
scan_interval_seconds: int = 30 # 扫描间隔(秒)
max_scans: int = 0 # 最大扫描次数,0=无限
negative_premium_threshold: float = -1.0 # 负溢价率阈值(%)
low_premium_threshold: float = 3.0 # 低溢价率阈值(%)
min_turnover: float = 500.0 # 最低成交额(万元)
auto_trade: bool = False # 是否自动交易
trade_amount: int = 10000 # 单次交易金额(元)
max_position_pct: float = 10.0 # 单只最大持仓占比(%)
block_code: str = 'KZZ套利' # 自选板块代码
push_warning: bool = True # 是否发送预警
print_log: bool = True # 是否打印日志
# ============================================================
# 数据模型
# ============================================================
@dataclass
class ConvertibleBond:
"""可转债数据模型"""
code: str = ''
name: str = ''
stock_code: str = ''
stock_name: str = ''
convert_price: float = 0.0
convert_rate: float = 0.0
convert_value: float = 0.0
premium_rate: float = 0.0
bond_price: float = 0.0
stock_price: float = 0.0
turnover: float = 0.0
double_low: float = 0.0
maturity_date: str = ''
convert_start_date: str = ''
is_convertible: bool = False
rest_scope: float = 0.0
rating: str = ''
force_redeem: float = 0.0
put_back: float = 0.0
end_price: float = 0.0
@dataclass
class ArbitrageSignal:
"""套利信号"""
bond: ConvertibleBond
signal_type: str = ''
premium_rate: float = 0.0
expected_profit_pct: float = 0.0
timestamp: str = ''
# ============================================================
# 工具函数
# ============================================================
def safe_float(val) -> float:
"""安全转为正浮点数"""
if val is None:
return 0.0
try:
v = float(val)
return v if v > 0 else 0.0
except (ValueError, TypeError):
return 0.0
def build_stock_code(hs_code, set_code: str) -> str:
"""
从 HSCode + SetCode 构建带后缀的正股代码
SetCode: '0'=深圳, '1'=上海
注意: DLL 返回的 HSCode 可能是数字类型,会丢失前导零,需要补零到 6 位
"""
# 先转字符串,再补零到 6 位(A股代码固定 6 位)
hs_code = str(hs_code).strip()
if not hs_code or hs_code in ('0', '-'):
return ''
hs_code = hs_code.zfill(6)
if set_code == '1':
return f"{hs_code}.SH"
elif set_code == '0':
return f"{hs_code}.SZ"
# 根据代码前缀推断
if hs_code.startswith('6') or hs_code.startswith('5'):
return f"{hs_code}.SH"
elif hs_code.startswith('0') or hs_code.startswith('3') or hs_code.startswith('1'):
return f"{hs_code}.SZ"
elif hs_code.startswith('92') or hs_code.startswith('4'):
return f"{hs_code}.BJ"
return f"{hs_code}.SZ"
def parse_date_str(date_str: str) -> str:
"""将 YYYYMMDD 转为 YYYY-MM-DD"""
s = str(date_str).strip()
if len(s) == 8 and s.isdigit():
return f"{s[:4]}-{s[4:6]}-{s[6:8]}"
return s
def check_convertible(zg_date: str) -> bool:
"""检查是否在可转股期内 (ZGDate 格式: YYYYMMDD)"""
if not zg_date or zg_date in ('0', '-'):
return False
s = str(zg_date).strip()
try:
if len(s) == 8 and s.isdigit():
start = datetime(int(s[:4]), int(s[4:6]), int(s[6:8]))
else:
start = datetime.strptime(s, '%Y-%m-%d')
return datetime.now() >= start
except (ValueError, TypeError, IndexError):
return False
# ============================================================
# 核心:初始化 TQ 连接
# ============================================================
_tq_initialized = False
def ensure_tq_init():
"""确保 TQ 已初始化(幂等,多次调用安全)"""
global _tq_initialized
if _tq_initialized:
return
_script_path = os.path.abspath(__file__)
try:
tq.initialize(path=_script_path)
_tq_initialized = True
except RuntimeError as e:
print(f"[ERROR] TQ初始化失败: {e}")
raise
# ============================================================
# 核心策略类
# ============================================================
class KzzArbitrageStrategy:
"""可转债套利策略引擎"""
def __init__(self, config: ArbitrageConfig = None):
self.config = config or ArbitrageConfig()
self.account_id: int = -1
self.bonds: List[ConvertibleBond] = []
self.signals: List[ArbitrageSignal] = []
self.scan_count: int = 0
def _log(self, msg: str):
if self.config.print_log:
ts = datetime.now().strftime('%H:%M:%S')
print(f"[{ts}] {msg}")
# --------------------------------------------------------
# 初始化
# --------------------------------------------------------
def initialize(self) -> bool:
"""初始化策略"""
ensure_tq_init()
self._log("=== 可转债套利策略启动 ===")
self.account_id = tq.stock_account(
account=self.config.account,
account_type=self.config.account_type
)
if self.account_id < 0:
self._log("⚠ 未获取到交易账户,以只读模式运行")
else:
self._log(f"✓ 账户ID: {self.account_id}")
if self.config.block_code:
try:
result = tq.create_sector(
block_code=self.config.block_code,
block_name='可转债套利机会'
)
if result:
self._log(f"✓ 自选板块已创建: {self.config.block_code}")
except Exception:
pass
# 同时创建正股板块
if self.config.block_code:
try:
tq.create_sector(
block_code=self.config.block_code + '_正股',
block_name='可转债套利-正股'
)
except Exception:
pass
return True
# --------------------------------------------------------
# 获取可转债列表
# --------------------------------------------------------
def get_all_kzz_list(self) -> List[dict]:
"""获取全市场可转债代码列表(含名称)"""
self._log("正在获取可转债列表...")
all_stocks = tq.get_stock_list('32', list_type=1)
kzz_list = []
for item in all_stocks:
code = str(item.get('Code', ''))
name = str(item.get('Name', ''))
is_kzz = False
if code.startswith('11') and code.endswith('.SH'):
is_kzz = '转债' in name or '转' in name
elif code.startswith('12') and code.endswith('.SZ'):
is_kzz = '转债' in name or '转' in name
if is_kzz:
kzz_list.append({'code': code, 'name': name})
self._log(f"✓ 共发现 {len(kzz_list)} 只可转债")
return kzz_list
# --------------------------------------------------------
# 扫描全市场
# --------------------------------------------------------
def scan_market(self) -> List[ConvertibleBond]:
"""扫描全市场可转债并计算溢价率"""
kzz_list = self.get_all_kzz_list()
if not kzz_list:
return []
bonds = []
ok_count = 0
err_count = 0
total = len(kzz_list)
for i, item in enumerate(kzz_list):
code = item['code']
name = item['name']
try:
# 调用 get_kzz_info(DLL 一次返回所有核心数据)
info = tq.get_kzz_info(stock_code=code)
if not info:
continue
# ====== 使用实际字段名提取数据 ======
# 正股代码: HSCode (无后缀) + SetCode (市场)
hs_code = str(info.get('HSCode', '')).strip()
set_code = str(info.get('SetCode', '0')).strip()
stock_code = build_stock_code(hs_code, set_code)
if not stock_code:
continue
# DLL 已计算好的核心数据
zg_price = safe_float(info.get('ZGPrice')) # 转股价
bond_price = safe_float(info.get('KZZNow')) # 转债现价
stock_price = safe_float(info.get('AGNow')) # 正股现价
convert_value = safe_float(info.get('ZGValue')) # 转股价值
premium_rate = safe_float(info.get('KZZYj')) # 溢价率(%)
# 兜底:如果 DLL 没返回,手动算
if convert_value <= 0 and zg_price > 0 and stock_price > 0:
convert_value = round((stock_price / zg_price) * 100.0, 2)
if premium_rate == 0 and convert_value > 0 and bond_price > 0:
premium_rate = round((bond_price - convert_value) / convert_value * 100.0, 2)
if bond_price <= 0 or stock_price <= 0:
continue
# 其他信息
rest_scope = safe_float(info.get('RestScope'))
force_redeem = safe_float(info.get('ForceRedeem'))
put_back = safe_float(info.get('PutBack'))
end_price = safe_float(info.get('EndPrice'))
rating = str(info.get('KZZScore', '')).strip()
# 双低值
dl = bond_price + abs(premium_rate)
bond = ConvertibleBond(
code=code,
name=name,
stock_code=stock_code,
stock_name=stock_code, # 先占位,后面批量补充
convert_price=zg_price,
convert_rate=100.0 / zg_price if zg_price > 0 else 0,
convert_value=round(convert_value, 2),
premium_rate=round(premium_rate, 2),
bond_price=round(bond_price, 2),
stock_price=round(stock_price, 2),
turnover=0.0, # 成交额后面批量获取
double_low=round(dl, 2),
maturity_date=parse_date_str(info.get('EndDate', '')),
convert_start_date=parse_date_str(info.get('ZGDate', '')),
is_convertible=check_convertible(info.get('ZGDate', '')),
rest_scope=rest_scope,
rating=rating,
force_redeem=force_redeem,
put_back=put_back,
end_price=end_price,
)
bonds.append(bond)
ok_count += 1
if (i + 1) % 50 == 0:
self._log(f" 进度: {i+1}/{total},成功: {ok_count},失败: {err_count}")
except Exception as e:
err_count += 1
if err_count <= 5:
self._log(f" {code} 异常: {e}")
bonds.sort(key=lambda x: x.premium_rate)
self.bonds = bonds
self._log(f"✓ 扫描完成: 成功 {ok_count},失败 {err_count}")
return bonds
# --------------------------------------------------------
# 识别套利信号
# --------------------------------------------------------
def find_signals(self, bonds: List[ConvertibleBond] = None) -> List[ArbitrageSignal]:
bonds = bonds or self.bonds
signals = []
cfg = self.config
for bond in bonds:
if bond.bond_price <= 0 or bond.stock_price <= 0:
continue
if not bond.is_convertible:
continue
if cfg.min_turnover > 0 and bond.turnover > 0 and bond.turnover < cfg.min_turnover:
continue
if bond.premium_rate < cfg.negative_premium_threshold:
signals.append(ArbitrageSignal(
bond=bond,
signal_type='negative_premium',
premium_rate=bond.premium_rate,
expected_profit_pct=round(abs(bond.premium_rate) * 0.98, 2),
timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
))
signals.sort(key=lambda x: x.expected_profit_pct, reverse=True)
self.signals = signals
return signals
# --------------------------------------------------------
# 输出报告
# --------------------------------------------------------
def print_report(self, bonds: List[ConvertibleBond] = None, top_n: int = 30):
bonds = bonds or self.bonds
if not bonds:
self._log("无可转债数据")
return
print("\n" + "=" * 120)
print(f" 可转债套利扫描报告 | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | 共 {len(bonds)} 只")
print("=" * 120)
print(f" {'代码':<12} {'名称':<8} {'正股':<12} {'转债价':>7} {'正股价':>7} {'转股价':>7} "
f"{'转股价值':>8} {'溢价率%':>8} {'双低值':>7} {'成交额(万)':>10}")
print("-" * 120)
for bond in bonds[:top_n]:
flag = "[折价]" if bond.premium_rate < 0 else " "
print(f" {bond.code:<12} {bond.name:<8} {bond.stock_code:<12} "
f"{bond.bond_price:>7.2f} {bond.stock_price:>7.2f} {bond.convert_price:>7.2f} "
f"{bond.convert_value:>8.2f} {bond.premium_rate:>8.2f} {bond.double_low:>7.2f} "
f"{bond.turnover:>10.0f} {flag}")
neg = sum(1 for b in bonds if b.premium_rate < 0)
low = sum(1 for b in bonds if 0 <= b.premium_rate < self.config.low_premium_threshold)
dl130 = sum(1 for b in bonds if b.double_low < 130)
print("-" * 120)
print(f" 统计: 负溢价 {neg} 只 | 低溢价(<{self.config.low_premium_threshold}%) {low} 只 | 双低(<130) {dl130} 只")
print("=" * 120 + "\n")
def print_signals(self, signals: List[ArbitrageSignal] = None):
signals = signals or self.signals
if not signals:
self._log("未发现套利信号")
return
print("\n" + "=" * 60)
print(f" !! 发现 {len(signals)} 个套利机会 !!")
print("=" * 60)
for i, sig in enumerate(signals, 1):
b = sig.bond
print(f"\n [{i}] {b.name} ({b.code})")
print(f" 正股: {b.stock_name} ({b.stock_code})")
print(f" 转债价: {b.bond_price:.2f} | 正股价: {b.stock_price:.2f} | 转股价: {b.convert_price:.2f}")
print(f" 转股价值: {b.convert_value:.2f} | 溢价率: {sig.premium_rate:.2f}%")
print(f" 预期利润: {sig.expected_profit_pct:.2f}% (扣除摩擦成本)")
# --------------------------------------------------------
# 推送到通达信
# --------------------------------------------------------
def push_to_tdx(self, signals: List[ArbitrageSignal] = None):
signals = signals or self.signals
if not signals:
return
kzz_codes = [sig.bond.code for sig in signals]
if kzz_codes:
tq.send_user_block(block_code=self.config.block_code, stocks=kzz_codes, show=True)
self._log(f"✓ 推送 {len(kzz_codes)} 只转债到板块: {self.config.block_code}")
stock_codes = [sig.bond.stock_code for sig in signals]
if stock_codes:
tq.send_user_block(
block_code=self.config.block_code + '_正股',
stocks=stock_codes,
show=False
)
self._log(f"✓ 推送 {len(stock_codes)} 只正股到板块")
if self.config.push_warning and self.account_id >= 0:
for sig in signals[:5]:
b = sig.bond
tq.send_warn(
stock_list=[b.code],
time_list=[],
price_list=[str(b.bond_price)],
close_list=[str(b.stock_price)],
volum_list=[str(0)],
reason_list=[f"折价{sig.premium_rate:.2f}% 利润{sig.expected_profit_pct:.2f}%"],
bs_flag_list=['2'],
warn_type_list=[],
count=1
)
# --------------------------------------------------------
# 自动交易
# --------------------------------------------------------
def execute_trade(self, signal: ArbitrageSignal) -> Optional[Dict]:
if not self.config.auto_trade or self.account_id < 0:
return None
b = signal.bond
volume = max(1, int(self.config.trade_amount / (b.bond_price * 10))) * 10
self._log(f"📝 买入 {b.name}({b.code}) {volume} 张 @ {b.bond_price:.2f}")
result = tq.order_stock(
account_id=self.account_id,
stock_code=b.code,
order_type=tqconst.STOCK_BUY,
order_volume=volume,
price_type=tqconst.PRICE_MY,
price=b.bond_price,
notify=1
)
self._log(f" 结果: {result}")
return result
# --------------------------------------------------------
# 查询持仓
# --------------------------------------------------------
def check_positions(self):
if self.account_id < 0:
self._log("无有效账户")
return
asset = tq.query_stock_asset(account_id=self.account_id)
if asset:
self._log(f"账户资产: {asset}")
positions = tq.query_stock_positions(account_id=self.account_id)
if positions:
self._log(f"持仓: {len(positions)} 只")
# --------------------------------------------------------
# 持续监控
# --------------------------------------------------------
def run_continuous(self):
self.initialize()
self._log(
f"持续监控 | 间隔 {self.config.scan_interval_seconds}s | "
f"阈值 {self.config.negative_premium_threshold}%"
)
try:
while True:
self.scan_count += 1
self._log(f"\n--- 第 {self.scan_count} 次扫描 ---")
bonds = self.scan_market()
self.print_report(bonds)
signals = self.find_signals(bonds)
if signals:
self.print_signals(signals)
self.push_to_tdx(signals)
if self.config.auto_trade:
for sig in signals[:3]:
self.execute_trade(sig)
if 0 < self.config.max_scans <= self.scan_count:
break
self._log(f"等待 {self.config.scan_interval_seconds}s...")
time.sleep(self.config.scan_interval_seconds)
except KeyboardInterrupt:
self._log("\n手动停止")
finally:
tq.close()
# ============================================================
# 快捷函数
# ============================================================
def run_once(config: ArbitrageConfig = None):
"""单次全市场扫描"""
strategy = KzzArbitrageStrategy(config or ArbitrageConfig())
strategy.initialize()
bonds = strategy.scan_market()
strategy.print_report(bonds, top_n=50)
signals = strategy.find_signals(bonds)
if signals:
strategy.print_signals(signals)
strategy.push_to_tdx(signals)
# # ====== 自动交易(需要时取消下方注释)======
# if strategy.config.auto_trade:
# for sig in signals[:3]:
# strategy.execute_trade(sig)
# 双低策略 TOP 20
bonds_dl = sorted(strategy.bonds, key=lambda x: x.double_low)
print("\n=== 双低策略 TOP 20 ===")
for i, b in enumerate(bonds_dl[:20], 1):
print(f" {i:>2}. {b.code} {b.name:<8} 价格:{b.bond_price:>6.2f} "
f"溢价:{b.premium_rate:>6.2f}% 双低:{b.double_low:>6.2f}")
tq.close()
return strategy
def intraday_negative_premium_scan(config: ArbitrageConfig = None):
"""日内折价套利扫描"""
cfg = config or ArbitrageConfig()
cfg.negative_premium_threshold = -0.5
cfg.min_turnover = 1000.0
strategy = KzzArbitrageStrategy(cfg)
strategy.initialize()
bonds = strategy.scan_market()
targets = [
b for b in bonds
if b.premium_rate < cfg.negative_premium_threshold
and b.is_convertible
]
if not targets:
print("\n当前无显著折价机会")
strategy.print_report(bonds, top_n=20)
low = [b for b in bonds if 0 <= b.premium_rate < 5]
print("\n低溢价关注:")
for b in sorted(low, key=lambda x: x.premium_rate)[:20]:
print(f" {b.code} {b.name:<8} 溢价:{b.premium_rate:>6.2f}% "
f"价格:{b.bond_price:>6.2f}")
else:
print(f"\n>> 发现 {len(targets)} 只折价转债!")
for b in targets:
profit = abs(b.premium_rate) * 0.98
print(f" {b.code} {b.name:<8} | 转债:{b.bond_price:.2f} "
f"-> 转股价值:{b.convert_value:.2f} | "
f"折价:{b.premium_rate:.2f}% | 利润:{profit:.2f}%")
tq.close()
def analyze_single_bond(kzz_code: str):
"""分析单只可转债"""
ensure_tq_init()
print(f"\n{'='*60}")
print(f" 可转债详细分析: {kzz_code}")
print(f"{'='*60}")
info = tq.get_kzz_info(stock_code=kzz_code)
if not info:
print("获取信息失败!")
tq.close()
return
print(f"\n基本信息 (get_kzz_info 返回):")
for k, v in info.items():
print(f" {k}: {v}")
# 构建正股代码
hs_code = str(info.get('HSCode', '')).strip()
set_code = str(info.get('SetCode', '0')).strip()
stock_code = build_stock_code(hs_code, set_code)
if not stock_code:
print("\n未能获取正股代码")
tq.close()
return
# DLL 已返回的实时数据
zg_price = safe_float(info.get('ZGPrice'))
bond_price = safe_float(info.get('KZZNow'))
stock_price = safe_float(info.get('AGNow'))
convert_value = safe_float(info.get('ZGValue'))
premium_rate = safe_float(info.get('KZZYj'))
print(f"\n核心数据:")
print(f" 转债价格: {bond_price:.2f}")
print(f" 正股价格: {stock_price:.2f} ({stock_code})")
print(f" 转股价: {zg_price:.2f}")
print(f" 转股价值: {convert_value:.2f}")
print(f" 溢价率: {premium_rate:.2f}%")
print(f" 剩余规模: {info.get('RestScope', '-')}")
print(f" 评级: {info.get('KZZScore', '-')}")
if premium_rate < -0.5:
print(f" [!!] 折价! 预期利润: {abs(premium_rate)*0.98:.2f}%")
elif premium_rate < 3:
print(f" [--] 低溢价,关注")
else:
print(f" [OK] 暂无套利空间")
# 近期K线
print(f"\n近10日走势:")
try:
kzz_data = tq.get_market_data(
stock_list=[kzz_code], period='1d', count=10, field_list=['Close']
)
stock_data = tq.get_market_data(
stock_list=[stock_code], period='1d', count=10, field_list=['Close']
)
print(f" {'日期':<12} {'转债':>8} {'正股':>8}")
if 'Close' in kzz_data:
for dt in kzz_data['Close'].index[-10:]:
kc = (kzz_data['Close'].loc[dt, kzz_code]
if kzz_code in kzz_data['Close'].columns else '-')
sc = '-'
if ('Close' in stock_data and stock_code in stock_data['Close'].columns
and dt in stock_data['Close'].index):
sc = stock_data['Close'].loc[dt, stock_code]
print(f" {str(dt)[:10]:<12} {str(kc):>8} {str(sc):>8}")
except Exception as e:
print(f" K线获取失败: {e}")
tq.close()
# ============================================================
# 主入口
# ============================================================
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='可转债套利策略')
parser.add_argument(
'--mode', type=str, default='once',
choices=['once', 'continuous', 'intraday', 'analyze'],
help='运行模式: once=单次扫描, continuous=持续监控, intraday=日内折价, analyze=单只分析'
)
parser.add_argument('--code', type=str, default='', help='可转债代码 (analyze模式使用)')
parser.add_argument('--interval', type=int, default=60, help='扫描间隔秒数 (continuous模式)')
parser.add_argument('--threshold', type=float, default=-1.0, help='负溢价率阈值(%%)')
parser.add_argument('--min-turnover', type=float, default=500.0, help='最低成交额(万元)')
parser.add_argument('--auto-trade', action='store_true', help='开启自动交易')
args = parser.parse_args()
config = ArbitrageConfig(
scan_interval_seconds=args.interval,
negative_premium_threshold=args.threshold,
min_turnover=args.min_turnover,
auto_trade=args.auto_trade,
)
if args.mode == 'once':
run_once(config)
elif args.mode == 'continuous':
KzzArbitrageStrategy(config).run_continuous()
elif args.mode == 'intraday':
intraday_negative_premium_scan(config)
elif args.mode == 'analyze':
if not args.code:
print("用法: python kzz_arbitrage.py --mode analyze --code 123045.SZ")
else:
analyze_single_bond(args.code)