小智源码分析——音频部分(二)
一、利用创建好的对象来调用音频服务
上周从上图的getaudiocode()方法进去感受了一下底层小智的构造如何实现。所以用一个codec来接收我们所构造的音频对象。
下来是用构造好的音频对象来调用音频初始化服务Initialize,因为启动函数Application函数的类中有audio_servicez_所以可以进行调用。
这段初始化代码的核心作用是:
1绑定并启动音频编解码器
2配置音频数据流的格式和处理流程
3按需初始化音频处理器和唤醒词检测模块
4设置好各类回调,保证音频事件能及时通知到主程序
5创建定时器,自动管理音频硬件电源
void AudioService::Initialize(AudioCodec* codec) {// 保存传入的音频编解码器指针codec_ = codec;// 启动音频编解码器,准备采集和播放codec_->Start();/* 初始化 Opus 解码器和编码器 */// 创建 Opus 解码器,采样率与输出一致,单声道,帧长为 OPUS_FRAME_DURATION_MSopus_decoder_ = std::make_unique<OpusDecoderWrapper>(codec->output_sample_rate(), 1, OPUS_FRAME_DURATION_MS);// 创建 Opus 编码器,采样率固定为 16kHz,单声道,帧长为 OPUS_FRAME_DURATION_MSopus_encoder_ = std::make_unique<OpusEncoderWrapper>(16000, 1, OPUS_FRAME_DURATION_MS);// 设置编码复杂度为最低,节省算力opus_encoder_->SetComplexity(0);// 如果输入采样率不是 16kHz,则配置重采样器,将输入音频转换为 16kHzif (codec->input_sample_rate() != 16000) {input_resampler_.Configure(codec->input_sample_rate(), 16000);reference_resampler_.Configure(codec->input_sample_rate(), 16000);}// 根据编译配置选择不同的音频处理器(如带有回声消除的AFE,或无处理的空实现)
#if CONFIG_USE_AUDIO_PROCESSORaudio_processor_ = std::make_unique<AfeAudioProcessor>();
#elseaudio_processor_ = std::make_unique<NoAudioProcessor>();
#endif// 根据编译配置选择不同的唤醒词检测算法
#if CONFIG_USE_AFE_WAKE_WORDwake_word_ = std::make_unique<AfeWakeWord>();
#elif CONFIG_USE_ESP_WAKE_WORDwake_word_ = std::make_unique<EspWakeWord>();
#elif CONFIG_USE_CUSTOM_WAKE_WORDwake_word_ = std::make_unique<CustomWakeWord>();
#elsewake_word_ = nullptr;
#endif// 设置音频处理器的输出回调,当有处理好的音频输出时,推入编码队列audio_processor_->OnOutput([this](std::vector<int16_t>&& data) {PushTaskToEncodeQueue(kAudioTaskTypeEncodeToSendQueue, std::move(data));});// 设置语音活动检测(VAD)回调,检测到说话状态变化时,更新状态并通知外部audio_processor_->OnVadStateChange([this](bool speaking) {voice_detected_ = speaking;if (callbacks_.on_vad_change) {callbacks_.on_vad_change(speaking);}});// 如果启用了唤醒词检测,设置唤醒词检测回调,检测到唤醒词时通知外部if (wake_word_) {wake_word_->OnWakeWordDetected([this](const std::string& wake_word) {if (callbacks_.on_wake_word_detected) {callbacks_.on_wake_word_detected(wake_word);}});}// 创建音频电源管理定时器,定期检查音频输入/输出是否需要关闭以省电esp_timer_create_args_t audio_power_timer_args = {.callback = [](void* arg) {AudioService* audio_service = (AudioService*)arg;audio_service->CheckAndUpdateAudioPowerState();},.arg = this,.dispatch_method = ESP_TIMER_TASK,.name = "audio_power_timer",.skip_unhandled_events = true,};esp_timer_create(&audio_power_timer_args, &audio_power_timer_);
}
二、启动音频服务
经过上部分的初始化,配置好了音频的编解码器,以及处理时对于音频的要求(不符合要求的要重新采样为符合要求的格式),还包括唤醒词的检测、提取和回调。
启动流程(Start)
1标记服务未停止
service_stopped_ = false;
让各任务知道服务正在运行。
2清除音频相关事件位
xEventGroupClearBits(...)
确保音频输入、唤醒词、音频处理等任务可以正常启动。
3启动音频电源管理定时器
esp_timer_start_periodic(...)
每秒检查一次音频硬件的电源状态,自动省电。
4启动音频输入任务
xTaskCreatePinnedToCore 或 xTaskCreate
创建音频采集任务,负责从麦克风采集音频数据。
5启动音频输出任务
xTaskCreate
创建音频播放任务,负责将音频数据输出到扬声器。
6启动 Opus 编解码任务
xTaskCreate
创建音频编解码任务,负责音频数据的编码(发送)和解码(播放)。
void AudioService::Start() {// 标记服务未停止service_stopped_ = false;// 清除音频相关的事件位,确保任务可以正常启动xEventGroupClearBits(event_group_, AS_EVENT_AUDIO_TESTING_RUNNING | AS_EVENT_WAKE_WORD_RUNNING | AS_EVENT_AUDIO_PROCESSOR_RUNNING);// 启动音频电源管理定时器,每秒检查一次音频硬件电源状态esp_timer_start_periodic(audio_power_timer_, 1000000);/* 启动音频输入任务 */
#if CONFIG_USE_AUDIO_PROCESSOR// 如果使用音频处理器,任务绑定到指定内核xTaskCreatePinnedToCore([](void* arg) {AudioService* audio_service = (AudioService*)arg;audio_service->AudioInputTask();vTaskDelete(NULL);}, "audio_input", 2048 * 3, this, 8, &audio_input_task_handle_, 1);
#else// 不使用音频处理器,普通方式创建任务xTaskCreate([](void* arg) {AudioService* audio_service = (AudioService*)arg;audio_service->AudioInputTask();vTaskDelete(NULL);}, "audio_input", 2048 * 3, this, 8, &audio_input_task_handle_);
#endif/* 启动音频输出任务 */xTaskCreate([](void* arg) {AudioService* audio_service = (AudioService*)arg;audio_service->AudioOutputTask();vTaskDelete(NULL);}, "audio_output", 4096, this, 3, &audio_output_task_handle_);/* 启动 Opus 编解码任务 */xTaskCreate([](void* arg) {AudioService* audio_service = (AudioService*)arg;audio_service->OpusCodecTask();vTaskDelete(NULL);}, "opus_codec", 4096 * 7, this, 2, &opus_codec_task_handle_);
}
三、音频回调服务
下来回到Application函数内,下一步执行下图这一模块:
首先定义一个callbacks对象,他的类型如下:
AudioServiceCallbacks 是一个回调函数集合,用于让外部(比如主应用 Application)能够“订阅”音频服务(AudioService)中的各种事件。当音频服务内部发生特定事件时,会自动调用这些回调,通知外部进行相应处理。
struct AudioServiceCallbacks {std::function<void(void)> on_send_queue_available;std::function<void(const std::string&)> on_wake_word_detected;std::function<void(bool)> on_vad_change;std::function<void(void)> on_audio_testing_queue_full; };
让主程序通过事件组机制,能够及时响应音频服务中的关键事件,实现音频事件的异步通知和处理。
每个成员的含义
- on_send_queue_available
类型:std::function<void(void)>
说明:当音频发送队列有可用数据时触发。比如可以通知主程序“可以发送音频数据到服务器了”。
- on_wake_word_detected
类型:std::function<void(const std::string&)>
说明:当检测到唤醒词(如“小智”)时触发。参数是检测到的唤醒词内容。
- on_vad_change
类型:std::function<void(bool)>
说明:当语音活动检测(VAD)状态发生变化时触发。参数 bool 表示当前是否有人在说话(true=正在说话,false=静音)。
- on_audio_testing_queue_full
类型:std::function<void(void)>
说明:当音频测试队列已满时触发。一般用于调试或测试场景。
异步和函数回调的区别?
方面 | 异步执行 | 自动回调 |
---|---|---|
是否并发 | 是,任务后台运行 | 不一定,回调是响应机制 |
主体是谁 | 程序发起的异步任务 | 异步任务完成后执行的函数 |
控制权 | 主程序不阻塞,控制权立即返回 | 控制权在回调被触发时才回到你手里 |
是否依赖异步 | 异步通常搭配回调使用 | 回调常用在异步任务,但也可用于同步场景 |
举个例子 | setTimeout() 不会阻塞主线程 | setTimeout(fn, 1000) 中的 fn 是回调 |
四、音频服务具体功能
分别了解下列三个核心任务函数:
- AudioInputTask():音频采集
- AudioOutputTask():音频播放
- OpusCodecTask():音频编解码
// 音频输入任务,运行在一个 FreeRTOS 任务中
void AudioService::AudioInputTask() {while (true) {// 等待音频相关事件触发:测试模式、唤醒词检测、通用音频处理EventBits_t bits = xEventGroupWaitBits(event_group_,AS_EVENT_AUDIO_TESTING_RUNNING |AS_EVENT_WAKE_WORD_RUNNING |AS_EVENT_AUDIO_PROCESSOR_RUNNING,pdFALSE, // 不清除标志位pdFALSE, // 任意一个事件即可返回portMAX_DELAY // 无限等待);// 如果服务已经停止,则退出任务if (service_stopped_) {break;}// 若麦克风需要预热,延迟一段时间后继续下一轮循环if (audio_input_need_warmup_) {audio_input_need_warmup_ = false;vTaskDelay(pdMS_TO_TICKS(120)); // 延迟 120mscontinue;}/** ==========================* 音频测试处理逻辑(如按下 BOOT 录音)* ========================== */if (bits & AS_EVENT_AUDIO_TESTING_RUNNING) {// 判断测试队列是否已满(按最大时长判断)if (audio_testing_queue_.size() >= AUDIO_TESTING_MAX_DURATION_MS / OPUS_FRAME_DURATION_MS) {ESP_LOGW(TAG, "Audio testing queue is full, stopping audio testing");EnableAudioTesting(false); // 自动关闭测试continue;}// 准备读取一帧音频数据(例如 20ms × 16000Hz)std::vector<int16_t> data;int samples = OPUS_FRAME_DURATION_MS * 16000 / 1000;// 如果成功读取音频数据if (ReadAudioData(data, 16000, samples)) {// 若为双声道,仅保留左声道数据(变为单声道)if (codec_->input_channels() == 2) {auto mono_data = std::vector<int16_t>(data.size() / 2);for (size_t i = 0, j = 0; i < mono_data.size(); ++i, j += 2) {mono_data[i] = data[j];}data = std::move(mono_data);}// 推送数据到测试编码队列PushTaskToEncodeQueue(kAudioTaskTypeEncodeToTestingQueue, std::move(data));continue; // 当前处理完毕,回到等待下一次事件}}/** ==========================* 唤醒词检测处理逻辑* ========================== */if (bits & AS_EVENT_WAKE_WORD_RUNNING) {std::vector<int16_t> data;int samples = wake_word_->GetFeedSize(); // 获取所需帧长度// 若帧长度有效且成功读取数据if (samples > 0 && ReadAudioData(data, 16000, samples)) {wake_word_->Feed(data); // 投喂唤醒词检测器continue;}}/** ==========================* 通用音频处理逻辑* ========================== */if (bits & AS_EVENT_AUDIO_PROCESSOR_RUNNING) {std::vector<int16_t> data;int samples = audio_processor_->GetFeedSize(); // 获取处理器需要的数据大小// 若帧有效且数据读取成功if (samples > 0 && ReadAudioData(data, 16000, samples)) {audio_processor_->Feed(std::move(data)); // 投喂音频处理器continue;}}// 如果没有任何已知事件被处理到,这通常是逻辑错误ESP_LOGE(TAG, "Should not be here, bits: %lx", bits);break; // 退出任务}// 最后,任务退出时打印警告日志ESP_LOGW(TAG, "Audio input task stopped");
}
void AudioService::AudioInputTask() {while (true) {EventBits_t bits = xEventGroupWaitBits(event_group_, AS_EVENT_AUDIO_TESTING_RUNNING |AS_EVENT_WAKE_WORD_RUNNING | AS_EVENT_AUDIO_PROCESSOR_RUNNING,pdFALSE, pdFALSE, portMAX_DELAY);if (service_stopped_) {break;}if (audio_input_need_warmup_) {audio_input_need_warmup_ = false;vTaskDelay(pdMS_TO_TICKS(120));continue;}/* Used for audio testing in NetworkConfiguring mode by clicking the BOOT button */if (bits & AS_EVENT_AUDIO_TESTING_RUNNING) {if (audio_testing_queue_.size() >= AUDIO_TESTING_MAX_DURATION_MS / OPUS_FRAME_DURATION_MS) {ESP_LOGW(TAG, "Audio testing queue is full, stopping audio testing");EnableAudioTesting(false);continue;}std::vector<int16_t> data;int samples = OPUS_FRAME_DURATION_MS * 16000 / 1000;if (ReadAudioData(data, 16000, samples)) {// If input channels is 2, we need to fetch the left channel dataif (codec_->input_channels() == 2) {auto mono_data = std::vector<int16_t>(data.size() / 2);for (size_t i = 0, j = 0; i < mono_data.size(); ++i, j += 2) {mono_data[i] = data[j];}data = std::move(mono_data);}PushTaskToEncodeQueue(kAudioTaskTypeEncodeToTestingQueue, std::move(data));continue;}}/* Feed the wake word */if (bits & AS_EVENT_WAKE_WORD_RUNNING) {std::vector<int16_t> data;int samples = wake_word_->GetFeedSize();if (samples > 0) {if (ReadAudioData(data, 16000, samples)) {wake_word_->Feed(data);continue;}}}/* Feed the audio processor */if (bits & AS_EVENT_AUDIO_PROCESSOR_RUNNING) {std::vector<int16_t> data;int samples = audio_processor_->GetFeedSize();if (samples > 0) {if (ReadAudioData(data, 16000, samples)) {audio_processor_->Feed(std::move(data));continue;}}}ESP_LOGE(TAG, "Should not be here, bits: %lx", bits);break;}ESP_LOGW(TAG, "Audio input task stopped");
}
void AudioService::AudioOutputTask() {while (true) {// 加锁等待播放队列非空或服务停止信号std::unique_lock<std::mutex> lock(audio_queue_mutex_);// 如果队列为空且服务未停止,则阻塞等待条件变量触发audio_queue_cv_.wait(lock, [this]() { return !audio_playback_queue_.empty() || service_stopped_; });// 如果检测到服务已经停止,则退出任务if (service_stopped_) {break;}// 从播放队列取出一个音频任务(前移出队)auto task = std::move(audio_playback_queue_.front());audio_playback_queue_.pop_front();// 通知等待的线程队列已发生变化(唤醒可能的生产者)audio_queue_cv_.notify_all();// 解锁互斥量,开始进行播放处理lock.unlock();// 如果音频输出尚未启用,则启用输出并启动功耗监测定时器if (!codec_->output_enabled()) {codec_->EnableOutput(true);esp_timer_start_periodic(audio_power_timer_, AUDIO_POWER_CHECK_INTERVAL_MS * 1000);}// 将 PCM 数据输出到音频设备codec_->OutputData(task->pcm);// 更新时间戳记录为最近一次输出时间last_output_time_ = std::chrono::steady_clock::now();// 播放计数器 +1,用于调试/统计debug_statistics_.playback_count++;#if CONFIG_USE_SERVER_AEC// 若启用了服务器端 AEC,并且任务中包含有效时间戳,则记录该时间戳if (task->timestamp > 0) {lock.lock(); // 重新加锁以保护 timestamp_queue_timestamp_queue_.push_back(task->timestamp);}#endif}// 最后,任务退出时打印日志ESP_LOGW(TAG, "Audio output task stopped");
}
void AudioService::OpusCodecTask() {while (true) {// 加锁并等待条件满足:// - 服务已停止// - 编码队列非空 且 发送队列未满// - 解码队列非空 且 播放队列未满std::unique_lock<std::mutex> lock(audio_queue_mutex_);audio_queue_cv_.wait(lock, [this]() {return service_stopped_ ||(!audio_encode_queue_.empty() && audio_send_queue_.size() < MAX_SEND_PACKETS_IN_QUEUE) ||(!audio_decode_queue_.empty() && audio_playback_queue_.size() < MAX_PLAYBACK_TASKS_IN_QUEUE);});// 若服务已停止,则退出任务if (service_stopped_) {break;}/** ========================* 解码逻辑* ======================== */if (!audio_decode_queue_.empty() && audio_playback_queue_.size() < MAX_PLAYBACK_TASKS_IN_QUEUE) {// 取出一个待解码数据包auto packet = std::move(audio_decode_queue_.front());audio_decode_queue_.pop_front();audio_queue_cv_.notify_all();lock.unlock(); // 解锁以便其他线程访问队列// 构造新的播放任务auto task = std::make_unique<AudioTask>();task->type = kAudioTaskTypeDecodeToPlaybackQueue;task->timestamp = packet->timestamp;// 设置解码参数SetDecodeSampleRate(packet->sample_rate, packet->frame_duration);// 解码数据if (opus_decoder_->Decode(std::move(packet->payload), task->pcm)) {// 如果解码后的采样率不一致,则重采样if (opus_decoder_->sample_rate() != codec_->output_sample_rate()) {int target_size = output_resampler_.GetOutputSamples(task->pcm.size());std::vector<int16_t> resampled(target_size);output_resampler_.Process(task->pcm.data(), task->pcm.size(), resampled.data());task->pcm = std::move(resampled);}// 加锁并推送到播放队列lock.lock();audio_playback_queue_.push_back(std::move(task));audio_queue_cv_.notify_all();} else {// 解码失败ESP_LOGE(TAG, "Failed to decode audio");lock.lock();}debug_statistics_.decode_count++;}/** ========================* 编码逻辑* ======================== */if (!audio_encode_queue_.empty() && audio_send_queue_.size() < MAX_SEND_PACKETS_IN_QUEUE) {auto task = std::move(audio_encode_queue_.front());audio_encode_queue_.pop_front();audio_queue_cv_.notify_all();lock.unlock(); // 解锁以进行编码// 构建音频流数据包auto packet = std::make_unique<AudioStreamPacket>();packet->frame_duration = OPUS_FRAME_DURATION_MS;packet->sample_rate = 16000;packet->timestamp = task->timestamp;// 编码 PCM 数据if (!opus_encoder_->Encode(std::move(task->pcm), packet->payload)) {ESP_LOGE(TAG, "Failed to encode audio");continue;}// 根据任务类型,推送到不同队列if (task->type == kAudioTaskTypeEncodeToSendQueue) {{std::lock_guard<std::mutex> lock(audio_queue_mutex_);audio_send_queue_.push_back(std::move(packet));}// 通知有新的可发送数据if (callbacks_.on_send_queue_available) {callbacks_.on_send_queue_available();}} else if (task->type == kAudioTaskTypeEncodeToTestingQueue) {std::lock_guard<std::mutex> lock(audio_queue_mutex_);audio_testing_queue_.push_back(std::move(packet));}debug_statistics_.encode_count++;lock.lock(); // 重新加锁以进入下一轮循环}}// 任务退出时记录日志ESP_LOGW(TAG, "Opus codec task stopped");
}