目录
HttpSessionHandshakelnterceptor (抽象类):? ?握手拦截器,在握手前后添加操作
AbstractWebSocketHandler (抽象类) :? ?WebSocket处理程序,监听连接前,连接中,连接后WebSocketConfigurer (接口):? ? 配置程序,比如配置监听哪个端口,上面的握手拦截器,处理程序的使用
?握手拦截器,在握手前后添加操作
//握手拦截器
@Component
@Slf4j
public class MyWsInterceptor extends HttpSessionHandshakeInterceptor {
//用户开始连接
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//从reuqest获取远程的地址
log.info("{}==》开始握手", request.getRemoteAddress().toString());
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
//从reuqest获取远程的地址
log.info("{}==》握手成功后的返回", request.getRemoteAddress().toString());
super.afterHandshake(request, response, wsHandler, ex);
}
}
//WebSocket处理程序
@Slf4j
@Component //注入spring进来
public class MyWsHandler extends AbstractWebSocketHandler {
//同样的,为了快速访问,使用map去取值
private static Map<String, SessionBean> map;
//县线程安全的Integer
private static AtomicInteger clientIdMaker;
static {
map = new ConcurrentHashMap<>();
clientIdMaker = new AtomicInteger(0);
}
@Override
//连接建立后,保存session连接,保存在类对象map中
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
//返回并自增//半自定义操作
SessionBean sessionBean = new SessionBean(session, clientIdMaker.getAndIncrement());
map.put(session.getId(), sessionBean);
//自定义操作
//sb.append(map.get(session.getId()).getClientId() + "进入了群聊<br/>");
//sengMessage(map);
}
@Override
//收到消息
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
//sb.append(map.get(session.getId()).getClientId() + ":传输的内容:" + message.getPayload() + "<br/>");
//log.info(String.valueOf(sb));
//sengMessage(map);
}
@Override
//传输出现异常
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.info("传输出现异常");
super.handleTransportError(session, exception);
if (session.isOpen()) {
log.info("传输出现异常,关闭session");
session.close();
map.remove(session.getId());
}
}
@Override
//连接关闭
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
map.remove(session.getId());
//Integer clientId = map.get(session.getId()).getClientId();
//log.info("()==>连接关闭", clientId);
//sb.append(clientId + "退出了群聊" + "<br/>");
//sengMessage(map);
}
//实现定时任务
@Scheduled(fixedRate = 2000) //每隔多少秒执行定时任务
public void sengMsg() throws IOException {
//向每个客户端发送请求
for (String s : map.keySet()) { //遍历对象
map.get(s) //或与session对象
.getWebSocketSession() //获取session
.sendMessage(new TextMessage("测试心跳")); //发送任务
}
}
static void sengMessage(Map<String, SessionBean> map) {
for (String key : map.keySet()) {
try {
map.get(key).getWebSocketSession().sendMessage(new TextMessage(sb.toString()));
} catch (IOException e) {
e.printStackTrace();
log.info(e.getMessage());
}
}
}
}
@Configuration
@EnableWebSocket
public class MyWsConfig implements WebSocketConfigurer {
@Resource
private MyWsHandler handler;
@Resource
private MyWsInterceptor interceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(handler, "/myWs1") //传入处理器,以及handler监听的地址
.addInterceptors(interceptor) //添加拦截器 注入自定义的握手拦截器
.setAllowedOrigins("*"); //允许的源
}
}