Linux网络编程day7 线程池
线程池
typedef struct{void*(*function)(void*); //函数指针,回调函数void*arg; //上面函数的参数
}threadpool_task_t; //各子线程任务的结构体/*描述线程池相关信息*/struct threadpool_t{pthread_mutex_t lock; // 用于锁住本结构体pthread_mutex_t thread_counter; //记录忙状态线程个数的锁 -- bust_thr_numpthread_cond_t queue_not_full; //当任务队列满时 , 添加任务的线程阻塞 , 等待此条件变量pthread_cond_t queue_not_empty; //任务队列不为空时,通知等待任务的线程pthread_t *threads; //存放线程池中每个线程的Tid . 数组pthread_t adjust_tid; //存管理线程的tidthreadpool_task_t *task_queue; //任务队列--数组首地址int min_thr_num; //线程池最小线程数int max_thr_num; //线程池最大线程数int live_thr_num; //当前存活线程个数int busy_thr_num; //忙状态线程个数int wait_exit_thr_num; //要销毁的线程个数int queue_front; //task_queue队头下标int queue_rear; //task_queue队尾下标int queue_size; //task_queue队中实际任务数int queue_max_size; //task_queue队列可容纳任务数上限int shutdown; //标志位,线程池使用状态,true or false
};
线程池模块分析
1、main():创建线程池
向线程池中添加任务,借助回调处理任务
销毁线程池
int main(void)
{//threadpool_t * threadpool_create(int min_thr_num , int max_thr_num , int queue_max_size);threadpool_t *thp = threadpool_create(3 , 100 , 100);//创建线程池,最大数量100,最小数量3 ,任务队列最
大容量100.printf("pool inited");int num[20] , i; //模拟客户端向服务器发送数据等场景for(i = 0; i < 20 ; i++){num[i] = i;printf("add task %d\n" , i);//int threadpool_add(threadpool_t *pool , void*(*function)(void*arg) , void arg);threadpool_add(thp , process , (void*)&num[i]); //向线程池中添加任务}sleep(10); //等待子线程完成任务threadpool_destroy(thp);return 0 ;
}
2、pthreadpool_create:创建线程池结构体指针
初始化线程池结构体中N个成员变量
创建N个任务线程
创建1个管理者线程
失败时 , 释放空间
threadpool_t* threadpool_create(int min_thr_num , int max_thr_num , int queue_max_size)
{int i ;struct threadpool_t *pool = NULL; // 线程池 结构体do{if((pool = (struct threadpool_t*)malloc(sizeof(struct threadpool_t))) == NULL){printf("malloc threadpool fail");break;}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num; //活着的线程数 初值=最小线程数pool->wait_exit_thr_num = 0;pool->queue_size = 0; //有0个产品pool->queue_max_size = queue_max_size;//最大任务队列数pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false; // 不关闭线程池/*根据最大线程上线数 , 给工作线程数组开辟空间,清零*/pool->threads = (pthread*)malloc(sizeof(pthread_t)*max_thr_num);if(pool->threads == NULL){printf("malloc threads fail");break;}memset(pool->threads , 0 , sizeof(pthread_t)*max_thr_num);/*给任务队列开辟空间 */pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_task_t)*queue_max_size);if(pool->task_queue == NULL){printf("malloc task_queue fail");break;}/*初始化互斥锁、条件变量 , 使用init动态初始化 , 加上进行返回值判断*/if(pthread_mutex_init((&pool->lock) , NULL) != 0|| pthread_mutex_init(&(pool->thread_counter) , NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty) , NULL) != 0|| pthread_cond_init(&(pool->queue_not_full) , NULL) != 0){printf("init the lock or cond fail");break;}/*启动min_thr_num个work thread*/for(i = 0 ; i < min_thr_num ; i++){pthread_create(&(pool->threads[i]) , NULL , threadpool_thread , (void*)pool);//pool指向当前线>程池printf("stat thread 0x%x...\n" , (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid) , NULL , adjust_thread , (void*)pool);//创建管理者线程return pool;}while(0);threadpool_free(pool); // 前面代码调用失败时,释放pool空间return NULL;
}
3、threadpool_thread():进入子线程回调函数。
接收参数(void*)arg
加锁--》lock--》整个结构体的锁
判断条件变量--》wait
/* 线程池中各个工作线程 */
void* threadpool_thread(void* threadpool)
{struct threadpool_t *pool = (struct threadpool_t*)threadpool;threadpool_task_t task;//任务队列对象while(true){/*刚创建出线程,等待任务队列里面有队列 ,否则阻塞等待任务队列李有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));//queue_size = 0说明没有任务,调用wait函数阻塞在条件变量上,若有任务,跳过whilewhile((pool->queue_size == 0) && (!pool->shutdown)){printf("thread 0x%x is waiting\n" , (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty) , &(pool->lock));//清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程if(pool->wait_exit_thr_num > 0 ){pool->wait_exit_thr_num--;//如果线程池里线程个数大于最小值时可以结束当前线程if(pool->live_thr_num > pool->min_thr_num){printf("thread 0x%x is exiting\n" , (unsigned int)pthread_self());pool_live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}//指定true,要关闭线程池里的每个线程,自行退出-->销毁线程池if(pool->shutdown){pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n" , (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL); // 线程自行结束}//从任务队列获取任务,出队操作task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; //出队,模拟环形pool->queue_size--;//通知可以有新的任务添加进来pthread_cond_broadcast(&(pool->queue_not_full));//任务取出后立即将线程池锁释放pthread_mutex_unlock(&(pool->lock));//执行任务printf("thread 0x%x stat working\n" , (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter)); //忙状态线程数变量锁pool->busy_thr_num++; //忙状态线程数+1pthread_mutex_unlock(&((pool->thread_counter));(*(task.function))(task.arg);//执行回调函数//任务结束处理printf("thread 0x%x end working\n" , (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--; //处理掉任务,忙状态线程数-1pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}
4、adjust_thread():进入管理者线程回调函数
循环10s执行一次
接收参数(void*)arg
加锁--》lock--》整个结构体的锁
获取管理线程时需要用到的变量:live busy queue task
根据既定算法,使用上述3变量判断是否应该创建、销毁线程池中的指定步长的线程。
void* adjust_thread(void* threadpool)
{int i ;struct threadpool_t *pool = (struct threadpool_t*)threadpool;while(!pool->shutdown){sleep(DEFAULT_TIME); //定时对线程池管理pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;int live_thr_num = pool->live_thr_num;pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;pthread_mutex_unlock(&(pool->pthread_counter));//创建新线程,任务数大于最小线程池个数,且存活线程数少于最大线程数if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num){pthread_mutex_lock(&(pool->lock));int add = 0;//一次增加DEFAULT_THREAD个线程for(i = 0 ; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool_max_thr_num ; i++){pthread_create(&(pool->thread[i]) , NULL , threadpool_thread , (void*)pool);add++;pool->live_thr_num++;}pthread_mutex_unlock(&(pool->lock));}if((busy_thr_num *2) < live_thr_num && live_thr_num > pool->min_thr_num){pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;pthread_mutex_unlock(&(pool->lock));for(i = 0 ; i < DEFAULT_THREAD_VARY ; i++){pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}
5、threadpool_add:模拟产生任务 num[20]
设置回调函数,处理任务sleep(1)代表处理完成
初始化任务队列结构体成员 回调函数和arg
利用环形队列机制实现添加任务,借助队尾指针
唤醒阻塞在条件变量上的线程
//线程池中的线程,模拟处理业务
void* process(void*arg)
{printf("thread 0x%x working on task %d\n" , (unsigned int)pthread_self() , (int)arg);sleep(1);printf("task %d is end\n" , (int)arg);return NULL
}
int threadpool_add(struct threadpool_t *pool , (void*)(**function)(void*arg) , (void*)arg)
{pthread_mutex_lock(&(pool->lock));//为真 , 队列已满 , 调用wait阻塞while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)){pthread_cond_wait(&(pool->queue_not_full) , &(pool->lock));}if(pool->shutdown){pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0 ;}//清空工作线程 调用的回调函数 的参数if(pool->task_queue[pool->queue_rear].arg != NULL){pool->task_queue[pool->queue_rear].arg = NULL;}//添加任务到任务队列pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;//队尾指针移动,模拟环形pool->queue_size++;//向任务队列中添加一个任务//添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0 ;
}
6、从3中wait之后执行,处理任务:获取任务处理回调函数及参数
利用环形队列机制实现处理任务,借助队头指针
唤醒阻塞在条件变量上的server
修改忙线程数量++
执行处理任务线程
修改忙线程数量--
7、创建和销毁线程:管理者线程根据上述三个参数判断是否创建、销毁
满足创建条件pthread_create()回调任务线程函数
满足销毁条件wait_exit_thr_num赋值,signal给阻塞在条件变量上的线程发送假条件满足信号,跳转至wait阻塞,阻塞线程会被假信号唤醒,使用pthread_exit。
int threadpool_destroy(threadpool_t *pool)
{int i;if(pool == NULL)return -1;pool->shutdown = true;pthread_join(pool->adjust_tid , NULL);for(i = 0 ; i < pool->live_thr_num ; i++){pthread_cond_broadcast(&(pool->queue_not_empty));}for(i = 0; i < pool->live_thr_num ; i++){pthread_join(pool->threads[i] , NULL);}threadpool_free(pool);return 0;
}
int threadpool_free(threadpool_t *pool)
{if(pool == NULL)return -1;if(pool->task_queue)free(pool->tast_queue);if(pool->threads){free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_full));pthread_cond_destroy(&(pool->queue_not_empty));}free(pool);pool = NULL;return 0;
}
UDP服务器
TCP通信和UDP通信的优缺点
TCP
面向连接的,可靠数据包传输。对于不稳定的网络层,采取完全弥补的通信方式,丢包重传。
优点:稳定 数据流量稳定、速度稳定、顺序
缺点:传输速度慢、效率低,资源开销大。
使用场景:数据完整要求性较高,不追求效率
大数据传输、文件传输。
UDP
无连接的,不可靠的数据报传递。对于不稳定的网络层,采取完全不弥补的通信方式,默认还原网络状况。
优点:传输速度快,效率高,资源开销小。
缺点:不稳定 数据流量、速度不稳定,顺序不稳定
使用场景:对时效性要求较高场合。稳定性其次。
游戏、视频会议、视频电话。
----腾讯、华为、阿里 -- 应用层添加数据校验协议,弥补UDP的不足
UDP实现的C/S模型
无三次握手建立连接,故没有accept()、connect()
recv()/send()只能用于TCP通信
server
server:
lfd = socket(AF_INET , SOCK_DGRAM , 0); SOCK_DGRAM--->报式协议
bind();
listen(); ----可有可无
while(1){ //不使用read函数recvfrom() //涵盖accept函数中的传出地址结构sendto();
}
close();
client
cfd = socket(AF_INET , SOCK_DGRAM , 0);sendto("服务器地址结构" , 地址结构大小)recvfrom()
写屏幕
close()
recvfrom函数
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
socket:lfd
buf:缓冲区地址
len:缓冲区大小
flags:0
src_addr:传出参数,传出对端地址结构
src_addr:传入传出返回值:成功接收数据字节数
失败-1 errno 0对端关闭
sendto函数
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
socket:套接字
buf:存储数据的缓冲区
len:数据长度
flags:0
dest_addr:传入参数,目标地址结构
src_addr:地址结构长度返回值:成功写出数据字节数
失败-1 errno