当前位置: 首页 > backend >正文

websocket用于控制在当前页只允许一个用户进行操作,其他用户等待

websocket用于控制用户进入当前页的可操作性

需求

页面只允许一个用户操作,后续进入的用户进行排队。

功能描述

  1. 用户进入当前页,排在第一位则放行操作。
  2. 用户进入当前页,不是排在第一位则等待。
  3. 第一位用户操作完毕后,关闭页面,第一位用户弹出队列,许可第二位用户进行操作。
  4. 页面占用有时间限制,超出阈值,用户需要退出重进,提示超时。
  5. 页面排队数量限制,若用户进入页面,超出排队长度,则提示队列超长。

使用场景

从数据台账进入某一条数据,该数据页面用户可操作。操作后后台存在异步任务,在该条数据存在后台任务时,不允许用户操作。一条数据只允许第一个进入该数据操作页面的用户对其进行操作,其他用户需要等待,直到自己排在第一位时,放行操作。

代码实现

  1. 消息实体类
@Data
public class SocketResponse {//当前排队位次 1 表示第一位 可以放行操作private Integer number;//异步任务信号量 true表示无异步任务可放心  false表示有异步任务不放行操作private Boolean taskSignal;//超时信号量 true表示超时 false 表示未超时private Boolean timeout;//超过最大连接数private Boolean overMaxConnectNum;//重试信号 true退出重试private Boolean retrySignal;//放行信号 true放行 false 不放行private Boolean signal;//当不能放行时给出原因private String message;
}
  1. 配置参数读取
    从yml中读取相关配置参数
@Data
@Component
@ConfigurationProperties(prefix = "xxx.web-socket")
public class WebSocketProperties {private int timeout;private int maxConnect;private int scheduleCleanDuration;
}
  1. 参数配置示例
