首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >通达信TQ演示可转债套利策略技术实现,附代码

通达信TQ演示可转债套利策略技术实现,附代码

作者头像
子晓聊技术
发布2026-05-07 18:24:54
发布2026-05-07 18:24:54
230
举报
文章被收录于专栏:子晓AI量化子晓AI量化

最近有好几个同学问我可转债的问题,其实我个人不做可转债的。 既然问的人多了,要不写个演示例子。

这里注意,这里写的仅仅是演示例子。 演示可转债数据如何获取,怎么计算溢价, 不是让直接用它。

你可能也听说过可转债套利这回事:当转债价格比它的“转股价值”还低的时候,买入转债,转成股票,再卖掉股票,中间就能赚个差价。说白了就是折价套利——市场偶尔会犯迷糊,给你送个小红包。

但问题是,全市场几百只可转债,你不可能一只一只盯着看溢价率。而且折价机会往往转瞬即逝,等你手工算完,价格可能已经涨上去了。

之前写过通达信的TQ接口(就是那个tqcenter),可以写Python脚本直接调行情和交易。趁着五一放假,我用AI简单写了个可转债套利策略,全自动扫描、计算溢价率、发现机会还能推送到自选股。

套利的核心就一个数:转股溢价率

先简单复习一下(老司机可跳过):

  • 转股价值 = (100 / 转股价) × 正股现价意思是你把一张可转债转成股票,这些股票当前值多少钱。
  • 溢价率 = (转债现价 / 转股价值 - 1) × 100%

当溢价率 < 0 时,就是折价:转债比它对应的股票便宜,买入转债 → 申请转股 → 次日卖出正股,理论上白捡差价。

但实际操作有两个坑:① 转股需要T+1才能卖股票,所以要承担隔夜波动风险;② 不是所有转债都在转股期内,刚上市那几个月不能转。

所以策略里必须检查转股起始日期,别踩雷。

这脚本能干吗?

简单说,就是持续监控全市场可转债,实时计算溢价率,当折价超过你设定的阈值(比如-1%),就发出套利信号。

具体功能:

  • 全市场扫描:一次性拉出所有可转债(代码11xxxx.SH或12xxxx.SZ),调用tq.get_kzz_info()拿到转股价、正股现价、转债现价、转股价值、溢价率。
  • 自动过滤:只关注已在转股期内、成交额不是太小的标的(成交额太低可能流动性不足,买进去卖不出来)。
  • 生成信号:溢价率低于阈值(比如-1%)时,记录预期利润(扣除摩擦成本估算)。
  • 推送到通达信板块:把发现机会的转债和对应的正股分别加到两个自选板块里,方便你手动复核或盯盘。

这里需要注意:

  • 折价套利看着美,但T+1转股是硬伤。你买了折价1%的转债,第二天正股低开2%,反而亏钱。所以最好配合对正股走势的判断,别无脑套。
  • 自动交易我只是写了方法,但交易有点问题, 提示交易类型不支持。 不确定是不是通达信tq暂时不支持可转债交易。

文末附上代码:

代码语言:javascript
复制
# -*- coding: utf-8 -*-
"""
可转债套利策略 —— 基于转股溢价率监控与套利执行
策略逻辑:
1. 获取全市场可转债信息(转股价、正股代码等)
2. 计算每只可转债的 转股价值 和 转股溢价率
3. 当溢价率低于阈值时触发套利信号: 买入可转债 -> 转股 -> 卖出正股(或直接卖出可转债获利)
4. 也可监控负溢价机会:折价套利
使用方式:
1. 在通达信中运行此脚本
2. 脚本自动扫描可转债,将套利机会推送到自选股板块
3. 也可开启自动交易模式执行套利
注意:
- 可转债转股通常有 T+1 延迟,卖出正股也是 T+1
- 需要有可转债和正股的对应持仓/资金
- 实际执行前请充分测试
"""
import sys
import os
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
# 确保 tqcenter 可被导入
sys.path.insert(0, r"D:\lwj\new_tdx_mock\PYPlugins\user")
from tqcenter import tq, tqconst
# ============================================================
# 配置参数
# ============================================================
@dataclass
class ArbitrageConfig:
    """套利策略配置"""
    account: str = ''                # 交易账户 (留空使用默认)
    account_type: str = 'stock'      # 账户类型
    scan_interval_seconds: int = 30  # 扫描间隔(秒)
    max_scans: int = 0               # 最大扫描次数,0=无限
    negative_premium_threshold: float = -1.0   # 负溢价率阈值(%)
    low_premium_threshold: float = 3.0        # 低溢价率阈值(%)
    min_turnover: float = 500.0               # 最低成交额(万元)
    auto_trade: bool = False                  # 是否自动交易
    trade_amount: int = 10000                 # 单次交易金额(元)
    max_position_pct: float = 10.0            # 单只最大持仓占比(%)
    block_code: str = 'KZZ套利'               # 自选板块代码
    push_warning: bool = True                 # 是否发送预警
    print_log: bool = True                    # 是否打印日志
