TCP实现线程池竞争任务
服务端:
#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<strings.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include<stdlib.h>
#include<string.h>
#include<sys/wait.h>
#include <pthread.h>

/*
 * Server configuration constants.
 *
 * Each preprocessor directive must sit on its own line: when the #define
 * is fused onto the end of the #include line it is not processed as a
 * directive and SERV_PORT is never defined.
 */
#define SERV_PORT 8000 /* TCP port the server binds to */
#define MAXLINE 80     /* size in bytes of the per-connection I/O buffer */

/* The macro that follows prints an error message and exits. */
/*
 * prrexit - report the failed operation named by msg through perror() and
 * terminate the process with exit status 1.
 *
 * The body is wrapped in do { } while (0) so that `prrexit("x");` expands
 * to exactly one statement and stays correct inside an unbraced if/else.
 */
#define prrexit(msg) \
    do { \
        perror(msg); \
        exit(1); \
    } while (0)

/* One queued unit of work: a connection descriptor waiting for a worker. */
typedef struct Task {
    int fd;            /* descriptor handed to task_pool_push() */
    struct Task *next; /* next task in the queue, NULL at the tail */
} Task;

/*
 * Task pool: a singly linked FIFO queue of tasks.
 * `lock` guards head/tail; `havetask` is signalled whenever a task is added.
 */
typedef struct Task_pool {
    Task *head;
    Task *tail;
    pthread_mutex_t lock;
    pthread_cond_t havetask;
} Task_pool;

/*
 * task_pool_init - allocate and initialise an empty task pool.
 *
 * Returns the new pool; release it with task_pool_free().
 * Terminates the process (via prrexit) if memory cannot be allocated.
 */
Task_pool *task_pool_init()
{
    Task_pool *tp = malloc(sizeof *tp);

    if (tp == NULL)
        prrexit("malloc");

    tp->head = NULL;
    tp->tail = NULL;
    pthread_mutex_init(&tp->lock, NULL);
    pthread_cond_init(&tp->havetask, NULL);
    return tp;
}

/*
 * task_pool_push - append a task carrying fd to the tail of the queue and
 * wake the threads blocked in task_pool_pop().
 *
 * Terminates the process (via prrexit) if memory cannot be allocated.
 */
void task_pool_push(Task_pool *tp, int fd)
{
    /* Allocate outside the critical section to keep the lock hold short. */
    Task *t = malloc(sizeof *t);

    if (t == NULL)
        prrexit("malloc");
    t->fd = fd;
    t->next = NULL;

    pthread_mutex_lock(&tp->lock);
    /* Two cases: the queue is empty, or there is an existing tail. */
    if (!tp->tail) {
        tp->head = tp->tail = t;
    } else {
        tp->tail->next = t;
        tp->tail = t;
    }
    /* Wake every waiter; they re-check the queue and compete for the task. */
    pthread_cond_broadcast(&tp->havetask);
    pthread_mutex_unlock(&tp->lock);
}

/*
 * task_pool_pop - remove and return the task at the head of the queue,
 * blocking while the queue is empty.
 *
 * The task is returned by value; its `next` field is cleared so callers
 * never see a pointer into the queue's internal nodes.
 */
Task task_pool_pop(Task_pool *tp)
{
    Task tmp, *k;

    pthread_mutex_lock(&tp->lock);
    /*
     * This must be a loop rather than an `if`: the broadcast wakes every
     * waiter and only one of them gets the task (and condition waits may
     * also wake spuriously), so the predicate is re-checked after each
     * wakeup.
     */
    while (tp->head == NULL) {
        pthread_cond_wait(&tp->havetask, &tp->lock);
    }

    k = tp->head;
    tmp = *k;
    tmp.next = NULL;
    tp->head = k->next;
    /* The queue just became empty: the tail must be reset as well. */
    if (!tp->head) {
        tp->tail = NULL;
    }
    free(k);
    pthread_mutex_unlock(&tp->lock);
    return tmp;
}

/*
 * task_pool_free - discard all queued tasks and destroy the pool.
 *
 * Only the queue nodes are freed; the descriptors stored in pending tasks
 * are not closed here. Passing NULL is a no-op.
 */
void task_pool_free(Task_pool *tp)
{
    Task *p, *k;

    if (tp == NULL)
        return;

    pthread_mutex_lock(&tp->lock);
    p = tp->head;
    while (p) {
        k = p;
        p = p->next;
        free(k);
    }
    tp->head = NULL;
    tp->tail = NULL;
    pthread_mutex_unlock(&tp->lock);

    pthread_mutex_destroy(&tp->lock);
    pthread_cond_destroy(&tp->havetask);
    free(tp);
}

