当前位置: 首页 > ops >正文

TCP实现线程池竞争任务

服务端:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<strings.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include<stdlib.h>
#include<string.h>
#include<sys/wait.h>
#include<pthread.h>#define SERV_PORT 8000
#define MAXLINE 80//打印报错信息
#define prrexit(msg){ \perror(msg); \exit(1); \}typedef struct Task{int fd;struct Task  *next;
}Task;
//任务池子,队列
typedef struct Task_pool{Task *head;Task *tail;pthread_mutex_t lock;pthread_cond_t havetask;
}Task_pool;Task_pool *task_pool_init(){Task_pool *tp=(Task_pool *)malloc(sizeof(Task_pool));tp->head=NULL;tp->tail=NULL;pthread_mutex_init(&tp->lock,NULL);pthread_cond_init(&tp->havetask,NULL);return tp;
}void task_pool_push( Task_pool *tp,int fd){pthread_mutex_lock(&tp->lock);Task *t=(Task *)malloc(sizeof(Task));t->fd=fd;t->next=NULL;//两种情况if(!tp->tail){tp-> head=tp->tail=t;}else{tp->tail->next=t;tp-> tail=t;}pthread_cond_broadcast(&tp->havetask);pthread_mutex_unlock(&tp->lock);
}Task task_pool_pop(Task_pool *tp){pthread_mutex_lock(&tp->lock);//为什么不能用ifwhile(tp->head==NULL){pthread_cond_wait(&tp->havetask,&tp->lock);}Task tmp,*k;k=tp->head;tmp=*k;tp->head=tp->head->next;//队列一开始为空的情况下if(!tp->head){tp->tail=NULL;}free(k);pthread_mutex_unlock(&tp->lock);return tmp;
}void task_pool_free(Task_pool *tp){pthread_mutex_lock(&tp->lock);Task *p=tp->head,*k;while(p){k=p;p=p->next;free(k);}tp->head=NULL;pthread_mutex_unlock(&tp->lock);pthread_mutex_destroy(&tp->lock);pthread_cond_destroy(&tp->havetask);free(tp);return ;
}//子线程
void *up_server(void *arg){//获取自己的线程id,自我释放pthread_detach(pthread_self());//进行安全的类型转换//  int connfd=(int)(intptr_t)arg;char buff[MAXLINE];int n,i;Task_pool *tp=arg;while(1){ Task tmp=task_pool_pop(tp);int connfd=tmp.fd;printf("get task fd=%d\n",connfd);while(1){n=read(connfd,buff,MAXLINE);if(n<=0){perror("read error  or connections closed");break;}buff[n]='\0';//添加字符串终止符write(1,buff,n);if(strncmp(buff,"quit",4)==0){break;}for(i = 0; i < n ; i++)buff[i]=toupper(buff[i]);write(connfd,buff,n);}printf("finish task fd=%d\n",connfd);close(connfd);}//正常退出return (void *)0;
}int main(){struct sockaddr_in serveraddr,claddr;int listenfd, connfd;socklen_t  claddr_len;// char buff[MAXLINE];char str[INET_ADDRSTRLEN];int n,i;//任务池创建Task_pool *tp=task_pool_init();//多线程   pthread_t  tid;//多少核就多少个//一上来就会打印idfor(i=0;i<4;i++){pthread_create(&tid,NULL,up_server,(void *)(intptr_t)tp);printf("new thread is %#lx\n",tid);}listenfd =socket(AF_INET,SOCK_STREAM,0);if(listenfd<0){prrexit("socket");}//服务器ip地址,端口初始化bzero(&serveraddr,sizeof(serveraddr));serveraddr.sin_family=AF_INET;serveraddr.sin_port = htons(SERV_PORT);serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);if(bind(listenfd,(struct  sockaddr *)&serveraddr,sizeof(serveraddr))<0)prrexit("bind");if(listen(listenfd,3)<0)prrexit("listen");printf("Accepting connections...\n");while(1){claddr_len= sizeof(claddr);connfd=accept(listenfd,(struct sockaddr *)&claddr,&claddr_len);if(connfd<0)prrexit("accept");printf("received from %s:%d\n",inet_ntop(AF_INET,&claddr.sin_addr,str,sizeof(str)),ntohs(claddr.sin_port));/*多进程pid_t pid= fork();if(pid<0){prrexit("fork");}//父进程 :等待 创建连接if(pid > 0){close(connfd);//回收进程资源while(waitpid(-1,NULL,WNOHANG)>0){ };continue;}close(listenfd);*///多线程//  pthread_t  tid;//  pthread_create(&tid,NULL,up_server,(void *)(intptr_t)connfd);//  printf("new thread is %#lx\n",tid);task_pool_push(tp,connfd);}task_pool_free(tp);return 0;
}

