首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >印度股票行情 API 接入指南

印度股票行情 API 接入指南

原创
作者头像
Always_Somewhere
发布2026-06-16 16:21:37
发布2026-06-16 16:21:37
350
举报

提到印度,很多人的第一反应可能是街头美食、人口红利,或者互联网外包。

但如果你关注全球资本市场,会发现另一件事正在发生:

过去几年,印度正在成为亚洲增长最快的金融市场之一。

不只是股市上涨,而是整个市场基础设施都在高速扩张——交易量持续增长、投资者数量不断刷新纪录、IPO融资热度居高不下,越来越多国际资金开始把印度放进配置名单。

市场一热,需求自然就来了。

近几年,面向印度市场的证券交易平台、量化系统、行情终端、金融 SaaS 也越来越多。而这些产品背后绕不开的一件事就是:

如何拿到稳定、实时、可商用的印度股票行情数据?

如果你正在开发印度股票相关产品,这篇文章会把市场结构、数据成本,以及最现实的接入方案一次讲清楚。

一、印度股票市场到底是什么格局?

先别急着接 API。

想做印度股票产品,第一件事是先搞清楚:数据到底从哪里来。

目前由印度证券交易委员会(SEBI)监管认可的交易所共有 9 家,覆盖股票、衍生品、商品以及国际证券市场。

序号

交易所简称

主要类型

总部/位置

活跃度/特色

总体市场份额

1

NSE

股权+衍生品

孟买

交易量最大,衍生品绝对主导

90%

2

BSE

股权+衍生品

孟买

历史最悠久、上市公司最多

<10%

3

India INX

国际/外币证券

GIFT City

面向全球投资者

<1%

4

NSE IFSC

国际衍生品

GIFT City

NSE国际版

<1%

5

MSEI

股权+货币

孟买

第三大但规模有限

<1%

6

CSE

股权(有限)

加尔各答

基本不活跃

接近0

7

MCX

商品衍生品

孟买

金属与能源主场

97–98%商品份额

8

NCDEX

农产品商品

孟买

农业期货核心市场

2–3%

9

ICEX

商品衍生品

纳维孟买

细分商品市场

<1%

看到这里你会发现:

虽然交易所有九家,但真正决定印度股票市场的,其实只有两家。

NSE:印度真正的流量入口

国家证券交易所(NSE)几乎统治了印度资本市场。

无论股票成交、衍生品交易、量化策略还是实时行情需求,大部分最终都会落到 NSE。

尤其在衍生品市场,长期占据超过 90% 的市场份额。

很多机构做印度市场,本质上就是在做 NSE。

BSE:亚洲老大哥,但市场份额已经换人

孟买证券交易所(BSE)成立于 1875 年。

它比很多国家的现代金融体系出现得都早,是亚洲历史最悠久的交易所之一。

虽然如今交易规模不如 NSE,但上市公司数量仍然很多,一些特定品种依然离不开它。

所以现实里,大部分金融产品都会同时覆盖:

  • NSE
  • BSE

双市场组合,才能做到完整覆盖。

截至 2025 年底:

BSE 与 NSE 合计上市公司数量已经超过 5300 家

两者加起来,占据了印度接近 98% 的股票市场份额

更夸张的是:

NSE 的衍生品成交量长期排在全球前三。

它甚至一度估值超过 500 亿美元,成为全球最贵的未上市公司之一。

时间规则也需要提前注意。

两家交易所统一采用:

IST(印度标准时间,UTC+5:30)

正常交易时间:

  • 周一至周五
  • 上午 9:15 ~ 下午 3:30

预开盘阶段:

  • 9:00 ~ 9:15

如果你准备做实时行情系统、K线推送或者交易应用,这些时间参数通常都需要提前处理好。

二、为什么很少有人直接接交易所?

理论上最完美的方案当然是直接找 NSE 或 BSE 买授权。

比较交易所也是追逐盈利的,给钱都能拿到原始行情,听起来很合理。

现实往往是大多数团队看到报价单以后直接退出。

原因只有一个:太贵。

下面是直连交易所的大致成本(已折算人民币)。

费用项目

NSE

BSE

股票 L1 实时(年费)

约176,640元

约73,600元

期货期权 L1(年费)

约176,640元

约36,800元

L2 深度行情(年费)

约276,000元

约73,600元

终端用户分发费(月/用户)

约603–809元

约478–846元

备份链路

约14,720元/年

GST 增值税

额外18%

额外18%

注意,这还只是门票。

