API接入

币安WebSocket怎么订阅行情?单流多流生产级代码

币安 WebSocket 实时行情完整方案:单流/组合流 URL、订阅动态管理、断线重连、心跳保活、Stream 类型、深度增量合并算法,附 Python asyncio 与 Node.js 实战代码。

币安 WebSocket 实时行情的最优方案是:使用 wss://stream.binance.com:9443/stream?streams= 组合流端点,一个连接订阅最多 1024 个流,消息延迟小于 50ms,权重消耗几乎为零。本文给出单流、组合流、动态订阅、深度本地撮合、断线重连的完整 Python 与 Node.js 代码,全部可直接运行。在开始之前,如果你还没注册币安账号,可以先到 币安官网 查看官方入口,或通过 免费注册 完成开户。

一、WebSocket 端点总览

类别 URL 用途
Spot 单流 wss://stream.binance.com:9443/ws/{stream} 只订一个流
Spot 组合流 wss://stream.binance.com:9443/stream?streams={s1/s2/...} 订多个流,消息带流名
Spot 备用 wss://data-stream.binance.vision/ws/{stream} 官方备份线路
Futures U 本位 wss://fstream.binance.com/stream?streams= 永续合约
Futures 币本位 wss://dstream.binance.com/stream?streams= 币本位合约
Testnet wss://stream.testnet.binance.vision/ws 测试网

连接数上限:单 IP 最多 300 个 WebSocket 连接,单连接最多订阅 1024 个流,每 24 小时自动断开一次,客户端必须实现重连。

二、Stream 类型清单

币安提供 11 种主要行情流:

Stream 格式 推送频率 说明
aggTrade {symbol}@aggTrade 实时 归集成交
trade {symbol}@trade 实时 逐笔成交
kline_1m {symbol}@kline_1m 每秒 K 线(1m/5m/1h/...)
miniTicker {symbol}@miniTicker 1000ms 精简行情
ticker {symbol}@ticker 1000ms 24h 完整统计
bookTicker {symbol}@bookTicker 实时 最优买卖一
depth {symbol}@depth 1000ms 完整深度(20/50/100 档)
depth@100ms {symbol}@depth@100ms 100ms 深度差分(增量)
!miniTicker@arr !miniTicker@arr 1000ms 全市场精简行情
!ticker@arr !ticker@arr 1000ms 全市场完整统计
!bookTicker !bookTicker 实时 全市场最优买卖一

命名规则:小写 + @ 符号分隔,例如 btcusdt@ticker 而不是 BTCUSDT@ticker

三、Python asyncio 单流订阅

import asyncio, json
import websockets

async def subscribe_ticker(symbol: str):
    url = f"wss://stream.binance.com:9443/ws/{symbol.lower()}@ticker"
    async with websockets.connect(url, ping_interval=180) as ws:
        async for message in ws:
            data = json.loads(message)
            print(f"{data['s']}: 最新价 {data['c']}, "
                  f"24h 涨跌 {data['P']}%, "
                  f"成交量 {data['v']}")

asyncio.run(subscribe_ticker("BTCUSDT"))

输出示例:

BTCUSDT: 最新价 68420.12, 24h 涨跌 2.35%, 成交量 45231.2
BTCUSDT: 最新价 68421.00, 24h 涨跌 2.36%, 成交量 45231.8

四、组合流订阅多个交易对

一个连接订阅 10 个交易对的 ticker + 5 个交易对的 K 线:

import asyncio, json
import websockets

STREAMS = [
    "btcusdt@ticker", "ethusdt@ticker", "bnbusdt@ticker",
    "solusdt@ticker", "xrpusdt@ticker", "adausdt@ticker",
    "dogeusdt@ticker", "maticusdt@ticker", "dotusdt@ticker",
    "avaxusdt@ticker",
    "btcusdt@kline_1m", "ethusdt@kline_1m",
    "bnbusdt@kline_1m", "solusdt@kline_1m", "xrpusdt@kline_1m"
]

