并发服务器
创始人
2024-09-25 11:24:35
0

服务器,后台服务软件,后端开发,为软件客户端提供支持,数据支持,存储支持,数据中转

高并发要求,随着用户数量的增长,服务器的设计结构扩展,为更多用户提供服务器。

Web服务器(网页网站的后端都是web服务器),软件服务器(手机软件或者pc软件)。

开源服务器软件Apache,Nginx,用户配置即可使用,提供web服务器。

关于网页网站二点技术标准:共用协议(http https),超文本标记语言html,端口号共享使用http=80 https=443。

软件服务器,因不同软件不同功能,服务器的结构也不同,所以很难开源,因为并不适用其他软件。

后端开发的操作系统(服务器操作系统),Linux,Unix,window server,大多数都是控制台系统。

服务器集群:1.处理服务器(128核),2.图形处理器(GPU)渲染或图形计算,3.数据库服务器 多版本数据库兼容。4.文件存储系统。

搭建集群的成本比较高。

服务器的核心任务,网络穿透。

服务器作为信息中转站,帮助所有用户转接数据,变更网络环境,用户重新连接服务器,重新提交信息,服务器实时更新客户端的网络信息。

代理服务器

请求与响应(Request/Response)一次交互。

代理服务器可以作为过滤层,部署防火墙,防DOS攻击等等,隐藏主服务器。

横向扩展服务器硬件,分布式结构。分布式管理程序:资源共享,资源利用最大化,让每台处理机统一分配管理与调度,成为一个个体。

负载均衡概念:将大量的业务请求均匀分发给n个处理单元。避免某些处理单元过载,避免某些处理单元长期闲置。

轮询分发(逐个分发)

根据处理单元的负载情况,决定如何分配。

心跳机制 Keep_alive

心跳测试,在一个时间段内,要通过心跳测试测试对端存活是否有效,有效保持连接,无效断开连接。

操作系统自带心跳机制(setsocketopt),这个心跳机制默认下要很久才能检测出对端是否断开。

如果一个连接,一段时间没有交互,会被冷处理,通过间隔收发心跳包,让连接保持活性,避免被冷处理。

单进程服务器

连接部分,业务处理部分,协议解析部分。连接部分和业务处理部分都是阻塞的,请求时无法连接,等待连接时无法读取请求。

单进程服务端,满足基本tcp连接,完成简单的数据交互,对于系统时间的请求和响应。

htons();将 16 位的短整数(如端口号)从主机字节顺序转换为网络字节顺序。

htonl();将 32 位整数从主机字节顺序转换为网络字节顺序。

INADDR_ANY,服务器将绑定到所有可用的网络接口上。这意味着服务器将接受任何网卡或接口上的连接,而不限定于一个特定的 IP 地址。

bind()用于将一个套接字绑定到一个特定的地址和端口。绑定操作告诉操作系统,当有数据到达这个特定的 IP 地址和端口时,应该将数据交给这个套接字处理。

单进程阻塞

服务端:

#include #include #include #include #include #include #include #include #include #include  int init_net() {     int sock;     struct sockaddr_in sockaddr;     bzero(&sockaddr,sizeof(sockaddr));     sockaddr.sin_family=AF_INET;     sockaddr.sin_port=htons(1234);     sockaddr.sin_addr.s_addr=htonl(INADDR_ANY);     if((sock=socket(AF_INET,SOCK_STREAM,0))==-1)         printf("socket error\n");     else printf("socket ok\n");     if((bind(sock,(struct sockaddr*)&sockaddr,sizeof(sockaddr)))==-1)     {         perror("bind error\n");     }     else printf("bind ok\n");     listen(sock,128);     return sock; }  void mess_response(int so,struct sockaddr_in addr) {     char s[1024];     char c[16];     bzero(c,16);     bzero(s,1024);     inet_ntop(AF_INET,&addr.sin_addr,c,16);     sprintf(s,"hi %s\n",c);     int t=send(so,s,strlen(s),MSG_NOSIGNAL);     if(t<0)     {     perror("err");     }     return; }  int recv_request(int sock) {     char s[1024];     char tim[1024];     bzero(s,sizeof(s));     bzero(tim,sizeof(tim));     int len;     time_t tp;     while((len=recv(sock,s,sizeof(s),0))>0)     {     s[strlen(s)-1]='\0';     if(strncmp(s,"time",strlen(s))==0)     {         tp=time(NULL);         ctime_r(&tp,tim);         send(sock,tim,strlen(tim),MSG_NOSIGNAL);     }     else{         send(sock,"try again",9,MSG_NOSIGNAL);     }     bzero(s,sizeof(s));     bzero(tim,sizeof(tim));     }     if(len==0)     {         printf("client sock %d its exiting...\n",sock);         close(sock);  } } int main() {     int sock,csock;     struct sockaddr_in sock_client;     bzero(&sock_client,sizeof(sock_client));     sock=init_net();     printf("init ok\n");     socklen_t socklen;     while(1)     {     socklen=sizeof(sock_client);    if(( csock=(accept(sock,(struct sockaddr*)&sock_client,&socklen)))==-1)     perror("err\n");    else{        printf("accept success\n");    }     mess_response(csock,sock_client);     recv_request(csock);     }     close(csock);     close(sock);     return 0; } 

