目录
WebSocket 是互联网项目中画龙点睛的应用,可以用于消息推送、站内信、在线聊天等业务。
WebSocket 是一种基于 TCP 的新网络协议,它是一种持久化的协议,实现了全双工通信,可以让服务器主动发送消息给客户端。
在 WebSocket 出现之前,要保持消息更新和推送一般采用轮询的方式,例如,开启一个服务进程每隔一段时间去发送请求给另外一个服务,以此获取最新的资源信息。这里都很阻塞请求,性能非常差,也会占用资源。所以考虑使用 WebSocket 来实现,使用连接实现传输,性能很高,整个客户端和服务之间交互的请求过程如图。
由图可知,最开始客户端也需要发起一次 http 请求,然后 WebSocket 协议需要通过已建立的 TCP 连接来传输数据,可见 WebSocket 和 HTTP 请求也有一些交集。但是 WebSocket 只用发起一次 HTTP 请求之后就可以通过回调机制不断地获取数据并进行交互。?
整合 WebSocket 到 SpringBoot?
在 pom.xml 中添加如下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
通信协议转换过程:?
服务端 API?
Tomcat的7.0.5 版本开始支持WebSocket,并且实现了Java WebSocket规范。
Java WebSocket应用由一系列的Endpoint组成。Endpoint 是一个java对象,代表WebSocket链接的一端,对于服务端,我们可以视为处理具体WebSocket消息的接口。
我们可以通过两种方式定义Endpoint:
Endpoint实例在WebSocket握手时创建,并在客户端与服务端链接过程中有效,最后在链接关闭时结束。在Endpoint接口中明确定义了与其生命周期相关的方法, 规范实现者确保生命周期的各个阶段调用实例的相关方法。?
服务端接受客户端发送的数据:
编程式 | 注解式 |
通过添加 MessageHandler 消息处理器来接收消息 | 在定义Endpoint时,通过@OnMessage注解指定接收消息的方法 |
服务端推送消息给客户端:
发送消息则由 RemoteEndpoint 完成, 其实例由 Session 维护。
发送消息有2种方式发送消息
WebSocket 使用 ws 或 wss 作为通信协议,和 HTTPS 协议类似,其中 wss 表示在 TLS 之上的 WebSocket。一个完整的 URL 如下:ws://example.com/api。实现聊天室的思路可分成以下两种:
编写一个配置文件,配置类代码:
package org.example.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
/**
* 配置 WebSocketEndpointServer
*
* 注入 ServerEndpointExporter 到web启动流程中
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
创建 WebSocket 服务端,需要通过注解来实现如下方法,见如下:
事件类型 | WebSocket 注解 | 事件描述 |
open | @OnOpen | 当打开连接后触发 |
message | @OnMessage | 当接收客户端信息时触发 |
error | @OnError | 当通信异常时触发 |
close | @OnClose | 当连接关闭时触发 |
下面正式编写 WebSocket 服务端代码,代码如下:
package org.example.componment;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.example.chatobject.Message;
import org.example.config.WebSocketConfig;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpSession;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@ServerEndpoint(value = "/chat"
,configurator = GetHttpSessionConfig.class)//标记此类为服务端
public class WebSocketChatSerever {
/*
使用线程安全的 Map存储会话
*/
private static Map<String, Session> onlineSession = new ConcurrentHashMap<>();
private HttpSession httpSession;
/*
当打开连接的时候,添加会话和更新在线人数
*/
@OnOpen
public void onOpen(Session session) {//WebSocket Session,不是httpSession
//1.将 session进行保存
onlineSession.put(session.getId(), session);
//2.广播消息推送给所有用户
sendMessageToAll(Message.toJsonResult(Message.ENTER, "", "", onlineSession.size()));
}
/* @OnOpen
public void onOpen(Session session,EndpointConfig endpointConfig) {//和 http ServerEndpointConfig 是同一个对象
//1.将 session进行保存
this.httpSession = (HttpSession) endpointConfig.getUserProperties().get(HttpSession.class.getName());
String user = (String) this.httpSession.getAttribute("user");
onlineSession.put(user, session);
//2.广播消息推送给所有用户
sendMessageToAll(Message.toJsonResult(Message.ENTER, "", "", onlineSession.size()));
}*/
/**
* 发送消息给所有人
* @param msg
*/
private static void sendMessageToAll(String msg) {
onlineSession.forEach((id, session) -> {
try {
//发送同步消息的方法 getBasicRemote,getAsyncRemote发送异步的消息。
session.getBasicRemote().sendText(msg);
} catch (IOException e) {
e.printStackTrace();
}
});
}
/* @OnMessage
public static void sendMessageToAll(String msg, Session session, @PathParam("nickname")String nickname) {
log.info("来自客户端:{} 发来的消息:{}",nickname,msg);
SocketConfig socketConfig;
ObjectMapper objectMapper = new ObjectMapper();
try {
socketConfig = objectMapper.readValue(msg, SocketConfig.class);
if (socketConfig.getType() == 1){//私聊
socketConfig.setFormUser(session.getId());
Session fromSession = map.get(socketConfig.getFormUser());
Session toSession = map.get(socketConfig.getToUser());
if (toSession != null){// 接收者存在,发送以下消息给接受者和发送者
fromSession.getAsyncRemote().sendText(nickname+": "+socketConfig.getMsg());
toSession.getAsyncRemote().sendText(nickname+": "+socketConfig.getMsg());
}else {
fromSession.getAsyncRemote().sendText("频道号不存在或者对方不在线");
}
}else {//群聊
broadcast(nickname+": "+socketConfig.getMsg());
}
} catch (Exception e) {
log.error("发送消息出错");
e.printStackTrace();
}
}*/
/**
* 浏览器发送消息给服务端,该方法会被调用
* 张三 ---> 李四
*/
@OnMessage
public void OnMessage(String msg){
//1.消息推送给指定的用户
Message message = JSON.parseObject(msg, Message.class);
//2.获取接收方的用户名
//3.获取接收方用户对象的session对象
//4.发送消息 session.getAsyncRemote().sendText(msg);
//未实现私聊,先这样
onlineSession.forEach((id, session) -> {
try {
//发送同步消息的方法 getBasicRemote,getAsyncRemote发送异步的消息。
session.getBasicRemote().sendText(msg);
} catch (IOException e) {
e.printStackTrace();
}
});
}
/*
当关闭连接,移除会话并减少在线人数。
*/
@OnClose
public void onClose(Session session) {
//1.从 onlineSession 中剔除当前的 session 对象
onlineSession.remove(session.getId());
//2.通知所有用户,当前用户下线了。
sendMessageToAll(Message.toJsonResult(Message.QUIT, "", "下线啦!", onlineSession.size()));
}
/*
当通信发生异常时,打印错误日志
*/
@OnError
public void OnError(Session session,Throwable error){
error.printStackTrace();
}
}
上面的代码就是对 onError、onMessage、onOpen 注解进行编写接口,使用线程安全的 Map 来存储对话信息,根据不同的操作对在线人数和当前会话用户进行删改。sessionid 对应每一个连接的用户,由于没有注册,所有保存用户信息用处不大。其中 Message 对象是对聊天消息进行封装的数据类,具体实现代码如下:
package org.example.chatobject;
import com.alibaba.fastjson.JSON;
/**
* 聊天对象
*/
public class Message {
//进入聊天
public static final String ENTER = "ENTER";
//聊天
public static final String TALK = "TALK";
//退出聊天
public static final String QUIT = "QUIT";
//消息类型
private String type;
//发送人
private String username;
//发送消息
private String message;
//在线人数
private int onlineCount;
//返回处理后的 json 结果
public Message(String type, String username, String message, int onlineCount) {
this.type = type;
this.username = username;
this.message = message;
this.onlineCount = onlineCount;
}
public static String toJsonResult(String type, String username, String message, int onlineCount) {
return JSON.toJSONString(new Message(type, username, message, onlineCount));
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getOnlineCount() {
return onlineCount;
}
public void setOnlineCount(int onlineCount) {
this.onlineCount = onlineCount;
}
}
为了更好地编写前端页面和处理参数(JSON 格式为主)传递,添加如下配置到 pom.xml:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.25</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
然后编写一个 Controller 文件来实现简单地路由和逻辑,具体实现代码如下:
package org.example.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;
import javax.servlet.http.HttpServletRequest;
@RestController
public class ChatController {
@GetMapping("/login")
public ModelAndView login() {
return new ModelAndView("/login");
}
@GetMapping("/chat")
public ModelAndView index(String username, String password, HttpServletRequest request) {
return new ModelAndView("/chat");
}
}
编写前端页面,运行项目,访问登录页面?http://localhost:8080/login,如图所示:
?
错误页面:
??
聊天页面,如图:
?在前端中也需要编写 WebSocket 的客户端操作,核心代码如下:
<script>
/**
* WebSocket客户端
*
*/
function createWebSocket() {
/**
* WebSocket客户端
*/
var serviceUri = 'ws://localhost:8080/chat';
var webSocket = new WebSocket(serviceUri);
/**
* 打开连接的时候
*/
webSocket.onopen = function (event) {
console.log('websocket打开连接....');
};
/**
* 接受服务端消息
*/
webSocket.onmessage = function (event) {
console.log('websocket:%c' + event.data);
// 获取服务端消息
var message = JSON.parse(event.data) || {};
var $messageContainer = $('.message_container');
if (message.type === 'TALK') {
var insertOneHtml = '<div class="mdui-card" style="margin: 10px 0;">' +
'<div class="some-class">' +
'<div class="message_content">' + message.username + ":" + message.message + '</div>' +
'</div></div>';
$messageContainer.append(insertOneHtml);
}
// 更新在线人数
$('#chat_num').text(message.onlineCount);
//防止刷屏
var $cards = $messageContainer.children('.mdui-card:visible').toArray();
if ($cards.length > 5) {
$cards.forEach(function (item, index) {
index < $cards.length - 5 && $(item).slideUp('fast');
});
}
};
/**
* 关闭连接
*/
webSocket.onclose = function (event) {
console.log('WebSocket关闭连接');
};
/**
* 通信失败
*/
webSocket.onerror = function (event) {
console.log('WebSocket发生异常');
};
return webSocket;
}
var webSocket = createWebSocket();
/**
* 通过WebSocket对象发送消息给服务端
*/
function sendMsgToServer() {
var $message = $('#msg');
if ($message.val()) {
webSocket.send(JSON.stringify({username: $('#username').text(), msg: $message.val()}));
$message.val(null);
}
}
/**
* 清屏
*/
function clearMsg() {
$(".message_container").empty();
}
/**
* 使用ENTER发送消息
*/
document.onkeydown = function (event) {
var e = event || window.event || arguments.callee.caller.arguments[0];
e.keyCode === 13 && sendMsgToServer();
};
</script>
其中,createWebSocket 方法是专门用来创建 WebSocket 对象的,并且封装了处理服务器的消息、错误处理、连接服务器等操作。
解析:
websocket对象创建:
let ?ws ?= ?new WebSocket(URL);
websocket对象相关事件?:
事件 | 事件处理程序 | 描述 |
open | ws.onopen | 连接建立时触发 |
message | ws.onmessage | 客户端接收到服务器发送的数据时触发 |
close | ws.onclose | 连接关闭时触发 |
websocket对象提供的方法?:
方法名称 | 描述 |
send() | 通过websocket对象调用该方法发送数据给服务端 |
所谓点到点消息传输,其实就是我们常说的私聊功能。一个用户相当于一个连接,两个用户之间在一个通道中进行消息传输,私密地聊天。
在之前项目地基础上,进一步引入一个新的概念:频道号。就像听广播一样,必须要在相同的频道上才能听到消息,同样地,两个用户必须在相同的频道上才能接受到对方发来的消息。
于是改写 WebSocket 类中的 OnMessage 方法,具体代码如下:
@OnMessage
public static void sendMessageToAll(String msg, Session session, @PathParam("nickname")String nickname) {
log.info("来自客户端:{} 发来的消息:{}",nickname,msg);
SocketConfig socketConfig;
ObjectMapper objectMapper = new ObjectMapper();
try {
socketConfig = objectMapper.readValue(msg, SocketConfig.class);
if (socketConfig.getType() == 1){//私聊
socketConfig.setFormUser(session.getId());
Session fromSession = map.get(socketConfig.getFormUser());
Session toSession = map.get(socketConfig.getToUser());
if (toSession != null){// 接收者存在,发送以下消息给接受者和发送者
fromSession.getAsyncRemote().sendText(nickname+": "+socketConfig.getMsg());
toSession.getAsyncRemote().sendText(nickname+": "+socketConfig.getMsg());
}else {
fromSession.getAsyncRemote().sendText("频道号不存在或者对方不在线");
}
}else {//群聊
broadcast(nickname+": "+socketConfig.getMsg());
}
} catch (Exception e) {
log.error("发送消息出错");
e.printStackTrace();
}
}
了解了 WebSocket 在Spring Boot 中的整合和实践,以群聊和私聊两大功能为例,具体讲解了相应的实现方法。对 WebSocket 的常用方法的封装,进行了详细的介绍。