Netty学习-java NIO编程

Netty学习-java NIO编程

二月 09, 2020

一. Java NIO 基本介绍

  1. Java Nio 是从JDK1.4以后提供的新API, 改进了输入输出, 被统称为NIO (New IO), 是同步非阻塞的.
  2. NIO 有三个核心部分: Channel(通道), Buffer(缓冲区), Selector(选择器)
  3. NIO 是面向缓冲区(面向块)编程的.接受的数据被读取到一个缓冲区,需要时可以在缓冲区移动, 增加处理过程中的灵活性, 使用他可以提供非阻塞式的高伸缩性网络.
  4. NIO 的非阻塞模式, 让一个线程从某通道发送请求或者读取数据, 但是他仅能获取当前可用的数据, 若当前没有数据可用, 就什么也不做, 不会保持线程阻塞, 直到数据变成可用之前, 该线程可以做其他事情.
  5. NIO 可以做到用一个线程来处理多个操作, 比如 有大量连接发向服务端, 使用NIO 只需要启动少量线程就可以处理, 不需要向阻塞io那样使用对等数量的线程.
  6. HTTP2.0 使用了多路复用技术, 使同一个连接可以并发处理多个请求, 并发量比HTTP1.1大了好几个数量级.


二. NIO主要工作流程

三. NIO 三大核心(Selector, Channel, Buffer)说明

1. Selector, Channel和Buffer关系图

  • 每个Channel都会对应一个Buffer
  • Selector对应一个线程, 一个线程对应多个Channel
  • Channel注册到Selector
  • Event决定Selector切换的Channel
  • Buffer就是一个内存块, 底层是一个数组
  • 数据的读取和写入通过Buffer,(BIO中要么是输入流,要么是输出流,不能双向). NIO中的Buffer是可以读也可以写, 需要使用flip方法切换, Channel是爽向的, 有相关操作系统底层属性

2. Buffer (缓冲区)

(1). 基本介绍

缓冲区的本质是一个可以读写数据的内存块, 可以理解成一个容器对象, 该对象提供一组方法可以轻松地使用内存块, 缓冲区对象内置了一些机制, 可以跟踪和记录缓冲区的状态变化情况. Channel 提供从文件, 网络读取数据的渠道, 但是读取或者写入的数据 必须经过Buffer

NIO <--- data ---> 缓冲区 <--- channel ---> 文件

(2). Buffer类及其子类

  • Buffer (java.nio)
    • IntBuffer (java.nio)
      • DirectIntBufferU (java.nio)
        • DirectIntBufferRU (java.nio)
      • ByteBufferAsIntBufferL (java.nio)
        • ByteBufferAsIntBufferRL (java.nio)
      • DirectIntBufferS (java.nio)
        • DirectIntBufferRS (java.nio)
      • ByteBufferAsIntBufferB (java.nio)
        • ByteBufferAsIntBufferRB (java.nio)
      • HeapIntBuffer (java.nio)
        • HeapIntBufferR (java.nio)
    • FloatBuffer (java.nio)
      • ByteBufferAsFloatBufferB (java.nio)
      • DirectFloatBufferU (java.nio)
      • ByteBufferAsFloatBufferL (java.nio)
      • HeapFloatBuffer (java.nio)
      • DirectFloatBufferS (java.nio)
    • CharBuffer (java.nio)
      • ByteBufferAsCharBufferL (java.nio)
      • DirectCharBufferS (java.nio)
      • StringCharBuffer (java.nio)
      • HeapCharBuffer (java.nio)
      • ByteBufferAsCharBufferB (java.nio)
      • DirectCharBufferU (java.nio)
    • DoubleBuffer (java.nio)
      • HeapDoubleBuffer (java.nio)
      • DirectDoubleBufferU (java.nio)
      • ByteBufferAsDoubleBufferB (java.nio)
      • DirectDoubleBufferS (java.nio)
      • ByteBufferAsDoubleBufferL (java.nio)
    • ShortBuffer (java.nio)
      • DirectShortBufferU (java.nio)
      • ByteBufferAsShortBufferL (java.nio)
      • ByteBufferAsShortBufferB (java.nio)
      • HeapShortBuffer (java.nio)
      • DirectShortBufferS (java.nio)
    • LongBuffer (java.nio)
      • HeapLongBuffer (java.nio)
      • ByteBufferAsLongBufferB (java.nio)
      • DirectLongBufferU (java.nio)
      • ByteBufferAsLongBufferL (java.nio)
      • DirectLongBufferS (java.nio)
    • ByteBuffer (java.nio)
      • HeapByteBuffer (java.nio)
      • MappedByteBuffer (java.nio)