# ============================================================
# 数据模型
# ============================================================
@dataclass
class ConvertibleBond:
    """可转债数据模型"""
    code: str = ''
    name: str = ''
    stock_code: str = ''
    stock_name: str = ''
    convert_price: float = 0.0
    convert_rate: float = 0.0
    convert_value: float = 0.0
    premium_rate: float = 0.0
    bond_price: float = 0.0
    stock_price: float = 0.0
    turnover: float = 0.0
    double_low: float = 0.0
    maturity_date: str = ''
    convert_start_date: str = ''
    is_convertible: bool = False
    rest_scope: float = 0.0
    rating: str = ''
    force_redeem: float = 0.0
    put_back: float = 0.0
    end_price: float = 0.0
@dataclass
class ArbitrageSignal:
    """套利信号"""
    bond: ConvertibleBond
    signal_type: str = ''
    premium_rate: float = 0.0
    expected_profit_pct: float = 0.0
    timestamp: str = ''
# ============================================================
# 工具函数
# ============================================================
def safe_float(val) -> float:
    """安全转为正浮点数"""
    if val is None:
        return 0.0
    try:
        v = float(val)
        return v if v > 0 else 0.0
    except (ValueError, TypeError):
        return 0.0
def build_stock_code(hs_code, set_code: str) -> str:
    """
    从 HSCode + SetCode 构建带后缀的正股代码
    SetCode: '0'=深圳, '1'=上海
    注意: DLL 返回的 HSCode 可能是数字类型,会丢失前导零,需要补零到 6 位
    """
    # 先转字符串,再补零到 6 位(A股代码固定 6 位)
    hs_code = str(hs_code).strip()
    if not hs_code or hs_code in ('0', '-'):
        return ''
    hs_code = hs_code.zfill(6)
    if set_code == '1':
        return f"{hs_code}.SH"
    elif set_code == '0':
        return f"{hs_code}.SZ"
    # 根据代码前缀推断
    if hs_code.startswith('6') or hs_code.startswith('5'):
        return f"{hs_code}.SH"
    elif hs_code.startswith('0') or hs_code.startswith('3') or hs_code.startswith('1'):
        return f"{hs_code}.SZ"
    elif hs_code.startswith('92') or hs_code.startswith('4'):
        return f"{hs_code}.BJ"
    return f"{hs_code}.SZ"
def parse_date_str(date_str: str) -> str:
    """将 YYYYMMDD 转为 YYYY-MM-DD"""
    s = str(date_str).strip()
    if len(s) == 8 and s.isdigit():
        return f"{s[:4]}-{s[4:6]}-{s[6:8]}"
    return s
def check_convertible(zg_date: str) -> bool:
    """检查是否在可转股期内 (ZGDate 格式: YYYYMMDD)"""
    if not zg_date or zg_date in ('0', '-'):
        return False
    s = str(zg_date).strip()
    try:
        if len(s) == 8 and s.isdigit():
            start = datetime(int(s[:4]), int(s[4:6]), int(s[6:8]))
        else:
            start = datetime.strptime(s, '%Y-%m-%d')
        return datetime.now() >= start
    except (ValueError, TypeError, IndexError):
        return False
