通过WebSocket实现异步导出

发布时间:2024年01月11日

前言

本篇文章记录大批量数据导出时间过长,导致接口请求超时问题。

解决思路与流程

  1. 前端点击导出按钮时开启websocket连接
  2. 逻辑处理异步执行
  3. 文件处理好后,得到文件的绝对路径
  4. 后台socket通知前端绝对路径的地址
  5. 前端下载文件到浏览器

文章目录

本地环境

?一、WebSocket配置

1.pom文件配置

2.信号量相关处理

3.websocket 配置

4.websocket 消息处理

5.websocket 用户集

二、建立WebSocket连接

1.导出按钮

2.导出方法

3.导出接口地址

4.流程说明

???????三、异步导出接口

1.导出接口示例

四、服务器部署nginx配置

总结


本地环境

Vue版本:2.6.12

Java版本:1.8

SpringBoot版本:2.2.13

项目使用若依前后分离版

?一、WebSocket配置

1.pom文件配置

<!-- SpringBoot Websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.2.13.RELEASE</version>
</dependency>

2.信号量相关处理

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Semaphore;

/**
 * 信号量相关处理
 *
 * @author
 */
public class SemaphoreUtils {

    /**
     * SemaphoreUtils 日志控制器
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);

    /**
     * 获取信号量
     *
     * @param semaphore
     * @return
     */
    public static boolean tryAcquire(Semaphore semaphore) {
        boolean flag = false;

        try {
            flag = semaphore.tryAcquire();
        } catch (Exception e) {
            LOGGER.error("获取信号量异常", e);
        }

        return flag;
    }

    /**
     * 释放信号量
     *
     * @param semaphore
     */
    public static void release(Semaphore semaphore) {

        try {
            semaphore.release();
        } catch (Exception e) {
            LOGGER.error("释放信号量异常", e);
        }
    }
}

3.websocket 配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * websocket 配置
 *
 * @author
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

4.websocket 消息处理

import com.hnxr.scada.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.Semaphore;

/**
 * websocket 消息处理
 *
 * @author
 */
@Component
@ServerEndpoint("/websocket/message")
public class WebSocketServer {
    /**
     * WebSocketServer 日志控制器
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * 默认最多允许同时在线人数100
     */
    public static int socketMaxOnlineCount = 100;

    public static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) throws Exception {
        boolean semaphoreFlag = false;
        // 尝试获取信号量
        semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
        if (!semaphoreFlag) {
            // 未获取到信号量
            LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
            WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
            session.close();
        } else {
            //截取前端传入的id
            String sessionId = StringUtils.substringAfterLast(session.getQueryString(), "socketId=");
            System.out.println("---------------------解析socketId" + sessionId);
            // 添加用户
            WebSocketUsers.put(sessionId, session);
            System.out.println("-----------------当前用户" + WebSocketUsers.getUsers());
            LOGGER.info("\n 建立连接 - {}", session);
            LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
            WebSocketUsers.sendMessageToUserByText(session, "连接成功");
        }
    }

    /**
     * 连接关闭时处理
     */
    @OnClose
    public void onClose(Session session) {
        LOGGER.info("\n 关闭连接 - {}", session);
        String sessionId = StringUtils.substringAfterLast(session.getQueryString(), "socketId=");
        // 移除用户
        WebSocketUsers.remove(sessionId);
        // 获取到信号量则需释放
        SemaphoreUtils.release(socketSemaphore);
    }

    /**
     * 抛出异常时处理
     */
    @OnError
    public void onError(Session session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            // 关闭连接
            session.close();
        }
        String sessionId = StringUtils.substringAfterLast(session.getQueryString(), "socketId=");
        LOGGER.info("\n 连接异常 - {}", sessionId);
        LOGGER.info("\n 异常信息 - {}", exception);
        // 移出用户
        WebSocketUsers.remove(sessionId);
        // 获取到信号量则需释放
        SemaphoreUtils.release(socketSemaphore);
    }

    /**
     * 服务器接收到客户端消息时调用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        WebSocketUsers.sendMessageToUserByText(session, message);
    }
}

白名单放开socket配置接口

.antMatchers("/websocket/**").permitAll()

5.websocket 用户集

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.websocket.Session;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * websocket 客户端用户集
 *
 * @author
 */
