相信很多同学都用过websocket来实现服务端主动向客户端推送消息吧,基本上所有的管理类系统都会有这个功能。因为有websocket的存在,使得前后的主动交互变得容易和低成本。其实在JAVA领域用SpringBoot框架集成Websoket还是很简单的,今天我们重点不是集成而是通过Redis的发布订阅实现Websocket集群通信,当然有条件的也可以用MQ代替。
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
Redis 的发布订阅(Pub/Sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。Redis 客户端可以订阅任意数量的频道。当有新消息通过 PUBLISH 命令发送给频道时,这个消息会被发送给订阅它的所有客户端。

Redis的发布订阅(Pub/Sub)和消息队列是两种不同的消息传递模式,它们的主要区别在于消息的处理方式和使用场景。
消息的处理方式:
在 Redis 的发布订阅模式中,消息是即时的,也就是说,当消息发布后,只有当前在线且订阅了该频道的客户端才能收到这个消息,消息不会被存储,一旦发布,当前没有在线的客户端将无法接收到这个消息。
在消息队列中,消息是持久化的,消息被发送到队列后,会一直在队列中等待被消费,即使没有在线的消费者,消息也不会丢失,消费者下次上线后可以继续从队列中获取到消息。
本次演示采用demo形式,仅仅提供演示Websocket集群的实现方式,以及解决消息负载均衡的问题。演示案例重点偏后端实现Websoket集群通讯,不涉及前后端心跳检测,如果应用在生产环境前端需要增加心跳检测与重复创建。
本实战采用原生spring websocket,后续有时间再提供netty版本,以及产线版本。有条件的同学可以自己实现,原理都差不多。
1、项目结构
2、springcloud 版本
org.springframework.boot spring-boot-starter-parent 2.3.12.RELEASE 8 Hoxton.SR12 org.springframework.cloud spring-cloud-dependencies ${spring-cloud.version} pom import 3、maven依赖
com.alibaba fastjson 1.2.68 org.springframework.boot spring-boot-starter-websocket org.springframework.boot spring-boot-starter-thymeleaf 4、配置文件
server: port: 8888 spring: profiles: active: dev mvc: pathmatch: # Springfox使用的路径匹配是基于AntPathMatcher的,而Spring Boot 2.6.X使用的是PathPatternMatcher matching-strategy: ant_path_matcher thymeleaf: mode: HTML encoding: UTF-8 content-type: text/html cache: false prefix: classpath:/templates/ 5、thymeleaf 页面
websocket.html
websocket通讯 【userId】:
【toUserId】:
【内容】:
【操作】:
【操作】:
6、消息实体
Message.java
import lombok.Data; /** * Message * @author senfel * @version 1.0 * @date 2024/5/17 14:39 */ @Data public class Message { /** * 消息编码 */ private String code; /** * 来自(保证唯一) */ private String fromUserId; /** * 去自(保证唯一) */ private String toUserId; /** * 内容 */ private String contentText; } 7、Websocket配置类
WebSocketConfig.java
import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * WebSocketConfig * @author senfel * @version 1.0 * @date 2024/5/16 16:51 */ @Component public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { System.out.println("启动websocket支持"); return new ServerEndpointExporter(); } } Websocket 服务类 WebSocketServer.java import com.example.ccedemo.config.SpringUtil; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; /** * WebSocketServer * @author senfel * @version 1.0 * @date 2024/5/16 16:59 */ @ConditionalOnClass(value = WebSocketConfig.class) @Component @ServerEndpoint("/ws/{deviceId}") public class WebSocketServer { protected Logger logger = LoggerFactory.getLogger(this.getClass()); /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/ private Session session; /** * 设备ID */ private String deviceId; /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 注:底下WebSocket是当前类名*/ private static CopyOnWriteArraySet webSockets =new CopyOnWriteArraySet<>(); /**用来存在线连接用户信息*/ private static ConcurrentHashMap sessionPool = new ConcurrentHashMap(); /** * 链接成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam(value="deviceId")String deviceId) { try { if(StringUtils.isEmpty(deviceId)||deviceId.equals("undefined")){ return; } this.session = session; this.deviceId = deviceId; webSockets.add(this); sessionPool.put(deviceId, session); logger.info("【websocket消息】有新的连接,总数为:"+webSockets.size()); StringBuffer stringBuffer = new StringBuffer(); sessionPool.forEach((key, value) -> { stringBuffer.append(key).append(";"); }); logger.info("当前服务器连接有客户端有:"+stringBuffer.toString()); } catch (Exception e) { } } /** * 链接关闭调用的方法 */ @OnClose public void onClose() { try { webSockets.remove(this); sessionPool.remove(this.deviceId); logger.info("【websocket消息】连接断开,总数为:"+webSockets.size()); StringBuffer stringBuffer = new StringBuffer(); sessionPool.forEach((key, value) -> { stringBuffer.append(key).append(";"); }); logger.info("当前服务器连接有客户端有:"+stringBuffer.toString()); } catch (Exception e) { } } /** * 收到客户端消息后调用的方法 * @param message */ @OnMessage public void onMessage(String message) { logger.info("【websocket消息】收到客户端消息:"+message); SpringUtil.getBean(StringRedisTemplate.class).convertAndSend("webSocketMsgPush",message); } /** 发送错误时的处理 * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { logger.error("用户错误,原因:"+error.getMessage()); error.printStackTrace(); } /** * 广播消息 * @author senfel * @date 2024/5/17 17:10 * @return void */ public void sendAllMessage(String message) { logger.info("【websocket消息】广播消息:"+message); for(WebSocketServer webSocket : webSockets) { try { if(webSocket.session.isOpen()) { webSocket.session.getAsyncRemote().sendText(message); } } catch (Exception e) { e.printStackTrace(); } } } /** * 单点消息 单人 * @param deviceId * @param message * @author senfel * @date 2024/5/17 17:10 * @return void */ public void sendOneMessage(String deviceId, String message) { Session session = sessionPool.get(deviceId); if (session != null&&session.isOpen()) { try { logger.info("【websocket消息】 单点消息:"+message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } /** * 单点消息 * @param deviceId * @param object * @author senfel * @date 2024/5/17 17:10 * @return void */ public void sendOneObject(String deviceId, Object object) { Session session = sessionPool.get(deviceId); if (session != null&&session.isOpen()) { try { logger.info("【websocket消息】 单点消息(对象):"+object); session.getAsyncRemote().sendObject(object); } catch (Exception e) { e.printStackTrace(); } } } /** * 单点消息(多人) * @param deviceIds * @param message * @author senfel * @date 2024/5/17 17:11 * @return void */ public void sendMoreMessage(String[] deviceIds, String message) { for(String deviceId:deviceIds) { Session session = sessionPool.get(deviceId); if (session != null&&session.isOpen()) { try { logger.info("【websocket消息】 单点消息:"+message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } } } 8、controller提供接口
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.ModelAndView; /** * TestController * @author senfel * @version 1.0 * @date 2024/5/17 17:49 */ @RestController @RequestMapping("/api/websocket") public class BaseController { @GetMapping("page") public ModelAndView page(Long userId){ ModelAndView websocket = new ModelAndView("websocket"); websocket.addObject("userId",userId); return websocket; } } 大家都知道Websoket是一个长链接,在不断开的情况下服务端与客户端是可以自由通讯的,这是因为服务端缓存了会话。
如果我们后端采用集群部署,那么可能多个用户的缓存会话会分散在各个服务器上。在我们给指定用户推送消息时就有可能调用服务器上并没有这个用户的会话。
所以,我们引入Redis发布订阅,将消息进行转发到所有的服务端,只有有会话缓存的服务端才会成功推送消息。讲到这里就比较明显了吧,完美解决Websoket负载均衡的问题。
1、maven引入Redis
org.springframework.boot spring-boot-starter-data-redis 2、配置文件
spring: redis: host: 127.0.0.1 port: 6379 3、Redis配置类
RedisConfig.java
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * RedisConfig * @author senfel * @version 1.0 * @date 2024/5/17 14:31 */ @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { @Bean @Primary public RedisTemplate redisTemplate(RedisConnectionFactory factory) { RedisTemplate template = new RedisTemplate<>(); template.setConnectionFactory(factory); Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jacksonSeial.setObjectMapper(om); template.setValueSerializer(jacksonSeial); template.setKeySerializer(stringRedisSerializer); template.setHashKeySerializer(stringRedisSerializer); template.setHashValueSerializer(jacksonSeial); template.afterPropertiesSet(); return template; } @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter topicAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了主题 webSocketMsgPush container.addMessageListener(topicAdapter, new PatternTopic("webSocketMsgPush")); return container; } /** * 消息监听器适配器,绑定消息处理器 * * @return */ @Bean MessageListenerAdapter topicAdapter() { return new MessageListenerAdapter(new RedisListener()); } } 4、Redis订阅监听
RedisListener.java
import com.alibaba.fastjson.JSONObject; import com.example.ccedemo.config.SpringUtil; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; /** * RedisListener * @author senfel * @version 1.0 * @date 2024/5/17 14:37 */ public class RedisListener implements MessageListener { @Override public void onMessage(Message msg, byte[] bytes) { System.out.println(".监听到需要进行负载转发的消息:" + msg.toString()); com.example.ccedemo.redissocket.Message message = JSONObject.parseObject(msg.toString(), com.example.ccedemo.redissocket.Message.class); SpringUtil.getBean(WebSocketServer.class).sendOneMessage(message.getToUserId(), message.getContentText()); } } 我们本地启动两个服务,分别开启端口8888、9999,然后用nginx暴露7777端口做一个负载均衡。


#服务器url变量定义 upstream api_service1 { server 192.168.1.4:8888; server 192.168.1.4:9999; } #nginx配置websocket map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { listen 7777; large_client_header_buffers 4 16k; client_max_body_size 300m; client_body_buffer_size 128k; proxy_connect_timeout 600; proxy_read_timeout 600; proxy_send_timeout 600; proxy_buffer_size 64k; proxy_buffers 4 32k; proxy_busy_buffers_size 64k; proxy_temp_file_write_size 64k; proxy_http_version 1.1; root /demo/page/dist; index index.html; #api location /api/ { proxy_pass http://api_service1/api/; proxy_set_header Host $http_host; } #nginx配置websocket location /ws/ { proxy_http_version 1.1; proxy_pass http://api_service1/ws/; proxy_redirect off; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_read_timeout 3600s; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; } #解决页面刷新404 location / { try_files $uri $uri/ @req; index index.html; } location @req { rewrite ^.*$ /index.html last; } } 1、浏览器开启多个无痕界面
http://192.168.1.4:7777/api/websocket/page
模拟对话:
多个界面的用户ID互补
2、分别开启soket,由于nginx轮询策略会分别注册在两个服务端上

3、客户端相互发送消息验证
由以上图片可知,我们两个客户端相互对话能够接收到对方推送的消息。那么,由此也可以证明我们后端Websocke集群使用Redis发布订阅的方式搭建成功。