当前位置: 首页 > news >正文

SRS流媒体服务器(4)源码分析之RTMP端口监听

1.目标

学习 RTMP 端口监听模块前,需明确两个核心问题:

  1. 如何从配置文件读取对应的端口?:服务器如何解析配置文件,获取正确服务端口。
  2. 如何启动 RTMP 监听并处理业务逻辑?:服务器如何绑定端口,等待客户端连接。创建rtmpconn连接对象,包含有握手和断开等等

1.1 引入服务启动时序图

通过此引入可以知道先进行配置文件解析,其次开启 SrsServer服务!

从main程序开始到SrsServer服务启动的时序图如下:

        

2.端口获取

2.1 核心调用栈

下列分别是rtmp端口获取操作调用栈。分别是检查配置文件是否正常,检查最大连接数量。

(gdb) bt
#0  SrsConfig::get_listens[abi:cxx11]() (this=0x555555e90340) at src/app/srs_app_config.cpp:3135
#1  0x0000555555737fbe in SrsConfig::check_normal_config (this=0x555555e90340) at src/app/srs_app_config.cpp:2559
#2  0x0000555555736af3 in SrsConfig::check_config (this=0x555555e90340) at src/app/srs_app_config.cpp:2431
#3  0x0000555555845bda in do_main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:184
#4  0x0000555555845eb2 in main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:216(gdb) bt
#0  SrsConfig::get_listens[abi:cxx11]() (this=0x555555e90340) at src/app/srs_app_config.cpp:3136
#1  0x000055555573c7b3 in SrsConfig::check_number_connections (this=0x555555e90340) at src/app/srs_app_config.cpp:2916
#2  0x0000555555736b4c in SrsConfig::check_config (this=0x555555e90340) at src/app/srs_app_config.cpp:2435
#3  0x0000555555845bda in do_main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:184
#4  0x0000555555845eb2 in main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:216

2.2 核心API 

该api是从文件解析出listen port 存储起来。后续会对每个端口new一个SrsBufferListener。

vector<string> SrsConfig::get_listens()
{std::vector<string> ports;SrsConfDirective* conf = root->get("listen");if (!conf) {return ports;}for (int i = 0; i < (int)conf->args.size(); i++) {ports.push_back(conf->args.at(i));}return ports;
}

3.端口监听

       流程顺序:端口创建SrsBufferListener再创建SrsTcpListener,SrsTcpListener引出协程对象并开启协程循环。通过这种方法实现了一连接一协程收发数据

  1. SrsServer每个端口  1:1  SrsBufferListener。这一层是协议的解耦,基类对象public SrsListener
  2. SrsBufferListener  1:1  SrsTcpListener。这一层是tcp协议,并包含协程设计。
  3. SrsTcpListener 1:1 SrsSTCoroutine。这一层是开启协程开启 trd->start() ->>实际调用

    SrsTcpListener::cycle()

3.1 调用栈

