当前位置: 首页 > backend >正文

【网络入侵检测】基于Suricata源码分析FlowManager实现

【作者主页】只道当时是寻常

【专栏介绍】Suricata入侵检测。专注网络、主机安全,欢迎关注与评论。

1. 概要

👋 本文聚焦开源网络入侵检测系统 Suricata 的核心模块 FlowManager,深入解读其源码,解析流量管理实现机制。FlowManager 承担流的超时检查、资源回收、紧急模式处理等关键功能。文章从内存压力值计算、动态调整休眠间隔、流状态切换逻辑入手,结合代码实例,说明其如何动态调节扫描行数与休眠时间,平衡负载与检测效率。

2. 源码解析

2.1 模块注册(Register)

在 Suricata 初始化阶段会将 FlowManager 模块注册到全局模块列表中 tmm_modules,后续由专门的线程负责执行该模块的主函数。

Flow Manager模块详细信息如下代码所示:

void TmModuleFlowManagerRegister (void)
{tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);SC_ATOMIC_INIT(flowmgr_cnt);SC_ATOMIC_INITPTR(flow_timeouts);
}

2.2 初始化(Init)

在 Suricata 中 FlowManagerThreadInit 函数实现 FlowManager 线程的初始化操作。下面我将基于 FlowManagerThreadInit 函数源码分段介绍初始化具体流程。

(1)创建 FlowManager 线程数据对象,用于保存当前线程的初始化数据。

FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
if (ftd == NULL)return TM_ECODE_FAILED;

(2)配置当前线程实例序号值。每个新增一个 FlowManager 线程示例序号值加一。

ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
SCLogDebug("flow manager instance %u", ftd->instance);

(3)在 Suricata 的配置文件中,可通过 flow.managers 来设置创建 FlowManager 线程的数量,flow.hash-size 设置了哈希表的大小。因此,这里需要计算每个 FlowManager 线程负责维护哈希表的范围。如果是最后一个线程示例则其右侧区间为哈希表的结尾。

 /* set the min and max value used for hash row walking* each thread has it's own section of the flow hash */
uint32_t range = flow_config.hash_size / flowmgr_number;ftd->min = ftd->instance * range;
ftd->max = (ftd->instance + 1) * range;/* last flow-manager takes on hash_size % flowmgr_number extra rows */
if ((ftd->instance + 1) == flowmgr_number) {ftd->max = flow_config.hash_size;
}
BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);

(4)初始化 FlowManager 模块的计数器。

FlowCountersInit(t, &ftd->cnt);

(5)初始化报文池,即每个处理线程支持同时处理的最大报文数量(max_pending_packets)。很奇怪,为什么这个函数在这里调用,该函数貌似和 FlowManager 没有什么联系。

PacketPoolInit();

2.3 流管理(FlowManager)

Suricata 的 FlowManager 函数是流管理模块的核心函数,主要用于检查流超时并回收资源。

FlowManager 函数中最核心的是其资源处理算法的实现,如果要理解这个算法需要先了解以下几个实现。

2.3.1 内存压力值

MemcapsGetPressure 函数用户获取当前内存压力值,该函数主要关注最容易成为内存压力瓶颈的四个引擎,分别是streamstream-reassemblyflow applayer-proto-http,并取其压力最大值。

如何计算每个引擎的压力值呢?

下面代码是 MemcapsGetPressure 函数的实现:

