前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Envoy:event相关代码阅读(二)

Envoy:event相关代码阅读(二)

作者头像
灰子学技术
发布于 2023-10-30 08:04:52
发布于 2023-10-30 08:04:52
28500
代码可运行
举报
文章被收录于专栏:灰子学技术灰子学技术
运行总次数:0
代码可运行

本篇文章试图来介绍envoy的事件处理部分的代码,对于envoy来说是基于libevent做了简单封装来实现的异步调度。

本篇文章会从下面两部分来进行讲解,libevent的基础知识介绍,envoy中event的类的实现和event在envoy中的调度逻辑,本篇介绍第二部分内容。

一、envoy中event相关类的介绍

envoy将libevent的三类事件做了一个简单的封装,如下图所示:

signal类

timer类

文件类:

envoy核心处理事件的逻辑主要是在Dispatcherimpl里面。

二、envoy中事件调度的逻辑介绍

DispatcherImpl 类里面维护了一个 post_callbacks_队列,用于存储这些事件触发的callback函数,通过生产者、消费者模式进行互动来进行操作。 (一)生产者的实现方式: 使用post的入口,以及这部分postcallbacks都有哪一些?这里有三类生产者,分别是: 1.guarddog的postcallback:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[&guarddog_thread_started]() { guarddog_thread_started.Notify(); }

设置postcallback的代码位置:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void GuardDogImpl::start(Api::Api& api) {
  Thread::LockGuard guard(mutex_);

  // Synchronize between calling thread and guarddog thread.
  absl::Notification guarddog_thread_started;

  // See comments in WorkerImpl::start for the naming convention.
  Thread::Options options{absl::StrCat("dog:", dispatcher_->name())};
  thread_ = api.threadFactory().createThread(
      [this, &guarddog_thread_started]() -> void {
        loop_timer_->enableTimer(std::chrono::milliseconds(0));
        dispatcher_->post([&guarddog_thread_started]() { guarddog_thread_started.Notify(); });
        // 事件触发方式是
        // Runs the event-loop until loopExit() is called, blocking
        // until there are pending or active events.
        dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit);
      },
      options);

  guarddog_thread_started.WaitForNotification();
}

2.Server里的InstanceImpl实现: 这里对应的是主线程,对应的postcallback:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[this] { notifyCallbacksForStage(Stage::Startup); }

实现代码的位置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void InstanceImpl::run() {
  // RunHelper exists primarily to facilitate testing of how we respond to early shutdown during
  // startup (see RunHelperTest in server_test.cc).
  const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
                                    access_log_manager_, init_manager_, overloadManager(), [this] {
                                      notifyCallbacksForStage(Stage::PostInit);
                                      startWorkers();
                                    });
 
  // Run the main dispatch loop waiting to exit.
  ENVOY_LOG(info, "starting main dispatch loop");
  auto watchdog = main_thread_guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(),
                                                         "main_thread", *dispatcher_);
  dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
  dispatcher_->run(Event::Dispatcher::RunType::Block); // Runs the event-loop until there are no pending events.
  ENVOY_LOG(info, "main dispatch loop exited");
  main_thread_guard_dog_->stopWatching(watchdog);
  watchdog.reset();
 
  terminate();
}

3.worker_impl实现的方式:// 这里对应的是worker 对应的postcallback,传递进来的cb是下面这个:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[this, &guard_dog, cb]() {
cb();
watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
dispatcher_->name(), *dispatcher_);
}
// 上面对应的cb的代码实现如下所示:
[&workers_waiting_to_run]() {
workers_waiting_to_run.DecrementCount();
};