客户端:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<string.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include<stdlib.h>#define SERV_PORT 8000
#define MAXLINE 80int main(){struct  sockaddr_in servaddr;char buff[MAXLINE];int sockfd = socket(AF_INET, SOCK_STREAM,0);if(sockfd < 0){perror("socket");exit(1);}bzero(&servaddr,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_port = htons(SERV_PORT);inet_pton(AF_INET,"127.0.0.1",&servaddr.sin_addr);if(connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0){perror("cnnect");exit(1);}printf("Connect to server .Type 'quit' to exit.\n");//死循环进行读入int n;while((n=read(0,buff,MAXLINE))>0){if(n > 0)buff[n-1] = '\0';//边界检查,只比较前四个字节if(strncmp(buff,"quit",4)==0){printf("Quitting ..\n");write(sockfd,buff,strlen(buff));break;}ssize_t bytes_written =write(sockfd,buff,strlen(buff));if(bytes_written!=strlen(buff)){perror("write error");break;}//读取云服务器响应n = read(sockfd,buff,MAXLINE);if(n<=0){if(n==0){printf("Server closed the connection.\n");}else{perror("read error");}break;}buff[n] = '\0';//确保响应字符串正确终止printf("Server  response: %s\n",buff);printf("Enter next message : ");fflush(stdout);//强制刷新输出缓冲区}  if(n<0){perror("read from stdin error");}close(sockfd);printf("Client exited.\n");return 0;
}

http://www.xdnf.cn/news/19355.html

相关文章:

  • FPGA|Quartus II 中使用TCL文件进行引脚一键分配
  • 深入理解零拷贝:本地IO与网络IO的性能优化利器
  • Docker基本介绍
  • MySQL 慢查询 debug:索引没生效的三重陷阱
  • 深度学习框架与工具使用心得:从入门到实战优化
  • 动作指令活体检测通过动态交互验证真实活人,保障安全
  • 数字后端tap cell:新老工艺tap cell区别
  • 软考中级数据库系统工程师学习专篇(67、数据库恢复)
  • Linux网络socket套接字(中)
  • AI人工智能大模型应用如何落地
  • DriveDreamer-2
  • C++ 模板全览:从“非特化”到“全特化 / 偏特化”的完整原理与区别
  • CUDA与图形API的深度互操作:解锁GPU硬件接口的真正潜力
  • Linux 系统都有哪些
  • Playwright Python 教程:实战篇
  • docker中的命令(四)
  • Coze源码分析-工作空间-项目开发-前端源码
  • 如何重置SVN被保存的用户名和密码
  • 【pve】
  • 轻量化注意力+脉冲机制,Transformer在低功耗AI中再度进化
  • 吴恩达机器学习作业十 PCA主成分分析
  • 基于单片机智能大棚/温室大棚/智慧农业/智能栽培种植系统/温湿度控制
  • LeetCode 37.解数独
  • k8s三阶段项目
  • 狂神说--Nginx--通俗易懂
  • 线程池八股文
  • 从零开始写个deer-flow-mvp-第一天
  • 拆分TypeScript项目的学习收获:处理编译缓存和包缓存,引用本地项目,使用相对路径
  • 粗糙表面接触模型MATLAB代码
  • 多租户配额与预算:限额、配额周期与突发桶的结算模型(Final)