float MemcapsGetPressure(void)
{float percent = 0.0;for (int i = 0; i < 4; i++) { // only flow, streams, httpuint64_t memcap = memcaps[i].GetFunc();if (memcap) {uint64_t memuse = memcaps[i].GetMemuseFunc();float p = (float)((double)memuse / (double)memcap);// SCLogNotice("%s: memuse %"PRIu64", memcap %"PRIu64" => %f%%",//    memcaps[i].name, memuse, memcap, (p * 100));percent = MAX(p, percent);}}return percent;
}

可以看到,内存压力值算法是当前已用内存量除以申请的内存量。压力值为 0.0 表示未使用已申请的内存,1.0 表示内存使用达到上限。

2.3.2 动态调整休眠间隔

GetWorkUnitSizing 函数会根据系统内存压力值,即 MemcapsGetPressure 函数计算结果动态计算每次处理的哈希表行数和休眠间隔。下面通过拆解 GetWorkUnitSizing 函数实现来理解其算法原理:

(1)如果当前处于紧急状态(emergency),则需要流管理线程全速处理。要求线程实例处理哈希表所有行(即该线程实例负责的哈希表所有行),线程休眠间隔固定为250ms。

if (emergency) {*wu_rows = rows;*wu_sleep = 250;return;
}

(2)将内存压力值从百分比转成0~100的整数。要求压力值最小为10。

/* minimum busy score is 10 */
const uint32_t emp = MAX(mp, 10);

(3)计算每秒处理的哈希表行数,取决于内存压力值百分比。

const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));

(4)假设每行需要消耗的时间为1微秒,那么每秒处理的哈希行数需要消耗多长时间(最大为1s)。

 /* calc how much time we estimate the work will take, in ms. We assume* each row takes an average of 1usec. Maxing out at 1sec. */
const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);

(5)计算处理完指定的哈希行数后,剩余的时间即为休眠间隔时间,最小为250ms。

/* calc how much time we need to sleep to get to the per second cadence* but sleeping for at least 250ms. */
const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,sleep_per_unit);

2.3.3 源码解析

在了解2.3.1 和 2.3.2 两个算法实现的基础上,我们再来通过拆解 FlowManager 函数实现来了解流管理模块的实现流程。

(1)获取当前内存压力值。

uint32_t mp = MemcapsGetPressure() * 100;

(2)获取当前线程休眠时间。

GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);

(3)判断当前线程是否被暂停,如果被暂停则会等待,直到取消暂停。

if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {TmThreadsSetFlag(th_v, THV_PAUSED);TmThreadTestThreadUnPaused(th_v);TmThreadsUnsetFlag(th_v, THV_PAUSED);
}

(4)检查紧急模式状态。

bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

(5)获取当前时间戳。

 /* Get the time */
ts = TimeGet();
SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
uint64_t ts_ms = SCTIME_MSECS(ts);

(6)设置流进入紧急模式。其中 prev_emerg 表示上一个流是否处于紧急模式,若否,则说明当前流进入新的紧急模式阶段。

const bool emerge_p = (emerg && !prev_emerg);
if (emerge_p) {next_run_ms = 0;prev_emerg = true;SCLogNotice("Flow emergency mode entered...");StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
}

(7)执行流超时检查,next_run_ms = ts_ms + sleep_per_wu; 即期望下一次执行流检查时的时间戳。

if (ts_ms >= next_run_ms) {... ...
}

(8)若处于紧急模式,需处理该线程实例负责的哈希表中所有行的超时;否则仅处理部分行。FlowTimeoutHash 和 FlowTimeoutHashInChunks 函数用于检查哈希表中指定行数的超时流,若发现超时流,则将其从哈希表移除并添加到 d->aside_queue 队列。

if (emerg) {/* in emergency mode, do a full pass of the hash table */FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
} else {SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,rows_per_wu);const uint32_t ppos = pos;FlowTimeoutHashInChunks(&ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);if (ppos > pos) {StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);}
}

(9)计算当前空闲流的百分比,当空闲流百分比超过恢复阈值,则紧急模式计数器加一直到连续30次超过恢复阈值,退出紧急模式。

