Java WebSocket对接币安区块链K线行情API
创始人
2024-12-29 02:35:04
0

今天我们来说说我在做Java后端对接币安区块链时,遇到的问题及解决方式。

既然要对接币安区块链K线接口,我们首先必须先了解这个行情api在哪里?

1. 行情K线接口

https://binance-docs.github.io/apidocs/futures/cn/#k-8

图片

注意:这里一定要是连续合约K线,而不是K线接口。这是在看官网K线socket接口对比得到的。

如果使用K线Api,数据是不正确的。

而且有一点我需要表述一下,不能说网络上其他博主的步骤不对,应该是版本升级的问题,其他博主的教程已经无法使用。

2. Java实现websockt客户端

Java实现websocket其实有很多种方式,比如:javax.websocket Java标准库、再如OkHttp、Apache HttpClient还有一个开源的库Java-WebSocket等等

Java-WebSocket在我碰到这个库之后,我基本都没用过其实的库,假如它适用我的业务,必须用它,因为它简单、高效。

来写代码了:

pom.xml先引入Java-WebSocket

   org.java-websocket   Java-WebSocket   1.5.4 
然后我们写一个websocket的工具类。
/**  * 合约站行情推送处理器  */ @Slf4j public class WssMarketHandle implements Cloneable {     private WebSocketClient webSocketClient;     // 合约站行情请求以及订阅地址     private String pushUrl = "";          private WssMarketHandle() {     }      public WssMarketHandle(String pushUrl) {         this.pushUrl = pushUrl;     }     // .... }

由于K线数据时序要实时去接收的,所以我这里需要它连接之后,就让他去订阅接口,所以我的代码只写一个订阅的外部方式:

public void sub(List channels, SubscriptionListener callback) throws URISyntaxException {     doConnect(channels, callback); }

​​​​ List channels 多币种订阅的模式参数

SubscriptionListener callback 回调方法类

来看下doConnect方法,他是实现是主体

private void doConnect(List channels, SubscriptionListener callback) throws URISyntaxException {     webSocketClient = new WebSocketClient(new URI(pushUrl)) {         @Override         public void onOpen(ServerHandshake serverHandshake) {             logger.debug("onOpen Success");             doSub(channels);             dealReconnect();             }         @Override         public void onMessage(String socketJson) {             fixedThreadPool.execute(() -> {                 try {                     JSONObject entries = JSONUtil.parseObj(socketJson);                     String ping = entries.getStr("ping");                     String stream = entries.getStr("stream");                     if (StrUtil.isNotEmpty(ping)) {                         log.info("WssMarketHandle onMessage stream: " + socketJson);                         dealPing();                     } else if (StrUtil.isNotEmpty(stream)) {                          callback.onReceive(socketJson);                     } else {                         log.info("WssMarketHandle onMessage other: " + socketJson);                     }                 } catch (Throwable e) {                     logger.error("onMessage异常", e);                 }             });             }         @Override         public void onMessage(ByteBuffer bytes) {             fixedThreadPool.execute(() -> {                 try {                     byte[] temp = bytes.array();                     // gzip 解压                     byte[] decompress = GZipUtils.decompress(temp);                     String socketJson = new String(decompress, StandardCharsets.UTF_8);                     JSONObject JSONMessage = JSONUtil.parseObj(socketJson);                     String ch = JSONMessage.getStr("ch");                     String ping = JSONMessage.getStr("ping");                     if (StrUtil.isNotEmpty(ch)) {                         callback.onReceive(socketJson);                     }                     if (StrUtil.isNotEmpty(ping)) {                         dealPing();                     }                 } catch (Throwable e) {                     logger.error("onMessage异常", e);                 }             });             }         @Override         public void onClose(int i, String s, boolean b) {             logger.error("onClose i:{},s:{},b:{}", i, s, b);             }         @Override         public void onError(Exception e) {             logger.error("onError:", e);         }         };         webSocketClient.connect(); }

其他的方法,包含取消订阅、断开重连以及Ping处理:

public void close() {   webSocketClient.connect(); }  /** * 开始订阅 * @param channels */ private void doSub(List channels) {   try {       if (IterUtil.isNotEmpty(channels)) {           channels.stream().forEach(e -> {               webSocketClient.send(e);           });       }   } catch (Exception ex) {   } }  /** * 取消订阅 * @param subStr */ public void unSub(String subStr) {   try {       webSocketClient.send(subStr);   } catch (Exception ex) {   } }  /** * Ping处理 */ private void dealPing() {   try {       JSONObject jsonMessage = new JSONObject();       jsonMessage.put("pong", pong.incrementAndGet());       logger.debug("发送pong:{}", jsonMessage.toString());       webSocketClient.send(jsonMessage.toString());   } catch (Throwable t) {       logger.error("dealPing出现了异常");   } }  /** * 重连 */ private void dealReconnect() {   try {       scheduledExecutorService.scheduleAtFixedRate(() -> {           try {               if ((webSocketClient.isClosed() && !webSocketClient.isClosing())) {                   logger.error("isClosed:{},isClosing:{},准备重连", webSocketClient.isClosed(), webSocketClient.isClosing());                   Boolean reconnectResult = webSocketClient.reconnectBlocking();                   logger.error("重连的结果为:{}", reconnectResult);                   if (!reconnectResult) {                       webSocketClient.closeBlocking();                       logger.error("closeBlocking");                   }                }           } catch (Throwable e) {               logger.error("dealReconnect异常", e);           }        }, 60, 10, TimeUnit.SECONDS);   } catch (Exception e) {       logger.error("dealReconnect scheduledExecutorService异常", e);   } }

​​​​​​​ 至此整个开发过程已然明了。

调用websockt接口实现,写个main方法测试下。

private static final String BIAN_WS_URL = "wss://fstream.binance.com/stream";  public static void main(String[] args) throws Exception {     WssMarketHandle marketHandle = new WssMarketHandle(BIAN_WS_URL);     List channels = new ArrayList<>();     JSONObject object = new JSONObject();     object.set("method", "SUBSCRIBE");     object.set("id", RandomUtil.randomInt(99999));      JSONArray params = new JSONArray();     String pair = ExchangeEnum.BTC.getExchangeName().toLowerCase() + "usdt";     String perpetual = "perpetual";     String urlParam = StrUtil.format("{}_{}@continuousKline_1m", pair, perpetual);     params.add(urlParam);     object.set("params", params);      channels.add(object.toString());     marketHandle.sub(channels, log::info); }

​​​​​​​历史K线我找了好久,都没有适用的,所以使用的是接口方式,因为它本来就是一次性获取的,所以不需要socket长连接获取也是可行的。接口如下:

https://fapi.binance.com/fapi/v1/continuousKlines

有需要帮助的小伙伴,可以关注,联系我,谢谢!

相关内容

热门资讯

七分钟了解!闲娱棋牌有挂吗,w... 七分钟了解!闲娱棋牌有挂吗,wepower德州都是真的有挂,必备教程(有挂脚本);进入游戏-大厅左侧...
2分钟科普!雀神微信小程序怎么... 2分钟科普!雀神微信小程序怎么开挂,aaPOKER确实是真的有挂,玩家教你(有挂教学)1、点击下载安...
九分钟辅助!决战卡五星游戏辅助... 九分钟辅助!决战卡五星游戏辅助器,德州俱乐部果然有挂,详细教程(有挂揭秘)暗藏猫腻,小编详细说明决战...
8分钟辅助挂!财神十三张特殊牌... 8分钟辅助挂!财神十三张特殊牌外 挂,we-poker原来有挂,新2025版(有挂教学)1、每一步都...
7分钟辅助挂!台州宝宝游戏外 ... 7分钟辅助挂!台州宝宝游戏外 挂,鱼扑克app俱乐部切实是有挂,安装教程(有挂辅助挂)1、任何台州宝...
一分钟实锤!闲逸碰胡黑科技,哈... 一分钟实锤!闲逸碰胡黑科技,哈糖大菠萝十三张一直真的是有挂,微扑克教程(有挂软件)运闲逸碰胡黑科技辅...
六分钟辅助!微信小程序广丰51... 六分钟辅助!微信小程序广丰510k有挂吗,菠萝德州app切实是有挂,AA德州教程(有挂详情)1、起透...
五分钟普及!中至万年麻将有挂吗... 五分钟普及!中至万年麻将有挂吗,德州俱乐部一直是有挂,可靠技巧(有挂普及);该软件可以轻松地帮助玩家...
8分钟发现!葫芦娃捉鸡是真的吗... 8分钟发现!葫芦娃捉鸡是真的吗,红龙poker切实是有挂,科技教程(有挂神器)该软件可以轻松地帮助玩...
4分钟了解!大唐麻将有什么规律... 4分钟了解!大唐麻将有什么规律吗,德扑其实是真的有挂,教你攻略(有挂方法);1、实时大唐麻将有什么规...