(gdb) bt
#0  SrsServer::listen_rtmp (this=0x555555f498f0) at src/app/srs_app_server.cpp:1177
#1  0x00005555556d37b7 in SrsServer::listen (this=0x555555ed3740) at src/app/srs_app_server.cpp:757
#2  0x00005555557d750b in SrsServerAdapter::run (this=0x555555ed59e0, wg=0x7fffffffdc80) at src/app/srs_app_hybrid.cpp:160
#3  0x00005555557d8072 in SrsHybridServer::run (this=0x555555ebe9d0) at src/app/srs_app_hybrid.cpp:275
#4  0x0000555555847917 in run_hybrid_server () at src/main/srs_main_server.cpp:497
#5  0x00005555558473da in run_directly_or_daemon () at src/main/srs_main_server.cpp:423
#6  0x0000555555845c2d in do_main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:201
#7  0x0000555555845eb2 in main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:216(gdb) bt
#0  SrsBufferListener::listen (this=0x55555572259e <SrsFileLog::trace(char const*, _SrsContextId, char const*, ...)>, i="", p=21845) at src/app/srs_app_server.cpp:89
#1  0x00005555556d6cf7 in SrsServer::listen_rtmp (this=0x555555ed3740) at src/app/srs_app_server.cpp:1193
#2  0x00005555556d37b7 in SrsServer::listen (this=0x555555ed3740) at src/app/srs_app_server.cpp:757
#3  0x00005555557d750b in SrsServerAdapter::run (this=0x555555ed59e0, wg=0x7fffffffdc80) at src/app/srs_app_hybrid.cpp:160
#4  0x00005555557d8072 in SrsHybridServer::run (this=0x555555ebe9d0) at src/app/srs_app_hybrid.cpp:275
#5  0x0000555555847917 in run_hybrid_server () at src/main/srs_main_server.cpp:497
#6  0x00005555558473da in run_directly_or_daemon () at src/main/srs_main_server.cpp:423
#7  0x0000555555845c2d in do_main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:201
#8  0x0000555555845eb2 in main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:216(gdb) bt
#0  SrsTcpListener::listen (this=0x7fffffffda10) at src/app/srs_app_listener.cpp:237
#1  0x00005555556cf354 in SrsBufferListener::listen (this=0x555555f20500, i="0.0.0.0", p=1935) at src/app/srs_app_server.cpp:98
#2  0x00005555556d6cf7 in SrsServer::listen_rtmp (this=0x555555ed3740) at src/app/srs_app_server.cpp:1193
#3  0x00005555556d37b7 in SrsServer::listen (this=0x555555ed3740) at src/app/srs_app_server.cpp:757
#4  0x00005555557d750b in SrsServerAdapter::run (this=0x555555ed59e0, wg=0x7fffffffdc80) at src/app/srs_app_hybrid.cpp:160
#5  0x00005555557d8072 in SrsHybridServer::run (this=0x555555ebe9d0) at src/app/srs_app_hybrid.cpp:275
#6  0x0000555555847917 in run_hybrid_server () at src/main/srs_main_server.cpp:497
#7  0x00005555558473da in run_directly_or_daemon () at src/main/srs_main_server.cpp:423
#8  0x0000555555845c2d in do_main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:201
#9  0x0000555555845eb2 in main (argc=3, argv=0x7fffffffe1c8) at src/main/srs_main_server.cpp:216

3.2 核心API 

从上到下调用

srs_error_t SrsServer::listen_rtmp()
{srs_error_t err = srs_success;// stream service port.std::vector<std::string> ip_ports = _srs_config->get_listens();srs_assert((int)ip_ports.size() > 0);close_listeners(SrsListenerRtmpStream);for (int i = 0; i < (int)ip_ports.size(); i++) {SrsListener* listener = new SrsBufferListener(this, SrsListenerRtmpStream);//每个端口对应一个SrsBufferListenerlisteners.push_back(listener);int port; string ip;srs_parse_endpoint(ip_ports[i], ip, port);if ((err = listener->listen(ip, port)) != srs_success) {srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port);}}return err;
}srs_error_t SrsBufferListener::listen(string i, int p)
{srs_error_t err = srs_success;ip = i;port = p;srs_freep(listener);listener = new SrsTcpListener(this, ip, port);//每个SrsBufferListener对应一个SrsTcpListenerif ((err = listener->listen()) != srs_success) {return srs_error_wrap(err, "buffered tcp listen");}string v = srs_listener_type2string(type);srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());return err;
}srs_error_t SrsTcpListener::listen()
{srs_error_t err = srs_success;if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) {return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);}srs_freep(trd);trd = new SrsSTCoroutine("tcp", this); //每个SrsTcpListener对应SrsSTCoroutine负责acceptif ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "start coroutine");}return err;
}#最底层调用
srs_error_t SrsTcpListener::cycle()
{srs_error_t err = srs_success;while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "tcp listener");}srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);//等待新的fdif(fd == NULL){return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));}if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {//设置fd为close-on-exec,防止子进程继承该文件描述符return srs_error_wrap(err, "set closeexec");}if ((err = handler->on_tcp_client(fd)) != srs_success) {//handler是上层对象,SrsBufferListener::on_tcp_client or SrsHttpFlvListener::on_tcp_clientreturn srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));}}return err;
}#开始往上层回调
srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{srs_error_t err = server->accept_client(type, stfd);//最终RTMP业务回调到SrsServer::accept_clientif (err != srs_success) {srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());srs_freep(err);}return srs_success;
}