# ============================================================
# 核心:初始化 TQ 连接
# ============================================================
_tq_initialized = False
def ensure_tq_init():
    """确保 TQ 已初始化(幂等,多次调用安全)"""
    global _tq_initialized
    if _tq_initialized:
        return
    _script_path = os.path.abspath(__file__)
    try:
        tq.initialize(path=_script_path)
        _tq_initialized = True
    except RuntimeError as e:
        print(f"[ERROR] TQ初始化失败: {e}")
        raise
# ============================================================
# 核心策略类
# ============================================================
class KzzArbitrageStrategy:
    """可转债套利策略引擎"""
    def __init__(self, config: ArbitrageConfig = None):
        self.config = config or ArbitrageConfig()
        self.account_id: int = -1
        self.bonds: List[ConvertibleBond] = []
        self.signals: List[ArbitrageSignal] = []
        self.scan_count: int = 0
    def _log(self, msg: str):
        if self.config.print_log:
            ts = datetime.now().strftime('%H:%M:%S')
            print(f"[{ts}] {msg}")
    # --------------------------------------------------------
    # 初始化
    # --------------------------------------------------------
    def initialize(self) -> bool:
        """初始化策略"""
        ensure_tq_init()
        self._log("=== 可转债套利策略启动 ===")
        self.account_id = tq.stock_account(
            account=self.config.account,
            account_type=self.config.account_type
        )
        if self.account_id < 0:
            self._log("⚠ 未获取到交易账户,以只读模式运行")
        else:
            self._log(f"✓ 账户ID: {self.account_id}")
        if self.config.block_code:
            try:
                result = tq.create_sector(
                    block_code=self.config.block_code,
                    block_name='可转债套利机会'
                )
                if result:
                    self._log(f"✓ 自选板块已创建: {self.config.block_code}")
            except Exception:
                pass
        # 同时创建正股板块
        if self.config.block_code:
            try:
                tq.create_sector(
                    block_code=self.config.block_code + '_正股',
                    block_name='可转债套利-正股'
                )
            except Exception:
                pass
        return True
    # --------------------------------------------------------
    # 获取可转债列表
    # --------------------------------------------------------
    def get_all_kzz_list(self) -> List[dict]:
        """获取全市场可转债代码列表(含名称)"""
        self._log("正在获取可转债列表...")
        all_stocks = tq.get_stock_list('32', list_type=1)
        kzz_list = []
        for item in all_stocks:
            code = str(item.get('Code', ''))
            name = str(item.get('Name', ''))
            is_kzz = False
            if code.startswith('11') and code.endswith('.SH'):
                is_kzz = '转债' in name or '转' in name
            elif code.startswith('12') and code.endswith('.SZ'):
                is_kzz = '转债' in name or '转' in name
            if is_kzz:
                kzz_list.append({'code': code, 'name': name})
        self._log(f"✓ 共发现 {len(kzz_list)} 只可转债")
        return kzz_list
    # --------------------------------------------------------
    # 扫描全市场
    # --------------------------------------------------------
    def scan_market(self) -> List[ConvertibleBond]:
        """扫描全市场可转债并计算溢价率"""
        kzz_list = self.get_all_kzz_list()
        if not kzz_list:
            return []
        bonds = []
        ok_count = 0
        err_count = 0
        total = len(kzz_list)
        for i, item in enumerate(kzz_list):
            code = item['code']
            name = item['name']
            try:
                # 调用 get_kzz_info(DLL 一次返回所有核心数据)
                info = tq.get_kzz_info(stock_code=code)
                if not info:
                    continue
                # ====== 使用实际字段名提取数据 ======
                # 正股代码: HSCode (无后缀) + SetCode (市场)
                hs_code = str(info.get('HSCode', '')).strip()
                set_code = str(info.get('SetCode', '0')).strip()
                stock_code = build_stock_code(hs_code, set_code)
                if not stock_code:
                    continue
                # DLL 已计算好的核心数据
                zg_price = safe_float(info.get('ZGPrice'))       # 转股价
                bond_price = safe_float(info.get('KZZNow'))       # 转债现价
                stock_price = safe_float(info.get('AGNow'))       # 正股现价
                convert_value = safe_float(info.get('ZGValue'))   # 转股价值
                premium_rate = safe_float(info.get('KZZYj'))      # 溢价率(%)
                # 兜底:如果 DLL 没返回,手动算
                if convert_value <= 0 and zg_price > 0 and stock_price > 0:
                    convert_value = round((stock_price / zg_price) * 100.0, 2)
                if premium_rate == 0 and convert_value > 0 and bond_price > 0:
                    premium_rate = round((bond_price - convert_value) / convert_value * 100.0, 2)
                if bond_price <= 0 or stock_price <= 0:
                    continue
                # 其他信息
                rest_scope = safe_float(info.get('RestScope'))
                force_redeem = safe_float(info.get('ForceRedeem'))
                put_back = safe_float(info.get('PutBack'))
                end_price = safe_float(info.get('EndPrice'))
                rating = str(info.get('KZZScore', '')).strip()
                # 双低值
                dl = bond_price + abs(premium_rate)
                bond = ConvertibleBond(
                    code=code,
                    name=name,
                    stock_code=stock_code,
                    stock_name=stock_code,  # 先占位,后面批量补充
                    convert_price=zg_price,
                    convert_rate=100.0 / zg_price if zg_price > 0 else 0,
                    convert_value=round(convert_value, 2),
                    premium_rate=round(premium_rate, 2),
                    bond_price=round(bond_price, 2),
                    stock_price=round(stock_price, 2),
                    turnover=0.0,  # 成交额后面批量获取
                    double_low=round(dl, 2),
                    maturity_date=parse_date_str(info.get('EndDate', '')),
                    convert_start_date=parse_date_str(info.get('ZGDate', '')),
                    is_convertible=check_convertible(info.get('ZGDate', '')),
                    rest_scope=rest_scope,
                    rating=rating,
                    force_redeem=force_redeem,
                    put_back=put_back,
                    end_price=end_price,
                )
                bonds.append(bond)
                ok_count += 1
                if (i + 1) % 50 == 0:
                    self._log(f"  进度: {i+1}/{total},成功: {ok_count},失败: {err_count}")
            except Exception as e:
                err_count += 1
                if err_count <= 5:
                    self._log(f"  {code} 异常: {e}")
        bonds.sort(key=lambda x: x.premium_rate)
        self.bonds = bonds
        self._log(f"✓ 扫描完成: 成功 {ok_count},失败 {err_count}")
        return bonds
    # --------------------------------------------------------
    # 识别套利信号
    # --------------------------------------------------------
    def find_signals(self, bonds: List[ConvertibleBond] = None) -> List[ArbitrageSignal]:
        bonds = bonds or self.bonds
        signals = []
        cfg = self.config
        for bond in bonds:
            if bond.bond_price <= 0 or bond.stock_price <= 0:
                continue
            if not bond.is_convertible:
                continue
            if cfg.min_turnover > 0 and bond.turnover > 0 and bond.turnover < cfg.min_turnover:
                continue
            if bond.premium_rate < cfg.negative_premium_threshold:
                signals.append(ArbitrageSignal(
                    bond=bond,
                    signal_type='negative_premium',
                    premium_rate=bond.premium_rate,
                    expected_profit_pct=round(abs(bond.premium_rate) * 0.98, 2),
                    timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                ))
        signals.sort(key=lambda x: x.expected_profit_pct, reverse=True)
        self.signals = signals
        return signals
    # --------------------------------------------------------
    # 输出报告
    # --------------------------------------------------------
    def print_report(self, bonds: List[ConvertibleBond] = None, top_n: int = 30):
        bonds = bonds or self.bonds
        if not bonds:
            self._log("无可转债数据")
            return
        print("\n" + "=" * 120)
        print(f" 可转债套利扫描报告 | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | 共 {len(bonds)} 只")
        print("=" * 120)
        print(f" {'代码':<12} {'名称':<8} {'正股':<12} {'转债价':>7} {'正股价':>7} {'转股价':>7} "
              f"{'转股价值':>8} {'溢价率%':>8} {'双低值':>7} {'成交额(万)':>10}")
        print("-" * 120)
        for bond in bonds[:top_n]:
            flag = "[折价]" if bond.premium_rate < 0 else "     "
            print(f" {bond.code:<12} {bond.name:<8} {bond.stock_code:<12} "
                  f"{bond.bond_price:>7.2f} {bond.stock_price:>7.2f} {bond.convert_price:>7.2f} "
                  f"{bond.convert_value:>8.2f} {bond.premium_rate:>8.2f} {bond.double_low:>7.2f} "
                  f"{bond.turnover:>10.0f} {flag}")
        neg = sum(1 for b in bonds if b.premium_rate < 0)
        low = sum(1 for b in bonds if 0 <= b.premium_rate < self.config.low_premium_threshold)
        dl130 = sum(1 for b in bonds if b.double_low < 130)
        print("-" * 120)
        print(f" 统计: 负溢价 {neg} 只 | 低溢价(<{self.config.low_premium_threshold}%) {low} 只 | 双低(<130) {dl130} 只")
        print("=" * 120 + "\n")
    def print_signals(self, signals: List[ArbitrageSignal] = None):
        signals = signals or self.signals
        if not signals:
            self._log("未发现套利信号")
            return
        print("\n" + "=" * 60)
        print(f" !! 发现 {len(signals)} 个套利机会 !!")
        print("=" * 60)
        for i, sig in enumerate(signals, 1):
            b = sig.bond
            print(f"\n [{i}] {b.name} ({b.code})")
            print(f"   正股: {b.stock_name} ({b.stock_code})")
            print(f"   转债价: {b.bond_price:.2f} | 正股价: {b.stock_price:.2f} | 转股价: {b.convert_price:.2f}")
            print(f"   转股价值: {b.convert_value:.2f} | 溢价率: {sig.premium_rate:.2f}%")
            print(f"   预期利润: {sig.expected_profit_pct:.2f}% (扣除摩擦成本)")
    # --------------------------------------------------------
    # 推送到通达信
    # --------------------------------------------------------
    def push_to_tdx(self, signals: List[ArbitrageSignal] = None):
        signals = signals or self.signals
        if not signals:
            return
        kzz_codes = [sig.bond.code for sig in signals]
        if kzz_codes:
            tq.send_user_block(block_code=self.config.block_code, stocks=kzz_codes, show=True)
            self._log(f"✓ 推送 {len(kzz_codes)} 只转债到板块: {self.config.block_code}")
        stock_codes = [sig.bond.stock_code for sig in signals]
        if stock_codes:
            tq.send_user_block(
                block_code=self.config.block_code + '_正股',
                stocks=stock_codes,
                show=False
            )
            self._log(f"✓ 推送 {len(stock_codes)} 只正股到板块")
        if self.config.push_warning and self.account_id >= 0:
            for sig in signals[:5]:
                b = sig.bond
                tq.send_warn(
                    stock_list=[b.code],
                    time_list=[],
                    price_list=[str(b.bond_price)],
                    close_list=[str(b.stock_price)],
                    volum_list=[str(0)],
                    reason_list=[f"折价{sig.premium_rate:.2f}% 利润{sig.expected_profit_pct:.2f}%"],
                    bs_flag_list=['2'],
                    warn_type_list=[],
                    count=1
                )
    # --------------------------------------------------------
    # 自动交易
    # --------------------------------------------------------
    def execute_trade(self, signal: ArbitrageSignal) -> Optional[Dict]:
        if not self.config.auto_trade or self.account_id < 0:
            return None
        b = signal.bond
        volume = max(1, int(self.config.trade_amount / (b.bond_price * 10))) * 10
        self._log(f"📝 买入 {b.name}({b.code}) {volume} 张 @ {b.bond_price:.2f}")
        result = tq.order_stock(
            account_id=self.account_id,
            stock_code=b.code,
            order_type=tqconst.STOCK_BUY,
            order_volume=volume,
            price_type=tqconst.PRICE_MY,
            price=b.bond_price,
            notify=1
        )
        self._log(f"  结果: {result}")
        return result
    # --------------------------------------------------------
    # 查询持仓
    # --------------------------------------------------------
    def check_positions(self):
        if self.account_id < 0:
            self._log("无有效账户")
            return
        asset = tq.query_stock_asset(account_id=self.account_id)
        if asset:
            self._log(f"账户资产: {asset}")
        positions = tq.query_stock_positions(account_id=self.account_id)
        if positions:
            self._log(f"持仓: {len(positions)} 只")
    # --------------------------------------------------------
    # 持续监控
    # --------------------------------------------------------
    def run_continuous(self):
        self.initialize()
        self._log(
            f"持续监控 | 间隔 {self.config.scan_interval_seconds}s | "
            f"阈值 {self.config.negative_premium_threshold}%"
        )
        try:
            while True:
                self.scan_count += 1
                self._log(f"\n--- 第 {self.scan_count} 次扫描 ---")
                bonds = self.scan_market()
                self.print_report(bonds)
                signals = self.find_signals(bonds)
                if signals:
                    self.print_signals(signals)
                    self.push_to_tdx(signals)
                    if self.config.auto_trade:
                        for sig in signals[:3]:
                            self.execute_trade(sig)
                if 0 < self.config.max_scans <= self.scan_count:
                    break
                self._log(f"等待 {self.config.scan_interval_seconds}s...")
                time.sleep(self.config.scan_interval_seconds)
        except KeyboardInterrupt:
            self._log("\n手动停止")
        finally:
            tq.close()
