[Netty实践] 简单WebSocket服务实现

发布时间:2023年12月24日

目录

一、介绍

二、依赖导入

三、基础类准备

四、Handler实现

五、WebSocketChannelInitializer实现

六、WebSocketServer实现

七、前端实现

八、测试

九、参考链接


一、介绍

关于WebSocket此处不进行过多介绍,本章主要着重通过Netty实现WebSocket通信服务端,并且实现一个简单的通过网页进行聊天的功能。

讲到WebSocket,这里简单介绍一下为什么要使用WebSocket。以往我们通过网页与服务器进行交互时,都是通过发起一个http/https请求,该请求是无状态的,发送请求后,等待获取服务器返回的结果之后,这次请求就结束了,客户端与服务端就断开了。如果此时服务端想向客户端推送消息的话,由于连接已经断开,服务端无法进行消息推送,此时可以通过使用WebSocket进行客户端与服务端建立长连接,当服务端想向客户端推送消息时,就可以向客户端进行消息推送了。

接下来就进行代码实现。

二、依赖导入

<dependencies>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.101.Final</version>
        </dependency>

        <!--添加tomcat依赖模块.-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <version>2.0.4.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.41</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>

    </dependencies>

三、基础类准备

1、ChannelManager, 用于管理Channel等相关信息

public class ChannelManager {

    public final static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public final static Map<Channel, WebSocketServerHandshaker> handShakerMap = new ConcurrentHashMap<>();

}

2、WebSocketRequestMessage,用于将客户端传递过来的json字符串转换该类的对象

@Data
@ToString
public class WebSocketRequestMessage {

    public String user;

    public String type;

    public String message;


}

3、WebSocketResponseMessage,用于返回给客户端的类型数据

public class WebSocketResponseMessage {

    public String user;

    public String type;

    public String message;

    public String date;

    public WebSocketResponseMessage(WebSocketRequestMessage webSocketRequestMessage) {
        this.user = webSocketRequestMessage.getUser();
        this.type = webSocketRequestMessage.getType();
        this.message = webSocketRequestMessage.getMessage();
        this.date = new Date().toString();
    }

}

四、Handler实现

建立WebSocket通信时,第一次发送的就是http请求,进行协议升级为WebSocket,需要实现针对该连接请求进行处理的handler,协议升级成功之后,后续发送的消息是数据帧,消息将由WebSocketFrame相关的Handler进行处理,为了方便理解,此处简单画个图:

1、DefaultHandler, 抽象类,重写一些基础的方法(处理触发的事件、建立连接、断开连接、遗异常处理的),供其他Handler继承,其他Handler则无需实现上述功能

public abstract class DefaultHandler<T> extends SimpleChannelInboundHandler<T> {

   
    // 配合心跳使用,由客户端进行发送心跳数据包
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;

            // 触发读空闲, 关闭客户端
            if(idleStateEvent.state().equals(IdleState.READER_IDLE)) {
                System.out.println("触发读空闲,关闭channel");
                // 将消息传递CloseWebSocketFrame进行处理
                CloseWebSocketFrame closeWebSocketFrame = new CloseWebSocketFrame();
                ctx.fireChannelRead(closeWebSocketFrame);
            }

        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端建立连接: " + ctx.channel());
        ChannelManager.channelGroup.add(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端断开连接: " + ctx.channel());
        ChannelManager.channelGroup.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        System.out.println("发生异常: " + ctx.channel());
    }
    
}

2、FullHttpRequestHander

public class FullHttpRequestHandler extends DefaultHandler<FullHttpRequest> {

    private WebSocketServerHandshaker handShaker;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpRequest) throws Exception {
        System.out.println("客户端消息:" + httpRequest);

        if(httpRequest.decoderResult().isFailure() || !"websocket".equals(httpRequest.headers().get(HttpHeaderValues.UPGRADE))) {
            DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);

            ctx.writeAndFlush(httpResponse);

            return;
        }

        WebSocketServerHandshakerFactory webSocketServerHandshakerFactory = new WebSocketServerHandshakerFactory("ws:/" + ctx.channel() + "/websocket", null, false);
        handShaker = webSocketServerHandshakerFactory.newHandshaker(httpRequest);
        if (null == handShaker) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            // 在此处为channel添加WebSocketFrameEncoder和WebSocketFrameDecoder
            handShaker.handshake(ctx.channel(), httpRequest);
        }

        ChannelManager.handShakerMap.put(ctx.channel(), handShaker);

    }

}

3、TextWebSocketFrameHandler,当WebSocket通信建立成功之后,在此阶段发送的文本消息在该handler中进行处理,如果发送的是二进制流,那么请自己自行实现一个处理BinaryWebSocketFrame类型数据的Handler

public class TextWebSocketFrameHandler extends DefaultHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

        WebSocketRequestMessage webSocketRequestMessage = JSON.parseObject(msg.text(), WebSocketRequestMessage.class);

        if("ping".equals(webSocketRequestMessage.getType())) {

            System.out.println("ping message");

            return;
        }

        System.out.println(ctx.channel() + ", 客户端消息:" + msg);

        WebSocketResponseMessage webSocketResponseMessage = new WebSocketResponseMessage(webSocketRequestMessage);

        TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(JSON.toJSONString(webSocketResponseMessage));

        ChannelManager.channelGroup.writeAndFlush(textWebSocketFrame);
    }
}

