1:pom.xml配置
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.73.Final</version>
</dependency>
2:Netty作为HTTP服务器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
 * Minimal Netty HTTP server.
 *
 * <p>Each accepted connection gets a pipeline of {@code HttpServerCodec},
 * an {@code HttpObjectAggregator} capped at 1 MiB, and a {@code LargeJsonHandler}.
 */
public class HttpServer {
    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public HttpServer(int port) {
        this.port = port;
    }

    /**
     * Binds to the configured port and blocks until the server channel is closed.
     * Both event loop groups are shut down when this method exits.
     *
     * @throws Exception if binding or waiting on the server channel fails
     */
    public void start() throws Exception {
        // One thread for accepting connections; default-sized group for I/O.
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new HttpServerCodec())
                            .addLast(new HttpObjectAggregator(1024 * 1024))
                            .addLast(new LargeJsonHandler());
                }
            };

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(pipelineSetup);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            // Wait for the bind to finish, then park until the server channel closes.
            Channel serverChannel = bootstrap.bind(port).sync().channel();
            serverChannel.closeFuture().sync();
        } finally {
            workers.shutdownGracefully();
            acceptors.shutdownGracefully();
        }
    }

    /**
     * Starts the server on port 8080.
     */
    public static void main(String[] args) throws Exception {
        HttpServer server = new HttpServer(8080);
        server.start();
    }
}
/**
 * Handles fully aggregated HTTP requests: prints the request body (decoded as UTF-8)
 * to standard output and answers with a plain-text {@code OK}.
 *
 * <p>The response honours the request's keep-alive semantics: when the client does not
 * want the connection kept alive, a {@code Connection: close} header is sent and the
 * channel is closed once the response has been written.
 */
class LargeJsonHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    /**
     * Processes one aggregated request.
     *
     * @param ctx     handler context used to write the response
     * @param request the aggregated request, including its full body
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (HttpUtil.is100ContinueExpected(request)) {
            send100Continue(ctx);
        }

        ByteBuf content = request.content();
        String jsonStr = content.toString(CharsetUtil.UTF_8);
        System.out.println(jsonStr);

        boolean keepAlive = HttpUtil.isKeepAlive(request);

        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.content().writeBytes("OK".getBytes(CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

        if (keepAlive) {
            // HTTP/1.0 clients only keep the connection open if told so explicitly.
            if (!request.protocolVersion().isKeepAliveDefault()) {
                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }
        } else {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        }

        ChannelFuture writeFuture = ctx.writeAndFlush(response);
        if (!keepAlive) {
            // The client asked for (or defaults to) a non-persistent connection:
            // close only after the response has actually been written.
            writeFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Writes an interim {@code 100 Continue} response. */
    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    /**
     * Intentionally a no-op: this callback can fire before the whole request body has
     * arrived, so the channel must not be closed here.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
注意:如果发送的JSON数据大于1M,是会分包发送的,每次发送都会执行channelReadComplete方法,所以不可以在该方法中关闭通道;数据全部发送完之后才会执行channelRead0方法
3:Netty作为WebSocket服务器
package com.example.slave.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
/**
 * Netty WebSocket server.
 *
 * <p>Each accepted connection gets a pipeline of {@code HttpServerCodec}, an
 * {@code HttpObjectAggregator} capped at 1 MiB, a {@code WebSocketServerProtocolHandler}
 * bound to the {@code /ws} path, and a {@code MyWebSocketHandler}.
 */
public class CustomWebSocket {
    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public CustomWebSocket(int port) {
        this.port = port;
    }

    /**
     * Binds to the configured port and blocks until the server channel is closed.
     * Both event loop groups are shut down when this method exits.
     *
     * @throws Exception if binding or waiting on the server channel fails
     */
    public void start() throws Exception {
        // One thread for accepting connections; default-sized group for I/O.
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new HttpServerCodec())
                            .addLast(new HttpObjectAggregator(1024 * 1024))
                            .addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10))
                            .addLast(new MyWebSocketHandler());
                }
            };

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(pipelineSetup);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            // Wait for the bind to finish, then park until the server channel closes.
            Channel serverChannel = bootstrap.bind(port).sync().channel();
            serverChannel.closeFuture().sync();
        } finally {
            workers.shutdownGracefully();
            acceptors.shutdownGracefully();
        }
    }
}
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channelGroup;
static {
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("与客户端建立连接,通道开启!");
channelGroup.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("与客户端断开连接,通道关闭!");
channelGroup.remove(ctx.channel());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
?
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg){
System.out.println("服务器收到的数据:" + msg.text());
sendAllMessage();
}
private void sendMessage(ChannelHandlerContext ctx) {
String message = "你好,"+ctx.channel().localAddress()+" 给固定的人发消息";
ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
}
private void sendAllMessage(){
String message = "我是服务器,这里发送的是群消息";
channelGroup.writeAndFlush( new TextWebSocketFrame(message));
}
}