xxx:web-socket:schedule-clean-duration: 15 #单位是分钟timeout: 24 #超时时间 单位 小时max-connect: 10 #每个表单实例允许的最大连接数量
  1. 长连接控制器
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson2.JSON;
import com.xxx.WebSocketProperties;
import com.xxx.SocketResponse;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** WebSocket的操作类*/
@Component
@Slf4j
/*** html页面与之关联的接口* var reqUrl = "http://localhost:8081/websocket/" + cid;* socket = new WebSocket(reqUrl.replace("http", "ws"));*/
@ServerEndpoint("/xxxWebsocket/{businessId}")
public class XXXWebSocketServer {public static final Long ONEHOURMILLISECONDS = 60 * 60 * 1000L;/*** 存放所有在线的客户端*/private static final Map<String, Queue<Session>> onlineSessionClientMap = new ConcurrentHashMap<>();//读取配置参数 按需自行设计存取方式 
//我这里是存在yml文件中,参数有  session超时清理定时任务执行时间间隔,占用超时时间,单页面最大排队数private static final WebSocketProperties properties = SpringUtil.getBean(WebSocketProperties.class); //异步任务执行标记 key为表单实例 值为放行信号量  无任务在执行放行 true  有任务在不放行 falsepublic static final Map<String, Boolean> taskSignalMap = new ConcurrentHashMap<>();private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);static{scheduler.scheduleAtFixedRate(() -> {log.info("长连接定时任务清理启动");WebSocketServer server = SpringUtil.getBean(WebSocketServer.class);AtomicInteger number = new AtomicInteger();try {onlineSessionClientMap.forEach((businessId, sessions) -> {sessions.removeIf(session -> {long currentTime = System.currentTimeMillis();long createTime = (long) session.getUserProperties().getOrDefault("creationTime", 0L);if(currentTime - createTime > properties.getTimeout() * ONEHOURMILLISECONDS){number.getAndIncrement();server.sendToOneTimeOut(session, businessId);return true;}return false;});if(sessions.isEmpty()){onlineSessionClientMap.remove(businessId);}});} catch (Exception e) {log.error("长连接定时任务执行异常", e);}log.info("长连接定时任务清理完成, 清理连接数量:{}", number);}, 0, properties.getScheduleCleanDuration(), TimeUnit.MINUTES);}/*** 连接sid和连接会话*/private String businessId;private Session session;/*** 连接建立成功调用的方法。由前端<code>new WebSocket</code>触发** @param businessId     每次页面建立连接时传入到服务端的id,比如用户id等。可以自定义。* @param session 与某个客户端的连接会话,需要通过它来给客户端发送消息*/@OnOpen@ApiOperation(value = "连接建立成功调用的方法。由前端new WebSocket触发")public void onOpen(@PathParam("businessId") String businessId, Session session) {Queue<Session> q = onlineSessionClientMap.get(businessId);this.businessId = businessId;this.session = session;if(q == null || q.size() < properties.getMaxConnect()){onlineSessionClientMap.putIfAbsent(businessId, new ConcurrentLinkedQueue<>());onlineSessionClientMap.get(businessId).offer(session);//进队log.info("businessId {}, sessionId {} 长连接建立", businessId, session.getId());session.getUserProperties().put("creationTime", System.currentTimeMillis());sendToOne(false);}else{sendToOne(true);}}/*** 连接关闭调用的方法。由前端<code>socket.close()</code>触发** @param businessId* @param session*/@OnClose@ApiOperation(value = "连接关闭调用的方法。由前端<code>socket.close()</code>触发")public void onClose(@PathParam("businessId") String businessId, Session session) {// 从 Map中移除Queue<Session> sessions = onlineSessionClientMap.get(businessId);if(CollUtil.isNotEmpty(sessions)) {sessions.remove(session);}log.info("businessId {}, sessionId {} 长连接关闭", businessId, session.getId());//群发消息sendToAll(businessId);}/*** 收到客户端消息后调用的方法。由前端<code>socket.send</code>触发* * 当服务端执行toSession.getAsyncRemote().sendText(xxx)后,前端的socket.onmessage得到监听。** @param message* @param session*/@OnMessagepublic void onMessage(String message, Session session) {
//        /**
//         * html界面传递来得数据格式,可以自定义.
//         * {"sid":"user-1","message":"hello websocket"}
//         */
//        JSONObject jsonObject = JSON.parseObject(message);
//        String toSid = jsonObject.getString("sid");
//        String msg = jsonObject.getString("message");
//        log.info("服务端收到客户端消息 ==> fromSid = {}, toSid = {}, message = {}", sid, toSid, message);
//
//        /**
//         * 模拟约定:如果未指定sid信息,则群发,否则就单独发送
//         */
//        if (toSid == null || toSid == "" || "".equalsIgnoreCase(toSid)) {
//            sendToAll(msg);
//        } else {
//            sendToOne(toSid, msg);
//        }}/*** 发生错误调用的方法** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {// 记录详细错误log.error("WebSocket发生错误,businessId={}", businessId, error);// 可选:尝试安全地通知客户端sendAsyncRemote(session, null, "异常信息:"+error.getMessage());}/*** 群发消息**/public void sendToAll(String businessId) {Queue<Session> sessions = onlineSessionClientMap.get(businessId);if(CollUtil.isNotEmpty(sessions)){Boolean taskSignal = taskSignalMap.getOrDefault(businessId, true);int[] indexHolder = {0}; // 用于模拟索引递增sessions.forEach(session -> {SocketResponse socketResponse = new SocketResponse();socketResponse.setNumber(++indexHolder[0]);socketResponse.setTaskSignal(BooleanUtil.isTrue(taskSignal));dealWithSocketResponse(socketResponse);boolean res = sendBasicRemote(session, businessId, JSON.toJSONString(socketResponse));if(!res){//补位indexHolder[0]--;}});}}/*** 指定发送消息 清理超时session**/private void sendToOne(boolean connectNumberOver) {Queue<Session> q = onlineSessionClientMap.get(businessId);int size = 1;if(q != null){size = q.size();}SocketResponse socketResponse = new SocketResponse();socketResponse.setNumber(size);socketResponse.setOverMaxConnectNum(connectNumberOver);Boolean taskSignal = taskSignalMap.getOrDefault(businessId, true);socketResponse.setTaskSignal(BooleanUtil.isTrue(taskSignal));dealWithSocketResponse(socketResponse);//发送消息sendBasicRemote(session, businessId, JSON.toJSONString(socketResponse));}/*** 消息发送异常的清理掉连接* 仅仅移除元素* @param businessId* @param session*/private void onCloseOnly(String businessId, Session session) {// 从 Map中移除Queue<Session> sessions = onlineSessionClientMap.get(businessId);if(CollUtil.isNotEmpty(sessions)) {sessions.remove(session);}}/*** 异步消息发送* 成功发送返回true,发送失败返回false*/private boolean sendAsyncRemote(Session session, String businessId, String message) {boolean[] result = new boolean[1];result[0] = true;if (session != null && session.isOpen()) {session.getAsyncRemote().sendText(message, res -> {if(!res.isOK()){result[0] = false;onCloseOnly(businessId, session);log.error("businessId {} 发送消息失败:{},消息内容 {}", businessId, res.getException().getMessage(), message);}});}return result[0];}/*** 同步消息发送* 成功发送返回true,发送失败返回false*/private boolean sendBasicRemote(Session session, String businessId, String message) {boolean[] result = new boolean[1];result[0] = true;if (session != null && session.isOpen()) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {result[0] = false;onCloseOnly(businessId, session);log.error("businessId {} 发送消息失败:{},消息内容 {}", businessId, e.getMessage(), message);}}return result[0];}/*** 指定发送消息**/private void sendToOneTimeOut(Session session, String businessId) {SocketResponse socketResponse = new SocketResponse();socketResponse.setTimeout(true);dealWithSocketResponse(socketResponse);//发送消息sendAsyncRemote(session, businessId, JSON.toJSONString(socketResponse));}/*** 完善响应体*/private void dealWithSocketResponse(SocketResponse res) {if(BooleanUtil.isTrue(res.getOverMaxConnectNum())) {//当前表单连接数量过多res.setSignal(false);res.setRetrySignal(true);res.setMessage("当前表单连接过多,连接建立失败");} else if(BooleanUtil.isTrue(res.getTimeout())){//队列前面有人 不放行res.setSignal(false);res.setRetrySignal(true);res.setMessage("连接超时请退出重进");} else if(BooleanUtil.isFalse(res.getTaskSignal())) {//任务信号量不放行res.setSignal(false);res.setMessage("当前表单实例存在异步任务请稍后");}  else if(!Objects.equals(1, res.getNumber())){//队列前面有人 不放行res.setSignal(false);res.setMessage("当前排在第"+res.getNumber()+"位 请稍后");} else{//放行res.setSignal(true);}}}
  1. 后台调用示例
//关闭通行
XXXWebSocketServer.taskSignalMap.put(data.getBusinessId(), false);
//消息广播
XXXWebSocketServer.sendToAll(data.getBusinessId());
//异步任务
commonTaskExecutor.submit(() -> {try {//实际任务}catch (Exception e) {log.error("异步线程{}执行失败", Thread.currentThread().getName(), e);}finally{//放开通行XXXWebSocketServer.taskSignalMap.put(data.getBusinessId(), true);//消息广播XXXWebSocketServer.sendToAll(data.getBusinessId());}
});
http://www.xdnf.cn/news/19780.html

相关文章:

  • 硬件(一)51单片机
  • 阿里开源首个图像生成基础模型——Qwen-Image本地部署教程,中文渲染能力刷新SOTA
  • HTTP 协议核心组件与安全扩展深度解析
  • 机器学习与深度学习的 Python 基础之 NumPy(2)
  • uniapp+vue3 微信小程序全屏广告组件功能
  • AI IDE+AI 辅助编程,真能让程序员 “告别 996” 吗?
  • 【LeetCode_283】移动零
  • 技术小白如何快速的了解opentenbase?--把握四大特色
  • XE 旧版本 JSON 处理
  • 使用 Uni-app 打包 外链地址APK 及 iOS 注意事项
  • K8S-基础架构
  • 离开职场2个月,后知后觉的反思。
  • 素材合集!直播间带货音乐BGM合集,抖音直播间常用热门音乐合集,根据中文分类,方便查找
  • 力扣hot100:矩阵置零(73)(原地算法)
  • 【Python语法基础学习笔记】类的定义和使用
  • WSL + VSCode + Git + Node.js 开发环境配置文档
  • python数据分析 与spark、hive数据分析对比
  • 使用pyspark对上百亿行的hive表生成稀疏向量
  • 2025年COR IOTJ SCI2区,灾后通信无人机基站位置优化和移动充电无人机路径规划,深度解析+性能实测
  • Android aoap开发常见问题之package_allowed_list.txt导致的编译报错
  • 深度学习------模型的保存和使用
  • 深度学习篇---Adam优化器
  • Docker Pull 代理配置方法
  • 【正则表达式】 正则表达式有哪些语法?
  • Low-Light Image Enhancement via Structure Modeling and Guidance 论文阅读
  • AP5414:高效灵活的LED驱动解决方案,点亮创意生活
  • go大厂真实的面试经历与总结
  • 心路历程-初识Linux用户
  • EasyExcel 基础用法
  • 如何在FastAPI中巧妙隔离依赖项,让单元测试不再头疼?