客户端:

#include #include #include #include #include #include #include #include int main() {     int sock;     struct sockaddr_in serveraddr;     serveraddr.sin_family=AF_INET;     serveraddr.sin_port=htons(1234);     inet_pton(AF_INET,"127.0.0.1",&serveraddr.sin_addr.s_addr);     if((sock=socket(AF_INET,SOCK_STREAM,0))==-1)     {         printf("socket error\n");     }     else printf("socket success\n");     if((connect(sock,(struct sockaddr*)&serveraddr,sizeof(serveraddr)))==-1)     {         printf("connect error\n");     }     else{         printf("connect success\n");     }     char buff[1024];     bzero(buff,1024);     int r=recv(sock,buff,sizeof(buff),0);     printf("%d\n",r);     //if((recv(sock,buff,sizeof(buff),0))==-1)     //{       //  perror("recv error\n");     //}     //else printf("recv ok\n");     printf("%s\n",buff);  bzero(buff,1024);     while((fgets(buff,sizeof(buff),stdin))!=NULL)     {         send(sock,buff,strlen(buff),MSG_NOSIGNAL);         bzero(buff,1024);         recv(sock,buff,sizeof(buff),0);         printf("%s\n",buff);     }     close(sock);     return 0; } 

单进程非阻塞轮询模型

服务端:

#include #include #include #include #include #include #include #include #include #include #include #include #include #include int clientsock[10000]; int init_net(); void mess_response(int,struct sockaddr_in); int recv_request(int); int start_server();   #include void mess_response(int so,struct sockaddr_in addr) {         char s[1024];         char c[16];         bzero(c,16);         bzero(s,1024);         inet_ntop(AF_INET,&addr.sin_addr,c,16);          sprintf(s,"hi %s\n",c);           int t=send(so,s,strlen(s),MSG_NOSIGNAL);           if(t<0)perror("err");           return; }   #include int recv_request(int sock) {         char s[1024];         char tim[1024];         bzero(s,sizeof(s));         bzero(tim,sizeof(tim));         int len;         time_t tp;         int i=0;         for(i;i<10000;i++)         {         if(clientsock[i]!=-1)         {         if((len=recv(clientsock[i],s,sizeof(s),MSG_DONTWAIT))>0)         {         s[strlen(s)-1]='\0';         if(strncmp(s,"time",strlen(s))==0)         {                 tp=time(NULL);                 ctime_r(&tp,tim);                 send(clientsock[i],tim,strlen(tim),MSG_NOSIGNAL);         }         else{                 send(clientsock[i],"try again",9,MSG_NOSIGNAL);         }         bzero(s,sizeof(s));         bzero(tim,sizeof(tim));         }         if(len==0)         {                 printf("client sock %d its exiting...\n",sock);                 close(clientsock[i]);                 clientsock[i]=-1;         }         }         } }   #include int server_start() {         int sock,csock;         struct sockaddr_in sock_client;         int i;         for(i=0;i<10000;i++)         clientsock[i]=-1;         bzero(&sock_client,sizeof(sock_client));         sock=init_net();         printf("init ok\n");         int flag;         fcntl(sock,F_GETFL,&flag);         flag|=O_NONBLOCK;         fcntl(sock,F_SETFL,flag);         socklen_t socklen;         while(1)         {         socklen=sizeof(sock_client);         if(( csock=(accept(sock,(struct sockaddr*)&sock_client,&socklen)))>0)            {                 for(i=0;i<10000;i++)            {                 if(clientsock[i]==-1)                 {                 clientsock[i]=csock;                 break;                 }            }            mess_response(csock,sock_client);            printf("accept success\n");          }         if(csock==-1)         {         if(errno==EAGAIN)         recv_request(csock);         else perror("accept call failed\n");         }         }         close(csock);         close(sock); }   #include int init_net() { int sock; struct sockaddr_in sockaddr; bzero(&sockaddr,sizeof(sockaddr)); sockaddr.sin_family=AF_INET; sockaddr.sin_port=htons(1234); sockaddr.sin_addr.s_addr=htonl(INADDR_ANY); if((sock=socket(AF_INET,SOCK_STREAM,0))==-1)  printf("socket error\n");  else printf("socket ok\n"); if((bind(sock,(struct sockaddr*)&sockaddr,sizeof(sockaddr)))==-1)             {perror("bind error\n");}  else printf("bind ok\n");      listen(sock,128);              return sock; }   #include int main() { server_start(); return 0; } 

单进程服务端只符合简易环境下的需求。

单进程采用非阻塞交替执行的策略让等待连接和读取处理交替执行,但是如果服务器长时间为某个客户端处理业务,导致无法建立新连接处理其他人的数据。

并发服务器-多进程服务器

多进程服务器是具备并发处理能力的服务器模型,可以并发连接并发处理,为若干客户端提供服务。并发数量取决于进程数量,频繁创建和销毁进程有庞大的系统开销。

多进程模型中有多个处理单元,为多对多模型。多进程模型中,子进程随客户端持续,链接成功创建,客户端退出则子进程退出。僵尸进程回收,无论是阻塞回收还是非阻塞,都会影响连接,所以我们让线程负责回收。

