websocket用于控制在当前页只允许一个用户进行操作,其他用户等待
websocket用于控制用户进入当前页的可操作性
需求
页面只允许一个用户操作,后续进入的用户进行排队。
功能描述
- 用户进入当前页,排在第一位则放行操作。
- 用户进入当前页,不是排在第一位则等待。
- 第一位用户操作完毕后,关闭页面,第一位用户弹出队列,许可第二位用户进行操作。
- 页面占用有时间限制,超出阈值,用户需要退出重进,提示超时。
- 页面排队数量限制,若用户进入页面,超出排队长度,则提示队列超长。
使用场景
从数据台账进入某一条数据,该数据页面用户可操作。操作后后台存在异步任务,在该条数据存在后台任务时,不允许用户操作。一条数据只允许第一个进入该数据操作页面的用户对其进行操作,其他用户需要等待,直到自己排在第一位时,放行操作。
代码实现
- 消息实体类
@Data
public class SocketResponse {//当前排队位次 1 表示第一位 可以放行操作private Integer number;//异步任务信号量 true表示无异步任务可放心 false表示有异步任务不放行操作private Boolean taskSignal;//超时信号量 true表示超时 false 表示未超时private Boolean timeout;//超过最大连接数private Boolean overMaxConnectNum;//重试信号 true退出重试private Boolean retrySignal;//放行信号 true放行 false 不放行private Boolean signal;//当不能放行时给出原因private String message;
}
- 配置参数读取
从yml中读取相关配置参数
@Data
@Component
@ConfigurationProperties(prefix = "xxx.web-socket")
public class WebSocketProperties {private int timeout;private int maxConnect;private int scheduleCleanDuration;
}
- 参数配置示例
xxx:web-socket:schedule-clean-duration: 15 #单位是分钟timeout: 24 #超时时间 单位 小时max-connect: 10 #每个表单实例允许的最大连接数量
- 长连接控制器
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson2.JSON;
import com.xxx.WebSocketProperties;
import com.xxx.SocketResponse;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** WebSocket的操作类*/
@Component
@Slf4j
/*** html页面与之关联的接口* var reqUrl = "http://localhost:8081/websocket/" + cid;* socket = new WebSocket(reqUrl.replace("http", "ws"));*/
@ServerEndpoint("/xxxWebsocket/{businessId}")
public class XXXWebSocketServer {public static final Long ONEHOURMILLISECONDS = 60 * 60 * 1000L;/*** 存放所有在线的客户端*/private static final Map<String, Queue<Session>> onlineSessionClientMap = new ConcurrentHashMap<>();//读取配置参数 按需自行设计存取方式
//我这里是存在yml文件中,参数有 session超时清理定时任务执行时间间隔,占用超时时间,单页面最大排队数private static final WebSocketProperties properties = SpringUtil.getBean(WebSocketProperties.class); //异步任务执行标记 key为表单实例 值为放行信号量 无任务在执行放行 true 有任务在不放行 falsepublic static final Map<String, Boolean> taskSignalMap = new ConcurrentHashMap<>();private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);static{scheduler.scheduleAtFixedRate(() -> {log.info("长连接定时任务清理启动");WebSocketServer server = SpringUtil.getBean(WebSocketServer.class);AtomicInteger number = new AtomicInteger();try {onlineSessionClientMap.forEach((businessId, sessions) -> {sessions.removeIf(session -> {long currentTime = System.currentTimeMillis();long createTime = (long) session.getUserProperties().getOrDefault("creationTime", 0L);if(currentTime - createTime > properties.getTimeout() * ONEHOURMILLISECONDS){number.getAndIncrement();server.sendToOneTimeOut(session, businessId);return true;}return false;});if(sessions.isEmpty()){onlineSessionClientMap.remove(businessId);}});} catch (Exception e) {log.error("长连接定时任务执行异常", e);}log.info("长连接定时任务清理完成, 清理连接数量:{}", number);}, 0, properties.getScheduleCleanDuration(), TimeUnit.MINUTES);}/*** 连接sid和连接会话*/private String businessId;private Session session;/*** 连接建立成功调用的方法。由前端<code>new WebSocket</code>触发** @param businessId 每次页面建立连接时传入到服务端的id,比如用户id等。可以自定义。* @param session 与某个客户端的连接会话,需要通过它来给客户端发送消息*/@OnOpen@ApiOperation(value = "连接建立成功调用的方法。由前端new WebSocket触发")public void onOpen(@PathParam("businessId") String businessId, Session session) {Queue<Session> q = onlineSessionClientMap.get(businessId);this.businessId = businessId;this.session = session;if(q == null || q.size() < properties.getMaxConnect()){onlineSessionClientMap.putIfAbsent(businessId, new ConcurrentLinkedQueue<>());onlineSessionClientMap.get(businessId).offer(session);//进队log.info("businessId {}, sessionId {} 长连接建立", businessId, session.getId());session.getUserProperties().put("creationTime", System.currentTimeMillis());sendToOne(false);}else{sendToOne(true);}}/*** 连接关闭调用的方法。由前端<code>socket.close()</code>触发** @param businessId* @param session*/@OnClose@ApiOperation(value = "连接关闭调用的方法。由前端<code>socket.close()</code>触发")public void onClose(@PathParam("businessId") String businessId, Session session) {// 从 Map中移除Queue<Session> sessions = onlineSessionClientMap.get(businessId);if(CollUtil.isNotEmpty(sessions)) {sessions.remove(session);}log.info("businessId {}, sessionId {} 长连接关闭", businessId, session.getId());//群发消息sendToAll(businessId);}/*** 收到客户端消息后调用的方法。由前端<code>socket.send</code>触发* * 当服务端执行toSession.getAsyncRemote().sendText(xxx)后,前端的socket.onmessage得到监听。** @param message* @param session*/@OnMessagepublic void onMessage(String message, Session session) {
// /**
// * html界面传递来得数据格式,可以自定义.
// * {"sid":"user-1","message":"hello websocket"}
// */
// JSONObject jsonObject = JSON.parseObject(message);
// String toSid = jsonObject.getString("sid");
// String msg = jsonObject.getString("message");
// log.info("服务端收到客户端消息 ==> fromSid = {}, toSid = {}, message = {}", sid, toSid, message);
//
// /**
// * 模拟约定:如果未指定sid信息,则群发,否则就单独发送
// */
// if (toSid == null || toSid == "" || "".equalsIgnoreCase(toSid)) {
// sendToAll(msg);
// } else {
// sendToOne(toSid, msg);
// }}/*** 发生错误调用的方法** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {// 记录详细错误log.error("WebSocket发生错误,businessId={}", businessId, error);// 可选:尝试安全地通知客户端sendAsyncRemote(session, null, "异常信息:"+error.getMessage());}/*** 群发消息**/public void sendToAll(String businessId) {Queue<Session> sessions = onlineSessionClientMap.get(businessId);if(CollUtil.isNotEmpty(sessions)){Boolean taskSignal = taskSignalMap.getOrDefault(businessId, true);int[] indexHolder = {0}; // 用于模拟索引递增sessions.forEach(session -> {SocketResponse socketResponse = new SocketResponse();socketResponse.setNumber(++indexHolder[0]);socketResponse.setTaskSignal(BooleanUtil.isTrue(taskSignal));dealWithSocketResponse(socketResponse);boolean res = sendBasicRemote(session, businessId, JSON.toJSONString(socketResponse));if(!res){//补位indexHolder[0]--;}});}}/*** 指定发送消息 清理超时session**/private void sendToOne(boolean connectNumberOver) {Queue<Session> q = onlineSessionClientMap.get(businessId);int size = 1;if(q != null){size = q.size();}SocketResponse socketResponse = new SocketResponse();socketResponse.setNumber(size);socketResponse.setOverMaxConnectNum(connectNumberOver);Boolean taskSignal = taskSignalMap.getOrDefault(businessId, true);socketResponse.setTaskSignal(BooleanUtil.isTrue(taskSignal));dealWithSocketResponse(socketResponse);//发送消息sendBasicRemote(session, businessId, JSON.toJSONString(socketResponse));}/*** 消息发送异常的清理掉连接* 仅仅移除元素* @param businessId* @param session*/private void onCloseOnly(String businessId, Session session) {// 从 Map中移除Queue<Session> sessions = onlineSessionClientMap.get(businessId);if(CollUtil.isNotEmpty(sessions)) {sessions.remove(session);}}/*** 异步消息发送* 成功发送返回true,发送失败返回false*/private boolean sendAsyncRemote(Session session, String businessId, String message) {boolean[] result = new boolean[1];result[0] = true;if (session != null && session.isOpen()) {session.getAsyncRemote().sendText(message, res -> {if(!res.isOK()){result[0] = false;onCloseOnly(businessId, session);log.error("businessId {} 发送消息失败:{},消息内容 {}", businessId, res.getException().getMessage(), message);}});}return result[0];}/*** 同步消息发送* 成功发送返回true,发送失败返回false*/private boolean sendBasicRemote(Session session, String businessId, String message) {boolean[] result = new boolean[1];result[0] = true;if (session != null && session.isOpen()) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {result[0] = false;onCloseOnly(businessId, session);log.error("businessId {} 发送消息失败:{},消息内容 {}", businessId, e.getMessage(), message);}}return result[0];}/*** 指定发送消息**/private void sendToOneTimeOut(Session session, String businessId) {SocketResponse socketResponse = new SocketResponse();socketResponse.setTimeout(true);dealWithSocketResponse(socketResponse);//发送消息sendAsyncRemote(session, businessId, JSON.toJSONString(socketResponse));}/*** 完善响应体*/private void dealWithSocketResponse(SocketResponse res) {if(BooleanUtil.isTrue(res.getOverMaxConnectNum())) {//当前表单连接数量过多res.setSignal(false);res.setRetrySignal(true);res.setMessage("当前表单连接过多,连接建立失败");} else if(BooleanUtil.isTrue(res.getTimeout())){//队列前面有人 不放行res.setSignal(false);res.setRetrySignal(true);res.setMessage("连接超时请退出重进");} else if(BooleanUtil.isFalse(res.getTaskSignal())) {//任务信号量不放行res.setSignal(false);res.setMessage("当前表单实例存在异步任务请稍后");} else if(!Objects.equals(1, res.getNumber())){//队列前面有人 不放行res.setSignal(false);res.setMessage("当前排在第"+res.getNumber()+"位 请稍后");} else{//放行res.setSignal(true);}}}
- 后台调用示例
//关闭通行
XXXWebSocketServer.taskSignalMap.put(data.getBusinessId(), false);
//消息广播
XXXWebSocketServer.sendToAll(data.getBusinessId());
//异步任务
commonTaskExecutor.submit(() -> {try {//实际任务}catch (Exception e) {log.error("异步线程{}执行失败", Thread.currentThread().getName(), e);}finally{//放开通行XXXWebSocketServer.taskSignalMap.put(data.getBusinessId(), true);//消息广播XXXWebSocketServer.sendToAll(data.getBusinessId());}
});