真正上线以后还有:

  • 用户分发授权
  • 专线网络
  • 数据中心接入
  • 备份链路
  • 合规审计
  • 企业资质审核

如果按完整商业部署算:

仅仅接入 NSE 的股票 + 衍生品实时行情,

一年预算做到 几十万到上百万元人民币 并不夸张。

而且即使你有预算,也不一定能买得到。

交易所通常要求:

  • 企业主体
  • 数据协议签署
  • 合规审核
  • 专线部署

对于中小团队、创业公司甚至个人开发者来说,几乎没有现实可操作性。

那有没有免费方案?

也有。

比如很多开发者第一反应都会想到:

Yahoo Finance

问题在于:

免费往往意味着代价。

常见问题包括:

  • 延迟 10~15 分钟
  • 高频请求限制
  • IP 封锁
  • 不允许商业实时场景

做研究可以。

做实时交易基本不够。

所以全球市场后来慢慢形成了一种成熟模式:

专业行情服务商统一采购授权 → 再向开发者分发。

美国、欧洲、港股市场已经非常成熟。

印度市场也正在快速进入这个阶段。

下一部分,我们正式进入实操:

如何不用承担百万级授权成本,也能快速接入印度实时行情。

三、Infoway API印度股票实时行情接口

对大多数开发者来说,Infoway API 是性价比最高的选择,能在不承担高额成本的前提下快速搭建实时行情应用。Infoway API提供印度NSE交易所全部挂牌股票的实时行情数据(约5300只个股)。

除了印度股票以外,Infoway API还提供以下市场的实时数据:

  • A股、港股、美股、日本股票
  • 外汇货币对
  • 加密货币
  • 能源/贵金属期货

Infoway API的接口使用非常简单,先在官网注册账号,注册完自动获得API Key → 点我注册,在查询数据的时候需要带上API Key。下面来看看如何查询印度股票数据。

3.1 成交明细

成交明细(Last trade)查询地址:

代码语言:python
复制
https://data.infoway.io/india/batch_trade/{codes}

需要在codes中传入对应的股票代码,建议在我们的官网下载股票代码列表,里面有所有的印度股票代码。

下面是成交明细的请求示例:

代码语言:python
复制
import requests

api_url = 'https://data.infoway.io/india/batch_trade/INFY.IN'

# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    'apiKey': 'YOUR_API_KEY_HERE'
}

# 发送GET请求
response = requests.get(api_url, headers=headers)

# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")

成交明细端点返回如下内容:

代码语言:python
复制
{
  "s": "INFY.IN",
  "respList": [
    {
      "t": "1773037920",
      "h": "1303.40",
      "o": "1301.50",
      "l": "1301.50",
      "c": "1303.00",
      "v": "15615.0",
      "vw": "20342210.20",
      "pc": "0.15%",
      "pca": "1.90"
    }
  ]
}

字段说明:

字段

说明

s

股票代码

t

秒时间戳(UTC+8)

p

交易价格

v

成交量

vw

成交额

td

交易方向 1:BUY 2:SELL 0:默认值

3.2 实时K线

K线请求地址:

代码语言:python
复制
https://data.infoway.io/india/v2/batch_kline/{klineType}/{klineNum}/{codes}

参数说明:

klineType :指的是K线的周期,这里传入1,返回1分钟K,传入2,返回5分钟K,具体请查看K线查询教程。

klineNum :指的是需要返回的K线数量,单只股票查询最多可一次要求返回500根最近的K线

code :代表股票代码

示例:查询InfoSys(股票代码INFY.IN)1分钟K线,返回最近的10根:

代码语言:python
复制
import requests

api_url = 'https://data.infoway.io/india/v2/batch_kline/1/10/INFY.IN'

# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    'apiKey': 'YOUR_API_KEY_HERE'
}

# 发送GET请求
response = requests.get(api_url, headers=headers)

# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")

K线端点返回示例:

代码语言:python
复制
{
  "s": "INFY.IN",
  "respList": [
    {
      "t": "1773037920",
      "h": "1303.40",
      "o": "1301.50",
      "l": "1301.50",
      "c": "1303.00",
      "v": "15615.0",
      "vw": "20342210.20",
      "pc": "0.15%",
      "pca": "1.90"
    }
  ]
}

字段说明:

字段

说明

t

秒时间戳(UTC+8)

h

最高价

o

开盘价

l

最低价

C

收盘价

v

成交量

vw

成交额

pc

涨跌幅

pca

涨跌额

