
在量化交易和金融数据分析领域,股票行情 API、港美股报价 API、港美股数据 API和处理股票行情数据是核心需求之一。与传统轮询方式相比,WebSocket 技术提供了真正的实时数据推送能力,大大降低了延迟。 目前市面上有多种股票行情数据接口,包括免费和付费的,如聚宽、Tushare、iTick 等。本文将以 iTick API 为例,介绍如何使用 Java 通过 WebSocket 协议订阅股票实时行情数据

首先,我们需要在项目中添加 WebSocket 客户端依赖。推荐使用 Java-WebSocket 库:
<dependencies>
<!-- WebSocket 客户端 -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
<!-- JSON 处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- 日志框架 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.8</version>
</dependency>
</dependencies>import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* iTick 配置类
*/
@Data
public class ITickConfig {
@JsonProperty("websocket_url")
private String websocketUrl = "wss://api.itick.org/stock";
// 通过官网:https://itick.org 获取免费试用 API KEY
@JsonProperty("api_key")
private String apiKey = "YOUR_API_KEY";
@JsonProperty("reconnect_interval")
private int reconnectInterval = 5000;
@JsonProperty("heartbeat_interval")
private int heartbeatInterval = 30000;
}import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 股票实时行情数据模型
*/
@Data
public class StockQuote {
@JsonProperty("symbol")
private String symbol; // 股票代码
@JsonProperty("volume")
private Long volume; // 成交量
@JsonProperty("open")
private BigDecimal open; // 开盘价
@JsonProperty("high")
private BigDecimal high; // 最高价
@JsonProperty("low")
private BigDecimal low; // 最低价
@JsonProperty("close")
private BigDecimal preClose; // 前收盘价
@JsonProperty("timestamp")
private LocalDateTime timestamp; // 时间戳
}import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* iTick WebSocket 客户端
*/
@Slf4j
public class ITickWebSocketClient extends WebSocketClient {
private final ITickConfig config;
private final ObjectMapper objectMapper;
private final Map<String, QuoteListener> listeners;
private final ScheduledExecutorService scheduler;
private volatile boolean authenticated = false;
public ITickWebSocketClient(URI serverUri, Draft draft, ITickConfig config) {
super(serverUri, draft);
this.config = config;
this.objectMapper = new ObjectMapper();
this.listeners = new ConcurrentHashMap<>();
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}
public ITickWebSocketClient(URI serverUri, ITickConfig config) {
super(serverUri);
this.config = config;
this.objectMapper = new ObjectMapper();
this.listeners = new ConcurrentHashMap<>();
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}
@Override
public void onOpen(ServerHandshake handshake) {
log.info("WebSocket 连接已建立,状态: {}", handshake.getHttpStatus());
// 发送认证消息
authenticate();
// 启动心跳
startHeartbeat();
}
@Override
public void onMessage(String message) {
try {
log.debug("收到消息: {}", message);
// 解析消息类型
Map<String, Object> msgMap = objectMapper.readValue(message, Map.class);
String type = (String) msgMap.get("resAc");
switch (type) {
case "auth":
handleAuthResponse(msgMap);
break;
case "quote":
handleQuoteMessage(msgMap);
break;
case "subscribe":
handleSubscriptionResponse(msgMap);
break;
default:
log.warn("未知消息类型: {}", type);
}
} catch (Exception e) {
log.error("消息处理异常: {}", e.getMessage(), e);
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.warn("WebSocket 连接关闭,代码: {}, 原因: {}, 远程: {}", code, reason, remote);
this.authenticated = false;
// 尝试重连
if (remote) {
scheduleReconnect();
}
}
@Override
public void onError(Exception ex) {
log.error("WebSocket 错误: {}", ex.getMessage(), ex);
}
/**
* 发送认证消息
*/
private void authenticate() {
try {
Map<String, Object> authMsg = new HashMap<>();
authMsg.put("ac", "auth");
authMsg.put("params", config.getApiKey());
String authJson = objectMapper.writeValueAsString(authMsg);
send(authJson);
log.info("认证消息已发送");
} catch (JsonProcessingException e) {
log.error("认证消息序列化失败: {}", e.getMessage(), e);
}
}
/**
* 处理认证响应
*/
private void handleAuthResponse(Map<String, Object> response) {
Number code = (Number) response.get("code");
if (code.intValue() == 1) {
this.authenticated = true;
log.info("认证成功")
} else {
String error = (String) response.get("error");
log.error("认证失败: {}", error);
}
}
/**
* 启动心跳
*/
private void startHeartbeat() {
scheduler.scheduleAtFixedRate(() -> {
if (isOpen()) {
try {
Map<String, Object> pingMsg = new HashMap<>();
pingMsg.put("ac", "ping");
pingMsg.put("params", System.currentTimeMillis());
String pingJson = objectMapper.writeValueAsString(pingMsg);
send(pingJson);
log.debug("心跳消息已发送");
} catch (Exception e) {
log.error("心跳发送失败: {}", e.getMessage());
}
}
}, 0, config.getHeartbeatInterval(), TimeUnit.MILLISECONDS);
}
/**
* 处理行情消息
*/
private void handleQuoteMessage(Map<String, Object> message) {
try {
String dataJson = objectMapper.writeValueAsString(message.get("data"));
StockQuote quote = objectMapper.readValue(dataJson, StockQuote.class);
// 通知所有监听器
notifyListeners(quote);
} catch (JsonProcessingException e) {
log.error("行情消息解析失败: {}", e.getMessage(), e);
}
}
/**
* 处理订阅响应
*/
private void handleSubscriptionResponse(Map<String, Object> response) {
Number code = (Number) response.get("code");
if (code.intValue() == 1) {
log.info("订阅成功: {}");
} else {
String error = (String) response.get("error");
log.error("订阅失败 {}: {}", error);
}
}
/**
* 安排重连
*/
private void scheduleReconnect() {
log.info("{}ms 后尝试重连...", config.getReconnectInterval());
scheduler.schedule(() -> {
if (!isOpen()) {
log.info("开始重连...");
reconnect();
}
}, config.getReconnectInterval(), TimeUnit.MILLISECONDS);
}
/**
* 订阅股票行情
*/
public void subscribe(String symbol, QuoteListener listener) {
if (!authenticated) {
log.warn("未认证,无法订阅");
return;
}
try {
listeners.put(symbol, listener);
Map<String, Object> subMsg = new HashMap<>();
subMsg.put("types", "quote");
subMsg.put("params", symbol);
String subJson = objectMapper.writeValueAsString(subMsg);
send(subJson);
log.info("订阅请求已发送: {}", symbol);
} catch (JsonProcessingException e) {
log.error("订阅消息序列化失败: {}", e.getMessage(), e);
}
}
/**
* 通知监听器
*/
private void notifyListeners(StockQuote quote) {
QuoteListener listener = listeners.get(quote.getSymbol());
if (listener != null) {
try {
listener.onQuote(quote);
} catch (Exception e) {
log.error("监听器处理异常: {}", e.getMessage(), e);
}
}
}
/**
* 关闭客户端
*/
public void shutdown() {
scheduler.shutdown();
close();
}
/**
* 行情监听器接口
*/
public interface QuoteListener {
void onQuote(StockQuote quote);
}
}import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 实时数据流处理器
*/
@Slf4j
public class RealTimeDataProcessor implements ITickWebSocketClient.QuoteListener {
private final BlockingQueue<StockQuote> dataQueue;
private final AtomicLong processedCount;
private final DataHandler dataHandler;
private volatile boolean running = false;
private Thread processorThread;
public RealTimeDataProcessor(DataHandler dataHandler, int queueCapacity) {
this.dataQueue = new LinkedBlockingQueue<>(queueCapacity);
this.processedCount = new AtomicLong(0);
this.dataHandler = dataHandler;
}
public void start() {
if (running) {
return;
}
running = true;
processorThread = new Thread(this::processData, "DataProcessor");
processorThread.start();
log.info("数据处理器已启动");
}
public void stop() {
running = false;
if (processorThread != null) {
processorThread.interrupt();
try {
processorThread.join(5000);
} catch (InterruptedException e) {
log.warn("处理器线程停止等待被中断");
}
}
log.info("数据处理器已停止,总共处理 {} 条数据", processedCount.get());
}
@Override
public void onQuote(StockQuote quote) {
if (!running) {
log.warn("处理器未运行,丢弃数据: {}", quote.getSymbol());
return;
}
boolean offered = dataQueue.offer(quote);
if (!offered) {
log.warn("数据队列已满,丢弃行情数据: {}", quote.getSymbol());
// 可以在这里实现背压策略或数据采样
}
}
private void processData() {
while (running && !Thread.currentThread().isInterrupted()) {
try {
StockQuote quote = dataQueue.poll(100, TimeUnit.MILLISECONDS);
if (quote != null) {
processSingleQuote(quote);
processedCount.incrementAndGet();
}
// 批量处理的机会
processBatchIfNeeded();
} catch (InterruptedException e) {
log.info("数据处理器被中断");
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("数据处理异常: {}", e.getMessage(), e);
}
}
}
private void processSingleQuote(StockQuote quote) {
try {
dataHandler.handleQuote(quote);
} catch (Exception e) {
log.error("处理单个行情数据异常: {}", e.getMessage(), e);
}
}
private void processBatchIfNeeded() {
// 如果队列中有大量数据,可以进行批量处理
if (dataQueue.size() > 1000) {
log.info("检测到数据积压,当前队列大小: {}", dataQueue.size());
// 实现批量处理逻辑
processBatch();
}
}
private void processBatch() {
// 批量处理实现
int batchSize = Math.min(dataQueue.size(), 100);
for (int i = 0; i < batchSize; i++) {
StockQuote quote = dataQueue.poll();
if (quote != null) {
try {
dataHandler.handleQuote(quote);
} catch (Exception e) {
log.error("批量处理异常: {}", e.getMessage(), e);
}
}
}
}
public long getProcessedCount() {
return processedCount.get();
}
public int getQueueSize() {
return dataQueue.size();
}
/**
* 数据处理器接口
*/
public interface DataHandler {
void handleQuote(StockQuote quote);
}
}import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 数据存储处理器 - 支持批量插入
*/
@Slf4j
public class DatabaseStorageHandler implements RealTimeDataProcessor.DataHandler {
private final String jdbcUrl;
private final String username;
private final String password;
private final BlockingQueue<StockQuote> batchQueue;
private final int batchSize;
private volatile boolean running = false;
private Thread batchProcessorThread;
public DatabaseStorageHandler(String jdbcUrl, String username, String password, int batchSize) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
this.batchSize = batchSize;
this.batchQueue = new LinkedBlockingQueue<>(batchSize * 2);
}
public void start() {
if (running) {
return;
}
running = true;
batchProcessorThread = new Thread(this::processBatch, "BatchStorageProcessor");
batchProcessorThread.start();
log.info("数据库存储处理器已启动");
}
public void stop() {
running = false;
if (batchProcessorThread != null) {
batchProcessorThread.interrupt();
try {
batchProcessorThread.join(10000);
} catch (InterruptedException e) {
log.warn("批量处理器线程停止等待被中断");
}
}
// 处理剩余数据
processRemainingData();
log.info("数据库存储处理器已停止");
}
@Override
public void handleQuote(StockQuote quote) {
if (!running) {
return;
}
try {
boolean offered = batchQueue.offer(quote, 100, TimeUnit.MILLISECONDS);
if (!offered) {
log.warn("批量队列已满,直接插入单条数据");
insertSingleQuote(quote);
}
} catch (InterruptedException e) {
log.warn("数据入队被中断");
Thread.currentThread().interrupt();
}
}
private void processBatch() {
while (running && !Thread.currentThread().isInterrupted()) {
try {
if (batchQueue.size() >= batchSize) {
insertBatch();
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
log.info("批量处理器被中断");
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("批量处理异常: {}", e.getMessage(), e);
}
}
}
private void insertBatch() {
if (batchQueue.isEmpty()) {
return;
}
String sql = "INSERT INTO stock_quotes (symbol, volume, open, high, low, close, timestamp) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)";
try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
PreparedStatement statement = connection.prepareStatement(sql)) {
connection.setAutoCommit(false);
int count = 0;
while (count < batchSize && !batchQueue.isEmpty()) {
StockQuote quote = batchQueue.poll();
if (quote != null) {
setStatementParameters(statement, quote);
statement.addBatch();
count++;
}
}
int[] results = statement.executeBatch();
connection.commit();
log.debug("批量插入完成,插入 {} 条数据", count);
} catch (SQLException e) {
log.error("批量插入数据库异常: {}", e.getMessage(), e);
// 可以考虑将失败的数据重新放回队列
}
}
private void insertSingleQuote(StockQuote quote) {
String sql = "INSERT INTO stock_quotes (symbol, volume, open, high, low, close, timestamp) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)";
try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
PreparedStatement statement = connection.prepareStatement(sql)) {
setStatementParameters(statement, quote);
statement.executeUpdate();
} catch (SQLException e) {
log.error("单条插入数据库异常: {}", e.getMessage(), e);
}
}
private void setStatementParameters(PreparedStatement statement, StockQuote quote) throws SQLException {
statement.setString(1, quote.getSymbol());
statement.setLong(5, quote.getVolume());
statement.setBigDecimal(7, quote.getOpen());
statement.setBigDecimal(8, quote.getHigh());
statement.setBigDecimal(9, quote.getLow());
statement.setBigDecimal(10, quote.getClose());
statement.setObject(15, quote.getTimestamp());
}
private void processRemainingData() {
log.info("处理剩余数据,队列大小: {}", batchQueue.size());
while (!batchQueue.isEmpty()) {
insertBatch();
}
}
}import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
/**
* 股票实时行情应用示例
*/
@Slf4j
public class StockRealtimeApplication {
public static void main(String[] args) {
// 配置
ITickConfig config = new ITickConfig();
// 创建 WebSocket 客户端
ITickWebSocketClient client = new ITickWebSocketClient(
URI.create(config.getWebsocketUrl()), config);
// 创建数据处理器
RealTimeDataProcessor dataProcessor = new RealTimeDataProcessor(
new ConsoleDataHandler(), 10000);
// 创建数据库存储处理器(可选)
DatabaseStorageHandler dbHandler = new DatabaseStorageHandler(
"jdbc:mysql://localhost:3306/stock_data", "username", "password", 100);
RealTimeDataProcessor dbProcessor = new RealTimeDataProcessor(dbHandler, 5000);
try {
// 启动处理器
dataProcessor.start();
dbProcessor.start();
dbHandler.start();
// 添加监听器
client.addListener(dataProcessor);
client.addListener(dbProcessor);
// 连接 WebSocket
client.connect();
// 等待连接建立
Thread.sleep(2000);
// 订阅股票
String[] symbols = {"AAPL$US", "GOOGL$US", "MSFT$US", "TSLA$US", "AMZN$US"};
for (String symbol : symbols) {
client.subscribe(symbol, dataProcessor);
Thread.sleep(100); // 避免请求过于频繁
}
log.info("应用启动完成,开始接收实时行情数据");
// 保持程序运行
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("收到关闭信号,开始清理资源...");
client.shutdown();
dataProcessor.stop();
dbProcessor.stop();
dbHandler.stop();
latch.countDown();
}));
latch.await();
} catch (Exception e) {
log.error("应用运行异常: {}", e.getMessage(), e);
} finally {
client.shutdown();
dataProcessor.stop();
dbProcessor.stop();
dbHandler.stop();
}
}
/**
* 控制台数据处理器示例
*/
static class ConsoleDataHandler implements RealTimeDataProcessor.DataHandler {
private long count = 0;
@Override
public void handleQuote(StockQuote quote) {
count++;
if (count % 100 == 0) {
log.info("已处理 {} 条行情数据, 最新: {} - {}", count, quote.getSymbol(), quote.getPrice());
}
// 在这里可以添加业务逻辑,如:
// - 价格预警
// - 技术指标计算
// - 交易信号生成
// - 数据分析
}
}
}import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 性能监控器
*/
public class PerformanceMonitor {
private final RealTimeDataProcessor processor;
private final ScheduledExecutorService monitorScheduler;
private final AtomicLong lastProcessedCount = new AtomicLong(0);
public PerformanceMonitor(RealTimeDataProcessor processor) {
this.processor = processor;
this.monitorScheduler = Executors.newSingleThreadScheduledExecutor();
}
public void start() {
monitorScheduler.scheduleAtFixedRate(this::reportPerformance, 5, 5, TimeUnit.SECONDS);
}
public void stop() {
monitorScheduler.shutdown();
}
private void reportPerformance() {
long currentCount = processor.getProcessedCount();
long delta = currentCount - lastProcessedCount.get();
lastProcessedCount.set(currentCount);
int queueSize = processor.getQueueSize();
double throughput = delta / 5.0; // 每秒处理量
log.info("性能统计 - 队列大小: {}, 吞吐量: {}/秒, 总处理量: {}",
queueSize, String.format("%.2f", throughput), currentCount);
// 预警机制
if (queueSize > 5000) {
log.warn("数据积压警告: 队列大小 {}", queueSize);
}
}
}/**
* 对象池 - 减少GC压力
*/
public class StockQuotePool {
private final ConcurrentLinkedQueue<StockQuote> pool = new ConcurrentLinkedQueue<>();
private final int maxSize;
public StockQuotePool(int maxSize) {
this.maxSize = maxSize;
}
public StockQuote borrowObject() {
StockQuote obj = pool.poll();
return obj != null ? obj : new StockQuote();
}
public void returnObject(StockQuote obj) {
if (pool.size() < maxSize) {
// 重置对象状态
obj.setSymbol(null);
obj.setPrice(null);
// ... 重置其他字段
pool.offer(obj);
}
}
}/**
* 连接管理器 - 支持多连接负载均衡
*/
public class ConnectionManager {
private final List<ITickWebSocketClient> clients = new ArrayList<>();
private final AtomicInteger currentIndex = new AtomicInteger(0);
public void addClient(ITickWebSocketClient client) {
clients.add(client);
}
public void subscribe(String symbol, ITickWebSocketClient.QuoteListener listener) {
// 轮询选择客户端,实现负载均衡
int index = currentIndex.getAndIncrement() % clients.size();
clients.get(index).subscribe(symbol, listener);
}
}本文详细介绍了如何使用 Java 接入 iTick WebSocket API 获取股票实时行情,并提供了高效处理实时数据流的完整解决方案。关键要点包括:
这套方案能够处理高频率的实时行情数据,为量化交易、风险监控、数据分析等应用场景提供稳定可靠的数据基础。在实际部署时,建议根据具体业务需求调整队列大小、批处理参数等配置,以达到最佳性能。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。