当前位置: 首页 > web >正文

SpringBoot整合t-io是websocket实时通信

SpringBoot整合t-io是websocket实时通信

1. 前言

随着实时网络应用的普及,即时聊天、在线游戏和实时数据推送等,websocket技术越来越受到开发者的青睐。他允许客户端和服务器之间进行全双工、低延时的通信,从而实现更加流畅的用户体验。

本问将详细介绍如何使用SpringBoot整合t-io快速搭建一个功能完善的websocket服务器,我们将从websocket的基本原理开始,逐步讲解配置,实现消息处理器、处理客户端连接和消息广播等关键不走,最后通过实际测试验证服务器的功能。

2. websocket的原理概述

什么是websocket

网上关于websocket的讲解有很多,也很详尽,再这里我不着重介绍了。我们只需要知道,websocket 是一种在单个TCP连接上进行全双工通信的协议,旨在解决传统HTTP协议在实时通信场景下的不足。它允许服务器主动向客户端推送数据,客户端也可以随时向服务器发送消息,实现真正的实时双向通信;

TCP协议是什么

1)计算机体系结构

在这里插入图片描述
如上图所示展示了计算机结构的OSI七层模型以及TCP/IP概念模型

应用层:向客户提供一组常用的应用程序,比如电子邮件、文件传输访问、虚拟终端等
应用层协议:两个主机的两个应用程序之间进行相互交流的数据格式
传输层:提供应用程序间的通信。其功能包括:格式化信息流以及提供可靠传输
网络层:标记了互联网上每一台主机地址,负责相邻计算机之间的通信。
链路层:底层物理通路(线路)

2)TCP/IP协议

TCP/IP协议实际上是一个协议族。TCP/IP协议主要由网络层的IP协议 和 传输层的TCP协议组成 。IP 或 ICMP、TCP 或 UDP、TELNET 或 FTP、以及 HTTP 等都属于 TCP/IP 协议,他们与 TCP 或 IP 的关系紧密。因此,也称 TCP/IP 为网际协议群。
TCP负责发现传输的问题,一有问题就发出信号,要求重新传输,直到所有数据安全正确地传输到目的地。而IP是给因特网的每一台联网设备规定一个地址。
打个比方:TCP协议就相当于中国邮政快递,用来做运输IP协议就相当于邮政编码,用来唯一标记目的地。TCP协议是传输控制协议,工作在传输层。提供面向链接的,可靠的传输服务(三次握手,四次挥手)
面向链接:数据传输之前,客户端与服务器之间要建立连接,才可以传输数据
可靠的:数据传输是有序的,要对数据进行校验,数据不会丢失

websocket的工作流程

1)建立连接(握手):

客户端发送一个带有特殊头部的 HTTP 请求,表示希望升级协议到 WebSocket。
服务器接收到请求后,如果支持 WebSocket,则返回一个包含升级协议的响应,双方确认切换到 WebSocket 协议。

2)数据传输:

连接建立后,客户端和服务器可以在不经过额外握手的情况下随时发送数据。
数据以帧(frame)的形式传输,支持文本和二进制数据。

3)关闭连接:

任意一方都可以发送关闭帧来终止连接。
连接关闭后,双方需重新握手才能建立新的连接

websocket的优势

1.实时性强:支持服务器主动推送数据,减少了客户端轮询的延迟和资源消耗。
2.低开销:一次握手后保持连接,减少了 HTTP 轮询带来的额外开销。
3.全双工通信:客户端和服务器可以同时发送数据,通信更加灵活高效。
4.适用广泛:适用于聊天应用、实时通知、在线游戏、股票行情等多种场景

t-io整合websocket实现实时消息通信

1) 引入jar包
        <dependency><groupId>org.t-io</groupId><artifactId>tio-websocket-server</artifactId><version>3.7.2.v20210316-RELEASE</version></dependency><dependency><groupId>org.t-io</groupId><artifactId>tio-websocket-client</artifactId><version>3.7.2.v20210316-RELEASE</version></dependency>