#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include  int init_net(); void mess_response(int,struct sockaddr_in); int recv_request(int); int start_server(); void* thread_wait(void* arg); void sig_wait(int n);   #include int init_net() { int sock; struct sockaddr_in sockaddr; bzero(&sockaddr,sizeof(sockaddr)); sockaddr.sin_family=AF_INET; sockaddr.sin_port=htons(1234); sockaddr.sin_addr.s_addr=htonl(INADDR_ANY); if((sock=socket(AF_INET,SOCK_STREAM,0))==-1)  printf("socket error\n");  else printf("socket ok\n"); if((bind(sock,(struct sockaddr*)&sockaddr,sizeof(sockaddr)))==-1)             {perror("bind error\n");}  else printf("bind ok\n");      listen(sock,128);              return sock; }  #include void mess_response(int so,struct sockaddr_in addr) {         char s[1024];         char c[16];         bzero(c,16);         bzero(s,1024);         inet_ntop(AF_INET,&addr.sin_addr,c,16);          sprintf(s,"hi %s\n",c);           int t=send(so,s,strlen(s),MSG_NOSIGNAL);           if(t<0)perror("err");           return; }  #include int recv_request(int sock) {         char s[1024];         char tim[1024];         bzero(s,sizeof(s));         bzero(tim,sizeof(tim));         int len;         time_t tp;          while((len=recv(sock,s,sizeof(s),0))>0)         {         s[strlen(s)-1]='\0';         if(strncmp(s,"time",strlen(s))==0)         {                 tp=time(NULL);                 ctime_r(&tp,tim);                 send(sock,tim,strlen(tim),MSG_NOSIGNAL);         }         else{                 send(sock,"try again",9,MSG_NOSIGNAL);         }         bzero(s,sizeof(s));         bzero(tim,sizeof(tim));         }                 printf("client sock %d  ,child process %d its exiting...\n",sock,getpid());                 close(sock);                 exit(sock); return 0; }  #include void sig_wait(int n) { pid_t zpid; while((zpid=waitpid(-1,NULL,WNOHANG))!=-1) { if(zpid>0){ printf("thread wait success,zpid %d\n",zpid); } }}   #include void* thread_wait(void* arg) { struct sigaction act,oact; pthread_detach(pthread_self()); act.sa_handler=sig_wait; act.sa_flags=0; sigemptyset(&act.sa_mask); sigaction(SIGCHLD,&act,&oact); sigprocmask(SIG_SETMASK,&act.sa_mask,NULL); printf("wait thread 0x%x waiting..\n",(unsigned int)pthread_self()); while(1) sleep(1); pthread_exit(NULL); }   #include int server_start() {         int sock,csock;         struct sockaddr_in sock_client;         int i;         bzero(&sock_client,sizeof(sock_client));         sock=init_net();         printf("init ok\n");         socklen_t socklen;          sigset_t set,oset;         sigemptyset(&set);         sigaddset(&set,SIGCHLD);         sigprocmask(SIG_SETMASK,&set,&oset);         pthread_t tid;         int err;         if((err=pthread_create(&tid,NULL,thread_wait,NULL))>0){         printf("wait thread create err:%s\n",strerror(err));         exit(0);         }         pid_t pid;         while(1)         {         socklen=sizeof(sock_client);         if(( csock=(accept(sock,(struct sockaddr*)&sock_client,&socklen)))>0)         {             pid=fork();             if(pid>0)            {           mess_response(csock,sock_client);            printf("accept success\n");          }            else if(pid==0)         {         recv_request(csock);         }         else exit(0);         }         else {         printf("accept error\n");                 }        }         close(sock);         return 0; }   #include int main() { server_start(); return 0; }  

多进程稳定性强,因为每个处理单元是一个进程,一个进程异常退出不会影响其他进程。

并发服务器-多线程服务器

稳定性较差,线程崩溃可能影响其他线程,但是开销小轻量级。(线程安全问题)

并发量取决于线程数量。

频繁创建销毁进程有庞大的系统开销。

和多进程差不多。

多路IO复用服务器

IO复用技术又名多路IO转接技术,可以帮助开发者监听大量的sock。

sock监听技术,io监听技术。

select模型

写的是单进程的。

#include//头文件

fd_set set;//监听集合类型

FD_ZERO(&set);//初始化监听集合,将位码初始化为0

FD_SET(int sockfd,&set);//将sockfd在集合中对应的位设置成1

FD_CLR(int sockfd,&sest);//将sockfd在集合中的对应位设置为0

int code=FD_ISSET(int sock,&set);//返回sockfd在集合中位码

int ready=select(maxfd+1,fd_set* rd,fd_set* wr,fd_set* err,timeval* timeout);//监听函数,监听事件选择以集合为单位,批处理,timeout=NULL,阻塞监听,监听集合对应文件描述符表,所以在设置监听数量时要考虑设备描述符。

struct timeval* val;   val.secons=0; val.ms=0;//如果要非阻塞使用select,需要定义时间结构体并将时间成员初始化成0,如果需要select定时阻塞,用户自行设置时间在结构体中。