public class WebSocketUsers {
    /**
     * WebSocketUsers 日志控制器
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);

    /**
     * 用户集
     */
    private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();

    /**
     * 存储用户
     *
     * @param key     唯一键
     * @param session 用户信息
     */
    public static void put(String key, Session session) {
        USERS.put(key, session);
    }

    /**
     * 移除用户
     *
     * @param session 用户信息
     * @return 移除结果
     */
    public static boolean remove(Session session) {
        String key = null;
        boolean flag = USERS.containsValue(session);
        if (flag) {
            Set<Map.Entry<String, Session>> entries = USERS.entrySet();
            for (Map.Entry<String, Session> entry : entries) {
                Session value = entry.getValue();
                if (value.equals(session)) {
                    key = entry.getKey();
                    break;
                }
            }
        } else {
            return true;
        }
        return remove(key);
    }

    /**
     * 移出用户
     *
     * @param key 键
     */
    public static boolean remove(String key) {
        LOGGER.info("\n 正在移出用户 - {}", key);
        Session remove = USERS.remove(key);
        if (remove != null) {
            boolean containsValue = USERS.containsValue(remove);
            LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");
            return containsValue;
        } else {
            return true;
        }
    }

    /**
     * 获取在线用户列表
     *
     * @return 返回用户集合
     */
    public static Map<String, Session> getUsers() {
        return USERS;
    }

    /**
     * 群发消息文本消息
     *
     * @param message 消息内容
     */
    public static void sendMessageToUsersByText(String message) {
        Collection<Session> values = USERS.values();
        for (Session value : values) {
            sendMessageToUserByText(value, message);
        }
    }

    /**
     * 发送文本消息
     *
     * @param session 缓存
     * @param message 消息内容
     */
    public static void sendMessageToUserByText(Session session, String message) {
        if (session != null) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                LOGGER.error("\n[发送消息异常]", e);
            }
        } else {
            LOGGER.info("\n[你已离线]");
        }
    }
}

二、建立WebSocket连接

1.导出按钮

<el-button type="warning" plain icon="el-icon-download" size="mini" :loading="exportLoading"
           @click="handleExport">导出
</el-button>

2.导出方法

      /** 导出按钮操作 */
      handleExport() {
        this.$confirm('是否确认导出所有?', '警告', {
          confirmButtonText: '确定',
          cancelButtonText: '取消',
          type: 'warning'
        })
          .then(() => {
            const ip = window.location.host;
            //TODO 服务器端口号 nginx代理为4430
            const hostIp = ip.split(":")[0];
            console.log('ip:', hostIp)
            const url = "wss://" + hostIp + ":4430/websocket/message";

            //TODO 本地端口号为9996 项目启动端口
            // let url = "wss://localhost:9996/websocket/message";

            console.log('url', url)
            const socketId = this.getRandomId();
            //将socketId当做参数传入后端
            this.queryParams.sessionId = socketId
            const socket = new WebSocket(url + '?socketId=' + socketId);
            socket.onopen = function (event) {
              console.log("已开启websocket连接!" + "sessionId:" + socketId)
            };
            socket.onmessage = function (event) {
              const val = event.data
              console.log("收到消息", val)
              if (val.endsWith("asyExport")) {
                const path = val.slice(0, val.indexOf("asyExport"));
                console.log("文件名:" + path)
                const baseURL = process.env.VUE_APP_BASE_API
                window.location.href = baseURL + "/common/download?fileName=" + encodeURI(path) + "&delete=" + true;
                //延时关闭连接 确保WebSocket处于打开状态
                setTimeout(function () {
                  if (socket.readyState === WebSocket.OPEN) {
                    socket.close();
                  }
                }, 5000);
              }
            };
            // 处理连接关闭
            socket.onclose = function (event) {
              console.log("已经关闭连接!");
            };
            // 处理连接错误
            socket.onerror = function (error) {
              console.error("WebSocket 出现错误:", error);
              if (socket.readyState === WebSocket.OPEN) {
                socket.close();
              }
            };
            exportChineseParsing(this.queryParams)
          })
      },
      // 生成唯一标识符
      getRandomId() {
        return Math.random().toString(36).substring(2) + Date.now().toString(36);
      }

