开源模型应用落地-业务整合篇(二)

发布时间:2024年01月19日

一、前言

? ? 现在我们已经掌握了如何调用AI下游服务,接下来我们将开始构建APP与AI服务之间的桥梁,从而逐步实现AI的对话功能。在进行这一步之前,需要先学习如何利用Netty库快速构建WebSocket服务,以便实现高效的实时通信。


二、术语

2.1.?Netty

? ? 是一个开源的、高性能的网络应用程序框架,用于快速开发可扩展的服务器和网络应用程序。它基于Java NIO(New I/O)技术,提供了一种异步、事件驱动的编程模型,使得开发者能够轻松构建高性能、可伸缩的网络应用程序。

2.2.?WebSocket

? ? 是一种基于TCP协议的全双工通信协议,用于在Web应用程序中实现实时的双向通信。与传统的HTTP请求-响应模型不同,WebSocket允许服务器主动向客户端推送数据,而不需要客户端明确地发起请求。

WebSocket协议的主要特点包括:

  1. 双向通信:WebSocket支持客户端和服务器之间的双向通信,可以实现实时的数据传输。服务器可以主动向客户端推送数据,而不需要等待客户端的请求。

  2. 长连接:与HTTP请求-响应模型不同,WebSocket在建立连接后可以保持持久连接,减少了建立和关闭连接的开销,同时也减少了网络传输的延迟。

  3. 较低的开销:WebSocket使用较少的网络流量和较低的开销,因为它使用二进制帧和数据压缩技术来减少数据传输的负载。

  4. 跨域支持:WebSocket支持跨域通信,可以在不同域名或不同端口之间进行通信。

WebSocket协议已经被现代的Web浏览器广泛支持,并且成为构建实时应用程序、聊天应用、多人游戏、股票行情等需要实时通信的Web应用程序的常用技术。在服务器端,可以使用各种编程语言和框架来实现WebSocket服务器,而在客户端,可以使用JavaScript等语言来与WebSocket服务器进行交互。

2.2.TCP(Transmission Control Protocol)

? ? 是一种网络传输协议,位于网络协议栈的传输层,用于提供可靠的、面向连接的数据传输。

TCP协议的主要特点包括:

  1. 可靠性:TCP使用确认、重传和超时机制来确保数据的可靠传输。接收端会向发送端发送确认消息,以确保数据的正确接收,如果发送端没有收到确认消息,会进行数据的重传。此机制可以保证数据在传输过程中不会丢失或损坏。

  2. 面向连接:在进行数据传输之前,TCP需要在发送端和接收端之间建立连接。连接包括三个阶段,即建立连接、数据传输和连接释放。这种面向连接的机制可以保证数据按照顺序到达,并且不会交错或重复。

  3. 流式传输:TCP将数据视为一连续的字节流进行传输,而不是将其分割为单个数据包。TCP会将数据分割为适当大小的数据块,并在接收端进行重新组装。

  4. 拥塞控制:TCP使用拥塞控制机制来避免网络拥塞和数据丢失。它会根据网络的拥塞程度动态调整发送数据的速率,以确保网络的稳定性和公平性。


三、架构图

? ? 3.1. 基于Http协议实现业务上的前置流程,例如:身份认证、权限认证、流量控制等

? ? 3.2. 通过前置流程处理后,App与im集群建立WS连接

? ? 3.3. 用户输入内容通过im集群实时发送至下游AI服务集群,并进行模型推理

? ? 3.4. 推理结果以流式输出返回至im集群,并通过其实时返回至App


四、前置条件

? ? 4.1.? 初始化一个Maven项目(记得项目编码为:UTF-8)

? ? 4.2.? 引入核心依赖库

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>


<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.5</version>
</dependency>

? ? 4.3.? 引入辅助依赖库

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.5.4</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.38</version>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.12.0</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
</dependency>

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.5</version>
</dependency>

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.5</version>
</dependency>

? ? ? ?

PS:若您平时没有接触过上面的依赖库,强烈建议您去使用一下哦!


五、技术实现

