基于WebFlux的Websocket的实现,高级实现自定义功能拓展

发布时间:2024年01月13日

基于WebFlux的Websocket

一、导入XML依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<!-- 或者引入jackson -->
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.26</version>
</dependency>

二、定义配置类,设置WebSocket拦截器

@Configuration
@EnableWebFlux
public class WebSocketConfig implements WebFluxConfigurer {
    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/chat", new MyWebSocketChatHandler());
        map.put("/ws/echo", new MyWebSocketEchoHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // 需要设置较高的优先级,以避免与其他处理程序冲突

        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

三、设置处理器,WebSocket的处理器

// 1. Echo的处理器
public class MyWebSocketEchoHandler implements WebSocketHandler {
    @NotNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(
                session.receive()
                        .map(msg -> "Echo: " + msg.getPayloadAsText())
                        .map(session::textMessage)
        );
    }
}

设置自定义的处理器(高级处理)

public class MyWebSocketChatHandler implements WebSocketHandler {
    private static final Map<String, WebSocketSession> userMap = new ConcurrentHashMap<>();
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @NotNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String query = session.getHandshakeInfo().getUri().getQuery();
        Map<String, String> queryMap = getQueryMap(query);
        String userId = queryMap.getOrDefault("id", "");
        userMap.put(userId, session);
        System.out.println("当前用户:" + userId);
        System.out.println("当前在线人数:" + userMap.size());
        return session.receive().flatMap(webSocketMessage -> {
            String payload = webSocketMessage.getPayloadAsText();
            Message message;
            try {
                message = objectMapper.readValue(payload, Message.class);
                if (Integer.parseInt(message.getCode()) == CodeEnum.SUCCESS.getCode()) {
                    // 执行成功模式
                    Mono<Void> targetSession = SuccessMode(message);
                    if (targetSession != null) return targetSession;
                } else if (Integer.parseInt(message.getCode()) == CodeEnum.ERROR.getCode()) {
                    // 执行出错模式
                    return session.send(Mono.just(session.textMessage("消发送出错了")));
                } else {
                    // 其他code的功能实现
                    return session.send(Mono.just(session.textMessage("消息格式错误")));
                }

            } catch (JsonProcessingException e) {
                e.printStackTrace();
                // 这里一定要return,否则会导致线程卡死直接断开连接
                return session.send(Mono.just(session.textMessage(e.getMessage())));
            }
            return session.send(Mono.just(session.textMessage("目标用户不在线")));
        }).then().doFinally(signal -> userMap.remove(userId)); // 用户关闭连接后删除对应连接
    }

    @Nullable
    private static Mono<Void> SuccessMode(Message message) {
        String targetId = message.getTargetId();
        if (userMap.containsKey(targetId)) {
            WebSocketSession targetSession = userMap.get(targetId);
            if (null != targetSession) {
                WebSocketMessage textMessage = targetSession.textMessage(message.getMessageText());
                return targetSession.send(Mono.just(textMessage));
            }
        }
        return null;
    }

    // 其他的实现
    private Map<String, String> getQueryMap(String queryStr) {
        Map<String, String> queryMap = new HashMap<>();
        if (!StringUtils.isEmpty(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }
}

注意要先配置一个实体类映射—(注意客户端的信息一定也是要json格式不然会报错哟)

@Data
public class Message {
    @JsonProperty("code")
    private String code;
    @JsonProperty("targetId")
    private String targetId;
    @JsonProperty("messageText")
    private String messageText;
    @JsonProperty("userId")
    private String userId;
}

枚举类设置code对应的信息

public enum CodeEnum {
    SUCCESS(1),
    ERROR(2);
    // 其他枚举值...

    private final Integer code;

    CodeEnum(int code) {
        this.code = code;
    }

    public Integer getCode() {
        return code;
    }
}

最后运行即可。注意访问ws://localhost:8081/ws/chat?userId=123实现私聊的功能,访问ws://localhost:8081/ws/echo即可实现简单的服务器和客户端的回应。群聊功能可以根据自己的需求进行实现,只需要添加对应的code以及获取所有session并发送message即可。

文章来源:https://blog.csdn.net/m0_68856746/article/details/135567331
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。