前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >muduo网络编程库阅读

muduo网络编程库阅读

作者头像
平凡的学生族
发布2021-01-07 17:46:45
5530
发布2021-01-07 17:46:45
举报
文章被收录于专栏:后端技术

正文

遵循Reactor模型,封装了EventLoop。有以下特点:

  1. 利用MutexLockGuard->MutexLock->pthread_mutex_t控制EventLoop::pendingFunctors_的线程安全。
  2. 回调函数, std::function和std::bind的大量应用。用于回调读/写/错误/关闭等事件。
  3. 高度抽象。用TcpConnection封装Channel,后者封装了socket fd。用EventLoop封装Reactor线程。用TcpServer封装Acceptor线程和Event线程池。
  4. 异步函数队列。利用EventLoop::pendingFunctors_的异步函数队列机制,让Reactor线程能向ioLoop传递回调函数的方式,让ioLoop注册新的socket。
  5. 利用wakupFd唤醒线程。EventLoop在构造函数中就注册监听了wakupFd的读事件。Reactor线程通过向wakeupFd_写8字节的方式,触发ioLoop监听的wakeup_fd,从epoll_wait唤醒。

正文

遵从Reactor模型,简单阅读了下。 来到目录examples/ace/loggin/server.cc server.cc main->LogServer->EventLoop和TcpServer

image.png

当新连接到来时,会调用TcpServer::newConnection

代码语言:javascript
复制
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  ...
  EventLoop* ioLoop = threadPool_->getNextLoop();
  ...
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  ...
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

Reactor线程调用ioLoop->runInLoop,传入回调函数TcpConnection::connectEstablished

在此尝试调用回调函数。

  • 如果调用者就是ioLoop本身,就直接调用cb
  • 如果不是,则放入异步队列,等待ioLoop后续调用
代码语言:javascript
复制
// EventLoop.cc
void EventLoop::runInLoop(Functor cb)
{
  if (isInLoopThread())
  {
    cb();
  }
  else
  {
    queueInLoop(std::move(cb));
  }
}

回调函数的异步调用

先锁住pendingFunctors_, 传入回调函数cb, 然后如果不是loop线程,要将其唤醒

代码语言:javascript
复制
void EventLoop::queueInLoop(Functor cb)
{
  {
  MutexLockGuard lock(mutex_);
  pendingFunctors_.push_back(std::move(cb));
  }

  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}

利用事件机制唤醒线程

函数向wakupFd写了8个字节,触发其写事件,从而唤醒ioLoop线程。这里利用了epoll的事件机制

代码语言:javascript
复制
void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

回调函数调用

当回调函数被调用后,一路

代码语言:javascript
复制
void TcpConnection::connectEstablished()
{
  ...
  channel_->enableReading();

  connectionCallback_(shared_from_this());
}
代码语言:javascript
复制
// Channel.h
void enableReading() { events_ |= kReadEvent; update(); }
代码语言:javascript
复制
// Channel.cc
void Channel::update()
{
  addedToLoop_ = true;
  loop_->updateChannel(this);
}
代码语言:javascript
复制
void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel);
}

channel->index()标志了该Channel的状态,根据当前状态决定执行注册/注销/修改事件

代码语言:javascript
复制
void EPollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  const int index = channel->index();
  LOG_TRACE << "fd = " << channel->fd()
    << " events = " << channel->events() << " index = " << index;
  if (index == kNew || index == kDeleted)
  {
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
      assert(channels_.find(fd) == channels_.end());
      channels_[fd] = channel;
    }
    else // index == kDeleted
    {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }

    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
  }
  else
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    int fd = channel->fd();
    (void)fd;
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(index == kAdded);
    if (channel->isNoneEvent())
    {
      update(EPOLL_CTL_DEL, channel);
      channel->set_index(kDeleted);
    }
    else
    {
      update(EPOLL_CTL_MOD, channel);
    }
  }
}
代码语言:javascript
复制
void EPollPoller::update(int operation, Channel* channel)
{
  struct epoll_event event;
  memZero(&event, sizeof event);
  event.events = channel->events();
  event.data.ptr = channel;
  int fd = channel->fd();
  LOG_TRACE << "epoll_ctl op = " << operationToString(operation)
    << " fd = " << fd << " event = { " << channel->eventsToString() << " }";
  if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)  // 为epollfd_注册事件
  {
    ...
  }
}

EventLoop主循环

代码语言:javascript
复制
void EventLoop::loop()
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
  LOG_TRACE << "EventLoop " << this << " start looping";

  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);  // 获取活跃的Channel
    ++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (Channel* channel : activeChannels_)
    {
      currentActiveChannel_ = channel;
      currentActiveChannel_->handleEvent(pollReturnTime_);  // 逐个执行Channel对应的事件
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    doPendingFunctors();  // 处理异步队列,Reactor收到的socket在此被注册到epoll_fd
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

EventLoop在构造函数监听wakeup_fd

代码语言:javascript
复制
EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    iteration_(0),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_)),
    currentActiveChannel_(NULL)
{
  LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }

  // 这两句话使得EventLoop监听了wakup_fd
  wakeupChannel_->setReadCallback(
      std::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 正文
  • 正文
  • 回调函数的异步调用
  • 利用事件机制唤醒线程
  • 回调函数调用
  • EventLoop主循环
  • EventLoop在构造函数监听wakeup_fd
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档