SRS流媒体服务器(7)源码分析之拉流篇
0.基础知识
srs代码优美简洁,重点需明白rtmp相关基础知识。否则很难阅读明白具体可以学习往期博客:
- RTMP协议分析_h264与rtmp格式-CSDN博客
- RTMP 传输结构分析_rtmp chunk消息-CSDN博客
- rtmp封包分析之flv格式封包细节_flv sequence header-CSDN博客
1. 回顾推流
根据《SRS流媒体服务器(6)源码分析之推流篇》回顾,服务启动SrsServer
→ 初始化 SrsBufferListener 每个端口对应一个SrsBufferListener
→ 每个 SrsBufferListener 管理一个 SrsTcpListener
→ SrsTcpListener 通过协程循环接受新连接
→ on_tcp_client 回调到上层SrsServer
→ SrsServer::accept_client 接收新 TCP 连接
→ 创建SrsRtmpConn连接对象
→ SrsRtmpConn::do_cycle()协程驱动cycle()主循环
→ SrsRtmpConn::stream_service_cycle() RTMP连接处理主逻辑,创建推拉流对象。以下是推流流程
→ SrsRecvThread::do_cycle() → SrsProtocol::recv_message 协程接收RTMP消息
→ SrsProtocol::on_recv_message 解析和处理RTMP消息,一些配置设置比如窗口大小
→ SrsRtmpConn::publishing 开始发布流
→ SrsRtmpConn::process_publish_message 分发流
→ SrsLiveConsumer::enqueue 推流数据加入到消费者队列
(gdb) bt
#0 SrsLiveConsumer::enqueue (this=0x6060000716c0, shared_msg=0x7ffff28a4090, atc=false, ag=SrsRtmpJitterAlgorithmFULL) at ./src/app/srs_app_source.cpp:452
#1 0x0000555555d123fa in SrsLiveSource::on_video_imp (this=0x61100001bd40, msg=0x7ffff28a4090) at ./src/app/srs_app_source.cpp:2445
#2 0x0000555555d0f92f in SrsLiveSource::on_frame (this=0x61100001bd40, msg=0x7ffff28a4090) at ./src/app/srs_app_source.cpp:2257
#3 0x0000555555d11559 in SrsLiveSource::on_video (this=0x61100001bd40, shared_video=0x606000073220) at ./src/app/srs_app_source.cpp:2391
#4 0x0000555555cea9df in SrsRtmpConn::process_publish_message (this=0x61200003dcc0, source=..., msg=0x606000073220) at ./src/app/srs_app_rtmp_conn.cpp:1217
#5 0x0000555555cea58f in SrsRtmpConn::handle_publish_message (this=0x61200003dcc0, source=..., msg=0x606000073220) at ./src/app/srs_app_rtmp_conn.cpp:1189
#6 0x0000555555f0fdd5 in SrsPublishRecvThread::consume (this=0x7ffff290f550, msg=0x606000073220) at ./src/app/srs_app_recv_thread.cpp:373
#7 0x0000555555f0d632 in SrsRecvThread::do_cycle (this=0x7ffff290f570) at ./src/app/srs_app_recv_thread.cpp:131
#8 0x0000555555f0d0ad in SrsRecvThread::cycle (this=0x7ffff290f570) at ./src/app/srs_app_recv_thread.cpp:100
#9 0x0000555555d67f02 in SrsFastCoroutine::cycle (this=0x60e00000cde0) at ./src/app/srs_app_st.cpp:309
#10 0x0000555555d68052 in SrsFastCoroutine::pfn (arg=0x60e00000cde0) at ./src/app/srs_app_st.cpp:324
--Type <RET> for more, q to quit, c to continue without paging--
#11 0x00005555560fa66f in _st_thread_main () at sched.c:380
#12 0x00005555560faf95 in st_thread_create (start=0x55555650c0d0, arg=0x41b58ab3, joinable=24672, stk_size=328) at sched.c:666
1.2 调试命令
推流
ffmpeg -re -i ./doc/source.200kbps.768x320.flv -c copy -f flv rtmp://192.168.126.129/live/livestream2
可使用vlc进行rtmp拉流播放。是rtmp://如果是http://则不走当前分析源码。
本博客是基于rtmp拉流源码分析。后续会出rtmp转rtp/http-flv/hls协议拉流播放
rtmp://192.168.126.129/live/livestream2
2.推流源码分析
2.1 开始入口
从SrsRtmpConn::stream_service_cycle()函数切入,它是RTMP拉流处理的核心入口,负责协调不同类型RTMP连接(播放或推流)的初始化和资源分配,为后续数据交互奠定基础。
/*** @brief 流服务周期函数** 此函数是 SrsRtmpConn 类的一个成员函数,用于处理 RTMP 连接的服务周期。** @return srs_error_t 类型,表示函数执行的结果。如果执行成功,则返回 srs_success;否则返回相应的错误码。*/
srs_error_t SrsRtmpConn::stream_service_cycle()
{srs_error_t err = srs_success;SrsRequest* req = info->req;// 解析vhostSrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);if (parsed_vhost) {req->vhost = parsed_vhost->arg0();}// 检查tcUrl各个字段是否为空// 检查vhost// 检查token// 安全检查// 不允许空流名// 设置超时时间// 查找或创建一个源SrsLiveSource* source = NULL;if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {return srs_error_wrap(err, "rtmp: fetch source");}srs_assert(source != NULL);// 设置gop缓存bool enabled_cache = _srs_config->get_gop_cache(req->vhost);switch (info->type) {case SrsRtmpConnPlay: {// 开始播放if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: start play");}if ((err = http_hooks_on_play()) != srs_success) {return srs_error_wrap(err, "rtmp: callback on play");}err = playing(source);http_hooks_on_stop();return err;}case SrsRtmpConnFMLEPublish: {// 开始发布(FMLE)if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: start FMLE publish");}return publishing(source);}case SrsRtmpConnHaivisionPublish: {// 开始发布(Haivision)if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: start HAIVISION publish");}return publishing(source);}case SrsRtmpConnFlashPublish: {// 开始发布(Flash)if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: start FLASH publish");}return publishing(source);}default: {// 未知客户端类型return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);}}return err;
}
2.2 拉流对象创建
当使用 ffplay 进行拉流时,服务器会创建一个消费者对象。这个对象代表了 ffplay 播放客户端,负责接收从流源拉取的媒体数据,并将其发送给 ffplay 客户端。
创建的消费者对象会被关联到对应的流源,以便及时获取和处理媒体数据。
在创建过程中,还会进行一系列的初始化操作,如设置初始时间戳、配置缓冲策略等,并根据服务器配置和客户端需求对消费者对象进行参数配置。
/*** 处理客户端播放事件,发送一系列RTMP控制包** 1. 发送StreamBegin事件包,通知流开始* 2. 发送NetStream.Play.Reset状态包* 3. 发送NetStream.Play.Start状态包* 4. 设置音视频样本访问权限* 5. 发送NetStream.Data.Start状态包** @param stream_id 流ID,用于标识当前流* @return 返回操作结果,成功返回srs_success,失败返回错误码*/
srs_error_t SrsRtmpServer::start_play(int stream_id)
{srs_error_t err = srs_success;// StreamBeginif (true){// 创建SrsUserControlPacket对象SrsUserControlPacket *pkt = new SrsUserControlPacket();// 设置事件类型为StreamBeginpkt->event_type = SrcPCUCStreamBegin;// 设置事件数据为stream_idpkt->event_data = stream_id;// 发送并释放包,如果失败则返回错误if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {return srs_error_wrap(err, "send StreamBegin");}}// onStatus(NetStream.Play.Reset)if (true) {// 创建SrsOnStatusCallPacket对象SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();// 设置状态等级为StatusLevelStatuspkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));// 设置状态码为StreamResetpkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamReset));// 设置状态描述为"Playing and resetting stream."pkt->data->set(StatusDescription, SrsAmf0Any::str("Playing and resetting stream."));// 设置状态详情为"stream"pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));// 设置客户端ID为RTMP_SIG_CLIENT_IDpkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));// 发送并释放包,如果失败则返回错误if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.Play.Reset");}}// onStatus(NetStream.Play.Start)if (true) {// 创建SrsOnStatusCallPacket对象SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();// 设置状态等级为StatusLevelStatuspkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));// 设置状态码为StreamStartpkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamStart));// 设置状态描述为"Started playing stream."pkt->data->set(StatusDescription, SrsAmf0Any::str("Started playing stream."));// 设置状态详情为"stream"pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));// 设置客户端ID为RTMP_SIG_CLIENT_IDpkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));// 发送并释放包,如果失败则返回错误if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.Play.Start");}}// |RtmpSampleAccess(false, false)if (true) {// 创建SrsSampleAccessPacket对象SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();// 允许音频/视频样本// @see: https://github.com/ossrs/srs/issues/49// 设置音频样本访问为truepkt->audio_sample_access = true;// 设置视频样本访问为truepkt->video_sample_access = true;// 发送并释放包,如果失败则返回错误if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send |RtmpSampleAccess true");}}// onStatus(NetStream.Data.Start)if (true) {// 创建SrsOnStatusDataPacket对象SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();// 设置状态码为DataStartpkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeDataStart));// 发送并释放包,如果失败则返回错误if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.Data.Start");}}return err;
}
2.3 处理拉流请求
服务器接收到拉流请求后,会进行一系列的安全检查和配置设置。例如,检查Referer是否合法,设置Socket选项等。然后创建一个消费者对象,并将其与流源关联。接着启动接收线程,开始接收客户端的控制消息和数据请求。
/*** @brief 处理播放请求* * 处理RTMP播放请求,包括referer校验、集群重定向、创建消费者等步骤。* * @param source 流媒体源对象* @return srs_error_t 错误码,成功返回srs_success*/
srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{srs_error_t err = srs_success;// 检查播放器的页面引用// Check page referer of player.SrsRequest* req = info->req;if (_srs_config->get_refer_enabled(req->vhost)) {if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {return srs_error_wrap(err, "rtmp: referer check");}}// 当启用了源站集群时,尝试重定向到活跃的源站// 活跃的源站是指正在传输流的服务器if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);for (int i = 0; i < (int)coworkers.size(); i++) {string host; int port = 0; string coworker = coworkers.at(i);// 构建用于发现协作者的URL// 如果在当前协作者中发现流失败,我们应该请求下一个直到最后一个// 生成重定向的RTMP URL// 如果主机或端口无效则忽略// 尝试重定向}return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin");}// 设置套接字选项set_sock_options();// 创建源的消费者SrsLiveConsumer* consumer = NULL;SrsAutoFree(SrsLiveConsumer, consumer);if ((err = source->create_consumer(consumer)) != srs_success) {return srs_error_wrap(err, "rtmp: create consumer");}if ((err = source->consumer_dumps(consumer)) != srs_success) {return srs_error_wrap(err, "rtmp: dumps consumer");}// 使用接收线程从对端接收数据包 底层使用协程接收数据,最终数据到consumer.SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());if ((err = trd.start()) != srs_success) {return srs_error_wrap(err, "rtmp: start receive thread");}// 向对端传递数据包wakable = consumer;err = do_playing(source, consumer, &trd);wakable = NULL;trd.stop();// 丢弃接收线程中的所有数据包if (!trd.empty()) {srs_warn("drop the received %d messages", trd.size());}return err;
}
2.4 拉流核心处理
在拉流核心处理过程中,服务器会处理各种 RTMP 控制消息,如播放控制消息等。对于每个消息,服务器会进行相应的处理,如暂停、恢复播放等。同时,服务器会从流源获取媒体数据,并将其发送给客户端。在发送数据时,服务器会进行批量写入操作,以提高数据传输的效率。
/*** 处理RTMP播放流程的核心函数** 该函数负责管理RTMP连接的播放过程,包括:* 1. 初始化播放统计信息和组件* 2. 设置实时性和消息合并(mw)参数* 3. 从消费者获取媒体数据包并发送* 4. 处理播放控制消息和持续时间限制* 5. 监控性能指标并输出日志** @param source 直播源对象指针* @param consumer 消费者对象指针,用于获取媒体数据* @param rtrd 接收线程对象指针** @return 返回错误码,成功返回srs_success** @remark 当启用send_min_interval时,每次只获取一条消息* @see SRS_PERF_QUEUE_COND_WAIT 宏控制是否使用条件等待优化性能* @see 关于持续时间限制的实现参考 #45* @see 关于协程调度的实现参考 #2194*/
srs_error_t SrsRtmpConn::do_playing(SrsLiveSource *source, SrsLiveConsumer *consumer, SrsQueueRecvThread *rtrd)
{srs_error_t err = srs_success;SrsRequest* req = info->req;srs_assert(req);srs_assert(consumer);// 更新源发现时的统计信息// 初始化其他组件SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();SrsAutoFree(SrsPithyPrint, pprint);SrsMessageArray msgs(SRS_PERF_MW_MSGS);// 检查用户是否指定了持续时间来停止播放bool user_specified_duration_to_stop = (req->duration > 0);int64_t starttime = -1;// 设置实时性参数realtime = _srs_config->get_realtime_enabled(req->vhost);// 设置合并写入(mw)配置// 当mw_sleep更改时,调整套接字发送缓冲区大小mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);mw_sleep = _srs_config->get_mw_sleep(req->vhost);skt->set_socket_buffer(mw_sleep);// 初始化最小发送间隔send_min_interval = _srs_config->get_send_min_interval(req->vhost);while (true) {// 协程拉取失败则断开if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "rtmp: thread quit");}// 收集用于简明打印的时间流逝pprint->elapse();// 使用独立协程接收数据,可以提高约33%的性能while (!rtrd->empty()) {SrsCommonMessage* msg = rtrd->pump();if ((err = process_play_control_msg(consumer, msg)) != srs_success) {return srs_error_wrap(err, "rtmp: play control message");}}// 当接收线程出错时退出if ((err = rtrd->error_code()) != srs_success) {return srs_error_wrap(err, "rtmp: recv thread");}#ifdef SRS_PERF_QUEUE_COND_WAIT// 等待消息到达// @see https://github.com/ossrs/srs/issues/257consumer->wait(mw_msgs, mw_sleep);
#endif// 从推流数据队列获取消息// msgs.msgs中的每个msg都必须被释放,因为SrsMessageArray永远不会释放它们// 注意:当启用send_min_interval时,一次只获取一条消息int count = (send_min_interval > 0)? 1 : 0;if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {return srs_error_wrap(err, "rtmp: consumer dump packets");}// 可报告的统计信息if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAITsrs_usleep(mw_sleep);
#endif// 当没有获取到消息时忽略continue;}// 只有当用户指定了持续时间时,// 我们才开始为每个消息收集持续时间if (user_specified_duration_to_stop) {for (int i = 0; i < count; i++) {SrsSharedPtrMessage* msg = msgs.msgs[i];// 对于每个消息,收集持续时间// 注意:发送后不要使用msg,因为协议sdk会释放它if (starttime < 0 || starttime > msg->timestamp) {starttime = msg->timestamp;}duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;starttime = msg->timestamp;}}// 发送消息,所有消息都由send_and_free_messages()释放// 不需要断言msg,因为rtmp会断言它if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: send %d messages", count);}// 如果指定了持续时间,并且超过了它,停止播放直播if (user_specified_duration_to_stop) {if (duration >= req->duration) {return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));}}// 应用传输流的最小间隔,单位为srs_utime_tif (send_min_interval > 0) {srs_usleep(send_min_interval);}// 让出给其他协程// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777437476srs_thread_yield();}return err;
}
2.4.1 RTMP播放控制消息
RTMP播放控制消息的处理机制。当服务器接收到客户端发送的控制消息时,会根据不同类型的控制消息执行相应的操作。例如,处理播放暂停消息时,服务器会暂停向客户端发送媒体数据,并发送暂停确认消息;处理播放恢复消息时,则恢复数据发送并通知客户端。
/*** 处理RTMP播放控制消息** 该函数负责处理来自客户端的RTMP播放控制消息,包括:* 1. 关闭流消息(CloseStreamPacket)* 2. 调用命令消息(CallPacket)* 3. 暂停/恢复播放消息(PausePacket)** @param consumer 直播消费者对象指针,用于处理播放控制事件* @param msg 接收到的RTMP消息对象指针* @return 返回处理结果错误码,srs_success表示成功*/
srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer *consumer, SrsCommonMessage *msg)
{// 初始化错误状态为成功srs_error_t err = srs_success;// 如果消息为空,直接返回if (!msg) {return err;}// 使用自动释放指针管理消息内存SrsAutoFree(SrsCommonMessage, msg);// 检查消息类型是否为AMF0或AMF3命令消息,如果不是则忽略if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {return err;}// 解码RTMP消息为具体的数据包SrsPacket* pkt = NULL;if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {return srs_error_wrap(err, "rtmp: decode message");}// 使用自动释放指针管理数据包内存SrsAutoFree(SrsPacket, pkt);// 处理jwplayer/flowplayer发送的关闭流消息// 这些播放器会将关闭操作作为暂停消息发送SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(pkt);if (close) {return srs_error_new(ERROR_CONTROL_RTMP_CLOSE, "rtmp: close stream");}// 处理调用命令消息// 支持首先响应空对象// TODO: FIXME: 需要以正确的方式响应,或在边缘模式下转发SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);if (call) {// 只有当事务ID不为零时才需要响应// 事务ID为零表示不需要响应if (call->transaction_id > 0) {// 创建响应包,包含两个空对象SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);res->command_object = SrsAmf0Any::null();res->response = SrsAmf0Any::null();// 发送响应包if ((err = rtmp->send_and_free_packet(res, 0)) != srs_success) {return srs_error_wrap(err, "rtmp: send packets");}}return err;}// 处理暂停/恢复播放消息SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);if (pause) {// 通知RTMP连接暂停/恢复状态if ((err = rtmp->on_play_client_pause(info->res->stream_id, pause->is_pause)) != srs_success) {return srs_error_wrap(err, "rtmp: pause");}// 通知消费者暂停/恢复状态if ((err = consumer->on_play_client_pause(pause->is_pause)) != srs_success) {return srs_error_wrap(err, "rtmp: pause");}return err;}// 其他类型的消息直接忽略return err;
}/*** 处理客户端播放暂停/恢复事件** @param stream_id 流ID,标识要操作的流* @param is_pause 是否为暂停操作,true表示暂停,false表示恢复* @return 返回错误码,srs_success表示成功** @remark 暂停时会发送NetStream.Pause.Notify状态通知和StreamEOF控制消息* @remark 恢复时会发送NetStream.Unpause.Notify状态通知和StreamBegin控制消息*/
srs_error_t SrsRtmpServer::on_play_client_pause(int stream_id, bool is_pause)
{srs_error_t err = srs_success;if (is_pause) {// onStatus(NetStream.Pause.Notify)if (true) {SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamPause));pkt->data->set(StatusDescription, SrsAmf0Any::str("Paused stream."));if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.Pause.Notify");}}// StreamEOFif (true) {SrsUserControlPacket* pkt = new SrsUserControlPacket();pkt->event_type = SrcPCUCStreamEOF;pkt->event_data = stream_id;if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {return srs_error_wrap(err, "send StreamEOF");}}} else {// onStatus(NetStream.Unpause.Notify)if (true) {SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamUnpause));pkt->data->set(StatusDescription, SrsAmf0Any::str("Unpaused stream."));if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.Unpause.Notify");}}// StreamBeginif (true) {SrsUserControlPacket* pkt = new SrsUserControlPacket();pkt->event_type = SrcPCUCStreamBegin;pkt->event_data = stream_id;if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {return srs_error_wrap(err, "send StreamBegin");}}}return err;
}
2.4.2 获取推流数据
服务器通过消费者对象与消息队列进行交互,按照一定的规则和策略从队列中提取数据。例如,会根据队列的大小、消息的优先级等因素来决定每次提取多少数据。
在提取数据时,服务器会进行解码和格式转换等操作,将数据转换为适合网络传输的格式。同时,还会对数据进行缓存管理,以确保数据的完整性和顺序性。
/*** 从队列中获取指定数量的媒体包** @param msgs 消息数组,用于存储获取到的媒体包* @param count 输入时表示最大请求数量,输出时表示实际获取到的数量* @return 错误码,成功返回srs_success** @remark 当paused为true时直接返回空* @remark 会更新source_id信息*/
srs_error_t SrsLiveConsumer::dump_packets(SrsMessageArray *msgs, int &count)
{// 初始化错误状态为成功srs_error_t err = srs_success;// 参数有效性检查srs_assert(count >= 0);srs_assert(msgs->max > 0);// 根据输入的count值确定最大获取数量// 如果count为0,则使用msgs->max作为上限int max = count? srs_min(count, msgs->max) : msgs->max;// 重置count为0,后续会更新为实际获取的数量count = 0;// 如果需要更新source_id,则输出日志if (should_update_source_id) {srs_trace("update source_id=%s/%s", source->source_id().c_str(), source->pre_source_id().c_str());should_update_source_id = false;}// 如果消费者处于暂停状态,直接返回if (paused) {return err;}// 从队列中获取消息if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) {return srs_error_wrap(err, "dump packets");}return err;
}/*** @brief 从消息队列中转储指定数量的消息包** 将消息队列中的前max_count个消息包复制到提供的缓冲区中,并更新实际复制的数量。* 如果缓冲区足够大,会清空消息队列;否则仅移除已复制的消息。SrsFastVector msgs是之前推流数据队列** @param max_count 最大可转储的消息数量(必须大于0)* @param pmsgs 输出参数,用于存储消息指针的缓冲区* @param count 输出参数,实际转储的消息数量* @return srs_error_t 始终返回srs_success** @remark 当pmsgs缓冲区足够大时(如SRS_PERF_MW_MSGS=128),* 通常会清空整个消息队列,此情况性能最佳;* 否则会触发vector.erase操作,可能引起内存拷贝。*/
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage **pmsgs, int &count)
{// 初始化错误状态为成功srs_error_t err = srs_success;// 获取当前队列中的消息数量int nb_msgs = (int)msgs.size();// 如果队列为空,直接返回if (nb_msgs <= 0) {return err;}// 确保请求的消息数量大于0srs_assert(max_count > 0);// 计算实际可获取的消息数量,取最小值count = srs_min(max_count, nb_msgs);// 获取消息数组的数据指针SrsSharedPtrMessage** omsgs = msgs.data();// 将消息指针复制到输出数组 底层是数组,直接使用直接寻址+偏移拷贝数据效率高。memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*));// 更新队列的起始时间为最后一条消息的时间戳SrsSharedPtrMessage* last = omsgs[count - 1];av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);// 如果取出的消息数量大于等于队列中的消息数量if (count >= nb_msgs) {// 直接清空整个队列msgs.clear();} else {// 否则只删除已取出的消息// 注意:这种操作可能导致内存拷贝,性能不如clear// 但由于通常pmsgs足够大(如SRS_PERF_MW_MSGS=128)// 所以这个分支很少执行msgs.erase(msgs.begin(), msgs.begin() + count);}return err;
}
2.4.3 发送rtmp消息组
该部分主要描述了服务器如何将封装好的消息发送给客户端。
服务器会根据消息的类型和大小,选择合适的发送方式,确保数据能够准确无误地传输到客户端。在发送过程中,服务器会处理各种可能出现的网络错误和异常,以保证数据传输的可靠性。
srs_error_t SrsRtmpServer::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
{return protocol->send_and_free_messages(msgs, nb_msgs, stream_id);
}/*** 发送并释放RTMP消息数组** @param msgs RTMP消息指针数组,调用后会被释放* @param nb_msgs 消息数量,必须大于0* @param stream_id 目标流ID,用于校验消息* @return 错误码,成功返回srs_success* @remark 1. 会更新消息头中的流ID* 2. 发送失败时不执行flush操作* 3. 无论成功与否都会释放所有消息* 4. 发送成功后执行手动队列flush* @see do_send_messages()* @see manual_response_flush()*/
srs_error_t SrsProtocol::send_and_free_messages(SrsSharedPtrMessage **msgs, int nb_msgs, int stream_id)
{// 断言确保消息数组指针不为空srs_assert(msgs);// 断言确保消息数量大于0srs_assert(nb_msgs > 0);// 更新消息头中的流IDfor (int i = 0; i < nb_msgs; i++) {// 获取当前处理的消息SrsSharedPtrMessage* msg = msgs[i];// 如果消息为空,跳过处理if (!msg) {continue;}// 检查首选通道ID和流ID,一旦有一条消息的流ID设置正确,就跳出循环// check perfer cid and stream,// when one msg stream id is ok, ignore left.if (msg->check(stream_id)) {break;}}// 执行实际的消息发送,不使用自动释放以提高性能srs_error_t err = do_send_messages(msgs, nb_msgs);// 遍历并释放所有消息内存for (int i = 0; i < nb_msgs; i++) {SrsSharedPtrMessage* msg = msgs[i];srs_freep(msg);}// 如果发送失败,不执行flush操作if (err != srs_success) {return srs_error_wrap(err, "send messages");}// 发送成功后,刷新手动响应队列中的消息// flush messages in manual queueif ((err = manual_response_flush()) != srs_success) {return srs_error_wrap(err, "manual flush response");}// 打印调试信息print_debug_info();// 返回处理结果return err;
}
2.4.2.1 发送模式
这里讨论了服务器在处理多个消息发送时的策略。服务器会将多个消息进行整理和排序,然后依次发送。
这种批量处理的方式可以提高数据传输的效率,减少发送过程中的延迟。同时,服务器还会对发送的消息进行监控和管理,确保每个消息都能正确地到达客户端。
/*** 发送多个RTMP消息** 该函数负责批量发送RTMP消息,支持两种发送模式:* 1. 高性能模式(SRS_PERF_COMPLEX_SEND):使用iovec向量和缓存机制批量发送* 2. 简单模式:逐个消息发送** @param msgs 要发送的消息指针数组* @param nb_msgs 消息数量* @return 错误码,成功返回srs_success** @remark 在高性能模式下会使用缓存机制,当缓存不足时会自动发送已缓存数据并重置缓存* @remark 会忽略空消息和无效消息(payload为空或size<=0)* @remark 每个消息会被分割成多个chunk发送,chunk大小由out_chunk_size决定*/
srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage **msgs, int nb_msgs)
{// 初始化错误对象为成功状态srs_error_t err = srs_success;#ifdef SRS_PERF_COMPLEX_SEND// 初始化向量数组索引int iov_index = 0;// 获取当前向量数组位置指针iovec* iovs = out_iovs + iov_index;// 初始化头部缓存索引int c0c3_cache_index = 0;// 获取当前头部缓存位置指针char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;// 尝试使用c0c3头部缓存发送消息(c0c3表示的是fmt =0/3是11字节或15字节 表示缓存chunk.header提高发送效率)// 如果缓存被消耗完,尝试另一个循环for (int i = 0; i < nb_msgs; i++) {// 获取当前要处理的消息SrsSharedPtrMessage* msg = msgs[i];// 如果消息为空,跳过处理if (!msg) {continue;}// 忽略空消息(负载为空或大小小于等于0)if (!msg->payload || msg->size <= 0) {continue;}// p指向当前写入位置// 当负载为NULL且大小为0时也是可以的char* p = msg->payload;// pend指向负载结束位置char* pend = msg->payload + msg->size;// 即使负载为空也总是写入头部while (p < pend) {// 总是有头部int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;// 生成chunk.header块头部并写入缓存,返回头部大小int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);// 确保头部大小大于0srs_assert(nbh > 0);// 设置头部向量iovs[0].iov_base = c0c3_cache;iovs[0].iov_len = nbh;// 设置负载向量,大小不超过块大小int payload_size = srs_min(out_chunk_size, (int)(pend - p));iovs[1].iov_base = p;iovs[1].iov_len = payload_size;// 消耗已发送的字节p += payload_size;// 如果向量数组空间不足,重新分配// 因为我们不知道可能需要发送多少消息// 所以只是分配向量数组,这样就可以了if (iov_index >= nb_out_iovs - 2) {// 记录旧的向量数组大小int ov = nb_out_iovs;// 将向量数组大小翻倍nb_out_iovs = 2 * nb_out_iovs;// 计算重新分配的大小int realloc_size = sizeof(iovec) * nb_out_iovs;// 重新分配向量数组内存out_iovs = (iovec*)realloc(out_iovs, realloc_size);// 记录警告日志srs_warn("resize iovs %d => %d, max_msgs=%d", ov, nb_out_iovs, SRS_PERF_MW_MSGS);}// 移动到下一对向量iov_index += 2;iovs = out_iovs + iov_index;// 移动到下一个头部缓存位置// to next c0c3 header cachec0c3_cache_index += nbh;c0c3_cache = out_c0c3_caches + c0c3_cache_index;// 头部缓存不应该再次重新分配// 因为指针已经设置到向量数组中,所以我们只是警告用户设置更大的缓存// 并使用另一个循环再次发送int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {// 对于一个连接只警告一次if (!warned_c0c3_cache_dry) {srs_warn("c0c3 cache header too small, recoment to %d", SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE);warned_c0c3_cache_dry = true;}// 当c0c3缓存耗尽时// 发送所有消息并重置缓存,然后再次发送if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) {return srs_error_wrap(err, "send iovs");}// 重置缓存,这些缓存确保// 我们至少可以发送一个块iov_index = 0;iovs = out_iovs + iov_index;c0c3_cache_index = 0;c0c3_cache = out_c0c3_caches + c0c3_cache_index;}}}// 当c0c3缓存耗尽时,向量可能已经发送出去// 所以当没有向量要发送时就忽略if (iov_index <= 0) {return err;}// 一次性发送所有向量if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) {return srs_error_wrap(err, "send iovs");}return err;
#else// 尝试使用c0c3头部缓存发送// 如果缓存被消耗完,尝试另一个循环for (int i = 0; i < nb_msgs; i++) {// 获取当前要处理的消息SrsSharedPtrMessage* msg = msgs[i];// 如果消息为空,跳过处理if (!msg) {continue;}// 忽略空消息if (!msg->payload || msg->size <= 0) {continue;}// p指向当前写入位置// 当负载为NULL且大小为0时也是可以的char* p = msg->payload;// pend指向负载结束位置char* pend = msg->payload + msg->size;// 即使负载为空也总是写入头部while (p < pend) {// 对于简单发送,一次发送一个块iovec* iovs = out_iovs;char* c0c3_cache = out_c0c3_caches;int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX;// 总是有头部int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);srs_assert(nbh > 0);// 设置头部向量// header ioviovs[0].iov_base = c0c3_cache;iovs[0].iov_len = nbh;// 设置负载向量// payload iovint payload_size = srs_min(out_chunk_size, pend - p);iovs[1].iov_base = p;iovs[1].iov_len = payload_size;// 消耗已发送的字节// consume sendout bytes.p += payload_size;// 直接发送两个向量(头部和负载)if ((er = skt->writev(iovs, 2, NULL)) != srs_success) {return srs_error_wrap(err, "writev");}}}return err;
#endif
}
2.4.2.2 批量写入机制
批量写入是服务器优化数据传输的一种方法。
服务器会将多个小的消息合并成一个较大的数据块进行发送。这样可以减少发送系统调用的次数,降低网络传输的开销,从而提高整体的性能。
在实际操作中,服务器会根据网络状况和消息的紧急程度,动态调整批量写入的大小和时机。
srs_error_t SrsProtocol::do_iovs_send(iovec* iovs, int size)
{return srs_write_large_iovs(skt, iovs, size);
}/*** 写入大型IO向量数组* * 该函数用于处理可能超过系统IO向量限制的写入操作,* 通过分批发送解决大型IO向量写入问题。* * @param skt 协议读写接口* @param iovs IO向量数组* @param size IO向量数组大小* @param pnwrite 可选参数,用于返回实际写入的字节数* @return 错误码,成功返回srs_success*/
srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter *skt, iovec *iovs, int size, ssize_t *pnwrite)
{// 初始化错误对象为成功状态srs_error_t err = srs_success;// 获取系统writev函数的IO向量数组大小限制// the limits of writev iovs.
#ifndef _WIN32// 对于Linux系统,一般限制为1024static int limits = (int)sysconf(_SC_IOV_MAX);
#else// Windows系统下使用1024作为默认限制static int limits = 1024;
#endif// 如果IO向量数组大小不超过系统限制,一次性发送if (size <= limits) {// 直接调用writev发送所有数据if ((err = skt->writev(iovs, size, pnwrite)) != srs_success) {// 发送失败时,包装错误并返回return srs_error_wrap(err, "writev");}// 发送成功,返回成功状态return err;}// IO向量数组大小超过系统限制,需要分批发送// 当前处理的IO向量索引int cur_iov = 0;// 已写入的字节数ssize_t nwrite = 0;// 循环处理,直到所有IO向量都被发送while (cur_iov < size) {// 计算当前批次要发送的IO向量数量,不超过系统限制int cur_count = srs_min(limits, size - cur_iov);// 发送当前批次的IO向量 SrsTcpConnection::writevif ((err = skt->writev(iovs + cur_iov, cur_count, &nwrite)) != srs_success) {// 发送失败时,包装错误并返回return srs_error_wrap(err, "writev");}// 更新当前处理的IO向量索引cur_iov += cur_count;// 如果提供了pnwrite参数,累加已写入的字节数if (pnwrite) {*pnwrite += nwrite;}}// 返回处理结果return err;
}/*** 向socket写入向量化数据** @param iov 向量数据数组指针* @param iov_size 向量数据数组大小* @param nwrite 实际写入字节数输出参数(可选)* @return 错误码,成功返回srs_success** @remark 支持超时设置,超时返回ERROR_SOCKET_TIMEOUT错误* @remark 写入失败返回ERROR_SOCKET_WRITE错误* @remark 成功写入会累加统计字节数sbytes*/
srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t *nwrite)
{// 初始化错误对象为成功状态srs_error_t err = srs_success;// 实际写入的字节数ssize_t nb_write;// 根据是否设置了超时时间,选择不同的超时参数调用st_writevif (stm == SRS_UTIME_NO_TIMEOUT) {// 如果没有设置超时,使用无限超时写入nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);} else {// 否则使用指定的超时时间写入nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm);}// 如果提供了nwrite参数,返回实际写入的字节数if (nwrite) {*nwrite = nb_write;}// 成功时返回写入的字节数,失败时返回-1并设置errnoif (nb_write <= 0) {// 如果写入超时,返回超时错误if (nb_write < 0 && errno == ETIME) {return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", srsu2msi(stm));}// 其他写入错误,返回写入错误码return srs_error_new(ERROR_SOCKET_WRITE, "writev");}// 累加写入的字节数到统计变量sbytes += nb_write;// 返回处理结果return err;
}
2.4.2.3 系统层发送
系统级发送涉及到服务器与操作系统之间的交互。服务器通过调用操作系统的底层接口,将数据发送到网络上。
服务器需要处理各种系统级的错误和异常,确保数据能够在不同的网络环境中稳定传输。同时,服务器还会利用操作系统的特性,如缓冲区管理等,来进一步优化数据发送的性能。
/*** 使用分散/聚集IO(writev)向指定的网络文件描述符写入数据** @param fd 目标网络文件描述符* @param iov 包含待写入数据的iovec数组* @param iov_size iovec数组的大小* @param timeout 超时时间(微秒)* @return 成功时返回写入的总字节数,失败返回-1** @note 函数会处理部分写入和EINTR中断情况* @note 当需要多次写入时,会创建临时iovec数组来处理剩余数据* @note 在DEBUG模式下会统计调用次数和EAGAIN情况*/
ssize_t st_writev(_st_netfd_t *fd, const struct iovec *iov, int iov_size, st_utime_t timeout)
{// 定义返回值:n用于存储每次writev的返回值,rv用于记录函数最终的返回结果ssize_t n, rv;// 定义字节计数:nleft表示剩余待写入的字节数,nbyte表示总的字节数size_t nleft, nbyte;// 定义索引:index用于遍历iov数组,iov_cnt表示当前iov数组的大小int index, iov_cnt;// 定义iov指针:tmp_iov用于操作iov数组,local_iov是本地栈上的临时iov数组struct iovec *tmp_iov;struct iovec local_iov[_LOCAL_MAXIOV];// 计算需要发送的总字节数,遍历所有iov向量,累加每个向量的长度nbyte = 0;for (index = 0; index < iov_size; index++)nbyte += iov[index].iov_len;// 初始化返回值rv为总字节数(假设全部写入成功),nleft为剩余待写入字节数rv = (ssize_t)nbyte;nleft = nbyte;// 初始化tmp_iov指向原始iov数组,承诺不修改原始iovtmp_iov = (struct iovec *) iov; // 初始化iov_cnt为原始iov数组的大小iov_cnt = iov_size;#if defined(DEBUG) && defined(DEBUG_STATS)++_st_stat_writev;#endif// 主循环:只要还有数据没写完就继续循环while (nleft > 0) {// 如果只剩一个iov结构,直接调用st_write函数处理if (iov_cnt == 1) {if (st_write(fd, tmp_iov[0].iov_base, nleft, timeout) != (ssize_t) nleft)rv = -1;break;}// 调用系统writev函数尝试写入数据if ((n = writev(fd->osfd, tmp_iov, iov_cnt)) < 0) {// 如果被信号中断,则继续重试if (errno == EINTR)continue;// 如果不是EAGAIN或EWOULDBLOCK,则是真正的错误,设置返回值为-1并退出if (!_IO_NOT_READY_ERROR) {rv = -1;break;}} else {// 如果一次性写完所有数据,则退出循环if ((size_t) n == nleft)break;// 更新剩余待写入的字节数nleft -= n;/* Find the next unwritten vector */// 查找下一个未完全写入的向量:n记录已写入的总字节数n = (ssize_t)(nbyte - nleft);// 跳过已完全写入的向量for (index = 0; (size_t) n >= iov[index].iov_len; index++)n -= iov[index].iov_len;// 如果tmp_iov仍指向原始iov数组,需要创建新的临时数组以保持原始iov不变if (tmp_iov == iov) {/* Must copy iov's around */// 如果剩余向量数量不多,使用栈上的local_iov数组if (iov_size - index <= _LOCAL_MAXIOV) {tmp_iov = local_iov;} else {// 否则动态分配内存创建新的iov数组tmp_iov = calloc(1, (iov_size - index) * sizeof(struct iovec));if (tmp_iov == NULL)return -1;}}/* Fill in the first partial read */// 设置第一个部分写入向量的基址和长度tmp_iov[0].iov_base = &(((char *)iov[index].iov_base)[n]);tmp_iov[0].iov_len = iov[index].iov_len - n;index++;/* Copy the remaining vectors */// 复制剩余的向量到临时数组for (iov_cnt = 1; index < iov_size; iov_cnt++, index++) {tmp_iov[iov_cnt].iov_base = iov[index].iov_base;tmp_iov[iov_cnt].iov_len = iov[index].iov_len;}}#if defined(DEBUG) && defined(DEBUG_STATS)++_st_stat_writev_eagain;#endif/* Wait until the socket becomes writable */// 等待socket变为可写状态,如果超时或出错,则设置返回值为-1并退出if (st_netfd_poll(fd, POLLOUT, timeout) < 0) {rv = -1;break;}}// 如果动态分配了内存,释放它if (tmp_iov != iov && tmp_iov != local_iov)free(tmp_iov);// 返回结果:成功时返回写入的总字节数,失败时返回-1return rv;
}
2.5 总结
拉流处理的核心机制
- 协程模型:SRS使用协程处理连接,提高并发能力
- 消费者模式:通过消费者从源获取数据,实现一对多分发
- 独立接收线程:使用独立线程接收控制消息,提升性能约33%
- 参数优化:支持实时性、合并写入和最小发送间隔等参数调优
- HTTP回调机制:在关键节点触发HTTP回调,便于外部系统集
3. 拉流发送数据调用栈
#从上层到操作系统写入调用栈
(gdb) bt
#0 st_writev (fd=0x604000013b10, iov=0x62d00001e400, iov_size=60, timeout=30000000)at io.c:489
#1 0x0000555555c6ba94 in SrsStSocket::writev (this=0x60700001dad0, iov=0x62d00001e400, iov_size=60, nwrite=0x0) at ./src/protocol/srs_protocol_st.cpp:653
#2 0x0000555555cbfa49 in SrsTcpConnection::writev (this=0x604000013b90, iov=0x62d00001e400, iov_size=60, nwrite=0x0) at ./src/app/srs_app_conn.cpp:566
#3 0x0000555555bfa61b in srs_write_large_iovs (skt=0x604000013b90, iovs=0x62d00001e400, size=60, pnwrite=0x0) at ./src/protocol/srs_protocol_utility.cpp:376
#4 0x0000555555bb1508 in SrsProtocol::do_iovs_send (this=0x611000013640, iovs=0x62d00001e400, size=60) at ./src/protocol/srs_protocol_rtmp_stack.cpp:556
#5 0x0000555555bb1468 in SrsProtocol::do_send_messages (this=0x611000013640, msgs=0x61900001f980, nb_msgs=30) at ./src/protocol/srs_protocol_rtmp_stack.cpp:496
#6 0x0000555555bb3d2f in SrsProtocol::send_and_free_messages (this=0x611000013640, msgs=0x61900001f980, nb_msgs=30, stream_id=1)at ./src/protocol/srs_protocol_rtmp_stack.cpp:752
#7 0x0000555555bc49f1 in SrsRtmpServer::send_and_free_messages (this=0x603000018af0, msgs=0x61900001f980, nb_msgs=30, stream_id=1)at ./src/protocol/srs_protocol_rtmp_stack.cpp:2208
#8 0x0000555555ce54fd in SrsRtmpConn::do_playing (this=0x61200004c3c0, source=..., consumer=0x606000010820, rtrd=0x7ffff29bd5f0) at ./src/app/srs_app_rtmp_conn.cpp:901
#9 0x0000555555ce3100 in SrsRtmpConn::playing (this=0x61200004c3c0, source=...)
--Type <RET> for more, q to quit, c to continue without paging--at ./src/app/srs_app_rtmp_conn.cpp:776
#10 0x0000555555cdf301 in SrsRtmpConn::stream_service_cycle (this=0x61200004c3c0)at ./src/app/srs_app_rtmp_conn.cpp:613
#11 0x0000555555cdc668 in SrsRtmpConn::service_cycle (this=0x61200004c3c0)at ./src/app/srs_app_rtmp_conn.cpp:446
#12 0x0000555555cd97ec in SrsRtmpConn::do_cycle (this=0x61200004c3c0)at ./src/app/srs_app_rtmp_conn.cpp:262
#13 0x0000555555cefe8a in SrsRtmpConn::cycle (this=0x61200004c3c0)at ./src/app/srs_app_rtmp_conn.cpp:1609
#14 0x0000555555d67f02 in SrsFastCoroutine::cycle (this=0x60e000009ce0)at ./src/app/srs_app_st.cpp:309
#15 0x0000555555d68052 in SrsFastCoroutine::pfn (arg=0x60e000009ce0)at ./src/app/srs_app_st.cpp:324
#16 0x00005555560fa66f in _st_thread_main () at sched.c:380
#17 0x00005555560faf95 in st_thread_create (start=0x604000013c50, arg=0x7ffff4c25ed0, joinable=-1890085540, stk_size=-1719832832) at sched.c:666
#18 0x0000555555c65a1b in srs_context_set_cid_of (trd=0x7ffff4c26648, v=...)at ./src/protocol/srs_protocol_log.cpp:91
#19 0x0000555555a5e631 in _SrsContextId::~_SrsContextId (this=0x7ffff4c26298, __in_chrg=<optimized out>) at ./src/core/srs_core.cpp:24
#20 0x0000555555c65bbf in impl_SrsContextRestore::~impl_SrsContextRestore (
--Type <RET> for more, q to quit, c to continue without paging--this=0x7ffff4c26290, __in_chrg=<optimized out>)at ./src/protocol/srs_protocol_log.cpp:101
#21 0x0000555555cb5a7a in SrsServer::do_on_tcp_client (this=0x61100000ff40, listener=0x604000002150, stfd=@0x7ffff4c263b0: 0xffffe984c94)at ./src/app/srs_app_server.cpp:1161
#22 0x00007ffff4749f20 in ?? ()
#23 0x00007ffff4c26648 in ?? ()
#24 0x00007ffff4c263c0 in ?? ()
#25 0x0000000100000001 in ?? ()
#26 0x8f57955c997d6f00 in ?? ()
#27 0x00007ffff4c263d0 in ?? ()
#28 0x00005555560fc8d4 in st_netfd_poll (fd=0x7ffff2999f90, how=1, timeout=18446744073709551615) at io.c:249
学习资料分享
0voice