/* Worker thread entry point follows. */
void *up_server(void *arg){//获取自己的线程id,自我释放pthread_detach(pthread_self());//进行安全的类型转换// int connfd=(int)(intptr_t)arg;char buff[MAXLINE];int n,i;Task_pool *tp=arg;while(1){ Task tmp=task_pool_pop(tp);int connfd=tmp.fd;printf("get task fd=%d\n",connfd);while(1){n=read(connfd,buff,MAXLINE);if(n<=0){perror("read error or connections closed");break;}buff[n]='\0';//添加字符串终止符write(1,buff,n);if(strncmp(buff,"quit",4)==0){break;}for(i = 0; i < n ; i++)buff[i]=toupper(buff[i]);write(connfd,buff,n);}printf("finish task fd=%d\n",connfd);close(connfd);}//正常退出return (void *)0;
}int main(){struct sockaddr_in serveraddr,claddr;int listenfd, connfd;socklen_t claddr_len;// char buff[MAXLINE];char str[INET_ADDRSTRLEN];int n,i;//任务池创建Task_pool *tp=task_pool_init();//多线程 pthread_t tid;//多少核就多少个//一上来就会打印idfor(i=0;i<4;i++){pthread_create(&tid,NULL,up_server,(void *)(intptr_t)tp);printf("new thread is %#lx\n",tid);}listenfd =socket(AF_INET,SOCK_STREAM,0);if(listenfd<0){prrexit("socket");}//服务器ip地址,端口初始化bzero(&serveraddr,sizeof(serveraddr));serveraddr.sin_family=AF_INET;serveraddr.sin_port = htons(SERV_PORT);serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);if(bind(listenfd,(struct sockaddr *)&serveraddr,sizeof(serveraddr))<0)prrexit("bind");if(listen(listenfd,3)<0)prrexit("listen");printf("Accepting connections...\n");while(1){claddr_len= sizeof(claddr);connfd=accept(listenfd,(struct sockaddr *)&claddr,&claddr_len);if(connfd<0)prrexit("accept");printf("received from %s:%d\n",inet_ntop(AF_INET,&claddr.sin_addr,str,sizeof(str)),ntohs(claddr.sin_port));/*多进程pid_t pid= fork();if(pid<0){prrexit("fork");}//父进程 :等待 创建连接if(pid > 0){close(connfd);//回收进程资源while(waitpid(-1,NULL,WNOHANG)>0){ };continue;}close(listenfd);*///多线程// pthread_t tid;// pthread_create(&tid,NULL,up_server,(void *)(intptr_t)connfd);// printf("new thread is %#lx\n",tid);task_pool_push(tp,connfd);}task_pool_free(tp);return 0;
}
客户端:
#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<string.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include <stdlib.h>

#define SERV_PORT 8000 /* TCP port of the server to connect to */
#define MAXLINE 80     /* size in bytes of the I/O buffer */

/*
 * main - client entry point.
 *
 * Connects to 127.0.0.1:SERV_PORT, then repeatedly reads a line from
 * standard input, sends it to the server and prints the reply. A line
 * starting with "quit" is sent to the server and ends the session.
 *
 * Returns 0 after the session ends; exits with status 1 if the socket
 * cannot be created or connected.
 */
int main(void)
{
    struct sockaddr_in servaddr;
    char buff[MAXLINE];
    ssize_t nin; /* bytes read from stdin; kept separate from socket reads */

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        exit(1);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(SERV_PORT);
    if (inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr) != 1) {
        fprintf(stderr, "inet_pton: invalid address\n");
        close(sockfd);
        exit(1);
    }

    if (connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("connect");
        close(sockfd);
        exit(1);
    }
    printf("Connect to server .Type 'quit' to exit.\n");

    /* Read from stdin until EOF or error; one byte is kept for the NUL. */
    while ((nin = read(0, buff, sizeof(buff) - 1)) > 0) {
        size_t len;
        ssize_t nresp;

        /* Terminate, and strip the trailing newline only if one is there. */
        buff[nin] = '\0';
        if (buff[nin - 1] == '\n')
            buff[nin - 1] = '\0';
        len = strlen(buff);

        /*
         * Sending nothing would leave the client blocked forever waiting
         * for a reply the server never produces, so skip empty lines.
         */
        if (len == 0) {
            printf("Enter next message : ");
            fflush(stdout);
            continue;
        }

        /* Only the first four bytes are compared. */
        if (strncmp(buff, "quit", 4) == 0) {
            printf("Quitting ..\n");
            if (write(sockfd, buff, len) < 0)
                perror("write error");
            break;
        }

        ssize_t bytes_written = write(sockfd, buff, len);
        if (bytes_written < 0 || (size_t)bytes_written != len) {
            perror("write error");
            break;
        }

        /* Read the server's response, again leaving room for the NUL. */
        nresp = read(sockfd, buff, sizeof(buff) - 1);
        if (nresp <= 0) {
            if (nresp == 0) {
                printf("Server closed the connection.\n");
            } else {
                perror("read error");
            }
            break;
        }
        buff[nresp] = '\0'; /* make sure the response is terminated */
        printf("Server response: %s\n", buff);
        printf("Enter next message : ");
        fflush(stdout); /* force the prompt out before blocking on stdin */
    }

    /* nin is negative only when the read from standard input failed. */
    if (nin < 0) {
        perror("read from stdin error");
    }

    close(sockfd);
    printf("Client exited.\n");
    return 0;
}