#include #include #include #include #include #include #include #include int main() {         //sock的就绪数量         int socknum;         //存储客户端sock         int sockarry[1020];         //最大的描述符         int maxfd;         int ssock;         int csock;         //初始化客户端sock数组         int i;         for(i=0;i<1020;i++)         {         sockarry[i]=-1;         }         //初始化网络         struct sockaddr_in saddr,caddr;         saddr.sin_family=AF_INET;         saddr.sin_port=htons(1234);         saddr.sin_addr.s_addr=htonl(INADDR_ANY);         if((ssock=socket(AF_INET,SOCK_STREAM,0))==-1)         {         perror("sock init error\n");         }         bind(ssock,(struct sockaddr*)&saddr,sizeof(saddr));         listen(ssock,128);         maxfd=ssock;         //监听集合         fd_set set,oset;         //初始化监听集合         FD_ZERO(&set);         FD_SET(ssock,&set);         socklen_t addrlen;         //初次连接回复         char response[1500];         char cip[16];         char buff[1024];         bzero(response,1500);         bzero(cip,16);         bzero(buff,1024);         //时间         char timebuff[1024];         time_t tp;          int len;         printf("server running\n");         //轮询监听         while(1)         {         oset=set;         if((socknum=select(maxfd+1,&oset,NULL,NULL,NULL))==-1)         perror("select() failed");     while(socknum)         {         //是severfd         if(FD_ISSET(ssock,&oset))         {         addrlen=sizeof(caddr);         csock=accept(ssock,(struct sockaddr*)&caddr,&addrlen);         //首次连接成功返回信息         inet_ntop(AF_INET,&caddr.sin_addr.s_addr,cip,16);         sprintf(response,"hi %s",cip);         send(csock,response,strlen(response),MSG_NOSIGNAL);         bzero(response,sizeof(response));         //设置监听,存入数组         FD_SET(csock,&set);         for( i=0;i<1020;i++)         {         if(sockarry[i]==-1)         {         sockarry[i]=csock;         break;         }         }         //更新最大sockfd         if(maxfd0)         {         buff[strlen(buff)-1]='\0';         if((strcmp(buff,"time"))==0)         {         bzero(timebuff,1024);         tp=time(NULL);         ctime_r(&tp,timebuff);         send(sockarry[i],timebuff,strlen(timebuff),MSG_NOSIGNAL);         }         else{         send(sockarry[i],"try again",10,MSG_NOSIGNAL);         }         bzero(buff,1024);         FD_CLR(sockarry[i],&oset);         }         if(len==0)         {         //客户端退出,在监听中删除,在组中删除         FD_CLR(sockarry[i],&set);         FD_CLR(sockarry[i],&oset);         close(sockarry[i]);         sockarry[i]=-1;         }         break;         }         }         }         socknum--;         }         }         close(ssock);         return 0; }  

优点:

1.使用比较简单,了解监听集合以及IO复用机制即可使用,帮助用户完成少量sock的网络事件监听

2.跨平台兼容性比较好,在各个系统语言均有select支持

3.select支持微妙级定时,可以满足一些特定需求

缺点:

1.select无法满足大监听需求,最大监听数1024

2.轮询监听,随着轮询数量的增大,io处理性能呈线性下降

3.select监听到就绪后只返回就绪的数量,需要用户自行遍历查找就绪的sock

4.需要用户进行传入传出分离设置

5.随着select的持续使用,会有庞大的开销和挂载开销

6.监听数目比较少,其次设置监听不灵活,无法针对不同的sock设置不同的监听

poll模型

优点:

1.监听事件的种类丰富,对监听与就绪进行了传入传出分离,无需用户分离

2.poll支持用户自定义长度结构体数组作为集合,突破了1024限制(轮询)

缺点:

1.轮询问题

2.拷贝开销挂载开销

3.只返回就绪需要用户自行遍历查找就绪的sock

4.只支持毫秒级别定时

5.在某些特定的Linux版本才可以使用

struct pollfd listen_array[4096];//监听数组,还可以帮助用户存储sock

listen_array[0].fd=sock;//如果需要监听则设置为某个sock,取消监听设置为-1

listen_arry[0].events=POLLIN|POLLOUT|POLLERR;//设置要监听的事件

listen_arry[0],revents;//如果监听的sock就绪,系统将就绪事件传到revents中

int readycode=poll(listen_array);//timeout -1阻塞 0非阻塞 >0定时阻塞

