前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >掌握异步日志:解锁日志系统的效率和性能

掌握异步日志:解锁日志系统的效率和性能

原创
作者头像
Lion Long
发布2024-11-01 22:04:50
850
发布2024-11-01 22:04:50
举报
文章被收录于专栏:后端开发技术

一、异步日志机制

图片
图片

通过notify和超时方式唤醒日志落盘线程读取日志写入磁盘。 多线程间使用mutex互斥保证线程安全。 日志写入磁盘时采用批量写入方式。

注意:队列不是每一行日志,而是buffer缓冲区(比如4M)。

二、双缓存机制

日志写入过程(假设buffer为4M): (1)加锁,判断当前的buffer是否超过4M。 (2)如果没有超过4M,把日志写入buffer;如果超出4M则把当前的buffer插入到队列中。此时,当前日志写到一个新的buffer(循环复用的buffer)中。

图片
图片

日志notify问题: (1)写满1个buffer才发一次notify唤醒日志落盘。 (2)超时通过wait_timeout唤醒日志落盘线程,buffer只要有数据就写入到磁盘。

双缓冲机制中循环使用buffer,避免buffer不断分配。

代码语言:javascript
复制
void AsyncLogging::append(const char* logline, int len)
{
  // if(cnt++ == 50000)abort();
  MutexLockGuard lock(mutex_);    // 多线程加锁
  if (currentBuffer_->avail() > len)    // 判断buffer还有没有空间写入这条日志
  {
    currentBuffer_->append(logline, len); // 直接写入
  }
  else
  {
    buffers_.push_back(std::move(currentBuffer_));    // buffers_是vector,把buffer入队列
    // printf("push_back append_cnt:%d, size:%d\n", ++append_cnt, buffers_.size());
    if (nextBuffer_)  // 用了双缓存
    {
      currentBuffer_ = std::move(nextBuffer_);    // 如果不为空则将buffer转移到currentBuffer_
    }
    else
    {
      // 重新分配buffer
      currentBuffer_.reset(new Buffer); // Rarely happens如果后端写入线程没有及时读取数据,那要再分配buffer
    }
    currentBuffer_->append(logline, len);   // buffer写满了
    cond_.notify(); // 唤醒写入线程
  }
}

void AsyncLogging::threadFunc()
{
  assert(running_ == true);
  latch_.countDown();
  LogFile output(basename_, rollSize_, false);
  BufferPtr newBuffer1(new Buffer); // 是给currentBuffer_
  BufferPtr newBuffer2(new Buffer); // 是给nextBuffer_
  newBuffer1->bzero();
  newBuffer2->bzero();
  BufferVector buffersToWrite;    // 保存要写入的日志
  buffersToWrite.reserve(16);
  while (running_)
  {
    assert(newBuffer1 && newBuffer1->length() == 0);
    assert(newBuffer2 && newBuffer2->length() == 0);
    assert(buffersToWrite.empty());

    { // 锁的作用域
      MutexLockGuard lock(mutex_);
      if (buffers_.empty())  // 没有数据可读取,休眠
      {
        // printf("waitForSeconds into\n");
        cond_.waitForSeconds(flushInterval_);   // 超时退出或者被唤醒(收到notify)
        // printf("waitForSeconds leave\n");
      }
      buffers_.push_back(std::move(currentBuffer_));  // currentBuffer_被锁住  currentBuffer_被置空
      // printf("push_back threadFunc:%d, size:%d\n", ++threadFunc_cnt, buffers_.size());
      currentBuffer_ = std::move(newBuffer1); // currentBuffer_ 需要内存空间
      buffersToWrite.swap(buffers_);          // 用了双队列,把前端日志的队列所有buffer都转移到buffersToWrite队列
      if (!nextBuffer_)     // newBuffer2是给nextBuffer_
      {
        nextBuffer_ = std::move(newBuffer2);  // 如果为空则使用newBuffer2的缓存空间
      }
    }
    // 从这里是没有锁,数据落盘的时候不要加锁
    assert(!buffersToWrite.empty());
    // fixme的操作 4M一个buffer *25 = 100M
    if (buffersToWrite.size() > 25)  // 这里缓存的数据太多了,比如4M为一个buffer空间,25个buffer就是100M了。
    {
      printf("Dropped\n");
      char buf[256];
      snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
               Timestamp::now().toFormattedString().c_str(),
               buffersToWrite.size()-2);    // 只保留2个buffer
      fputs(buf, stderr);
      output.append(buf, static_cast<int>(strlen(buf)));
      buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());   // 只保留2个buffer(默认4M)
    }

    for (const auto& buffer : buffersToWrite)  // 遍历buffer
    {
      // FIXME: use unbuffered stdio FILE ? or use ::writev ?
      output.append(buffer->data(), buffer->length());    // 负责fwrite数据
    }
    output.flush();   // 保证数据落到磁盘了
    if (buffersToWrite.size() > 2)
    {
      // drop non-bzero-ed buffers, avoid trashing
      buffersToWrite.resize(2);   // 只保留2个buffer
    }

    if (!newBuffer1)
    {
      assert(!buffersToWrite.empty());
      newBuffer1 = std::move(buffersToWrite.back());    // 复用buffer对象
      buffersToWrite.pop_back();
      newBuffer1->reset();    // 重置
    }

    if (!newBuffer2)
    {
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());   // 复用buffer对象
      buffersToWrite.pop_back();
      newBuffer2->reset();   // 重置
    }

    buffersToWrite.clear(); 
    
  }
  output.flush();
}

