设计模式 - Reactor 模式

news/2024/9/9 13:49:16

https://blog.csdn.net/saienenen/article/details/111400911

1. Reactor模式简介

Netty是典型的Reactor模型结构。Reactor模式也叫反应器模式,大多数IO相关组件如Netty、Redis在使用的IO模式。

2. 多线程 IO 的致命缺陷

最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:

while(true){socket = accept();handle(socket)
}
1234

这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。

之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:

class BasicModel implements Runnable {public void run() {try {ServerSocket ss =new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);while (!Thread.interrupted())new Thread(new Handler(ss.accept())).start();//创建新线程来handle// or, single-threaded, or a thread pool} catch (IOException ex) { /* ... */ }}
​static class Handler implements Runnable {final Socket socket;Handler(Socket s) { socket = s; }public void run() {try {byte[] input = new byte[SystemConfig.INPUT_SIZE];socket.getInputStream().read(input);byte[] output = process(input);socket.getOutputStream().write(output);} catch (IOException ex) { /* ... */ }}private byte[] process(byte[] input) {byte[] output=null;/* ... */return output;}}
}

对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。 tomcat服务器的早期版本确实是这样实现的。

2.1 多线程并发模式,一个连接一个线程的优点是:

一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。另外有个问题,如果一个线程中对应多个socket连接不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。

2.2 多线程并发模式,一个连接一个线程的缺点是:

缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。

改进方法是: 采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。

3. 单线程Reactor模型

3.1 Reactor模型的朴素原型

Java的NIO模式的Selector网络通讯,其实就是一个简单的Reactor模型。可以说是Reactor模型的朴素原型。

 static class Server{
​public static void testServer() throws IOException{
​// 1、获取Selector选择器Selector selector = Selector.open();
​// 2、获取通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 3.设置为非阻塞serverSocketChannel.configureBlocking(false);// 4、绑定连接serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT));
​// 5、将通道注册到选择器上,并注册的操作为:“接收”操作serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
​// 6、采用轮询的方式,查询获取“准备就绪”的注册过的操作while (selector.select() > 0){// 7、获取当前选择器中所有注册的选择键(“已经准备就绪的操作”)Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while (selectedKeys.hasNext()){// 8、获取“准备就绪”的时间SelectionKey selectedKey = selectedKeys.next();
​// 9、判断key是具体的什么事件if (selectedKey.isAcceptable()){// 10、若接受的事件是“接收就绪” 操作,就获取客户端连接SocketChannel socketChannel = serverSocketChannel.accept();// 11、切换为非阻塞模式socketChannel.configureBlocking(false);// 12、将该通道注册到selector选择器上socketChannel.register(selector, SelectionKey.OP_READ);}else if (selectedKey.isReadable()){// 13、获取该选择器上的“读就绪”状态的通道SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
​// 14、读取数据ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int length = 0;while ((length = socketChannel.read(byteBuffer)) != -1){byteBuffer.flip();System.out.println(new String(byteBuffer.array(), 0, length));byteBuffer.clear();}socketChannel.close();}
​// 15、移除选择键selectedKeys.remove();}}
​// 7、关闭连接serverSocketChannel.close();}
​public static void main(String[] args) throws IOException{testServer();}}

实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:

  1. Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。