#include #include #include #include #include #include #include #include #include #include int main() {         //sock的就绪数量         int socknum;         //存储客户端sock         struct pollfd sockarry[4096];         int ssock;         int csock;         //初始化客户端sock数组         int i;         for(i=0;i<4096;i++)         {         sockarry[i].fd=-1;         sockarry[i].events=POLLIN;         }         //初始化网络         struct sockaddr_in saddr,caddr;         saddr.sin_family=AF_INET;         saddr.sin_port=htons(1234);         saddr.sin_addr.s_addr=htonl(INADDR_ANY);         if((ssock=socket(AF_INET,SOCK_STREAM,0))==-1)         {         perror("sock init error\n");         }         bind(ssock,(struct sockaddr*)&saddr,sizeof(saddr));         listen(ssock,128);          //监听集合         sockarry[0].fd=ssock;         socklen_t addrlen;         //初次连接回复         char response[1500];         char cip[16];         char buff[1024];         bzero(response,1500);         bzero(cip,16);         bzero(buff,1024);         //时间         char timebuff[1024];         time_t tp;          int len;         printf("server running\n");         //轮询监听         while(1)         {         if((socknum=poll(sockarry,4096,-1))==-1)         perror("poll() failed");         while(socknum)         {         //是severfd         if(sockarry[0].revents==POLLIN)         {         addrlen=sizeof(caddr);         csock=accept(ssock,(struct sockaddr*)&caddr,&addrlen);         //首次连接成功返回信息         inet_ntop(AF_INET,&caddr.sin_addr.s_addr,cip,16);         sprintf(response,"hi %s",cip);         send(csock,response,strlen(response),MSG_NOSIGNAL);         bzero(response,sizeof(response));         //设置监听,存入数组         for( i=1;i<4096;i++)         {         if(sockarry[i].fd==-1)         {         sockarry[i].fd=csock;         break;         }         }         //就绪处理         sockarry[0].revents=0;         }         //是clientfd         else         {         i=1;         //遍历sockarry数组,查找就绪的csock并处理         for(i;i<4096;i++)         {         if(sockarry[i].fd!=-1)         if(sockarry[i].revents==POLLIN)         {         //读取处理数组         if((len=recv(sockarry[i].fd,buff,sizeof(buff),0))>0)         {         buff[strlen(buff)-1]='\0';         if((strcmp(buff,"time"))==0)         {         bzero(timebuff,1024);         tp=time(NULL);         ctime_r(&tp,timebuff);         send(sockarry[i].fd,timebuff,strlen(timebuff),MSG_NOSIGNAL);         }          else{         send(sockarry[i].fd,"try again",10,MSG_NOSIGNAL);         }         bzero(buff,1024);         sockarry[i].revents=0;         }         if(len==0)         {         //客户端退出,在监听中删除,在组中删除         close(sockarry[i].fd);         sockarry[i].fd=-1;         sockarry[i].revents=0;         }         break;         }         }         }         socknum--;         }         }         close(ssock);         return 0; } 

用ulimit -a查看一下,如果小于4096,那么创建的sockarray会报错。

epoll模型

整合了select和poll的优势,并且优化了问题

epoll使用红黑树作为监听集合(监听树)

int epfd=epoll_create(int treeMax);//创建监听树,参数为大小,返回值指向树的描述符

struct epolevent node;

node.data.fd=sock;

node.events=EPOLLIN|EPOLLOUT|EPOLLERR;

epoll_ctl(epfd,EPOLL_CTL_ADD,sock,struct epollevent* node);//添加节点

epoll_ctl(epfd,EPOLL_CTL_DEL,sock,NULL);//删除节点

epoll_ctl(epfd,EPOLL_CTL_MOD,sock,&node);//修改只能监听的事件,无法修改sock

struct epollevent ready_arry[Max];//用户空间定义就绪队列(数组)

int readynode=epoll_wait(epfd,ready_arry,Max,-1);//Max就绪最大数,树大小=就绪队列大小=最大就绪数

epoll监听树直接创建于内核,然后节点放入epoll等待队列,然后就绪后弹出到就绪链表(双向链表结构体,内核),然后从内核空间拷贝到用户空间,然后epoll监听到就绪直接返回就绪节点(sock),用户遍历处理这些sock即可。

epoll不用担心轮询问题,所以没有监听限制,可以监听系统最大描述符数量,并没有多余开销

epoll不存在轮询问题,无需担心监听数量增大,系统开销增大。

epoll直接返回就绪的sock,用户直接处理即可。

epoll监听集合在内核层,所以不会出现重复拷贝和重复挂载的问题,保证每个节点只拷贝一次,只挂载一次。

epoll与poll一样,监听的事件更丰富,而后设置监听比较灵活,可以对不同的sock设置不同的事件监听。

作为监听模型,select,poll,epoll,epoll监听能力最强,但是处理能力而言(与监听模型无关)

监听部分(通过io复用技术监听管理有效连接)

处理部分(并发处理),又快又好的将请求全部处理掉(快速反应,提高体验)

epoll的监听模式(水平触发模式EPOLLLT 边缘触发模式EPOLLET)EPOLLONTSHOT

epoll是线程安全的,epoll_ctl函数内部自带互斥锁,所以一个线程在访问修改树节点时,其他线程无法挂起等待

EPOLL+线程池服务器(满足高并发需求)

线程池技术:

预创建原则,线程池内部准备线程备用,不宜过多

线程应该重用性,可以一对多处理任务或服务不同的客户端

处理单元(线程)数量并不固定,动态扩容与缩减(任务量)

设计灵活的任务传递方式与任务接口,线程可以执行不同种类的任务,不能将线程工作固定

线程池技术多为线程容器

thread_poll_create(int max,int min,int qmax);

int thread_producer_add(pool_t* pt,business_t bs);//添加一次业务向队列(生产者)

void* thread_customer(void* arg);//消费者线程任务,等待队列,循环获取业务并执行

void* thread_manager(void* arg);//查看线程池阈值,动态扩容缩减线程

int net_init();//服务器网络初始化

int first_response();//连接成功首次响应

int epoll_init();//epoll创建初始化

