一、前言
? ? 现在我们已经掌握了如何调用AI下游服务,接下来我们将开始构建APP与AI服务之间的桥梁,从而逐步实现AI的对话功能。在进行这一步之前,需要先学习如何利用Netty库快速构建WebSocket服务,以便实现高效的实时通信。
二、术语
2.1.?Netty
? ? 是一个开源的、高性能的网络应用程序框架,用于快速开发可扩展的服务器和网络应用程序。它基于Java NIO(New I/O)技术,提供了一种异步、事件驱动的编程模型,使得开发者能够轻松构建高性能、可伸缩的网络应用程序。
2.2.?WebSocket
? ? 是一种基于TCP协议的全双工通信协议,用于在Web应用程序中实现实时的双向通信。与传统的HTTP请求-响应模型不同,WebSocket允许服务器主动向客户端推送数据,而不需要客户端明确地发起请求。
WebSocket协议的主要特点包括:
双向通信:WebSocket支持客户端和服务器之间的双向通信,可以实现实时的数据传输。服务器可以主动向客户端推送数据,而不需要等待客户端的请求。
长连接:与HTTP请求-响应模型不同,WebSocket在建立连接后可以保持持久连接,减少了建立和关闭连接的开销,同时也减少了网络传输的延迟。
较低的开销:WebSocket使用较少的网络流量和较低的开销,因为它使用二进制帧和数据压缩技术来减少数据传输的负载。
跨域支持:WebSocket支持跨域通信,可以在不同域名或不同端口之间进行通信。
WebSocket协议已经被现代的Web浏览器广泛支持,并且成为构建实时应用程序、聊天应用、多人游戏、股票行情等需要实时通信的Web应用程序的常用技术。在服务器端,可以使用各种编程语言和框架来实现WebSocket服务器,而在客户端,可以使用JavaScript等语言来与WebSocket服务器进行交互。
2.2.TCP(Transmission Control Protocol)
? ? 是一种网络传输协议,位于网络协议栈的传输层,用于提供可靠的、面向连接的数据传输。
TCP协议的主要特点包括:
可靠性:TCP使用确认、重传和超时机制来确保数据的可靠传输。接收端会向发送端发送确认消息,以确保数据的正确接收,如果发送端没有收到确认消息,会进行数据的重传。此机制可以保证数据在传输过程中不会丢失或损坏。
面向连接:在进行数据传输之前,TCP需要在发送端和接收端之间建立连接。连接包括三个阶段,即建立连接、数据传输和连接释放。这种面向连接的机制可以保证数据按照顺序到达,并且不会交错或重复。
流式传输:TCP将数据视为一连续的字节流进行传输,而不是将其分割为单个数据包。TCP会将数据分割为适当大小的数据块,并在接收端进行重新组装。
拥塞控制:TCP使用拥塞控制机制来避免网络拥塞和数据丢失。它会根据网络的拥塞程度动态调整发送数据的速率,以确保网络的稳定性和公平性。
三、架构图
? ? 3.1. 基于Http协议实现业务上的前置流程,例如:身份认证、权限认证、流量控制等
? ? 3.2. 通过前置流程处理后,App与im集群建立WS连接
? ? 3.3. 用户输入内容通过im集群实时发送至下游AI服务集群,并进行模型推理
? ? 3.4. 推理结果以流式输出返回至im集群,并通过其实时返回至App
四、前置条件
? ? 4.1.? 初始化一个Maven项目(记得项目编码为:UTF-8)
? ? 4.2.? 引入核心依赖库
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
? ? 4.3.? 引入辅助依赖库
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.38</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.5</version>
</dependency>
? ? ? ?
PS:若您平时没有接触过上面的依赖库,强烈建议您去使用一下哦!
五、技术实现
5.1.?Server(WebSocket服务器启动类)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup mainGroup = new NioEventLoopGroup();
EventLoopGroup subGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(mainGroup, subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitialzer());
ChannelFuture future = server.bind(7778).sync();
log.info("webSocket server listen on port : [{}]", 7778);
future.channel().closeFuture().sync();
} finally {
mainGroup.shutdownGracefully();
subGroup.shutdownGracefully();
}
}
}
5.2.?ServerInitialzer(服务配置类)
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class ServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// websocket 基于http协议,所以要有http编解码器
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
pipeline.addLast(new HttpObjectAggregator(1024*64));
// ====================== 以上是用于支持http协议 ======================
// ====================== 以下是支持httpWebsocket ======================
//websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义的handler
pipeline.addLast(new BusinessHandler());
}
}
5.3.?BusinessHandler(具体业务处理类)
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* @Description: 处理消息的handler
* TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体
*/
@Slf4j
public class BusinessHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)
throws Exception {
// 获取客户端传输过来的消息
String content = textWebSocketFrame.text();
log.info("接收到客户端发送的信息: {}",content);
Long userIdForReq;
String msgType = "";
String contents = "";
try {
ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);
msgType = apiReqMessage.getMsgType();
contents = apiReqMessage.getContents();
userIdForReq = apiReqMessage.getUserId();
log.info("用户标识: {}, 消息类型: {}, 消息内容: {}",userIdForReq,msgType,contents);
ApiRespMessage apiRespMessage = ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode()))
.respTime(String.valueOf(System.currentTimeMillis()))
.contents("测试通过,很高兴收到你的信息")
.msgType(String.valueOf(MsgType.CHAT.getCode()))
.build();
String response = JSON.toJSONString(apiRespMessage);
channelHandlerContext.writeAndFlush(new TextWebSocketFrame(response));
} catch (Exception e) {
log.warn("【BusinessHandler】接收到请求内容:{},异常信息:{}", content, e.getMessage(), e);
// 异常返回
return;
}
}
}
5.4.?ApiReqMessage(请求封装类)
import lombok.*;
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
public final class ApiReqMessage {
/**
* 流程类型
*/
private Integer flowType;
/**
* 消息类型
*/
private String msgType;
/**
* 请求时间
*/
private String reqTime;
/**
* 操作ID(对于服务端无意义,原封不动响应回前端即可)
*/
private String operateId;
/**
* 内容
*/
private String contents;
/**
* 用户ID
*/
private Long userId;
}
5.5.?ApiRespMessage(响应封装类)
import lombok.*;
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
public final class ApiRespMessage<T> {
private String code;// 响应码
private String respTime;// 响应时间
private String msgType;// 消息类型
private String operateId;// 操作ID(对于服务端无意义,原封不动响应回前端即可)
private T contents;// 内容
}
六、测试
6.1. 页面测试
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title></title>
</head>
<body>
<div>发送消息:</div>
<input type="text" id="msgContent"/>
<input type="button" value="点我发送" onclick="CHAT.chat()"/>
<div>接受消息:</div>
<div id="receiveMsg" style="background-color: gainsboro;"></div>
<script type="application/javascript">
window.CHAT = {
socket: null,
init: function() {
if (window.WebSocket) {
CHAT.socket = new WebSocket("ws://192.168.102.88:7778/ws");
CHAT.socket.onopen = function() {
console.log("连接建立成功...");
},
CHAT.socket.onclose = function() {
console.log("连接关闭...");
},
CHAT.socket.onerror = function() {
console.log("发生错误...");
},
CHAT.socket.onmessage = function(e) {
console.log("接受到消息:" + e.data);
var receiveMsg = document.getElementById("receiveMsg");
var html = receiveMsg.innerHTML;
receiveMsg.innerHTML = html + "<br/>" + e.data;
}
} else {
alert("浏览器不支持websocket协议...");
}
},
chat: function() {
var msg = document.getElementById("msgContent");
CHAT.socket.send(msg.value);
}
};
CHAT.init();
</script>
</body>
</html>
PS:192.168.102.88:7778需要根据实际情况调整IP和PORT
执行结果:
# 服务端输出:
# 页面输出:
PS: 页面输入内容:
{"userId":12345,"msgType":1,"contents":"hello world"}
6.2. 在线测试
1) 输入连接信息
2) 输入对话内容
# 服务端输出:
# 页面输出:
若您能够复现出跟我一样的结果,恭喜您,您已经跑通了最简单WebSocket的流程,离我们的目标又近一步了!