4、CloseWebSocketFrameHandler,客户端进行关闭时,将通过该Handler进行处理

public class CloseWebSocketFrameHandler extends DefaultHandler<CloseWebSocketFrame> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CloseWebSocketFrame msg) throws Exception {
        System.out.println("客户端要断开");
        WebSocketServerHandshaker handShaker = ChannelManager.handShakerMap.get(ctx.channel());
        handShaker.close(ctx.channel(), msg.retain());
    }

}

五、WebSocketChannelInitializer实现

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {


    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 将建立连接时发送的第一次http请求数据装换为转换为 HttpRequest
        pipeline.addLast(new HttpServerCodec());
        // 将拆分的http消息(请求内容、请求体)聚合成一个消息
        pipeline.addLast(new HttpObjectAggregator(65536));
        // 块写出
        pipeline.addLast(new ChunkedWriteHandler());

        // 配置读空闲Handler, 3秒该Channel没有产生读将会触发读空闲事件
        pipeline.addLast(new IdleStateHandler(3, 0, 0));

        pipeline.addLast(new FullHttpRequestHandler());
        pipeline.addLast(new TextWebSocketFrameHandler());
        pipeline.addLast(new CloseWebSocketFrameHandler());
    }

}

六、WebSocketServer实现

public class WebSocketServer {

    public void bind(Integer port) {

        EventLoopGroup parent = new NioEventLoopGroup();
        EventLoopGroup child = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(parent, child)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new WebSocketChannelInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

            System.out.println("web socket server 启动成功...");

            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            parent.shutdownGracefully();
            child.shutdownGracefully();
        }


    }

}

七、前端实现

接下来写一个简单的前端web页,无须引入框架,只需要使用原生HTML5和JavaScript即可。

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8">
		<title></title>
	</head>
	
	<body>
		
		<button id = "login-btn" onclick="login()">登录</button>
		<button id = "logout-btn" onclick="logout()">退出</button>
		<br/>
		<p id="connection-status">连接状态:关闭</p>
		<p id="username-tag">user:</p>
		
		<input id="send-msg-box" type="text"/>
		<button onclick="send()">发送</button>
		
		<p>==================================群发消息=====================================</p>
		
		<textarea  id="msg-box"></textarea >
		
	</body>
	
	<script>
		var socket;
		var user;
		
		function login() {
			if (window.WebSocket){
				
				socket = new WebSocket("ws://localhost:8888/websocket");
			
				socket.onerror = function(event) {
					document.getElementById("connection-status").innerHTML = "连接状态:异常";
					document.getElementById("channel-tag").innerHTML = "channel:";
				}
				
				socket.onopen = function(event) {
					document.getElementById("connection-status").innerHTML = "连接状态:正常";
					var max = 999
					var min = 100
					user = 'admin' + (Math.floor(Math.random() * (max - min + 1)) + min)
					document.getElementById("username-tag").innerHTML = "user: " + user

;
					
					// 每隔 2 秒钟发送一次 Ping 帧
					setInterval(function() {
					  if (socket.readyState === socket.OPEN) {
								
							var pingMessage = {
								type: 'ping',
								message: ''
							}
							
							socket.send(JSON.stringify(pingMessage))
					  }
					}, 2000);
				}
			
				socket.onclose = function(event){
				    document.getElementById("connection-status").innerHTML = "连接状态:关闭";
					document.getElementById("username-tag").innerHTML = "user:"
				}
				
				socket.onmessage = function(event){
					var message = document.getElementById("msg-box").value;
				
					var response = JSON.parse(event.data)
				
					message = message + '\n' + response.date + " " + response.user + " " + response.message;
					
					document.getElementById("msg-box").value = message;
					
				}
				
			} else {
				alter("不支持websocket")
			}
		}
		
		function send() {
			
			if(socket == null) {
				alert('连接未建立')
				return;
			}
			
			if (socket.readyState == WebSocket.OPEN) {
					var value = document.getElementById("send-msg-box").value;
			        
					var textMessage = {
						type: 'text',
						user: user,
						message: value
					}
					
					//alert(message);
					//console.log(message)
			        socket.send(JSON.stringify(textMessage));
			} else {
				alert('连接未建立')
			}
		}
		
		
		function logout() {
			
			if(socket == null) {
				alert('连接未建立')
				return;
			}
			
			if (socket.readyState == WebSocket.OPEN) {
				socket.close();
			} else {
				alert('连接未建立')
			}
		}
	
	</script>
	
</html>

八、测试

1、启动服务端

new WebSocketServer().bind(8888);

控制台打印如下信息

web socket server 启动成功...

2、打开两个web页分别进行登录点击

点击登录之后,可以看到连接状态正常,以及随机分配了一个用户名

3、发送消息

在任一客户端进行消息发送,都能在所有客户端聊天框中看到如图信息:

4、关闭客户端,可手动点击关闭按钮,或则通过关闭网页以及浏览器,即可断开连接

九、参考链接

JS实时通信三把斧系列之一: websocket - 知乎

文章来源:https://blog.csdn.net/Staba/article/details/135134100
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。