  2. Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。

3.2 什么是单线程Reactor呢?

如下图所示:

在这里插入图片描述

这是最简单的单Reactor单线程模型。Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中。

下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,Reactor和Hander 处于一条线程执行。 在这里插入图片描述

顺便说一下,可以将上图的accepter,看做是一种特殊的handler。

3.3 单线程Reactor的参考代码

“Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码如下:

class Reactor implements Runnable
{final Selector selector;final ServerSocketChannel serverSocket;
​Reactor(int port) throws IOException{ //Reactor初始化selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(port));//非阻塞serverSocket.configureBlocking(false);
​//分步处理,第一步,接收accept事件SelectionKey sk =serverSocket.register(selector, SelectionKey.OP_ACCEPT);//attach callback object, Acceptorsk.attach(new Acceptor());}
​public void run(){try{while (!Thread.interrupted()){selector.select();Set selected = selector.selectedKeys();Iterator it = selected.iterator();while (it.hasNext()){//Reactor负责dispatch收到的事件dispatch((SelectionKey) (it.next()));}selected.clear();}} catch (IOException ex){ /* ... */ }}
​void dispatch(SelectionKey k){Runnable r = (Runnable) (k.attachment());//调用之前注册的callback对象if (r != null){r.run();}}
​// inner classclass Acceptor implements Runnable{public void run(){try{SocketChannel channel = serverSocket.accept();if (channel != null)new Handler(selector, channel);} catch (IOException ex){ /* ... */ }}}
}

Handler

class Handler implements Runnable
{final SocketChannel channel;final SelectionKey sk;ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);static final int READING = 0, SENDING = 1;int state = READING;
​Handler(Selector selector, SocketChannel c) throws IOException{channel = c;c.configureBlocking(false);// Optionally try first read nowsk = channel.register(selector, 0);
​//将Handler作为callback对象sk.attach(this);
​//第二步,注册Read就绪事件sk.interestOps(SelectionKey.OP_READ);selector.wakeup();}
​boolean inputIsComplete(){/* ... */return false;}
​boolean outputIsComplete(){
​/* ... */return false;}
​void process(){/* ... */return;}
​public void run(){try{if (state == READING){read();}else if (state == SENDING){send();}} catch (IOException ex){ /* ... */ }}
​void read() throws IOException{channel.read(input);if (inputIsComplete()){
​process();
​state = SENDING;// Normally also do first write now
​//第三步,接收write就绪事件sk.interestOps(SelectionKey.OP_WRITE);}}
​void send() throws IOException{channel.write(output);
​//write完就结束了, 关闭select keyif (outputIsComplete()){sk.cancel();}}
}
  1. Select 是前面 I/O 复用模型姐扫的标准网络编程 API,可以通过一个阻塞对象监听多路连接请求

  2. Reactor 对象通过 Select 监控客户端请求事件,收到事件后 通过 Dispatch 进行分发

  3. 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后续的业务处理

  4. 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应

  5. Handler 会完成 Read -> 业务处理 -> send 的完整业务流程

3.4 单线程Reactor模式的缺点:

  1. 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。

  2. 因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。

4.多线程的Reactor

4.1 基于线程池的改进

在线程Reactor模式基础上,做如下改进:

  1. 将Handler处理器的执行放入线程池,多线程进行业务处理。

  2. 而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。

一个简单的图如下:

在这里插入图片描述

4.2 改进后的完整示意图

下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。

在这里插入图片描述

4.3 多线程Reactor的参考代码

“Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程做一个线程池的改进,改进的Handler的代码如下:

class MthreadHandler implements Runnable
{final SocketChannel channel;final SelectionKey selectionKey;ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);static final int READING = 0, SENDING = 1;int state = READING;
​
​ExecutorService pool = Executors.newFixedThreadPool(2);static final int PROCESSING = 3;
​MthreadHandler(Selector selector, SocketChannel c) throws IOException{channel = c;c.configureBlocking(false);// Optionally try first read nowselectionKey = channel.register(selector, 0);
​//将Handler作为callback对象selectionKey.attach(this);
​//第二步,注册Read就绪事件selectionKey.interestOps(SelectionKey.OP_READ);selector.wakeup();}
​boolean inputIsComplete(){/* ... */return false;}
​boolean outputIsComplete(){
​/* ... */return false;}
​void process(){/* ... */return;}
​public void run(){try{if (state == READING){read();}else if (state == SENDING){send();}} catch (IOException ex){ /* ... */ }}
​
​synchronized void read() throws IOException{// ...channel.read(input);if (inputIsComplete()){state = PROCESSING;//使用线程pool异步执行pool.execute(new Processer());}}
​void send() throws IOException{channel.write(output);
​//write完就结束了, 关闭select keyif (outputIsComplete()){selectionKey.cancel();}}
​synchronized void processAndHandOff(){process();state = SENDING;// or rebind attachment//process完,开始等待write就绪selectionKey.interestOps(SelectionKey.OP_WRITE);}
​class Processer implements Runnable{public void run(){processAndHandOff();}}
​
}
  1. Reactor 对象通过 Select 监控客户端请求事件,收到事件后 通过 Dispatch 进行分发

  2. 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后续的业务处理

  3. 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来处理

  4. Handler 只负责 响应事件,不做具体的业务处理,通过read 读取到数据之后,会分发给后面的 pool 线程池的某个线程处理业务

  5. pool 线程池会分配独立线程完成真正的业务,并将结果返回给 Handler

  6. handler 收到响应后,通过 send 将结果返回给 client

5. Reactor 持续改进 - 主从Reactor

在这里插入图片描述

对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码如下:

class MthreadReactor implements Runnable
{
​//subReactors集合, 一个selector代表一个subReactorSelector[] selectors=new Selector[2];int next = 0;final ServerSocketChannel serverSocket;
​MthreadReactor(int port) throws IOException{   //Reactor初始化selectors[0]=Selector.open();selectors[1]= Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(port));//非阻塞serverSocket.configureBlocking(false);
​//分步处理,第一步,接收accept事件SelectionKey sk =serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);//attach callback object, Acceptorsk.attach(new Acceptor());}
​public void run(){try{while (!Thread.interrupted()){for (int i = 0; i <2 ; i++){selectors[i].select();Set selected =  selectors[i].selectedKeys();Iterator it = selected.iterator();while (it.hasNext()){//Reactor负责dispatch收到的事件dispatch((SelectionKey) (it.next()));}selected.clear();}
​}} catch (IOException ex){ /* ... */ }}
​void dispatch(SelectionKey k){Runnable r = (Runnable) (k.attachment());//调用之前注册的callback对象if (r != null){r.run();}}
​
​class Acceptor { // ...public synchronized void run() throws IOException{SocketChannel connection =serverSocket.accept(); //主selector负责acceptif (connection != null){new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection}if (++next == selectors.length) next = 0;}}
}
  1. Reactor 主线程 MainReactor 对象通过 select 监听连接事件,收到事件后,通过 Acceptor 处理连接事件

  2. 当 Acceptor 处理 连接事件后,MainReactor 将连接分配给 SubReactor

  3. SubReactor 将连接加入到连接队列进行监听,并创建 Handler 进行各种事件处理

  4. 当有 新事件发生时,SubReactor 就会调用对应的 Handler 处理

  5. Handler 通过 read 读取数据,分发给后面的 Processer 线程处理

  6. pool 线程池分配独立的 Processer 线程进行业务处理,并返回结果

  7. Handler 收到响应的结果后,再通过 send 将结果返回给 client

  8. Reactor 主线程可以对应多个 Reactor 子线程,即 MainReactor 可以关联多个 SubReactor

6. Reactor 编程的优点

6.1 优点

1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;

2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;

3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;

4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;


https://www.xjx100.cn/news/663765.html

相关文章

Java IO篇:什么是 Reactor 网络模型?

一、什么是 Reactor 模型&#xff1a; The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and di…

佳能2545i打印机驱动安装问题

记录一个打印机驱动安装的问题&#xff0c;起初在佳能售后官网下载了第一个驱动Generic PCL6 -2.86这个版本&#xff0c;但是装上之后没有效果&#xff0c;就很奇怪&#xff0c;然后询问了同事说是下载UFRII LT这个版本的就可以&#xff0c;最终成功打印&#xff0c;由衷的的感…

C++ QT读写Microsoft Word文档基操

以下是一个简单的示例&#xff0c;演示如何使用C QT读写Microsoft Word文档&#xff1a; #include <QAxObject> #include <QDebug> void readWordDocument(QString filePath) { QAxObject* word new QAxObject("Word.Application", 0);…

如何高质量产出与进步

个人困惑 明明有很多时间&#xff0c;却感觉没有事做。明明有很多事做&#xff0c;却不知道从哪里做起。明明知道某个事很重要&#xff0c;却一直无法开始。眼前有好几件事&#xff0c;却无法推进。本来想好要做的事&#xff0c;中途坚持不下去了。 背后根因思考 直接原因是…

VTK学习之光照和相机

目录 一、VTK光照 1、关于vtkLight常用的方法 2、最终效果 二、相机设置 1、相机设置 2、效果 一、VTK光照 通过设置光照&#xff0c;可以达到不同颜色的目的&#xff0c;参考博客&#xff1a; VTK修炼之道7_三维场景基本要素:光照_vtk 光照_沈子恒的博客-CSDN博客 1…

珂朵莉树ODT(基于std::set的暴力玄学数据结构)

使一整段区间内的东西变得一样&#xff0c;数据随机。 在具有区间赋值操作&#xff0c;区间统计操作&#xff0c;以及最好保证数据随机的情况下在时空复杂度上把线段树吊起来打。 珂朵莉树的各种操作的总体复杂度始终为O(NlogN)&#xff0c;这会吊打某些常数大、附加工作会影…

DDR中的ODT

ODT电阻端接 ODT (on-die termination) 裸片终端&#xff08;ODT&#xff09;功能允许DRAM通过ODT控制引脚为x4 / x8配置的每个DQ&#xff0c;DQS / DQS&#xff0c;RDQS / RDQS和DM信号打开/关闭终端电阻。对于x16配置&#xff0c;ODT通过ODT控制引脚应用于每个DQ&#xff0…

java解析.odt文件

1、了解.odt文件 .odt文件是openoffice软件产生的文档格式&#xff0c;可以直接用office打开&#xff0c;这其实就是一个压缩包&#xff0c;可以使用解压软件打开&#xff0c;里面有一个content.xml文件&#xff0c;这个文件内有<text:p>标签&#xff0c;标签内就是展示出…