博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
1. Netty准备知识:Java NIO
阅读量:5091 次
发布时间:2019-06-13

本文共 12963 字,大约阅读时间需要 43 分钟。

前言:我们知道,Netty是基于NIO开发的一套框架,在学习Netty之前,我们先学习下Java NIO。

一、IO多路复用模型

  IO多路复用模型使用了Reactor设计模式,主要有三种实现:Reacotr单线程、Reactor多线程、Reactor主从模式。

1. Reactor单线程

  在Reactor单线程模式中,所有客户端的请求处理都交给一个线程,串行化处理,效率较低。

2. Reactor多线程

  在Reactor多线程模式中,acceptor线程负责接受客户端请求并将请求处理任务交给线程池,提升了请求处理速度。但是当client数量过多时,单线程就无法同时处理那么多的请求,造成瓶颈问题。

3. Reactor主从模式

  为了解决Reactor多线程请求转发瓶颈问题,Reactor主从模式将acceptor设计为线程池,用以处理客户端请求。

二、NIO使用示例

1. NIO服务端通讯示例

public class TimeServer {    public static void main(String[] args) throws IOException {        int port = 8080;        if (args != null && args.length > 0) {            try {                port = Integer.valueOf(args[0]);            } catch (NumberFormatException e) {                // 采用默认值            }        }        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();    }}public class MultiplexerTimeServer implements Runnable {    private Selector selector;    private ServerSocketChannel servChannel;    private volatile boolean stop;//保证线程可见(volitile关键字)    /**     * 初始化多路复用器、绑定监听端口     */    public MultiplexerTimeServer(int port) {        try {            selector = Selector.open();//创建多路复用器            servChannel = ServerSocketChannel.open();//打开ServerSocketChannel,用于监听客户端链接            servChannel.configureBlocking(false);//设置非阻塞            servChannel.socket().bind(new InetSocketAddress(port), 1024);//绑定端口            servChannel.register(selector, SelectionKey.OP_ACCEPT);//注册监听(监听ACCEPT事件)            System.out.println("The time server is start in port : " + port);        } catch (IOException e) {            e.printStackTrace();            System.exit(1);        }    }    public void stop() {        this.stop = true;    }    /**     * 无限轮询准备就绪的key,并对其进行处理     */    @Override    public void run() {        while (!stop) {            try {                selector.select(1000);                Set
selectedKeys = selector.selectedKeys(); Iterator
it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 处理新接入的请求消息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else { // 读到0字节,忽略 } } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } }}

服务端通讯序列图如下:

 

2. 客户端通讯示例

public class TimeClient {    public static void main(String[] args) {        int port = 8080;        if (args != null && args.length > 0) {            try {                port = Integer.valueOf(args[0]);            } catch (NumberFormatException e) {                // 采用默认值            }        }        new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();    }}public class TimeClientHandle implements Runnable {    private String host;    private int port;    private Selector selector;    private SocketChannel socketChannel;    private volatile boolean stop;    public TimeClientHandle(String host, int port) {        this.host = host == null ? "127.0.0.1" : host;        this.port = port;        try {            selector = Selector.open();// 创建多路复用器            socketChannel = SocketChannel.open();// 打开SocketChannel            socketChannel.configureBlocking(false);// 设置非阻塞模式        } catch (IOException e) {            e.printStackTrace();            System.exit(1);        }    }    /**     * 创建连接,无线循环准备好的key并对其进行处理     */    @Override    public void run() {        try {            // 创建连接            doConnect();        } catch (IOException e) {            e.printStackTrace();            System.exit(1);        }        while (!stop) {            try {                selector.select(1000);                Set
selectedKeys = selector.selectedKeys(); Iterator
it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 判断是否连接成功 SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ);//注册READ事件 doWrite(sc); } else System.exit(1);// 连接失败,进程退出 } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else { // 读到0字节,忽略 } } } } private void doConnect() throws IOException { // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else socketChannel.register(selector, SelectionKey.OP_CONNECT); } private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) System.out.println("Send order 2 server succeed."); }}

客户端通讯序列图如下:

 

 

 

 

三、NIO类库介绍

1. 缓冲区Buffer

  Buffer是一个对象,它包含一些要写入或读出的数据。在NIO中,所有的数据都是在缓冲区处理的,读数据时,直接读进缓冲区,写数据时,直接写在缓冲区。

  Buffer类库继承关系:

我们最常用的就是ByteBuffer,这里介绍下ByteBuffer的几个参数及使用方法:

capacity 数组容量,创建后不可变
limit 当写数据到buffer中时,limit一般和capacity相等,当读数据时,limit代表buffer中有效数据的长度
position 位置,下一次要被读或被写的位置
mark 标记,调用mark()来设置mark=position,调用reset()可以让position恢复到标记位置
clear() 令position=0;limit=capacity;mark=-1;  但是不清除byte数组内容
reset() 把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记的地方
flip() 令limit=position;position=0
allocate(int capavity) 从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器
allocateDirect(int capacity) 在堆外内存中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器
wrap(byte[] array) 将byte数组包装成ByteBuffer

tips:ByteBuffer有两个比较重要的实现,HeapByteBuffer 和 DirectByteBuffer。

  HeapByteBuffer:在堆内申请的内存,利于维护

  DirectByteBuffer:在堆外申请的内存,实现零拷贝,提升数据操作速度(外部读取JVM堆中数据是先把JVM数据读到一个内存块中,然后在这个块里读取,使用堆外内存可省去这一步骤)。

2. 通道Channel

特性:

1. 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
2. 通道可以异步地读写。
3. 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。
 
重要实现:
FileChannel:从文件中读写数据
DatagramChannel:通过UDP读写网络中的数据,因为UDP是无连接的网络协议,所以不能像其它通道那样读取和写入,它发送和接收的是数据包
SocketChannel:通过TCP读写网络中的数据
ServerSocketChannel:监听新进来的TCP连接,对每一个连接创建一个SocketChannel

  示例(这里只举例了FileChannel 和 DatagramChannel,其他两种看上面第二部分):

//FileChannel示例:RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");FileChannel inChannel = aFile.getChannel();// 设置1MB的缓冲区ByteBuffer buf = ByteBuffer.allocate(1024);// 读取数据到buf中,并返回字节数int bytesRead = inChannel.read(buf);while (bytesRead != -1) {    System.out.println("Read " + bytesRead);    buf.flip(); // 重设缓冲区 postion = 0 ,limit = 原本position    while(buf.hasRemaining()){ //缓冲区中是否还有内容        // 或者使用buf.get(bytes),将数据读进字节数组中        System.out.print((char) buf.get());    }    buf.clear();//清空缓存区    // 缓冲区只有1MB大小,需要循环读取    bytesRead = inChannel.read(buf);}aFile.close();//DataGramChannel:// 打开连接DatagramChannel channel = DatagramChannel.open();channel.socket().bind(new InetSocketAddress(port));// buffer接收channel的数据channel.receive(buf);//如果buffer容不下收到的数据,多出的数据将被抛弃// 发送数据channel.send(buf, new InetSocketAddress("baidu.com", 80));// 连接到特定地址:由于UDP是无连接的,连接到特定的地址并不会像TCP通道那样创建一个真正的连接,而是锁住DatagramChannel,让其只能从特定地址收发数据channel.connect(new InetSocketAddress("baidu.com", 80));连接后,可以使用read()和write()方法,但数据传送无保证channel.read(buf);channel.write(buf);
Channel示例

3. 多路复用器Selector

  Selector不断轮询注册在其上的Channel,如果某个Channel上有新的TCP连接、读、写操作,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey就可以获取到就绪Channel集合,进行后续的IO操作。

  一些方法:

// 1. 创建SelectorSelector selector = Selector.open();// 2. 注册通道channel.configureBlocking(false);//channel必须处于非阻塞状态SelectionKey key = channel.register(selector, Selector.OP_READ);   int select():阻塞到至少有一个通道在你注册的事件上就绪了,返回值为多少通道已就绪int select(long timeout):同上,最长阻塞timeout毫秒int selectNow():不阻塞,不管什么通道都立即返回Set selectedKeys = selector.selectedKeys():返回已就绪的通道的SelectedKey

我们获取到的是SelectionKey,该对象中包含了一些有价值的属性:

insterest集合:OP_CONNECT、OP_ACCEPT、OP_READ、OP_WRITE       int interestSet = selectionKey.interestOps();    boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;    boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;    boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;    boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;ready集合:已准备就绪的操作的集合(你注册了监听connect,那么除了connect其它都是false),在一次选择(Selection)之后,会首先访问这个ready set     int readySet = selectionKey.readyOps();     boolean isAccept = selectionKey.isAcceptable();     boolean isConnect = selectionKey.isConnectable();     boolean isReadable = selectionKey.isReadable();     boolean isWritable = selectionKey.isWritable();channel:Channel  channel  = selectionKey.channel();selector:Selector selector = selectionKey.selector();附加对象(可选):SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

 

转载于:https://www.cnblogs.com/lovezmc/p/11547841.html

你可能感兴趣的文章
GridView如何实现双击行进行编辑,更新
查看>>
LINUX 命令行编辑快捷键
查看>>
redis持久化RDB与AOF
查看>>
信息化基础建设 开发框架
查看>>
讲给普通人听的分布式数据存储【转载】
查看>>
ASIHTTPRequest是什么?
查看>>
将博客搬至CSDN
查看>>
数据结构:散列函数的构造方法
查看>>
(C++)String的用法
查看>>
MVC 3 HTML 编码
查看>>
Knockout学习之前言
查看>>
php中使用swoole实现头协议
查看>>
Redis全方位讲解--哨兵模式(Sentinel模式)
查看>>
src 和 href 区别(转载)
查看>>
鱼C《零基础入门学习Python》10-17节课时知识点总结
查看>>
简单的python http接口自动化脚本
查看>>
C# 导出Excel的示例
查看>>
验证码帮助类
查看>>
深入浅出设计模式——桥接模式(Bridge Pattern)
查看>>
使用MongoDB C#官方驱动操作MongoDB
查看>>