话不多说,直接上代码看效果!
一、服务端:
1、引用依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、添加配置文件 WebSocketConfig
@Configuration
public class WebSocketConfig{
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3、编写WebSocket服务端接收、发送功能
? 声明接口代码:
public interface IWebSocketService {
void sendMessage(String message, List<String> toSids);
String getMessage(String msg);
}
? 实现类代码:
@Slf4j
@Service
@ServerEndpoint("/api/websocket/{sid}")
public class IWebSocketServiceImpl implements IWebSocketService {
@Autowired
RabbitTemplate rabbitTemplate;
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static AtomicInteger onlineCount = new AtomicInteger(0);
//concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
private static CopyOnWriteArraySet<IWebSocketServiceImpl> webSocketSet = new CopyOnWriteArraySet<>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private String sid = "";
@Override
public void sendMessage(String message, List<String> toSids) {
log.info("推送消息到客户端 " + toSids + ",推送内容:" + message);
for (IWebSocketServiceImpl item : webSocketSet) {
try {
if (CollectionUtils.isEmpty(toSids)) {
item.sendMessage(message);
} else if (toSids.contains(item.sid)) {
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
@Override
public String getMessage(String msg) {
return null;
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
webSocketSet.add(this); // 加入set中
this.sid = sid;
addOnlineCount(); // 在线数加1
try {
// sendMessage("conn_success");
log.info("有新客户端开始监听,sid=" + sid + ",当前在线人数为:" + getOnlineCount());
} catch (Exception e) {
log.error("websocket IO Exception");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); // 从set中删除
subOnlineCount(); // 在线数减1
// 断开连接情况下,更新主板占用情况为释放
log.info("释放的sid=" + sid + "的客户端");
releaseResource();
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自客户端 sid=" + sid + " 的信息:" + message);
//此处为MQ接收功能,如果不使用则可以屏蔽,如果有需要可使用
// if (rabbitTemplate == null) {
// ApplicationContext context = springUtils.getApplicationContext();
// rabbitTemplate = context.getBean(RabbitTemplate.class);
// }
// rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, //RabbitmqConfig.ROUTINGKEY_RETURN, message);
// rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, //RabbitmqConfig.ROUTINGKEY_RETURN, message);
// 群发消息
// List<String> sids = new ArrayList<>();
// for (WebSocketServer item : webSocketSet) {
// sids.add(item.sid);
// }
// try {
// sendMessage("客户端 " + this.sid + "发布消息:" + message, sids);
// } catch (IOException e) {
// e.printStackTrace();
// }
}
private void releaseResource() {
// 这里写释放资源和要处理的业务
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 发生错误回调
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(session.getAsyncRemote() + "客户端发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送消息到 指定客户端
*/
public void sendMessage(String message) throws IOException {
this.session.getAsyncRemote().sendText(message);
}
/**
* 获取当前在线人数
*
* @return
*/
public int getOnlineCount() {
return onlineCount.get();
}
/**
* 当前在线人数 +1
*
* @return
*/
public void addOnlineCount() {
onlineCount.getAndIncrement();
}
/**
* 当前在线人数 -1
*
* @return
*/
public void subOnlineCount() {
onlineCount.getAndDecrement();
}
/**
* 获取当前在线客户端对应的WebSocket对象
*
* @return
*/
public CopyOnWriteArraySet<IWebSocketServiceImpl> getWebSocketSet() {
return webSocketSet;
}
}
4、如果不需要实现客户端功能,此处可选择前端调用,奉上代码
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Java后端WebSocket的Tomcat实现</title>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
</head>
<body>
<div id="message"></div>
<hr />
<div id="main"></div>
<div id="client"></div>
<input id="text" type="text" />
<button onclick="send()">发送消息</button>
<hr />
<button onclick="closeWebSocket()">关闭WebSocket连接</button>
</body>
<script type="text/javascript">
var cid = Math.floor(Math.random() * 100); // 随机生成客户端id
document.getElementById('client').innerHTML += "客户端 id = " + cid + '<br/>';
var websocket = null;
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
// 改成你的地址
websocket = new WebSocket("ws://127.0.0.1:9204/api/websocket/" + cid);
} else {
alert('当前浏览器 Not support websocket')
}
//连接发生错误的回调方法
websocket.onerror = function () {
setMessageInnerHTML("websocket.onerror: WebSocket连接发生错误");
};
//连接成功建立的回调方法
websocket.onopen = function () {
setMessageInnerHTML("websocket.onopen: WebSocket连接成功");
}
//接收到消息的回调方法
websocket.onmessage = function (event) {
setMessageInnerHTML("websocket.onmessage: " + event.data);
}
//连接关闭的回调方法
websocket.onclose = function () {
setMessageInnerHTML("websocket.onclose: WebSocket连接关闭");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
closeWebSocket();
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//关闭WebSocket连接
function closeWebSocket() {
websocket.close();
alert('websocket.close: 关闭websocket连接')
}
//发送消息
function send() {
var message = document.getElementById('text').value;
try {
websocket.send('{"msg":"' + message + '"}');
setMessageInnerHTML("websocket.send: " + message);
} catch (err) {
console.error("websocket.send: " + message + " 失败");
}
}
</script>
</html>
二、客户端:
1、引用依赖
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.3.8</version>
</dependency>
2、自定义WebSocket客户端,继承WebSocketClient类,实现发送、接收等功能
/**
* 自定义WebSocket客户端
*/
public class CustomizedWebSocketClient extends WebSocketClient {
private static final Logger logger = LoggerFactory.getLogger(CustomizedWebSocketClient.class);
//用来接收数据
private String excptMessage;
/**
* 线程安全的Boolean -是否受到消息
*/
public AtomicBoolean hasMessage = new AtomicBoolean(false);
/**
* 线程安全的Boolean -是否已经连接
*/
private AtomicBoolean hasConnection = new AtomicBoolean(false);
/**
* 构造方法
*
* @param serverUri
*/
public CustomizedWebSocketClient(URI serverUri) {
super(serverUri);
logger.info("CustomizeWebSocketClient init:" + serverUri.toString());
}
/**
* 打开连接是方法
*
* @param serverHandshake
*/
@Override
public void onOpen(ServerHandshake serverHandshake) {
logger.info("CustomizeWebSocketClient onOpen");
}
/**
* 收到消息时
*
* @param s
*/
@Override
public void onMessage(String s) {
hasMessage.set(true);
logger.info("CustomizeWebSocketClient onMessage:" + s);
}
public void sendMessage(String message){
this.send(message);
logger.info("已发送消息:" + message);
}
/**
* 当连接关闭时
*
* @param i
* @param s
* @param b
*/
@Override
public void onClose(int i, String s, boolean b) {
this.hasConnection.set(false);
this.hasMessage.set(false);
logger.info("CustomizeWebSocketClient onClose:" + s);
}
/**
* 发生error时
*
* @param e
*/
@Override
public void onError(Exception e) {
logger.error("CustomizeWebSocketClient onError:" + e);
}
@Override
public void connect() {
if(!this.hasConnection.get()){
super.connect();
hasConnection.set(true);
}
}
//获取接收到的信息
public String getExcptMessage() {
if(excptMessage != null){
String message = new String(excptMessage);
excptMessage = null;
return message;
}
return null;
}
}
3、创建连接封装类,uri对应socket服务的ip和端口号
public class WebSocketClientSingleton {
private static CustomizedWebSocketClient client;
private WebSocketClientSingleton(){}
public static CustomizedWebSocketClient getInstance() {
if (client == null || client.isClosed()) {
String webSocketUri = "ws://localhost:9204/api/websocket/12345678";
try {
//实例WebSocketClient对象,并连接到WebSocket服务端
client = new CustomizedWebSocketClient(new URI(webSocketUri));
client.connect();
//等待服务端响应
while (!client.getReadyState().equals(WebSocket.READYSTATE.OPEN)) {
System.out.println("连接中···请稍后");
Thread.sleep(1000);
}
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// return client;
}
return client;
}
public static void closeClient(CustomizedWebSocketClient client){
client.close();
}
}
三、测试
@Autowired
private IWebSocketService webSocketService;
调用代码:
//懒汉模式创建客户端连接
CustomizedWebSocketClient customizedWebSocketClient = WebSocketClientSingleton.getInstance();
//客户端发送消息
customizedWebSocketClient.sendMessage("----客户端发送消息啦-----1123");
//服务端发送消息
webSocketService.sendMessage("服务端发送消息啦----223333",null);
//关闭连接
WebSocketClientSingleton.closeClient(customizedWebSocketClient);
控制台打印效果:
总结完毕