# ============================================================
# 快捷函数
# ============================================================
def run_once(config: ArbitrageConfig = None):
    """单次全市场扫描"""
    strategy = KzzArbitrageStrategy(config or ArbitrageConfig())
    strategy.initialize()
    bonds = strategy.scan_market()
    strategy.print_report(bonds, top_n=50)
    signals = strategy.find_signals(bonds)
    if signals:
        strategy.print_signals(signals)
        strategy.push_to_tdx(signals)
        # # ====== 自动交易(需要时取消下方注释)======
        # if strategy.config.auto_trade:
        #     for sig in signals[:3]:
        #         strategy.execute_trade(sig)
    # 双低策略 TOP 20
    bonds_dl = sorted(strategy.bonds, key=lambda x: x.double_low)
    print("\n=== 双低策略 TOP 20 ===")
    for i, b in enumerate(bonds_dl[:20], 1):
        print(f"  {i:>2}. {b.code} {b.name:<8} 价格:{b.bond_price:>6.2f} "
              f"溢价:{b.premium_rate:>6.2f}% 双低:{b.double_low:>6.2f}")
    tq.close()
    return strategy
def intraday_negative_premium_scan(config: ArbitrageConfig = None):
    """日内折价套利扫描"""
    cfg = config or ArbitrageConfig()
    cfg.negative_premium_threshold = -0.5
    cfg.min_turnover = 1000.0
    strategy = KzzArbitrageStrategy(cfg)
    strategy.initialize()
    bonds = strategy.scan_market()
    targets = [
        b for b in bonds
        if b.premium_rate < cfg.negative_premium_threshold
        and b.is_convertible
    ]
    if not targets:
        print("\n当前无显著折价机会")
        strategy.print_report(bonds, top_n=20)
        low = [b for b in bonds if 0 <= b.premium_rate < 5]
        print("\n低溢价关注:")
        for b in sorted(low, key=lambda x: x.premium_rate)[:20]:
            print(f"  {b.code} {b.name:<8} 溢价:{b.premium_rate:>6.2f}% "
                  f"价格:{b.bond_price:>6.2f}")
    else:
        print(f"\n>> 发现 {len(targets)} 只折价转债!")
        for b in targets:
            profit = abs(b.premium_rate) * 0.98
            print(f"  {b.code} {b.name:<8} | 转债:{b.bond_price:.2f} "
                  f"-> 转股价值:{b.convert_value:.2f} | "
                  f"折价:{b.premium_rate:.2f}% | 利润:{profit:.2f}%")
    tq.close()
