
Displaying Oblique Photography Models with VulkanSceneGraph (6.4): Multithreaded Recording and Submission

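Preface

The previous chapter analyzed in depth how the presentation stage of the frame loop is implemented. This chapter analyzes recording and submission under multithreading, digs further into the synchronization mechanisms of the vsg frame loop, and shows the roles that semaphores (VkSemaphore), fences (VkFence), vsg::FrameBlock and vsg::Barrier play in it.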


Contents

  • 1 Semaphores (VkSemaphore), fences (VkFence), vsg::FrameBlock and vsg::Barrier
  • 2 Multithreaded recording and submission

1 Semaphores (VkSemaphore), fences (VkFence), vsg::FrameBlock and vsg::Barrier

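vsg::Semaphore wraps VkSemaphore and is used to synchronize the completion of Vulkan commands with the start of other Vulkan command submissions; this is synchronization within the GPU. vsg::Fence wraps VkFence and tracks the completion of command submissions to a queue, synchronizing the application (CPU side) with the work submitted to the queue (GPU side). vsg::FrameBlock provides a mechanism for synchronizing threads that wait for a new frame to start. vsg::Barrier provides a way to synchronize multiple threads: once the specified number of threads have arrived at the Barrier, they are all released together.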

1.1 vsg::Semaphore

    Semaphore::Semaphore(Device* device, VkPipelineStageFlags pipelineStageFlags, void* pNextCreateInfo) :
        _pipelineStageFlags(pipelineStageFlags),
        _device(device)
    {
        VkSemaphoreCreateInfo semaphoreInfo = {};
        semaphoreInfo.sType = VK_STRUCTURE_TYPE_SEMAPHORE_CREATE_INFO;
        semaphoreInfo.pNext = pNextCreateInfo;

        VkResult result = vkCreateSemaphore(*device, &semaphoreInfo, _device->getAllocationCallbacks(), &_semaphore);
        if (result != VK_SUCCESS)
        {
            throw Exception{"Error: Failed to create semaphore.", result};
        }
    }

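The vsg::Semaphore constructor creates the VkSemaphore with vkCreateSemaphore. A semaphore is created on, and belongs to, a specific logical device, so it can be used on the GPU to synchronize work within one queue or across different queues of the same logical device.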

    Semaphore::~Semaphore()
    {
        if (_semaphore)
        {
            vkDestroySemaphore(*_device, _semaphore, _device->getAllocationCallbacks());
        }
    }

The vsg::Semaphore destructor releases the VkSemaphore with vkDestroySemaphore.

1.2 vsg::Fence

    Fence::Fence(Device* device, VkFenceCreateFlags flags) :
        _device(device)
    {
        VkFenceCreateInfo createFenceInfo = {};
        createFenceInfo.sType = VK_STRUCTURE_TYPE_FENCE_CREATE_INFO;
        createFenceInfo.flags = flags;
        createFenceInfo.pNext = nullptr;

        if (VkResult result = vkCreateFence(*device, &createFenceInfo, _device->getAllocationCallbacks(), &_vkFence); result != VK_SUCCESS)
        {
            throw Exception{"Error: Failed to create Fence.", result};
        }
    }

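The vsg::Fence constructor creates the VkFence with vkCreateFence; like a semaphore, a fence belongs to a specific logical device. The flags argument is passed through as VkFenceCreateInfo::flags, so a fence can, for example, be created already signaled by passing VK_FENCE_CREATE_SIGNALED_BIT.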

    Fence::~Fence()
    {
        if (_vkFence)
        {
            vkDestroyFence(*_device, _vkFence, _device->getAllocationCallbacks());
        }
    }

The vsg::Fence destructor releases the fence with vkDestroyFence.

    VkResult Fence::wait(uint64_t timeout) const
    {
        return vkWaitForFences(*_device, 1, &_vkFence, VK_TRUE, timeout);
    }

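On the application (CPU) side, a vsg::Fence is used by calling wait(), which wraps vkWaitForFences. The timeout is given in nanoseconds; the call returns VK_SUCCESS once the fence is signaled, or VK_TIMEOUT if the timeout expires first.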

    VkResult Fence::reset() const
    {
        return vkResetFences(*_device, 1, &_vkFence);
    }

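Before a vsg::Fence is handed to the GPU again with a new queue submission, it must be reset to the unsignaled state with reset(), which wraps vkResetFences. Submitting a fence that is still signaled is not allowed by the Vulkan specification, and a wait on a fence that was never reset returns immediately instead of waiting for the new work. Conversely, a fence that has been reset but is never passed to a submission is never signaled, so waiting on it with an unbounded timeout blocks the application forever.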
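The fence states can be illustrated without a GPU. The sketch below is a toy CPU-only model built on a hypothetical ToyFence class (not a vsg or Vulkan API) whose signal(), reset() and wait() stand in for the GPU completing work, vkResetFences and vkWaitForFences. It only shows how a wait behaves in each state: a fence left signaled by an earlier frame satisfies the wait at once, while a reset fence stays unsignaled until something signals it.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// ToyFence: hypothetical CPU-only model of VkFence states, NOT a vsg or Vulkan API.
class ToyFence
{
public:
    // Stands in for the GPU signaling the fence when submitted work completes.
    void signal()
    {
        std::scoped_lock lock(_mutex);
        _signaled = true;
        _cv.notify_all();
    }

    // Stands in for vkResetFences: back to the unsignaled state.
    void reset()
    {
        std::scoped_lock lock(_mutex);
        _signaled = false;
    }

    // Stands in for vkWaitForFences with a timeout: true if signaled, false on timeout.
    bool wait(std::chrono::milliseconds timeout)
    {
        std::unique_lock lock(_mutex);
        return _cv.wait_for(lock, timeout, [this] { return _signaled; });
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _signaled = false;
};

// Frame N's work signaled the fence; frame N+1 reuses it, but its work never
// completes in this toy. Returns what the wait in frame N+1 reports.
bool waitReportsCompletion(bool resetBeforeReuse)
{
    ToyFence fence;
    fence.signal();                       // frame N finished
    if (resetBeforeReuse) fence.reset();  // the job of Fence::reset()
    // ...frame N+1 would be submitted here; nothing signals the fence again...
    return fence.wait(std::chrono::milliseconds(10));
}
```

Without the reset, waitReportsCompletion returns true although frame N+1's work never finished; with the reset it times out and returns false, and with an unbounded timeout it would block forever. A reset therefore has to be paired with a submission that will signal the fence.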

1.3 vsg::FrameBlock

vsg::FrameBlock provides a mechanism for synchronizing threads that wait for a new frame to start.

    std::mutex _mutex;
    std::condition_variable _cv;
    ref_ptr<FrameStamp> _value;
    ref_ptr<ActivityStatus> _status;

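These are the member variables of vsg::FrameBlock. By wrapping a std::mutex and a std::condition_variable, it can block threads until the current vsg::FrameStamp changes, i.e. it synchronizes all threads waiting for a new frame to start. The _status member (of type vsg::ActivityStatus) indicates whether this blocking behavior is still active.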

    bool wait_for_change(ref_ptr<FrameStamp>& value)
    {
        std::unique_lock lock(_mutex);
        while (_value == value && _status->active())
        {
            _cv.wait(lock);
        }

        value = _value;
        return _status->active();
    }

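wait_for_change blocks the calling thread while the vsg::FrameStamp passed in is the same object as the one currently held and the status is active. Once a different FrameStamp has been set (or the status becomes inactive), it copies the current FrameStamp into value and returns _status->active(), so a false return tells the caller to leave its loop.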

    void set(ref_ptr<FrameStamp> frameStamp)
    {
        std::scoped_lock lock(_mutex);
        _value = frameStamp;
        _cv.notify_all();
    }

set() stores the new vsg::FrameStamp and calls notify_all() to wake every blocked thread.
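A minimal sketch of the same idea using only the C++ standard library follows. MiniFrameBlock, Stamp and deactivate() are hypothetical names: Stamp stands in for vsg::FrameStamp, and deactivate() stands in for marking the vsg::ActivityStatus inactive and waking the waiters (how vsg itself wakes threads at shutdown is not shown in the excerpt above). As in vsg, stamps are compared by pointer, so the sketch assumes every frame publishes a new stamp object.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical stand-in for vsg::FrameStamp: only the frame count is kept.
struct Stamp
{
    unsigned frameCount = 0;
};

// Sketch of the FrameBlock idea with std types only (not the vsg class itself).
class MiniFrameBlock
{
public:
    explicit MiniFrameBlock(std::shared_ptr<Stamp> initial) : _value(std::move(initial)) {}

    // Block while `value` is still the current stamp (pointer equality) and active.
    bool wait_for_change(std::shared_ptr<Stamp>& value)
    {
        std::unique_lock lock(_mutex);
        _cv.wait(lock, [&] { return _value != value || !_active; });
        value = _value;
        return _active;
    }

    // Publish a new stamp and wake every waiting thread.
    void set(std::shared_ptr<Stamp> stamp)
    {
        std::scoped_lock lock(_mutex);
        _value = std::move(stamp);
        _cv.notify_all();
    }

    // Hypothetical: plays the role of an inactive ActivityStatus plus a wake-up.
    void deactivate()
    {
        std::scoped_lock lock(_mutex);
        _active = false;
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::shared_ptr<Stamp> _value;
    bool _active = true;
};

// A worker holds the initial stamp and blocks until the main thread publishes frame 1.
unsigned frameSeenByWorker()
{
    auto initial = std::make_shared<Stamp>();
    MiniFrameBlock block(initial);
    unsigned seen = 0;

    std::thread worker([&] {
        auto stamp = initial;  // same object as the block's current value
        if (block.wait_for_change(stamp)) seen = stamp->frameCount;
    });

    block.set(std::make_shared<Stamp>(Stamp{1}));  // a new object, so the worker wakes
    worker.join();
    return seen;
}

// After deactivation, wait_for_change returns false instead of blocking.
bool waitAfterDeactivate()
{
    auto initial = std::make_shared<Stamp>();
    MiniFrameBlock block(initial);
    block.deactivate();
    auto stamp = initial;
    return block.wait_for_change(stamp);
}
```

Whether the worker starts before or after set() is called, it ends up with frame 1: if it starts late, the pointer already differs and wait_for_change returns without blocking.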

1.4 vsg::Barrier

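vsg::Barrier provides a way to synchronize multiple threads: once the specified number of threads have arrived at the Barrier, they are all released together.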

    const uint32_t _num_threads;
    uint32_t _num_arrived;
    uint32_t _phase;
    std::mutex _mutex;
    std::condition_variable _cv;

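vsg::Barrier is likewise built on std::mutex and std::condition_variable, together with _num_threads (the number of threads to synchronize), _num_arrived (the number of threads that have arrived so far) and _phase (a counter identifying the current round).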

    void arrive_and_wait()
    {
        std::unique_lock lock(_mutex);
        if (++_num_arrived == _num_threads)
        {
            _release();
        }
        else
        {
            auto my_phase = _phase;
            _cv.wait(lock, [this, my_phase]() { return this->_phase != my_phase; });
        }
    }

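arrive_and_wait increments the arrival count. If the calling thread is the last one to arrive, it releases all waiting threads; otherwise it records the current phase in my_phase and blocks until the phase changes. Waiting for a phase change rather than watching the arrival count means a woken thread cannot be confused by threads that already arrive for the next round, and the predicate form of wait also guards against spurious wakeups.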

    void _release()
    {
        _num_arrived = 0;
        ++_phase;
        _cv.notify_all();
    }

_release() resets _num_arrived to 0, advances the phase (++_phase), and wakes all waiting threads.

    void arrive_and_drop()
    {
        std::unique_lock lock(_mutex);
        if (++_num_arrived == _num_threads)
        {
            _release();
        }
    }

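arrive_and_drop increments the arrival count and, if the calling thread is the last to arrive, releases all waiting threads; unlike arrive_and_wait, it never blocks the calling thread. Judging from the code, _num_threads is left unchanged, so this differs from C++20 std::barrier::arrive_and_drop, which also removes the thread from all subsequent phases.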
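The barrier logic can be exercised in isolation. MiniBarrier below reproduces the member functions shown above using std types only; smallestCountAfterBarrier and dropThenWait are hypothetical test helpers, not part of vsg.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Phase-based barrier mirroring the vsg::Barrier logic (std types only).
class MiniBarrier
{
public:
    explicit MiniBarrier(uint32_t numThreads) : _num_threads(numThreads) {}

    // Count this thread; the last arrival releases everyone, the others wait for the phase to change.
    void arrive_and_wait()
    {
        std::unique_lock lock(_mutex);
        if (++_num_arrived == _num_threads)
        {
            release();
        }
        else
        {
            auto my_phase = _phase;
            _cv.wait(lock, [this, my_phase] { return _phase != my_phase; });
        }
    }

    // Count this thread but never block it.
    void arrive_and_drop()
    {
        std::unique_lock lock(_mutex);
        if (++_num_arrived == _num_threads) release();
    }

private:
    void release()
    {
        _num_arrived = 0;
        ++_phase;
        _cv.notify_all();
    }

    const uint32_t _num_threads;
    uint32_t _num_arrived = 0;
    uint32_t _phase = 0;
    std::mutex _mutex;
    std::condition_variable _cv;
};

// Every thread bumps a counter, meets the others at the barrier, then reads the counter.
// Returns the smallest value any thread read after being released.
int smallestCountAfterBarrier(uint32_t numThreads)
{
    MiniBarrier barrier(numThreads);
    std::atomic<int> arrived{0};
    std::vector<int> seen(numThreads, 0);
    std::vector<std::thread> threads;

    for (uint32_t i = 0; i < numThreads; ++i)
    {
        threads.emplace_back([&, i] {
            ++arrived;
            barrier.arrive_and_wait();
            seen[i] = arrived.load();  // every increment happened before the release
        });
    }
    for (auto& t : threads) t.join();
    return *std::min_element(seen.begin(), seen.end());
}

// A non-blocking arrival can complete the phase and release a waiting thread.
bool dropThenWait()
{
    MiniBarrier barrier(2);
    std::thread dropper([&] { barrier.arrive_and_drop(); });
    barrier.arrive_and_wait();  // returns once the dropper has arrived
    dropper.join();
    return true;
}
```

The first helper shows that no thread passes the barrier before all of them have arrived; the second shows that an arrive_and_drop arrival can complete a phase and release a thread blocked in arrive_and_wait.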

2 Multithreaded recording and submission

    #if 1
        if (_threading)
    #else
        // The following is a workaround for an odd "Possible data race during write of size 1" warning that valgrind tool=helgrind reports
        // on the first call to vkBeginCommandBuffer despite them being done on independent command buffers.  This could well be a driver bug or a false positive.
        // If you want to quieten this warning then change the #if above to #if 0 as rendering the first three frames single threaded avoids the warning.
        if (_threading && _frameStamp->frameCount > 2)
    #endif
        {
            _frameBlock->set(_frameStamp);
            _submissionCompleted->arrive_and_wait();
        }
        else
        {
            for (auto& recordAndSubmitTask : recordAndSubmitTasks)
            {
                recordAndSubmitTask->submit(_frameStamp);
            }
        }

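The code above corresponds to lines 821-838 of Viewer.cpp. When _threading is true, submission is multithreaded: the main thread first publishes the current frame with _frameBlock->set(_frameStamp), which wakes the recording threads, and then blocks in _submissionCompleted->arrive_and_wait() until submission has completed. Otherwise every RecordAndSubmitTask is submitted in turn on the main thread. (With #if 1 the condition is simply _threading; the #else branch is the alternative described in the code comment, which keeps the first three frames single-threaded to silence a helgrind warning.) _frameBlock and _submissionCompleted are vsg::FrameBlock and vsg::Barrier objects initialized in vsg::Viewer::setupThreading, whose work can be divided into two parts: initializing the synchronization objects and creating the threads.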

    uint32_t numValidTasks = 0;
    for (const auto& task : recordAndSubmitTasks)
    {
        if (!task->commandGraphs.empty())
        {
            ++numValidTasks;
        }
    }

    // check if there is any point in setting up threading
    if (numValidTasks == 0)
    {
        return;
    }

    status->set(true);

    _threading = true;

    _frameBlock = FrameBlock::create(status);
    _submissionCompleted = Barrier::create(1 + numValidTasks);

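This code first counts the valid tasks (those with at least one CommandGraph) and returns without enabling threading if there are none. It then creates the vsg::FrameBlock and the vsg::Barrier. The participant count of _submissionCompleted is the number of valid tasks + 1, where the extra 1 is the main thread: the main thread calls arrive_and_wait on this barrier and blocks, and it is released once every task's submitting thread has arrived. Only one thread per task arrives at this barrier (the single task thread, or the primary thread when a task is spread over several threads), which is why the count is per task rather than per thread.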
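The counting rule can be sketched as a small function. TaskInfo and submissionBarrierCount are hypothetical names that reduce a RecordAndSubmitTask to the two properties the thread setup inspects; the function returns 0 where setupThreading would return early.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical summary of a RecordAndSubmitTask: only what the counting step looks at.
struct TaskInfo
{
    std::size_t numCommandGraphs = 0;
    bool hasTransferTask = false;
};

// Returns 0 when threading would not be set up at all; otherwise the participant
// count of the submission barrier: one per valid task plus one for the main thread.
uint32_t submissionBarrierCount(const std::vector<TaskInfo>& tasks)
{
    uint32_t numValidTasks = 0;
    for (const auto& task : tasks)
    {
        // A task without CommandGraphs gets no thread, even if it has a TransferTask.
        if (task.numCommandGraphs != 0) ++numValidTasks;
    }
    if (numValidTasks == 0) return 0;  // mirrors the early return in setupThreading
    return 1 + numValidTasks;
}
```

The last test case shows that a task spread over several threads (three CommandGraphs plus a TransferTask) still contributes only one participant.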

Threads are created per vsg::RecordAndSubmitTask. Each vsg::RecordAndSubmitTask holds a list of vsg::CommandGraph objects (commandGraphs) and, optionally, a vsg::TransferTask (transferTask).

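Thread creation falls into two cases. If the task contains exactly one CommandGraph and no TransferTask, a single thread is created that records and submits the whole task. Otherwise one thread is created per CommandGraph, plus an additional data-transfer thread if a TransferTask is present.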
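The branching rule can be written down as a small planning function. ThreadPlan and planThreads are hypothetical illustrations, not vsg types; the role strings only label which thread function would run.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical summary of the threads started for one task (names are illustrative).
struct ThreadPlan
{
    std::vector<std::string> roles;      // one entry per std::thread
    std::size_t recordBarrierCount = 0;  // participants of recordStart/recordCompletedBarrier
};

// Mirrors the branching in setupThreading for a single task.
ThreadPlan planThreads(std::size_t numCommandGraphs, bool hasTransferTask)
{
    ThreadPlan plan;
    if (numCommandGraphs == 1 && !hasTransferTask)
    {
        plan.roles.push_back("run");  // one thread calling submit() for the whole task
        return plan;
    }
    if (numCommandGraphs == 0) return plan;  // not a valid task: no threads

    plan.roles.push_back("primary");  // start(), record graph 0, finish()
    for (std::size_t i = 1; i < numCommandGraphs; ++i) plan.roles.push_back("secondary");
    if (hasTransferTask) plan.roles.push_back("transfer");

    // numThreads in SharedData: the command graphs plus the transfer thread, if any
    plan.recordBarrierCount = plan.roles.size();
    return plan;
}
```

Note the case of one CommandGraph plus a TransferTask: it does not qualify for the simple path, so it gets a primary thread and a transfer thread that meet at barriers sized 2.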

    if (task->commandGraphs.size() == 1 && !task->transferTask)
    {
        // task only contains a single CommandGraph so keep thread simple
        auto run = [](ref_ptr<RecordAndSubmitTask> viewer_task, ref_ptr<FrameBlock> viewer_frameBlock, ref_ptr<Barrier> submissionCompleted, const std::string& threadName) {
            auto local_instrumentation = shareOrDuplicateForThreadSafety(viewer_task->instrumentation);
            if (local_instrumentation) local_instrumentation->setThreadName(threadName);

            auto frameStamp = viewer_frameBlock->initial_value;

            // wait for this frame to be signaled
            while (viewer_frameBlock->wait_for_change(frameStamp))
            {
                CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer run", COLOR_RECORD);

                viewer_task->submit(frameStamp);

                submissionCompleted->arrive_and_drop();
            }
        };

        threads.emplace_back(run, task, _frameBlock, _submissionCompleted, make_string("Viewer run thread"));
    }

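In the code above, which handles a task with exactly one CommandGraph and no TransferTask, only one thread (a std::thread) is created, and its loop mainly calls vsg::RecordAndSubmitTask::submit() to carry out the task. After submitting, the thread calls arrive_and_drop() on the submission barrier: it reports its arrival to the main thread without blocking, then returns to wait_for_change for the next frame.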
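The handshake between the main thread and such single-CommandGraph task threads can be simulated with standard threads. The sketch below assumes hypothetical FrameSignal and SimpleBarrier classes (simplified versions of vsg::FrameBlock and vsg::Barrier that use frame numbers instead of FrameStamp objects) and records a frame number in place of calling submit().

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for vsg::FrameBlock using frame numbers.
class FrameSignal
{
public:
    bool wait_for_change(uint64_t& frame)
    {
        std::unique_lock lock(_mutex);
        _cv.wait(lock, [&] { return _frame != frame || !_active; });
        frame = _frame;
        return _active;
    }
    void set(uint64_t frame) { std::scoped_lock lock(_mutex); _frame = frame; _cv.notify_all(); }
    void stop() { std::scoped_lock lock(_mutex); _active = false; _cv.notify_all(); }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    uint64_t _frame = 0;
    bool _active = true;
};

// Hypothetical stand-in for vsg::Barrier.
class SimpleBarrier
{
public:
    explicit SimpleBarrier(uint32_t n) : _n(n) {}
    void arrive_and_wait()
    {
        std::unique_lock lock(_mutex);
        if (++_arrived == _n) { release(); return; }
        auto my_phase = _phase;
        _cv.wait(lock, [&] { return _phase != my_phase; });
    }
    void arrive_and_drop()
    {
        std::unique_lock lock(_mutex);
        if (++_arrived == _n) release();
    }

private:
    void release() { _arrived = 0; ++_phase; _cv.notify_all(); }
    const uint32_t _n;
    uint32_t _arrived = 0, _phase = 0;
    std::mutex _mutex;
    std::condition_variable _cv;
};

// numTasks single-CommandGraph tasks, one thread each, driven for numFrames frames.
// Returns true if, every time the main thread got past the barrier, all tasks had
// already "submitted" the current frame.
bool runFrames(uint64_t numFrames, uint32_t numTasks)
{
    FrameSignal frameBlock;
    SimpleBarrier submissionCompleted(1 + numTasks);  // +1 for the main thread
    std::mutex submittedMutex;
    std::vector<uint64_t> lastSubmitted(numTasks, 0);

    std::vector<std::thread> threads;
    for (uint32_t t = 0; t < numTasks; ++t)
    {
        threads.emplace_back([&, t] {
            uint64_t frame = 0;
            while (frameBlock.wait_for_change(frame))
            {
                {
                    std::scoped_lock lock(submittedMutex);
                    lastSubmitted[t] = frame;  // stand-in for task->submit(frameStamp)
                }
                submissionCompleted.arrive_and_drop();  // report arrival without waiting
            }
        });
    }

    bool ok = true;
    for (uint64_t frame = 1; frame <= numFrames; ++frame)
    {
        frameBlock.set(frame);                  // wake the task threads
        submissionCompleted.arrive_and_wait();  // block until every task has arrived
        std::scoped_lock lock(submittedMutex);
        for (auto f : lastSubmitted) ok = ok && (f == frame);
    }

    frameBlock.stop();
    for (auto& th : threads) th.join();
    return ok;
}
```

Because each task thread only arrives (arrive_and_drop) while the main thread waits (arrive_and_wait) on a barrier sized 1 + numTasks, the main thread cannot publish the next frame until every task has submitted the current one, and the task threads never block on that barrier themselves.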

    else if (!task->commandGraphs.empty())
    {
        // we have multiple CommandGraphs in a single Task so set up a thread per CommandGraph
        struct SharedData : public Inherit<Object, SharedData>
        {
            SharedData(ref_ptr<RecordAndSubmitTask> in_task, ref_ptr<FrameBlock> in_frameBlock, ref_ptr<Barrier> in_submissionCompleted, uint32_t numThreads) :
                task(in_task),
                frameBlock(in_frameBlock),
                submissionCompletedBarrier(in_submissionCompleted)
            {
                recordedCommandBuffers = RecordedCommandBuffers::create();
                recordStartBarrier = Barrier::create(numThreads);
                recordCompletedBarrier = Barrier::create(numThreads);
            }

            // shared between all threads
            ref_ptr<RecordAndSubmitTask> task;
            ref_ptr<FrameBlock> frameBlock;
            ref_ptr<Barrier> submissionCompletedBarrier;

            // shared between threads associated with each task
            ref_ptr<RecordedCommandBuffers> recordedCommandBuffers;
            ref_ptr<Barrier> recordStartBarrier;
            ref_ptr<Barrier> recordCompletedBarrier;
        };

        uint32_t numThreads = static_cast<uint32_t>(task->commandGraphs.size());
        if (task->transferTask) ++numThreads;

        ref_ptr<SharedData> sharedData = SharedData::create(task, _frameBlock, _submissionCompleted, numThreads);

        auto run_primary = [](ref_ptr<SharedData> data, ref_ptr<CommandGraph> commandGraph, const std::string& threadName) {
            auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);
            if (local_instrumentation) local_instrumentation->setThreadName(threadName);

            auto frameStamp = data->frameBlock->initial_value;

            // wait for this frame to be signaled
            while (data->frameBlock->wait_for_change(frameStamp))
            {
                CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer primary", COLOR_RECORD);

                // primary thread starts the task
                data->task->start();

                data->recordStartBarrier->arrive_and_wait();

                //vsg::info("run_primary");

                commandGraph->record(data->recordedCommandBuffers, frameStamp, data->task->databasePager);

                data->recordCompletedBarrier->arrive_and_wait();

                // primary thread finishes the task, submitting all the command buffers recorded by the primary and all secondary threads to its queue
                data->task->finish(data->recordedCommandBuffers);
                data->recordedCommandBuffers->clear();

                data->submissionCompletedBarrier->arrive_and_wait();
            }
        };

        auto run_secondary = [](ref_ptr<SharedData> data, ref_ptr<CommandGraph> commandGraph, const std::string& threadName) {
            auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);
            if (local_instrumentation) local_instrumentation->setThreadName(threadName);

            auto frameStamp = data->frameBlock->initial_value;

            // wait for this frame to be signaled
            while (data->frameBlock->wait_for_change(frameStamp))
            {
                CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer secondary", COLOR_RECORD);

                data->recordStartBarrier->arrive_and_wait();

                commandGraph->record(data->recordedCommandBuffers, frameStamp, data->task->databasePager);

                data->recordCompletedBarrier->arrive_and_wait();
            }
        };

        auto run_transfer = [](ref_ptr<SharedData> data, ref_ptr<TransferTask> transferTask, TransferTask::TransferMask transferMask, const std::string& threadName) {
            auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);
            if (local_instrumentation) local_instrumentation->setThreadName(threadName);

            auto frameStamp = data->frameBlock->initial_value;

            // wait for this frame to be signaled
            while (data->frameBlock->wait_for_change(frameStamp))
            {
                CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer transfer", COLOR_RECORD);

                data->recordStartBarrier->arrive_and_wait();

                //vsg::info("run_transfer");

                if (auto transfer = transferTask->transferData(transferMask); transfer.result == VK_SUCCESS)
                {
                    if (transfer.dataTransferredSemaphore)
                    {
                        data->task->earlyDataTransferredSemaphore = transfer.dataTransferredSemaphore;
                    }
                }

                data->recordCompletedBarrier->arrive_and_wait();
            }
        };

        for (uint32_t i = 0; i < task->commandGraphs.size(); ++i)
        {
            if (i == 0)
                threads.emplace_back(run_primary, sharedData, task->commandGraphs[i], make_string("Viewer primary thread"));
            else
                threads.emplace_back(run_secondary, sharedData, task->commandGraphs[i], make_string("Viewer secondary thread ", i));
        }

        if (task->transferTask)
        {
            threads.emplace_back(run_transfer, sharedData, task->transferTask, TransferTask::TRANSFER_BEFORE_RECORD_TRAVERSAL, make_string("Viewer early transferTask thread"));
        }
    }

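In the remaining case, a thread's role depends on what it handles: the first CommandGraph is handled by a primary thread, each further CommandGraph by a secondary thread, and the TransferTask, if any, by a dedicated data-transfer thread. All threads of the task share one SharedData object, and each thread is a std::thread whose body is a lambda.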


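The SharedData constructor creates the RecordedCommandBuffers container shared by the task's recording threads, together with two vsg::Barrier objects, recordStartBarrier and recordCompletedBarrier, each sized numThreads (the number of CommandGraphs, plus one if there is a TransferTask). The recording threads and the data-transfer thread of a task are synchronized through these two barriers.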


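The three lambdas run_primary, run_secondary and run_transfer are the thread functions of the first (primary) recording thread, the other (secondary) recording threads, and the data-transfer thread, respectively. Each places its main work between recordStartBarrier->arrive_and_wait() and recordCompletedBarrier->arrive_and_wait(), which keeps all threads of one task in step. The primary thread additionally synchronizes with the main thread using the following pattern: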

    while (data->frameBlock->wait_for_change(frameStamp))
    {
        // per-frame work
        data->submissionCompletedBarrier->arrive_and_wait();
    }

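The earlier article "vulkanscenegraph显示倾斜模型(6.2)-记录与提交" (Displaying Oblique Photography Models with VulkanSceneGraph (6.2): Recording and Submission, CSDN blog) divided task submission into four parts: start, data transfer before recordTraversal, record, and finish. In the multithreaded version, run_primary alone handles start (task->start()) and finish (task->finish()); the data transfer before recordTraversal is performed by run_transfer, which calls transferData with TRANSFER_BEFORE_RECORD_TRAVERSAL and, if a semaphore is returned, stores it in the task's earlyDataTransferredSemaphore; recording is shared between run_primary and run_secondary. Because transfer and recording both take place between the same two barriers, run_primary calls finish, which submits all command buffers recorded by the primary and secondary threads to its queue, only after all data transfers and command recording for the current frame are complete.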
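The division of work can be simulated end to end. The sketch below assumes hypothetical FrameSignal and Barrier classes (std-only simplifications of vsg::FrameBlock and vsg::Barrier). Recording is modeled as pushing the frame number into a shared vector and the data transfer as storing the frame number, so it checks only the ordering, not any Vulkan work. recordLoop plays the role of run_primary and run_secondary, and the last thread plays run_transfer.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical std-only stand-ins for vsg::FrameBlock and vsg::Barrier.
class FrameSignal
{
public:
    bool wait_for_change(uint64_t& frame)
    {
        std::unique_lock lock(_m);
        _cv.wait(lock, [&] { return _frame != frame || !_active; });
        frame = _frame;
        return _active;
    }
    void set(uint64_t f) { std::scoped_lock lock(_m); _frame = f; _cv.notify_all(); }
    void stop() { std::scoped_lock lock(_m); _active = false; _cv.notify_all(); }
private:
    std::mutex _m;
    std::condition_variable _cv;
    uint64_t _frame = 0;
    bool _active = true;
};

class Barrier
{
public:
    explicit Barrier(uint32_t n) : _n(n) {}
    void arrive_and_wait()
    {
        std::unique_lock lock(_m);
        if (++_arrived == _n) { _arrived = 0; ++_phase; _cv.notify_all(); return; }
        auto my_phase = _phase;
        _cv.wait(lock, [&] { return _phase != my_phase; });
    }
private:
    const uint32_t _n;
    uint32_t _arrived = 0, _phase = 0;
    std::mutex _m;
    std::condition_variable _cv;
};

// One task with numGraphs (>= 1) command graphs plus a transfer thread, run for numFrames.
// Returns true if, at every finish, the primary saw one entry per graph for the current
// frame and the transfer for that frame already done.
bool runTask(uint32_t numGraphs, uint64_t numFrames)
{
    FrameSignal frameBlock;
    Barrier submissionCompleted(2);          // main thread + this task's primary thread
    Barrier recordStart(numGraphs + 1);      // all recording threads + the transfer thread
    Barrier recordCompleted(numGraphs + 1);
    std::mutex recordedMutex;
    std::vector<uint64_t> recorded;          // stand-in for RecordedCommandBuffers
    uint64_t transferred = 0;                // written only between the two barriers
    bool ok = true;                          // written only by the primary thread

    auto record = [&](uint64_t frame) { std::scoped_lock lock(recordedMutex); recorded.push_back(frame); };
    auto recordLoop = [&](bool primary) {
        uint64_t frame = 0;
        while (frameBlock.wait_for_change(frame))
        {
            // primary: task->start() would run here
            recordStart.arrive_and_wait();
            record(frame);
            recordCompleted.arrive_and_wait();
            if (!primary) continue;
            {   // primary: task->finish() sees everything recorded for this frame
                std::scoped_lock lock(recordedMutex);
                ok = ok && recorded.size() == numGraphs && transferred == frame;
                for (auto f : recorded) ok = ok && f == frame;
                recorded.clear();
            }
            submissionCompleted.arrive_and_wait();
        }
    };

    std::vector<std::thread> threads;
    threads.emplace_back(recordLoop, true);  // run_primary
    for (uint32_t i = 1; i < numGraphs; ++i) threads.emplace_back(recordLoop, false);  // run_secondary
    threads.emplace_back([&] {               // run_transfer
        uint64_t frame = 0;
        while (frameBlock.wait_for_change(frame))
        {
            recordStart.arrive_and_wait();
            transferred = frame;             // stand-in for transferTask->transferData(...)
            recordCompleted.arrive_and_wait();
        }
    });

    for (uint64_t frame = 1; frame <= numFrames; ++frame)
    {
        frameBlock.set(frame);
        submissionCompleted.arrive_and_wait();
    }
    frameBlock.stop();
    for (auto& t : threads) t.join();
    return ok;
}
```

runTask returns true only if, at each finish, the primary thread found one entry per CommandGraph for the current frame and that frame's transfer already done: the two per-task barriers guarantee that finish sees all recording and transfer work, and the submission barrier keeps the main thread from starting the next frame early.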

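Summary: this chapter analyzed recording and submission in the frame loop under multithreading. It first examined vsg's synchronization wrappers, vsg::Semaphore, vsg::Fence, vsg::FrameBlock and vsg::Barrier, and then analyzed the multithreading mechanism used during recording and submission. The next chapter will analyze the role of vsg::DatabasePager in updating the scene graph.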

To be analyzed: the role of vsg::DatabasePager in updating the scene graph.

