前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Servicemesh系列】【Envoy源码解析(二)】一个Http请求到响应的全链路(一)

【Servicemesh系列】【Envoy源码解析(二)】一个Http请求到响应的全链路(一)

作者头像
吃橙子的狐狸
发布2019-02-28 16:18:13
1.3K0
发布2019-02-28 16:18:13
举报
文章被收录于专栏:橙子架构杂谈橙子架构杂谈

1. http连接建立

当有新连接过来的时候,会调用上一章节所提及的被注册到libevent里面的回调函数。我们回顾一下,上一章节提及了,会有多个worker注册所有的listener,当有一个连接过来的时候,系统内核会调度一个线程出来交付这个连接。这样,就可以并发地进行连接的快速建立。更详细的内容可以参考envoy官方博客关于线程模型的描述,此处不赘述。

代码语言:javascript
复制
 listener_.reset(
        evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));

回调函数将调用Listener的OnAccept方法,并最终进行网络级别ConnectionImpl的创建,Connection的底层此处利用了libevent对连接的读写事件进行监听,并注册了读写事件的Filter,用来对监听到的事件和数据进行处理。

代码语言:javascript
复制
void ConnectionHandlerImpl::ActiveListener::newConnection(Network::ConnectionSocketPtr&& socket) {
  ......
  auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket();
  // 创建ServerConnection
  Network::ConnectionPtr new_connection =
      parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket));
  new_connection->setBufferLimits(config_.perConnectionBufferLimitBytes());
  // 创建真正的Read/Write Filter
 const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
      *new_connection, filter_chain->networkFilterFactories());
  ......
}

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                               TransportSocketPtr&& transport_socket, bool connected)
    : transport_socket_(std::move(transport_socket)), filter_manager_(*this, *this),
      socket_(std::move(socket)), write_buffer_(dispatcher.getWatermarkFactory().create(
                                      [this]() -> void { this->onLowWatermark(); },
                                      [this]() -> void { this->onHighWatermark(); })),
      dispatcher_(dispatcher), id_(next_global_id_++) {

  // 底层基于event_assign和event_add
  file_event_ = dispatcher_.createFileEvent(
      fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge,
      Event::FileReadyType::Read | Event::FileReadyType::Write);
}

至次,http连接建立完成。下面,就等着请求数据过来了。

2. 请求数据获取

我们都知道,一个connfd会带有read/write buffer区,当一个请求过来时,常规的交互方式即让调用方依次进行sendrecv操作,来发送并获取数据。当发送数据时,通过send将数据最终传递给目标fd的read buffer区。此时采用ET触发的epoll,感知到数据增多/从不可读变为可读的状态,从而触发EV_READ事件,从而调用onFileEvent方法,该方法中,我们目前暂时只关注对read事件的处理:

代码语言:javascript
复制
void ConnectionImpl::onFileEvent(uint32_t events) {

  if (immediate_error_event_ != ConnectionEvent::Connected) {
    ......
  }

  if (events & Event::FileReadyType::Closed) {
    ......    
  }

  if (events & Event::FileReadyType::Write) {
    ......
  }
  // 此处即为对read事件的处理,onReadReady最终会调用到onRead方法
  if (fd() != -1 && (events & Event::FileReadyType::Read)) {
    onReadReady();
  }
}

void ConnectionImpl::onReadReady() {
  ......
  // 核心读取数据的地方。
  IoResult result = transport_socket_->doRead(read_buffer_);
  uint64_t new_buffer_size = read_buffer_.length();

 // 更新连接监控的一些状态,无实际意义。
 updateReadBufferStats(result.bytes_pActiveListener::newConnectionrocessed_, new_buffer_size);

  read_end_stream_ |= result.end_stream_read_;
  if (result.bytes_processed_ != 0 || result.end_stream_read_) { 
    // 当远端连接关闭或者有读取到数据的时候启动onRead,进行读取到的数据的处理
    onRead(new_buffer_size);
  }
  ...... 
}

上面可以看到IoResult result = transport_socket_->doRead(read_buffer_);是获取数据的入口,我们看下数据是如何获取的。

代码语言:javascript
复制
IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
  PostIoAction action = PostIoAction::KeepOpen;
  uint64_t bytes_read = 0;
  bool end_stream = false;
  do {
    Api::SysCallResult result = buffer.read(callbacks_->fd(), 16384);
    ENVOY_CONN_LOG(trace, "read returns: {}", callbacks_->connection(), result.rc_);

    if (result.rc_ == 0) {
      end_stream = true;
      break;
    } else if (result.rc_ == -1) {
      ENVOY_CONN_LOG(trace, "read error: {}", callbacks_->connection(), result.errno_);
      if (result.errno_ != EAGAIN) {
        action = PostIoAction::Close;
      }
      break;
    } else {
      bytes_read += result.rc_;
      if (callbacks_->shouldDrainReadBuffer()) {
        callbacks_->setReadBufferReady();
        break;
      }
    }
  } while (true);

  return {action, bytes_read, end_stream};
}

