遵循Reactor模型,封装了EventLoop。有以下特点:
EventLoop::pendingFunctors_
的线程安全。EventLoop::pendingFunctors_
的异步函数队列机制,让Reactor线程能向ioLoop传递回调函数的方式,让ioLoop注册新的socket。wakeupFd_
写8字节的方式,触发ioLoop监听的wakeup_fd,从epoll_wait唤醒。遵从Reactor模型,简单阅读了下。 来到目录examples/ace/loggin/server.cc server.cc main->LogServer->EventLoop和TcpServer
image.png
当新连接到来时,会调用TcpServer::newConnection
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
...
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
...
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
Reactor线程调用ioLoop->runInLoop,传入回调函数TcpConnection::connectEstablished
在此尝试调用回调函数。
// EventLoop.cc
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
先锁住pendingFunctors_
, 传入回调函数cb, 然后如果不是loop线程,要将其唤醒
void EventLoop::queueInLoop(Functor cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
函数向wakupFd
写了8个字节,触发其写事件,从而唤醒ioLoop线程。这里利用了epoll的事件机制。
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
当回调函数被调用后,一路
void TcpConnection::connectEstablished()
{
...
channel_->enableReading();
connectionCallback_(shared_from_this());
}
// Channel.h
void enableReading() { events_ |= kReadEvent; update(); }
// Channel.cc
void Channel::update()
{
addedToLoop_ = true;
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}
channel->index()
标志了该Channel的状态,根据当前状态决定执行注册/注销/修改事件
void EPollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
const int index = channel->index();
LOG_TRACE << "fd = " << channel->fd()
<< " events = " << channel->events() << " index = " << index;
if (index == kNew || index == kDeleted)
{
// a new one, add with EPOLL_CTL_ADD
int fd = channel->fd();
if (index == kNew)
{
assert(channels_.find(fd) == channels_.end());
channels_[fd] = channel;
}
else // index == kDeleted
{
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
}
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else
{
// update existing one with EPOLL_CTL_MOD/DEL
int fd = channel->fd();
(void)fd;
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
assert(index == kAdded);
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}
void EPollPoller::update(int operation, Channel* channel)
{
struct epoll_event event;
memZero(&event, sizeof event);
event.events = channel->events();
event.data.ptr = channel;
int fd = channel->fd();
LOG_TRACE << "epoll_ctl op = " << operationToString(operation)
<< " fd = " << fd << " event = { " << channel->eventsToString() << " }";
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) // 为epollfd_注册事件
{
...
}
}
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // 获取活跃的Channel
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_); // 逐个执行Channel对应的事件
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors(); // 处理异步队列,Reactor收到的socket在此被注册到epoll_fd
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
// 这两句话使得EventLoop监听了wakup_fd
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}