def analyze_single_bond(kzz_code: str):
    """分析单只可转债"""
    ensure_tq_init()
    print(f"\n{'='*60}")
    print(f" 可转债详细分析: {kzz_code}")
    print(f"{'='*60}")
    info = tq.get_kzz_info(stock_code=kzz_code)
    if not info:
        print("获取信息失败!")
        tq.close()
        return
    print(f"\n基本信息 (get_kzz_info 返回):")
    for k, v in info.items():
        print(f"  {k}: {v}")
    # 构建正股代码
    hs_code = str(info.get('HSCode', '')).strip()
    set_code = str(info.get('SetCode', '0')).strip()
    stock_code = build_stock_code(hs_code, set_code)
    if not stock_code:
        print("\n未能获取正股代码")
        tq.close()
        return
    # DLL 已返回的实时数据
    zg_price = safe_float(info.get('ZGPrice'))
    bond_price = safe_float(info.get('KZZNow'))
    stock_price = safe_float(info.get('AGNow'))
    convert_value = safe_float(info.get('ZGValue'))
    premium_rate = safe_float(info.get('KZZYj'))
    print(f"\n核心数据:")
    print(f"  转债价格: {bond_price:.2f}")
    print(f"  正股价格: {stock_price:.2f} ({stock_code})")
    print(f"  转股价:   {zg_price:.2f}")
    print(f"  转股价值: {convert_value:.2f}")
    print(f"  溢价率:   {premium_rate:.2f}%")
    print(f"  剩余规模: {info.get('RestScope', '-')}")
    print(f"  评级:     {info.get('KZZScore', '-')}")
    if premium_rate < -0.5:
        print(f"  [!!] 折价! 预期利润: {abs(premium_rate)*0.98:.2f}%")
    elif premium_rate < 3:
        print(f"  [--] 低溢价,关注")
    else:
        print(f"  [OK] 暂无套利空间")
    # 近期K线
    print(f"\n近10日走势:")
    try:
        kzz_data = tq.get_market_data(
            stock_list=[kzz_code], period='1d', count=10, field_list=['Close']
        )
        stock_data = tq.get_market_data(
            stock_list=[stock_code], period='1d', count=10, field_list=['Close']
        )
        print(f"  {'日期':<12} {'转债':>8} {'正股':>8}")
        if 'Close' in kzz_data:
            for dt in kzz_data['Close'].index[-10:]:
                kc = (kzz_data['Close'].loc[dt, kzz_code]
                      if kzz_code in kzz_data['Close'].columns else '-')
                sc = '-'
                if ('Close' in stock_data and stock_code in stock_data['Close'].columns
                        and dt in stock_data['Close'].index):
                    sc = stock_data['Close'].loc[dt, stock_code]
                print(f"  {str(dt)[:10]:<12} {str(kc):>8} {str(sc):>8}")
    except Exception as e:
        print(f"  K线获取失败: {e}")
    tq.close()
