【JAVA实现websocket】
JAVA实现websocket
- 背景
- 依赖
- 问题
- 代码实现
- 测试
背景
近期项目中需要用到websocket,实现即时通信。
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
问题
使用websocket网页测试时,发现websocket连接不上,因为spring.security的问题,需要账号密码,可以在启动类中移除SecurityAutoConfiguration.class。
代码实现
onMessage方法用来接收消息,可以根据自己业务需要定义正常的心跳处理以及业务处理逻辑。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.travelsky.config.WebSocketConfigurator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
@Component
@ServerEndpoint(value = "/websocket", configurator = WebSocketConfigurator.class)
public class WebSocketServer {//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;private static final AtomicInteger OnlineCount = new AtomicInteger(0);// concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。public static ConcurrentHashMap<String, WebSocketServer> serverMap = new ConcurrentHashMap<>();private static final String LOCALHOST = "127.0.0.1";private final String ipAddress = LOCALHOST;/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session) {this.session = session;log.info("开始建立websocket方法连接,获取Session为:{}", session);log.info("连接建立 - Session ID: {}, Query: {}",session.getId(), session.getRequestURI().getQuery());// 如果已有连接,先关闭旧连接if (serverMap.containsKey(this.ipAddress)) {WebSocketServer oldServer = serverMap.get(this.ipAddress);try {oldServer.session.close();} catch (IOException e) {log.error("关闭旧连接失败", e);}}serverMap.put(this.ipAddress, this);int cnt = OnlineCount.incrementAndGet();log.info("有连接加入,当前连接数为:{},当前连接id为:{}", cnt, this.ipAddress);sendMessage("连接成功");log.info("{},已上线!", this.ipAddress);}@OnMessagepublic void onMessage(String message, Session session) {try {log.info("服务器收到了用户:{},发来的消息:{},当前ip集合为:{}", ipAddress, message, JSON.toJSONString(serverMap));//方便前端测试JSONObject jsonObject = JSON.parseObject(message);Object modeCode = jsonObject.get("ModeCode");log.info("获取到前端发送消息为:{}", modeCode);String string = modeCode.toString();sendMessage("pong");if (!string.contains("ping")) {//处理其它业务逻辑log.info("ModeCode为空则表示正常数据");//ModeCode为空则表示正常数据 处理其他消息Map map = JSON.parseObject(message, Map.class);log.info("打印JSON字符串:{}", JSON.toJSONString(map));}} catch (Exception e) {log.error("处理数据异常", e);onError(session, e);}}/*** 给ip地址为ip的客户端发送消息** @param ip ip地址* @param message 消息*/public static Boolean sendMessage(String ip, String message) {log.info("开始发送消息,serverMap数据为:{},ip为:{},消息为:{}", serverMap, ip, message);if (serverMap.containsKey(ip)) {WebSocketServer webSocketServer = serverMap.get(ip);webSocketServer.sendMessage(message);log.info("发送成功:{},当前连接数为:{}", ip, OnlineCount);return false;} else {log.error("发送失败,客户端未连接: {}", ip);return true;}}/*** 服务器主动发送消息** @param message 消息*/public void sendMessage(String message) {try {this.session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("发送消息异常:{}", e.getMessage());}}@OnClosepublic void onClose(Session session, CloseReason closeReason) {log.info("连接关闭 - Session ID: {}, Reason: {} (Code: {})",session.getId(),closeReason.getReasonPhrase(),closeReason.getCloseCode());if (serverMap.containsKey(ipAddress)) {serverMap.remove(ipAddress);int cnt = OnlineCount.decrementAndGet();log.info("有连接退出,当前连接数为:{},当前;退出id为:{}", cnt, this.ipAddress);log.info("{},已下线!", ipAddress);}}@OnErrorpublic void onError(Session session, Throwable throwable) {log.info("连接异常 - Session ID: {}", session.getId());if (serverMap.containsKey(ipAddress)) {serverMap.remove(ipAddress);int cnt = OnlineCount.decrementAndGet();log.error("用户{}发生了错误,具体如下:{},已下线!,当前连接数为:{}", ipAddress, throwable.getMessage(), cnt);}}private static synchronized void subOnlineCount() {OnlineCount.decrementAndGet();}public static synchronized void addOnlineCount() {OnlineCount.incrementAndGet();}public static WebSocketServer get(String ipAddress) {return serverMap.get(ipAddress);}public static ConcurrentHashMap<String, WebSocketServer> getMap() {return serverMap;}public static boolean isOnline(String ipAddress) {return serverMap.containsKey(ipAddress);}/*** 通用处理空格** @param s String*/private static String processDataSpace(String s, int length) {if (s == null || s.isEmpty()) {// 如果为空,直接返回空格return String.format("%-" + length + "s", s);}//如果长度超过则截取 不足则用空格补齐if (s.length() > length) {// 超过位则截断return s.substring(0, length);} else {// 不足位右补空格return String.format("%-" + length + "s", s);}}
}
测试
websocket在线测试
该链接可以在线测试websocket是否正常,数据格式需要满足自定义的要求。