int epoll_listen();//主线程循环监听sock事件,并根据就绪添加任务

void* accept_business(void* arg);//tcp连接业务

void* response_business(voiod* arg);//响应处理业务

if_thread_alive(pthread_t tid);//测试线程,返回0表示线程失效,1表示有效

#include #include #include #include #include #include #include #include #include #include #include #include #include #include #define EPOLL_MAX 190000 int epfd;//监听树的描述符 //business_t业务类型 typedef struct { void* (*task)(void* arg); void* arg; }business_t; //pool_t线程池类型 typedef struct { //线程池开关 int thread_shutdown; //线程池最大线程数 int thread_max; //线程池最小线程数 int thread_min; //存活有效的线程数量 int thread_alive; //繁忙线程数量 int thread_busy; //缩减码 int exit_code; //环形任务队列 business_t* queue; int front; int rear; int max; int cur; //互斥锁 pthread_mutex_t lock; //生产者条件变量 pthread_cond_t Not_Full; //消费者条件变量 pthread_cond_t Not_Empty; //消费者tid数组 pthread_t* ctids; //管理者线程io pthread_t mtid; }pool_t; pool_t* thread_pool_create(int tmax,int tmin,int qmax); int thread_producer_add(pool_t* pt,business_t bs); void* thread_customer(void* arg); int if_thread_alive(pthread_t tid); void* thread_mananger(void* arg); int net_init(void); int first_response(int sock,struct sockaddr_in caddr); int epoll_init(int sock); int epoll_listen(pool_t* pt,int sock); void* accept_business(void*); void* recv_business(void*);  #include void* accept_business(void* arg) {         int sock=*(int*)arg;         int csock;         struct sockaddr_in caddr;         char buffer[16];         bzero(buffer,sizeof(buffer));         socklen_t addrlen=sizeof(caddr);         if((csock=accept(sock,(struct sockaddr*)&caddr,&addrlen))==-1)                 perror("accept_business,accept failed");         first_response(csock,caddr);         //设置监听         struct epoll_event node;         node.data.fd=csock;         node.events=EPOLLIN|EPOLLET;         epoll_ctl(epfd,EPOLL_CTL_ADD,csock,&node);         inet_ntop(AF_INET,&caddr.sin_addr.s_addr,buffer,16);         printf("customer 0x%x execl accept_business success,client ip %s,client port %d\n",(unsigned int)pthread_self(),buffer,ntohs(caddr.sin_port));         return NULL; }      #include     int if_thread_alive(pthread_t tid)     {     int err;     err=pthread_kill(tid,0);     if(err==ESRCH)     return 0;     else return 1;     }  #include void* recv_business(void* arg) {         int sock=*(int*)arg;         //读取请求数据,使用非阻塞,当前缓冲区无可读数据立即返回         char buffer[1024];         char timebuf[1024];         time_t tp;         int recvlen;         bzero(buffer,sizeof(buffer));         while((recvlen=recv(sock,buffer,sizeof(buffer),MSG_DONTWAIT))>0)         {                 //检测关键字,处理请求                 buffer[strlen(buffer)-1]='\0';                 if((strcmp(buffer,"time"))==0){                         bzero(timebuf,1024);                         tp=time(NULL);                         ctime_r(&tp,timebuf);                         send(sock,timebuf,strlen(timebuf),MSG_NOSIGNAL);                         printf("recv_business execl success,response time\n");                 }                 else{                         send(sock,"Please try again",17,MSG_NOSIGNAL);                         printf("customer 0x%x recv_business execl success,reponse tryagain\n",(unsigned int)pthread_self());                 }         }         if(recvlen==0){                 //删除监听                 epoll_ctl(epfd,EPOLL_CTL_DEL,sock,NULL);                 printf("client %d exit,close csock\n",sock);                 close(sock);                 printf("customer 0x%xrecv_business execl success, client exit\n",(unsigned int)pthread_self());         }         if(recvlen==-1){                 if(errno==EAGAIN){                         //非阻塞返回                 }                 else perror("recv_bussiness recv call fialed");         }  return NULL; }  #include extern int epfd; int epoll_init(int sock) { if((epfd=epoll_create(EPOLL_MAX))==-1) perror("create failed"); struct epoll_event node; node.data.fd=sock; node.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_ADD,sock,&node); //创建监听树并将seversock设置成监听读事件 printf("server epoll success\n"); return 0; }  #include void* thread_customer(void* arg) {         //接收参数         pool_t* pt=(pool_t*)arg;         business_t bs;         pthread_detach(pthread_self());         //循环等待任务并处理         printf("customer thread 0x%x,wait business\n",(unsigned int)pthread_self());         while(pt->thread_shutdown)         {                 pthread_mutex_lock(&pt->lock);                 while(pt->cur==0)                 {                         pthread_cond_wait(&pt->Not_Empty,&pt->lock);                         if(!pt->thread_shutdown){                                 printf("customer 0x%x,thread_shutdown %d,customer exiting..\n",(unsigned int)pthread_self(),pt->thread_shutdown);                                 pthread_mutex_unlock(&pt->lock);                                 pthread_exit(NULL);                         }                         if(pt->exit_code>0){                                 printf("manager kill customer 0x%x success\n",(unsigned int)pthread_self());                                 --(pt->thread_alive);                                 --(pt->exit_code);                                 pthread_mutex_unlock(&pt->lock);                                 pthread_exit(NULL);                         }                 }                 //获取任务                 bs.task=pt->queue[pt->rear].task;                 bs.arg=pt->queue[pt->rear].arg;                 --(pt->cur);                 pt->rear=(pt->rear+1)%pt->max;                 ++(pt->thread_busy);                 pthread_mutex_unlock(&pt->lock);                 //唤醒生产者                 pthread_cond_signal(&pt->Not_Full);                 //执行任务                 bs.task(bs.arg);                 pthread_mutex_lock(&pt->lock);                 --(pt->thread_busy);                 pthread_mutex_unlock(&pt->lock);         }         printf("customer 0x%x,thread_shutdown %d,customer exiting..\n",(unsigned int)pthread_self(),pt->thread_shutdown);         pthread_exit(NULL); }  #include void* thread_mananger(void* arg) {         pool_t* pt=(pool_t*)arg;         int alive,busy,cur;         int flag;         int add;         int fd;         if((access("manager_information",F_OK))==0)                 unlink("manager_information");         fd=open("manager_information",O_RDWR|O_CREAT,0664);         char result[4096];         while(pt->thread_shutdown){                 pthread_mutex_lock(&pt->lock);                 alive=pt->thread_alive;                 busy=pt->thread_busy;                 cur=pt->cur;                 pthread_mutex_unlock(&pt->lock);                 bzero(result,sizeof(result));                 sprintf(result,"manager 0x%x,alive %d ,busy %d,idel %d busy/alive %.2f%% alive/max %.2f%%\n",alive,busy,alive-busy,(double)busy/alive*100,(double)alive/pt->thread_max*100);                 write(fd,result,strlen(result));                 //扩容 thread_min作为扩容与缩减量                 //1.任务数量cur>=alive-busy                 //2.使用百分比扩容,如果忙线程占存活线程的70%                 //3.扩容不能超出最大阈值thread_max                 int err;                 if((cur>=alive-busy||(double)busy/alive*100>=70)&&alive+pt->thread_min<=pt->thread_max)                 {                         //ctids,如果为0直接使用,非0要测试线程是否存活                         //遍历tids查找可用位置,并且创建新消费者                         for(flag=0,add=0;flagthread_max&&addthread_min;flag++)                         {                                 if(pt->ctids[flag]==0||!if_thread_alive(pt->ctids[flag]))                                 {                                         if((err=pthread_create(&pt->ctids[flag],NULL,thread_customer,(void*)pt))>0)                                                 printf("manager create new customer failed:%s\n",strerror(err));                                         add++;                                         pthread_mutex_lock(&pt->lock);                                         ++(pt->thread_alive);                                         pthread_mutex_unlock(&pt->lock);                                  }                         }                 }                 //缩减条件:闲线程是忙线程的倍数,缩减不允许小于最小阈值                 if(busy*2<=alive-busy&&alive-pt->thread_min>=pt->thread_min)                 {                         //如何缩减                         //1.管理者缩减cance,改造ctids,除了存储tid,还要存储记录线程状态,管理者遍历而后cancel取消闲线程                         //2.配合缩减,消费者退出码,唤醒闲消费者,消费者检查退出码,而后自行退出                         pt->exit_code=pt->thread_min;                         int i;                         for(i=0;ithread_min;i++)                                 pthread_cond_signal(&pt->Not_Empty);                 }                 sleep(1);         }         printf("manager 0x%x exit.\n",(unsigned int)pthread_self());         pthread_exit(NULL); }  #include //生产者监听模型 int epoll_listen(pool_t* pt,int sock) {         int readycode;         struct epoll_event readyarray[EPOLL_MAX];         int i;         business_t bs;         while(pt->thread_shutdown)         {                 if((readycode=epoll_wait(epfd,readyarray,EPOLL_MAX,-1))==-1)                         perror("epoll_listen,epoll_wait call failed");                 else{                         i=0;                         while(readycode)                         {                                 //判断就绪                                 if(readyarray[i].data.fd==sock){                                         //serversock ready                                         bs.task=accept_business;                                         bs.arg=(void*)&readyarray[i].data.fd;                                         thread_producer_add(pt,bs);                                         printf("thread producer 0x%x add accept business..\n",(unsigned int)pthread_self());                                 }                                 else{                                         //clientsock ready                                         bs.task=recv_business;                                         bs.arg=(void*)&readyarray[i].data.fd;                                         thread_producer_add(pt,bs);                                         printf("thrad producer 0x%x add recv business..\n",(unsigned int)pthread_self());                                 }                                 --readycode;                                 ++i;                         }}         }         printf("producer is done..\n");         return 0; }  #include pool_t* thread_pool_create(int tmax,int tmin,int qmax) {         //对线程池类初始化,申请空间,按最小阈值创建消费者并创建管理者         pool_t* pt=NULL;         if((pt=malloc(sizeof(pool_t)))==NULL)                 perror("thread_pool_create,malloc pool failed");         pt->thread_shutdown=1;         pt->thread_max=tmax;         pt->thread_min=tmin;         pt->thread_alive=0;         pt->thread_busy=0;         pt->exit_code=0;         //创建队列         if((pt->queue=malloc(sizeof(business_t)*qmax))==NULL)                 perror("thread_pool_crreate,malloc queue failed");         pt->front=0;         pt->rear=0;         pt->max=qmax;         pt->cur=0;         if((pt->ctids=malloc(sizeof(pthread_t)*tmax))==NULL)                 perror("thread_pool_create,malloc ctids failed");         //初始化ctids,0         bzero(pt->ctids,sizeof(pthread_t)*tmax);         if(pthread_mutex_init(&(pt->lock),NULL)!=0||pthread_cond_init(&(pt->Not_Full),NULL)!=0||pthread_cond_init(&(pt->Not_Empty),NULL)!=0){                 printf("thread_pool_create,init lock or cond failed\n");         }         //预创建消费者         int err;         int i;         for(i=0;ictids[i],NULL,thread_customer,pt))>0)                         printf("thread_pool_create,create customer failed:%s\n",strerror(err));                 else ++(pt->thread_alive);         }         //管理线程         if((err=pthread_create(&pt->mtid,NULL,thread_mananger,pt))>0)                 printf("thread_pool_create,cretate mannager failed:%s\n",strerror(err));         return pt; }  #include int first_response(int sock,struct sockaddr_in caddr) { char cip[16]; char response[1500]; bzero(cip,16); bzero(response,sizeof(response)); inet_ntop(AF_INET,&caddr.sin_addr,cip,16); sprintf(response,"hi,%s connection server host  success\n",cip); if((send(sock,response,strlen(response),0))==-1) perror("first_response,send failed"); return 0; }  #include int net_init(void) { int sock; struct sockaddr_in saddr; bzero(&saddr,sizeof(saddr)); saddr.sin_family=AF_INET; saddr.sin_port=htons(1234); saddr.sin_addr.s_addr=htonl(INADDR_ANY); if((sock=socket(AF_INET,SOCK_STREAM,0))==-1) perror("sock failed"); if((bind(sock,(struct sockaddr*)&saddr,sizeof(saddr)))==-1) perror("bind failed"); if((listen(sock,128))==-1) perror("listen failed"); printf("srver init success\n"); return sock; }  #include int thread_producer_add(pool_t* pt,business_t bs) {         //生产者任务         pthread_mutex_lock(&pt->lock);         if(pt->thread_shutdown)         {                 while(pt->cur==pt->max)                 {                         pthread_cond_wait(&pt->Not_Full,&pt->lock);                         if(!pt->thread_shutdown){                                 printf("cusomter thread 0x%x,shutdown %d,thread exit..\n",(unsigned int)pthread_self(),pt->thread_shutdown);                                 pthread_mutex_unlock(&pt->lock);                                 pthread_exit(NULL);                         }                 }                 //添加一次业务                 pt->queue[pt->front].task=bs.task;                 pt->queue[pt->front].arg=bs.arg;                 ++(pt->cur);                 pt->front=(pt->front+1)%pt->max;                 pthread_mutex_unlock(&pt->lock);                 pthread_cond_signal(&pt->Not_Empty);  } pthread_mutex_unlock(&pt->lock); return 0; }  #include int main() {         int serverfd=net_init();         epoll_init(serverfd);         pool_t* pt=thread_pool_create(100,10,1000);         epoll_listen(pt,serverfd);         return 0; } 

