
Porting pbrt's parallelization to Ray Tracing in One Weekend

Reimplementing all of pbrt from scratch is far too daunting, so my approach is to take the Ray Tracing in One Weekend series and graft pbrt's ideas onto it instead.

The first problem to solve is that rendering in Ray Tracing in One Weekend is slow, because it processes the image one pixel at a time on a single thread.

pbrt's approach

  1. In pbrt every task, whether a 2D tile job or a 1D one, derives from ParallelJob. This base class holds a single static thread pool, so in principle there is only ever one pool.
  2. Jobs are then hooked into the thread pool as a linked list.
  3. There is a nice idea here. Think of the thread pool as a labor company and every job as a client: the client describes how its work is split up, i.e. how many workers it can use, roughly how much each worker should take on at a time (a chunk, or "big task"), how many small tasks make up a chunk, and how many small tasks there are in total.
  4. Each worker then walks the job list and checks whether a job still needs hands. If it does, the worker claims one chunk of it, releases the job-list lock so other workers can keep looking, and once that chunk is done it comes back to look for the next piece of work (see the sketch after this list).
  5. The advantage of chunking: if every small task were handed out individually, threads would waste far too much time picking tasks. So a worker that takes on a job should do a decent amount of it at once, but not all of it; it does one part and leaves the remaining parts to the other workers, which is exactly why the work is divided into chunks.
  6. Note, however, that a worker commits to one job at a time: it runs a chunk of the first job that still has work rather than juggling chunks from several jobs at once.
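To make the analogy concrete, here is a minimal sketch of that worker loop. The names roughly follow pbrt's, but JobSketch and WorkerLoopSketch are illustrative stand-ins, not the classes used later in this article:

#include <condition_variable>
#include <mutex>

// Sketch only: a "job" knows whether it still has chunks left (HaveWork)
// and how to claim-and-run one chunk (RunStep, which drops the lock while it works).
struct JobSketch {
    JobSketch* next = nullptr;
    int activeWorkers = 0;
    virtual bool HaveWork() const = 0;
    virtual void RunStep(std::unique_lock<std::mutex>* lock) = 0;
    bool Finished() const { return !HaveWork() && activeWorkers == 0; }
    virtual ~JobSketch() = default;
};

void WorkerLoopSketch(std::mutex& m, std::condition_variable& cv,
                      JobSketch*& jobList, const bool& shutdown) {
    std::unique_lock<std::mutex> lock(m);
    while (!shutdown) {
        // Find the first job that still has unclaimed chunks.
        JobSketch* job = jobList;
        while (job && !job->HaveWork()) job = job->next;
        if (!job) { cv.wait(lock); continue; }   // nothing to do: sleep until notified
        job->activeWorkers++;
        job->RunStep(&lock);                     // claim one chunk, drop the lock, run it
        lock.lock();
        job->activeWorkers--;
        if (job->Finished()) cv.notify_all();    // wake anyone waiting on this job
    }
}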

Combining this with Ray Tracing in One Weekend

Start from how Ray Tracing in One Weekend writes each pixel; here every pixel is sampled MSAA-style, with sqrt_spp × sqrt_spp stratified sub-pixel samples:

ParallelFor2D(image, [&](Point2i p) {
    color pixel_color(0, 0, 0);
    for (int sampleu = 0; sampleu < sqrt_spp; sampleu++) {
        for (int samplev = 0; samplev < sqrt_spp; samplev++) {
            Ray r = get_ray(p.x, p.y, sampleu, samplev);
            // ... trace r and accumulate the sample into pixel_color
        }
    }
    // ... then write pixel_color out for pixel p
});
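For completeness, one way the lambda body could finish the pixel, assuming the usual Ray Tracing in One Weekend camera members (ray_color, max_depth, world, pixel_samples_scale, image_width) plus a framebuffer owned by the camera; those names are assumptions for illustration, not part of the snippet above. Because the tiles handed to different workers never overlap, each pixel is written by exactly one thread and the framebuffer needs no locking of its own.

// Sketch: ray_color, max_depth, world, pixel_samples_scale, image_width and
// framebuffer are assumed RTIOW-style camera members, not code from this article.
ParallelFor2D(image, [&](Point2i p) {
    color pixel_color(0, 0, 0);
    for (int sampleu = 0; sampleu < sqrt_spp; sampleu++)
        for (int samplev = 0; samplev < sqrt_spp; samplev++) {
            Ray r = get_ray(p.x, p.y, sampleu, samplev);
            pixel_color += ray_color(r, max_depth, world); // accumulate one sample
        }
    // Tiles are disjoint, so no other thread ever touches this pixel.
    framebuffer[p.y * image_width + p.x] = pixel_samples_scale * pixel_color;
});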

This is where the work gets partitioned: the image is split into several big tasks (chunks), each containing many per-pixel small tasks. The per-pixel overload above simply forwards to a tile-based overload:

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(Point2i)>& func)
{
    ParallelFor2D(extent, [&](const Bounds2i& b) {
        for (const Point2i& p : b) {
            func(p);
        }
    });
}

pbrt sizes the chunks against the number of running threads: take the total number of small tasks (the pixel count of the extent), divide it by 8 × the thread count so that there are roughly eight chunks per thread, and clamp the result to [1, 32]; that value becomes the tile side length (chunkSize). The goal is to bound how much one thread grabs at a time: chunks must not be so large that a single thread ends up holding a big share of the image while the others idle, nor so small that threads do almost nothing between trips back to the task lock. In other words, it maximizes parallel utilization.

int tileSize = std::clamp(
    (int)(extent.Diagonal().x * extent.Diagonal().y / (8 * RunningThreads())),
    1, 32);
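For a concrete (hypothetical) example, with a 400×225 image and 8 running threads this gives 400 * 225 / (8 * 8) = 1406, which clamps to 32, so workers receive 32×32-pixel tiles; only very small images or very high thread counts push the chunk size below that cap.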

ThreadPool

  1. The thread pool is where threads go to pick up work. A single global instance is kept, and it maintains the job list as a linked list.
  2. The pool owns one mutex that guards this job list; every job is accessed under that lock, so at any moment only one thread can touch the list and a job's bookkeeping.
  3. The pool has to be created up front. It consists of several worker threads plus the main thread, distinguished by isEnqueuingThread, which is why the pool itself only spawns n-1 additional threads.
  4. Every thread ends up in the dispatch function WorkOrWait. It first checks what kind of thread is calling; a worker thread additionally checks whether the pool has been disabled. It then scans the pool's job list for a job that still has work and, if it finds one, calls RunStep so that the current thread takes on a chunk of that job.
  5. When a job has handed out all of its work we remove it from the list. Since a job in the queue is always finished by the combined effort of the threads, removing it is the signal for everyone to go look for the next job.
  6. Finally, when the pool is destroyed we have to make sure every worker thread has finished.
class ThreadPool
{
public:
    explicit ThreadPool(int nThreads);
    ~ThreadPool();
    size_t size() const { return threads.size(); }
    std::unique_lock<std::mutex> AddToJobList(ParallelJob* job);
    void RemoveFromJobList(ParallelJob* job);
    void WorkOrWait(std::unique_lock<std::mutex>* lock, bool isEnqueuingThread);

private:
    void Worker();

private:
    std::vector<std::thread> threads;
    ParallelJob* jobLists = nullptr;
    mutable std::mutex mutex;
    bool shutdownThreads = false;
    bool disabled = false;
    std::condition_variable condition;
};

ThreadPool::ThreadPool(int nThreads)
{
    // The calling (main) thread also works, so only n-1 extra threads are spawned.
    for (int i = 0; i < nThreads - 1; i++) {
        threads.push_back(std::thread(&ThreadPool::Worker, this));
    }
}

// A worker thread runs this loop for its whole lifetime.
void ThreadPool::Worker()
{
    std::unique_lock<std::mutex> lock(mutex);
    while (!shutdownThreads) {
        WorkOrWait(&lock, false);
    }
}

std::unique_lock<std::mutex> ThreadPool::AddToJobList(ParallelJob* job)
{
    std::unique_lock<std::mutex> lock(mutex);
    // Push the job onto the front of the intrusive job list and wake the workers.
    if (jobLists) {
        jobLists->prev = job;
    }
    job->next = jobLists;
    jobLists = job;
    condition.notify_all();
    return lock;
}

void ThreadPool::WorkOrWait(std::unique_lock<std::mutex>* lock, bool isEnqueuingThread)
{
    if (!lock->owns_lock()) {
        assert(false && "lock is not owned by current thread");
        exit(-1);
    }
    // Return if this is a worker thread and the thread pool is disabled
    if (!isEnqueuingThread && disabled) {
        condition.wait(*lock); // release lock and block, waiting for notify
        return;
    }
    // Find the first job that still has work to hand out.
    ParallelJob* job = jobLists;
    while (job && !job->HaveWork()) {
        job = job->next;
    }
    if (job) {
        job->activeWorkers++;
        job->RunStep(lock); // releases the lock while the chunk is processed
        assert(!lock->owns_lock() && "RunStep must release the lock before returning");
        lock->lock();
        job->activeWorkers--;
        if (job->Finished()) {
            condition.notify_all();
        }
    } else {
        condition.wait(*lock);
    }
}

ThreadPool::~ThreadPool()
{
    if (threads.empty()) {
        return;
    }
    {
        std::lock_guard<std::mutex> lock(mutex);
        shutdownThreads = true;
        condition.notify_all();
    }
    for (auto& thread : threads) {
        thread.join();
    }
}

void ThreadPool::RemoveFromJobList(ParallelJob* job)
{
    // Caller must hold the pool mutex; unlink the job from the doubly linked list.
    if (job->prev) {
        job->prev->next = job->next;
    } else {
        jobLists = job->next;
    }
    if (job->next) {
        job->next->prev = job->prev;
    }
    job->removed = true;
}

The main (enqueuing) thread is the one that passes isEnqueuingThread = true:

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(const Bounds2i)>& func)
{
    if (extent.IsEmpty()) {
        return;
    } else if (extent.Area() == 1) {
        func(extent);
        return;
    }
    int tileSize = std::clamp((int)(extent.Diagonal().x * extent.Diagonal().y / (8 * RunningThreads())), 1, 32);
    ParallelForLoop2D loop(extent, tileSize, std::move(func));
    std::unique_lock<std::mutex> lock = ParallelJob::threadPool->AddToJobList(&loop);
    // The enqueuing thread keeps working on the job until it is finished.
    while (!loop.Finished()) {
        ParallelJob::threadPool->WorkOrWait(&lock, true);
    }
}
  1. When the main thread enters, it takes the pool lock, sees that there is a job with work available, and calls that job's RunStep, i.e. the current thread itself processes one chunk of the job.
  2. The worker threads that were blocked on the condition variable earlier are woken by the notify_all() in AddToJobList, so they start pulling chunks of the same job in parallel.

ParallelForLoop2D

  1. This class records one top-level job; after constructing one we add it to the pool's job list.
  2. The job carries the prev/next pointers that form that linked list, which is exactly the queue the thread pool walks.
  3. We have to let the thread pool know how to run one chunk of small tasks (RunStep), whether the job still needs threads (HaveWork), and whether it is completely done (Finished). A worked example of how the chunks get handed out follows the code below.
void ParallelForLoop2D::RunStep(std::unique_lock<std::mutex>* lock)
{
    // Claim the next chunk: a chunkSize x chunkSize tile clipped to the extent.
    Point2i end = nextStart + Vector2i(chunkSize, chunkSize);
    Bounds2i b = Intersect(Bounds2i(nextStart, end), extent);
    if (b.IsEmpty()) {
        assert(false && "bounds is empty");
        return;
    }
    // Advance the cursor: left to right, then down to the next row of tiles.
    nextStart.x += chunkSize;
    if (nextStart.x >= extent.pMax.x) {
        nextStart.x = extent.pMin.x;
        nextStart.y += chunkSize;
    }
    if (!HaveWork()) {
        threadPool->RemoveFromJobList(this);
    }
    // Release the list lock so other workers can claim chunks while we render ours.
    lock->unlock();
    func(b);
}
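To make the chunk hand-out concrete, assume a hypothetical extent of [0,100)×[0,80) and chunkSize = 32. Successive RunStep calls hand out [0,32)×[0,32), [32,64)×[0,32), [64,96)×[0,32), [96,100)×[0,32), then the cursor wraps to y = 32, and so on: 4 tiles per row, 3 rows, 12 chunks in total, with the last row only 16 pixels tall. Once nextStart.y reaches 80, HaveWork() returns false and the job unlinks itself from the list, while workers that already hold a chunk finish it; Finished() only becomes true once activeWorkers drops back to zero.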
class ParallelJob
{
public:
    virtual ~ParallelJob() { assert(removed && "ParallelJob is being destroyed without being removed!"); }
    virtual bool HaveWork() const = 0;
    virtual void RunStep(std::unique_lock<std::mutex>* lock) = 0;
    bool Finished() const { return !HaveWork() && activeWorkers == 0; }
    virtual std::string ToString() const = 0;

    static ThreadPool* threadPool;

protected:
    std::string BasicToString() const
    {
        char resString[256];
        sprintf(resString, "activeWorkers: %d removed: %s", activeWorkers, removed ? "true" : "false");
        return std::string(resString);
    }

private:
    friend class ThreadPool;
    bool removed = false;
    int activeWorkers = 0;
    ParallelJob* prev = nullptr, *next = nullptr;
};

class ParallelForLoop2D : public ParallelJob
{
public:
    ParallelForLoop2D(const Bounds2i& extent, int chunkSize, std::function<void(Bounds2i)> func)
        : func(std::move(func)), extent(extent), nextStart(extent.pMin), chunkSize(chunkSize) {}
    virtual std::string ToString() const override { return BasicToString(); }
    virtual bool HaveWork() const override { return nextStart.y < extent.pMax.y; }
    virtual void RunStep(std::unique_lock<std::mutex>* lock) override;

private:
    std::function<void(Bounds2i)> func;
    const Bounds2i extent;
    Point2i nextStart;
    int chunkSize;
};

inline int RunningThreads()
{
    return ParallelJob::threadPool ? (1 + ParallelJob::threadPool->size()) : 1;
}

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(const Bounds2i)>& func)
{
    if (extent.IsEmpty()) {
        return;
    } else if (extent.Area() == 1) {
        func(extent);
        return;
    }
    int tileSize = std::clamp((int)(extent.Diagonal().x * extent.Diagonal().y / (8 * RunningThreads())), 1, 32);
    ParallelForLoop2D loop(extent, tileSize, std::move(func));
    std::unique_lock<std::mutex> lock = ParallelJob::threadPool->AddToJobList(&loop);
    while (!loop.Finished()) {
        ParallelJob::threadPool->WorkOrWait(&lock, true);
    }
}

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(Point2i)>& func)
{
    ParallelFor2D(extent, [&](const Bounds2i& b) {
        for (const Point2i& p : b) {
            func(p);
        }
    });
}

Results

The total render time went from a bit over 390 ms down to 59 ms.
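A simple way to take such a measurement (a sketch only; camera, hittable and cam.render(world) are the usual Ray Tracing in One Weekend types and entry point, wired in however your main() does it):

#include <chrono>
#include <iostream>
#include "camera.h"   // assumed: the standard RTIOW headers
#include "hittable.h"

// Hypothetical helper: time one render and report it on stderr so the
// timing does not mix with the PPM image written to stdout.
void timed_render(camera& cam, const hittable& world)
{
    auto start = std::chrono::steady_clock::now();
    cam.render(world);
    auto stop = std::chrono::steady_clock::now();
    std::cerr << "render took "
              << std::chrono::duration_cast<std::chrono::milliseconds>(stop - start).count()
              << " ms\n";
}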

Complete code

  1. parallel.h
#ifndef PARALLEL_H
#define PARALLEL_H

#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <mutex>
#include <stdio.h>
#include <string>
#include <thread>
#include <vector>

#include <rtweekend.h>
#include "vecmath.h"

inline int AvaliableCores()
{
    return std::max<int>(1, std::thread::hardware_concurrency());
}

class ParallelJob;

class ThreadPool
{
public:
    explicit ThreadPool(int nThreads);
    ~ThreadPool();
    size_t size() const { return threads.size(); }
    std::unique_lock<std::mutex> AddToJobList(ParallelJob* job);
    void RemoveFromJobList(ParallelJob* job);
    void WorkOrWait(std::unique_lock<std::mutex>* lock, bool isEnqueuingThread);

private:
    void Worker();

private:
    std::vector<std::thread> threads;
    ParallelJob* jobLists = nullptr;
    mutable std::mutex mutex;
    bool shutdownThreads = false;
    bool disabled = false;
    std::condition_variable condition;
};

class ParallelJob
{
public:
    virtual ~ParallelJob() { assert(removed && "ParallelJob is being destroyed without being removed!"); }
    virtual bool HaveWork() const = 0;
    virtual void RunStep(std::unique_lock<std::mutex>* lock) = 0;
    bool Finished() const { return !HaveWork() && activeWorkers == 0; }
    virtual std::string ToString() const = 0;

    static ThreadPool* threadPool;

protected:
    std::string BasicToString() const
    {
        char resString[256];
        sprintf(resString, "activeWorkers: %d removed: %s", activeWorkers, removed ? "true" : "false");
        return std::string(resString);
    }

private:
    friend class ThreadPool;
    bool removed = false;
    int activeWorkers = 0;
    ParallelJob* prev = nullptr, *next = nullptr;
};

class ParallelForLoop2D : public ParallelJob
{
public:
    ParallelForLoop2D(const Bounds2i& extent, int chunkSize, std::function<void(Bounds2i)> func)
        : func(std::move(func)), extent(extent), nextStart(extent.pMin), chunkSize(chunkSize) {}
    virtual std::string ToString() const override { return BasicToString(); }
    virtual bool HaveWork() const override { return nextStart.y < extent.pMax.y; }
    virtual void RunStep(std::unique_lock<std::mutex>* lock) override;

private:
    std::function<void(Bounds2i)> func;
    const Bounds2i extent;
    Point2i nextStart;
    int chunkSize;
};

inline int RunningThreads()
{
    return ParallelJob::threadPool ? (1 + ParallelJob::threadPool->size()) : 1;
}

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(const Bounds2i)>& func)
{
    if (extent.IsEmpty()) {
        return;
    } else if (extent.Area() == 1) {
        func(extent);
        return;
    }
    int tileSize = std::clamp((int)(extent.Diagonal().x * extent.Diagonal().y / (8 * RunningThreads())), 1, 32);
    ParallelForLoop2D loop(extent, tileSize, std::move(func));
    std::unique_lock<std::mutex> lock = ParallelJob::threadPool->AddToJobList(&loop);
    while (!loop.Finished()) {
        ParallelJob::threadPool->WorkOrWait(&lock, true);
    }
}

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(Point2i)>& func)
{
    ParallelFor2D(extent, [&](const Bounds2i& b) {
        for (const Point2i& p : b) {
            func(p);
        }
    });
}

#endif
  2. parallel.cpp
#include "parallel.h"ThreadPool::ThreadPool(int nThreads)
{for(int i = 0; i < nThreads - 1; i++){threads.push_back(std::thread(&ThreadPool::Worker, this));}
}ThreadPool *ParallelJob::threadPool = new ThreadPool(AvaliableCores());
void ThreadPool::Worker()
{std::unique_lock<std::mutex> lock(mutex);while(!shutdownThreads){WorkOrWait(&lock, false);}
}std::unique_lock<std::mutex> ThreadPool::AddToJobList(ParallelJob* job)
{std::unique_lock<std::mutex> lock(mutex);if(jobLists){jobLists->prev = job;}job->next = jobLists;jobLists = job;condition.notify_all();return lock;
}void ThreadPool::WorkOrWait(std::unique_lock<std::mutex>* lock, bool isEnqueuingThread)
{if(!lock->owns_lock()){assert("lock is not owned by current thread");exit(-1);}// Return if this is a worker thread and the thread pool is disabledif(!isEnqueuingThread && disabled){condition.wait(*lock); //release lock and block itself, waiting for notifyreturn;}ParallelJob* job = jobLists;while(job && !job->HaveWork()){job = job->next;}if(job){job->activeWorkers++;job->RunStep(lock);if(lock->owns_lock()){assert("you need to release lock before return");}lock->lock();job->activeWorkers--;if(job->Finished()){condition.notify_all();}}else{condition.wait(*lock);}
}ThreadPool::~ThreadPool()
{if(threads.empty()){return;}{std::lock_guard<std::mutex> lock(mutex);shutdownThreads = true;condition.notify_all();}for(auto& thread : threads){thread.join();}
}void ThreadPool::RemoveFromJobList(ParallelJob* job)
{if(job->prev){job->prev->next = job->next;}else{jobLists = job->next;}if(job->next){job->next->prev = job->prev;}job->removed = true;
}void ParallelForLoop2D::RunStep(std::unique_lock<std::mutex>* lock)
{Point2i end = nextStart + Vector2i(chunkSize, chunkSize);Bounds2i b = Intersect(Bounds2i(nextStart, end), extent);if(b.IsEmpty()){assert("bounds is empty");return;}nextStart.x += chunkSize;if(nextStart.x >= extent.pMax.x){nextStart.x = extent.pMin.x;nextStart.y += chunkSize;}if(!HaveWork()){threadPool->RemoveFromJobList(this);}lock->unlock();func(b);
}