3.3 盘口深度

Infoway API提供全印度一档盘口深度,请求地址:

代码语言:python
复制
https://data.infoway.io /india/batch_depth/{codes}

同样在cdoes中传入股票代码即可:

代码语言:python
复制
import requests

api_url = 'https://data.infoway.io /india/batch_depth/INFY.IN'

# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    'apiKey': 'YOUR_API_KEY_HERE'
}

# 发送GET请求
response = requests.get(api_url, headers=headers)

# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")

盘口端点返回示例:

代码语言:python
复制
{
  "s": "INFY.IN",
  "t": 1773037524808,
  "a": [
    [
      "1302.0",
    ],
    [
      "1.0",
    ]
  ],
  "b": [
    [
      "1301.9",
    ],
    [
      "172.0",
    ]
  ]
}

a为买盘,包含买一价和买一量,b为卖盘,包含卖一价和卖一量。

3.4 WebSocket订阅全印度股票实时行情

Infoway API支持WebSocket订阅,可订阅全印度市场的实时K线、实时成交、以及实时盘口,非常适合交易所用户。

下面WebSocket代码示例,包含断线重连机制:

代码语言:python
复制
import asyncio
import json
import uuid
import logging
from typing import Optional
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("websocket-client")

class CryptoWebsocketClient:
    """加密货币行情WebSocket客户端(对等Java版本逻辑)"""
    
    def __init__(self, api_key: str, business: str = "crypto"):
        # WebSocket连接配置
        self.ws_url = f"wss://data.infoway.io/ws?business=india&apikey=YourAPIKey"
        # 核心状态
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.is_connected = False
        self.reconnect_interval = 10  # 重连间隔(秒)
        self.heartbeat_interval = 30  # 心跳间隔(秒)
        # 任务对象(用于管理异步任务)
        self.reconnect_task: Optional[asyncio.Task] = None
        self.heartbeat_task: Optional[asyncio.Task] = None

    async def connect(self) -> None:
        """建立WebSocket连接"""
        try:
            # 关闭旧连接(防止连接泄露)
            if self.ws and not self.ws.closed:
                await self.ws.close()
            
            # 建立新连接
            self.ws = await websockets.connect(self.ws_url)
            self.is_connected = True
            logger.info(f"WebSocket连接成功,地址: {self.ws_url}")
            
            # 连接成功后启动订阅和心跳
            await self._send_all_subscribe_requests()
            self._start_heartbeat_task()
        
        except WebSocketException as e:
            self.is_connected = False
            logger.error(f"WebSocket连接失败: {str(e)}")
            raise

    async def _send_all_subscribe_requests(self) -> None:
        """发送所有订阅请求(成交明细、盘口、K线)"""
        if not self.ws or self.ws.closed:
            logger.error("连接未建立,无法发送订阅请求")
            return
        
        try:
            # 1. 订阅实时成交明细(协议号10000)
            await self._send_trade_subscribe()
            await asyncio.sleep(5)  # 间隔5秒
            
            # 2. 订阅实时盘口数据(协议号10003)
            await self._send_depth_subscribe()
            await asyncio.sleep(5)  # 间隔5秒
            
            # 3. 订阅1分钟K线数据(协议号10006)
            await self._send_kline_subscribe()
            
        except Exception as e:
            logger.error(f"发送订阅请求失败: {str(e)}")
            raise

    def _generate_trace_id(self) -> str:
        """生成唯一trace ID(对等Java的UUID)"""
        return str(uuid.uuid4())

    async def _send_trade_subscribe(self) -> None:
        """发送实时成交明细订阅请求"""
        subscribe_msg = {
            "code": 10000,
            "trace": self._generate_trace_id(),
            "data": {"codes": "INFY.IN"}
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送成交明细订阅请求: {subscribe_msg}")

    async def _send_depth_subscribe(self) -> None:
        """发送实时盘口数据订阅请求"""
        subscribe_msg = {
            "code": 10003,
            "trace": self._generate_trace_id(),
            "data": {"codes": "INFY.IN"}
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送盘口数据订阅请求: {subscribe_msg}")

    async def _send_kline_subscribe(self) -> None:
        """发送1分钟K线数据订阅请求"""
        subscribe_msg = {
            "code": 10006,
            "trace": self._generate_trace_id(),
            "data": {
                "arr": [
                    {"type": 1, "codes": "INFY.IN"}  # type=1 表示1分钟K线
                ]
            }
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送K线数据订阅请求: {subscribe_msg}")

    async def _send_heartbeat(self) -> None:
        """发送心跳包(协议号10010)"""
        if not self.ws or self.ws.closed:
            return
        
        heartbeat_msg = {
            "code": 10010,
            "trace": self._generate_trace_id()
        }
        try:
            await self.ws.send(json.dumps(heartbeat_msg))
            logger.debug(f"发送心跳包: {heartbeat_msg}")
        except Exception as e:
            logger.error(f"发送心跳包失败: {str(e)}")
            raise

    def _start_heartbeat_task(self) -> None:
        """启动心跳任务(后台定时发送)"""
        if self.heartbeat_task and not self.heartbeat_task.done():
            self.heartbeat_task.cancel()
        
        async def heartbeat_loop():
            while self.is_connected and self.ws and not self.ws.closed:
                try:
                    await self._send_heartbeat()
                    await asyncio.sleep(self.heartbeat_interval)
                except Exception as e:
                    logger.error(f"心跳任务异常: {str(e)}")
                    break
        
        self.heartbeat_task = asyncio.create_task(heartbeat_loop())

    async def _message_listener(self) -> None:
        """监听服务端推送的消息"""
        while self.is_connected and self.ws and not self.ws.closed:
            try:
                # 阻塞等待接收消息
                message = await self.ws.recv()
                logger.info(f"收到服务端消息: {message}")
                
                # 解析消息并处理(对等Java的OnMessage逻辑)
                self._handle_received_message(message)
                
            except ConnectionClosed:
                logger.warning("WebSocket连接已关闭,停止消息监听")
                self.is_connected = False
                break
            except Exception as e:
                logger.error(f"接收/处理消息异常: {str(e)}")

    def _handle_received_message(self, message: str) -> None:
        """处理接收到的消息(按协议号分类)"""
        try:
            msg_data = json.loads(message)
            code = msg_data.get("code")
            trace = msg_data.get("trace")
            data = msg_data.get("data", {})
            
            if code == 10000:
                logger.info(f"处理成交明细数据 [trace={trace}]: {data}")
            elif code == 10003:
                logger.info(f"处理盘口数据 [trace={trace}]: {data}")
            elif code == 10006:
                logger.info(f"处理K线数据 [trace={trace}]: {data}")
            elif code == 10010:
                logger.debug(f"收到心跳响应 [trace={trace}]")
            else:
                logger.warning(f"未知协议号消息 [code={code}]: {message}")
                
        except json.JSONDecodeError:
            logger.error(f"消息格式错误,无法解析JSON: {message}")
        except Exception as e:
            logger.error(f"处理消息异常: {str(e)}")

    async def _reconnect_loop(self) -> None:
        """自动重连循环(对等Java的startReconnection)"""
        while True:
            if not self.is_connected:
                logger.info(f"尝试重连WebSocket(间隔{self.reconnect_interval}秒)...")
                try:
                    await self.connect()
                except Exception:
                    # 重连失败,等待后重试
                    await asyncio.sleep(self.reconnect_interval)
            else:
                # 连接正常,短暂等待后再次检查
                await asyncio.sleep(1)

    async def start(self) -> None:
        """启动客户端(主入口)"""
        # 启动自动重连任务
        self.reconnect_task = asyncio.create_task(self._reconnect_loop())
        
        try:
            # 首次连接
            await self.connect()
            
            # 启动消息监听
            await self._message_listener()
            
        finally:
            # 清理资源
            self.is_connected = False
            if self.heartbeat_task:
                self.heartbeat_task.cancel()
            if self.reconnect_task:
                self.reconnect_task.cancel()
            if self.ws and not self.ws.closed:
                await self.ws.close()
            logger.info("WebSocket客户端已停止")

async def main():
    """主函数"""
    # 替换为你的实际API Key
    API_KEY = "yourApikey"
    client = CryptoWebsocketClient(api_key=API_KEY)
    await client.start()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("用户中断,退出客户端")
    except Exception as e:
        logger.error(f"客户端运行异常: {str(e)}")

更多详细教程请查阅官方文档,或者Github

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、印度股票市场到底是什么格局?
  • NSE:印度真正的流量入口
  • BSE:亚洲老大哥,但市场份额已经换人
  • 二、为什么很少有人直接接交易所?
  • 三、Infoway API印度股票实时行情接口
    • 3.1 成交明细
    • 3.2 实时K线
    • 3.3 盘口深度
    • 3.4 WebSocket订阅全印度股票实时行情
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档