相关内容

热门资讯

联想笔记本bios讲解(联想笔... 大家好!小编今天给大家解答一下有关联想笔记本bios讲解,以及分享几个联想笔记本bios介绍对应的知...
笔记本没电后关机会怎么样-笔记... 朋友们,你们知道笔记本没电后关机这个问题吗?如果不了解该问题的话,小编将详细为你解答,希望对你有所帮...
笔记本gtx970m能玩什么游... 各位访客大家好!今天小编关注到一个比较有意思的话题,就是关于gtx970游戏笔记本的问题,于是小编就...
笔记本怎么自己装系统,笔记本如... 大家好呀!今天小编发现了笔记本怎么自己装系统的有趣问题,来给大家解答一下,别忘了关注本站哦,现在我们...
杭州华为笔记本批发_杭州笔记本... 大家好呀!今天小编发现了杭州华为笔记本批发的有趣问题,来给大家解答一下,别忘了关注本站哦,现在我们开...
笔记本两个风扇的作用,笔记本两... 朋友们,你们知道笔记本两个风扇的作用这个问题吗?如果不了解该问题的话,小编将详细为你解答,希望对你有...
笔记本怎么增强wifi信号接收... 大家好!小编今天给大家解答一下有关笔记本怎么增强wifi,以及分享几个笔记本怎么增强wifi信号接收...
笔记本vr设备推荐 可以调试v... 各位访客大家好!今天小编关注到一个比较有意思的话题,就是关于可以调试vr的笔记本的问题,于是小编就整...
南通联想笔记本电脑售后维修服务... 大家好呀!今天小编发现了南通联想笔记本的有趣问题,来给大家解答一下,别忘了关注本站哦,现在我们开始阅...
笔记本电脑连网络受限_笔记本连... 接下来,给各位带来的是笔记本电脑连网络受限的相关解答,其中也会对笔记本连接网络受限进行详细解释,假如...