当前位置: 首页 > web >正文

【sylar-webserver】重构日志系统

文章目录

  • 主要工作
  • 流程图
  • FiberCondition
  • Buffer
  • BufferManager
  • LogEvent 序列化 & 反序列化
  • Logger
  • RotatingFileLogAppender

主要工作

  1. 实现, LogEvent 序列化和反序列化 (使用序列化是为了更标准,如果转成最终的日志格式再存储(确实会快点,但是对于缓冲区的利用就不够了)
  2. 优化,使用 LoggerBuild 建造者 实现和管理日志器;
  3. 双缓冲区设计,使用条件变量等同步技术实现日志异步处理器。支持定时检查缓冲区 和 生产者缓冲区 唤醒;
  4. 使用 WorkerManager 管理多个调度器;
  5. 增加 循环日志写入,按时间分片的 LoggerAppender
  6. 支持上传下载和展示功能,支持备份重要日志; (待实现网络库后实现)
  7. 性能测试,2h4g 服务器下,异步日志器每秒输出 130MB 日志器。

流程图

在这里插入图片描述

FiberCondition

仿照 std::condition_variable

存在的问题,由于是 notify_one 是把 fiber 重新添加调度,并且调度策略是先来先服务。
所以,存在日志时序性错误问题。(解决方法,后期修改调度策略,增加优先级。)⭐

class FiberCondition{
public:using MutexType = Spinlock;void wait(MutexType::Lock& lock);template <typename Predicate>void wait(MutexType::Lock& lock, Predicate pred){while(!pred()){wait(lock);}}void notify_one();void notify_all();private:void printWaiters() const;private:MutexType m_mutex;std::list<std::pair<Scheduler*, Fiber::ptr>> m_waiters;
};
void FiberCondition::wait(MutexType::Lock& lock){SYLAR_ASSERT(Scheduler::GetThis());{MutexType::Lock lock(m_mutex);m_waiters.push_back(std::make_pair(Scheduler::GetThis(), Fiber::GetThis()));printWaiters();}lock.unlock();Fiber::GetThis()->yield();lock.lock();
}void FiberCondition::notify_one(){MutexType::Lock lock(m_mutex);if (!m_waiters.empty()) {auto next = m_waiters.front();m_waiters.pop_front();next.first->schedule(next.second);}
}void FiberCondition::notify_all() {MutexType::Lock lock(m_mutex);for (auto& waiter : m_waiters) {waiter.first->schedule(waiter.second);}m_waiters.clear();
}

Buffer

class Buffer {public:using ptr = std::shared_ptr<Buffer>;using MutexType = Spinlock;Buffer(size_t buffer_size);Buffer(size_t buffer_size, size_t threshold, size_t linear_growth);void push(const char* data, size_t len);void push(const std::string& str);char* readBegin(int len);bool isEmpty();void swap(Buffer& buf);size_t writeableSize();size_t readableSize() const;const char* Begin() const;void moveWritePos(int len);void moveReadPos(int len);void Reset();protected:void ToBeEnough(size_t len);private:MutexType m_mutex;size_t m_buffer_size;size_t m_threshold;size_t m_linear_growth;std::vector<char> m_buffer;size_t m_write_pos = 0;size_t m_read_pos = 0;
};

BufferManager

重点操作:

BufferManager::BufferManager(const functor& cb, 			// 消费者缓冲区写入的回调函数 ⭐AsyncType::Type asyncType,size_t buffer_size,size_t threshold,size_t linear_growth,size_t swap_time,IOManager* iom):   m_stop(false),m_swap_status(false),m_asyncType(asyncType),m_buffer_productor(std::make_shared<Buffer>(buffer_size, threshold, linear_growth)),m_buffer_consumer(std::make_shared<Buffer>(buffer_size, threshold, linear_growth)),m_callback(cb),m_swap_time(swap_time)
{assert(iom != nullptr);iom->schedule(std::bind(&BufferManager::ThreadEntry, this));m_timer = iom->addTimer(m_swap_time, std::bind(&BufferManager::TimerThreadEntry, this), true);
}// 写入线程,生产者
void BufferManager::push(const char* data, size_t len) {MutexType::Lock lock(m_mutex);if(m_asyncType == AsyncType::ASYNC_SAFE){if (len > m_buffer_productor->writeableSize()) {SYLAR_LOG_DEBUG(g_logger) << "notify consumer";m_cond_consumer.notify_one();}m_cond_producer.wait(lock, [&](){return (m_stop || (len <= m_buffer_productor->writeableSize()));});}if(m_stop){throw std::runtime_error("BufferManager is stopped");}m_buffer_productor->push(data, len);
}// 使用Timer,按照频率访问缓冲区
// 如果生产者没有就退出
void BufferManager::TimerThreadEntry(){{MutexType::Lock lock(m_mutex);if ((!m_buffer_productor->isEmpty() && m_buffer_consumer->isEmpty()) || m_stop) {swap_buffers();if(m_asyncType == AsyncType::ASYNC_SAFE){m_cond_producer.notify_all();}}else{return;}}{MutexType::Lock lock(m_swap_mutex);m_callback(m_buffer_consumer);m_buffer_consumer->Reset();}
}void BufferManager::ThreadEntry() {while(true){{MutexType::Lock lock(m_mutex);SYLAR_LOG_DEBUG(g_logger) << "ThreadEntry started.";m_cond_consumer.wait(lock, [&](){return m_stop || (!m_buffer_productor->isEmpty() && m_buffer_consumer->isEmpty());});swap_buffers();if(m_asyncType == AsyncType::ASYNC_SAFE){m_cond_consumer.notify_all();}}{MutexType::Lock lock(m_swap_mutex);m_callback(m_buffer_consumer);m_buffer_consumer->Reset();if(m_stop && m_buffer_productor->isEmpty()) return;}}
}

LogEvent 序列化 & 反序列化

#pragma pack(push, 1)
struct LogMeta {uint64_t timestamp;    // 时间戳uint32_t threadId;     // 线程IDuint32_t fiberId;      // 协程IDint32_t line;          // 行号uint32_t elapse;LogLevel::Level level; // 日志级别uint16_t fileLen;      // 文件名长度uint32_t threadNameLen;// 线程名长度uint32_t msgLen;       // 消息内容长度
};
#pragma pack(pop)
		Buffer::ptr LogEvent::serialize() const {LogMeta meta{ // ⭐.timestamp = m_time,.threadId = m_threadId,.fiberId = m_fiberId,.line = m_line,.elapse = m_elapse,.level = m_level,.fileLen = static_cast<uint16_t>(m_file.size()),.threadNameLen = static_cast<uint16_t>(m_threadName.size()),.msgLen = static_cast<uint32_t>(m_ss.str().size())};const size_t total_need = sizeof(LogMeta) + meta.fileLen + meta.threadNameLen + meta.msgLen;auto buffer = std::make_shared<Buffer>(total_need); // 使用 shared_ptr 管理内存// 序列化元数据buffer->push(reinterpret_cast<const char*>(&meta), sizeof(meta));// 序列化变长数据(包含终止符)buffer->push(m_file.c_str(), meta.fileLen);buffer->push(m_threadName.c_str(), meta.threadNameLen);buffer->push(m_ss.str().c_str(), meta.msgLen);return buffer; // 返回 shared_ptr}// 每次调用,解析一个LogEventstatic LogEvent::ptr LogEvent::deserialize(Buffer& buffer) {if(buffer.readableSize() < sizeof(LogMeta)) {return nullptr;}LogMeta meta;memcpy(&meta, buffer.Begin(), sizeof(LogMeta));const size_t total_need = sizeof(LogMeta) + meta.fileLen + meta.threadNameLen + meta.msgLen;if(buffer.readableSize() < total_need){return nullptr;}// 4. 提取各字段数据(使用临时指针操作)const char* data_ptr = buffer.Begin() + sizeof(LogMeta);// 文件名处理std::string file(data_ptr, meta.fileLen);data_ptr += meta.fileLen;// 线程名std::string thread_name(data_ptr, meta.threadNameLen);data_ptr += meta.threadNameLen;// 消息内容处理std::string message(data_ptr, meta.msgLen);// 5. 统一移动读指针(原子操作保证数据一致性)buffer.moveReadPos(total_need);// 6. 构建日志事件对象auto event = std::make_shared<LogEvent>(std::move(file),meta.line,meta.elapse,meta.threadId,std::move(thread_name),meta.fiberId,meta.timestamp,meta.level);event->getSS() << message;return event;}

Logger

重构时,出现的问题:Logger 对 BufferManager 的依赖,并且 BufferManger 也依赖 IOMgr调度器。
简单说,就说 全局静态变量的初始化顺序问题
解决方法:Logger默认构造的时候,不提供BufferParams,就使用同步方式创建。
导入 yaml 配置后,重置 logger,再创建 异步日志器

		Logger(const std::string name, LogLevel::Level level,std::vector<LogAppender::ptr>& appenders, const BufferParams& bufParams)   :m_name(name), m_level(level), m_appenders(appenders.begin(), appenders.end()){if(bufParams.isValid()){m_bufMgr = std::make_shared<BufferManager>(std::bind(&Logger::realLog, this, std::placeholders::_1), bufParams);}else{m_bufMgr = nullptr;}}// 由 iom_log 写入真正的文件。void realLog(Buffer::ptr buffer) {MutexType::Lock lock(m_log_mutex);      // 强制 只能 一个线程写入。if (!buffer) {std::cerr << "realLog: invalid buffer pointer" << std::endl;return;}std::vector<LogEvent::ptr> events;while (true) {  // 解析 bufferLogEvent::ptr event = LogEvent::deserialize(*buffer);// 理论上 buffer 里是多个Event的数据,不存在处理失败。if (event) {events.push_back(event);} else {if (buffer->readableSize() == 0) { // 读完了break;} else {// 处理失败但数据未读完(说明发生严重错误)std::cout << "Log deserialization error, remaining data: " << buffer->readableSize() << std::endl;break;}}}auto self = shared_from_this();for (auto& appender : m_appenders) {appender->log(self, events);}}// 写入缓冲区// 多个线程的 写日志,写入缓存区void log(LogEvent::ptr event){if(event->getLevel() >= m_level){if(m_bufMgr != nullptr){// MutexType::Lock lock(m_mutex);   当协程阻塞,这个锁就一直没释放。搞半天,给我整的怀疑人生了。Buffer::ptr buf = event->serialize();m_bufMgr->push(buf);}else{// 如果没有配置iom,直接同步输出日志auto self = shared_from_this();for(auto& appender : m_appenders) {appender->log(self, event);}}}}

RotatingFileLogAppender

FileLogAppeneder 改为使用 FILE 库函数

支持功能:

  1. max_size,限制单个日志文件大小(按照时间片创建文件名)
  2. m_maxFile = 0,无限增加日志
  3. m_maxFile > 0,限制日志文件的个数。当超过,从第一个文件循环写入。
class RotatingFileLogAppender : public LogAppender{
public:typedef std::shared_ptr<RotatingFileLogAppender> ptr;RotatingFileLogAppender(const std::string& filename, LogLevel::Level level, LogFormatter::ptr formatter,size_t max_size,size_t max_file = 0,  // 默认是无限增加FlushRule::Rule flush_rule = FlushRule::Rule::FFLUSH  // 默认是普通日志 );~RotatingFileLogAppender(){if(m_curFile){fclose(m_curFile);m_curFile = NULL;}}std::string toYamlString();void log(std::shared_ptr<Logger> logger, LogEvent::ptr event) override;void log(std::shared_ptr<Logger> logger, std::vector<LogEvent::ptr> events) override;     private:void initLogFile(size_t len = 0);/*** 判断是否写的下,如果写的下就 ss<<str,缓存* 如果写不写了,就把 ss 缓存一次性写入。重置ss */bool checkLogFile(const std::string& str);std::string createFilename();
private:std::string m_filename;FILE* m_curFile = NULL;std::vector<std::string> m_fileNames;size_t m_maxSize;size_t m_maxFile;FlushRule::Rule m_flushRule;size_t m_curFilePos = 0;size_t m_curFileIndex = 0;Buffer m_buffer; 
};
void RotatingFileLogAppender::initLogFile(size_t len){if(m_curFile == NULL || (m_curFilePos + len) > m_maxSize){// 写不下了,保证日志的完整性,直接新建文件。if(m_curFile != NULL){fflush(m_curFile);fclose(m_curFile);if(m_maxFile == 0){// 无限增加日志文件m_curFileIndex++;}else{m_curFileIndex = (m_curFileIndex + 1) % m_maxFile;if(!m_fileNames[m_curFileIndex].empty()){    // 说明 循环到 已有的文件了。std::string newfilename = createFilename();if(rename(m_fileNames[m_curFileIndex].c_str(), newfilename.c_str()) != 0){  // 文件 改新名字perror("rename failed");}   m_fileNames[m_curFileIndex] = newfilename;m_curFile = fopen(newfilename.c_str(), "r+b");fseek(m_curFile, 0, SEEK_SET);   // 从头 覆盖,不考虑 日志文件名了。默认最后一个文件可能会存在过往的日志信息。m_curFilePos = 0;return;}}}std::string filename = createFilename();m_fileNames[m_curFileIndex] = filename;m_curFile = fopen(filename.c_str(), "ab");if(m_curFile==NULL){std::cout <<__FILE__<<__LINE__<<"open file failed"<< std::endl;perror(NULL);}m_curFilePos = 0;return;}
}std::string RotatingFileLogAppender::createFilename() {time_t now = time(nullptr);struct tm tm;localtime_r(&now, &tm);char time_buf[64];strftime(time_buf, sizeof(time_buf), "%Y%m%d_%H%M%S", &tm);return m_filename + "_" + time_buf + "_" + std::to_string(m_curFileIndex) + ".log";
}void RotatingFileLogAppender::log(std::shared_ptr<Logger> logger, LogEvent::ptr event){MutexType::Lock lock(m_mutex);if(event->getLevel() >= m_level){std::string data = m_formatter->format(logger , event);initLogFile(data.size());fwrite(data.c_str(), 1, data.size() , m_curFile);if(ferror(m_curFile)){std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;perror(NULL);}m_curFilePos += data.size();if(m_flushRule == FlushRule::Rule::FFLUSH){if(fflush(m_curFile)==EOF){  // 刚好最后一个日志把文件写满了。std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;perror(NULL);}}else if(m_flushRule == FlushRule::Rule::FSYNC){fflush(m_curFile);fsync(fileno(m_curFile));}}
}bool RotatingFileLogAppender::checkLogFile(const std::string& data){if(m_curFile == NULL || (m_curFilePos + data.size()) > m_maxSize){// 写不下了,保证日志的完整性,直接新建文件。if(m_curFile != NULL){// 把 ss 缓存一次性写入fwrite(m_buffer.Begin(), 1, m_buffer.readableSize(), m_curFile);m_buffer.Reset();// 判断错误信息if(ferror(m_curFile)){std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;perror(NULL);}if(m_flushRule == FlushRule::Rule::FFLUSH){if(fflush(m_curFile)==EOF){  // 刚好最后一个日志把文件写满了。std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;perror(NULL);}}else if(m_flushRule == FlushRule::Rule::FSYNC){fflush(m_curFile);fsync(fileno(m_curFile));}fclose(m_curFile);if(m_maxFile == 0){// 无限增加日志文件m_curFileIndex++;}else{m_curFileIndex = (m_curFileIndex + 1) % m_maxFile;if(!m_fileNames[m_curFileIndex].empty()){    // 说明 循环到 已有的文件了。std::string newfilename = createFilename();if(rename(m_fileNames[m_curFileIndex].c_str(), newfilename.c_str()) != 0){  // 文件 改新名字std::cout << "rename failed" << std::endl;perror(NULL);}m_fileNames[m_curFileIndex] = newfilename;m_curFile = fopen(newfilename.c_str(), "r+b");if (m_curFile == NULL) {std::cout << __FILE__ << __LINE__ << "open file failed" << std::endl;perror(NULL);}// 从头 覆盖,不考虑 日志文件名了。默认最后一个文件可能会存在过往的日志信息。fseek(m_curFile, 0, SEEK_SET); m_curFilePos = 0;// 模拟写入文件,实际上写入缓存。m_buffer.push(data);m_curFilePos += data.size();return true;}}}// m_curFile为空,创建新文件 std::string filename = createFilename();if(m_maxFile > 0){m_fileNames[m_curFileIndex] = filename;    // 只有限制最大文件数,记录文件名}m_curFile = fopen(filename.c_str(), "ab");if(m_curFile==NULL){std::cout <<__FILE__<<__LINE__<<"open file failed"<< std::endl;perror(NULL);}m_curFilePos = 0;}// 模拟写入文件,实际上写入缓存。m_buffer.push(data);m_curFilePos += data.size();return false;
}void RotatingFileLogAppender::log(std::shared_ptr<Logger> logger, std::vector<LogEvent::ptr> events){// 这个时候,m_curFilePos 转变成对 m_buffer 写入数据的 pos 长度。 ⭐MutexType::Lock lock(m_mutex);for(auto& event : events){if(event->getLevel() >= m_level){std::string data = m_formatter->format(logger , event);checkLogFile(data);}}// 最后再次,把缓存里的写入if(m_buffer.readableSize() > 0 && m_curFile != NULL){fwrite(m_buffer.Begin(), 1, m_buffer.readableSize(), m_curFile);m_buffer.Reset();// 判断错误信息if(ferror(m_curFile)){std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;perror(NULL);}if(m_flushRule == FlushRule::Rule::FFLUSH){if(fflush(m_curFile)==EOF){  // 刚好最后一个日志把文件写满了。std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;perror(NULL);}}else if(m_flushRule == FlushRule::Rule::FSYNC){fflush(m_curFile);fsync(fileno(m_curFile));}}
}     

剩下的就是:
works,多个调度器的的管理~
对Config监听函数的修改,补充BufferParams
分离works.yml和log.yml,分别导入。提前导入works.yml 保证 调度器创建完成。
详见 代码 https://github.com/star-cs/webserver

http://www.xdnf.cn/news/876.html

相关文章:

  • 数据仓库 vs 数据湖:架构、应用场景与技术差异全解析
  • 13.QT-DateTime Edit|Dial|Slider|日期计算器|调整窗口透明度|调整窗口大小|自定义快捷键(C++)
  • 中通 Redis 集群从 VM 迁移至 PVE:技术差异、PVE 优劣势及应用场景深度解析
  • 深入理解HotSpot JVM 基本原理
  • C++学习之游戏服务器开发十一DOCKER的基本使用
  • 【数学建模】随机森林算法详解:原理、优缺点及应用
  • 【Python 02 】数值类型、字符串、格式化输出
  • Java反射全解(八股)
  • 文档处理控件Aspose.Words 教程:在 Word 中删除空白页完整指南
  • 2025年二级造价工程师备考要点分析
  • spark和hadoop的区别
  • 【C++游戏引擎开发】第19篇:Compute Shader实现Tile划分
  • 计组1.2.2——各个硬件的工作原理
  • 硬件工程师面试常见问题(4)
  • 操作系统期中复习
  • 车载软件架构 --- 二级boot设计说明需求规范
  • 序列号绑定的SD卡坏了怎么办?
  • AI驱动下的企业学习:人力资源视角下的范式重构与价值觉醒
  • Materials Studio(二)——无机分子建模
  • 当try遇见catch:前端异常捕获的边界与突围
  • ADB -> pull指令推送电脑文件到手机上
  • 24. git revert
  • [渗透测试]渗透测试靶场docker搭建 — —全集
  • 【Linux】轻量级命令解释器minishell
  • 计算机组成原理笔记(十九)——4.4定点乘法运算
  • CentOS 7进入救援模式——VirtualBox虚拟机
  • 深入解析Vue3响应式系统:从Proxy实现到依赖收集的核心原理
  • Kubernetes 创建 Jenkins 实现 CICD 配置指南
  • 目标检测中的损失函数(二) | BIoU RIoU α-IoU
  • k8s之 kube-prometheus监控