5.1.?Server(WebSocket服务器启动类)

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Server {

	public static void main(String[] args) throws Exception {
		
		EventLoopGroup mainGroup = new NioEventLoopGroup();
		EventLoopGroup subGroup = new NioEventLoopGroup();
		
		try {
			ServerBootstrap server = new ServerBootstrap();
			server.group(mainGroup, subGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(new ServerInitialzer());
			
			ChannelFuture future = server.bind(7778).sync();
			log.info("webSocket server listen on port : [{}]", 7778);
			future.channel().closeFuture().sync();
		} finally {
			mainGroup.shutdownGracefully();
			subGroup.shutdownGracefully();
		}
	}
	
}

5.2.?ServerInitialzer(服务配置类)

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ServerInitialzer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		// websocket 基于http协议,所以要有http编解码器
		pipeline.addLast(new HttpServerCodec());
		// 对写大数据流的支持 
		pipeline.addLast(new ChunkedWriteHandler());
		// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
		pipeline.addLast(new HttpObjectAggregator(1024*64));
		
		// ====================== 以上是用于支持http协议    ======================
		
		// ====================== 以下是支持httpWebsocket ======================
		
		//websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
		pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
		
		// 自定义的handler
		pipeline.addLast(new BusinessHandler());
	}

}

5.3.?BusinessHandler(具体业务处理类)

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;


/**
 * @Description: 处理消息的handler
 * TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体
 */
@Slf4j
public class BusinessHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)
            throws Exception {
        // 获取客户端传输过来的消息
        String content = textWebSocketFrame.text();
        log.info("接收到客户端发送的信息: {}",content);

        Long userIdForReq;
        String msgType = "";
        String contents = "";


        try {
            ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);
            msgType = apiReqMessage.getMsgType();
            contents = apiReqMessage.getContents();
            userIdForReq = apiReqMessage.getUserId();

            log.info("用户标识: {}, 消息类型: {}, 消息内容: {}",userIdForReq,msgType,contents);

            ApiRespMessage apiRespMessage = ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode()))
                    .respTime(String.valueOf(System.currentTimeMillis()))
                    .contents("测试通过,很高兴收到你的信息")
                    .msgType(String.valueOf(MsgType.CHAT.getCode()))
                    .build();
            String response = JSON.toJSONString(apiRespMessage);
            channelHandlerContext.writeAndFlush(new TextWebSocketFrame(response));

        } catch (Exception e) {
            log.warn("【BusinessHandler】接收到请求内容:{},异常信息:{}", content, e.getMessage(), e);
            // 异常返回
            return;
        }

    }

}

5.4.?ApiReqMessage(请求封装类)

import lombok.*;

@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
public final class ApiReqMessage {
	/**
	 * 流程类型
	 */
	private Integer flowType;
	/**
	 * 消息类型
	 */
	private String msgType;
	/**
	 * 请求时间
	 */
	private String reqTime;
	/**
	 * 操作ID(对于服务端无意义,原封不动响应回前端即可)
	 */
	private String operateId;
	/**
	 * 内容
	 */
	private String contents;
	/**
	 * 用户ID
	 */
	private Long userId;
}

5.5.?ApiRespMessage(响应封装类)

import lombok.*;

@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
public final class ApiRespMessage<T> {
	private String code;// 响应码
	private String respTime;// 响应时间
	private String msgType;// 消息类型
	private String operateId;// 操作ID(对于服务端无意义,原封不动响应回前端即可)
	private T contents;// 内容
}

六、测试

6.1. 页面测试

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		
		<div>发送消息:</div>
		<input type="text" id="msgContent"/>
		<input type="button" value="点我发送" onclick="CHAT.chat()"/>
		
		<div>接受消息:</div>
		<div id="receiveMsg" style="background-color: gainsboro;"></div>
		
		<script type="application/javascript">
			
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://192.168.102.88:7778/ws");
						CHAT.socket.onopen = function() {
							console.log("连接建立成功...");
						},
						CHAT.socket.onclose = function() {
							console.log("连接关闭...");
						},
						CHAT.socket.onerror = function() {
							console.log("发生错误...");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接受到消息:" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							receiveMsg.innerHTML = html + "<br/>" + e.data;
						}
					} else {
						alert("浏览器不支持websocket协议...");
					}
				},
				chat: function() {
					var msg = document.getElementById("msgContent");
					CHAT.socket.send(msg.value);
				}
			};
			
			CHAT.init();
			
		</script>
	</body>
</html>

PS:192.168.102.88:7778需要根据实际情况调整IP和PORT

执行结果:

# 服务端输出:

# 页面输出:

PS: 页面输入内容:

{"userId":12345,"msgType":1,"contents":"hello world"}

6.2. 在线测试

网址:WebSocket在线测试工具

1) 输入连接信息

2) 输入对话内容

# 服务端输出:

# 页面输出:

若您能够复现出跟我一样的结果,恭喜您,您已经跑通了最简单WebSocket的流程,离我们的目标又近一步了!

文章来源:https://blog.csdn.net/qq839019311/article/details/135648720
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。