最近项目里需要使用到websocket,主要用于前后端实时通信,项目中用到的场景是用户扫码乘车之后司机的设备需要语音提醒,思路是司机在打开乘车二维码时前端根据司机的用户ID发送websocket请求,后端在扫码乘车成功后发送消息,在本地测试一切正常,因为在本地与前端联调时没走网关,直接通过websocket所在服务端口来连接,但是上服务器由于服务端口不能对外暴漏,只能走网关或者通过nginx转发来实现,所以就出现了nginx转发成http请求这种问题。
这是websocket实现类代码:
WebSocketServerConvenientlife
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; //乘客扫码后司机角色登录的设备语音提示 @ServerEndpoint (value = "/convenientlife/websocket/{sessionId}") @Component public class WebSocketServerConvenientlife { private static Logger LOGGER = LoggerFactory.getLogger(WebSocketServerConvenientlife.class); //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static int onlineCount = 0; //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet<>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; //接收sessionId(默认就是司机账户ID) private String sessionId; //接收message private JSONObject message; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam ("sessionId") String sessionId) { LOGGER.info("---+++++ convenientlife模块成功连接,sessionId :{}", sessionId); this.session = session; this.sessionId = sessionId; webSocketSet.add(this); addOnlineCount(); LOGGER.info("当前在线连接数为:"+getOnlineCount()); try { sendMessage("{}"); } catch (IOException e) { LOGGER.error("websocket IO异常"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); subOnlineCount(); LOGGER.info("convenientlife模块有一连接关闭!当前在线连接数为:"+getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息*/ @OnMessage public void onMessage(String message, Session session) { JSONObject jsonObject = JSON.parseObject(message); if(jsonObject != null && StringUtils.isNotBlank(jsonObject.getString("sid"))) { String sid = jsonObject.getString("sid"); for(WebSocketServerConvenientlife item : webSocketSet) { if(sid.equals(item.sessionId)) { item.message = jsonObject; } } } } /** * * @param session 参数 * @param error 参数 */ @OnError public void onError(Session session, Throwable error) { LOGGER.error("convenientlife模块websocke 发生错误:"+error.getMessage()); } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群发自定义消息 * */ public static synchronized void sendInfo(String message, @PathParam ("sessionId") String sessionId) { for(WebSocketServerConvenientlife item : webSocketSet) { try { //只推送给指定的这个sessionId if(sessionId != null && item.sessionId.equals(sessionId)) { LOGGER.info("推送sessionId: "+sessionId+";推送内容:"+message); item.sendMessage(message); } } catch (IOException e) { LOGGER.error("sendInfo error: ", e); } } } /** * session是否存在 * @param sessionId 参数 * @return boolean */ public static synchronized boolean existsSesson(@PathParam ("sessionId") String sessionId) { if(StringUtils.isBlank(sessionId)) { return false; } for(WebSocketServerConvenientlife item : webSocketSet) { if(item.sessionId.equals(sessionId)) { return true; } } return false; } public static synchronized JSONObject getMessage(@PathParam ("sessionId") String sessionId) { if(StringUtils.isBlank(sessionId)) { return null; } for(WebSocketServerConvenientlife item : webSocketSet) { if(item.sessionId.equals(sessionId)) { return item.message; } } return null; } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServerConvenientlife.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServerConvenientlife.onlineCount--; } }
使用 @ServerEndpoint 注解定义 WebSocket 服务器端的终端点,WebSocket 终端点可以处理来自客户端的连接和消息,并可以向客户端发送消息。
WebSocketConfig类:
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig { /** 部署时注释 start**/ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } /** 部署时注释 end**/ }
这个类是必须要的,配置和初始化 WebSocket 相关的设置和组件,以便在应用程序中启用 WebSocket 功能。
服务器nginx配置如下:
location /ws { proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_pass http://ip:6100; }
前缀为/ws开头的请求转发到6100端口去,6100是WebSocket所在服务的端口,于是我用postman测试了一下
就出现了WebSocket连接识别成http请求的错误了,看了一下nginx日志,也没发现报错信息,但确实将WebSocket连接识别成http请求转发到服务中去了
那这就很奇怪了,我明明nginx配置了WebSocket连接转发请求,然后框框一顿查,以为是服务不支持接收WebSocket请求,检查pom文件、配置文件,最后发现需要在nginx配置端口后面加一个接口前缀,改成这样
location /ws { proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_pass http://ip:6100/convenientlife; }
然后再用postman请求:
连接成功!!!查了一下大概原因:
服务器端应用程序的路径:在更改Nginx配置后,将WebSocket连接的请求代理到http://ip:6100/convenientlife
这个路径上。如果你的服务器端应用程序正好在/convenientlife
路径下,那么添加了接口前缀后,Nginx将正确地将WebSocket请求转发到正确的路径,从而实现了连接成功。