高级IO—poll,epoll,reactor

news/2024/4/17 17:40:47

高级IO—poll,epoll,reactor

文章目录

  • 高级IO—poll,epoll,reactor
    • poll函数
      • poll函数接口
      • poll服务器
    • epoll
      • epoll的系统调用
        • epoll_create
        • epoll_ctl
        • epoll_wait
      • epoll的工作原理
      • epoll的工作方式
        • 水平触发
        • 边缘触发
      • epoll服务器
    • reactor

poll函数

poll函数是一个用于多路复用的系统调用,类似于select函数,用于监视一组文件描述符的状态。

poll函数接口

函数原型

 #include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • fds:指向一个struct pollfd结构数组的指针,每个结构体描述一个要监视的文件描述符及其关注的事件。

  • nfdsfds数组中的结构体数量。

  • timeout:超时时间(以毫秒为单位)。设置为-1,表示阻塞式,设置为0,表示非阻塞,设置为大于0的数,例如设置为5000,表示阻塞5秒,5秒后非阻塞返回一次。

struct pollfd结构体定义如下:

struct pollfd {int fd;       // 文件描述符short events; // 关注的事件short revents; // 实际发生的事件-short-整数
};
  • event字段用于设置关注的事件,可以使用如下宏:通过event&宏事件可以将事件添加到event中

image-20231124171956283

  • revent字段用于返回实际发生的事件,也可以使用上述宏进行判断。通过revent|宏事件可以得知revent中是否包含了该就绪事件

  • poll函数的返回值是就绪文件描述符的数量,返回值大于0表示已经有文件描述符就绪,返回值等于0表示没有文件描述符就绪,返回值小于0表示出错,可以使用perror输出错误信息。

总结一下:

  1. 在poll函数的pollfd结构体中,有event参数保存用户需要OS关心的事件,有revent参数保存OS告诉用户就绪的事件,即做到了输入输出分离,不需要像select函数借助第三方数组对sock进行管理。
  2. poll函数等待文件描述符理论上没有上限。由于参数fds是一个动态数组,并不是一个确定的结构。不同于位图,动态数组可以动态扩容。且动态数组的大小取决于用户传入的nfds。

poll服务器

现对select服务器套用poll函数,改造成poll服务器。只需要对实现业务hpp改造即可

pollserver.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include<poll.h>
#include "Sock.hpp"using namespace std;namespace Poll_sv
{static const int defaultport = 8080;         // 默认端口号static const int defaultfd = -1;             // 默认套接字标志static  const int fdnum=2048;//设置文件描述符的数量using func_t = function<string(const string &)>;class PollServer{public:PollServer(func_t f, int port = defaultport) : _func(f), _port(port), _listensock(-1),_rfds(nullptr){}void initServer(){// 获取套接字_listensock = Sock::Socket();cout << "Sock success" << endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout << "Bind success" << endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout << "Listen success" << endl;_rfds=new struct pollfd[fdnum];//指针指向一个成员是poll结构体的数组for(int i=0;i<fdnum;i++){_rfds[i].fd=defaultfd;_rfds[i].events=0;_rfds[i].revents=0;}_rfds[0].fd=_listensock;_rfds[0].events=POLLIN;//       cout << "initServer" << endl;}void Print(){cout << "now using socket: ";for (int i = 0; i < fdnum; i++){if(_rfds[i].fd!=defaultfd)cout<<_rfds[i].fd<<" ";//打印正在使用的fd}cout << endl;}void Accpter(int lsock){//  logMessage(DEBUG, "Accepter begin");string clientip;uint16_t clientport = 0;int sock = Sock::Accpet(lsock, &clientip, &clientport); // 若成功返回,返回一个用于通信的套接字if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);int i=0;for(;i<fdnum;i++){if(_rfds[i].fd!=defaultfd)//这里是找到默认的位置,给后续需要使用的文件描述符用,因此是跳过已经被使用的位置continue;else break;}if(i==fdnum){close(sock);logMessage(WARNING,"fd full,please wait");}else{_rfds[i].fd=sock;_rfds[i].events=POLLIN;_rfds[i].revents=0;logMessage(NORMAL,"sock has set in rfds");}Print();//     logMessage(DEBUG, "Accepter end");}void Recver(int pos){//   logMessage(DEBUG, "Recver begin");char buffer[1024];ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;cout << "client# " << buffer << endl;}else if (s == 0){close(_rfds[pos].fd);               // 关闭该套接字,关闭通信通道_rfds[pos].fd = defaultfd; // 将数组中的该套接字清除logMessage(NORMAL, "client quit");return;}else{close(_rfds[pos].fd);_rfds[pos].fd = defaultfd; // 将数组中的该套接字清除logMessage(ERROR, "recv error");return;}// 将客户端发来的数据原样写回去string resp = _func(buffer);write(_rfds[pos].fd, resp.c_str(), resp.size()); // 写回去//     logMessage(DEBUG, "Recever end");}void Handlerop(){for (int i = 0; i < fdnum; i++){if (_rfds[i].fd == defaultfd)//fd没有被设置则跳过continue;if(!(_rfds[i].events&POLLIN)) continue;//结构体不是被指定标志位POLLIN设置过则跳过if (_rfds[i].fd==_listensock&&_rfds[i].revents&POLLIN)// 此时i对应的数组位置是拿到连接的文件描述符,意味着在底层连接已经拿到,等待上层提取{Accpter(_listensock);}else if (_rfds[i].revents&POLLIN) // 此时存在数组内的对应套接字都是底层读资源就绪{Recver(i);}else{}}}void Start(){int timeout=-1;for(;;){//     cout<<"poll ready"<<endl;int n=poll(_rfds,fdnum,timeout);//     cout<<"poll finished"<<endl;switch(n){case 0:logMessage(NORMAL,"timeout...");break;case -1:logMessage(WARNING,"poll error");break;default:logMessage(NORMAL,"poll success");Handlerop();break;}}}~PollServer(){if (_listensock < 0) // 为什么是小于0?close(_listensock);if(_rfds!=nullptr)delete[]_rfds;}private:int _port;int _listensock;struct pollfd* _rfds;//指向poll结构体的指针func_t _func;};
}

image-20231124213444016

总结一下:

  1. poll函数通过输入输出分离,避免了借用第三方数组来记录sock和事件。
  2. poll解决了select具有管理文件描述符数量上限的问题。
  3. poll依旧存在遍历问题。由于检查就绪事件依旧是需要遍历整个数组,即时间复杂度为O(N)。

epoll模型可以解决遍历问题,使得检查就绪事件的时间复杂度优化到O(1)。

epoll

epoll的系统调用

epoll_create

epoll_create用于创建一个epoll模型

函数原型

int epoll_create(int size);
  • size 指定了 epoll 实例能够同时监视的文件描述符的数量上限。
  • 返回值为一个非负整数的文件描述符epollfd,表示创建的epoll模型对象。创建失败,返回值为-1。可以传递epollfd给epoll_ctl函数像epoll模型中添加、修改、删除需要监视的文件描述符以及事件。
  • 注意一下:使用epoll模型后,需要关闭epollfd。
epoll_ctl

epoll_ctl 函数用于向 epoll 实例中添加、修改或删除要监视的文件描述符,并设置关注的事件。

函数原型

