本篇开始总结的是一个仿muduo库one-thread-one-loop式并发服务器的项目
本项目已开源到下面链接下的仓库当中
concurrent-server
针对于日志的信息,我采用了之前写的一份利用可变参数实现日志的代码,具体链接如下
C++:可变参数实现日志系统
Http服务器自主实现
Boost搜索引擎客户端
在前面的Linux网络编程中,通过Socket可以实现一个简单的TCP的服务器,也可以实现一个简单的通信功能,但是如果现在有很多的客户端同时对于这个服务器进行请求,那么很明显效率是不高的,所以这个项目就是要在前面的TCP服务器的基础之上,进行一些设计,最终实现一个效率很高的并发服务器
本项目会用到HTTP服务器,在之前的内容中对于HTTP有过理解,这里简单总结一下:
实现HTTP服务器主要就是以上的几步,虽然实现HTTP服务器并不难,但是想要在有很多客户端同时进行请求,还能实现高性能的服务器并不容易,这也是本项目要解决的核心问题
Reactor模型说的是,把一个或者多个输入同时传递给服务器进行请求处理时的事件驱动处理模式,当服务器收到了多个请求之后,把这些请求同步分派给请求对应的处理线程,这就是Reactor模式,也叫做是Dispatcher模式,简单来说,就是IO多路复用,也叫做是多路转接,统一对事件进行监听,之后把收到的事件发送到处理的进程或者是线程,而IO多路复用是高性能网络版本服务器必备的技术之一,由此可见这个技术也收到了广泛的使用
具体的原理如下所示:
单Reactor单线程:
对于单Reactor单线程来说,相对的思想比较简单:
但是由于它的设计比较简单,所以只是适用于一些客户端数量比较少,处理速度比较快速的场景
单Reactor多线程:
在这种设计模式中,和前面的区别是在处理方面,当触发事件后,这种设计模式只会进行单纯的简单的IO操作,而对于接收上来的数据的后续处理,则是放到线程池中进行业务处理,当工作线程处理结束之后,再把响应放到Reactor线程进行数据响应
由此可见,它和上述的设计模式相比,优点在于采用线程池,可以充分的利用CPU的资源进行处理,但是由于是采用了多线程的设计模式,所以在对于数据共享访问控制比较负责,并且由于是单Reactor进行所有事件的监听和响应,在单线程中运行,在遇到高并发的场景下也会遇到性能瓶颈的问题
多Reactor多线程:
所谓多Reactor多线程,就是设计一个主Reactor,用来监听新的链接,在获取到新链接后,下派到多个从属Reactor当中,每一个从属Reactor负责的工作就是进行数据的IO获取,在获取到IO数据之后再下发到业务线程池当中,进行业务数据的处理,在工作线程处理结束之后,就可以把响应交给子Reactor线程进行数据响应了
具体的逻辑描述如下所示:
由此可见,这种设计模式相较于上述的两种设计模式来说,充分利用了CPU的多核资源,并且让主从Reactor分开进行处理,不过,这种设计模式有比较多的执行流,在面临到一些场景中,太多的执行流也可能会面临一些可能存在的问题,比如频繁的切换会增加CPU的调度成本,这些后续如果遇到了再进行分析处理
有了上面的三种理论设计,那在本项目当中,使用的是多Reactor设计模式,但对于数据的业务处理,也放到了从属Reactor当中进行处理,因为过多的执行流可能会对CPU的调度产生比较大的压力,未来如果有机会,可以实现出带有多线程池版本的并发服务器
所以本项目的初步版本,实现的是一个One Thread One Loop的思想,也就是说是把所有的操作都放到一个线程中执行,一个线程对应一个事件的处理的循环
在整体的模块分析方面,可以大致分为两个模块
那下面就针对于下面的这些模块,来进行更加详细的划分
在这个模块中,主要是要对于所有的链接以及线程进行管理,管理的主要内容有下面的三个方面
所以基于上面的这三个主要的管理方式,又可以构建出下面的一些更加细致化的模块
Buffer模块
在之前的代码中,对于Buffer的设计都是直接用一个大数组来表示,但是在实际的TCP服务器进行通信的时候,有可能会出现读取上来的数据并不是一个完整的数据,在发送的时候可能对方还不能进行接收,所以要对于这些没有完全就绪的数据进行一个暂时的缓存管理的过程,所以要单独设计出一个Buffer模块,主要的功能就是要实现通信套接字的用户态缓冲区
对于这个模块来说,基本的设计功能就是可以对于缓冲区中添加数据,以及可以对于缓冲区中的数据进行读取,主要就是要实现上述的这两个功能即可
Socket模块
在之前的代码中对于Socket已经实现过封装,那么这里就不再过多描述这个封装的内容目的了,在创建套接字会有一些比较繁琐的步骤,因此要对于套接字进行封装,主要实现的功能设计包括有创建套接字、绑定地址信息、开始监听、向服务器发起连接、获取新连接、接收和发送数据、关闭套接字、新建一个监听连接和客户端连接
Channel模块
在对于Channel模块的设计中,主要是要完成的内容是对于一个描述符进行各种事件的监控和管理,这样就能在进行触发事件后的操作流程更加的清晰可靠
在功能的设计方面,要包含对于监控事件的管理,大致包含有:描述符是否可读、描述符是否可写、对描述符监控可读、对描述符监控可写、解除可读事件的监控、解除可写事件的监控、解除所有事件的监控,另外还要包含对于监控事件触发后的管理,大致包含有要设置不同事件的回调处理函数,这样在触发了某个事件之后,可以更好的直接对于这些事件进行调用处理
Connection模块
对于Connection模块来说,它内部会包含有前面所说的三个模块,它本身就是一种整体的封装,用来实现的是对一个通信套接字整体的管理过程,每一个进行数据通信的套接字,都会使用到Connection进行管理
那么在Connection模块内部必然会设置一定的回调函数,主要包括的功能有连接建立完毕之后的回调、事件回调、新数据回调、关闭回调,而在Connection模块内部会为组件的使用者提供两个接口,分别是用户态接收缓冲区和用户态发送缓冲区的接口,而在这个模块整体的内部会设置有一个Socket对象,这个Socket对象负责的就是对于系统的各项IO操作,模块内部还会包含有一个Channel对象,用来完成描述符IO事件就绪的处理过程
对于整体的Channel模块的设计当中,要先把例如有可读、可写、错误等不同事件的IO事件设置有回调函数,之后再把Channel和对应的描述符设置到poller事件的监控当中
之后当这个描述符所对应的文件信息已经就绪了一些IO的可读事件之后,此时就可以调用这个描述符对应Channel当中保存的读事件处理函数,来把下面的数据读取到上层当中,把Socket接收缓冲区全部读取到Connection管理的用户态接收缓冲区当中,再把这些数据上传到回调函数中进行新的处理模式
而当组件的使用者对于数据的处理结束之后,就可以用Connection提供的数据接口,再把所使用的数据写入到Connection的发送缓冲区中,然后在启动这个描述符在poll模块当中的IO写事件的监控,就绪之后就可以调用Channel中保存的写事件处理函数,把发送给缓冲区当中的数据通过Socket进行面向系统的实际数据的发送,这样就完成了一次完成的数据读取、处理、发送的过程
所以对于功能设计来说,整个Connection模块必然要面临有关闭连接的能力,同时也要有支持协议切换的功能,对于长时间占用服务器资源而不连接的用户来说,也要给使用者提供对应的开启和关闭非活跃连接超时释放的功能
Acceptor模块
对于Acceptor模块来说,主要是hi对于Socket,Channel模块的一个整体的封装,主要是实现的是对于一个监听套接字的整体的管理过程
其中,在Acceptor模块的内部是包含有一个Socket对象的,用来实现的是一个监听套接字的操作,并且还要有一个Channel对象,用来实现对于监听套接字IO事件就绪的后续处理过程
所以从功能设计来说,这个模块的主要功能就是对于回调函数的设置,能够到达的效果就是在新建连接获取成功的回调设置,让服务器本身来进行指定
TimeQueue模块
对于TimeQueue模块来说,要实现的效果是可以定时的执行某个任务,在我们当前的这个项目当中,就是设置非活跃连接超时释放的功能,假设对于一个新的连接,在它连接后的一段时间内如果没有任何事件的发生,那么就会把它判定为是一个非活跃的连接,所以就要把它释放掉,这就是一个TimeQueue模块的基本功能
所以从功能设计的角度来讲,想要设计它的功能就必须要先有一个添加定时任务的功能,有添加的功能就要有取消的功能,对于一个事件触发之后,就要对于这个非活跃的判断进行重新的刷新,所以就还要带有一个定时任务重新开始计时的设计方案
Poller模块
对于Poller模块其实并不陌生,在前面学习的select、poll、epoll这样的多路转接方案中都有Poller这样的概念,简单来说就是对于所有的描述符进行监听各种的IO事件,比如说有添加,修改,移除这样的功能,还应该有获取活跃连接的功能
Poller模块的底层我采用的就是epoll来进行的一个封装
EventLoop模块
本项目的名称叫做One Thread One Loop,那这个Loop的意思就将在本模块中进行介绍,在本模块当中主要是要保证内部监控的所有的描述符都是活跃的,而对于非活跃的描述符要及时释放,以免造成服务器资源的浪费,Loop其实就是所说的Reactor
而在这个模块的内部会包含有一个eventfd,这是Linux内核提供的一个事件fd,专门用来进行事件的通知,同时还会包含有一个Poller对象,用来对于描述符进行IO事件监控操作,也会包含有TimeQueue对象,用来对于定时任务的管理
在这个模块的内部会存在有一个PendingTask队列,使用者在进行Connection进行的所有操作,都会加入到这个任务队列当中,由EventLoop进行统一的管理,并且在EventLoop对应的线程中进行执行,每一个Connection对象都会绑定在一个EventLoop上,这样就可以保证整个的链接的操作是在一个线程中完成的
从功能设计的角度来讲,对于这个EventLoop的模块要有把链接的操作任务添加到任务队列的功能,同时对于定时任务要有添加、刷新、取消的功能
TcpServer模块
上面的模块设计都是在内部进行实现的,而真正暴露给用户的是下面的这个模块,TcpServer,这个模块也是把上面的所有模块进行了一个总结的功能
上述展示了各个模块的基本功能和设计,但是整体来说还是思维较乱,所以下面我用几张图来把这些模块之间的关系构建出来
首先看的是Connection模块的逻辑图
首先我展示的是对于Connection通信连接管理的一个图,在Connection模块当中是包含有如下的三个模块的,分别是Buffer缓冲区模块,Socket套接字操作模块,Channel描述符事件管理模块,对于这三个模块的描述都在图中,下面回到Connection模块,来看看Connection模块是如何借助下面的这三个模块来实现它的基本功能的
我们直接看内部描述符事件操作的接口,Connection模块可以通过Socket接收数据和发送数据,所以Connection模块必然是要和Socket模块进行紧密联系的,而这里采用的是多路转接的方案,所以就意味着会对所有的描述符进行监听,那此时就需要Channel模块对于所有事件进行管理的工作了,Channel模块当中包含有对于可读和可写事件的回调,所以在Connection模块的Socket接收和发送数据,本质上是通过Channel模块的回调进行设置的,当监听的描述符满足要求之后,就会调用Channel模块自定义设置的回调函数,进行可读和可写事件的回调,进而在Connection模块就可以通过Socket接收和发送数据了,这样就解释清楚了Connection模块是如何通过Socket接收和发送数据的
每一个Connection对象内部都会被提前设置好各个事件回调,例如有连接建立完成回调,新数据接收后的回调,任意时间的回调,关闭连接的回调,这些回调都是由TcpServer进行设置好的,这也算是Connection对象内部的接口
而在Connection模块当中还会存在有关闭套接字解除事件监控和刷新活跃度的功能,这两个功能本质上是和Socket功能是一样的,它的底层都是借助了Channel模块,当触发了挂断和错误事件回调的时候,就会促使Connection模块执行对应的方案,如果使用者设置了非活跃度销毁的方案,也会在事件触发后刷新统计时间,至此我就把Connection模块和Channel模块联系在一起,解释清楚了
那下面来看Socket套接字模块,其实Socket套接字模块本身就和上面有十分紧密的关系,在底层进行监听的本质,其实就是监听对应的Socket的相关信息到底有没有就绪,当Socket套接字监听到有信息就绪的时候,就会被多路转接的相关接口监听到,进而促使到Channel模块进行函数回调,促使Connection模块执行对应的策略,所以Socket模块也就解释清楚了
那对于Buffer模块来说,其实就更加简单了,Buffer模块是缓冲区模块,这就意味着只要涉及到接收和发送数据的操作,都是和Buffer模块是紧密相关的,Socket接收到的数据放到接收缓冲区中,而发送数据是放到发送缓冲中,也就是说Buffer缓冲区是和Socket模块紧密相关的,而正是有了Socket模块才会有多路转接提醒上层可以进行后续操作了,此时就会调用到Channel模块进行调用用户设置的回调,进而到达Connection模块的各种数据的处理
至此,站在Connection模块的层面,不关心底层的逻辑,已经可以把内部的这些事件的接口都理清楚了,而对于暴露在外的接口来说,也只是底层的这些内部接口的封装,比如所谓关闭连接,发送数据,切换协议,启动和取消非活跃销毁这些操作,未来其本质就是借助的是内部对于描述符事件操作接口的描述,这个在后面的代码中可以进行体现
下面来看Acceptor模块的逻辑图
如上所示的是关于Acceptor监听连接管理模块的逻辑示意图,其中需要注意的是该模块是在将监听套接字添加到可读事件监控,一旦有事件触发,就意味着有新连接已经建立完成,那么就要为这个新连接设置初始化操作
其实这个模块并不陌生,在前面的poll和epoll的部分已经有过这个模块的内容,但是考虑到项目的完整性,我再把这块内容逻辑分析一下:当使用多路转接去监听各个文件描述符时,如果有新的连接上来了,那么就会触发可读事件的监控,那么在Channel版块就会调用一些回调函数到上层进行新连接的获取,在获取了新连接之后就要进行新连接的初始化,当然这个新连接的初始化是TcpServer来决定的,Acceptor模块也并不清楚,而Acceptor模块自然也是和Socket模块是相关联的,因为这当中必然会涉及到一些套接字相关的操作
如上就把Acceptor模块的内容梳理结束了
下面引入的是EventLoop模块的逻辑图
如上所示是对于EventLoop的逻辑图,先看一下EventLoop内部的接口:在其内部的接口当中包含有的这六个接口,分别来自于两个小模块:Poller描述符事件监控模块和TimeQueue定时任务模块,同时EventLoop还包含有添加连接操作任务到任务队列中的功能。
先看Poller描述符事件监控:这个模块主要做的是对于一个文件描述符进行各项事件的监控操作,包含有添加,修改,移除事件监控的操作,而每一个Poller管理的描述符又会和Channel模块产生联系,因为Channel模块本身就是用来对于每一个描述符可能包含的事件做出的管理,当管理的描述符内部触发了某种事件,那么就会相应的调用这些事件内部的一些回调函数,这是提前就被设置好的内容,而对于TimeQueue来说,它的设置主要是对于一些超时不连接进行断开连接的操作,这部分内容会在Connection模块被设置,在前面的内容已经提及过
在Connection模块看到,对于Connection模块来说,Channel模块的意义就是设置了各种回调,这些回调都是会回调指向原来的Connection模块的,所以Channel对象的回调函数回调的位置就到了Connection模块,这样就把EventLoop、Channel、Connection模块都联系在了一起
至此也就完成了上述这些模块的逻辑思路构建,下面就可以进行代码的编写了
在进行TimeQueue模块中,需要用到一个定时器的概念,而在之前的内容中没有对于这部分内容进行总结,因此这里对于这个定时器的内容进行学习
首先认识一下定时器的相关接口信息:
#include int timerfd_create(int clockid, int flags); int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value); // 参数中包含的结构体 struct timespec { time_t tv_sec; /* Seconds */ long tv_nsec; /* Nanoseconds */ }; struct itimerspec { struct timespec it_interval; /* Interval for periodic timer */ struct timespec it_value; /* Initial expiration */ };
首先介绍一下timerfd_create函数,这个函数的功能是创建一个定时器
对于第一个参数clockid来说,有两个选项:
第二个参数是flag,也就是所谓的标记位,这里我们选择是0表示的是阻塞操作
函数的返回值是一个文件描述符,因为Linux下一切皆文件,所以对于这个函数来说其实就是打开了一个文件,对于这个定时器的操作就是对于这个文件的操作,定时器的原理其实就是在定时器的超时时间之后,系统会给这个描述符对应的文件定时器当中写入一个8字节的数据,当创建了这个定时器之后,假设定时器中创建的超时时间是3秒,那么就意味着每3秒就算是一次超时,那么从启动开始,每隔3秒,系统就会给描述符对应的文件当中写入一个1,表示的是从上一次读取到现在超时了1次,假设在30s之后才读取数据,那么会读上来的数据是10,表示的是从上一次读取到现在实践超出限制了10次
如上是第一个函数的详细内容的介绍,下面进入第二个函数timerfd_settime
对于这个函数来说,表示的是启动定时器,函数的第一个参数是第一个函数的返回值,这个文件描述符其实也是创建的定时器的标识符,而第二个标记位表示的是默认位0,表示的是使用的是相对时间,后面的两个参数也很好理解,表示的是新的时间和旧的时间,不需要就置空即可,那么下面写一份实例代码:
#include #include #include using namespace std; int main() { // 创建一个定时器 int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); struct itimerspec itm; // 设置第一次超时的时间 itm.it_value.tv_sec = 1; itm.it_value.tv_nsec = 0; // 设置第一次超时后,每隔多长时间超时一次 itm.it_interval.tv_sec = 1; itm.it_interval.tv_nsec = 0; timerfd_settime(timerfd, 0, &itm, nullptr); while(true) { uint64_t tmp; int ret = read(timerfd, &tmp, sizeof(tmp)); if(ret < 0) return -1; cout << "超时的次数:" << tmp << endl; sleep(3); } close(timerfd); return 0; }
上面的例子,就是一个定时器的使用实例,借助这个使用的例子就可以进行判断出每隔1秒超时器就会超时一次,然后向文件中写入一个1,我们这里sleep了3秒,所以后面的读取数据中都是超时了3次
在上述的例子当中,存在一个比较大的问题,每次超时都要把所有的连接遍历一次,这样的效率是比较底下的,所以衍生出了一个新的方案,时间轮:
现在定义一个数组,其中包含一个指针,指针指向的是数组的起始位置,这个指针每秒钟向后走一步,走到哪里就代表哪里的任务被执行了,那么如果想要定一个3秒之后的任务,那么只需要将任务添加到指针后三步的位置,让其每秒钟走一格,那么走到对应的位置就可以执行对应位置的任务即可
下面的第二个问题是,假设当前的定时任务是一个连接的非活跃销毁任务,那么这个任务在什么时候被添加到时间轮当中比较合适呢?
假设我们现在定义非活跃为,一个连接在30秒内都没有通信,那么就把这个看成是一个非活跃的连接,此时就可以把它进行销毁了,但是如果一个连接如果在建立的时候添加了一个30s后小胡的任务,不过在这30s内都有数据通信,那么就把这个链接看成是一个活跃的链接,就不会把它进行销毁了,所以说简单来说,思想就是,当需要在一个连接有IO事件产生,此时就要把这个任务进行延迟执行
那我们如何解决这个问题呢?
类的析构函数
设计一个类,对于定时任务进行封装,类实例化的每一个对象,就是一个定时任务的对象,当对象被销毁的时候,再去执行定时任务,也就是说把定时任务的执行放到析构函数当中去
智能指针shared_ptr
在智能指针中使用shared_ptr,用于对于new出来的对象进行空间管理,当用这个智能指针进行管理的时候,内部会有一个计数器的概念,当计数器为0的时候,就释放所管理的对象
具体的来说,对于智能指针的内部会存在有一个计数器,当计数器为0的时候,才算真正释放对象,那么假设现在在连接10秒的时候进行了一次通信,那么在后续的定时任务中,添加一个30秒后的任务类对象的shared_ptr,那么此时两个任务的shared_ptr的引用计数变成2,那么在第30s的时候定时任务要被释放的时候,引用计数依旧存在,只有在40秒释放的时候,这个任务才会被真正释放,此时就调用析构函数进行释放了
以上就是时间轮的基本思路,那么下面用一个demo代码来说明一下这些思想
#include #include #include #include #include #include #include using namespace std; // 在类内部设置回调 using TaskFunc = function; using ReleaseFunc = function; // 表示一个任务 class TimeTask { public: TimeTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id), _timeout(delay), _task_cb(cb), _canceled(false) { } ~TimeTask() { if(_canceled == false) _task_cb(); _release(); } void SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t DelayTime() { return _timeout; } void Cancel() { _canceled = true; } private: // 表示任务对象的ID信息 uint64_t _id; // 超时时间 uint32_t _timeout; // 定时器对象要执行的任务 TaskFunc _task_cb; // 用于删除时间轮中保存的定时器对象的信息 ReleaseFunc _release; bool _canceled; }; class TimeWheel { using WeakTask = weak_ptr; using PtrTask = shared_ptr; public: TimeWheel() : _capacity(60), _tick(0), _wheel(_capacity) {} // 新增一个指定id的任务 void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { // 新建一个任务,设置相关的属性 PtrTask pt(new TimeTask(id, delay, cb)); pt->SetRelease(bind(&TimeWheel::RemoveTimer, this, id)); int pos = (_tick + delay) % _capacity; // 把信息都存储起来 _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); } // 刷新和延长指定id的定时任务 void TimerRefresh(uint64_t id) { // 先找到这个id对应的任务,存储起来 auto it = _timers.find(id); if (it == _timers.end()) { // 没找到,那就不做处理了 return; } // 新建一个shared_ptr出来,放到轮子当中 PtrTask pt = it->second.lock(); int delay = pt->DelayTime(); // 找到下一个要放入的位置,放进去 int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } // 取消任务 void TimerCancel(uint64_t id) { // 先找到这个id对应的任务,然后设置为取消即可 auto it = _timers.find(id); if (it == _timers.end()) return; PtrTask pt = it->second.lock(); if (pt) pt->Cancel(); } // 秒针向后走一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; // 直接把指向的数组释放,其中的元素也就都释放掉了 _wheel[_tick].clear(); } private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if (it != _timers.end()) _timers.erase(it); } private: int _capacity; int _tick; // 秒针,指向哪里释放哪里 vector> _wheel; // 根据任务对象的ID信息找到任务 unordered_map _timers; }; class Test { public: Test() { cout << "任务激活" << endl; } ~Test() { cout << "任务销毁" << endl; } }; void DelTest(Test *t) { delete t; } int main() { // 创建轮子和任务 TimeWheel tw; Test *t = new Test(); // 把任务放到轮子里面 tw.TimerAdd(100, 5, bind(DelTest, t)); // 刷新任务,指针后移 for (int i = 0; i < 5; i++) { tw.TimerRefresh(100); tw.RunTimerTask(); cout << "任务已刷新" << endl; sleep(1); } while (true) { cout << "----------" << endl; tw.RunTimerTask(); sleep(1); } return 0; }
对于Connection来说,它的工作任务之一是要对于连接进行管理,那么就意味着这个模块是会涉及到对于应用层协议的处理的,因此在Connection中要设置协议处理的上下文来控制处理节奏
应用层的协议是有很多的,平时使用最多的是http协议,不过也会有例如ftp协议这样的存在,而为了使得本项目可以支持的协议足够多,那么就意味着不能固定写死某个特殊的协议,而是可以存储任意协议的上下文信息,因此就需要设计一个通用类型来保存各种不同的数据
我们想要做成的效果是,这个Any类,可以接受各种类型的数据,例如有这样的用法
Any a; a = 10; a = "abc"; a = 12.34; ...
那该如何设计这个通用类型Any呢?这里参考了一种嵌套类型,在一个类中嵌套存在一个新的类,在这个类中存在模板,而进而对于类进行处理
class Any { private: class holder { // ... }; template class placeholder : public holder { T _val; }; holder *_content; };
在这个Any类中,保存的是holder类的指针,当Any类容器需要保存一个数据的时候,只需要通过placeholder子类实例化一个特定类型的子类对象出来,让这个子类对象保存数据即可,具体原理为:
这就是C++中的多态在实际运用中的实例
#include #include using namespace std; class Any { public: Any() : _content(nullptr) { } template Any(const T &val) : _content(new placeholder(val)) { } Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) { } ~Any() { delete _content; } Any &AnySwap(Any &other) { swap(_content, other._content); return *this; } // 把子类对象保存的数据取回来 template T *get() { // 想要获取的数据类型,必须和保存的数据类型一致 assert(typeid(T) == _content->type()); // 父类类型的指针指向子类,调用子类内部的接口 return &((placeholder *)_content)->_val; } // 赋值运算符的重载函数 template Any &operator=(const T &val) { // 为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放 Any(val).swap(*this); return *this; } Any &operator=(const Any &other) { Any(other).AnySwap(*this); return *this; } private: class holder { public: virtual ~holder() { } virtual const type_info &type() = 0; virtual holder *clone() = 0; }; template class placeholder : public holder { public: placeholder(const T &val) : _val(val) { } // 把数据类型取出来 virtual const type_info &type() { return typeid(T); } // 克隆出新的对象子类 virtual holder *clone() { return new placeholder(_val); } public: T _val; }; holder *_content; };
至此,前置知识已经都准备完毕了,下一步就开始进行各个模块的实现
在对于Buffer模块的设计中,主要是要考虑到底层用什么设计,如何设计的问题,而在我的这个项目中主要是使用一个vector来表示缓冲区
那为什么不使用string?string更偏向于是字符串的操作,而对于缓冲区来说把他当成一个数组更合适一些
对于Buffer模块的设计来说,主要要考虑的点是要选取的默认的空间大小,读取位置和写入位置,因此在整体的设计中,底层必然要有一个缓冲区数组,以及两个指针,一个指针表示的是当前的读取位置,一个指针表示的是当前的写入位置,对于缓冲区的操作来说,从大的方向上来看肯定是要有接受数据和读取数据的能力,那么下面就对于这两个小模块进行分析
接收数据
对于从外界来的数据,使用一个缓冲区来进行接收,对于缓冲区的问题也要考虑到当前写入的数据写入到哪里了,从哪里开始进行写入,如果缓冲区的空间不足了会怎么办?这些都是需要解决的问题:由于存在读取的问题,那么就意味着在写入的时候,可能前面是存在有已经被读取的数据的,那么已经读取的数据就不需要进行保存了,那么此时当缓冲空间不足的时候,就可以移动指针到起始位置,或者是进行扩容
读取数据
对于数据的读取来说,当前数据的读取位置指向哪里,就从哪里开始读取,前提是要有数据可以读,可读数据的大小计算,就是用当前写入位置减去当前的读取位置
对于缓冲区的类来说,主体的设计就是上面所示的设计,落实到每一个模块来说,主要有如下的几个功能:
所以,下面就基于上述的这些功能,进行一一实现即可:
#pragma once #include #include #include "Log.hpp" #include using namespace std; enum { ReadError = 2, WriteError }; const int buffer_default_size = 1024; class Buffer { public: // 判断读取和写入数据是否可能存在异常情况 void CheckRead(uint64_t len) { if (len <= ReadableSize()) { lg(Fatal, "len is too long, ReadableSize is %ld", ReadableSize()); exit(ReadError); } } void CheckWrite(uint64_t len) { if (len <= ReadableSize()) { lg(Fatal, "len is too long, ReadableSize is %ld", ReadableSize()); exit(WriteError); } } Buffer() : _buffer(buffer_default_size), _read(0), _write(0) {} char *Begin() { return &*_buffer.begin(); } // 获取当前写入地址 char *WritePos() { return Begin() + _read; } // 获取当前读取地址 char *ReadPos() { return Begin() + _write; } // 获取缓冲区末尾空闲大小 uint64_t TailSize() { return _buffer.size() - _write; } // 获取缓冲区开头空闲大小 uint64_t HeadSize() { return _read; } // 获取当前可读数据大小 uint64_t ReadableSize() { return _write - _read; } // 读偏移向后移动 void MoveReadOffset(uint64_t len) { CheckRead(len); _read += len; } // 写偏移向后移动 void MoveWriteOffset(uint64_t len) { CheckWrite(len); _write += len; } // 确保可写空间足够,移动数据/扩容 void CheckWriteSpace(uint64_t len) { if (TailSize() >= len) return; // 如果不够,就试试加上前面的部分 if (TailSize() + HeadSize() >= len) { // 把数据挪到前面 uint64_t cursize = ReadableSize(); copy(ReadPos(), ReadPos() + cursize, Begin()); // 偏移量更新 _read = 0; _write = cursize; } // 如果还是不够,只能扩容了 else { lg(Info, "update resize: %ld", _write + len); _buffer.resize(_write + len); } } // 读取数据 void Read(void *buf, uint64_t len) { // 判断一下len是否合法 CheckRead(len); // 把数据读到buf中 copy(ReadPos(), ReadPos() + len, (char *)buf); } void ReadAndPop(void *buf, uint64_t len) { Read(buf, len); MoveReadOffset(len); } // 获取指定大小的数据 string ReadString(uint64_t len) { CheckRead(len); string str; str.resize(len); Read(&str[0], len); return str; } string ReadStringAndPop(uint64_t len) { string str = ReadString(len); MoveReadOffset(len); return str; } // 写入数据 void Write(const void *data, uint64_t len) { // 确保空间 + 拷贝 CheckWriteSpace(len); const char *d = (const char *)(data); copy(d, d + len, WritePos()); } void WriteAndPush(const void *data, uint64_t len) { Write(data, len); MoveWriteOffset(len); } void WriteString(const std::string &data) { return Write(data.c_str(), data.size()); } void WriteStringAndPush(const std::string &data) { WriteString(data); MoveWriteOffset(data.size()); } void WriteBuffer(Buffer &data) { return Write(data.ReadPos(), data.ReadableSize()); } void WriteBufferAndPush(Buffer &data) { WriteBuffer(data); MoveWriteOffset(data.ReadableSize()); } // 获取第一次换行的位置 char *FindCRLF() { char *res = (char*)memchr(ReadPos(), '\n', ReadableSize()); return res; } // 把第一行数据读取出来 string GetLine() { char* pos = FindCRLF(); if(pos == nullptr) return ""; return ReadString(pos - ReadPos() + 1); } string GetLineAndPop() { string str = GetLine(); MoveReadOffset(str.size()); return str; } void Clear() { _write = 0; _read = 0; } private: // 缓冲区底层是一个数组和读写的偏移量数据 vector _buffer; uint64_t _read; uint64_t _write; };
这个模块就是对于Socket进行一些封装,在前面的内容中已经封装过很多次了,这里只需要注意一个是端口重用,一个是改为非阻塞状态,这些内容在epoll等模型中都有介绍过,这里考虑到篇幅就不写了
具体实现如下:
#pragma once #include #include #include #include #include "Log.hpp" using namespace std; const int max_listen_size = 1024; class Socket { private: int _sockfd; public: Socket() : _sockfd(-1) {} Socket(int fd) : _sockfd(fd) {} ~Socket() { Close(); } int Fd() { return _sockfd; } // 创建套接字 bool Create() { _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (_sockfd < 0) { lg(Fatal, "create socket fail"); return false; } return true; } // 绑定地址信息 bool Bind(const string &ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); int ret = bind(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { lg(Fatal, "bind fail!"); return false; } return true; } // 开始监听 bool Listen(int backlog = max_listen_size) { int ret = listen(_sockfd, backlog); if (ret < 0) { lg(Fatal, "listen fail!"); return false; } return true; } // 向服务器发起连接 bool Connect(const string &ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); int ret = connect(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { lg(Fatal, "connection fail"); return false; } return true; } // 获取新连接 int Accept() { int newfd = accept(_sockfd, nullptr, nullptr); if (newfd < 0) { lg(Fatal, "accept fail"); return -1; } return newfd; } // 接收数据 ssize_t Recv(void *buf, size_t len, int flag = 0) { ssize_t ret = recv(_sockfd, buf, len, flag); if (ret <= 0) { // EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误 // EINTR 表示当前socket的阻塞等待,被信号打断了, if (errno == EAGAIN || errno == EINTR) { return 0; // 表示这次接收没有接收到数据 } lg(Warning, "recv fail!"); return -1; } return ret; // 实际接收的数据长度 } ssize_t NonBlockRecv(void *buf, size_t len) { return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。 } // 发送数据 ssize_t Send(const void *buf, size_t len, int flag = 0) { ssize_t ret = send(_sockfd, buf, len, flag); if (ret < 0) { if (errno == EAGAIN || errno == EINTR) { return 0; } lg(Warning, "send fail!"); return -1; } return ret; // 实际发送的数据长度 } ssize_t NonBlockSend(void *buf, size_t len) { if (len == 0) return 0; return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞。 } // 关闭套接字 void Close() { if (_sockfd != -1) { close(_sockfd); _sockfd = -1; } } // 创建一个服务端连接 bool CreateServer(uint16_t port, const string &ip = "0.0.0.0", bool block_flag = false) { // 1. 创建套接字,2. 绑定地址,3. 开始监听,4. 设置非阻塞, 5. 启动地址重用 if (Create() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; ReuseAddress(); return true; } // 创建一个客户端连接 bool CreateClient(uint16_t port, const string &ip) { // 1. 创建套接字,2.指向连接服务器 if (Create() == false) return false; if (Connect(ip, port) == false) return false; return true; } // 设置套接字选项---开启地址端口重用 void ReuseAddress() { int val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int)); val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int)); } // 设置套接字阻塞属性-- 设置为非阻塞 void NonBlock() { int flag = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); } };
对于Channel模块来说,这个模块主要的作用是要对于每一个描述符的事件进行设置相应的事件回调,所以我们要完成的内容就包含有可读、可写、挂断、错误、任意这五种事件,之后用Channel模块统一进行管理,未来对于任意一个描述符来说,当需要使用到某种事件的时候,只需要调用Channel模块内部的回调函数,就可以执行对应的方法,以达到目的
#pragma once #include #include #include "EventLoop.hpp" using namespace std; class Channel { using EventCallBack = function; public: Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {} int Fd() { return _fd; } // 获取当前监控的事件 uint32_t Events() { return _events; } void SetRevents(uint32_t events) { _revents = events; } void SetReadCallback(const EventCallBack &cb) { _read_callback = cb; } void SetWriteCallback(const EventCallBack &cb) { _write_callback = cb; } void SetErrorCallback(const EventCallBack &cb) { _error_callback = cb; } void SetCloseCallback(const EventCallBack &cb) { _close_callback = cb; } void SetEventCallback(const EventCallBack &cb) { _event_callback = cb; } // 当前是否监控了可读 bool ReadAble() { return (_events & EPOLLIN); } // 当前是否监控了可写 bool WriteAble() { return (_events & EPOLLOUT); } // 启动读事件监控 void EnableRead() { _events |= EPOLLIN; Update(); } // 启动写事件监控 void EnableWrite() { _events |= EPOLLOUT; Update(); } // 关闭读事件监控 void DisableRead() { _events &= ~EPOLLIN; Update(); } // 关闭写事件监控 void DisableWrite() { _events &= ~EPOLLOUT; Update(); } // 关闭所有事件监控 void DisableAll() { _events = 0; Update(); } // 移除监控 void Remove(); void Update(); // 根据触发的事件调用对应的函数 void Handler() { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_read_callback) _read_callback(); } if (_revents & EPOLLOUT) { if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { if (_error_callback) _error_callback(); } else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } if (_event_callback) _event_callback(); } private: EventLoop *_loop; int _fd; // 当前监控的事件和当前触发的事件 uint32_t _events; uint32_t _revents; EventCallBack _read_callback; // 可读事件被触发的回调函数 EventCallBack _write_callback; // 可写事件被触发的回调函数 EventCallBack _error_callback; // 错误事件被触发的回调函数 EventCallBack _close_callback; // 连接断开事件被触发的回调函数 EventCallBack _event_callback; // 任意事件被触发的回调函数 };
对于这个模块来说,更多要结合上面的逻辑图来进行理解,这里由于无法单独进行说明,所以与后面的模块一起进行说明
对于这个模块来说,它其实和Socket模块很像,就是对于epoll的一些接口的封装,核心的接口就是添加和修改以及移除对于某个描述符的事件的监控,具体的落实到封装中,还要有对于事件的监控操作,主体上来说这个模块难度不大,主要是对于epoll的一些接口进行封装
#pragma once #include #include #include #include "Channel.hpp" #include "Log.hpp" using namespace std; const int max_epoller_size = 1024; class Poller { public: Poller() { _epfd = epoll_create(max_epoller_size); if (_epfd < 0) lg(Fatal, "epoll create fail"); } // 添加修改监控的事件 void UpdateEvent(Channel *channel) { // 不存在就添加 if (!HasChannel(channel)) { _channels.insert(make_pair(channel->Fd(), channel)); Update(channel, EPOLL_CTL_ADD); return; } // 存在就修改 Update(channel, EPOLL_CTL_MOD); } // 移除对于事件的监控 void RemoveEvent(Channel *channel) { // 不仅要溢出哈希表,也要在epoll当中溢出,才算真正溢出 auto it = _channels.find(channel->Fd()); if (it != _channels.end()) _channels.erase(it); Update(channel, EPOLL_CTL_DEL); } // 开始监控,返回链接 void Poll(vector *active) { int nfds = epoll_wait(_epfd, _evs, max_epoller_size, -1); if (nfds < 0) { if (errno == EINTR) return; lg(Fatal, "epoll wait fail! errno: %d, strerrno: %s", errno, strerror(errno)); abort(); } // 遍历数组,把已经活跃的链接返回到上层 for (int i = 0; i < nfds; i++) { auto it = _channels.find(_evs[i].data.fd); if (it == _channels.end()) lg(Fatal, "_channels error in Poller.hpp"); it->second->SetRevents(_evs[i].events); active->push_back(it->second); } } private: // 对于epoll进行操作 void Update(Channel *channel, int op) { int fd = channel->Fd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->Events(); int ret = epoll_ctl(_epfd, op, fd, &ev); if (ret < 0) lg(Fatal, "epoll_ctl fail!"); } // 判断Channel是否添加了事件监控 bool HasChannel(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it == _channels.end()) return false; return true; } private: int _epfd; struct epoll_event _evs[max_epoller_size]; unordered_map _channels; };
EventLoop模块应该说是本项目难度比较大的一个模块,会对前面的所有信息进行一个整合,所以也会搭配有比较多的文字说明
在对于EventLoop模块的学习前,要先看一下eventfd这个函数,它的核心功能就是一种事件的通知机制,简单来说,当调用这个函数的时候,就会在内核当中管理一个计数器,每当向eventfd当中写入一个数值,表示的就是事件通知的次数,之后可以使用read来对于数据进行读取,读取到的数据就是通知的次数
假设每次给eventfd写一个1,那么就表示通知了一次,通知三次之后再进行读取,此时读取出来的就是3,读取了之后这个计数器就会变成0
eventfd的应用场景在本项目中,是用于EventLoop模块中实现线程之间的事件通知功能的
具体的该如何进行设计呢?
首先要明确,EventLoop模块的核心功能,是对于事件进行监控,并且进行数据的处理,那么这就意味着一个线程就要有一个专门的EventLoop模块来进行管理,对于监控事件来说,当监控的这个连接一旦就绪,那么就要进行事件的处理,对于不同的描述符来说,会有不同的EventLoop对象进行管理,如果在多个线程中都触发了事件,那么在对于事件的处理过程中就可能会出现线程安全的问题,因此就要把对于一个连接的事件监控,以及事件监控的处理,放到一个线程中去执行,具体该如何进行操作?
这里给出一种设计的模式,在EventLoop当中添加一个任务队列,对于连接的所有操作中,都进行一次封装,对于要执行的任务先不去执行,而是放到任务队列当中,等到监控的事件结束之后,再把任务队列当中的任务拿出来进行执行
所以具体的流程为:
落实到具体来说,在对于事件监控的过程当中,肯定会涉及到Poller模块,当有事件就绪就进行事件的处理,而对于执行任务队列中的任务 过程中,需要想办法设计出一个线程安全的任务队列
那在上面的这些流程当中,为什么要存在一个eventfd?这个知识存在的意义是什么?假设现在这样的场景,当执行流卡在第一步,对于描述符事件的监控过程中,可能造成执行流阻塞,因为没有任何一个描述符产生事件,那么此时阻塞就会导致第三步的执行任务队列当中的任务得不到执行,所以就要包含有一个eventfd,这个是用来进行事件的通知,从而唤醒事件监控的阻塞,这样就能从第一步当中脱离开,进而去执行后续的第二步和第三步
而在前面的部分中提及到,当事件就绪之后,就要进行处理,而在进行数据处理的时候本质上是对于连接做处理,而对于连接做处理的时候必须要保证所执行的任务是在本线程当中执行的,这样才能保证是线程安全,否则如果在一个其他的线程当中执行任务,必然会带来线程不安全的问题,所以才引入了任务池的概念,当执行的任务就是本线程当中的任务,就直接进行执行,否则就要放到任务池当中,当事件处理结束之后再进行任务执行
EventLoop调用逻辑应该是本项目中非常复杂的一个部分,作为梳理,我画出了下面的逻辑图,并配有文字说明,目的是更加清楚的理解Loop的含义
首先要清楚这个Loop表示的是什么含义:
循环结构
Loop 在这里是一个编程术语,指代程序中的循环语句,如 while、for 等。在One Thread One Loop模型中,它具体表现为一个持续运行的无限循环,其代码形式可能类似于以下伪代码:
while (true) { // ... 循环体内的操作 ... }
理解Channel和Poller和Loop
这个循环不会因为任何常规条件而主动终止,除非遇到异常情况或显式地从外部触发退出机制,所以正常来说在整个项目中,只会存在一个Loop,来进行所有信息的循环,所以就先研究一下Loop内部包含什么内容,已在逻辑图中展现清楚:
抛开一些不重要的模块来说,按顺序来看第一个是Channel模块,它提供了一个_event_channel,那该如何理解这个模块?本质上来说在它内部就是一个文件描述符和一堆函数的回调,读写关闭等等,所以它就是用来对于任何一个描述符关联这个描述符对应的调用方法。紧跟着下一个模块是Poller,这个模块就是对于epoll的封装,底层包含有epfd和Channels,用来对于各种事件的监听,如果监听到了某种事件,就把这个事件进行处理即可,再下面是任务池和定时器模块,这两个模块主要是辅助来使用,并不是这个模块的重点,考虑到篇幅先不谈这两个模块,重点先看Channel和Poller
两种Channel
在整个项目当中,被创建出的第一个Channel,被叫做是listenChannel,这个Channel是独一无二的,如果把本项目中所有的Channel分成两类,它一定是唯一的那一类中的唯一一个,如何理解?因为它的核心目的就是用来对于新连接进行管理,换句话说就是对于新的客户端连接进行管理,所以在它内部的这么多回调函数来说,不用关心那么多,只关心一个可读的回调即可,其他设置为空即可
那么在这个独一无二的listenChannel当中,它被设置的可读回调就是来处理一个新连接的到来,也就是图中所示的Acceptor模块,当有新连接来的时候就给这个新连接创建新的Channel,这个新的Channel属于另外一类Channel,我们下一个部分来谈,而这个listenChannel如何被提醒有新的事件到来?原因就是因为有EnableRead的存在,这个存在可以把事件挂接到Poller模块上,当有新事件到来的时候,这个Poller就会提醒上层可以进行处理了
第二种Channel就是新连接的Channel,这种Channel的主要工作是面向客户端的,而在它们的内部会被设置四种回调,就是图中所示的四种,而它们被监听也是通过EnableRead来挂接到Poller上的
下面是对于Connection模块的理解:
Connection模块的目的就是对于连接进行全方位的管理,对于通信连接的所有操作都是借助这个模块来进行完成的
管理的内容主要包括有,对于套接字的管理,可以对于套接字进行各种操作,也有对于连接事件的管理,比如有可读、可写、挂断、任意,也有对于缓冲区的管理,方便与Socket进行数据的接收和发送,还有对于协议上下文的管理,用来记录请求数据的发送过程,最后还有用户设置各种回调的功能
对于用户可以自己设置的部分,主要包括有,当连接接收到数据之后的处理,是可以自己决定的,对于连接建立成功后如何处理,连接关闭前如何处理,任意事件的产生如何处理,都是由用户进行决定
Connection模块的功能主要有发送数据、关闭连接、启动和取消非活跃连接超时销毁、协议切换的功能,而Connection模块是对于连接的管理模块,对于连接的所有操作都是通过这个模块来完成的,但是其中一个问题是,如果对于连接进行操作的时候,连接已经被释放了,那么就会存在内存访问错误的风险,这样会导致程序崩溃,所以一种解决方案是使用智能指针来对于Connection对象进行管理,这样可以保证任意一个地方对于Connection对象进行操作的时候都会在内部保存一个shared_ptr,这样就可以保证这个内容不会在调用的时候被释放了
那么下面就进行Connection模块的编写
#pragma once #include #include #include "Socket.hpp" #include "Buffer.hpp" #include "Any.hpp" class Connection; typedef enum { connecting, connected, disconnecting, disconnected } ConnStatu; class Connection; using PtrConnection = shared_ptr; class Connection : public enable_shared_from_this { using ConnectedCallback = function; using MessageCallback = function; using ClosedCallback = function; using AnyEventCallback = function; public: Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(conn_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(loop), _statu(connecting), _socket(_sockfd), _channel(loop, _sockfd) { _channel.SetCloseCallback(bind(&Connection::HandleClose, this)); _channel.SetEventCallback(bind(&Connection::HandleEvent, this)); _channel.SetReadCallback(bind(&Connection::HandleRead, this)); _channel.SetWriteCallback(bind(&Connection::HandleWrite, this)); _channel.SetErrorCallback(bind(&Connection::HandleError, this)); } ~Connection() { lg(Info, "release connection :%p", this); } // 获取管理的文件描述符 int Fd() { return _sockfd; } // 获取连接ID int Id() { return _conn_id; } // 是否处于CONNECTED状态 bool Connected() { return (_statu == connected); } // 设置上下文--连接建立完成时进行调用 void SetContext(const Any &context) { _context = context; } // 获取上下文,返回的是指针 Any *GetContext() { return &_context; } void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; } // 连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callback void Established() { _loop->RunInLoop(bind(&Connection::EstablishedInLoop, this)); } // 发送数据,将数据放到发送缓冲区,启动写事件监控 void Send(const char *data, size_t len) { // 外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行 Buffer buf; buf.WriteAndPush(data, len); _loop->RunInLoop(bind(&Connection::SendInLoop, this, move(buf))); } // 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理 void Shutdown() { _loop->RunInLoop(bind(&Connection::ShutdownInLoop, this)); } void Release() { _loop->QueueInLoop(bind(&Connection::ReleaseInLoop, this)); } // 启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务 void EnableInactiveRelease(int sec) { _loop->RunInLoop(bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); } // 取消非活跃销毁 void CancelInactiveRelease() { _loop->RunInLoop(bind(&Connection::CancelInactiveReleaseInLoop, this)); } // 切换协议---重置上下文以及阶段性回调处理函数 -- 而是这个接口必须在EventLoop线程中立即执行 // 防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。 void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _loop->AssertInLoop(); _loop->RunInLoop(bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event)); } public: // 使用者自己设置的四个回调函数 ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; // 组件内连接信息关闭,当有连接要关闭的时候,就移除自己的信息 ClosedCallback _server_closed_callback; private: // Channel模块的事件回调函数设置 // 可读 void HandleRead() { // 1. 接收socket的数据,放到缓冲区 char buf[65536]; ssize_t ret = _socket.NonBlockRecv(buf, 65535); if (ret < 0) { // 出错了,不能直接关闭连接 return ShutdownInLoop(); } // 这里的等于0表示的是没有读取到数据,而并不是连接断开了,连接断开返回的是-1 // 将数据放入输入缓冲区,写入之后顺便将写偏移向后移动 _in_buffer.WriteAndPush(buf, ret); // 2. 调用message_callback进行业务处理 if (_in_buffer.ReadableSize() > 0) { // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象 return _message_callback(shared_from_this(), &_in_buffer); } } // 可写 void HandleWrite() { //_out_buffer中保存的数据就是要发送的数据 ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPos(), _out_buffer.ReadableSize()); if (ret < 0) { // 发送错误就该关闭连接了, if (_in_buffer.ReadableSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); // 这时候就是实际的关闭释放操作了。 } _out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动 if (_out_buffer.ReadableSize() == 0) { _channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控 // 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放 if (_statu == disconnecting) { return Release(); } } return; } // 挂断 void HandleClose() { /*一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接*/ if (_in_buffer.ReadableSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); } // 出错 void HandleError() { return HandleClose(); } // 任意 void HandleEvent() { if (_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); } if (_event_callback) { _event_callback(shared_from_this()); } } // 连接获取之后,所处的状态下要进行各种设置(启动读监控,调用回调函数) void EstablishedInLoop() { // 1. 修改连接状态; 2. 启动读事件监控; 3. 调用回调函数 if (_statu != connecting) // 当前的状态必须一定是上层的半连接状态 { lg(Fatal, "status error in Connection.hpp"); } _statu = connected; // 当前函数执行完毕,则连接进入已完成连接状态 // 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁 _channel.EnableRead(); if (_connected_callback) _connected_callback(shared_from_this()); } // 这个接口才是实际的释放接口 void ReleaseInLoop() { // 1. 修改连接状态,将其置为disconnected _statu = disconnected; // 2. 移除连接的事件监控 _channel.Remove(); // 3. 关闭描述符 _socket.Close(); // 4. 如果当前定时器队列中还有定时销毁任务,则取消任务 if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop(); // 5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数 if (_closed_callback) _closed_callback(shared_from_this()); // 移除服务器内部管理的连接信息 if (_server_closed_callback) _server_closed_callback(shared_from_this()); } // 这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控 void SendInLoop(Buffer &buf) { if (_statu == disconnected) return; _out_buffer.WriteBufferAndPush(buf); if (_channel.WriteAble() == false) { _channel.EnableWrite(); } } // 这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送 void ShutdownInLoop() { _statu = disconnecting; // 设置连接为半关闭状态 if (_in_buffer.ReadableSize() > 0) { if (_message_callback) _message_callback(shared_from_this(), &_in_buffer); } // 要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭 if (_out_buffer.ReadableSize() > 0) { if (_channel.WriteAble() == false) { _channel.EnableWrite(); } } if (_out_buffer.ReadableSize() == 0) { Release(); } } // 启动非活跃连接超时释放规则 void EnableInactiveReleaseInLoop(int sec) { // 1. 将判断标志 _enable_inactive_release 置为true _enable_inactive_release = true; // 2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可 if (_loop->HasTimer(_conn_id)) { return _loop->TimerRefresh(_conn_id); } // 3. 如果不存在定时销毁任务,则新增 _loop->TimerAdd(_conn_id, sec, bind(&Connection::Release, this)); } void CancelInactiveReleaseInLoop() { _enable_inactive_release = false; if (_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); } } void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _context = context; _connected_callback = conn; _message_callback = msg; _closed_callback = closed; _event_callback = event; } private: uint64_t _conn_id; int _sockfd; bool _enable_inactive_release; EventLoop *_loop; ConnStatu _statu; Socket _socket; Channel _channel; Buffer _in_buffer; Buffer _out_buffer; Any _context; };
下面进入的是Acceptor模块,这个模块的意义主要是对于监听套接字进行管理,主要的功能是:
class Acceptor { using AcceptCallback = function; public: // 不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动 // 否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏 Acceptor(EventLoop *loop, int port) : _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) { _channel.SetReadCallback(bind(&Acceptor::HandleRead, this)); } void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; } void Listen() { _channel.EnableRead(); } private: // 监听套接字的读事件回调处理函数---获取新连接,调用_accept_callback函数进行新连接处理 void HandleRead() { int newfd = _socket.Accept(); if (newfd < 0) { return; } if (_accept_callback) _accept_callback(newfd); } int CreateServer(int port) { bool ret = _socket.CreateServer(port); assert(ret == true); return _socket.Fd(); } private: Socket _socket; // 用于创建监听套接字 EventLoop *_loop; // 用于对监听套接字进行事件监控 Channel _channel; // 用于对监听套接字进行事件管理 AcceptCallback _accept_callback; };
该模块的功能主要是来把EventLoop模块和线程结合在一起,要形成的最终效果是,让EventLoop和线程是一一对应的。在EventLoop模块实例化的对象,在构造的时候就会初始化线程的id,而当后妈需要运行一个操作的时候,就判断是否运行在EventLoop模块对应的线程中,如果是就代表是一个线程,不是就代表当前运行的线程不是EventLoop线程
如果创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id重新给EventLoop进行设置,就会存在问题,在构造EventLoop对象到设置新的线程id这个期间是不可控的
所以就要构造一个新的模块,LoopThread,这个模块的意义就是把EventLoop和线程放到一块,主要的思路就是创建线程,在线程中实例化一个EventLoop对象,这样可以向外部返回一个实例化的EventLoop
class LoopThread { public: // 创建线程,设定线程入口函数 LoopThread() : _loop(nullptr), _thread(thread(&LoopThread::ThreadEntry, this)) {} // 返回当前线程关联的EventLoop对象指针 EventLoop *GetLoop() { EventLoop *loop = nullptr; { unique_lock lock(_mutex); // 加锁 _cond.wait(lock, [&]() { return _loop != nullptr; }); // loop为NULL就一直阻塞 loop = _loop; } return loop; } private: // 实例化 EventLoop 对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能 void ThreadEntry() { EventLoop loop; { unique_lock lock(_mutex); // 加锁 _loop = &loop; _cond.notify_all(); } loop.Start(); } private: // 用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop mutex _mutex; // 互斥锁 condition_variable _cond; // 条件变量 EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程内实例化 thread _thread; // EventLoop对应的线程 };
这个模块整体来说还是比较简单的,主要就是用线程来管理Loop
那么有了这么多线程,必然要对于这些线程做管理,所以这个模块就是一个线程池模块,来对于新创建的这些EventLoopThread来进行管理
class LoopThreadPool { public: LoopThreadPool(EventLoop *baseloop) : _thread_count(0), _next_idx(0), _baseloop(baseloop) {} void SetThreadCount(int count) { _thread_count = count; } void Create() { if (_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for (int i = 0; i < _thread_count; i++) { _threads[i] = new LoopThread(); _loops[i] = _threads[i]->GetLoop(); } } return; } EventLoop *NextLoop() { if (_thread_count == 0) { return _baseloop; } _next_idx = (_next_idx + 1) % _thread_count; return _loops[_next_idx]; } private: int _thread_count; int _next_idx; EventLoop *_baseloop; vector _threads; vector _loops; };
这里默认设置的是0个线程,也可以设置多个线程,那这有什么区别呢?
当前项目做的是一个主从Reactor服务器,那在这个服务器当中主Reactor表示的是新连接的获取,而从属线程负责的是对于新连接的事件监控以及处理,所以对于线程的管理,本质上来说就是管理0个或者多个LoopThread对象,当主线程获取到了一个新连接之后,需要把这个新连接挂到从属线程上来进行事件的监控和处理,如果现在只有0个从属线程,那么表示的就是新连接会被挂接到主线程的EventLoop上进行处理,如果要是有多个从属线程,那么就会对于线程进行分配,将对应线程的EventLoop获取到,设置给对应的Connection
如果对于线程池当中有内容,那么就意味着是有从属Reactor的,对于从属Reactor来说可以用来进行事件的处理,主Reactor只需要负责进行新连接的获取即可
至此,对于Server模块基本结束,下面用一个echo服务器来梳理一下整个Server模块的逻辑架构
#pragma once #include "Buffer.hpp" #include "Server.hpp" #include "TcpServer.hpp" #include "Log.hpp" class EchoServer { public: // 对于Server进行初始化 EchoServer(int port) : _server(port) { // 设置一下线程池,设置从属Reactor _server.SetThreadCount(2); // 设置非活跃连接销毁10s _server.EnableInactiveRelease(10); // 设置回调 _server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1)); _server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void Start() { _server.Start(); } private: void OnConnected(const PtrConnection &conn) { lg(Info, "NEW CONNECTION:%p", conn.get()); } void OnClosed(const PtrConnection &conn) { lg(Info, "CLOSE CONNECTION:%p", conn.get()); } void OnMessage(const PtrConnection &conn, Buffer *buf) { // 把消息发出去,更新一下偏移量 conn->Send(buf->ReadPos(), buf->ReadableSize()); buf->MoveReadOffset(buf->ReadableSize()); // 关闭连接 // conn->Shutdown(); } private: // 底层就是一个TcpServer TcpServer _server; };
那么下面对于这个逻辑图进行分析:
首先对于这个EchoServer来说,它底层就是一个TCPServer,而在这个TcpServer的内部,包含有EventLoop,用来处理新连接和各种事件,还有Acceptor用来对于获取新连接的处理,还有线程池来对于从属Reactor进行管理的工作
对于这个模块,就不在本篇文章中体现了,详情可以移步下方链接进行查看:
Http服务器自主实现