这一个课程的笔记
相关文章
协议
Socket编程
高并发服务器实现
线程池
使用多进程并发服务器时要考虑以下几点:
在使用线程模型开发服务器时需考虑以下问题:
多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。
主要使用的方法有三种
可以使用只一个命令监听一个新的连接(需要把监管使用的lfd给select函数处理), 之后进程可以使用accept获取cfd, select可以使用获取的cfd监听连接的客户端是否发过来数据
#include /* According to earlier standards */ #include #include #include int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态
这一个是他的循环的上限
readfds: 监控有读数据到达文件描述符集合,传入传出参数
writefds: 监控写数据到达文件描述符集合,传入传出参数
exceptfds:监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数
文件描述符指针结合, 传入传出参数, 把需要监听的文件描述符放到这几个数组里面
这几个的实现实际是一个和文件描述符表对应的位图
传入的时候是要交监听的, 传出来的是实际的有事件发生的, 不关心的事件可以发送一个NULL
timeout: 定时阻塞监控时间,3种情况
1.NULL,永远等下去
2.设置timeval,等待固定时间
3.设置timeval里时间均为0,检查描述字后立即返回,轮询struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ };
返回值: 三个集合里面的事件发生的总个数
void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd位清0
int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0
if(ret > 0){ //检测一下哪一个文件描述符被设置了 if(FD_ISSET(lfd, &rset)){ cfd = accept(); FD_SET(cfd, &rset); } //遍历其余的 }
#include #include #include #include #include #include #include #include int main(void){ int i, j, nready, ret; int maxfd = 0; int listenfd, connfd; char buf[BUFSIZ]; struct sockaddr_in clie_addr, serv_addr; socklen_t clie_addr_len; listenfd = socket(AF_INET, SOCK_STREAM, 0); if(listenfd == -1){ perror("socket error"); exit(1); } int opt = 1; //设置端口复用 ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); if(ret == -1){ perror("setsockopt error"); exit(1); } //设置监听端口 serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(6666); serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); if(ret == -1){ perror("bind error"); exit(1); } ret = listen(listenfd, 128); if(ret == -1){ perror("listen error"); exit(1); } //两个文件描述符集合, 用于监听多个文件描述符 fd_set rset, allset; maxfd = listenfd; FD_ZERO(&allset); FD_SET(listenfd, &allset); while(1){ rset = allset; //监听多个文件描述符 nready = select(maxfd+1, &rset, NULL, NULL, NULL); if(nready < 0){ perror("select error"); exit(1); } //检测一下有没有连接事件 if(FD_ISSET(listenfd, &rset)){ clie_addr_len = sizeof(clie_addr); connfd = accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len); if(connfd == -1){ perror("accept error"); exit(1); } FD_SET(connfd, &allset); //记录一下现在的最大值 if(connfd > maxfd){ maxfd = connfd; } if(--nready == 0){ continue;//只有一个连接, 不需要进一步处理 } } //遍历其余的读取事件 for(i = listenfd + 1; i <= maxfd; i++){ if(FD_ISSET(i, &rset)){ //获取数据, 这一个实例里面是每一次写入以后会读取服务器的返回值 ret = read(i, buf, sizeof(buf)); if(ret == 0){ printf("client close\n"); close(i); FD_CLR(i, &allset); int nowmax; //处理一下最大值 for(j = listenfd + 1; j <= maxfd; j++){ if(FD_ISSET(j, &allset)) nowmax = j; } maxfd = nowmax; }else if(ret == -1){ perror("read error"); exit(1); }else{ //处理一次写入 for(j = 0; j < ret; j++){ buf[j] = toupper(buf[j]); } write(i, buf, ret); } if(--nready == 0){ break; } } } } }
可以跨平台, windows, linux, macOS, Unix, mips都支持这一个
这一个函数的是Linux才有的, 使用的时候不如epoll, 了解即可
#include int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd { int fd; /* 文件描述符 */ short events; /* 监控的事件 读写异常*/ short revents; /* 监控事件中满足条件返回的事件(返回值) */ };
监听的文件描述符的数组, 实际是把select函数的参数分开了
- 监听事件(主要使用的是是POLLIN(读), POLLOUT(写), POLLERR(错误))
POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBAND
POLLRDNORM 数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级可读数据
POLLOUT 普通或带外数据可写
POLLWRNORM 数据可写
POLLWRBAND 优先级带数据可写
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述字不是一个打开的文件
如果传入的是0的话, 只会监听POLLERR, POLLHUP, POLLNVAL
nfds 监控数组中有多少文件描述符需要被监控
timeout 毫秒级等待
-1:阻塞等,#define INFTIM -1 Linux中没有定义此宏
0:立即返回,不阻塞进程
>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值
如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,下次返回时,把revents设置为0
he field fd contains a file descriptor for an open file. If this field is negative, then the corresponding events field is ignored and the revents field returns zero. (This provides an easy way of ignoring a file descriptor for a single poll() call: simply negate the fd field.
相较于select而言,poll的优势:
#include #include #include #include #include #include #include #include int main(void){ int i, j, nready, ret; int maxfd = 0; int listenfd, connfd; struct pollfd client[1024];//poll使用的文件描述符数组 char buf[BUFSIZ], str[INET_ADDRSTRLEN]; struct sockaddr_in clie_addr, serv_addr; socklen_t clie_addr_len; //获取文件描述符 listenfd = socket(AF_INET, SOCK_STREAM, 0); if(listenfd == -1){ perror("socket error"); exit(1); } int opt = 1; //设置端口复用 ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); if(ret == -1){ perror("setsockopt error"); exit(1); } //设置监听端口 serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(6666); serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); if(ret == -1){ perror("bind error"); exit(1); } ret = listen(listenfd, 128); if(ret == -1){ perror("listen error"); exit(1); } //设置poll使用的结构体 client[0].fd = listenfd; client[0].events = POLLIN; for(i = 1; i < 1024 ;i++){ client[i].fd = -1; } maxfd = 0; while(1){ nready = poll(client, maxfd+1, -1); if(nready < 0){ perror("select error"); exit(1); } if(client[0].revents & POLLIN){ clie_addr_len = sizeof(clie_addr); connfd = accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len); printf("connect\n"); if(connfd == -1){ perror("accept error"); exit(1); } for(i = 1;i < 1024; i++){ if(client[i].fd < 0){ client[i].fd = connfd; client[i].events = POLLIN; break; } } if(i == 1024){ perror("too many client"); } if(i>maxfd){ maxfd = i; } if(--nready == 0){ continue; } } //处理其他的文件描述符 for(i = 1; i <= maxfd; i++){ if(client[i].fd < 0){ continue; } if(client[i].revents & POLLIN){ ret = read(client[i].fd, buf, sizeof(buf)); if(ret == 0){ printf("client close\n"); close(client[i].fd); int nowmax = 0; for(j = listenfd + 1; j <= maxfd; j++){ if(client[j].fd > 0) nowmax = i; } maxfd = nowmax; client[i].fd = -1; }else if(ret == -1){ perror("read error"); exit(1); }else{ for(j = 0; j < ret; j++){ buf[j] = toupper(buf[j]); } write(client[i].fd, buf, ret); } if(--nready == 0){ break; } } } } }
自带数据结构, 把监听事件和返回事件分离
可以拓展监听上限, 超出1024
不可以跨平台
无法直接定位满足监听事件的描述符
epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行
epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
#include int epoll_create(int size);
这是一个平衡二叉树里面的红黑树,
In the initial epoll_create() implementation, the size argument informed the kernel of the number of file descriptors that the caller expected to add to the epoll instance. The kernel used this information as a hint for the amount of space to initially allocate in internal data structures describing events. (If necessary, the kernel would allocate more space if the caller’s usage exceeded the hint given in size.) Nowadays, this hint is no longer required (the kernel dynamically sizes the required data structures without needing the hint), but size must still be greater than zero, in order to ensure backward compatibility when new epoll applications are run on older kernels.
这一个参数使用一个大于0的数字就可以了, 现在的内核会自动分配
返回值是一个用于监听的文件描述符 , 这一个是红黑树的根节点
#include int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epfd : 之前获取的描述符
op: 表示动作,用3个宏来表示:
EPOLL_CTL_ADD (注册新的fd到epfd),
EPOLL_CTL_MOD (修改已经注册的fd的监听事件),
EPOLL_CTL_DEL (从epfd删除一个fd, 取消监听);
这一个事件监听会在这一个文件关闭的时候退出
fd : 要监听的fd
event: 告诉内核需要监听的事件
typedef union epoll_data { void *ptr; int fd; //对应监听事件的fd uint32_t u32; //这一个不使用 uint64_t u64; //不使用 } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events 主要还是EPOLLIN, EPOLLOUT, EPOLLERR*/ epoll_data_t data; /* User data variable */ };
返回值:成功:0;失败:-1,设置相应的errno
#include int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epfd : 之前获取的文件描述符
events: 用来存内核得到事件的集合,可简单看作数组。这是一个传出数组
maxevents: 告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,
timeout: 是超时时间
-1: 阻塞
0: 立即返回,非阻塞
>0: 指定毫秒
返回值: 成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1
#include #include #include #include #include #include #include #include int main(void){ int i, j, nready, ret; int listenfd, connfd, efd; struct epoll_event temp, ep[1024]; char buf[BUFSIZ], str[INET_ADDRSTRLEN]; struct sockaddr_in clie_addr, serv_addr; socklen_t clie_addr_len; //获取文件描述符 listenfd = socket(AF_INET, SOCK_STREAM, 0); if(listenfd == -1){ perror("socket error"); exit(1); } int opt = 1; //设置端口复用 ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); if(ret == -1){ perror("setsockopt error"); exit(1); } //设置监听端口 serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(6666); serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); if(ret == -1){ perror("bind error"); exit(1); } ret = listen(listenfd, 128); if(ret == -1){ perror("listen error"); exit(1); } //获取一个树根 efd = epoll_create(1024); if(efd < 0){ perror("epoll creat error"); exit(0); } //把监听使用的节点添加 temp.events = EPOLLIN; temp.data.fd = listenfd; ret = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &temp); if(ret == -1){ perror("epoll ctl error"); exit(1); } while(1){ //开始等待 nready =epoll_wait(efd, ep, 1024, -1); if(nready < 0){ perror("select error"); exit(1); } //处理 for(i = 0 ; i < nready ; i++){ if(!ep[i].events & EPOLLIN) continue; if(ep[i].data.fd == listenfd){ //这是一个连接事件 clie_addr_len = sizeof(clie_addr); connfd = accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len); //打印一下连接端口的信息 printf("connect %s :%d\n", inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)), ntohs(clie_addr.sin_port)); if(connfd == -1){ perror("accept error"); exit(1); } temp.events = EPOLLIN; temp.data.fd = connfd; ret = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &temp); if(ret < 0){ perror("epoll ctl con error"); exit(1); } }else{ //处理其他的文件 if(ep[i].events & EPOLLIN){ ret = read(ep[i].data.fd, buf, sizeof(buf)); if(ret == 0){ //连接结束 printf("client close\n"); close(ep[i].data.fd); epoll_ctl(efd, EPOLL_CTL_DEL, ep[i].data.fd, NULL); }else if(ret == -1){ //read错误, 最好判断一下errno perror("read error"); epoll_ctl(efd, EPOLL_CTL_DEL, ep[i].data.fd, NULL); close(ep[i].data.fd); }else{ for(j = 0; j < ret; j++){ buf[j] = toupper(buf[j]); } write(ep[i].data.fd, buf, ret); } } } } } }
可以使用cat命令查看一个进程可以打开的socket描述符上限。
cat /proc/sys/fs/file-max
这一个计算机可以打开的最多的文件的个数
之前使用的
ulimit -a
查看的是这一个用户下面的进程可以打开的文件的个数
如有需要,可以通过修改配置文件的方式修改该上限值。
sudo vi /etc/security/limits.conf
在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。
* soft nofile 65536
* hard nofile 100000
soft nofile :可打开的文件描述符的最大数(超过会警告);
hard nofile :可打开的文件描述符的最大数(超过会报错);也可以使用命令
ulimit -n 数值
进行改变[改变以后需要注销用户让他生效]Linux中soft nproc 、soft nofile和hard nproc以及hard nofile配置-CSDN博客
EPOLL事件有两种模型:
Edge Triggered (ET) 边缘触发只有数据到来才触发,不管缓存区中是否还有数据。
Level Triggered (LT) 水平触发只要有数据都会触发(缓冲区里面的数据没有读取完就会触发)。
默认使用的时候水平触发
思考如下步骤:
在这个过程中,有两种工作模式:
ET模式即Edge Triggered工作模式。
如果我们在第1步将rfd添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。
event.events = EPOLLIN | EPOLLET;//监听的时候使用ET模式
#include #include #include #include #include #include int main(void){ int efd, i; int pfd[2];//管道使用的文件描述符 pid_t pid; char buf[10]; char ch = 'a'; pipe(pfd);//创建一个管道 pid = fork(); if(pid == 0){ //子进程 //子进程关闭读端 close(pfd[0]); while(1){ //子进程写数据到管道 for(i = 0 ;i < 5 ;i++) buf[i] = ch; buf[i-1] = '\n'; ch++; for(;i<10;i++) buf[i] = ch; buf[i-1] = '\n'; if(ch > 'z') ch = 'a'; write(pfd[1], buf, sizeof(buf)); sleep(5); } close(pfd[1]); }else if(pid > 0){ //父进程 //父进程关闭写端 close(pfd[1]); struct epoll_event event; struct epoll_event revents[10]; int ret, len; efd = epoll_create(1); event.events = EPOLLIN | EPOLLET;//监听的时候使用ET模式 event.data.fd = pfd[0]; epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event); while(1){ ret = epoll_wait(efd, revents, 10, -1); if(ret > 0){ for(i = 0; i < ret; i++){ if(revents[i].data.fd == pfd[0]){ len = read(pfd[0], buf, sizeof(buf)/2);//这一个读取的时候只会读取一半 write(STDOUT_FILENO, buf, len); } } } } close(pfd[0]); close(efd); }else{ perror("fork"); exit(1); } return 0; }
使用这一个模式的时候, 虽然还有数据, 但是还是会阻塞, 每5秒打印一行数据
LT模式即Level Triggered工作模式。
与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。
LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
如果读取数据的时候不需要读取所有的数据, 其余的数据可以丢弃的时候, 使用这一个模式会导致问题
ET(edge-triggered):ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once).
如果在epoll的ET模式下使用阻塞方式进行操作,可能会导致以下问题:
- 事件堆积:阻塞方式读取数据时,如果应用程序没有处理完当前的事件,而新的事件已经到达,这些事件会被内核忽略,从而导致事件堆积。这样会造成较差的性能,因为应用程序无法及时处理新到达的事件。
- 资源浪费:阻塞模式下,当没有数据可读时,应用程序会一直阻塞在读取操作上,浪费了CPU资源。在应用程序一直阻塞等待数据可读的过程中,无法进行其他任务的处理,造成资源的浪费。
- 接收数据不及时:由于阻塞模式下需要等待数据到达才能读取,会导致延迟增加。对于实时性要求较高的应用程序,阻塞模式可能无法满足要求。
这一个模式一般用于对速率要求比较高的地方, 这个时候最好不要阻塞, 读取的时候需要把所有的数据读取完, 否则下一次的连接会延迟
LT(Level-Triggered)和ET(Edge-Triggered)是两种不同的触发方式,它们支持的模式不同的原因主要有以下几点:
- 触发时机:LT模式在文件描述符还有未处理的数据时会持续触发,即文件描述符可读或可写时都会触发。而ET模式只在文件描述符状态发生变化时触发一次,即文件描述符从未就绪到就绪时触发。
- 处理效率:ET模式相较于LT模式可以提高处理效率。因为ET模式只在状态发生变化时才触发,应用程序需要立即对变化的事件进行处理,避免错过任何已就绪的事件。而LT模式在每次循环中都会检查已就绪的文件描述符,即使应用程序在某一次循环中没有对文件描述符进行操作。
- 应用场景:LT模式适用于对实时性要求不高的场景,例如网络编程中的服务器监听请求。而ET模式适用于对实时性要求较高的场景,例如实时数据处理或高并发服务器。
[(1 封私信) 如何理解Epoll中的LT和ET模式,底层实现又是怎么样的? - 知乎 (zhihu.com)](https://www.zhihu.com/question/403893498#:~:text=而 et,模式呢,数据从内核拷贝到用户空间后,内核不会重新将就绪事件节点添加回就绪队列,当事件在用户空间处理完后,用户空间根据需要重新将这个事件通过 epoll_ctl 添加回就绪队列(又或者这个节点因为有新的数据到来,重新触发了就绪事件而被添加)。)
(1 封私信) epoll中ET/LT触发模式分别适用的场景是什么? - 知乎 (zhihu.com)
实际是使用epoll的ET模式加非阻塞加返回联合体里面的void *ptr
可以使用这一个结构体里面加一个fd和一个回调函数
使用这一个模型的时候处理的方式会发生改变
socket, bind, listen – epoll_creat 创建一个红黑树 – 返回 epfd – epoll_ctl 在树上加一个监听节点 – while(1) – epoll_wait 监听 – 监听事件发生 – 返回数组 – 判断返回元素 – lfd – accept / cfd – read() – 大小写转换 – write
socket, bind, listen – epoll_creat 创建一个红黑树 – 返回 epfd – epoll_ctl 在树上加一个监听节点 – while(1) – epoll_wait 监听 – 监听事件发生 – 返回数组 – 判断返回元素 – lfd – accept / cfd – 调用读回调函数 – epoll_ctl把这一个节点取下来 – 改为监听写事件 – epoll_ctl把这一个节点加回去 – epoll_wait等待可写 – 写回调函数 – 把这一个节点取下来 – 改为读事件 – 加回去
这一个加了一下判断是不是可以写, 是的话再发送数据
实际的操作改为使用回调函数
struct myevent_s { int fd; //记录文件描述符 int events; //记录当前监听的事件 void *arg; //一个数据 void (*call_back)(int fd, int events, void *arg); //回调函数 int status; //记录在不在红黑树里面 char buf[BUFLEN]; int len; long last_active; //记录最后一次的连接时间, 长时间连接不断开踢出去 };
高并发服务器epoll接口、epoll Reactor(反应堆)模型详解 - 知乎 (zhihu.com)
[epoll原理详解及epoll反应堆模型-CSDN博客](https://zhuanlan.zhihu.com/p/161073519)
高效, 使用简单, 不受文件描述符的个数的限制
不能跨平台, Linux专有