async def combined_stream():
    streams = "/".join(STREAMS)
    url = f"wss://stream.binance.com:9443/stream?streams={streams}"
    async with websockets.connect(url, ping_interval=180) as ws:
        async for message in ws:
            payload = json.loads(message)
            stream = payload["stream"]
            data = payload["data"]
            if "@ticker" in stream:
                print(f"[{stream}] {data['s']}: {data['c']}")
            elif "@kline" in stream:
                k = data["k"]
                if k["x"]:  # 只在 K 线闭合时打印
                    print(f"[{stream}] {k['s']} {k['i']} "
                          f"O:{k['o']} H:{k['h']} L:{k['l']} C:{k['c']}")

asyncio.run(combined_stream())

五、动态订阅/取消(SUBSCRIBE 方法)

连接建立后,可以通过 JSON 消息动态添加或移除订阅:

import asyncio, json
import websockets

async def dynamic_sub():
    url = "wss://stream.binance.com:9443/ws"
    async with websockets.connect(url) as ws:
        # 订阅 BTC 和 ETH 的 bookTicker
        await ws.send(json.dumps({
            "method": "SUBSCRIBE",
            "params": ["btcusdt@bookTicker", "ethusdt@bookTicker"],
            "id": 1
        }))

        count = 0
        async for msg in ws:
            data = json.loads(msg)
            if "result" in data:
                print(f"订阅确认: {data}")
                continue
            count += 1
            print(f"{data['s']} 买一 {data['b']} 卖一 {data['a']}")

            # 收到 50 条后取消 ETH 订阅,只保留 BTC
            if count == 50:
                await ws.send(json.dumps({
                    "method": "UNSUBSCRIBE",
                    "params": ["ethusdt@bookTicker"],
                    "id": 2
                }))
                print("已取消 ETH 订阅")

asyncio.run(dynamic_sub())

六、深度增量合并(本地维护订单簿)

高频策略常需要本地撮合级别的订单簿,流程是:

  1. 订阅 @depth@100ms 差分流,放入缓冲
  2. REST 调用 GET /api/v3/depth?symbol=X&limit=1000 获取快照
  3. 丢弃缓冲中 u < lastUpdateId 的消息
  4. 逐个应用 U <= lastUpdateId+1 <= u 的消息
  5. 后续消息直接应用,pu 必须等于上条的 u
import asyncio, json, aiohttp
import websockets
from sortedcontainers import SortedDict

class OrderBook:
    def __init__(self, symbol):
        self.symbol = symbol.lower()
        self.bids = SortedDict()  # 价格 -> 数量
        self.asks = SortedDict()
        self.last_update_id = 0
        self.buffer = []

    def apply_update(self, bids, asks):
        for price, qty in bids:
            price, qty = float(price), float(qty)
            if qty == 0:
                self.bids.pop(price, None)
            else:
                self.bids[price] = qty
        for price, qty in asks:
            price, qty = float(price), float(qty)
            if qty == 0:
                self.asks.pop(price, None)
            else:
                self.asks[price] = qty

    async def load_snapshot(self):
        async with aiohttp.ClientSession() as s:
            url = f"https://api.binance.com/api/v3/depth?symbol={self.symbol.upper()}&limit=1000"
            async with s.get(url) as r:
                snap = await r.json()
                self.last_update_id = snap["lastUpdateId"]
                self.apply_update(snap["bids"], snap["asks"])

    async def run(self):
        url = f"wss://stream.binance.com:9443/ws/{self.symbol}@depth@100ms"
        async with websockets.connect(url) as ws:
            # Step 1: 先接收一段时间差分
            asyncio.create_task(self._collect(ws))
            await asyncio.sleep(1)
            # Step 2: 拉快照
            await self.load_snapshot()
            # Step 3: 应用缓冲中有效差分
            for diff in self.buffer:
                if diff["u"] < self.last_update_id:
                    continue
                self.apply_update(diff["b"], diff["a"])
                self.last_update_id = diff["u"]
            self.buffer = None
            # Step 4: 后续直接应用
            async for msg in ws:
                diff = json.loads(msg)
                self.apply_update(diff["b"], diff["a"])
                self.last_update_id = diff["u"]
                best_bid = self.bids.keys()[-1] if self.bids else 0
                best_ask = self.asks.keys()[0] if self.asks else 0
                print(f"买一 {best_bid} 卖一 {best_ask} 差 {best_ask-best_bid:.2f}")

    async def _collect(self, ws):
        if self.buffer is None:
            return
        async for msg in ws:
            if self.buffer is None:
                break
            self.buffer.append(json.loads(msg))