const uint32_t spare_pool_len = FlowSpareGetPoolSize();
StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);FlowCountersUpdate(th_v, ftd, &counters);if (emerg == true) {SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32"flow_spare_q status: %" PRIu32 "%% flows at the queue",spare_pool_len, flow_config.prealloc,spare_pool_len * 100 / MAX(flow_config.prealloc, 1));/* only if we have pruned this "emergency_recovery" percentage* of flows, we will unset the emergency bit */if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >flow_config.emergency_recovery) {emerg_over_cnt++;} else {emerg_over_cnt = 0;}if (emerg_over_cnt >= 30) {SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);FlowTimeoutsReset();emerg = false;prev_emerg = false;emerg_over_cnt = 0;SCLogNotice("Flow emergency mode over, back to normal... unsetting"" FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", ""ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32"%% flows at the queue",(uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),spare_pool_len * 100 / MAX(flow_config.prealloc, 1));StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);}
}/* update work units */
const uint32_t pmp = mp;
mp = MemcapsGetPressure() * 100;
if (ftd->instance == 0) {StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
if (pmp != mp) {StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
}next_run_ms = ts_ms + sleep_per_wu;

(10)定期执行其它模块超时检查的逻辑,只有第一个流量管理实例(instance 0)执行这些操作,避免多线程重复处理。

if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {if (ftd->instance == 0) {DefragTimeoutHash(ts);HostTimeoutHash(ts);IPPairTimeoutHash(ts);HttpRangeContainersTimeoutHash(ts);other_last_sec = (uint32_t)SCTIME_SECS(ts);}
}

(11)如果处于紧急模式,现成休眠250ms,如果未处于紧急模式,计算休眠时间,通过带超时的条件变量对线程进行休眠操作。

if (emerg || !time_is_live) {usleep(250);
} else {struct timeval cond_tv;gettimeofday(&cond_tv, NULL);struct timeval add_tv;add_tv.tv_sec = 0;add_tv.tv_usec = (sleep_per_wu * 1000);timeradd(&cond_tv, &add_tv, &cond_tv);struct timespec cond_time = FROM_TIMEVAL(cond_tv);SCCtrlMutexLock(&flow_manager_ctrl_mutex);while (1) {int rc = SCCtrlCondTimedwait(&flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);if (rc == ETIMEDOUT || rc < 0)break;if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {break;}}SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
}

http://www.xdnf.cn/news/9991.html

相关文章:

  • DEEPSEEK帮写的STM32消息流函数,直接可用.已经测试
  • PostgreSQL主从同步双机集群创建与配置
  • 使用 Arthas 查看接口方法执行时间
  • 时间序列噪声模型分析软件推荐与使用经验
  • SQL(Database Modifications)
  • 【达梦】达梦数据库使用TypeHandler读取数据库时,将字段中的数据读取为数组
  • UIAbility组件基础
  • Cadence Allegro中设置主画面最小显示间距
  • 江科大UART串口通讯hal库实现
  • 【大模型/MCP】MCP简介
  • 哈希之旅:从使用到底层建设
  • CCPC shandong 2025 G
  • 【数据集】中国日尺度1 km全天候地表温度数据集(2000-2022)
  • 尚硅谷redis7 74-85 redis集群分片之集群是什么
  • 【区间dp】-----例题5【田忌赛马】(暂时只会贪心解法)
  • Chuanpai、Nihongo wa Muzukashii Desu、K-skip Permutation
  • 3340. 检查平衡字符串
  • 【2025文博会现场直击】多图预警
  • One Year~
  • WES(三)——变异检测
  • Pix4d航测软件正射影像生产流程(一)项目创建及快速空三
  • Baklib企业知识激活解决方案
  • MySQL 数据库中的主键、超键、候选键、外键是什么?
  • vue3 driverjs
  • 车载摄像头选型相关
  • 初识JAVA:Java异常种类
  • Blaster - Multiplayer P117-PXXX: 匹配状态
  • 项目使用富文本编辑器发送邮件,邮箱无法预览
  • Parasoft C++Test软件单元测试_常见问题及处理
  • MySQL 8.0中的mysql.ibd文件