#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • epfd:epoll 模型的文件描述符,由 epoll_create 函数返回。
  • op:操作类型,可以是这些值:
    EPOLL_CTL_ADD:将文件描述符 fd 添加到 epoll 模型中。
    EPOLL_CTL_MOD:修改已经添加到 epoll模型中的文件描述符 fd 的关注事件。
    EPOLL_CTL_DEL:从 epoll模型中删除文件描述符 fd
  • fd:要添加、修改或删除的文件描述符。
  • event:指向 struct epoll_event 结构体的指针,用于设置关注的事件。

struct epoll_event结构体定义如下:

struct epoll_event {_uint32_t events;  // 关注的事件epoll_data_t data;  // 用户数据
};
  • events字段用于设置关注的事件。可以使用以下宏设置:
  • EPOLLIN:可读事件。
  • EPOLLOUT:可写事件。
  • EPOLLPRI:紧急事件。
  • EPOLLERR:错误事件。
  • EPOLLHUP:挂起事件。
  • EPOLLET:边缘触发模式。
  • EPOLLONESHOT:一次性事件。

epoll_data_t结构定义如下:

typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;
  • fd是用户传递需要监管的文件描述符。

  • 调用成功返回0,失败返回-1。

epoll_wait

epoll_wait函数用于收集epoll模型中已经就绪的事件,并将就绪事件的个数返回

函数原型

#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • epfd:epoll 实例的文件描述符,由 epoll_create 函数返回。
  • events:指向 struct epoll_event 结构体数组的指针,用于存储就绪的事件。
  • maxeventsevents 数组的大小,表示最多可以存储多少个事件。
  • timeout:等待事件的超时时间,单位为毫秒。设置为-1,表示阻塞式,设置为0,表示非阻塞,设置为大于0的数,例如设置为5000,表示阻塞5秒,5秒后非阻塞返回一次。
  • 函数的返回值表示就绪的事件数量,返回值大于0表示已经有事件就绪,返回值等于0表示超时返回,无事发生。返回值小于0表示调用出错。

epoll的工作原理

数据本身只能按照输入设备—内存—CPU—内存—输出设备的方向流动。

image-20231125164652720

  1. 网络数据到达主机时,是先到达网卡,即网课外设。CPU有很多针脚,外设虽然不会与CPU有直接的数据流通,但外设可以将信号直接发送到CPU的阵脚上(硬件中断)。外设通过中断设备,将电子信号发送到CPU上。CPU会将该信号转发到中断向量表,根据信号的值找到对应的表中位置(下标),该表是一张函数指针数组,根据指针能够调用驱动方法,驱动方法将数据拷贝到内存上,即数据从硬件传输到了OS中。

  2. 在OS中,会以红黑树的方式管理sock和events。每个节点上都有sock和event,当然还有left指针和right指针。红黑树的优点是查找效率高。当用户告诉内核那些事件需要被关心时,OS会将需要关心的事件、sock放到该红黑树当中。当事件就绪,OS会将红黑树上的节点添加到就绪队列中,该就绪队列中的成员表示内核需要告诉用户,那些事件已经就绪,等待上层将该事件资源取走。实际上,一个节点即能存在于红黑树中,也能存在于就绪队列中。

image-20231125171013425

总结一下:

image-20231125172634228

网络数据到来后,外设通过信号中断将数据拷贝到内存上。细致的说,是底层收到数据后,贯穿网络协议栈向上交付数据,可以把红黑树的节点看作成文件描述符sock,根据sock找到对应的文件结构体struct file,该结构体内有指针指向接收缓存区,然后将数据填充到该文件接收缓冲区中。随后调用struct file中的回调方法(void* private data指针指向一个回调函数)-回调机制,该回调方法会将红黑树中的节点添加到就绪队列当中,表示该事件已经就绪,通知用户来取走数据。

image-20231126152314783

红黑树、就绪队列、epoll管理的文件struct file以及部分网络协议栈一整套可以认作成一个epoll模型。进程可以通过文件描述符表找到epoll_create的返回值epollfd,epollfd会指向自己的struct file,在struct file中有关联指针能够找到对应的epoll模型。

重新看待epoll的相关接口。操作系统需要提供接口该上层使用。

 #include <sys/epoll.h>
int epoll_create(int size);

epoll_create创建epoll模型,并返回一个epollfd供上层使用。

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl根据提供的epollfd找到对应的epoll模型,根据提供的fd和event增删改红黑树中的节点。

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

epoll_wait根据epollfd找到对应的epoll模型,根据提供的events、maxevents、timeout管理就绪队列中的节点。而epoll_wait检测就绪事件,只需要检测已经就绪的事件,不需要遍历检测整个需要管理的事件表,只需要将就绪的事件(资源)从内核层拷贝到用户层。因此遍历的事件复杂度为O(1)。

其次,epoll_wait会按照传入的顺序,依次放回到就绪事件数组中。根据队列先进先出的特性,若就绪队列中有很多数据节点,一次性拿不完,可以下一次调用epoll_wait再拿。

而伴随epoll模型的创建的辅助数组,就是该管理需要关心事件的红黑树。红黑树节点是以kv形式存在,由于sock是不会重复的,因此sock作为key。

epoll相比于selectpoll,将大部分管理任务分配给操作系统负责,也因此epoll的接口相比较简单。

epoll的工作方式

前面都有提到select、poll、epoll在事件就绪的时候通知上层将事件处理。事件就绪可以认为底层的IO条件满足,可以进行某种IO行为,epoll就会通知上层进行这种IO行为将底层的数据提取走。

epoll有2种通知策略,即水平触发(LT)和边缘触发(ET)

水平触发
  1. 水平触发为Level Triggered,简称LT。epoll默认状态下是LT工作策略。

  2. 水平触发关心的是缓冲区的状态,当缓冲区可读的时候,就会一直发出通知,也就是当缓冲区中只要有数据就会发出通知,知道上层将缓冲区的所有数据读完。可以认为你在做老师布置的作业,写了但没写完的情况下,老师就会一直通知你写作业,直到作业完全写完。

  3. LT的优势在于:可以在读取数据的时候只读取一部分,在第二次调用epoll_wait时立刻返回并通知上层将底层数据读走。

  4. 支持阻塞读写和阻塞读写。‘

边缘触发
  1. 边缘触发为Edge Triggered。简称ET。
  2. 边缘触发关心的是缓冲区状态的变化,当缓冲区状态发生变化的时候才会发出通知,比如缓冲区中来了新的数据。底层有数据,ET模式下epoll只会通知上层一次,后续缓冲区来了新的数据,epoll才会再次通知。
  3. ET优势在于:ET是一次性通知的方式,倒逼上层尽量做到一次性将数据读完。其次是尽可能一次性读取多的数据,从而使得接收缓冲区可容纳下一次数据的空间尽可能的大,那么接收方的接收能力自然就强,能够告诉发送数据方:接收方的滑动窗口较大,让对方更新出更大的滑动窗口,提高数据发送的效率。
  4. ET模式下,文件描述符要求是非阻塞的。由于底层的事件到达增多,OS才会通知上层将数据取走,因此用户提取数据时,尽量调用一次read,recv就把数据取完,而为了避免一次性调用函数没有读完,就需要循环调用读取函数,当调用读取函数直到读取不到数据才算作读完数据。若fd是阻塞的,那么读取函数进行最后一次读取时读取不到数据,就会阻塞,因此fd需要是非阻塞的,读取不到数据直接返回。

LT模式下,fd可以是阻塞的也可以是非阻塞的。LT可以模拟ET工作方式。

epoll服务器

epollserver.cc