ob = OrderBook("BTCUSDT")
asyncio.run(ob.run())

七、断线重连与心跳

币安 WebSocket 要求客户端在 3 分钟内响应 pong,且连接 24 小时强制断开:

import asyncio, json, websockets, logging

async def resilient_ws(url, on_message, max_retries=1000):
    retry = 0
    while retry < max_retries:
        try:
            async with websockets.connect(
                url,
                ping_interval=180,  # 3 分钟发一次 ping
                ping_timeout=10,
                close_timeout=5
            ) as ws:
                retry = 0  # 连接成功重置
                async for message in ws:
                    await on_message(json.loads(message))
        except (websockets.ConnectionClosed, asyncio.TimeoutError) as e:
            retry += 1
            wait = min(2 ** retry, 60)  # 指数退避,最多 60s
            logging.warning(f"WS 断开 {e},{wait}s 后重连(第 {retry} 次)")
            await asyncio.sleep(wait)

八、Node.js WebSocket 示例

const WebSocket = require('ws');

const streams = ['btcusdt@bookTicker', 'ethusdt@bookTicker'].join('/');
const url = `wss://stream.binance.com:9443/stream?streams=${streams}`;

function connect() {
  const ws = new WebSocket(url);
  ws.on('open', () => console.log('连接成功'));
  ws.on('message', (raw) => {
    const { stream, data } = JSON.parse(raw);
    console.log(`${data.s} 买一 ${data.b} 卖一 ${data.a}`);
  });
  ws.on('close', () => {
    console.log('断线,3s 后重连');
    setTimeout(connect, 3000);
  });
  ws.on('error', (err) => console.error('错误', err.message));
}

connect();

九、常见问题 FAQ

Q1: WebSocket 也有权重消耗吗?

A: 建立连接本身有 2 权重;后续推送消息 完全不消耗权重。但连接数超过 300 / IP 会被拒绝。

Q2: 为什么订阅后没收到任何消息?

A: 三种常见原因:1) stream 名必须 全小写(btcusdt 不是 BTCUSDT);2) 交易对写错(例如用 BTC-USDT 而不是 BTCUSDT);3) 订阅消息必须在 wss://.../ws 端点而不是 /stream 端点。

Q3: @depth 和 @depth@100ms 什么区别?

A: @depth 推 1000ms 一次完整快照(20 档);@depth@100ms 推 100ms 一次差分(增量)。本地维护订单簿必须用 @100ms,看盘展示用 @depth 即可。

Q4: WebSocket 可以下单吗?

A: 可以。币安提供 WebSocket API(wss://ws-api.binance.com:443/ws-api/v3)用于签名下单,比 REST 延迟低 30-50ms。但生态不成熟,主流策略仍用 REST 下单 + WS 监听。

Q5: 24 小时自动断开如何无缝切换?

A: 双连接滚动方案:到 23 小时时建立新连接 B,等 B 稳定收到消息后再关闭旧连接 A,避免瞬间丢数据。生产级框架如 python-binance 已内置此逻辑。

看完 WebSocket 方案,回到 分类导航 查看「API接入」分类其它 SDK 教程。

继续浏览

对币安使用还有疑问?回到分类页查找同主题的其它教程。

分类导航

相关教程

币安API怎么申请?密钥签名一般要怎么生成 2026-04-14 币安Spot API怎么用?从零到第一单的可运行代码 2026-04-14 币安Futures和Spot API有什么区别?端点参数权重对比 2026-04-14 币安API会被封IP吗?限频策略与权重计算详解 2026-04-14