本地项目是http请求,前缀设置为ws

后端application.yml 的ssl配置将server下的enabled改为false

前端代理地址保持http

https请求,前缀设置为wss

ssl配置文件将server下的enabled改为true

前端代理地址改为https

3.导出接口地址

//导出中文解析
export function exportChineseParsing(query) {
  return request({
    url: '/basic/chineseParsing/export',
    method: 'get',
    params: query,
    responseType: 'blob'
  })
}

浏览器控制台查看连接时是否正常

前后端socket配置完成后在浏览器控制台输入

new WebSocket("ws://localhost:9996/websocket/message")

测试是否正常 如下图 抛出异常需要检查上方配置是否正确

4.流程说明

首先获取当前ip,本地使用localhost即可,服务器部署需要配置nginx代理

ip后跟端口号,端口后是后端websocket接口地址

socket.onopen回调方法验证是否成功建立连接

socket.onmessage是接收后端接口异步导出后的消息

后端文件处理好后,约定发送的消息以asyExport字符为后缀,方便前端识别和截取文件绝对路径

调用下载接口,关闭socket连接

如不是若依前后分离版,替换为自己的下载文件的接口就行

???????三、异步导出接口

1.导出接口示例

    @GetMapping("/export")
    public void export(BasicChineseParsing basicChineseParsing) throws Exception {
        //存入文件名称
        AtomicReference<String> path = new AtomicReference<>("");
        //逻辑异步处理
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            try {
                //模拟逻辑处理时长
                System.out.println("-----------------------睡眠5秒");
                Thread.sleep(5000);
				.......
				
                //创建临时文件 绝对路径
                File tempFile = PoiUtils.createTempNameFile(rows, title, "_中文解析表");
                String result = tempFile.getCanonicalPath().substring(tempFile.getCanonicalPath().indexOf("vehicle"));
                System.out.println("-----------------------处理后文件名:" + result);
                path.set(result);
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        });
        //异步处理完成后
        runAsync.thenRun(() -> {
            // 发送消息 拿到前端传入的socketId
            String sessionId = basicChineseParsing.getSessionId();
            System.out.println("-----------------------sessionId:" + sessionId);
            Session session = WebSocketUsers.getUsers().get(sessionId);
            webSocketServer.onMessage(path.toString() + "asyExport", session);
            System.out.println("-----------------------导出成功");
            //前端关闭socket连接
        });
    }

?2.debug调试流程截图

下载方法大家的和我的会不一样

但是File tempFile只要获取到下载好后的绝对路径就可以


???????我这里path存入的是下载的文件名称

因为前端会再次调用下载接口,有文件名称,就可以将文件下载至浏览器了

?在webSocketServer.onMessage方法中发送的消息就是文件名称+asyExport一个特殊约定的后缀

前端在收到消息后会截取asyExport前的文件名称

?完成流程日志打印如下

因使用若依前后分离版,大家调整后的下载方法可能和我的不一样

比如socket需要发送的信息是文件的绝对路径,前端调用下载方法传入绝对路径即可

四、服务器部署nginx配置

这个配置看大家是否需要部署至服务器

server {
        listen       4430 ssl;
        server_name  localhost;
        .....
        
        location /websocket/message {
            proxy_pass https://服务器ip:4430;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
}

配置需要放在当前代理的端口配置下

如果是ssl认证的proxy_pass后地址需要用https,没有认证就用http

ip后的端口是你的程序服务使用端口,不是代理映射的端口

这里配置的地址是和前端创建连接时const url = "wss://" + hostIp + ":4430/websocket/message";

ip和端口是一致的


???????总结

第一步WebSocket配置大家是可以直接复制使用

第二步我的处理是只有导出时才启用socket连接,下载文件后关闭,也可以设置为全局性的,方便处理其他通知

socket通知前端的下载方式也有很多种,需要考虑的是会有其他用户或其他导出接口,其他socket通知,所以通知下载的socket消息是需要具有唯一性的,上方建立socket连接时就加了保障唯一性的socketId,并当为参数传入后端。

本方法就介绍到这里,需要大家多调试,欢迎留言讨论!

文章来源:https://blog.csdn.net/weixin_56567361/article/details/135451807
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。