环境配置:windows电脑用户可以安装WSL配置Linux环境,并且安装vscode及wsl的插件通过vscode连接本机电脑的Linux。
前置内容:
Linux网络编程——C/C++Web服务器(一):不断创建新线程处理多客户端连接和通信-CSDN博客
目录
同步IO多路复用——使用select/poll/epoll实现服务器同时监听多客户端的事件,通过单线程循环处理事件
一、select监听多客户端的单线程服务器
服务器实现流程
使用客户端测试的结果
select存在的问题与适用场景
二、poll监听多客户端的单线程服务器
poll相对于select改进的地方
三、epoll监听多客户端的单线程服务器
epoll相对于select/poll改进的地方
epoll的核心函数
使用epoll监听多客户端单线程服务器流程
后续
服务器功能:客户端连接服务器并发送数据,服务器端将小写字母转大写并返回给客户端。
select相关函数:
void FD_ZERO(fd_set *set); 清空文件描述符集合 void FD_SET(int fd, fd_set *set); 将待监听的文件描述符加入监听集合中 void FD_CLR(int fd, fd_set *set); 将监听集合中删除某个文件描述符 int FDISSET(int fd, fd_set *set); 判断某个文件描述符是否在监听集合中 int select(int nfds, fd_set *readfds, fdset *writefds, fd_set *exceptfds, struct timeval *timeout); nfds: 监听的所有文件描述符中,最大的文件描述符+1 readfds: 读 文件描述符的集合地址 是传入传出参数,传入要监听的集合,返回有时间发生的集合(覆盖式) writefds: 写 文件描述符的集合地址 是传入传出参数,可为NULL exceptfds: 异常 文件描述符集合地址 是传入传出参数,可为NULL timeout: >0设置超时时长,0为非阻塞,NULL为阻塞监听 返回值: >0为监听到有事件发生的文件描述符个数
select可以实现在单进程中同时连接多个客户端,select可以同时监测多个客户端是否有事件发生,如果有事件发生则通过循环遍历rset集合来确定哪个客户端有事件发生,并处理发生的事件。
1.服务器创建socket、设置端口复用、绑定IP地址与端口号、设定服务器监听上限。(与上节内容一致)
// 初始化服务器,创建socket、绑定IP地址、端口号、设置端口复用 int listenfd = socket(PF_INET, SOCK_STREAM, 0); int cfd; struct sockaddr_in address, temp_client_addr; socklen_t addr_len = sizeof(temp_client_addr); bzero(&address, sizeof(address)); address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons(8000); char buf[16], ip_addr[16], read_buf[1024]; inet_ntop(AF_INET, &address.sin_addr, buf, sizeof(buf)); printf("net is: %s:\n", buf); int flag = 1; setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); // 端口复用 int ret = bind(listenfd, (const struct sockaddr *)&address, sizeof(address)); ret = listen(listenfd, 5);
2.初始化存储客户端文件描述符和地址的数组,并初始化cfd=-1为默认值。
// 定义用来存储客户端cfd和IP地址的结构体 struct ClientInfo { int cfd; struct sockaddr_in adress; }; // 初始化存储客户端cfd的数组,全部cfd置为-1 struct ClientInfo client_info[CLIENT_MAX_NUM]; for (int i = 0; i < 1024; i++) { client_info[i].cfd = -1; }
3.初始化select的传入传出参数,rset是监听读事件的集合,执行完select会改变。
// 初始化rset与allset,其中rset是监听读事件的集合,是传入传出参数 fd_set rset, allset; FD_ZERO(&allset); FD_SET(listenfd, &allset); int max_fd = listenfd; // 最大监听文件描述符
4.单进程不断循环监听是否有客户端连接,如果有客户端请求连接,就建立连接,并向存储客户端文件描述符和地址的数组中存入:建立连接的cfd和请求连接客户端的地址。
while (1) { rset = allset; // 因为每次rset作为传出参数,会变成有读事件的集合,因此要重新赋值为allset int nready = select(max_fd + 1, &rset, NULL, NULL, NULL); // 设置阻塞监听 if (nready < 0) perror("select error"); if (FD_ISSET(listenfd, &rset)) { // 有客户端连接 cfd = accept(listenfd, (struct sockaddr *)&temp_client_addr, &addr_len); // 成功建立起通讯,将cfd加入allset集合中,下次监听 FD_SET(cfd, &allset); max_fd = max_fd < cfd ? cfd : max_fd; // 更新max_fd // 获取客户端的ip地址 inet_ntop(AF_INET, &temp_client_addr.sin_addr, ip_addr, sizeof(ip_addr)); printf("IP(%s) client is connected\n", ip_addr); // 将连接上的客户端的cfd和地址信息存入客户端数组中 for (int j = 0; j < CLIENT_MAX_NUM; j++) { if (client_info[j].cfd == -1) { // 寻找空位,进行存储 client_info[j].cfd = cfd; client_info[j].adress.sin_addr = temp_client_addr.sin_addr; break; } } // 如果只监听到了一个事件,并且已经是listenfd了,就跳过循环 if (nready == 1) continue; }
5.通过单进程不断循环查看rset集合中监听到有事件客户端的cfd,如果有写入数据,则进行小写转大写,否则就跳过。
// 循环查看rset集合中监听到有事件的cfd,如果有,处理事件 for (int i = listenfd + 1; i < max_fd + 1; i++) { // 此时的i就是cfd if (FD_ISSET(i, &rset)) { int ret = read(i, read_buf, sizeof(read_buf)); if (ret < 0) { perror("Read error"); exit(-1); } else if (ret == 0) { // 如果客户端断开连接 close(i); FD_CLR(i, &allset); // 遍历客户端数组,将指定cfd的位置重新置空,并打印xx客户端断开连接 for (int j = 0; j < CLIENT_MAX_NUM; j++) { if (client_info[j].cfd == i) { // 寻找cfd,进行置空 client_info[j].cfd = -1; inet_ntop(AF_INET, &client_info[j].adress.sin_addr, ip_addr, sizeof(ip_addr)); printf("IP(%s) client is closed\n", ip_addr); break; } } } else { // 如果读取到数据 // 将读取到的数据小写转大写 for (int j = 0; j < ret; j++) { read_buf[j] = toupper(read_buf[j]); } write(i, read_buf, ret); // 转换后的数据写回客户端 } } } }
服务器端最终完整代码:
#include #include #include #include #include #include #include #include #include #include #define CLIENT_MAX_NUM 1024 // 定义用来存储客户端cfd和IP地址的结构体 struct ClientInfo { int cfd; struct sockaddr_in adress; }; int main(){ // 初始化服务器,创建socket、绑定IP地址、端口号、设置端口复用 int listenfd = socket(PF_INET, SOCK_STREAM, 0); int cfd; struct sockaddr_in address, temp_client_addr; socklen_t addr_len = sizeof(temp_client_addr); bzero(&address, sizeof(address)); address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons(8000); char buf[16], ip_addr[16], read_buf[1024]; inet_ntop(AF_INET, &address.sin_addr, buf, sizeof(buf)); printf("net is: %s:\n", buf); int flag = 1; setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); // 端口复用 int ret = bind(listenfd, (const struct sockaddr *)&address, sizeof(address)); ret = listen(listenfd, 5); // 初始化存储客户端cfd的数组,全部cfd置为-1 struct ClientInfo client_info[CLIENT_MAX_NUM]; for (int i = 0; i < 1024; i++) { client_info[i].cfd = -1; } // 初始化rset与allset,其中rset是监听读事件的集合,是传入传出参数 fd_set rset, allset; FD_ZERO(&allset); FD_SET(listenfd, &allset); int max_fd = listenfd; // 最大监听文件描述符 while (1) { rset = allset; // 因为每次rset作为传出参数,会变成有读事件的集合,因此要重新赋值为allset int nready = select(max_fd + 1, &rset, NULL, NULL, NULL); // 设置阻塞监听 if (nready < 0) perror("select error"); if (FD_ISSET(listenfd, &rset)) { // 有客户端连接 cfd = accept(listenfd, (struct sockaddr *)&temp_client_addr, &addr_len); // 成功建立起通讯,将cfd加入allset集合中,下次监听 FD_SET(cfd, &allset); max_fd = max_fd < cfd ? cfd : max_fd; // 更新max_fd // 获取客户端的ip地址 inet_ntop(AF_INET, &temp_client_addr.sin_addr, ip_addr, sizeof(ip_addr)); printf("IP(%s) client is connected\n", ip_addr); // 将连接上的客户端的cfd和地址信息存入客户端数组中 for (int j = 0; j < CLIENT_MAX_NUM; j++) { if (client_info[j].cfd == -1) { // 寻找空位,进行存储 client_info[j].cfd = cfd; client_info[j].adress.sin_addr = temp_client_addr.sin_addr; break; } } // 如果只监听到了一个事件,并且已经是listenfd了,就跳过循环 if (nready == 1) continue; } // 循环查看rset集合中监听到有事件的cfd,如果有,处理事件 for (int i = listenfd + 1; i < max_fd + 1; i++) { // 此时的i就是cfd if (FD_ISSET(i, &rset)) { int ret = read(i, read_buf, sizeof(read_buf)); if (ret < 0) { perror("Read error"); exit(-1); } else if (ret == 0) { // 如果客户端断开连接 close(i); FD_CLR(i, &allset); // 遍历客户端数组,将指定cfd的位置重新置空,并打印xx客户端断开连接 for (int j = 0; j < CLIENT_MAX_NUM; j++) { if (client_info[j].cfd == i) { // 寻找cfd,进行置空 client_info[j].cfd = -1; inet_ntop(AF_INET, &client_info[j].adress.sin_addr, ip_addr, sizeof(ip_addr)); printf("IP(%s) client is closed\n", ip_addr); break; } } } else { // 如果读取到数据 // 将读取到的数据小写转大写 for (int j = 0; j < ret; j++) { read_buf[j] = toupper(read_buf[j]); } write(i, read_buf, ret); // 转换后的数据写回客户端 } } } } close(listenfd); return 0; }
使用上节的Linux系统命令:nc 地址 端口号,测试服务器是否可以实现多客户端连接。测试结果如下所示,可以完美实现多客户端与服务器连接并实现通信:
1.循环遍历全部cfd,性能差。每次都需要循环遍历到最大的文件描述符+1的位置,如果许多客户端一直没有事件发生,只有个别活跃的客户端,则性能会差。
2.代码编写麻烦。因为rset作为传入传出参数,每次循环rset都会被改变,需要增加个额外的allset进行存储全部文件描述符。而且在使用过程中还需要调用FD_ZERO、FD_SET、FD_ISSET、FD_CLR这些函数,比较麻烦。
适用场景:
1.少量客户端连接,且客户端都很活跃。
2.对跨平台支持更好。
取消了fd_set类型,使用用pollfd类型的结构体数组,存储需要监听的客户端文件描述符、监听事件、监听结果的返回值。如果监听到了有事件发生,在pollfd类型的结构体中监听结果的返回值。
poll函数具体如下:
int poll(struct pollfd *fds, nfds_t nfds, int timeout); fds是结构体数组 bfds是监测数组的最大个数 timeout是设置阻塞等待(为-1)、超时返回(>0)或不阻塞(为0) 返回值:返回满足监听事件的个数。
pollfd结构体如下:
struct pollfd{ int fd; // 待监听的文件描述符 short events; // 待监听的事件:POLLIN、POLLOUT、POLLERR short revents; // 传入时设为0,如果满足监听的事件,传出时为(POLLIN、POLLOUT、POLLERR) }
相比于select只是将传入传出参数rset给取消了,可以少定义一个allset。但是本质上仍然需要循环遍历所有的cfd,性能依然差。
由于poll并没什么大改进,基于poll实现监听多客户端的单线程服务器需要核心改动的地方省略。
创建一棵红黑树,将文件描述符和监听事件存在红黑树上,阻塞等待如果有监听事件发生,返回在数组中。这样在后续处理事件时,避免了循环遍历全部已有的文件描述符,只需要遍历有监听事件发生的文件描述符数组,即可处理事件。
在大量客户端连接且少量客户端活跃的情况下(这是大部分应用场景),性能大幅提高。
int epoll_create(int size); size:是创建红黑树监听节点的数量(供内核参考) 返回值:指向创建的红黑树根节点的文件描述符fd。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); epfd: epoll_create函数的返回值,红黑树根节点文件描述符 op: 对监听红黑树所作的操作 EPOLL_CTL_ADD: 添加监听fd EPOLL_CTL_MOD: 修改监听fd EPOLL_CTL_DEL: 取消监听fd fd: 待监听的fd event: 监听的事件,是struct epoll_event结构体 events: EPOLLIN / EPOLLOUT / EPOLLERR data: 联合体 int fd 对应监听事件的fd
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int); epfd: epoll_create函数的返回值,红黑树根节点。 events: 传出参数,是个数组,满足监听条件的文件描述符结构体数组。 maxevents: 数组元素的总个数。 例如1024,struct epoll_event events[1024]; timeout: -1为阻塞,0为不阻塞,>0为超时时间(毫秒)。 返回值: > 0 是满足监听的总个数。
流程与seletc基本一致,完整代码如下:
#include #include #include #include #include #include #include #include #include #include #define CLIENT_MAX_NUM 1024 // 定义用来存储客户端cfd和IP地址的结构体 struct ClientInfo { int cfd; struct sockaddr_in adress; }; int main(){ // 初始化服务器,创建socket、绑定IP地址、端口号、设置端口复用 int listenfd = socket(PF_INET, SOCK_STREAM, 0); int cfd; struct sockaddr_in address, temp_client_addr; socklen_t addr_len = sizeof(temp_client_addr); bzero(&address, sizeof(address)); address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons(8000); char buf[16], ip_addr[16], read_buf[1024]; inet_ntop(AF_INET, &address.sin_addr, buf, sizeof(buf)); printf("net is: %s:\n", buf); int flag = 1; setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); // 端口复用 int ret = bind(listenfd, (const struct sockaddr *)&address, sizeof(address)); ret = listen(listenfd, 5); // 初始化存储客户端cfd的数组,全部cfd置为-1 struct ClientInfo client_info[CLIENT_MAX_NUM]; for (int i = 0; i < 1024; i++) { client_info[i].cfd = -1; } // 创建用于epoll监听的红黑树 int epfd = epoll_create(100); // 将listenfd加入红黑树中,监测客户端的连接 struct epoll_event temp_event; temp_event.events = EPOLLIN; temp_event.data.fd = listenfd; epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &temp_event); struct epoll_event result_events[CLIENT_MAX_NUM]; while (1) { int nready = epoll_wait(epfd, result_events, CLIENT_MAX_NUM, -1); // 设置阻塞监听 if (nready < 0) perror("select error"); // 循环遍历result_events数组中监听到的事件的fd for (int i = 0; i < nready; i++) { int now_fd = result_events[i].data.fd; // 获取当前连接的fd为now_fd // 如果fd是listenfd,证明有新的客户端发起了连接 if (now_fd == listenfd) { cfd = accept(listenfd, (struct sockaddr *)&temp_client_addr, &addr_len); // 成功建立起通讯,将cfd加入allset集合中,下次监听 temp_event.data.fd = cfd; epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &temp_event); // 获取客户端的ip地址 inet_ntop(AF_INET, &temp_client_addr.sin_addr, ip_addr, sizeof(ip_addr)); printf("IP(%s) client is connected\n", ip_addr); // 将连接上的客户端的cfd和地址信息存入客户端数组中 for (int j = 0; j < CLIENT_MAX_NUM; j++) { if (client_info[j].cfd == -1) { // 寻找空位,进行存储 client_info[j].cfd = cfd; client_info[j].adress.sin_addr = temp_client_addr.sin_addr; break; } } // 如果只监听到了一个事件,并且已经是listenfd了,就跳过循环 if (nready == 1) continue; } // 如果不是listenfd,此时的now_fd就是cfd int ret = read(now_fd, read_buf, sizeof(read_buf)); if (ret < 0) { perror("Read error"); exit(-1); } else if (ret == 0) { // 如果客户端断开连接 close(now_fd); epoll_ctl(epfd, EPOLL_CTL_DEL, now_fd, NULL); // 将当前cfd从红黑树中删除 // 遍历客户端数组,将指定cfd的位置重新置空,并打印xx客户端断开连接 for (int j = 0; j < CLIENT_MAX_NUM; j++) { if (client_info[j].cfd == now_fd) { // 寻找cfd,进行置空 client_info[j].cfd = -1; inet_ntop(AF_INET, &client_info[j].adress.sin_addr, ip_addr, sizeof(ip_addr)); printf("IP(%s) client is closed\n", ip_addr); break; } } } else { // 如果读取到数据 // 将读取到的数据小写转大写 for (int j = 0; j < ret; j++) { read_buf[j] = toupper(read_buf[j]); } write(now_fd, read_buf, ret); // 转换后的数据写回客户端 } } } close(listenfd); return 0; }
此代码是在select代码基础上进行改动的,经测试,与select的效果一致,完美实现了epoll!
后续将实现线程池的功能,让epoll监听到多个客户端有事件发生时不像当前单线程这样循环遍历处理事件,而是通过线程池分配线程去实现多客户端事件处理,大幅提升效率。