通过notify和超时方式唤醒日志落盘线程读取日志写入磁盘。 多线程间使用mutex互斥保证线程安全。 日志写入磁盘时采用批量写入方式。
注意:队列不是每一行日志,而是buffer缓冲区(比如4M)。
日志写入过程(假设buffer为4M): (1)加锁,判断当前的buffer是否超过4M。 (2)如果没有超过4M,把日志写入buffer;如果超出4M则把当前的buffer插入到队列中。此时,当前日志写到一个新的buffer(循环复用的buffer)中。
日志notify问题: (1)写满1个buffer才发一次notify唤醒日志落盘。 (2)超时通过wait_timeout唤醒日志落盘线程,buffer只要有数据就写入到磁盘。
双缓冲机制中循环使用buffer,避免buffer不断分配。
void AsyncLogging::append(const char* logline, int len)
{
// if(cnt++ == 50000)abort();
MutexLockGuard lock(mutex_); // 多线程加锁
if (currentBuffer_->avail() > len) // 判断buffer还有没有空间写入这条日志
{
currentBuffer_->append(logline, len); // 直接写入
}
else
{
buffers_.push_back(std::move(currentBuffer_)); // buffers_是vector,把buffer入队列
// printf("push_back append_cnt:%d, size:%d\n", ++append_cnt, buffers_.size());
if (nextBuffer_) // 用了双缓存
{
currentBuffer_ = std::move(nextBuffer_); // 如果不为空则将buffer转移到currentBuffer_
}
else
{
// 重新分配buffer
currentBuffer_.reset(new Buffer); // Rarely happens如果后端写入线程没有及时读取数据,那要再分配buffer
}
currentBuffer_->append(logline, len); // buffer写满了
cond_.notify(); // 唤醒写入线程
}
}
void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false);
BufferPtr newBuffer1(new Buffer); // 是给currentBuffer_
BufferPtr newBuffer2(new Buffer); // 是给nextBuffer_
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite; // 保存要写入的日志
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{ // 锁的作用域
MutexLockGuard lock(mutex_);
if (buffers_.empty()) // 没有数据可读取,休眠
{
// printf("waitForSeconds into\n");
cond_.waitForSeconds(flushInterval_); // 超时退出或者被唤醒(收到notify)
// printf("waitForSeconds leave\n");
}
buffers_.push_back(std::move(currentBuffer_)); // currentBuffer_被锁住 currentBuffer_被置空
// printf("push_back threadFunc:%d, size:%d\n", ++threadFunc_cnt, buffers_.size());
currentBuffer_ = std::move(newBuffer1); // currentBuffer_ 需要内存空间
buffersToWrite.swap(buffers_); // 用了双队列,把前端日志的队列所有buffer都转移到buffersToWrite队列
if (!nextBuffer_) // newBuffer2是给nextBuffer_
{
nextBuffer_ = std::move(newBuffer2); // 如果为空则使用newBuffer2的缓存空间
}
}
// 从这里是没有锁,数据落盘的时候不要加锁
assert(!buffersToWrite.empty());
// fixme的操作 4M一个buffer *25 = 100M
if (buffersToWrite.size() > 25) // 这里缓存的数据太多了,比如4M为一个buffer空间,25个buffer就是100M了。
{
printf("Dropped\n");
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2); // 只保留2个buffer
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end()); // 只保留2个buffer(默认4M)
}
for (const auto& buffer : buffersToWrite) // 遍历buffer
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length()); // 负责fwrite数据
}
output.flush(); // 保证数据落到磁盘了
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2); // 只保留2个buffer
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back()); // 复用buffer对象
buffersToWrite.pop_back();
newBuffer1->reset(); // 重置
}
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back()); // 复用buffer对象
buffersToWrite.pop_back();
newBuffer2->reset(); // 重置
}
buffersToWrite.clear();
}
output.flush();
}
流程图:
流程图:
for (const auto& buffer : buffersToWrite) // 遍历buffer
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length()); // 负责fwrite数据
}
output.flush(); // 保证数据落到磁盘了
#include "AsyncLogging.h"
#include <stdio.h>
#include <sys/resource.h>
#include <unistd.h>
#include <sys/time.h>
#include <iostream>
#include "Logging.h"
#define LOG_NUM 5000000 // 总共的写入日志行数
using namespace std;
off_t kRollSize= 1 * 1000 * 1000; // 只设置1M
static AsyncLogging *g_asyncLog = NULL;
static void asyncOutput(const char *msg, int len)
{
g_asyncLog->append(msg, len);
}
// 时间戳
static uint64_t get_tick_count()
{
struct timeval tval;
uint64_t ret_tick;
gettimeofday(&tval, NULL);
ret_tick = tval.tv_sec * 1000L + tval.tv_usec / 1000L;
return ret_tick;
}
int main(int argc,char*argv[])
{
printf("PID = %d\n",getpid());
char name[260] = { 0 };
strncpy(name, argv[0], sizeof name - 1);
// 设置 回滚大小kRollSize(1M), 最大1秒刷一次盘(flush)
AsyncLogging log(::basename(name), kRollSize, 1);
Logger::setOutput(asyncOutput);
g_asyncLog = &log;
// 启动日志写入线程
log.start();
uint64_t begin_time = get_tick_count();
cout << "name: " << basename(name) << "\nbegin time: " << begin_time << endl;
for (int i = 0; i < LOG_NUM; i++)
{
LOG_INFO << "NO." << i << " Root Error Message!"; // 47个字节
}
log.stop();
uint64_t end_time = get_tick_count();
std::cout << "end_time: " << end_time << std::endl;
int64_t ops = LOG_NUM;
ops = ops * 1000 / (end_time - begin_time);
std::cout << "need the time1: " << end_time << " " << begin_time << ", " << end_time - begin_time << "毫秒"
<< ", ops = " << ops << "ops/s\n";
return 0;
}
(1)日志可以采用批量写入(以数据大小为判断为准)来做到高性能。 同步方式通过攒够数据(比如4M)或者时间超过一定阈值(比如1秒)触发写入。比如glog日志库。 异步方式(比如moduo日志库)采用append积攒数据,异步落盘线程负责数据写入磁盘。
什么时候触发? notify+wait_timeout,即 通知唤醒+超时唤醒。
(2)为减少锁的粒度,减少刷新磁盘的时候日志接口阻塞,采用双队列方式(前台队列+后台刷新磁盘队列,后台队列刷新数据到磁盘)。
(3)内存分配通过move语义避免深拷贝。 (4)log4cpp的日志框架值得参考,但是它的性能不佳,要自己做完善、扩展。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。