当前位置: 首页 > news >正文

【学习记录】c完整线程池实现

前言

c实现线程池,初步测试无误,采用双端队列,和任务偷取,任务优先加入自身队列。为了避免极端情况下,某一个线程的两队列中任务过于多,而空闲线程又无事可做。加入了空闲队列和任务偷取,提交任务的时候不仅唤醒自身,还会唤醒一个空闲线程,让空闲线程尝试取偷取高负载的线程任务。
正常情况,提交任务是会平均的分配给几个线程队列,目前我能想到的是同时初始化很多线程,线程中提交任务,提交完之后关闭。这样确实有可能导致任务积压到一个线程里面(不过无需担心可以有任务偷取缓解)。
如果错误欢迎斧正

头文件


//
// Created by Administrator on 2025/8/22.
//#ifndef THREADPOOL_H
#define THREADPOOL_H
#ifndef MAXTHREADNUM
#define MAXTHREADNUM 64
#endif// 任务函数类型
typedef void *(*task_func_t)(void *);// 前向声明
typedef struct thread_pool thread_pool_t;// 任务结构
typedef struct {task_func_t func;void *arg;
} task_t;// 本地任务队列(双端队列)
typedef struct {task_t *queue;int capacity;int head;   // 从 head 取任务(本地线程用)int tail;   // 从 tail 放任务(本地线程用)pthread_mutex_t lock;     // 窃取时用pthread_cond_t  notify;   // 窃取通知
} local_queue_t;// 线程池
struct thread_pool {local_queue_t *queues;    // 每个线程的本地队列int num_workers;pthread_t *threads;int shutdown;// 空闲线程管理int idle_workers[MAXTHREADNUM];     // 存储空闲 worker 的索引(最大支持 MAXTHREADNUM 个线程)int idle_count;           // 当前空闲线程数量pthread_mutex_t idle_lock; // 保护 idle_workers 和 idle_count
};typedef struct {int index;thread_pool_t * pool;
}thread_pool_c;// 初始化线程池
thread_pool_t* thread_pool_init(int num_workers);
// 提交任务放入某个线程的本地队列
int thread_pool_submit(thread_pool_t *pool, task_func_t func, void *arg);
// 销毁线程池
void thread_pool_destroy(thread_pool_t *pool);
#endif //THREADPOOL_H

源文件