4. 新连接 

根据上文,底层回调上来的fd交到应用层来处理。这里以rtmp举例,主要是

启动 RTMP 监听器→接收新 TCP 连接→创建SrsRtmpConn连接对象→协程驱动cycle()主循环→完成握手、应用连接、媒体流传输→连接断开清理

#主要是根据type创建对应连接
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{srs_error_t err = srs_success;ISrsStartableConneciton* conn = NULL;//fd_to_resource:会带出一个连接对象 connif ((err = fd_to_resource(type, stfd, &conn)) != srs_success) {if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok())             {srs_close_stfd(stfd); srs_error_reset(err);return srs_success;}return srs_error_wrap(err, "fd to resource");}srs_assert(conn);// directly enqueue, the cycle thread will remove the client.conn_manager->add(conn);//根据连接对象开启协程if ((err = conn->start()) != srs_success) {return srs_error_wrap(err, "start conn coroutine");}return err;
}srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsStartableConneciton** pr)
{srs_error_t err = srs_success;int fd = srs_netfd_fileno(stfd);string ip = srs_get_peer_ip(fd);int port = srs_get_peer_port(fd);
.........................................
.........................................if (type == SrsListenerRtmpStream) {*pr = new SrsRtmpConn(this, stfd, ip, port);} else if (type == SrsListenerHttpApi) {*pr = new SrsHttpApi(false, this, stfd, http_api_mux, ip, port);} else if (type == SrsListenerHttpsApi) {*pr = new SrsHttpApi(true, this, stfd, http_api_mux, ip, port);} else if (type == SrsListenerHttpStream) {*pr = new SrsResponseOnlyHttpConn(false, this, stfd, http_server, ip, port);} else if (type == SrsListenerHttpsStream) {*pr = new SrsResponseOnlyHttpConn(true, this, stfd, http_server, ip, port);} else {srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);srs_close_stfd(stfd);return err;}return err;
}#rtmp逻辑处理
srs_error_t SrsRtmpConn::cycle()
{srs_error_t err = do_cycle();// Notify manager to remove it.// Note that we create this object, so we use manager to remove it.manager->remove(this);// success.if (err == srs_success) {srs_trace("client finished.");return err;}// It maybe success with message.if (srs_error_code(err) == ERROR_SUCCESS) {srs_trace("client finished%s.", srs_error_summary(err).c_str());srs_freep(err);return err;}// client close peer.// TODO: FIXME: Only reset the error when client closed it.if (srs_is_client_gracefully_close(err)) {srs_warn("client disconnect peer. ret=%d", srs_error_code(err));} else if (srs_is_server_gracefully_close(err)) {srs_warn("server disconnect. ret=%d", srs_error_code(err));} else {srs_error("serve error %s", srs_error_desc(err).c_str());}srs_freep(err);return srs_success;
}// 负责处理一个RTMP客户端连接的生命周期,包括连接建立、握手、应用连接、服务循环以及断开连接等步骤。
srs_error_t SrsRtmpConn::do_cycle()
{srs_error_t err = srs_success;// 打印RTMP客户端的IP地址和端口srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));// 设置RTMP的接收和发送超时时间rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);// 执行RTMP握手if ((err = rtmp->handshake()) != srs_success) {return srs_error_wrap(err, "rtmp handshake");}// 获取RTMP代理的真实客户端IP地址uint32_t rip = rtmp->proxy_real_ip();if (rip > 0) {srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));}// 获取请求信息SrsRequest* req = info->req;if ((err = rtmp->connect_app(req)) != srs_success) {return srs_error_wrap(err, "rtmp connect tcUrl");}// 将客户端IP地址设置到请求中// set client ip to request.req->ip = ip;// 打印连接应用的信息srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),req->schema.c_str(), req->vhost.c_str(), req->port,req->app.c_str(), (req->args? "(obj)":"null"));// 显示客户端身份信息// show client identityif(req->args) {std::string srs_version;std::string srs_server_ip;int srs_pid = 0;int srs_id = 0;SrsAmf0Any* prop = NULL;if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {srs_version = prop->to_str();}if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {srs_server_ip = prop->to_str();}if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {srs_pid = (int)prop->to_number();}if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {srs_id = (int)prop->to_number();}if (srs_pid > 0) {srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);}}// 执行服务循环if ((err = service_cycle()) != srs_success) {err = srs_error_wrap(err, "service cycle");}srs_error_t r0 = srs_success;if ((r0 = on_disconnect()) != srs_success) {err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());srs_freep(r0);}// 如果客户端被重定向到其他服务器,则已经记录了该事件// If client is redirect to other servers, we already logged the event.if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {srs_error_reset(err);}return err;
}

