Netty学习-netty深入了解

Netty学习-netty深入了解

二月 20, 2020

Netty与NIO

原生NIO存在的问题

  1. NIO的类库和API复杂, 使用麻烦: 要熟练掌握 Selector, ServerSocketChannel, SocketChannel, ByteBuffer
  2. 需要开发者对Java多线程(NIO涉及到Reactor模式) 和 网络编程非常熟悉, 才能编写出高质量的NIO程序
  3. 开发工作量和难度非常大: 例如客户端面临重新连接, 网络闪断, 半包读写, 失败缓存, 网络拥塞和异常流的处理 等.
  4. JDK NIO 的 bug: 臭名昭著的Epoll Bug, 它会导致Selector空轮询, 最终CUP飙满, JDK1.8也没有完全修复

Netty的优点

Netty 对 jdk 自带的NIO的API进行了封装, 解决了上诉问题.

  1. 设计优雅: 适用于各种传输类型的统一API阻塞和非阻塞Socket; 基于灵活且可扩展的事件模型, 可以清晰地分离关注点; 高度可定制地线程模型 - 单线程, 一个或多个线程池.
  2. 使用方便: 详细记录的Javadoc, 用户指南和示例: 没有其他依赖项, 使用JDK5(Netty 3.x) 或者 6 (Netty 4.x) 基本ok
  3. 高性能, 吞吐量更高: 延迟更低, 减少资源消耗; 最小化不必要的内存复制.
  4. 安全: 完整的SSL/TLS 和 StartLTS支持
  5. 社区活跃: 版本迭代周期短, 更新频繁, bug可以被及时修复.

Netty高性能架构设计

Netty的线程模型

目前存在的线程模型有: 传统阻塞的I/O服务模型, 基于Reactor模式的模型.
根据Reactor的数量和处理资源线程池的数量不同, 有3种典型的实现: 1. 单Reactor 单线程;2. 单Reactor 多线程; 3. 主从Reactor 多线程.

Netty的线程模型是基于主从Reactor多线程模型做改进.

说明

  1. Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写
  2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
  3. NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop
  4. NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个 NioEventLoop 都有一个 selector , 用于监听绑定在其上的 socket 的网络通讯
  5. NioEventLoopGroup 可以有多个线程, 即可以含有多个 NioEventLoop
  6. 每个 Boss NioEventLoop 循环执行的步骤有 3 步
  • 轮询 accept 事件
  • 处理 accept 事件 , 与 client 建立连接 , 生成 NioScocketChannel , 并将其注册到某个 worker NIOEventLoop 上的 selector
  • 处理任务队列的任务 , 即 runAllTasks
  1. 每个 Worker NIOEventLoop 循环执行的步骤
  • 轮询 read, write 事件
  • 处理 i/o 事件, 即 read , write 事件,在对应 NioScocketChannel 处理
  • 处理任务队列的任务 , 即 runAllTasks
  1. 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器

Netty 快速入门实例-TCP 服务

NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class NettyServer {

public static void main(String[] args) throws Exception {

/*
创建 BossGroup 和 WorkerGroup
说明
1. 创建两个线程组 bossGroup 和 workerGroup
2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,
会交给 workerGroup 完成
3. 两个都是无限循环
4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
默认实际 cpu 核数 * 2
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();


try{

// 创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();

//使用链式编程来进行设置
//设置两个线程组
bootstrap.group(bossGroup, workerGroup)
//使用 NioSocketChannel 作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//创建一个通道测试对象(匿名对象)
.childHandler(new ChannelInitializer<SocketChannel>() {

//给 pipeline 设置处理器
// 给我们的 workerGroup 的 EventLoop 对应的管道设置处理器
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyServerHandler());
}
});

System.out.println(".....服务器 is ready...");

//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();

//对关闭通道进行监听
cf.channel().closeFuture().sync();

}finally {

//优雅关闭

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();
}
}
}

NettyServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//说明
//1.我们自定义一个 Handler 需要继续 netty 规定好的某个 HandlerAdapter(规范)
//2.这时我们自定义一个 Handler , 才能称为一个 handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {


//读取数据实际(这里我们可以读取客户端发送的消息)

/**
* 1.ChannelHandlerContext ctx:上下文对象, 含有 管道 pipeline , 通道 channel, 地址
* 2.Object msg: 就是客户端发送的数据 默认 Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
System.out.println("server ctx =" + ctx);
System.out.println("看看 channel 和 pipeline 的关系");
Channel channel = ctx.channel();

//本质是一个双向链接, 出站入站
//ChannelPipeline pipeline = ctx.pipeline()

//将 msg 转成一个 ByteBuf
//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}

//数据读取完毕

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端-->", CharsetUtil.UTF_8));
}


//处理异常, 一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}

NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();

try {

//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();


//设置相关参数
bootstrap.group(group)
//设置线程组
.channel(NioSocketChannel.class)
// 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
//加入自己的处理器
ch.pipeline().addLast(new NettyClientHandler());

}
});


System.out.println("客户端 ok..");

//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到 netty 的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听

channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}

NettyClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

//当通道就绪就会触发该方法

@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: -->", CharsetUtil.UTF_8));
}

//当通道有读取事件时,会触发

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){


ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

Netty 异步模型

基本介绍

  1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
  2. Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
  3. 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得
    IO 操作结果
  4. Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制)

Future 说明

  1. 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等等.
  2. ChannelFuture 是一个接口 : public interface ChannelFuture extends Future我们可以添加监听器,当监听的事件发生时,就会通知到监听器

图解

数据读取 -> 数据处理(发送方) -> 数据传输 -> 数据处理(接收方) -> 显示数据

代码

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Server {
public static void main(String[] args) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try{

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup).
channel(NioServerSocketChannel.class).
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 取得管道
ChannelPipeline pipeline = ch.pipeline();
// HttpServerCodec 是netty提供的处理http的 编,解码器
pipeline.addLast("TestHttpServerCodec",
new HttpServerCodec())
//添加自定义handler
.addLast("TestHttpServerHandler",
new TestHttpServerHandler());
}
});

// 异步
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();

channelFuture.channel().close().sync();

}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

}

Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
throws Exception {

// 判断是否是httpRequest请求
if (msg instanceof HttpRequest){

Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();

System.out.println("pipeline ==> " + pipeline.hashCode());
System.out.println("handler ==> " + this.hashCode());
System.out.println("msg type ==> " + msg.getClass());
System.out.println("client address ==> " + channel.remoteAddress());

HttpRequest httpRequest = (HttpRequest) msg;

URI uri = new URI(httpRequest.uri());

if ("/favicon.ico".equals(uri.getPath())){
System.out.println("浏览器请求了网站ico");
return;
}

// 消息响应回复
ByteBuf content = Unpooled.copiedBuffer("server here", CharsetUtil.UTF_8);

// 构建http响应
FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK, content);

response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

// 将设置好的response返回
ctx.writeAndFlush(response);

}

}
}