最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源,后端没有数据的时候,会造成大量的无用轮询。所以这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。
参考:https://liaoxuefeng.com/books/java/spring/web/websocket/
WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模型,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当需要定期或不定期向浏览器推送数据(例如股票行情或在线聊天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。
由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上进行了简单的升级。建立 TCP 连接后,浏览器在发送请求时附带以下头部信息:
GET /chat HTTP/1.1 Host: www.example.com Upgrade: websocket Connection: Upgrade
这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade
收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时互相推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用程序会发送基于 JSON 的文本消息。
现代浏览器均已支持 WebSocket 协议,服务器端则需要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才能使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。
我们以实际代码来演示如何在Springboot项目中实现对Websocket的支持。
org.springframework.boot spring-boot-starter-websocket
这个配置的主要作用是自动启动使用了注解==@ServerEndpoint==的类
@Configuration @EnableWebSocket public class WebSocketConfiguration { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
@ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}") @Component @Slf4j public class LogWsEndpoint implements Consumer { // 对话的标识 private String bizKey; // 存储每个会话 private static final ConcurrentHashMap> endpointMap = new ConcurrentHashMap<>(); // 将会话放入到线程池中,异步将数据返回给前端 private static ThreadPoolExecutor wsThreadPoolExecutor; // 核心逻辑处理器 private ChaosLogEventHandler handler; // 业务写和读log private static ChaosLogger chaosLogger; @Autowired @Qualifier("wsThreadPoolExecutor") public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) { if (null != wsThreadPoolExecutor) { LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor; } } @Autowired public void setChaosLogger(ChaosLogger chaosLogger) { if (null != chaosLogger) { LogWsEndpoint.chaosLogger = chaosLogger; } } @OnOpen public void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) { this.bizKey = String.format("%s-%s", bizType, bizId); log.info("[ws-chaos-log]连接建立中 ==> bizKey : {}", bizKey); this.handler = new ChaosLogEventHandler(chaosLogger, session); wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true)); endpointMap.compute(bizKey, (key, value) -> { List ends = null == value ? new ArrayList<>() : value; ends.add(this); return ends; }); log.info("[ws-chaos-log]连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey); } public void flushMessage(String bizType, String bizId, boolean force) { this.handler.flushMessage(bizType, bizId, force); } @OnClose public void onClose() { log.info("websocket log server close"); if (StringUtils.isBlank(bizKey)) { return; } endpointMap.compute(bizKey, (key, endpoints) -> { if (null != endpoints) { endpoints.remove(this); } return endpoints; }); log.info("[ws-chaos-log]连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey); } @OnMessage public void onMessage(String message, Session session) throws IOException { log.info("[ws-chaos-log]服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message); } @OnError public void onError(Session session, Throwable error) { log.error("[ws-chaos-log]WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey); } @Override public void accept(ChaosLogEvent chaosLogEvent) { String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId()); log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent)); List logWsEndpoints = endpointMap.get(contextId); if (CollectionUtils.isEmpty(logWsEndpoints)) { return; } logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true))); } }
==注意:上面有个accept()==方法,这个方法后面也会讲到,主要就是用于触发已经建立连接Websocket发送消息。
核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。
@Slf4j public class ChaosLogEventHandler { private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5M private final ChaosLogger chaosLogger; @Getter private final Session session; private final AtomicLong offset = new AtomicLong(-1L); private final AtomicBoolean hasTruncated = new AtomicBoolean(false); private final AtomicLong waitEventCnt = new AtomicLong(0L); private final Lock lock = new ReentrantLock(); public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) { this.chaosLogger = chaosLogger; this.session = session; } public void flushMessage(String bizType, String bizId, boolean force) { String bizKey = bizType + "-" + bizId; if (!lock.tryLock()) { waitEventCnt.incrementAndGet(); log.info("[WS]获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey); return; } try { if (!force && waitEventCnt.getAndSet(0L) < 1) { log.info("[ws-chaos-log]没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey); // 没有待处理的事件 return; } log.info("[ws-chaos-log]向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get()); if (offset.get() < 0) { long contentLength = chaosLogger.getLogContentLength(bizType, bizId); log.info("[ws-chaos-log]contentLength = {} for bizLogKey {}", contentLength, bizKey); if (contentLength == 0) { return; } if (contentLength > READ_LOG_MOST_LEN) { offset.set(contentLength - READ_LOG_MOST_LEN); hasTruncated.set(true); log.info("[ws-chaos-log]文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get()); } else { offset.set(0L); } } else if (!force) { long contentLength = chaosLogger.getLogContentLength(bizType, bizId); if (contentLength <= offset.get()) { log.info("[ws-chaos-log]文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get()); return; } } // 读取日志内容 BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE); try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) { String line = null; while (null != (line = reader.readLine())) { if (hasTruncated.get()) { hasTruncated.set(false); log.info("[ws-chaos-log]hasTruncated changed to false"); } else { log.info("[ws-chaos-log]send ws msg:{}", line); try { session.getBasicRemote().sendText(line + "\n"); } catch (IllegalStateException e) { log.info("[ws-chaos-log]发送消息过程中连接状态异常,跳过", e); } } // +1是因为每一行结尾会有一个回车 offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1); } } catch (IOException e) { log.error("", e); } } catch (NotFoundException e) { log.info("[ws-chaos-log]未找到数据,无需向客户端同步,bizKey:{}", bizKey, e); } catch (RuntimeException e) { log.error("", e); } finally { lock.unlock(); } log.info("[ws-chaos-log]向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey); // 在处理过程中,可能又有新的事件,所以再次尝试刷新数据 flushMessage(bizType, bizKey, false); } }
前后端建立连接的时候,绑定了后端一台机器,但是后台一般都是多台服务器,如果事件传递到其他服务器,那么已经建立的连接如何监听到并返回内呢?
这里使用了rocketmq的机制,每台机器都会监听到事件的变化,从而触发当前机器将变更内容发回到前端。
@Component @RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING) @Slf4j public class ChaosLogEventConsumer implements RocketMQListener { @Autowired(required = false) private List> chaosLogEventConsumers = Collections.emptyList(); @Override public void onMessage(String message) { log.info("[MQ]receive ChaosLogEvent message:{}", message); ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class); for (Consumer consumer : chaosLogEventConsumers) { try { consumer.accept(event); } catch (RuntimeException e) { log.error("[MQ] failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e); } } } }
以react为例,仅供参考:
export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => { if (!bizType || !bizId) { console.log('fetchLogs: logContextToken or node is null') return } setLogs([]) if (logsRef.current[0]) { console.log('close ws') logsRef.current[0].close() } let host = wsHost ? wsHost : window.location.host let protocol = window.location.protocol === 'https:' ? 'wss' : 'ws' let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`) logsRef.current = [client, []] // 报错的回调函数 client.onerror = (e: any) => { console.log('Connection Error') console.log(e) } //链接打开的回调函数 client.onopen = () => { console.log('WebSocket Client Connected') } //链接关闭的回调函数 client.onclose = () => { console.log('echo-protocol Client Closed') } //收到消息的处理函数 client.onmessage = (e: any) => { if (logsRef.current[0] === client) { if (typeof e.data === 'string') { let newLogs = [...logsRef.current[1], e.data] if (newLogs.length > 250) { newLogs = newLogs.slice(200) } setLogs(newLogs) logsRef.current = [client, newLogs] } } else { client.close() } } const sendPing = () => { if (logsRef.current[0] === client) { const data = { message: 'heartbeat' } client.send(JSON.stringify(data)) setTimeout(sendPing, 10000) } } setTimeout(sendPing, 10000) }