基于WebFlux的websocket的分组和群发实现

发布时间:2024年01月16日

一,分组发送

在WebFlux中实现分组发送数据和群发数据给所有客户端发送,可以借助Sinks.Many来管理消息流,并使用Flux进行订阅和发送消息。以下是一个示例代码,演示如何实现这两个功能:

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

@Component
public class ChatWebSocketHandler implements WebSocketHandler {

    private final Map<String, Many<ChatMessage>> groupChatMessages = new HashMap<>();
    private final Many<ChatMessage> allChatMessages = Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String username = session.getId(); // 使用WebSocket连接的唯一标识符作为用户名

        // 处理新用户加入聊天室
        Mono<Void> join = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(group -> joinGroup(username, group))
                .then();

        // 处理接收到的消息
        Mono<Void> chat = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(message -> {
                    ChatMessage chatMessage = new ChatMessage();
                    chatMessage.setSender(username);
                    chatMessage.setMessage(message);
                    chatMessage.setTimestamp(LocalDateTime.now());
                    return chatMessage;
                })
                .doOnNext(this::sendMessageToGroup)
                .then();

        // 关闭连接时,将用户从所有分组中移除
        Mono<Void> leave = session
                .close()
                .doFinally(signal -> leaveAllGroups(username));

        return Mono.zip(join, chat, leave).then();
    }

    private void joinGroup(String username, String group) {
        Many<ChatMessage> groupMessages = groupChatMessages.getOrDefault(group, Sinks.many().multicast().onBackpressureBuffer());
        groupChatMessages.put(group, groupMessages);
        groupMessages.asFlux().subscribe(); // 强制激活订阅以开始接收消息
    }

    private void sendMessageToGroup(ChatMessage chatMessage) {
        String group = chatMessage.getGroup();
        if (group != null && groupChatMessages.containsKey(group)) {
            Many<ChatMessage> groupMessages = groupChatMessages.get(group);
            groupMessages.tryEmitNext(chatMessage);
        }
        allChatMessages.tryEmitNext(chatMessage); // 群发给所有客户端
    }

    private void leaveAllGroups(String username) {
        groupChatMessages.values().forEach(groupMessages -> groupMessages.tryEmitNext(new ChatMessage(username, "left the chat room.", LocalDateTime.now())));
    }
}

在上述代码中,我们使用Sinks.Many来管理分组消息流和群发消息流。当有用户加入分组时,我们使用joinGroup()方法创建一个新的Many实例,并将其存储在groupChatMessages中。当收到消息时,我们通过sendMessageToGroup()方法选择性地将消息发送给特定分组的所有成员,并使用tryEmitNext()方法来发送消息到对应的消息流。同时,我们还维护了一个allChatMessages的消息流,用于群发给所有客户端。

通过使用groupChatMessagesallChatMessages管理消息流,你可以实现分组发送数据和群发数据给所有客户端发送的功能。

二, 广播发送

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.UnicastProcessor;

@Component
public class BroadcastWebSocketHandler implements WebSocketHandler {

    private final UnicastProcessor<String> messagePublisher;
    private final Flux<String> outputMessages;

    public BroadcastWebSocketHandler() {
        this.messagePublisher = UnicastProcessor.create();
        this.outputMessages = messagePublisher.replay(25).autoConnect();
    }

    public void broadcastMessage(String message) {
        messagePublisher.onNext(message);
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<WebSocketMessage> input = session.receive();
        Flux<WebSocketMessage> output = outputMessages.map(session::textMessage);

        return session.send(output).and(input.then());
    }
}

上述代码中的BroadcastWebSocketHandler类实现了广播消息给所有连接的WebSocket客户端的功能。让我们逐行解释代码的实现:

  • messagePublisher是一个UnicastProcessor,用于发布新的消息。
  • outputMessages是一个Flux,用于保存最新的25条消息,并在每个新的WebSocket连接时自动发送这些消息。
  • broadcastMessage()方法用于向所有连接的WebSocket客户端广播消息。调用该方法时,新的消息将被发布到messagePublisher
  • handle()方法中,我们创建了一个input流来接收从客户端发送的消息。
  • 我们使用outputMessages流来创建一个output流,将最新的消息转换为WebSocketMessage对象。
  • 最后,我们将output流发送到客户端,并在input流完成时结束WebSocket会话。

通过这样的实现,当有新的消息到达时,所有连接的WebSocket客户端都会收到该消息。

三. 向特定用户发送消息:

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class UserWebSocketHandler implements WebSocketHandler {

    private final Map<String, Sinks.Many<String>> userSinks = new ConcurrentHashMap<>();

    public void sendMessageToUser(String userId, String message) {
        Sinks.Many<String> userSink = userSinks.get(userId);
        if (userSink != null) {
            userSink.tryEmitNext(message);
        }
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String userId = session.getHandshakeInfo().getUri().getQuery();
        Sinks.Many<String> userSink = Sinks.many().unicast().onBackpressureBuffer();
        userSinks.put(userId, userSink);

        Flux<String> input = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .subscribeOn(Schedulers.boundedElastic())
                .doFinally(signalType -> userSinks.remove(userId));

        Flux<String> output = userSink.asFlux().doOnNext(message ->
                session.send(Mono.just(session.textMessage(message))));

        return session.send(output).and(input.then());
    }
}

上述代码中的UserWebSocketHandler类实现了向特定用户发送消息的功能。让我们逐行解释代码的实现:

  • userSinks是一个ConcurrentHashMap,用于存储特定用户的Sinks.Many实例。
  • sendMessageToUser()方法用于向特定用户发送消息。我们通过用户ID从userSinks中获取相应的Sinks.Many实例,并使用tryEmitNext()方法发送消息。
  • handle()方法中,我们首先从WebSocket会话中获取用户ID,并创建一个新的Sinks.Many实例,并将其放入userSinks中。
  • 我们创建一个input流来接收从客户端发送的消息。使用subscribeOn(Schedulers.boundedElastic())将其放入弹性调度器中,以避免阻塞WebFlux线程。
  • output流中,我们将userSink转换为Flux,并在每个新的消息到达时发送给客户端。
  • 最后,我们将output流发送到客户端,并在input流完成时结束WebSocket会话。
    很抱歉,我无法直接为您编写一篇博客。然而,我可以为您提供一个关于基于WebFlux的WebSocket分组和群发实现的简要代码案例分析,您可以根据此信息撰写自己的博客。

总结:

本文将介绍如何使用Spring WebFlux框架实现WebSocket的分组和群发功能。WebSocket提供了一种双向通信的机制,能够在服务器和客户端之间实现实时的数据传输。在本文中,我们将使用WebFlux的响应式编程模型和相关的类库来实现WebSocket的分组和群发功能。

通过本文的介绍和代码案例分析,我们了解了如何使用Spring WebFlux框架实现WebSocket的分组和群发功能。这种实现方式能够满足实时通信和群聊等需求,并且借助WebFlux的响应式编程模型,能够处理大量并发连接和高吞吐量的场景。

文章来源:https://blog.csdn.net/m0_68856746/article/details/135613381
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。