AimRT从入门到精通 - 02执行器Executor
执行器是AimRT六大模块之一,其主要作用是当我们向其传递任务的时候,可以对任务进行执行;
详细的关于AimRT的操作可以参考下面的官方文档:
Executor — AimRT v0.10.0 documentation
AimRT中关于执行器的种类有很多,但是大致是基于普通的执行器executor、基于协程的执行器executor co这两种执行器进行开发和改进的;
一、线程执行器
接下来我们还是实现一个示例代码,代码的结构如下所示:
executor_normal
├── build.sh
├── cmake
│ └── GetAimRT.cmake
├── CMakeLists.txt
└── src├── app│ └── executor_normal_app├── CMakeLists.txt├── install│ ├── cfg│ │ └── executor_normal.yaml│ └── start_executor_normal.sh├── moudle│ └── executor_normal_moudle│ ├── CMakeLists.txt│ ├── executor_normal.cc│ └── executor_normal.h└── pkg└── executor_normal_pkg├── CMakeLists.txt└── pkg_main.cc
接下来我们依次实现上面的文件;
1. 主项目的CMakeLists.txt
cmake_minimum_required(VERSION 3.24) # 设置cmake的版本project(helloworld LANGUAGES C CXX) # 设置项目名并支持C/C++set(CMAKE_CXX_STANDARD 20) # 设置C++的编译版本
set(CMAKE_CXX_STANDARD_REQUIRED ON) # 编译器必须支持C++20,否则报错
set(CMAKE_CXX_EXTENSIONS OFF) # 禁用编译器特有的扩展(如GNU的 -std=gnu++20)set(CMAKE_POLICY_DEFAULT_CMP0077 NEW) # 将策略 CMP0077 的默认行为设置为 NEW。include(cmake/GetAimRT.cmake) # 调用AimRTadd_subdirectory(src) # 将src目录下的CMakeLists.txt也加入构建系统中
其结构跟我们上一节的Helloworld实现的结构类似;
2. /cmake/GetAimRT.cmake
include(FetchContent) # 引入 FetchContent 模块,提供从远程仓库(如 Git)下载和管理依赖项的功能。# 声明依赖项信息
FetchContent_Declare(aimrt # 依赖项的名称GIT_REPOSITORY https://github.com/AimRT/aimrt.git # 依赖项的github的URL地址GIT_TAG v1.x.x) # 依赖项的版本# 检测依赖项是否被下载
FetchContent_GetProperties(aimrt)
# 如果依赖项被下载,此时会生成变量 aimrt_POPULATED用来标记if(NOT aimrt_POPULATED)FetchContent_MakeAvailable(aimrt) # 实际执行下载操作
endif()
用于获取AimRT;
3. /src/CMakeLists.txt
add_subdirectory(module/executor_normal_moudle)
add_subdirectory(pkg/executor_normal_pkg)
这里我们引用 src 下的各个子目录;
4. /src/module/executor_normal_moudle/CMakeLists.txt
file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)add_library(executor_normal_moudle STATIC)
add_library(executor_normal::executor_normal_moudle ALIAS executor_normal_moudle)target_sources(executor_normal_moudle PRIVATE ${src})target_include_directories(executor_normal_moudlePUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/..)# 引用aimrt_module_cpp_interface
target_link_libraries(executor_normal_moudlePRIVATE yaml-cpp::yaml-cppPUBLIC aimrt::interface::aimrt_module_cpp_interface)
这里我们实际上是将moudle模块块编译成了对应的静态库;
5. /src/pkg/executor_normal_pkg/CMakeLists.txt
# 获取当前文件夹名
string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR})# 生成命名空间相关变量
get_namespace(CUR_SUPERIOR_NAMESPACE)
string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE})# 设置目标名称和别名
set(CUR_TARGET_NAME ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR})
set(CUR_TARGET_ALIAS_NAME ${CUR_SUPERIOR_NAMESPACE}::${CUR_DIR})file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)# 创建共享库并带别名
add_library(${CUR_TARGET_NAME} SHARED)
add_library(${CUR_TARGET_ALIAS_NAME} ALIAS ${CUR_TARGET_NAME})# 添加源文件到目标
target_sources(${CUR_TARGET_NAME} PRIVATE ${src})# 设置依赖库
target_link_libraries(${CUR_TARGET_NAME}PRIVATE aimrt::interface::aimrt_pkg_c_interface${CUR_SUPERIOR_NAMESPACE}::executor_module
)# 输出的库的名称为目录名
set_target_properties(${CUR_TARGET_NAME} PROPERTIES OUTPUT_NAME ${CUR_DIR})
6./src/module/executor_normal_moudle/executor_normal.h
#pragma once#include <atomic>#include "aimrt_module_cpp_interface/module_base.h"namespace aimrt::cpp::executor::executor_module {class ExecutorModule : public aimrt::ModuleBase {public:ExecutorModule() = default;~ExecutorModule() override = default;ModuleInfo Info() const override {return ModuleInfo{.name = "ExecutorModule"};}bool Initialize(aimrt::CoreRef aimrt_ptr) override;bool Start() override;void Shutdown() override;private:auto GetLogger() { return core_.GetLogger(); }void SimpleExecuteDemo();void ThreadSafeDemo();void TimeScheduleDemo();private:aimrt::CoreRef core_;aimrt::executor::ExecutorRef work_executor_;aimrt::executor::ExecutorRef thread_safe_executor_;std::atomic_bool run_flag_ = true;uint32_t loop_count_ = 0;aimrt::executor::ExecutorRef time_schedule_executor_;
};} // namespace aimrt::examples::cpp::executor::executor_module
这里模块的整体实现可以看到,也是和上节的HelloWorld类似,包含获取模块信息、初始化、开始和结束四个部分;
其中这里在这个执行器moudle中,共有三种类型的执行器,分别是:
- 普通的线程执行器;
- 线程安全执行器;
- 支持定时任务的执行器;
除此之外,也在这个moudle中提供了这3种执行器的调用函数,和一个回去模块的logger的接口;
7. /src/module/executor_normal_moudle/executor_module.cc
接下来我们编写这个库的头文件实现的代码:
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.#include "executor_module/executor_module.h"#include "yaml-cpp/yaml.h"namespace aimrt::cpp::executor::executor_module {bool ExecutorModule::Initialize(aimrt::CoreRef core) {// Save aimrt framework handlecore_ = core;// Get executorwork_executor_ = core_.GetExecutorManager().GetExecutor("work_executor");AIMRT_CHECK_ERROR_THROW(work_executor_, "Can not get work_executor");// Get thread safe executorthread_safe_executor_ = core_.GetExecutorManager().GetExecutor("thread_safe_executor");AIMRT_CHECK_ERROR_THROW(thread_safe_executor_ && thread_safe_executor_.ThreadSafe(),"Can not get thread_safe_executor");// Get time schedule executortime_schedule_executor_ = core_.GetExecutorManager().GetExecutor("time_schedule_executor");AIMRT_CHECK_ERROR_THROW(time_schedule_executor_ && time_schedule_executor_.SupportTimerSchedule(),"Can not get time_schedule_executor");AIMRT_INFO("Init succeeded.");return true;
}bool ExecutorModule::Start() {// Test simple executeSimpleExecuteDemo();// Test thread safe executeThreadSafeDemo();// Test time schedule executeTimeScheduleDemo();AIMRT_INFO("Start succeeded.");return true;
}void ExecutorModule::Shutdown() {run_flag_ = false;std::this_thread::sleep_for(std::chrono::seconds(1));AIMRT_INFO("Shutdown succeeded.");
}void ExecutorModule::SimpleExecuteDemo() {work_executor_.Execute([this]() {AIMRT_INFO("This is a simple task");});
}void ExecutorModule::ThreadSafeDemo() {uint32_t n = 0;for (uint32_t ii = 0; ii < 10000; ++ii) {thread_safe_executor_.Execute([&n]() {n++;});}std::this_thread::sleep_for(std::chrono::seconds(1));AIMRT_INFO("Value of n is {}", n);
}void ExecutorModule::TimeScheduleDemo() {if (!run_flag_) return;AIMRT_INFO("Loop count : {}", loop_count_++);time_schedule_executor_.ExecuteAfter(std::chrono::seconds(1),std::bind(&ExecutorModule::TimeScheduleDemo, this));
}} // namespace aimrt::examples::cpp::executor::executor_module
这里我们看一下整体的代码框架结构:
- #include "yaml-cpp/yaml.h"可以用来接下来我们定义的配置文件进行解析;
bool ExecutorModule::Initialize(aimrt::CoreRef core) {// Save aimrt framework handlecore_ = core;// Get executorwork_executor_ = core_.GetExecutorManager().GetExecutor("work_executor");AIMRT_CHECK_ERROR_THROW(work_executor_, "Can not get work_executor");// Get thread safe executorthread_safe_executor_ = core_.GetExecutorManager().GetExecutor("thread_safe_executor");AIMRT_CHECK_ERROR_THROW(thread_safe_executor_ && thread_safe_executor_.ThreadSafe(),"Can not get thread_safe_executor");// Get time schedule executortime_schedule_executor_ = core_.GetExecutorManager().GetExecutor("time_schedule_executor");AIMRT_CHECK_ERROR_THROW(time_schedule_executor_ && time_schedule_executor_.SupportTimerSchedule(),"Can not get time_schedule_executor");AIMRT_INFO("Init succeeded.");return true;
}
初始化函数中,依次是获取3个类型的执行器;
我们还记得CoreRef这个核心句柄提供了6个模块的管理:
namespace aimrt {class CoreRef {public:ModuleInfo Info() const;configurator::ConfiguratorRef GetConfigurator() const;executor::ExecutorManagerRef GetExecutorManager() const;logger::LoggerRef GetLogger() const;rpc::RpcHandleRef GetRpcHandle() const;channel::ChannelHandleRef GetChannelHandle() const;parameter::ParameterHandleRef GetParameterHandle() const;
};} // namespace aimrt
所以这里我们先通过core. GetExecutorManager()获得执行器模块的管理句柄;
namespace aimrt::executor {class ExecutorManagerRef {public:ExecutorRef GetExecutor(std::string_view executor_name) const;
};} // namespace aimrt::executor
通过执行器管理模块,我们就可以获取到执行器的句柄,但是这里我们需要传入执行器的名字;
补充知识点:std::string_view是什么类型?有什么作用?
不可变的字符串视图
- std::string_view 是 C++17 引入的轻量级类型,表示对现有字符串数据的只读视图。它不拥有数据,仅通过指针和长度引用其他字符串(如 std::string、C 风格字符串、字符数组等)。
内存高效
- 它仅包含两个成员:一个指向字符串起始位置的指针和一个表示长度的整数。因此,它的拷贝和传递成本极低(几乎是0拷贝)。
兼容性
- 可以隐式构造自多种字符串类型(如 std::string、const char*、字面量等),无需额外转换。
问题:这里为什么用std::string_view?
这是因为AimRT整体的框架实际上是支持C++20的,支持的语法是很新颖的,这里如果我们传入的是 const string& 类型,如果再传入C 风格字符串或子字符串时,可能触发临时 std::string 的构造和内存分配。
并且,string_view支持向string/char*的隐式类型转换,不用再向我们之前的string.c_str()这样的接口函数,自己进行转换,且这里的执行器的名字我们并不会对其进行修改,所以使用std::string_view这样的格式效率更高;
namespace aimrt::executor {class ExecutorRef {public:std::string_view Type() const;std::string_view Name() const;bool ThreadSafe() const;bool IsInCurrentExecutor() const;bool SupportTimerSchedule() const;void Execute(Task&& task) const;std::chrono::system_clock::time_point Now() const;void ExecuteAt(std::chrono::system_clock::time_point tp, Task&& task) const;void ExecuteAfter(std::chrono::nanoseconds dt, Task&& task) const;
};} // namespace aimrt::executor
而当我们获取到执行器的句柄的时候,我们就可以对执行器进行上面的操作;
其中,比较重要的是Execute这个成员函数,因为执行器就是为了要执行对应的任务,所以这里我们传入task,然后执行器进行执行;
这时候我们再回到上面的初始化函数,我们就会很容易的看懂:分别定义获取三个执行器,然后传入执行器的名字;
然后AIMRT_CHECK_ERROR_THROW是aimrt内部定义的宏函数,检查执行器是否获取成功,如果获取失败则会抛出对应的错误;
接下来就是执行器对应的start代码:
bool ExecutorModule::Start() {// Test simple executeSimpleExecuteDemo();// Test thread safe executeThreadSafeDemo();// Test time schedule executeTimeScheduleDemo();AIMRT_INFO("Start succeeded.");return true;
}
这里比较好容易理解,执行器模块开始也就是执行器到执行对应的工作;
三个执行器具体执行的在任务下面有实现;
void ExecutorModule::Shutdown() {run_flag_ = false;std::this_thread::sleep_for(std::chrono::seconds(1));AIMRT_INFO("Shutdown succeeded.");
}
上面的是关闭模块,这里我们主要的将运行的标记变量run_flg设置为false;
然后再1s后打印语句;
void ExecutorModule::SimpleExecuteDemo() {work_executor_.Execute([this]() {AIMRT_INFO("This is a simple task");});
}
这里是向其中传递一个对象,这个对象实际上可以是函数指针、仿函数、或者是lambda;
而上面传递的也就是一个lambda表达式;
`work_executor`:投递一个简单的任务到其中执行,这里实际上也就是打印一条INFO语句;
需要注意的是,这里的[this]捕捉列表,捕捉了当前的this指针,允许在 lambda 内部访问 ExecutorModule
的成员(虽然此处未用到)
void ExecutorModule::ThreadSafeDemo() {uint32_t n = 0;for (uint32_t ii = 0; ii < 10000; ++ii) {thread_safe_executor_.Execute([&n]() {n++;});}std::this_thread::sleep_for(std::chrono::seconds(1));AIMRT_INFO("Value of n is {}", n);
}
上面的操作实际上就是让线程执行10000次的任务,每次执行的任务都是对n进行++操作;
最终打印的结束也就是10000,因为这里是单个线程执行10000次,但是如果多个线程一共执行10000次,此时就需要保证n是原子的;
void ExecutorModule::TimeScheduleDemo() {if (!run_flag_) return;AIMRT_INFO("Loop count : {}", loop_count_++);time_schedule_executor_.ExecuteAfter(std::chrono::seconds(1),std::bind(&ExecutorModule::TimeScheduleDemo, this));
}
接下来我们分析的是定时执行任务的执行器,这里是每隔1s执行一次对应的任务;
因为这个执行器是一直循环执行的,所以当我们把模块进行关闭的时候,这个执行器才会关闭,而当模块关闭的话,此时标志位run_flag_会被设置为false,函数也就直接返回结束;
下面的AIMRT_INFO会记录我们的循环次数;
然后这里面调用的是ExecuteAfter这个函数,实际上也就是在1s之后,执行对应的task;
我们可以看一下官方的ExecuteAfter函数解释:
void ExecuteAfter(std::chrono::nanoseconds dt, Task&& task)
:在某个时间后执行一个任务。
-
第一个参数-时间段,以本执行器的时间体系为准。
-
可将第二个参数
Task
简单的视为一个满足std::function<void()>
签名的任务闭包。 -
如果本执行器不支持按时间调度,则调用此接口时会抛出一个异常。
-
此接口可以在 Initialize/Start 阶段调用,但执行器在 Start 阶段后才能保证开始执行,因此在 Start 阶段之前调用此接口,有可能只能将任务投递到执行器的任务队列中而暂时不执行,等到 Start 之后才开始执行任务
std::bind
:将成员函数 TimeScheduleDemo
绑定到当前对象(this
),确保回调时能正确访问成员变量(如 run_flag_
和 loop_count_
)。
8. /src/install/cfg/executor_normal_cfg.yaml
接下来我们编写一个简单的执行器的配置文件:
aimrt:executor: # 执行器配置executors: # 当前先支持thread型,未来可根据加载的网络模块提供更多类型- name: thread_safe_executor # 安全线程池type: asio_thread options:thread_num: 3 # 线程数,不指定则默认单线程
aimrt:log:core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Offbackends:- type: consoleexecutor:executors:- name: work_executortype: asio_threadoptions:thread_num: 2- name: thread_safe_executortype: asio_strandoptions:bind_asio_thread_executor_name: work_executor- name: time_schedule_executortype: asio_threadoptions:thread_num: 2
具体的关于执行器的配置文件可以参考下面的链接:
AimRT 中的基本概念 — AimRT v0.10.0 documentation
这里我们分别对三个执行器进行配置:work_executor, thread_safe_executor, time_schedule_executor;
其中,这里的执行器的type有多种,这里只介绍以下几种:
- simple_thread 执行器
simple_thread是一种简单的单线程执行器,不支持定时调度。
- asio_thread 执行器
asio_thread执行器是一种基于Asio库实现的执行器,是一种线程池,可以手动设置线程数,此外它还支持定时调度;
这里我们需要注意的是asio_thread这个执行器不能保证线程安全,如果要保证线程安全,都是就需要调用下面的执行器;
- asio_strand 执行器
asio_strand执行器是一种依附于asio_thread执行器的伪执行器,基于 Asio 库的 strand 实现。它不能独立存在,并不拥有实际的线程,它在运行过程中会将任务交给绑定的asio_thread执行器来实际执行。但是它保证线程安全,也支持定时调度。
其中,当我们使用asio_strand这个执行器的时候,必须绑定到asio_thread,这个配置是必填项;
这里我们指定thread_safe_executor的线程池中的线程数量为3;
接下来我们进行编写生成app可执行程序的代码,这里采用的是注册模块的方式;
9./src/app/executor_normal/main.cc
#include <csignal>
#include <iostream>#include "core/aimrt_core.h"
#include "executor_normal_moudle/executor_normal.h"using namespace aimrt::runtime::core;
using namespace aimrt::cpp::executor::executor_module;AimRTCore* global_core_ptr = nullptr;void SignalHandler(int sig) {if (global_core_ptr && (sig == SIGINT || sig == SIGTERM)) {global_core_ptr->Shutdown();return;}raise(sig);
};int32_t main(int32_t argc, char** argv) {signal(SIGINT, SignalHandler);signal(SIGTERM, SignalHandler);std::cout << "AimRT start." << std::endl;try {AimRTCore core;global_core_ptr = &core;// register moduleExecutorModule executor_module;core.GetModuleManager().RegisterModule(executor_module.NativeHandle());AimRTCore::Options options;if (argc > 1) options.cfg_file_path = argv[1];core.Initialize(options);core.Start();core.Shutdown();global_core_ptr = nullptr;} catch (const std::exception& e) {std::cout << "AimRT run with exception and exit. " << e.what()<< std::endl;return -1;}std::cout << "AimRT exit." << std::endl;return 0;
}
其实这里我们可以看到,大致和之前实现的Helloworld这个功能代码的结构是一样的;
分别初始化Aimrt、运行Aimrt和shutdown这三个模块,然后采用信号捕捉的方式对框架进行关闭,运行的结果如下所示:
可以看到:当我们在配置文件中调用线程安全执行器的时候,此时可以实现正常运行获得n = 10000;
除此之外,定时执行器可正常运行;
截止到这里,我们对执行器就有了一个简单的了解;
后续会出关于协程接口的执行器的相关操作;