可以看到,强制以16KB的分片大小去循环取read buffer区,这个读数据的动作只有在四个情况下会退出:

  1. 远端关闭连接,即连接被动关闭。
  2. 发生异常
  3. 读到buffer区空了
  4. 读够limit大小的数据时

此处的的Buffer采用了OwnImpl,底层会进行了readv的系统调用,此处不展开。针对以上代码,我们着重关注下整个读逻辑。我们来捋一下:

读数据的整体逻辑

  1. 监听conn fd的读buffer区,设定为epoll边缘触发,并且被设定为持久化监听(libevent设置为EV_PERSIST)。
  2. 当读buffer区第一次出现数据时候,回调onFileEvent。开始进行处理
  3. 首先,从buffer区中读出数据,当被动关闭连接、异常、读够1M数据(默认值)、读到无数据可读的时候退出这次读处理。
    1. 如果是异常,则envoy也同步进行各种重置和关闭操作。然后退出。
    2. 如果是被动关闭,不考虑半关闭情况下,envoy会进行重置和关闭操作,但于此同时,会将被动关闭前读出来的所有数据发送到后续流程去处理。
    3. 如果是读够1M数据的场景,则发送给后面流程去处理。但需要格外关注的是,由于envoy采用了边缘触发,所以如果没有新数据进来,则无法将监听到read事件,这样可能导致数据无法被消费完。为了解决这个问题,所以会通过callbacks_->setReadBufferReady();重新触发Read事件。
    4. 如果是读buff区读完的场景,则将读取到的数据发送到后面流程去处理。
  4. 后面流程执行完后(过一系列的filter、包括限流、路由等),一次读事件处理完成。由于设置了持久化监听,所以无需手动再进行EV_READ事件注册。继续等待下一次读事件的到来。

(注:会有一些地方会显示的触发或者关闭事件监听,此处不展开讨论)

3. 请求数据处理流程拼装

当从fd中拿到数据后,则会进行正式的处理。处理主要包括限流、熔断、链路追踪、数据采集、路由转发、负载均衡等。FilterManager管理所有的Read/Write Filter,并拼装成pipeline进行处理。

代码语言:javascript
复制
void ConnectionImpl::onRead(uint64_t read_buffer_size) {
  ......
  filter_manager_.onRead();
}

void FilterManagerImpl::onRead() {
  ASSERT(!upstream_filters_.empty());
  onContinueReading(nullptr);
}

// 拼装逻辑
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) {
  std::list<ActiveReadFilterPtr>::iterator entry;
  if (!filter) {
    entry = upstream_filters_.begin();
  } else {
    entry = std::next(filter->entry());
  }

  for (; entry != upstream_filters_.end(); entry++) {
    if (!(*entry)->initialized_) {
      (*entry)->initialized_ = true;
      // 第一次访问则调用onNewConnection
      FilterStatus status = (*entry)->filter_->onNewConnection();
      if (status == FilterStatus::StopIteration) {
        return;
      }
    }

    BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      // 后续访问则调用onData
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
      if (status == FilterStatus::StopIteration) {
        return;
      }
    }
  }
}

这里的filter是怎么发现的呢?通过RegsitryFactory来实现的。首先,对应每个factory,比如RatelimiterFilter的factory,需要有个这样的声明来实现注册

代码语言:javascript
复制
static Registry::RegisterFactory<RateLimitFilterConfig,
                                 Server::Configuration::NamedHttpFilterConfigFactory>
    register_;

声明之后,则会默认调用RegisterFactory的构造函数进行注册

代码语言:javascript
复制
template <class T, class Base> class RegisterFactory {
public:
  RegisterFactory() { FactoryRegistry<Base>::registerFactory(instance_); }

private:
  T instance_{};
};

之后即在初始化Lisnter的阶段,会进行对应Filter工厂的实例化,在初始化连接阶段,会获取所需要的工厂实例,进行Filter实例的初始化。

下一章节,我们取出两个Filter做说明来看整个处理流程。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018年09月04日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. http连接建立
  • 2. 请求数据获取
    • 读数据的整体逻辑
    • 3. 请求数据处理流程拼装
    相关产品与服务
    负载均衡
    负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档