5.总结

RTMP 监听的核心流程是:
SrsServer → 初始化 SrsBufferListener → 每个 SrsBufferListener 管理一个 SrsTcpListener → SrsTcpListener 通过协程循环接受新连接 → 连接通过 on_tcp_client 回调传递给 SrsServer 处理业务逻辑。on_tcp_client启动 RTMP 监听器→接收新 TCP 连接→创建SrsRtmpConn连接对象→协程驱动cycle()主循环→完成握手、应用连接、媒体流传输→连接断开清理

下列给出程序开始到rtmp监听整体时序图:

学习资料分享

0voice · GitHub

http://www.xdnf.cn/news/376525.html

相关文章:

  • Python+OpenCV实现手势识别与动作捕捉:技术解析与应用探索
  • ROS-关节轨迹(position、velocities/accelerations)绘图
  • 大模型微调算法原理:从通用到专用的桥梁
  • Linux系统管理与编程17:自动化部署ftp服务
  • 31.下一个排列
  • 慈缘基金会“蝴蝶飞”助西藏女孩白玛卓嘎“折翼重生”
  • FreeRTOS Semaphore信号量-笔记
  • 项目管理从专家到小白
  • Pale Moon:速度优化的Firefox定制浏览器
  • 棒球裁判员学习指南·棒球1号位
  • 【数据结构与算法】图的基本概念与遍历
  • 嵌入式硬件篇---麦克纳姆轮(简单运动实现)
  • Linux系统入门第十二章 --Shell编程之正则表达式
  • [架构之美]Windows系统安装MySQL 8.0详细图文教程(十八)
  • 论文精读:YOLOE: Real-Time Seeing Anything
  • 从0开始学习大模型--Day05--理解prompt工程
  • 零知识证明:区块链隐私保护的变革力量
  • HTTPS加密握手与加密算法
  • Kotlin 内联函数深度解析:从源码到实践优化
  • 分书问题的递归枚举算法
  • [思维模式-25]:《本质思考力》-6- 马哲的三大规律:对立统一规律、质量互变规律、否定之否定规律,以及在计算机领域中的体现
  • RHCE实验:远程控制qq邮箱发送邮件
  • 20250510解决NanoPi NEO core开发板在Ubuntu core22.04.3系统下适配移远的4G模块EC200A-CN的问题
  • C++内存管理
  • 仓库管理系统,Java+Vue,含源码及文档,高效管理仓库物资,实现入库、存储、出库全流程数字化精准管控
  • 基于CNN卷积神经网络的带频偏QPSK调制信号检测识别算法matlab仿真
  • MySQL 从入门到精通(五):索引深度解析 —— 性能优化的核心武器
  • idea如何快速生成测试类
  • 【赵渝强老师】TiDB SQL层的工作机制
  • Yocto中`${B}`变量的作用