//
// Created by Administrator on 2025/8/22.
//
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include "threadpool.h"// === 双端队列操作 ===// 队列是否为空
static int queue_empty(local_queue_t *q) {return q->head == q->tail;
}// 队列是否满
static int queue_full(local_queue_t *q) {return (q->tail + 1) % q->capacity == q->head;
}// 非线程安全,仅用于本线程查询自己的队列(由本线程调用)
int local_queue_size_fast(local_queue_t *q) {if (!q) return -1;int size = (q->tail - q->head + q->capacity) % q->capacity;return size;
}// 向本地队列尾部添加任务(由本线程调用)
static int queue_push_tail(local_queue_t *q, task_t task) {if (queue_full(q)) return -1;q->queue[q->tail] = task;q->tail = (q->tail + 1) % q->capacity;return 0;
}// 从本地队列头部获取任务(由本线程调用)
static int queue_pop_head(local_queue_t *q, task_t *task) {if (queue_empty(q)) return -1;*task = q->queue[q->head];q->head = (q->head + 1) % q->capacity;return 0;
}// 从本地队列尾部窃取任务(由其他线程调用)
static int queue_steal(local_queue_t *q, task_t *task) {pthread_mutex_lock(&q->lock);if (queue_empty(q)) {pthread_mutex_unlock(&q->lock);return -1;}// 从 tail-1 处窃取(队尾)int tail = q->tail;if (tail == q->head) {pthread_mutex_unlock(&q->lock);return -1;}// *task = q->queue[(tail - 1) % q->capacity];// q->tail = (tail - 1) % q->capacity;int new_tail = (tail - 1 + q->capacity) % q->capacity;*task = q->queue[new_tail];q->tail = new_tail;// 唤醒可能等待的线程pthread_cond_signal(&q->notify);pthread_mutex_unlock(&q->lock);return 0;
}// === 线程池实现 ===// 工作者线程主函数
void* worker_routine(void *arg) {thread_pool_c *poolc = (thread_pool_c*)arg;thread_pool_t *pool = poolc->pool;int self_id = poolc->index; //自身的序号free(poolc);int num_tries = 0;const int max_tries = pool->num_workers * 2;// printf("线程%d启动成功 id=%lu\n", self_id,pthread_self());while (!pool->shutdown) {task_t task;int executed = 0;// printf("线程[%d] 非空等待任务剩余 %d... \n", self_id,local_queue_size_fast(&pool->queues[self_id]));// 1. 先尝试执行自己的任务if (queue_pop_head(&pool->queues[self_id], &task) == 0) {// printf("线程[%d] 开始执行自身任务 %s \n", self_id, (char*)task.arg);task.func(task.arg);executed = 1;}// 2. 自己的空了,去偷别人的else {for (int i = 0; i < pool->num_workers && !executed && num_tries < max_tries; i++) {int thief_id = (self_id + i + 1) % pool->num_workers;if (thief_id == self_id) continue;if (queue_steal(&pool->queues[thief_id], &task) == 0) {// printf("线程[%d] 开始执行他人[%d]任务 %s \n", self_id,thief_id, (char*)task.arg);task.func(task.arg);executed = 1;}num_tries++;}}// 3. 都没有任务,等待if (!executed) {// === 注册自己为“空闲” ===pthread_mutex_lock(&pool->idle_lock);if (!pool->shutdown && pool->idle_count < MAXTHREADNUM) { // MAXTHREADNUM 是上限pool->idle_workers[pool->idle_count++] = self_id;}pthread_mutex_unlock(&pool->idle_lock);pthread_mutex_lock(&pool->queues[self_id].lock);// 再次检查,避免遗漏if (queue_empty(&pool->queues[self_id]) && !pool->shutdown) {// struct timespec ts;// clock_gettime(CLOCK_REALTIME, &ts);// ts.tv_sec += 1;  // 每1秒醒来一次检查pthread_cond_wait(&pool->queues[self_id].notify, &pool->queues[self_id].lock);// pthread_cond_timedwait(&pool->queues[self_id].notify, &pool->queues[self_id].lock, &ts);}pthread_mutex_unlock(&pool->queues[self_id].lock);}if(executed){num_tries = 0;}}return NULL;
}// 初始化线程池
thread_pool_t* thread_pool_init(int num_workers) {if (num_workers <= 0) return NULL;thread_pool_t *pool = calloc(1, sizeof(thread_pool_t));if (!pool) goto fail;pool->num_workers = num_workers;pool->threads = calloc(num_workers, sizeof(pthread_t));pool->queues = calloc(num_workers, sizeof(local_queue_t));if (!pool->threads || !pool->queues) {goto fail;}if (pthread_mutex_init(&pool->idle_lock, NULL) != 0) {goto fail;}pool->idle_count = 0;// 初始化每个本地队列for (int i = 0; i < num_workers; i++) {local_queue_t *q = &pool->queues[i];q->capacity = MAXTHREADNUM;q->queue = calloc(q->capacity, sizeof(task_t));q->head = q->tail = 0;if (pthread_mutex_init(&q->lock, NULL) != 0 ||pthread_cond_init(&q->notify, NULL) != 0 ||!q->queue) {goto fail;}}pool->shutdown = 0;// 创建线程for (int i = 0; i < num_workers; i++) {thread_pool_c *pool_c = calloc(1,sizeof(thread_pool_c));if(!pool_c) {pool->shutdown = 1;for (int j = 0; j < i; j++) {pthread_cond_signal(&pool->queues[j].notify);  // 唤醒等待中的线程}for (int j = 0; j < i; j++) {pthread_join(pool->threads[j], NULL);}goto fail;}pool_c->pool = pool;pool_c->index = i;if (pthread_create(&pool->threads[i], NULL, worker_routine, pool_c) != 0) {pool->shutdown = 1;for (int j = 0; j < i; j++) {pthread_cond_signal(&pool->queues[j].notify);  // 唤醒等待中的线程}for (int j = 0; j < i; j++) {pthread_join(pool->threads[j], NULL);}goto fail;}}return pool;fail:// if(pool_c) free(pool_c);if (pool) {for (int i = 0; i < num_workers; i++) {if (pool->queues && pool->queues[i].queue) free(pool->queues[i].queue);pthread_mutex_destroy(&pool->queues[i].lock);pthread_cond_destroy(&pool->queues[i].notify);}pthread_mutex_destroy(&pool->idle_lock);free(pool->queues);free(pool->threads);free(pool);}return NULL;
}// 提交任务放入某个线程的本地队列
int thread_pool_submit(thread_pool_t *pool, task_func_t func, void *arg) {if (!pool || !func) return -1;task_t task = { .func = func, .arg = arg };// 简单轮询分配static __thread int last_idx = 0; // 每个线程有自己的 last_idxint start = last_idx;int idx = start;do {local_queue_t *q = &pool->queues[idx];pthread_mutex_lock(&q->lock);if (queue_push_tail(&pool->queues[idx], task) == 0) {last_idx = (idx + 1) % pool->num_workers;// 唤醒目标线程(如果它在等)pthread_cond_signal(&pool->queues[idx].notify);pthread_mutex_unlock(&q->lock);// === 尝试唤醒一个空闲线程 ===pthread_mutex_lock(&pool->idle_lock);if (pool->idle_count > 0) {pool->idle_count-- ; //例如 1的时候应该取 0int target_id = pool->idle_workers[pool->idle_count];// 唤醒那个空闲线程,让它去偷任务pthread_cond_signal(&pool->queues[target_id].notify);}pthread_mutex_unlock(&pool->idle_lock);// printf("任务 %s 放入队列[%d]成功\n", (char*)arg, idx);return 0;}else{// printf("任务%d 队列已满\n", idx);}pthread_mutex_unlock(&q->lock);idx = (idx + 1) % pool->num_workers;} while (idx != start);// printf("Task rejected: all local queues are full\n");return -1;
}// 销毁线程池
void thread_pool_destroy(thread_pool_t *pool) {if (!pool) return;pool->shutdown = 1;// 唤醒所有线程for (int i = 0; i < pool->num_workers; i++) {pthread_cond_broadcast(&pool->queues[i].notify);}for (int i = 0; i < pool->num_workers; i++) {pthread_join(pool->threads[i], NULL);}// 清理资源for (int i = 0; i < pool->num_workers; i++) {free(pool->queues[i].queue);pthread_mutex_destroy(&pool->queues[i].lock);pthread_cond_destroy(&pool->queues[i].notify);}free(pool->queues);free(pool->threads);free(pool);
}// void* test_task(void *arg) {
//     printf("Task executed by thread %lu: %s\n", pthread_self(), (char*)arg);
//     free(arg);
//     usleep(100000);
//     return NULL;
// }
// int main() {
//     thread_pool_t *pool = thread_pool_init(4);
//     if (!pool) return 1;
//     sleep(1);
//     for (int i = 0; i < 500; i++) {
//         char *msg = malloc(32);
//         sprintf(msg, "Task %d",i);
//         int ret = thread_pool_submit(pool, test_task, msg);
//         // if (ret!=0){
//         //     printf("任务%d提交失败\n", i);
//         // }else{
//         //     printf("任务%d提交成功\n", i);
//         // }
//     }
//
//     sleep(30);
//     thread_pool_destroy(pool);
//     return 0;
// }
http://www.xdnf.cn/news/1347157.html