2) 自定义websocket注解实现类
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({SocketAutoConfiguration.class, WebScoketProperties.class})
public @interface EnableTioSocketServer {
}
3) websocket 连接、握手,执行方法
@Slf4j
public abstract class AbstractScoketMsgHandler implements IWsMsgHandler {@Resourceprivate ApplicationContext applicationContext;/*** 握手时走这个方法,业务可以在这里获取cookie,request参数等** <li>对httpResponse参数进行补充并返回,如果返回null表示不想和对方建立连接,框架会断开连接,如果返回非null,框架会把这个对象发送给对方</li>* <li>注:请不要在这个方法中向对方发送任何消息,因为这个时候握手还没完成,发消息会导致协议交互失败。</li>* <li>对于大部分业务,该方法只需要一行代码:return httpResponse;</li>*/@Overridepublic HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {String token =null;try {token = httpRequest.getHeader("authorization");if (StrUtil.isEmpty(token)) {token = httpRequest.getParam("authorization");Map<String, String> map = new HashMap<>();map.put("authorization", "bearer " + token);httpRequest.setHeaders(map);}if (StrUtil.isEmpty(token)) {log.info("token没有获取到:{}拒绝连接", token);return null;}if (token.contains(" ")) {token = token.split(" ")[1];}String groupId = httpRequest.getParam("groupId");if (StringUtils.isNotEmpty(groupId)) {channelContext.set("groupId", groupId);String type = httpRequest.getParam("type");channelContext.set("type", type);log.info("新建数据绑定,组:{}", groupId);}return httpResponse;} catch (Exception ex) {log.info("解析token获取用户ID失败:{}拒绝连接", token);return null;}}/*** 握手成功后触发该方法* @author tanyaowu*/@Overridepublic void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {String type = httpRequest.getParam("type");if (StringUtils.isNotEmpty(type)) {switch (type) {case "live":Aio.sendToUser(channelContext.userid, Aio.getWsResponse("成功连接"));String groupId = httpRequest.getParam("groupId");LiveEvent liveEvent = new LiveEvent(this, groupId, channelContext.userid, 0);applicationContext.publishEvent(liveEvent);case "visit":String groupId1 = httpRequest.getParam("groupId");//绑定到群组,后面会有群发Aio.bindGroup(channelContext, groupId1);int count = Tio.getAll(channelContext.tioConfig).getObj().size();String msg = "{type:'online',name:'"+ channelContext.get("userName") + "',message:'" + channelContext.userid + " 进来了,共【" + count + "】人在线" + "'}";//将对象数据转为字符串String jsonString = JSON.toJSONString(msg);//群发Tio.sendToGroup(channelContext.tioConfig, groupId1, Aio.getWsResponse(jsonString));break;default: break;}}}/*** <li>当收到Opcode.BINARY消息时,执行该方法。也就是说如何你的ws是基于BINARY传输的,就会走到这个方法</li>* @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息* @author tanyaowu*/@Overridepublic Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {opcodeBytes(wsRequest, bytes, channelContext);return null;}/*** 当收到Opcode.CLOSE时,执行该方法,业务层在该方法中一般不需要写什么逻辑,空着就好* @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息* @author tanyaowu*/@Overridepublic Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {Tio.remove(channelContext, "WebSocket Close");String type = channelContext.get("type") + "";if (StringUtils.isNotEmpty(type)) {switch (type) {case "live":String groupId = channelContext.get("groupId") + "";LiveEvent liveEvent = new LiveEvent(this, groupId, channelContext.userid, 1);applicationContext.publishEvent(liveEvent);break;case "visit":String content = channelContext.get("content")+"";LiveEvent liveEvent1 = new LiveEvent(this, content, channelContext.userid, 1);applicationContext.publishEvent(liveEvent1);Tio.remove(channelContext, "receive close flag");break;default:break;}}return null;}/*** <li>当收到Opcode.TEXT消息时,执行该方法。也就是说如何你的ws是基于TEXT传输的,就会走到这个方法</li>** @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息* @author tanyaowu*/@Overridepublic Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {opcodeText(wsRequest, text, channelContext);//返回值是要发送给客户端的内容,一般都是返回nullreturn null;}public abstract Object opcodeText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception;public abstract Object opcodeBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception;
}
4) websocket的 AbstractScoketMsgHandler实现类 ScoketMsgHandler
@Slf4j
@Component
public class ScoketMsgHandler extends AbstractScoketMsgHandler{@Autowired(required = false)private IChatgptService iChatgptService;@Overridepublic Object opcodeText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {log.info("收到来自【{}】:用户的文本消息:{}", channelContext.userid, s);boolean json = JSONUtil.isJson(s);if(!json){iChatgptService.sendAndReceive(channelContext.userid, s);return null;}Map map = JSONUtil.toBean(s, Map.class);switch (String.valueOf(map.get("type"))){case "visit":log.info("回访信息");String contentJson = String.valueOf(map.get("content"));ChatHistoryDTO chatHistoryDTO = JSONUtil.toBean(contentJson, ChatHistoryDTO.class);Boolean b = Aio.sendToUser(chatHistoryDTO.getReceiveUserId(), Aio.getWsResponse(contentJson));/*ChatHistory chatHistory = new ChatHistory();chatHistory.setStatus("0");if(b){chatHistory.setStatus("1");}chatHistory.setFollowId(chatHistoryDTO.getFollowId());chatHistory.setCreateTime(new Date());chatHistory.setContent(chatHistoryDTO.getContent());chatHistory.setSendUserId(chatHistoryDTO.getReceiveUserId());chatHistory.setReceiveUserId(chatHistoryDTO.getSendUserId());chatHistory.setStatus("0");iChatHistoryService.save(chatHistory);*/break;default:break;}return null;}@Overridepublic Object opcodeBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {log.info("收到来自【{}】:用户的字节消息", channelContext.userid);return null;}
}public interface IChatgptService {void sendAndReceive(String userId, String prompt) throws Exception;
}public class ChatgptServiceImpl implements IChatgptService{@Autowiredprivate SparkClient sparkClient;private final static String GREETINGS="问候语";@Overridepublic void sendAndReceive(String userId, String prompt) throws Exception {if (prompt.equalsIgnoreCase(GREETINGS)) {//添加前置语WsResponse wsResponse = WsResponse.fromText("您好,我是您的健康小顾问,可以解答疾病、饮食、健康等多个方面的相关问题。比如您可以问我“糖尿病应该注意哪些问题?", "UTF-8");Aio.sendToUser(userId, wsResponse);return;}List<SparkMessage> messages=new ArrayList<>();//添加前置语messages.add(SparkMessage.userContent("请作为资深的健康咨询师回答关于职工健康和疾病预防的问题:".concat(prompt)));SparkRequest sparkRequest= SparkRequest.builder().messages(messages).maxTokens(500).temperature(0.5).apiVersion(SparkApiVersion.V3_5).build();sparkClient.chatStream(sparkRequest,new SparkConsoleListener(userId,prompt));}
}
5)websocket实现config

1.SocketAutoConfiguration 类

@Slf4j
@Import({WebSocketServerInitConfig.class})
public class SocketAutoConfiguration implements Serializable {@Autowired(required = false)private WebScoketProperties webScoketProperties;@Beanpublic WebSocketServerBootstrap webSocketServerBootstrap(){return new WebSocketServerBootstrap(webScoketProperties);}
}

2.WebScoketProperties 类

@Data
@Component
@ConfigurationProperties(prefix = "tio.config")
public class WebScoketProperties {/*** 服务名称*/private String name = "web-scoket";/*** 服务绑定的 IP 地址,默认不绑定*/private String ip = null;private int port = 6789;private int heartbeatTimeout = 5000;private SslProperties ssl;public boolean useSSL() {return ssl != null && ssl.keyStore != null && ssl.trustStore != null;}/*** ssl配置*/public static class SslProperties {private String keyStore;private String trustStore;private String password;public String getKeyStore() {return keyStore;}public void setKeyStore(String keyStore) {this.keyStore = keyStore;}public String getTrustStore() {return trustStore;}public void setTrustStore(String trustStore) {this.trustStore = trustStore;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}}
}

3.WebSocketServerBootstrap 类 socket自动配置类

@Slf4j
public class WebSocketServerBootstrap {
http://www.xdnf.cn/news/17111.html

相关文章:

  • LeetCode 分类刷题:16. 最接近的三数之和
  • 《汇编语言:基于X86处理器》第11章 复习题和练习
  • uiautomator2 编写测试流程-登陆后的酷狗01
  • 进程生命周期管理:从创建到终止的完整逻辑
  • 探索医学领域多模态人工智能的发展图景:技术挑战与临床应用的范围综述|文献速递-医学影像算法文献分享
  • iOS 内测上架流程详解:跨平台团队如何快速部署 TestFlight
  • 注解知识学习
  • 凹槽类零部件尺寸的检测方法有哪些 - 激光频率梳 3D 轮廓检测
  • [硬件电路-156]:什么是电信号? 电信号的本质:电信号是随时间变化的电压或电流。本质是电子运动表征信息,兼具能量传输与信息编码传递功能。
  • Mac电脑基本功能快捷键
  • EdgeView for macOS:解决图像管理痛点的利器
  • 设计模式 -> 策略模式(Strategy Pattern)
  • 经典设计模式
  • 验证码等待时间技术在酒店自助入住、美容自助与社区场景中的应用必要性研究—仙盟创梦IDE
  • Calcite自定义扩展SQL案例详细流程篇
  • 六、Linux核心服务与包管理
  • 前端 拼多多4399笔试题目
  • [自动化Adapt] 录制引擎 | iframe 穿透 | NTP | AIOSQLite | 数据分片
  • Connection refused: no further information: localhost/127.0.0.1:2375
  • 第四章:OSPF 协议
  • ssh服务器端口和本地端口映射
  • [spring-cloud: 服务发现]-源码解析
  • 旧笔记本电脑如何安装飞牛OS
  • 电商系统定制开发流程:ZKmall开源商城需求分析到上线全程可控
  • MySQL深度理解-MySQL锁机制
  • 【Mysql】日志--错误日志、二进制日志、查询日志、慢查询日志
  • Shell脚本-变量的定义规则
  • LLM调研
  • 【QT】概述
  • Azure DevOps — Kubernetes 上的自托管代理 — 第 4 部分