WebSocket是一种在单个TCP连接上进行全双工通信的持久化协议。
全双工协议就是客户端可以给我们服务器发数据 服务器也可以主动给客户端发数据。
http协议是一种无状态,非持久化的单全双工应用层协议。
主要用于一问一答的方式交付信息,即客户端发送请求,服务器返回响应。这种模式适合于获取数据或者提交数据的场景。
所以http协议中,服务器无法主动给客户端发送数据,导致出现服务器数据状态发生改变,客户端无法感知。
针对上面的问题,http 勉强可以通过?定时轮询 和 长轮询 解决问题。
定时轮询:客户端不断地定时请求服务器, 询问数据状态变更的情况。
定时轮询的弊端:存在延时,浪费服务器资源和带宽,存在大量无效请求。
长轮询:拉长请求时间,客户端发送请求后,服务器在没有新数据时不会立即响应,而是等到有新数据时才返回响应。这种方法可以减少无效的请求,
长轮询的弊端:仍然需要频繁地建立和断开连接,且服务器需要维护未完成的请求,这可能会占用大量的服务器资源。
承上启下 所有最后我们websocket应运而生,它就是为了解决这个问题而设计的。
WebSocket协议可以实现全双工通信,即客户端和服务器可以在任何时候?相互?主动发送数据。此外,一旦WebSocket连接建立,客户端和服务器之间的连接将保持活动状态,直到被任何一方关闭。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
package com.ruoyi.framework.websocket;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.framework.config.WebSocketConfig;
import com.ruoyi.framework.web.service.TokenService;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.LoggerFactory;
/**
* @author qujingye
* @Classname WebSocketServer
* @Description 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean
* @Date 2023/12/19 16:11
*/
@Component
@ServerEndpoint(value = "/websocket/message", configurator = WebSocketConfig.class)
public class WebSocketServer {
private static TokenService tokenService;
@Autowired
private void setOriginMessageSender(TokenService tokenService) {
WebSocketServer.tokenService = tokenService;
}
/**
* WebSocketServer 日志控制器
*/
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> sessionPool = new ConcurrentHashMap<>();
private final static AtomicLong atomicLong = new AtomicLong(0L);
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) throws Exception {
Long userId = parseUserId(session);
System.out.println(userId);
LOGGER.info("[WebSocket] 有新的连接, 当前用户id: {}", userId);
if (userId == null) {
return;
}
CopyOnWriteArrayList<Session> sessions = sessionPool.get(userId);
//不存在其他人登陆
if (null == sessions) {
sessions = new CopyOnWriteArrayList<>();
}
sessions.add(session);
sessionPool.put(userId, sessions);
atomicLong.getAndIncrement();
LOGGER.info("[WebSocket] 有新的连接, 当前连接数: {}", atomicLong.get());
}
/**
* 连接关闭时处理
*/
@OnClose
public void onClose(Session session) {
Long userId = parseUserId(session);
if (userId == null) {
return;
}
CopyOnWriteArrayList<Session> sessions = sessionPool.remove(userId);
CopyOnWriteArrayList<Session> newSessions = new CopyOnWriteArrayList<>();
for (Session s : sessions) {
if (!s.getId().equals(session.getId())) {
newSessions.add(s);
}
}
sessionPool.put(userId, newSessions);
atomicLong.getAndDecrement();
LOGGER.info("[WebSocket] 连接断开, 当前连接数: {}", atomicLong.get());
}
/**
* 抛出异常时处理
*/
@OnError
public void onError(Session session, Throwable exception) throws Exception {
LOGGER.error("用户错误:,原因:" + exception.getMessage());
}
/**
* 服务器接收到客户端消息时调用的方法
*/
@OnMessage
public void onMessage(String message, Session session) {
//把收到的消息发回去
session.getAsyncRemote().sendText(message);
LOGGER.info("message: {}", message);
}
/**
* 给该用户id的全部发送消息
*/
public void sendMessage(Long userId, String message) {
CopyOnWriteArrayList<Session> sessions = sessionPool.get(userId);
if (null == sessions || sessions.size() == 0) {
return;
}
sessions.forEach(s -> s.getAsyncRemote().sendText(message));
}
/**
* 获取用户id
*/
private Long parseUserId(Session session) {
String token = (String) session.getUserProperties().get(WebSocketConfig.WEBSOCKET_PROTOCOL);
if (StringUtils.isNotEmpty(token)) {
LoginUser loginUser = tokenService.getLoginUserByToken(token);
if (loginUser != null) {
return loginUser.getUserId();
}
}
return null;
}
}
注入ServerEndpointExporter来自动注册端点
package com.ruoyi.framework.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import java.util.List;
import java.util.Map;
/**
* @author qujingye
* @Classname WebSocketConfig
* @Description 继承服务器断点配置类
* @Date 2023/12/19 16:08
*/
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator {
/**
* WebSocket的协议头
*/
public final static String WEBSOCKET_PROTOCOL = "Sec-Websocket-Protocol";
/**
* 注入ServerEndpointExporter,这个Bean会自动注册使用了@ServerEndpoint注解声明的WebSocket Endpoint。
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
* 建立握手时,连接前的操作
*/
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
// 这个用户属性userProperties 可以通过 session.getUserProperties()获取
final Map<String, Object> userProperties = sec.getUserProperties();
Map<String, List<String>> headers = request.getHeaders();
List<String> protocol = headers.get(WEBSOCKET_PROTOCOL);
// 存放自己想要的header信息
if (protocol != null) {
userProperties.put(WEBSOCKET_PROTOCOL, protocol.get(0));
}
}
/**
* 创建端点实例,也就是被@ServerEndpoint所标注的对象
*/
@Override
public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
return super.getEndpointInstance(clazz);
}
}
package com.ruoyi.framework.security.filter;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.framework.config.WebSocketConfig;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
* FileName: com.admin.security.filter WebsocketFilter
* Date: 2023/8/1 16:42
*
* @author Messylee
*/
@Order(1)
@Component
@WebFilter(filterName = "WebsocketFilter", urlPatterns = "/ws/**")
public class WebsocketFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletResponse response = (HttpServletResponse) servletResponse;
HttpServletRequest headers = (HttpServletRequest) servletRequest;
String token = headers.getHeader(WebSocketConfig.WEBSOCKET_PROTOCOL);
if (StringUtils.isNotEmpty(token)){
response.setHeader(WebSocketConfig.WEBSOCKET_PROTOCOL, token);
}
filterChain.doFilter(servletRequest, servletResponse);
}
}
<template>
<div class="app-container home">
<el-row :gutter="20">
<el-col :sm="24" :lg="24">
<h1>集成websocket测试</h1>
</el-col>
</el-row>
<el-row :gutter="20">
<el-col :sm="24" :lg="24">
<div>
<el-input v-model="url" type="text" style="width: 20%" />
<el-button @click="join" type="primary">连接</el-button>
<el-button @click="exit" type="danger">断开</el-button>
<el-button @click="resetForm" type="success">重置</el-button>
<br />
<br />
<el-input type="textarea" v-model="message" :rows="9" />
<br />
<br />
<el-button type="success" @click="send">发送消息</el-button>
<br />
<br />
返回内容
<el-input type="textarea" v-model="text_content" :rows="9" />
<br />
<br />
</div>
</el-col>
</el-row>
</div>
</template>
<script>
import { getToken } from "@/utils/auth";
export default {
name: "Index",
data() {
return {
url: "ws://127.0.0.1:8080/websocket/message",
message: "",
text_content: "",
ws: null,
headers: {
Authorization: "Bearer " + getToken(),
},
};
},
methods: {
join() {
const wsuri = this.url;
// this.ws = new WebSocket(wsuri);
this.ws = new WebSocket(wsuri, [getToken()]);
const self = this;
// 连接成功后调用
this.ws.onopen = function (event) {
self.text_content = self.text_content + "WebSocket连接成功!" + "\n";
};
this.ws.onerror = function (event) {
self.text_content = self.text_content + "WebSocket连接发生错误!" + "\n";
};
// 接收后端消息
this.ws.onmessage = function (event) {
self.text_content = self.text_content + event.data + "\n";
};
// 关闭连接时调用
this.ws.onclose = function (event) {
self.text_content = self.text_content + "已经关闭连接!" + "\n";
};
},
exit() {
if (this.ws) {
this.ws.close();
this.ws = null;
}
},
send() {
if (this.ws) {
this.ws.send(this.message);
} else {
alert("未连接到服务器");
}
},
//重置
resetForm() {
this.message = "";
this.text_content = "";
},
},
};
</script>