# ============================================================
# 主入口
# ============================================================
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='可转债套利策略')
    parser.add_argument(
        '--mode', type=str, default='once',
        choices=['once', 'continuous', 'intraday', 'analyze'],
        help='运行模式: once=单次扫描, continuous=持续监控, intraday=日内折价, analyze=单只分析'
    )
    parser.add_argument('--code', type=str, default='', help='可转债代码 (analyze模式使用)')
    parser.add_argument('--interval', type=int, default=60, help='扫描间隔秒数 (continuous模式)')
    parser.add_argument('--threshold', type=float, default=-1.0, help='负溢价率阈值(%%)')
    parser.add_argument('--min-turnover', type=float, default=500.0, help='最低成交额(万元)')
    parser.add_argument('--auto-trade', action='store_true', help='开启自动交易')
    args = parser.parse_args()
    config = ArbitrageConfig(
        scan_interval_seconds=args.interval,
        negative_premium_threshold=args.threshold,
        min_turnover=args.min_turnover,
        auto_trade=args.auto_trade,
    )
    if args.mode == 'once':
        run_once(config)
    elif args.mode == 'continuous':
        KzzArbitrageStrategy(config).run_continuous()
    elif args.mode == 'intraday':
        intraday_negative_premium_scan(config)
    elif args.mode == 'analyze':
        if not args.code:
            print("用法: python kzz_arbitrage.py --mode analyze --code 123045.SZ")
        else:
            analyze_single_bond(args.code)
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-05-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 子晓聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 套利的核心就一个数:转股溢价率
  • 这脚本能干吗?
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档