(3). Buffer 重要属性

属性 描述
Capactity 容量, 可以容纳的最大数据量, 在缓冲区创建时设定并且不能改变
Limit 表示缓冲区的当前终点, 不能对缓冲区超过极限的位置进行读写操作, 极限可以修改
Position 位置, 下一个要被读或写的元素的索引, 每次读写缓冲区数据时会改值,为下次读写做准备
Mark 标记

3. Channel(通道)

(1) 基本介绍

* NIO的通道类似流, 但是有区别

  • 通道可以进行读写,而流只能读或者只能写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲读取数据, 也可以写数据到缓冲

* 常见的Channel类有: FileChannel (用于文件的数据读写), DatagramChannel (用于UDP的数据读写), ServerSocketChannel 和 SocketChannel(用于TCP的数据读写)

(2) 使用FileChannel和ByteBuffer做代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

/**
* 写入数据到本地文件
* @param message 数据
* @param address 磁盘地址
*/
public static void localWrite(String message, String address) throws Exception {

/* 创建一个输出流来获取channel */
FileOutputStream fileOutputStream = new FileOutputStream(address);

/* 通过fileOutputStream获取对应的FileChannel */
FileChannel fileChannel = fileOutputStream.getChannel();

/* 创建一个缓冲区 ByteBuffer */
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

/* 将message放入 byteBuffer (对于缓冲区来说,当前是写入的状态) */
byteBuffer.put(message.getBytes());

/* 将byteBuffer进行状态切换, 由写入状态转为读取状态 */
byteBuffer.flip();

/* 将从byteBuffer中读取到的数据写入到channel */
fileChannel.write(byteBuffer);

fileOutputStream.close();

}

/**
* 从本地磁盘文件中读取数据并打印
* @param address 本地磁盘文件路径
*/
public static void localRead(String address) throws Exception {

File file = new File(address);

/* 从本地磁盘路径获取该文件的输出流 */
FileInputStream fileInputStream = new FileInputStream(file);

/* 通过fileInputStream获取对应的FileChannel */
FileChannel fileChannel = fileInputStream.getChannel();

/* 创建一个缓冲区 */
ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length());

/* 将通道的数据读入到Buffer */
fileChannel.read(byteBuffer);

System.out.println(new String(byteBuffer.array()));

fileInputStream.close();
}

4.Selector

(1) 基本介绍

  1. Java的nio使用的是非阻塞的IO方式, 使用一个线程来处理多个客户端的连接时会使用到Selector
  2. 多个Channel可以注册到Selector上, Selector可以检测注册到其上的Channel, 如果有Event(事件)发生就会获取事件, 然后针对每个事件进行相应的处理.
  3. Selector只有在通道真正有读写事件发生时才会进行读写, 这样大大地减少了系统开销, 并且不必为每个线程都创建一个线程, 减少系统资源消耗.
  4. 避免多线程之间的上下文切换导致的开销.
  5. Netty的io线程NioEventLoop聚合了Selector(选择器, 或者说多路复用器),可以同时并发处理成百上千的客户端连接.

(2) Selector 相关方法说明

  • selector.select() // 阻塞
    selector.select( 1*1000 ) // 阻塞1秒, 一秒后返回
  • selector.wakeup() // 唤醒 selector
  • selector.selectNow() // 不阻塞, 立即返回

(3) Nio非阻塞网络编程原理分析