#include<iostream>
#include<functional>
#include<vector>
#include<memory>
#include"err.hpp"
#include"epollserver.hpp"
using namespace std;
using namespace EPoll_sv;// static void Usage(string proc)
// {
//     cerr<<"Usage:\n\t"<<proc<<" port "<<"\n\n";
// }string resp(const string& s)
{return s;
}int main(int argc,char* argv[])
{// if(argc!=2)// {//     Usage(argv[0]);//     exit(USAGE_ERR);// }unique_ptr<EpollServer> epolsv(new EpollServer(resp));epolsv->initServer();epolsv->start();return 0;
}

log.hpp

#pragma once#include <iostream>
#include <string>
#include<ctime>
#include <sys/types.h>#include <unistd.h>#include <stdio.h>
#include <stdarg.h>
using namespace std;
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4#define NUM 1024
#define LOG_STR "./logstr.txt"
#define LOG_ERR "./log.err"enum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,EPOLL_CREATE_ERR
};
const char* to_str(int level)
{switch(level){case DEBUG: return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";default: return nullptr;}
}void logMessage(int level, const char* format,...)
{char logprestr[NUM];
snprintf(logprestr,sizeof(logprestr),"[%s][%ld][%d]",to_str(level),(long int)time(nullptr),getpid());char logeldstr[NUM];
va_list arg;
va_start(arg,format); 
vsnprintf(logeldstr,sizeof(logeldstr),format,arg);//arg是logmessage函数列表中的...cout<<logprestr<<logeldstr<<endl;}

Sock.hpp

#pragma once#include<iostream>
#include<string>
#include<cstring>
#include<sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{const static int backlog=32;public:static int Socket(){int sock=socket(AF_INET,SOCK_STREAM,0);//创建套接字if(sock<0)//创建失败{logMessage(FATAL,"create sock error");exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,"create sock success");int opt=1;setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));//允许套接字关闭后立刻重启return sock;} static void Bind(int sock,int port){//绑定自己的网络信息struct sockaddr_in local;memset(&local,0,sizeof(local));//将结构体清空local.sin_family=AF_INET;//添加协议local.sin_port=htons(port);//添加端口号local.sin_addr.s_addr=htons(INADDR_ANY);//不绑定指定IP,可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(sock,(struct sockaddr*)&local,sizeof(local))<0){logMessage(FATAL,"bind socket error");exit(BIND_ERR);}logMessage(NORMAL,"bind socket success");}static void Listen(int sock)//将套接字设置为监听{if(listen(sock,0)<0){logMessage(FATAL,"listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL,"listen socket success");}static int Accpet(int listensock,string * clientip,uint16_t* clientport){struct sockaddr_in cli;socklen_t len= sizeof(cli);int sock=accept(listensock,(struct sockaddr*)&cli,&len);if(sock<0){logMessage(FATAL,"accept error");//这里accept失败为什么不退出}else{logMessage(NORMAL,"accept a new link,get new sock : %d",sock);*clientip=inet_ntoa(cli.sin_addr);*clientport=ntohs(cli.sin_port);}return sock;}
};
  • 默认的LT模式下的epollserver

epollserver.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include<sys/epoll.h>
#include"err.hpp"
#include "Sock.hpp"using namespace std;namespace EPoll_sv
{static const int defaultport = 8080;static const int size = 128;static const int defaultvalue = -1;static const int defaultnum = 64;using func_t =function<string(const string&)>;class EpollServer{public:EpollServer(func_t fun,const int port = defaultport) :_num(defaultnum), _port(port), _listensock(defaultvalue), _epfd(defaultvalue),_func(fun){}void handlerEvent(int evs){for(int i=0;i<evs;i++)//直接遍历已经就绪的事件{uint32_t event=_reves[i].events;int sock=_reves[i].data.fd;if(sock==_listensock&&(event&EPOLLIN))//当前是将连接拿上应用层的文件描述符{string clientip;uint16_t clientport;int fd=Sock::Accpet(_listensock,&clientip,&clientport);if(fd<0){logMessage(NORMAL,"accpet sock error");continue;}struct epoll_event ev;ev.data.fd=fd;ev.events=EPOLLIN;epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,&ev);}else if(event&EPOLLIN)//当前是通信的事件{char buffer[1024];int n=recv(sock,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;cout<<"client# "<<buffer<<endl;string resp=_func(buffer);send(sock,resp.c_str(),resp.size(),0);//将数据发送回去给客户端}else if(n==0){epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(NORMAL,"client quit");}else{epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(ERROR,"communicate error");}}else{}}logMessage(DEBUG,"handlerEvent out");}void initServer(){// 获取套接字_listensock = Sock::Socket();cout << "Sock success" << endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout << "Bind success" << endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout << "Listen success" << endl;_epfd=epoll_create(size);//调用成功返回一个epoll文件描述符,size表示是需要监听的文件描述符的数量if(_epfd<0){logMessage(FATAL,"epoll_create error");exit(EPOLL_CREATE_ERR);}//将listensock添加到epoll模型中struct epoll_event epev;epev.events=EPOLLIN;epev.data.fd=_listensock;epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensock,&epev);//申请就绪时间的空间_reves=new struct epoll_event[_num];//申请一块空间,内含_num个事件数logMessage(NORMAL, "init server success");}void start(){//等待就绪事件int timeout=-1;for(;;){logMessage(DEBUG,"epoll_wait ready");int re=epoll_wait(_epfd,_reves,_num,timeout);logMessage(DEBUG,"epoll_wait end");switch (re){case 0://0个事件就绪,即超时重传logMessage(NORMAL,"timeout...");break;case -1://epoll_wait函数调用失败logMessage(ERROR,"epoll_wait error,code: %d,errstring: %s",errno,strerror(errno));    default://到这里时返回值都大于0,即re为已经就绪的事件数logMessage(NORMAL,"wait incident success");// handlerEvent(re);break;}}}~EpollServer(){if(_listensock!=defaultvalue){close(_listensock);}if(_epfd!=defaultvalue){close(_epfd);}if(_reves!=nullptr){delete[]_reves;}}private:int _port;int _listensock;int _epfd;struct epoll_event* _reves;int _num;//事件数func_t _func;//外部传递进来的函数};
}

image-20231126190327483

客户端连接上但不处理就会一直通知。

  • ET模式下的epollserver服务器

epollserver.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include<sys/epoll.h>
#include<unistd.h>
#include<fcntl.h>
#include"err.hpp"
#include "Sock.hpp"using namespace std;namespace EPoll_sv
{static const int defaultport = 8080;static const int size = 128;static const int defaultvalue = -1;static const int defaultnum = 64;using func_t =function<string(const string&)>;class EpollServer{public:EpollServer(func_t fun,const int port = defaultport) :_num(defaultnum), _port(port), _listensock(defaultvalue), _epfd(defaultvalue),_func(fun){}void handlerEvent(int evs){for(int i=0;i<evs;i++)//直接遍历已经就绪的事件{uint32_t event=_reves[i].events;int sock=_reves[i].data.fd;if(event&EPOLLET){Sock::setNonBlock(sock);}if(sock==_listensock&&(event&EPOLLIN))//当前是将连接拿上应用层的文件描述符{string clientip;uint16_t clientport;int fd=Sock::Accpet(_listensock,&clientip,&clientport);if(fd<0){logMessage(NORMAL,"accpet sock error");continue;}struct epoll_event ev;ev.data.fd=fd;ev.events=EPOLLIN|EPOLLET;epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,&ev);}else if(event&EPOLLIN)//当前是通信的事件{char buffer[1024];int n=recv(sock,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;cout<<"client# "<<buffer<<endl;string resp=_func(buffer);send(sock,resp.c_str(),resp.size(),0);//将数据发送回去给客户端}else if(n==0){epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(NORMAL,"client quit");}else{epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(ERROR,"communicate error");}}else{}}//      logMessage(DEBUG,"handlerEvent out");}void initServer(){// 获取套接字_listensock = Sock::Socket();cout << "Sock success" << endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout << "Bind success" << endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout << "Listen success" << endl;_epfd=epoll_create(size);//调用成功返回一个epoll文件描述符,size表示是需要监听的文件描述符的数量if(_epfd<0){logMessage(FATAL,"epoll_create error");exit(EPOLL_CREATE_ERR);}//将listensock添加到epoll模型中struct epoll_event epev;epev.events=EPOLLIN|EPOLLET;epev.data.fd=_listensock;epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensock,&epev);//申请就绪时间的空间_reves=new struct epoll_event[_num];//申请一块空间,内含_num个事件数logMessage(NORMAL, "init server success");}void start(){//等待就绪事件int timeout=-1;for(;;){sleep(1);logMessage(DEBUG,"epoll_wait ready");int re=epoll_wait(_epfd,_reves,_num,timeout);logMessage(DEBUG,"epoll_wait end");switch (re){case 0://0个事件就绪,即超时重传logMessage(NORMAL,"timeout...");break;case -1://epoll_wait函数调用失败logMessage(ERROR,"epoll_wait error,code: %d,errstring: %s",errno,strerror(errno));    default://到这里时返回值都大于0,即re为已经就绪的事件数logMessage(NORMAL,"wait incident success");// handlerEvent(re);break;}}}~EpollServer(){if(_listensock!=defaultvalue){close(_listensock);}if(_epfd!=defaultvalue){close(_epfd);}if(_reves!=nullptr){delete[]_reves;}}private:int _port;int _listensock;int _epfd;struct epoll_event* _reves;int _num;//事件数func_t _func;//外部传递进来的函数};
}static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int n=fcntl(fd,F_GETFL);//获取文件描述符的状态,正常返回非-1的标志位,出错返回-1if(n<0){cerr<<"fcntl :"<<strerror(errno)<<endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置,设置为非阻塞状态
}
  • 需要将文件描述符都设置成EPOLLET模式,在处理事件函数内,若sock是EPOLLET模式,就调用setNonBlock将该sock设置成非阻塞。

Sock.hpp

#pragma once#include<iostream>
#include<string>
#include<cstring>
#include<sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{const static int backlog=32;public:static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int n=fcntl(fd,F_GETFL);//获取文件描述符的状态,正常返回非-1的标志位,出错返回-1if(n<0){cerr<<"fcntl :"<<strerror(errno)<<endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置,设置为非阻塞状态
}static int Socket(){int sock=socket(AF_INET,SOCK_STREAM,0);//创建套接字if(sock<0)//创建失败{logMessage(FATAL,"create sock error");exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,"create sock success");int opt=1;setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));//允许套接字关闭后立刻重启return sock;} static void Bind(int sock,int port){//绑定自己的网络信息struct sockaddr_in local;memset(&local,0,sizeof(local));//将结构体清空local.sin_family=AF_INET;//添加协议local.sin_port=htons(port);//添加端口号local.sin_addr.s_addr=htons(INADDR_ANY);//不绑定指定IP,可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(sock,(struct sockaddr*)&local,sizeof(local))<0){logMessage(FATAL,"bind socket error");exit(BIND_ERR);}logMessage(NORMAL,"bind socket success");}static void Listen(int sock)//将套接字设置为监听{if(listen(sock,0)<0){logMessage(FATAL,"listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL,"listen socket success");}static int Accpet(int listensock,string * clientip,uint16_t* clientport){struct sockaddr_in cli;socklen_t len= sizeof(cli);int sock=accept(listensock,(struct sockaddr*)&cli,&len);if(sock<0){logMessage(FATAL,"accept error");//这里accept失败为什么不退出}else{logMessage(NORMAL,"accept a new link,get new sock : %d",sock);*clientip=inet_ntoa(cli.sin_addr);*clientport=ntohs(cli.sin_port);}return sock;}
};
  • Sock类内新增了setNonBlock函数,用于将传导过来的sock设置成非阻塞。

image-20231126191654022

可以看到底层事件就绪时,ET模式下的epollserver只通知了上层一次。

reactor

通过Reactor对底层事件的关心,底层有就绪事件,就通知上层的Connection类对象调用相关函数处理就绪事件。

image-20231128202431892

Connection类

using func_t = function<void(Connection *)>;class Connection{public:Connection(int sock, Tcpserver *tcps) : _consock(sock), _tcps(tcps){}~Connection() {}void Register(func_t r, func_t w, func_t e) // 注册方法表,将方法传递进来{_reader = r;_writer = w;_excepter = e;}void Close(){close(_consock);}public:string _inbuffer;  // 输入缓冲区string _outbuffer; // 输出缓冲区func_t _reader;    // 读操作func_t _writer;    // 写操作func_t _excepter;  // 处理异常操作int _consock;Tcpserver *_tcps; // 指向Tcpserver对象的指针};
  1. 每一个文件描述符需要配备独自的堆上的接收缓冲区和输出缓冲区。先描述再组织,因此创建了结构体Connection,结构体内含文件描述符,及其配备的输出缓冲区,接收缓冲区,该缓冲区类型为string,当前该缓冲区只能处理字符串。三个回调函数。三个回调方法通过外部传参。回调方法分别是读事件方法、写事件方法、异常事件方法。
  2. 由于Connection对象内的回调函数是Tcpserver对象赋予的,因此需要先了解一下三个回调函数。

Accepter:负责获取连接,并将新连接添加到Connection对象内。

 void Accepter(Connection *con) // 针对listenfd,将获取到的连接从底层拿到应用层{for (;;){logMessage(DEBUG, "enter Accepter");string clientip;uint16_t clientport;int err = 0;int sock = _sock.Accpet(&clientip, &clientport, &err); // 获取成功,返回新的文件描述符用于通信,客户端的ip和port通过参数返回if (sock > 0){// 连接拿上来了,将fd添加到con对象中AddConnection(sock, EPOLLIN | EPOLLET,bind(&Tcpserver::Recver, this, placeholders::_1),bind(&Tcpserver::Sender, this, placeholders::_1),bind(&Tcpserver::Excepter, this, placeholders::_1));logMessage(DEBUG, "get new link,[%s:%d]", clientip.c_str(), clientport);}else{if (err == EAGAIN || err == EWOULDBLOCK)break; // 读完了else if (err == EINTR)continue; // 因为中断,继续读elsebreak; // 错误}}}

Recver:负责读取底层的数据

  void Recver(Connection *con) // 读事件{char buffer[1024];while (true){ssize_t i = recv(con->_consock, buffer, sizeof(buffer) - 1, 0);if (i > 0){buffer[i] = 0;con->_inbuffer += buffer;//每次读到的数据放到配套的缓冲区内logMessage(DEBUG, "recv str: %s", con->_inbuffer.c_str());_func(con);}else if (i == 0) // 断开连接,异常处理{if (con->_excepter){con->_excepter(con);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK){break; // 读完了}else if (errno == EINTR) // 因信号中断,继续读{continue;}else{if (con->_excepter){con->_excepter(con);return;}}}}}
  • 将每次读取到的数据填充到配备的接收缓冲区,读到完整报文后,调用_func函数处理数据。

Sender:将sock配备的输出缓冲区内的数据发回给客户端。

void Sender(Connection *con) // 写事件{while (true){ssize_t i = send(con->_consock, con->_outbuffer.c_str(), sizeof(con->_outbuffer), 0);if (i > 0){if (con->_outbuffer.empty()) // 内容当前send函数一次性发完了{logMessage(DEBUG, "sender finish");con->_tcps->EnableReadWrite(con, true, false); // 发完了,把写通道关闭// sleep(2);break;}else{logMessage(DEBUG, "sender not finish");con->_outbuffer.erase(0, i); // 如果一次性没发完,那么就将发完的部分删减掉,剩余的下次再发}}else{if (errno == EAGAIN || errno == EWOULDBLOCK) // 上次发完了,这次再发就会err为这两个字段{break;}else if (errno == EINTR){continue; // 因信号中断了,继续发送}else{logMessage(DEBUG, "excepter");if (con->_excepter) // 异常了,执行异常事件{con->_excepter(con);return;}}}}

Tcpserver.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include <sys/epoll.h>
#include <unordered_map>
#include <assert.h>
#include "Epoller.hpp"
#include "err.hpp"
#include "Sock.hpp"
#include "until.hpp"using namespace std;namespace TCP_sv
{static const int defaultport = 8080;static const int Gnum = 64;class Tcpserver;class Connection;using func_t = function<void(Connection *)>;class Connection{public:Connection(int sock, Tcpserver *tcps) : _consock(sock), _tcps(tcps){}~Connection() {}void Register(func_t r, func_t w, func_t e) // 注册方法表,将方法传递进来{_reader = r;_writer = w;_excepter = e;}void Close(){close(_consock);}public:string _inbuffer;  // 输入缓冲区string _outbuffer; // 输出缓冲区func_t _reader;    // 读操作func_t _writer;    // 写操作func_t _excepter;  // 处理异常操作int _consock;Tcpserver *_tcps; // 指向Tcpserver对象的指针};class Tcpserver{private:void Recver(Connection *con) // 读事件{char buffer[1024];while (true){ssize_t i = recv(con->_consock, buffer, sizeof(buffer) - 1, 0);if (i > 0){buffer[i] = 0;con->_inbuffer += buffer;//每次读到的数据放到配套的缓冲区内logMessage(DEBUG, "recv str: %s", con->_inbuffer.c_str());_func(con);}else if (i == 0) // 断开连接,异常处理{if (con->_excepter){con->_excepter(con);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK){break; // 读完了}else if (errno == EINTR) // 因信号中断,继续读{continue;}else{if (con->_excepter){con->_excepter(con);return;}}}}}void Sender(Connection *con) // 写事件{while (true){ssize_t i = send(con->_consock, con->_outbuffer.c_str(), sizeof(con->_outbuffer), 0);if (i > 0){if (con->_outbuffer.empty()) // 内容当前send函数一次性发完了{logMessage(DEBUG, "sender finish");con->_tcps->EnableReadWrite(con, true, false); // 发完了,把写通道关闭// sleep(2);break;}else{logMessage(DEBUG, "sender not finish");con->_outbuffer.erase(0, i); // 如果一次性没发完,那么就将发完的部分删减掉,剩余的下次再发}}else{if (errno == EAGAIN || errno == EWOULDBLOCK) // 上次发完了,这次再发就会err为这两个字段{break;}else if (errno == EINTR){continue; // 因信号中断了,继续发送}else{logMessage(DEBUG, "excepter");if (con->_excepter) // 异常了,执行异常事件{con->_excepter(con);return;}}}}}void Excepter(Connection *con) // 异常事件{logMessage(DEBUG, "enter excepter");_epoller.Control(con->_consock, 0, EPOLL_CTL_DEL);con->Close();_Connections.erase(con->_consock);logMessage(DEBUG, "out excepter");delete con;}void Accepter(Connection *con) // 针对listenfd,将获取到的连接从底层拿到应用层{for (;;){logMessage(DEBUG, "enter Accepter");string clientip;uint16_t clientport;int err = 0;int sock = _sock.Accpet(&clientip, &clientport, &err); // 获取成功,返回新的文件描述符用于通信,客户端的ip和port通过参数返回if (sock > 0){// 连接拿上来了,将fd添加到con对象中AddConnection(sock, EPOLLIN | EPOLLET,bind(&Tcpserver::Recver, this, placeholders::_1),bind(&Tcpserver::Sender, this, placeholders::_1),bind(&Tcpserver::Excepter, this, placeholders::_1));logMessage(DEBUG, "get new link,[%s:%d]", clientip.c_str(), clientport);}else{if (err == EAGAIN || err == EWOULDBLOCK)break; // 读完了else if (err == EINTR)continue; // 因为中断,继续读elsebreak; // 错误}}}bool Isexist(int sock){auto iter = _Connections.find(sock);return iter != _Connections.end(); // 判断sock是否存在connection集合中}void AddConnection(int sock, uint32_t event, func_t reader, func_t writer, func_t excepter){if (event & EPOLLET) // 如果是ET模式,就将文件描述符设置为非阻塞Until::setNonBlock(sock);Connection *con = new Connection(sock, this);con->Register(reader, writer, excepter); // 把外面的函数传进去初始化内部函数bool n = _epoller.Add_Event(sock, event); // 告诉内核需要监管那些事件--将fd和事件添加到epoll模型中logMessage(DEBUG, "Add event num: %d", n);assert(n);(void)n;_Connections.insert(pair<int, Connection *>(sock, con)); // 将fd和con对象添加到map中进行管理}void Loop(int timeout){// logMessage(DEBUG,"enter Loop");int n = _epoller.Wait(_revs, _num, timeout); // 获取已经就绪的事件for (int i = 0; i < n; i++)  //遍历就绪事件                // epoll_wait出错,n是-1,此时i不<n,就进不去for循环{                                            // 拿到就绪事件的fd和event// sleep(2);// logMessage(DEBUG, "enter Loop for");//提取sock和eventint sock = _revs[i].data.fd;uint32_t event = _revs[i].events;//处理异常事件--如果是异常事件,那么会进入读事件和写事件,但是读事件是不就绪的即读出错,就会走到处理异常的代码区。
//同样的写事件也会发送写出错,走到处理异常的代码区if (event & EPOLLERR)//      logMessage(DEBUG, "event & EPOLLERR");event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)//      logMessage(DEBUG, "event & EPOLLHUP");event |= (EPOLLIN | EPOLLOUT); // 如果事件异常了,就将该事件设置为读写事件// listenfd事件就绪if ((event & EPOLLIN) && (Isexist(sock)) && (_Connections[sock]->_reader)){logMessage(DEBUG, "con->_reader");_Connections[sock]->_reader(_Connections[sock]);}if ((event & EPOLLOUT) && (Isexist(sock)) && (_Connections[sock]->_writer)){logMessage(DEBUG, "con->_writer");_Connections[sock]->_writer(_Connections[sock]);}}//   logMessage(DEBUG, "quit Loop");}public:Tcpserver(func_t fun, int port = defaultport) : _port(port), _func(fun){}void inittcpserver(){logMessage(DEBUG, "enter inittcpserver\n");// 1.创建文件描述符_sock.Socket();    // 创建文件描述符--用于建立连接_sock.Bind(_port); // 绑定端口号和ip_sock.Listen();    // 将文件描述符设置为监视状态// 2.创建epoll模型_epoller.Create();//3.将listenfd添加到con对象,即添加到epoll中,并且注册配备的缓冲区和回调函数AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,\bind(&Tcpserver::Accepter, this, placeholders::_1), nullptr, nullptr); // 对于listenfd来说,只关心读取事件logMessage(DEBUG, "quit inittcpserver\n");_revs = new struct epoll_event[Gnum]; // 创建一个事件集合,供后续存放已经就绪的事件使用_num = Gnum;}void EnableReadWrite(Connection *con, bool readable, bool writable){//判断uint32_t event = (readable ? EPOLLIN : 0) | (writable ? EPOLLOUT : 0) | EPOLLET;_epoller.Control(con->_consock, event, EPOLL_CTL_MOD);}void Distribute()//事件派发s{logMessage(DEBUG, "enter Distribute");while (true){Loop(-1);}logMessage(DEBUG, "quit Distribute");}~Tcpserver(){_epoller.Close();if (_revs != nullptr)delete[] _revs;}private:uint16_t _port;Sock _sock;Epoller _epoller;unordered_map<int, Connection *> _Connections; // 建立sock和connection对象的映射表struct epoll_event *_revs;                     // 用来存储返回的事件func_t _func;int _num; // 可监管的事件总数};
}
  1. 该代码中的bind用法是bind类内成员函数。用法是第一个参数需要传递类内成员函数对象的指针,第二个参数需要传递类对象的指针,后面才是传递类成员函数需要的参数。对一个成员函数对象使用bind后形成一个新的函数对象。例如调用AddConnection函数时,第三四五参数是仿函数对象,通过bind将传入的类内成员函数对象转换为仿函数对象。以传递Recver函数为例,在bind表达式内,第一个参数传递Recver函数对象指针,第二个参数传递Tcpserver类指针,第三个参数传递的是需要传递给Recver函数对象的参数,即con指针。通过bind,将Tcpserver内的Recver成员函数转换为Connection类内的_reader成员函数。

image-20231129180452445

  1. 建立sock和Connection对象的映射关系,以sock作为key值,Connection对象的指针作为value值,建立unordered_map数据结构进行组织管理,Connection对象内是针对key值的sock配备的缓冲区,处理函数。

  2. 在epoll中读取到数据,处理完后不能立刻发送回给客户端。因为发送缓冲区是否具有空间是未知的。服务器启动后,发送条件是就绪的,是可以直接发送,但会存在一次性发送完的情况,可以下一次调用sender的时候再发送,这就要求每一个sock需要配备自己的发送缓冲区。并且将发送事件注册到epoll中,让epoll管理。由于服务器的需求以接收事件居多,发送事件相比需求不大,因此对于epoll来说接收事件是常规设置,发送事件是按需设置。

main.cc

#include <iostream>
#include <functional>
#include <vector>
#include <memory>
#include "err.hpp"
#include "tcpserver.hpp"
#include "protocol.hpp"
using namespace std;
using namespace TCP_sv;static void Usage(string proc)
{cerr << "Usage:\n\t" << proc << " port "<< "\n\n";
}string resp(const string &s)
{return s;
}bool cal(const Request &req, Response &resp)
{// req已经有结构化完成的数据啦,你可以直接使用resp._exitcode = NONE;resp._result = NONE;switch (req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '/':{if (req._y == 0)resp._exitcode = DIV_ZERO;elseresp._result = req._x / req._y;}break;case '%':{if (req._y == 0)resp._exitcode = MOD_ZERO;elseresp._result = req._x % req._y;}break;default:resp._exitcode = OP_ERR;break;}return true;
}void calculate(Connection *conn)
{string onepackage;// 从完整报文中取出有效载荷while (handleOnePackage(conn->_inbuffer, &onepackage))// 如果是读到一个完整的报文,就进入while循环体内对报文进行处理,形成响应返回给client端{//去报头string req_str;if(!deLength(onepackage,&req_str)){logMessage(FATAL,"delength err");return;}cout << "有效载荷: " << req_str << endl;// 反序列化:用有效载荷去构造req对象Request req;if (!req.Deserialize(req_str)){logMessage(FATAL, "Deserialize err");return;}// 用req对象的成员去构造resp对象--处理函数,然后构造响应Response resp;cal(req, resp);string respstr;// 用resp对象去序列化出一个报文if (!resp.Serialize(&respstr)){logMessage(FATAL, "resp serialize err");return;}// 将报文加上报头,然后填充到输出缓冲区中conn->_outbuffer += enLength(respstr);cout << "-----result: " << conn->_outbuffer << endl;}//if (conn->_writer)conn->_writer(conn);if (!conn->_outbuffer.empty()) // 如果没有发送完conn->_tcps->EnableReadWrite(conn, true, true);//如果这次数据没发完,下次epoll检查就绪事件时当前的sock的写事件还是就绪的,那么下次epoll就自动调用写函数继续写elseconn->_tcps->EnableReadWrite(conn, true, false);//通过回调指针调用tcp对象的函数。这次写完了,将写事件关闭
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(USAGE_ERR);}uint16_t port = atoi(argv[1]);unique_ptr<Tcpserver> selsv(new Tcpserver(calculate, port));selsv->inittcpserver();selsv->Distribute();return 0;
}

protocol.hpp

#pragma once
#include<iostream>
#include<string>
#include <sys/types.h>
#include <sys/socket.h>
#include <cstring>
#include <jsoncpp/json/json.h>
#include"log.hpp"
using namespace std;#define SEP " "
#define SEP_LEN strlen(SEP)//strlen统计'\0'之前的字符个数,而sizeof统计的是所占内存的空间大小,使用sizeof会越界出问题
#define LINE_SEP "\r\n"
#define LINE_SEP_LEN strlen(LINE_SEP)enum {NONE=0,DIV_ZERO,MOD_ZERO,OP_ERR
};
//"x op y"->"text_len"\r\n"x op y"\r\n---給内容加上报头
std::string enLength(const std::string& text)//协议定制
{std::string send_str=to_string(text.size());send_str+=LINE_SEP;send_str+=text;send_str+=LINE_SEP;return send_str;
}
//"text_len"\r\n"x op y"\r\n -> "x op y"---去掉报头,取出里面的内容
bool deLength(const std::string& str,string* ret)//协议定制
{auto it=str.find(LINE_SEP);//找到报头if(it==std::string::npos) return false;//如果没找到则直接返回int len=stoi(str.substr(0,it));//取出字符串的长度*ret=str.substr(it+LINE_SEP_LEN,len);//取出数据return true;
}class Request
{
public:
Request():_x(0),_y(0),_op(0){}
Request(int x,int y,int op):_x(x),_y(y),_op(op){}bool Serialize(std::string* out)//序列化,将传入的x op y转化为字符串"x op y"
{*out="";*out+=to_string(_x);*out+=SEP;*out+=to_string(_op);*out+=SEP;*out+=to_string(_y);return true;
}bool Deserialize( const string& origin)//反序列化,将传过来的字符串拆出来传参給_x _op _y
{//"_xSEP_opSEP_y"-> _x,_op,_yauto leftit=origin.find(SEP);cout<<"Deserialize找到了leftSEP: "<<leftit<<endl;auto rightit=origin.rfind(SEP);cout<<"Deserialize找到了rightSEP: "<<rightit<<endl;if(leftit==string::npos|| rightit==string::npos) return false;if(leftit==rightit) return false;int opsize=rightit-leftit-1;cout<<"opsize: "<<opsize<<endl;
//1 43 1--leftit=1,rightit=4,opsize=rightit-leftit-1=4-1-1=2;
//1 3 1--leftit=1,right=3,opsize=rightit-leftit-1=3-1-1=1// if(rightit-(leftit+SEP_LEN)!=1) return false;if(rightit-(leftit+SEP_LEN)!=opsize) return false;//+号ASCII码是43,从char转int被解析成43即stringlen为两位,这里的运算rightit-(leftit+SEP_LEN)!=1就出问题
//4-(1+1)==2;3-(1+1)=1std::string origin_x=origin.substr(0,leftit);std::string origin_y=origin.substr(rightit+SEP_LEN);if(origin_x.empty()) return false;if(origin_y.empty()) return false;cout<<"origin_x: "<<origin_x<<" origin_y: "<<origin_y<<endl;_x=stoi(origin_x);int opf=stoi(origin.substr(leftit,rightit));_op=opf;cout<<"opf: "<<opf<<"_op: "<<_op<<endl;_y=stoi(origin_y);return true;}public:int _x;int _y;char _op;
};class Response
{
public:
Response():_exitcode(0),_result(0){}
Response(int exitcode,int result):_exitcode(exitcode),_result(result){}
bool Serialize(string*out)//序列化
{//_exitcode _result ->"_exitcodeSEP_result"
*out="";
*out+=to_string(_exitcode);
*out+=SEP;
*out+=to_string(_result);return true;
}bool Deserialize(const string& in)//反序列化
{//_exitcodeSEP_result"->_exitcode _resultauto pos=in.find(SEP);
if(pos==string::npos) return false;string excstr=in.substr(0,pos);
string resstr=in.substr(pos+SEP_LEN);
if(excstr.empty()||resstr.empty()) return false;_exitcode=stoi(excstr);
_result=stoi(resstr);
return true;}public:
int _exitcode;//退出码
int _result;//结果
};//"text_len"\r\n"x op y"\r\n
bool handleOnePackage(string& inbuffer,string*out)
{*out="";auto pos=inbuffer.find(LINE_SEP);//找\r\nif(pos==string::npos) return false;//没找到报头和有效载荷之间的分隔符---如果字节流式的报文没读全就继续读string text_len=inbuffer.substr(0,pos);//报头是有效载荷的长度int len=stoi(text_len);int totallen=text_len.size()+LINE_SEP_LEN*2+len;//整个报文的长度if(inbuffer.size()<totallen) {logMessage(WARNING,"got uncomplete message");return false;//报文没读完继续读}logMessage(NORMAL,"got complete message");*out=inbuffer.substr(0,totallen);inbuffer.erase(0,totallen);return true;}

log.hpp

#pragma once#include <iostream>
#include <string>
#include<ctime>
#include <sys/types.h>#include <unistd.h>#include <stdio.h>
#include <stdarg.h>
using namespace std;
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
#define ERROR_EPOLL_CREATE 5#define NUM 1024
#define LOG_STR "./logstr.txt"
#define LOG_ERR "./log.err"
const char* to_str(int level)
{switch(level){case DEBUG: return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";case ERROR_EPOLL_CREATE: return "ERROR_EPOLL_CREATE";default: return nullptr;}
}void logMessage(int level, const char* format,...)
{char logprestr[NUM];
snprintf(logprestr,sizeof(logprestr),"[%s][%ld][%d]",to_str(level),(long int)time(nullptr),getpid());char logeldstr[NUM];
va_list arg;
va_start(arg,format); 
vsnprintf(logeldstr,sizeof(logeldstr),format,arg);//arg是logmessage函数列表中的...cout<<logprestr<<logeldstr<<endl;}

Sock.hpp

#pragma once#include<iostream>
#include<string>
#include<cstring>
#include<sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"const static int backlog=32;
const static int defaultsock=-1;
class Sock
{const static int backlog=32;public:Sock(int sock=defaultsock):_listensock(sock){}void Socket(){_listensock=socket(AF_INET,SOCK_STREAM,0);//创建套接字if(_listensock<0)//创建失败{logMessage(FATAL,"create sock error");exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,"create sock success,origin sock: %d\n",_listensock);int opt=1;setsockopt(_listensock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));//允许套接字关闭后立刻重启} void Bind(int port){//绑定自己的网络信息struct sockaddr_in local;memset(&local,0,sizeof(local));//将结构体清空local.sin_family=AF_INET;//添加协议local.sin_port=htons(port);//添加端口号local.sin_addr.s_addr=htons(INADDR_ANY);//不绑定指定IP,可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(_listensock,(struct sockaddr*)&local,sizeof(local))<0){logMessage(FATAL,"bind socket error");exit(BIND_ERR);}logMessage(NORMAL,"bind socket success");}void Listen()//将套接字设置为监听{if(listen(_listensock,0)<0){logMessage(FATAL,"listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL,"listen socket success");}int Accpet(string * clientip,uint16_t* clientport,int*err){logMessage(DEBUG,"enter Accept");*err=errno;struct sockaddr_in cli;socklen_t len= sizeof(cli);logMessage(DEBUG,"will accept");//拿上来连接后,第二次调用到这里,调用accept函数阻塞住了,难道不是设定了sock是非阻塞了吗?11.17.21.24int sock=accept(_listensock,(struct sockaddr*)&cli,&len);logMessage(DEBUG,"accept done");if(sock<0){logMessage(FATAL,"accept error");//这里accept失败为什么不退出}else{logMessage(NORMAL,"accept a new link,get new sock : %d",sock);*clientip=inet_ntoa(cli.sin_addr);*clientport=ntohs(cli.sin_port);}logMessage(DEBUG,"quit Accept");return sock;}int Fd(){return _listensock;}private:int _listensock;
};

until.hpp

#pragma once#include<iostream>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<cerrno>
using namespace std;
class Until
{
public:
static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int n=fcntl(fd,F_GETFL);//获取文件描述符的状态,正常返回非-1的标志位,出错返回-1if(n<0){cerr<<"fcntl :"<<strerror(errno)<<endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置,设置为非阻塞状态
}};void Print_log()
{cout<<"print_log"<<endl;
}void Download()
{cout<<"download_something"<<endl;
}

Epoller.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include <sys/epoll.h>
#include "Sock.hpp"using namespace std;
static const int defaultfd = -1; // 默认fd
static const int size = 128;     //
class Epoller
{
public:Epoller(int fd = defaultfd) : _epfd(fd) {}~Epoller(){if (_epfd != defaultfd){close(_epfd);}}void Create(){logMessage(DEBUG, "enter epoller create");_epfd = epoll_create(size); // 将管理事件数传进去,创建一个具有指定事件数的epoll模型if (_epfd < 0){logMessage(ERROR, "epoll_create error");exit(ERROR_EPOLL_CREATE);}logMessage(DEBUG, "out epoller create,_epfd: %d\n", _epfd);}// 用户告知内核,需要底层监管那些事件bool Add_Event(int sock, uint16_t event) // 将sock和event添加到epoll模型中{struct epoll_event epv;epv.events = event;epv.data.fd = sock;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &epv);return n == 0;}// 内核告诉用户,已经就绪了多少个事件int Wait(struct epoll_event revent[], int num, int timeout) // 监管num个事件,就绪事件保存在revent数组中{logMessage(DEBUG, "enter epoll_Wait");int n = epoll_wait(_epfd, revent, num, timeout);logMessage(DEBUG, "quit epoll_Wait,get n: %d", n);//     sleep(2);return n; // 内核帮助用户监管事件,返回已经就绪的事件数量}bool Control(int sock, uint32_t event, int action){struct epoll_event epv;epv.events = event;epv.data.fd = sock;int n = 0;if (action == EPOLL_CTL_MOD){logMessage(NORMAL,"enter Control MOD");n = epoll_ctl(_epfd, action, sock, &epv);}else if (action == EPOLL_CTL_DEL){n = epoll_ctl(_epfd, action, sock, nullptr);}elsen = -1;return n == 0;}void Close(){if (_epfd != defaultfd){close(_epfd);}}private:int _epfd;
};

由于该reactor处理的是类似于接收“1+1”的完整报文的数据,返回客户端所得数的业务,因此客户端也需要能够具备发送完整报文的能力。

calclient.cc

#include<iostream>
#include<string>
#include<memory>
#include"calclient.hpp"
using namespace std;
using namespace client;
static void Usage(string proc)
{cout<<"\nUsage :\n\t"<<proc<<" serverip serverport\n"<<endl;
}
int main(int argc, char* argv[])
{if(argc!=3){Usage(argv[0]);exit(1);}string serverip=argv[1];
uint16_t serverport=atoi(argv[2]);unique_ptr<calclient> tc(new calclient(serverip,serverport));tc->initclient();
tc->start();return 0;
}

calclient.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <ctype.h>
#include"protocol.hpp"
using namespace std;
#define NUM 1024
namespace client
{class calclient
{public:
calclient(const string& ip,const uint16_t& port)
:_sock(-1)
,_port(port)
,_ip(ip)
{}void initclient()
{
//1.创建sockfd
_sock=socket(AF_INET,SOCK_STREAM,0);
if(_sock<0)
{cerr<<"socket create error"<<endl;exit(2);
}
//2.绑定 ip port,不显示绑定,OS自动绑定
}void start()
{
struct sockaddr_in ser;
bzero(&ser,sizeof(ser));
socklen_t len=sizeof(ser);
ser.sin_family=AF_INET;
ser.sin_port=htons(_port);
ser.sin_addr.s_addr=inet_addr(_ip.c_str());
if(connect(_sock,(struct sockaddr *)&ser,len)!=0)
{cerr<<"connect error"<<endl;
}else
{string line;string inbuffer;while(true){cout<<"mycal>>: ";//输入"xopy"getline(cin,line);Request req=ParseLine(line);//用"xopy"取出x op y构造Request对象string context;req.Serialize(&context);//序列化,用x op y构造字符串"xSEPopSEPy"string send_str=enLength(context);//定制协议---"x op y"->"text_len"\r\n"x op y"\r\n---給内容加上报头cout<<"calclient send str: "<<send_str<<endl;send(_sock,send_str.c_str(),send_str.size(),0);//客户端把报文发送給服务器string package;if(!recvPackage(_sock,inbuffer,&package)) continue;//服务器处理完数据,客户端接收服务器发送来的报文//  "content_len"\r\n"exitcode result"\r\nstring reser_len;if(!deLength(package,&reser_len)) continue;//去报头//  "content_len"\r\n"exitcode result"\r\n -> "exitcode result"Response rep;rep.Deserialize(reser_len);//反序列化://_exitcodeSEP_result"->_exitcode _resultcout<<"_exitcode: "<<rep._exitcode<<endl;cout<<"_result: "<<rep._result<<endl;}
}
}~calclient()
{if(_sock>=0) close(_sock);
}Request ParseLine(const string& line)
{//"xopy"->取出来到x op y 上
int i=0;
int status=0;
int num=line.size();
string left,right;
char op;while(i<num)
{
switch(status)
{case 0:{if(!isdigit(line[i])){op=line[i];//取出运算符**status=1;}elseleft.push_back(line[i++]);//取出左操作数}break;case 1:i++;status=2;break;case 2:right.push_back(line[i++]);break;
}
}
cout<<"left: "<<stoi(left)<<" op: "<<op<<" right: "<<stoi(right)<<endl;
return Request(stoi(left),stoi(right),op);//返回Request对象}private:
int _sock;
uint16_t _port;
string _ip;};
}

image-20231129194945169
reactor保证了事件就绪,还负责了IO,并且还完成了业务处理。负责了IO过程+业务处理,该过程称为半同步。实际上可以将业务放到其他处理逻辑上,只负责IO过程,这称为半异步。


https://www.xjx100.cn/news/3119009.html

相关文章

【JavaEE初阶】死锁问题

目录 一、死锁的三种典型场景 1、一个线程&#xff0c;一把锁 2、两个线程&#xff0c;两把锁 3、N个线程&#xff0c;M把锁 死锁&#xff0c;是多线程代码中的一类经典问题。我们知道加锁是能解决线程安全问题的&#xff0c;但是如果加锁的方式不当&#xff0c;就可能产生死…

视图层、模板(补充)

视图层 响应对象 响应---》本质都是 HttpResponse HttpResponse---》字符串render----》放个模板---》模板渲染是在后端完成 js代码是在客户端浏览器里执行的模板语法是在后端执行的redirect----》重定向 字符串参数不是是空的状态码是 3开头JsonResponse---》json格式数据 …

ffmpeg在centos系统下的源文件下载脚本及编译脚本

下载脚本&#xff1a; #只做下载的动作&#xff0c;之后可以移动到其它环境下编译 #from https://trac.ffmpeg.org/wiki/CompilationGuide/Centos #yum install autoconf automake bzip2 bzip2-devel cmake freetype-devel gcc gcc-c git libtool make pkgconfig zlib-devel m…

加密挖矿、AI发展刺激算力需求激增!去中心化算力时代已来临!

2009年1月3日&#xff0c;中本聪在芬兰赫尔辛基的一个小型服务器上挖出了比特币的创世区块&#xff0c;并获得了50BTC的出块奖励。自加密货币诞生第一天起&#xff0c;算力一直在行业扮演非常重要的角色。行业对算力的真实需求&#xff0c;也极大推动了芯片厂商的发展&#xff…

点击元素以外的事件监听

在项目中&#xff0c;我们经常会遇到需要监听目标元素以外的区域被点击或鼠标移入移出等需求。 例如下面我们有一个表格里面嵌套表单的组件 我希望点击n行的时候&#xff0c;n行的元素变成表单元素进行输入或者选择&#xff0c; 当我点击其他其他区域n行又会恢复成数据展示…

nvm for windows使用与node/npm/yarn的配置

1 下载 nvm for windows download – github 下拉到Assets, 下载.exe文件 2 安装 安装到如下文件夹中 目录可以自己选, 可以换别的名字, 自己记住即可 新手建议全部看完再进行个人配置, 或者使用与博主一致的路径 D:\DevelopEnvironment\nvm3 配置nvm使用的镜像 node_mir…

Opencv手势控制音量!附源码!

效果演示&#xff1a; 废话不多说&#xff01;直接上源码&#xff01;下面写有所有代码注释&#xff01;&#xff01; import cv2 import mediapipe as mp #它包含了各种预训练的机器学习模型&#xff0c;可以用于姿势估计、手势识别等任务 from ctypes import cast, POINTE…

(C++)移动零--双指针法

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能&#xff0c;轻松拿下世界 IT 名企 Dream Offer。https://le…