websocket长连接,概念就不描述了。先说我代码的应用场景,用户扫码支付后,后台通知到前端,前端二维码消失。废话不多说直接上代码
@Configuration
@ConditionalOnWebApplication
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
public MySpringConfigurator mySpringConfigurator() {
return new MySpringConfigurator();
}
}
先是配置类,这就不过多的解读
@ServerEndpoint(value = "/api/noauth/websocket/{userId}")
@Service
@Scope("prototype")
@Slf4j
public class WebSocketServer {
/**
* concurrent包的线程安全map,用来存放每个客户端对应的MyWebSocket对象。
*/
private static ConcurrentMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId;
/**
* 连接建立成功调用的方法
**/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
// 加入map中
webSocketMap.put(userId, this);
// 在线人数
int i = webSocketMap.size();
log.info("窗口开始监听:" + userId + ",当前在线人数为" + i);
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
// 从map中删除
webSocketMap.remove(userId);
// 在线人数
int i = webSocketMap.size();
log.info("窗口:" + userId +"连接关闭!当前在线人数为" + i);
}
/**
* 收到客户端消息后调用的方法
*
* @param message
* 客户端发送过来的消息
**/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + userId + "的信息:" + message);
// 群发消息
webSocketMap.forEach((k, v) -> {
try {
// 只响应对应的客户端
if (userId.equals(k)) {
if ("ping".equals(message)) {
v.sendMessage("pong");
} else {
v.sendMessage(message);
}
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误",error);
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
synchronized (this.session) {
this.session.getBasicRemote().sendText(message);
}
}
/**
* 群发自定义消息
* */
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
log.info("推送消息到窗口" + userId + ",推送内容:" + message);
webSocketMap.forEach((k, v) -> {
try {
// 这里可以设定只推送给这个sid的,为null则全部推送
if (userId == null) {
v.sendMessage(message);
} else if (v.userId.equals(userId)) {
v.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
有很多兄弟也看了网上代码,我这是改版,首先将CopyOnWriteArraySet转换为ConcurrentMap,以用户id为key。。。之所以这样做是因为我们系统可以在浏览器多个账户同时登录,如果用CopyOnWriteArraySet就会造成上一个登录用户会被当前用户挤占,websocket消息紊乱,接受出错。
用synchronized 将每个会话锁住,不会出现上个要发送的消息被下个会话所发送。。
@Scope("prototype")则是将@Service服务的作用范围扩大,大家都知道@Service默认都是单例的,即每个服务都是一个全新的实例,这样就会造成每次请求都重新生成一个WebSocketServer,正常情况是大家都在一个server里面。通过各自的id来区分各自的socket
前端代码:
import store from '../store';
// websocket连接
var websocket_connected_count = 0;
var websocket_heartBreak_count = 0;
var onclose_connected_count = 0;
function newWebSocket(url, onmessage) {
var websocket = null;
// 判断当前环境是否支持websocket
if (window.WebSocket) {
if (!websocket) {
var ws_url = url.replace('https://', 'wss://').replace('http://', 'ws://')
websocket = new WebSocket(ws_url);
}
} else {
console.log("not support websocket");
}
// 连接成功建立的回调方法
websocket.onopen = function (e) {
heartCheck.reset().start(); // 成功建立连接后,重置心跳检测
reconnectSocket.reset().start()
console.log("websocket建立连接成功")
}
// 连接发生错误,连接错误时会继续尝试发起连接(尝试5次)
websocket.onerror = function () {
console.log("websocket连接发生错误")
websocket_connected_count++;
if (websocket_connected_count <= 5) {
newWebSocket()
}
}
// 接受到消息的回调方法
websocket.onmessage = function (e) {
console.log("接受到消息:")
console.log(e)
var res = checkIsJsonString(e.data)? JSON.parse(e.data):e.data;
//执行接收到消息的操作,一般是刷新UI
if (res) {
if(res === 'pong'){
console.log("接受到心跳-pong")
heartCheck.reset().start(); // 如果获取到消息,说明连接是正常的,重置心跳检测
}
if (res.bizToken || res.hasNewMsg) {
store.commit('user/setbizTokenInfo',res)
}
if (res.type === 'welfare') {
store.commit('welfarePlan/SET_WELFARE_PAY_MESSAGE', res.data)
}
}
}
// 接受到服务端关闭连接时的回调方法
websocket.onclose = function () {
console.log("onclose断开连接");
}
// 监听窗口事件,当窗口关闭时,主动断开websocket连接,防止连接没断开就关闭窗口,server端报错
window.onbeforeunload = function () {
websocket.close();
}
// 心跳检测, 每隔一段时间检测连接状态,如果处于连接中,就向server端主动发送消息,来重置server端与客户端的最大连接时间,如果已经断开了,发起重连。
var heartCheck = {
timeout: 9* 60*1000, // 9分钟发一次心跳,比server端设置的连接时间稍微小一点,在接近断开的情况下以通信的方式去重置连接时间。
serverTimeoutObj: null,
reset: function () {
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function () {
var self = this;
this.serverTimeoutObj = setInterval(function () {
if (websocket.readyState == 1) {
console.log("发送心跳-ping");
websocket.send("ping");
heartCheck.reset().start(); // 如果获取到消息,说明连接是正常的,重置心跳检测
} else {
console.log("断开状态,尝试重连");
websocket_heartBreak_count++;
if (websocket_heartBreak_count <= 5) {
newWebSocket()
}else{
websocket.close();
heartCheck.reset();
}
}
}, this.timeout)
}
}
var reconnectSocket={
timeout: 9* 60*1000, // 9分钟在接近断开的情况下以通信的方式去重置连接时间。
serverTimeoutObj: null,
reset: function () {
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function () {
var self = this;
this.serverTimeoutObj = setInterval(function () {
if (websocket.readyState == 1) {
console.log("重连。。。。。。");
newWebSocket(url)
} else {
console.log("断开状态,尝试重连");
websocket_heartBreak_count++;
if (websocket_heartBreak_count <= 5) {
newWebSocket(url)
}else{
websocket.close();
}
}
}, this.timeout)
}
}
}
export default newWebSocket
有时候大家会遇到连接一段时间后就会断开,很多网上会说做心跳检测。。也就是上图的 heartCheck方法,但是如果你用了nginx了,你的心跳检测是不会起作用的,
哪怕你把这些参数设置很长了(比如我们nginx设置了10分钟的连接时间了,这已经算是长的了)
keepalive_timeout
proxy_connect_timeout
proxy_send_timeout
proxy_read_timeout
到时效了也会断开连接,所以我这边又加了一个重连机制reconnectSocket,即在9分钟时进行一次重连操作,这样就可无缝持续的连接了。