NIO(Selector, SelectionKey, ServerSocketChannel, SocketChannel)关系图

  1. 当客户端连接时, 会通过ServerSocketChannel得到SocketChannel
  2. Selector进行监听select方法, 返回有事件发生的通道数(ServerSocketChannel也会注册到Selector)
  3. 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个Selector可以注册多个socketChannel
  4. socketChannel注册后会返回一个SelectionKey, 并且和该Selector关联(集合)
  5. 可以通过socketChannel反向获取socketChannel
  6. 调用socketChannel的channel()方法可以进行相应的业务处理

(4) Nio非阻塞网络编程案例

  • nio入门, 实现简单服务端和客户端之间的数据通信(非阻塞)


NioServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class NioServer {

public static void main(String[] args) throws Exception {

// 创建ServerSocketChannel 来获取serverSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 得到一个Selector对象
Selector selector = Selector.open();

// 绑定端口8888, 在服务端监听
serverSocketChannel.socket().bind(new InetSocketAddress(8888));

// 设置为非阻塞
serverSocketChannel.configureBlocking(false);

// 把serverSocketChannel注册到selector监听事件上, 事件为OP_ACCEPT(连接建立)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true){


if (selector.select(1000) == 0){
/* System.out.println("服务器等待1s, 没有连接"); */
continue;
}

/*
selector.selectedKeys() 可以获取到相关的selectionKey集合
通过selectionKeys反向获取通道
*/
Set<SelectionKey> selectionKeys = selector.selectedKeys();

for (SelectionKey selectionKey : selectionKeys) {

// 根据key对应的通道发生的事件做相应处理
if (selectionKey.isAcceptable()) {
// 如果是OP_ACCEPT, 则有新的客户端连接

SocketChannel socketChannel = null;
try {
socketChannel = serverSocketChannel.accept();
} catch (IOException e) {
e.printStackTrace();
}

assert socketChannel != null;
System.out.println("客户端连接成功: " + socketChannel.hashCode());

// 将SocketChannel设置为非阻塞
try {
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}

/*
将socketChannel注册到selector, 关注事件为OP_READ, 同时给socketChannel关联一个Buffer
*/
try {
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
} catch (ClosedChannelException e) {
e.printStackTrace();
}

if (selectionKey.isReadable()) {
//发生OP_READ事件
SocketChannel channel = (SocketChannel) selectionKey.channel();

// 获取该channel关联的buffer
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();

try {
channel.read(buffer);
} catch (IOException e) {
e.printStackTrace();
}

System.out.println("客户端: " + new String(buffer.array()));

}

selectionKeys.remove(selectionKey);
}
}

}
}

NioClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    public class NioClient {
public static void main(String[] args) throws Exception {


// 得到一个网路连接
SocketChannel socketChannel = SocketChannel.open();

// 设置为非阻塞
socketChannel.configureBlocking(false);

// 提供服务器端的ip和端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8888);

if (!socketChannel.connect(inetSocketAddress)){

while (!socketChannel.finishConnect()){
System.out.println("连接需要时间, 连接未能完成, 但客户端不会阻塞");
}
}

String str = "hello world";

// 把byte写入缓冲区
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());

socketChannel.write(buffer);

System.in.read();
}
}

5. SelectionKey

SelectionKey表示Selector和网络通道的注册关系, 分为四种:

  • int OP_ACCEPT: 有新的网络连接可以 accept, 值为16 (1<<4) < li>
  • int OP_CONNECT: 表示连接已经建立, 值为8 (1<<3) < li>
  • int OP_WRITE: 代表写操作, 值为4(1<<2) < li>
  • int OP_READ: 代表读操作, 值为1 (1<<0) < li>


SelectionKey相关方法:

  • public abstract Selector selector(); // 得到与之关联的Selector对象
  • public abstract SelectableChannel channel(); //得到与之关联的通道
  • public abstract Object attachment(); //得到与之关联的共享数据
  • public abstract SelectionKey insterestOps(int ops); //设置或改变监听事件
  • public final boolean isAcceptable(); // 是否可accept
  • public final boolean isReadable(); // 是否可读
  • public final boolean isWritable(); // 是否可写