使用 Java 注解来监听 WebSocket 连接的各个状态：@ServerEndpoint 声明要监听的连接地址，@OnOpen 对应连接建立，@OnClose 对应连接关闭，@OnMessage 对应收到消息（连接出错时对应的注解是 @OnError）。
把spring中的ServerEndpointExporter对象注入进来
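<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.heima</groupId>
    <artifactId>ws-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.7.14</version>
        </dependency>
    </dependencies>
</project>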
package com.heima; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /*** * 监听websocket地址 /myWs */ @ServerEndpoint("/myWs") @Component @Slf4j @EnableScheduling public class WsServerEndpoint { static Map map = new ConcurrentHashMap(); /*** * 连接建立时执行的操作 * @param session */ @OnOpen public void onOpen(Session session) { map.put(session.getId(),session); log.info("websocket is open"); } /*** * 收到客户端消息执行的操作 * @param text */ @OnMessage public String OnMessage(String text) { log.info("收到了一条信息"+text); return "已收到你的信息" ; } /*** * 连接关闭时执行的操作 * @param session */ @OnClose public void OnClose(Session session) { map.remove(session.getId()); log.info("连接关闭时执行的操作"); } /*** * 向客户端发送信息 */ @Scheduled(fixedRate = 2000) public void sendMsg() throws IOException { for (String key : map.keySet()) { map.get(key).getBasicRemote().sendText("你好,你好"); } } }
2.1.3 WebSocketConfig
package com.heima;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that exposes a {@link ServerEndpointExporter} bean.
 *
 * <p>Per the Spring documentation, this exporter is what registers beans annotated with
 * {@code @ServerEndpoint} with the standard Java WebSocket runtime.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
wsClient
package com.spring; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; import java.util.Map; /*** * 握手拦截器 */ @Component @Slf4j public class MyWsInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { log.info(request.getRemoteAddress().toString()+"开始握手"); return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { log.info(request.getRemoteAddress().toString()+"完成握手"); super.afterHandshake(request, response, wsHandler, ex); } }
// MyWsHandler (package com.spring) uses this class without an import, so it has to live in
// the same package; a class in a named package cannot reference the default package.
package com.spring;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.web.socket.WebSocketSession;

/**
 * Pairs a {@link WebSocketSession} with the numeric client id assigned to it.
 *
 * <p>Lombok generates the accessors and an all-args constructor whose parameters follow
 * the field declaration order: {@code (webSocketSession, clientId)}.
 */
@Data
@AllArgsConstructor
public class SessionBean {

    /** The underlying WebSocket session. */
    private WebSocketSession webSocketSession;

    /** Identifier assigned to the client that owns the session. */
    private Integer clientId;
}
package com.spring; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.web.servlet.server.Session; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /*** * webSocket 主处理程序 */ @Component @Slf4j @EnableScheduling public class MyWsHandler extends AbstractWebSocketHandler { //map有并发线程问题 所以用ConcurrentHashMap private static Map map ; //id有并发问题 所以用Integer的安全类型 private static AtomicInteger clientIdMaker; static { map = new ConcurrentHashMap<>(); clientIdMaker=new AtomicInteger(0); } //连接建立 @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { super.afterConnectionEstablished(session); //将session 进一步封装 id采用的是自增 SessionBean sessionBean = new SessionBean(session, clientIdMaker.getAndIncrement()); map.put(session.getId(),sessionBean); log.info(map.get(session.getId()).getClientId()+"建立了连接"); } //收到消息 @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { super.handleTextMessage(session, message); log.info(map.get(session.getId()).getClientId()+":"+message.getPayload()); } //传输异常 @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { super.handleTransportError(session, exception); if (session.isOpen()) { session.close(); } map.remove(session.getId()); } //连接关闭 @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { super.afterConnectionClosed(session, status); 
log.info(map.get(session.getId()).getClientId()+"关闭连接"); } /*** * 向客户端发送信息 */ @Scheduled(fixedRate = 2000) public void sendMsg() throws IOException { for (String key : map.keySet()) { map.get(key).getWebSocketSession().sendMessage(new TextMessage("hello," + "spring socket")); } } }
package com.spring; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import javax.annotation.Resource; @Configuration @EnableWebSocket public class MyWsConfig implements WebSocketConfigurer { @Resource private MyWsHandler wsHandler; @Resource private MyWsInterceptor wsInterceptor; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(wsHandler,"/myWs1").addInterceptors(wsInterceptor).setAllowedOriginPatterns("*"); } }
wsClient