目录
运行流程
心跳机制
重连机制
重复登录校验
md5缺省摘要校验
1. maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.42</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<!-- logback 依赖 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
2. Model代码
@Data
public final class MyMessage {
private MsgHeader msgHeader;
private Object body;
}
@Data
public final class MsgHeader {
/*消息体的MD5摘要*/
private String md5;
/*消息的ID,因为是同步处理模式,不考虑应答消息需要填入请求消息ID*/
private long msgID;
/*消息类型*/
private byte type;
/*消息优先级*/
private byte priority;
private Map<String, Object> attachment = new HashMap<String, Object>();
}
public enum MessageType {
SERVICE_REQ((byte) 0),/*业务请求消息*/
SERVICE_RESP((byte) 1), /*TWO_WAY消息,需要业务应答*/
ONE_WAY((byte) 2), /*无需应答的业务请求消息*/
LOGIN_REQ((byte) 3), /*登录请求消息*/
LOGIN_RESP((byte) 4), /*登录响应消息*/
HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/
HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/
private byte value;
private MessageType(byte value) {
this.value = value;
}
public byte value() {
return this.value;
}
}
3. NettyServer
@Slf4j
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("nt_boss"));
EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("nt_worker"));
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerInit());
sb.bind(SERVER_PORT).sync();
log.info("netty server start ip = {} port = {}", SERVER_IP, SERVER_PORT);
}
}
public class ServerInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 解决粘包 半包问题
sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
sc.pipeline().addLast(new LengthFieldPrepender(2));
// 序列化成消息对象
sc.pipeline().addLast(new KryoDecoder());
sc.pipeline().addLast(new KryoEncoder());
/*处理心跳超时*/
sc.pipeline().addLast(new ReadTimeoutHandler(15));
// 处理登录鉴权
sc.pipeline().addLast(new LoginAuthRespHandler());
// 心跳
sc.pipeline().addLast(new HeartBeatRespHandler());
// 处理真正业务
sc.pipeline().addLast(new ServerBusiHandler());
}
}
4. 业务Handler
@Slf4j
public class ServerBusiHandler extends SimpleChannelInboundHandler<MyMessage> {
private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(3000);
private static ExecutorService executorService = new ThreadPoolExecutor(NettyRuntime.availableProcessors(),
NettyRuntime.availableProcessors() * 2,60, TimeUnit.SECONDS,taskQueue);
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) {
// 校验MD5
String headMd5 = msg.getMsgHeader().getMd5();
String calcMd5 = EncryptUtils.encryptObj(msg.getBody());
if(!headMd5.equals(calcMd5)){
log.error("报文md5检查不通过:"+headMd5+" vs "+calcMd5+",关闭连接");
ctx.writeAndFlush(buildBusiResp("报文md5检查不通过,关闭连接"));
ctx.close();
}
log.info(msg.toString());
if(msg.getMsgHeader().getType() == MessageType.ONE_WAY.value()){
log.debug("ONE_WAY类型消息,异步处理");
executorService.execute(() -> {
// 处理消息 msg
});
}else{
log.debug("TWO_WAY类型消息,应答");
ctx.writeAndFlush(buildBusiResp("OK"));
}
}
private MyMessage buildBusiResp(String result) {
MyMessage message = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setType(MessageType.SERVICE_RESP.value());
message.setMsgHeader(msgHeader);
message.setBody(result);
return message;
}
}
? ? ? ? 1.1 修改操作系统允许当前用户进程打开的句柄数量
# 查看允许当前用户进程打开的句柄数量
ulimit -n
# 修改数量
ulimit -n 1000000
? ? ? ? 1.2 修改软限制和硬限制
# 修改配置文件
vim /etc/security/limits.conf
# 文末添加
* soft nofile 1000000
* hard nofile 1000000
# 修改配置文件
vim /etc/pam.d/login
# 文末添加
session required /lib/security/pam_limits.so
? ? ? ? ?1.3 修改Linux系统级能打开最大文件数限制
# 查看Linux能打开最大文件数
cat /proc/sys/fs/file-max
# 修改限制
vim /etc/sysctl.conf
# 文末添加
fs.file_max = 1000000
# 配置立即生效
sysctl -p
????????对于 Nety 服务端,通常只需要启动一个监听端口用于端侧设备接入即可,但是如果服务 端集群实例比较少,甚至是单机(或者双机冷备)部署,在端侧设备在短时间内大量接入时,需要 对服务端的监听方式和线程模型做优化,以满足短时间内(例如 30s)百万级的端侧设备接入的 需要。 服务端可以监听多个端口,利用主从 Reactor 线程模型做接入优化,前端通过 SLB 做 4 层 门 7 层负载均衡。
?????????IO线程优化: 先使用默认构造函数的线程(CPU*2)压测, jstack监控堆栈, 如果都停留在Selectorlmpl. lockAndDoSelect, 表明IO线程比较空闲, 无须调整.如果停留在读/写操作, 可适当调大线程.
心跳优化策略:
心跳失败判断:
Nety 提供了三种链路空闲检测机制:
????????Nety 内存池从实现上可以分为两类:堆外直接内存和堆内存。由于 Byte Buf 主要用于网 络 IO 读写,因此采用堆外直接内存会减少一次从用户堆内存到内核态的字节数组拷贝,所以 性能更高。由于 DirectByteBuf 的创建成本比较高,因此如果使用 DirectByteBuf,则需要配合内存池使用.
select(),poll()模型都是水平触发模式,信号驱动 IO 是边缘触发模式,epoll()模型即支 持水平触发,也支持边缘触发,默认是水平触发。JDK 中的 select 实现是水平触发,而 Netty 提供的 Epoll 的实现中是边缘触发。
如将域名www.google.com解析成IP地址