调用关系如下所示:startWorkers--->start--->createThread--->threadRoutine--->post // 最后一步添加的postcallback函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function<void()> callback) {
  ENVOY_LOG(info, "all dependencies initialized. starting workers");
  ASSERT(!workers_started_);
  workers_started_ = true;
  uint32_t i = 0;
 
  absl::BlockingCounter workers_waiting_to_run(workers_.size());
  Event::PostCb worker_started_running = [&workers_waiting_to_run]() {
    workers_waiting_to_run.DecrementCount();
  };
 
  // We can not use "Cleanup" to simplify this logic here, because it results in a issue if Envoy is
  // killed before workers are actually started. Specifically the AdminRequestGetStatsAndKill test
  // case in main_common_test fails with ASAN error if we use "Cleanup" here.
  const auto listeners_pending_init =
      std::make_shared<std::atomic<uint64_t>>(workers_.size() * active_listeners_.size());
  for (const auto& worker : workers_) {
    ENVOY_LOG(debug, "starting worker {}", i);
    ASSERT(warming_listeners_.empty());
    for (const auto& listener : active_listeners_) {
      addListenerToWorker(*worker, absl::nullopt, *listener,
                          [this, listeners_pending_init, callback]() {
                            if (--(*listeners_pending_init) == 0) {
                              stats_.workers_started_.set(1);
                              callback();
                            }
                          });
    }
    worker->start(guard_dog, worker_started_running); // 这里是入口最终调用的threadRoutine
    if (enable_dispatcher_stats_) {
      worker->initializeStats(*scope_);
    }
    i++;
  }
 