相关文章:

  • 集成算法学习笔记
  • C++ OpenGL中几个常见库及其区别
  • Python实现从Parquet文件生成Redshift表并存储SQL语句
  • Eigen 中Sparse 模块的简单介绍和实战使用示例
  • (纯新手教学)计算机视觉(opencv)实战八——四种边缘检测详解:Sobel、Scharr、Laplacian、Canny
  • Day11 数据统计 图形报表
  • RKLLM 模型转换从0开始
  • vagrant怎么在宿主机操作虚拟机里面的系统管理和软件安装
  • 2025软件供应链安全技术路线未来趋势预测
  • vim的使用
  • Retrieval-Augmented Generation(RAG)
  • 为什么访问HTTPS站点时,会发生SSL证书错误
  • Trie 树(字典树)
  • 8月22号打卡
  • FFmpeg及 RTSP、RTMP
  • GitGithub相关(自用,持续更新update 8/23)
  • 文件下载和文件上传漏洞
  • LeetCode第1695题 - 删除子数组的最大得分
  • CSS自定义属性(CSS变量)
  • Jenkins发布spring项目踩坑——nohup java -jar发布后显示成功,但实际jps查询并未运行
  • kubernetes中pod的管理及优化
  • Python打卡Day49 CBAM注意力
  • Apache Ozone 2.0.0集群部署
  • 微信原生下载互联网oss资源保存到本地
  • CCleaner v1.2.3.4 中文解锁注册版,系统优化,隐私保护,极速清理
  • Unreal Engine Class System
  • 图数据库(neo4j)基础: 分类/标签 节点 关系 属性
  • 蓝牙部分解析和代码建构
  • set_disable_timing应用举例
  • OpenCV 图像边缘检测