三、前台日志写入栈

流程图:

图片
图片

四、后台日志(落盘)写入栈

流程图:

图片
图片
代码语言:javascript
复制
for (const auto& buffer : buffersToWrite)  // 遍历buffer
{
  // FIXME: use unbuffered stdio FILE ? or use ::writev ?
  output.append(buffer->data(), buffer->length());    // 负责fwrite数据
}
output.flush();   // 保证数据落到磁盘了

五、使用示例

代码语言:javascript
复制
#include "AsyncLogging.h"

#include <stdio.h>
#include <sys/resource.h>
#include <unistd.h>
#include <sys/time.h>
#include <iostream>
#include "Logging.h"
#define LOG_NUM 5000000 // 总共的写入日志行数

using namespace std;

off_t kRollSize= 1 * 1000 * 1000;    // 只设置1M

static AsyncLogging *g_asyncLog = NULL;
static void asyncOutput(const char *msg, int len)
{
	g_asyncLog->append(msg, len);
}

// 时间戳
static uint64_t get_tick_count()
{
	struct timeval tval;
	uint64_t ret_tick;

	gettimeofday(&tval, NULL);
	ret_tick = tval.tv_sec * 1000L + tval.tv_usec / 1000L;

	return ret_tick;
}

int main(int argc,char*argv[])
{
	printf("PID = %d\n",getpid());

	char name[260] = { 0 };
	strncpy(name, argv[0], sizeof name - 1);

	// 设置 回滚大小kRollSize(1M), 最大1秒刷一次盘(flush)
	AsyncLogging log(::basename(name), kRollSize, 1);
	Logger::setOutput(asyncOutput);

	g_asyncLog = &log;
	// 启动日志写入线程
	log.start();

	uint64_t begin_time = get_tick_count();
	cout << "name: " << basename(name) << "\nbegin time: " << begin_time << endl;
	for (int i = 0; i < LOG_NUM; i++)
	{
		LOG_INFO << "NO." << i << " Root Error Message!"; // 47个字节
	}

	log.stop();
	uint64_t end_time = get_tick_count();
	std::cout << "end_time: " << end_time << std::endl;
	int64_t ops = LOG_NUM;
	ops = ops * 1000 / (end_time - begin_time);
	std::cout << "need the time1: " << end_time << " " << begin_time << ", " << end_time - begin_time << "毫秒"
		<< ", ops = " << ops << "ops/s\n";
	return 0;
}

六、总结

(1)日志可以采用批量写入(以数据大小为判断为准)来做到高性能。 同步方式通过攒够数据(比如4M)或者时间超过一定阈值(比如1秒)触发写入。比如glog日志库。 异步方式(比如moduo日志库)采用append积攒数据,异步落盘线程负责数据写入磁盘。

什么时候触发?  notify+wait_timeout,即 通知唤醒+超时唤醒。

(2)为减少锁的粒度,减少刷新磁盘的时候日志接口阻塞,采用双队列方式(前台队列+后台刷新磁盘队列,后台队列刷新数据到磁盘)。

(3)内存分配通过move语义避免深拷贝。 (4)log4cpp的日志框架值得参考,但是它的性能不佳,要自己做完善、扩展。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、异步日志机制
  • 二、双缓存机制
  • 三、前台日志写入栈
  • 四、后台日志(落盘)写入栈
  • 五、使用示例
  • 六、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档