// worker的启动入口,会调用threadRoutine
void WorkerImpl::start(GuardDog& guard_dog, const Event::PostCb& cb) {
  ASSERT(!thread_);
 
  // In posix, thread names are limited to 15 characters, so contrive to make
  // sure all interesting data fits there. The naming occurs in
  // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
  // have 9999 threads. We'd need, so we need 7 bytes for "worker_", 4 bytes
  // for the thread index, leaving us 4 bytes left to distinguish between the
  // two threads used per dispatcher. We'll call this one "dsp:" and the
  // one allocated in guarddog_impl.cc "dog:".
  //
  // TODO(jmarantz): consider refactoring how this naming works so this naming
  // architecture is centralized, resulting in clearer names.
  Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
  thread_ = api_.threadFactory().createThread(
      [this, &guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
}

threadRoutine核心代码逻辑:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void WorkerImpl::threadRoutine(GuardDog& guard_dog, const Event::PostCb& cb) {
  ENVOY_LOG(debug, "worker entering dispatch loop");
  // The watch dog must be created after the dispatcher starts running and has post events flushed,
  // as this is when TLS stat scopes start working.
  dispatcher_->post([this, &guard_dog, cb]() {
    cb();
    watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
                                          dispatcher_->name(), *dispatcher_);
  });
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(debug, "worker exited dispatch loop");
  guard_dog.stopWatching(watch_dog_);
  dispatcher_->shutdown();
 
  // We must close all active connections before we actually exit the thread. This prevents any
  // destructors from running on the main thread which might reference thread locals. Destroying
  // the handler does this which additionally purges the dispatcher delayed deletion list.
  handler_.reset();
  tls_.shutdownThread();
  watch_dog_.reset();
}

4.核心的生产postcallback的代码逻辑,post函数:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void DispatcherImpl::post(std::function<void()> callback) {
  bool do_post;
  {
    Thread::LockGuard lock(post_lock_);
    do_post = post_callbacks_.empty();
    post_callbacks_.push_back(callback);
  }
 
// 构造函数对post_cb_进行了初始化操作:runPostCallbacks()这里做的事情是消费事件的处理逻辑
// post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
// 下面实际上是通过event_active激活去执行run 操作进行消费
  if (do_post) { // 这里表示的是当前线程没有事件执行的时候,去主动唤醒另外一个线程去处理它里面的内容
    post_cb_->scheduleCallbackCurrentIteration();
  }
}
 
// 进行event_active的激活操作,这里执行之后,在event队列里面会执行 上面设置的callback函数 runPostCallbacks()
void SchedulableCallbackImpl::scheduleCallbackCurrentIteration() {
  if (enabled()) { // 这里的实现参考下面的函数,主要是判断当前线程里面的raw_event有没有正在排队的时间,有的话,就直接返回了
    return;
  }
  // event_active directly adds the event to the end of the work queue so it executes in the current
  // iteration of the event loop.
  event_active(&raw_event_, EV_TIMEOUT, 0);
}
 
bool SchedulableCallbackImpl::enabled() { return 0 != evtimer_pending(&raw_event_, nullptr); }

(二)消费者的实现方式

run()核心代码,先执行callback函数,再触发event_base_loop()。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void DispatcherImpl::run(RunType type) {
  run_tid_ = api_.threadFactory().currentThreadId();
  // Flush all post callbacks before we run the event loop. We do this because there are post
  // callbacks that have to get run before the initial event loop starts running. libevent does
  // not guarantee that events are run in any particular order. So even if we post() and call
  // event_base_once() before some other event, the other event might get called first.
  runPostCallbacks(); // 批量执行callback函数
  base_scheduler_.run(type);
}

1.runPostCallbacks:这个函数是核心消费逻辑

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void DispatcherImpl::runPostCallbacks() {
  // Clear the deferred delete list before running post callbacks to reduce non-determinism in
  // callback processing, and more easily detect if a scheduled post callback refers to one of the
  // objects that is being deferred deleted.
  clearDeferredDeleteList(); // 延迟删除上一次event触发之后的未清理的数据结构
 
  std::list<std::function<void()>> callbacks;
  {
    // Take ownership of the callbacks under the post_lock_. The lock must be released before
    // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
    // later in the event loop.
    Thread::LockGuard lock(post_lock_);
    // 这里先操作了copy动作,相当于把post_callbacks_的内容转移到callbacks了,这样是为了post_callbacks_可以用来继续做别的事情
    callbacks = std::move(post_callbacks_);
    // post_callbacks_ should be empty after the move.
    ASSERT(post_callbacks_.empty());
  }
  // It is important that the execution and deletion of the callback happen while post_lock_ is not
  // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
  while (!callbacks.empty()) {
    // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
    // executing a long list of callbacks.
    touchWatchdog();
    // Run the callback.
    callbacks.front()(); // 这里是把这些callback从队列的头开始逐次去调用执行
    // Pop the front so that the destructor of the callback that just executed runs before the next
    // callback executes.
    callbacks.pop_front();// 执行完之后,就从这个callbacks队列里面删除掉
  }
}

2.触发event_base_loop()

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void LibeventScheduler::run(Dispatcher::RunType mode) {
  int flag = 0;
  switch (mode) {
  case Dispatcher::RunType::NonBlock:
    flag = LibeventScheduler::flagsBasedOnEventType();
  case Dispatcher::RunType::Block:
    // The default flags have 'block' behavior. See
    // http://www.wangafu.net/~nickm/libevent-book/Ref3_eventloop.html
    break;
  case Dispatcher::RunType::RunUntilExit:
    flag = EVLOOP_NO_EXIT_ON_EMPTY;
    break;
  }
  event_base_loop(libevent_.get(), flag); // 默认是NonBlock,这里触发事件循环
}

参考文档: 版本对应的是1.11.2: https://github.com/istio/proxy

https://blog.csdn.net/weixin_34198797/article/details/89627369?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167715942416800180647283%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=167715942416800180647283&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-89627369-null-null.142^v73^wechat,201^v4^add_ask,239^v2^insert_chatgpt&utm_term=envoy%20dispatcher&spm=1018.2226.3001.4187

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 灰子学技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【Servicemesh系列】【Envoy源码解析(一)】Envoy启动
Envoy是Servicemesh体系中的佼佼者,也是目前Istio默认集成的数据平面,在网上Envoy源码解析的文章非常少,基本很难搜罗到对应的一些细节资料。以下将从源码级别切入,深度分析Envoy架构
吃橙子的狐狸
2019/02/28
1.6K1
Envoy请求流程源码解析(二)|请求解析
Envoy 是一款面向 Service Mesh 的高性能网络代理服务。它与应用程序并行运行,通过以平台无关的方式提供通用功能来抽象网络。当基础架构中的所有服务流量都通过 Envoy 网格时,通过一致的可观测性,很容易地查看问题区域,调整整体性能。
灵雀云
2022/03/03
1.7K0
Envoy请求流程源码解析(二)|请求解析
Envoy源码分析之Dispatcher
在Envoy的代码中Dispatcher是随处可见的,可以说在Envoy中有着举足轻重的地位,一个Dispatcher就是一个EventLoop,其承担了任务队列、网络事件处理、定时器、信号处理等核心功能。在Envoy threading model这篇文章所提到的EventLoop(Each worker thread runs a “non-blocking” event loop)指的就是这个Dispatcher对象。这个部分的代码相对较独立,和其他模块耦合也比较少,但重要性却不言而喻。下面是与Dispatcher相关的类图,在接下来会对其中的关键概念进行介绍。
黑光技术
2019/03/06
1.7K0
Envoy源码分析之Dispatcher
Envoy:event相关代码阅读(一)
本篇文章会从下面两部分来进行讲解,libevent的基础知识介绍,envoy中event的类的实现和event在envoy中的调度逻辑。
灰子学技术
2023/10/30
2320
Envoy:event相关代码阅读(一)
Envoy源码分析之ThreadLocal
Envoy中严重依赖ThreadLocal,为了避免加锁Envoy会尽可能在单一线程中完成所有的事件,但是多个线程之间难免会有一些数据需要共享,还有可能需要读写,为了避免加锁Envoy将一些需要在线程之间共享的数据放在ThreadLocal中,当ThreadLocal中的数据需要更新的时候则会通过主线程将更新后的数据Post到各个线程中,交由各个线程来更新自己的ThreadLocal。Envoy在C++11的thread_local的基础上结合Dispatcher实现了一个ThreadLocal对象。本文则会重点分析下ThreadLocal的设计与实现。先来看下ThreadLocal的整体结构,下文会逐一进行分析。
黑光技术
2019/03/06
1.3K0
Envoy源码分析之ThreadLocal
Envoy:httpfilter相关代码阅读
本篇文章是envoy httpfilter相关代码阅读的整理和总结,笔者试图通过这篇文章将http filter在envoy内部的管控讲清楚,并且将request和response是如何使用这部分 http filter功能的流程介绍清楚。
灰子学技术
2023/10/30
3490
Envoy:httpfilter相关代码阅读
【Servicemesh系列】【Envoy源码解析(二)】一个Http请求到响应的全链路(一)
当有新连接过来的时候,会调用上一章节所提及的被注册到libevent里面的回调函数。我们回顾一下,上一章节提及了,会有多个worker注册所有的listener,当有一个连接过来的时候,系统内核会调度一个线程出来交付这个连接。这样,就可以并发地进行连接的快速建立。更详细的内容可以参考envoy官方博客关于线程模型的描述,此处不赘述。
吃橙子的狐狸
2019/02/28
1.3K0
[源码解析] 深度学习分布式训练框架 horovod (14) --- 弹性训练发现节点 & State
Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。
罗西的思考
2021/07/19
4960
[源码解析] 深度学习分布式训练框架 horovod (14) --- 弹性训练发现节点 & State
Envoy请求流程源码解析(三)|请求解析
Envoy 是一款面向 Service Mesh 的高性能网络代理服务。它与应用程序并行运行,通过以平台无关的方式提供通用功能来抽象网络。当基础架构中的所有服务流量都通过 Envoy 网格时,通过一致的可观测性,很容易地查看问题区域,调整整体性能。
灵雀云
2022/04/14
1.5K0
Envoy请求流程源码解析(三)|请求解析
解析envoy处理http请求(下):事件模型和连接管理
Envoy是istio的核心组件之一,以sidecar的方式与服务运行在一起,对服务的流量进行拦截转发。 具有路由,流量控制等等强大特性。 本文以istio1.1所对应的Envoy版本进行源码流程分析。
灵雀云
2019/07/30
2.5K0
解析envoy处理http请求(下):事件模型和连接管理
聊聊jesque的WorkerImpl与WorkerPool
Resque是一个使用redis来创建后台任务的ruby组件。而jesque是其java版本。通常用来做延时队列。
code4it
2018/09/17
5010
深入理解libevent事件库的原理与实践技巧
libevent是一个事件通知库;封装了reactor。 libevent API 提供了一种机制,用于在文件描述符上发生特定事件或达到超时后执行回调函数。此外,libevent还支持由于信号或常规超时而导致的回调。 libevent 旨在替换在事件驱动的网络服务器中找到的事件循环。应用程序只需要调用event_dispatch(),然后动态添加或删除事件,而无需更改事件循环。
Lion 莱恩呀
2024/10/20
4310
深入理解libevent事件库的原理与实践技巧
JDK7 ThreadPoolExecutor execute(Runnable command) 方法解析
/** * 通过这个方法提交的线程,将在新的线程,或者已有的(线程池)线程中执行 * * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@c
技术蓝海
2018/04/26
1.1K0
Tornado 源码阅读:初步认识
来源:国夫君 segmentfault.com/a/1190000002971992 ioloop `ioloop`是`tornado`的核心模块,也是个调度模块,各种异步事件都是由他调度的,所以必须弄清他的执行逻辑 源码分析 而`ioloop`的核心部分则是 `while True`这个循环内部的逻辑,贴上他的代码下 def start(self): if self._running: raise RuntimeError("IOLoop is already running") self.
小小科
2018/05/03
5700
Tornado 源码阅读:初步认识
libuv源码阅读(16)--signal
总结:信号处理handler是被插入到红黑树中,按照一定规则排序插入的,信号越小,不带oneshot等规则。信号处理函数统一触发信号管道可读,然后loop从信号io管道可读端读取信号结构体,执行这个信号上的handler的回调。大概主体流程就是这样的。跟我们平常自己写某些信号的handler的方法类似:注册信号和信号函数,触发信号管道可读,主循环捕获io可读事件,根据信号值调用对应回调。
wanyicheng
2021/03/13
2.2K1
基于C++实现的EventLoop与事件驱动编程
事件驱动编程(Event-Driven)是一种编码范式,常被应用在图形用户界面,应用程序,服务器开发等场景。
Coder-ZZ
2024/05/09
1.6K0
基于C++实现的EventLoop与事件驱动编程
不要在nodejs中阻塞event loop
我们知道event loop是nodejs中事件处理的基础,event loop中主要运行的初始化和callback事件。除了event loop之外,nodejs中还有Worker Pool用来处理一些耗时的操作,比如I/O操作。
程序那些事
2021/02/02
6570
不要在nodejs中阻塞event loop
Mesh8# Envoy原理提点与常用命令
Istio的核心组件,作为sideCar与应用部署在一个Pod中,作为代理流量的进出均需经过Envoy所在的容器,除了代理外还可根据规则进行流量治理、监控等功能。
瓜农老梁
2021/12/13
8610
[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件
Celery是一个简单、灵活且可靠的,处理大量事件的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
罗西的思考
2021/05/13
8750
[源码解析] 并行分布式任务队列 Celery 之  EventDispatcher & Event 组件
【Servicemesh系列】【Envoy源码解析(三)】一个Http请求到响应的全链路(二)
上一章节讲到了FilterManager通过对Read Filter进行流程拼装,以下接着以最核心的路由功能为例,来串联整个请求发送的流程。
吃橙子的狐狸
2019/02/28
1.7K0
推荐阅读
相关推荐
【Servicemesh系列】【Envoy源码解析(一)】Envoy启动
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验