前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Servicemesh系列】【Envoy源码解析(三)】一个Http请求到响应的全链路(二)

【Servicemesh系列】【Envoy源码解析(三)】一个Http请求到响应的全链路(二)

作者头像
吃橙子的狐狸
发布2019-02-28 16:18:45
1.7K0
发布2019-02-28 16:18:45
举报
文章被收录于专栏:橙子架构杂谈橙子架构杂谈

上一章节讲到了FilterManager通过对Read Filter进行流程拼装,以下接着以最核心的路由功能为例,来串联整个请求发送的流程。

4. 请求解析

对于一个Sidecar来说,最核心的能力必然就是路由。没有路由,其他的功能都是枉谈。所以这边也就会引出Envoy里面最核心的一个Filter —— 连接管理器ConnectionManagerImpl。当新连接的请求初次抵达时,不需要做任何处理:

代码语言:javascript
复制
 Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }

当处理包体数据时,代码如下,我们忽略掉一些重发和失败处理、状态更新的逻辑:

代码语言:javascript
复制
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool end_stream) {
  ......
  if (!codec_) {
    // 初次访问时,codec会被初始化出来。codec是ServerConnection类型。会承担编解码的任务。
    codec_ = config_.createCodec(read_callbacks_->connection(), data, *this);
    ......
  }

  bool redispatch;
  do {
    redispatch = false;
    try {
      // 这边即会进行真实的解码过程。
      codec_->dispatch(data);
    } catch (const CodecProtocolException& e) {
      ......
    }
  } while (redispatch);

  return Network::FilterStatus::StopIteration;
}

void ConnectionImpl::dispatch(Buffer::Instance& data) {
  //......
  if (data.length() > 0) {
    uint64_t num_slices = data.getRawSlices(nullptr, 0);
    Buffer::RawSlice slices[num_slices];
    data.getRawSlices(slices, num_slices);

    // 对buffer数据进行分片处理
    for (Buffer::RawSlice& slice : slices) {
      total_parsed += dispatchSlice(static_cast<const char*>(slice.mem_), slice.len_);
    }
  } else {
    dispatchSlice(nullptr, 0);
  }
  // 将buffer下标前移,代表已经消费完了指定byte长度的数据。
  data.drain(total_parsed);
  //......
}

真实解码的过程即在dispatchSlice中,如下:

代码语言:javascript
复制
size_t ConnectionImpl::dispatchSlice(const char* slice, size_t len) {
  ssize_t rc = http_parser_execute(&parser_, &settings_, slice, len);
  //......
  return rc;
}

此处用到了http_parser库来进行处理。我们看下是如何处理的:

代码语言:javascript
复制
http_parser_settings ConnectionImpl::settings_{
    [](http_parser* parser) -> int {
      static_cast<ConnectionImpl*>(parser->data)->onMessageBeginBase();
      return 0;
    },
    [](http_parser* parser, const char* at, size_t length) -> int {
      static_cast<ConnectionImpl*>(parser->data)->onUrl(at, length);
      return 0;
    },
    nullptr, // on_status
    [](http_parser* parser, const char* at, size_t length) -> int {
      static_cast<ConnectionImpl*>(parser->data)->onHeaderField(at, length);
      return 0;
    },
    [](http_parser* parser, const char* at, size_t length) -> int {
      static_cast<ConnectionImpl*>(parser->data)->onHeaderValue(at, length);
      return 0;
    },
    [](http_parser* parser) -> int {
      return static_cast<ConnectionImpl*>(parser->data)->onHeadersCompleteBase();
    },
    [](http_parser* parser, const char* at, size_t length) -> int {
      static_cast<ConnectionImpl*>(parser->data)->onBody(at, length);
      return 0;
    },
    [](http_parser* parser) -> int {
      static_cast<ConnectionImpl*>(parser->data)->onMessageCompleteBase();
      return 0;
    },
    nullptr, // on_chunk_header
    nullptr  // on_chunk_complete
};

如上所示,很清晰地把http1的数据切分了如上几个解析的过程。以下流程比较冗长,我们只挑选其中三个环节:

5. 请求的编解码器初始化阶段

onMessageBegin环节,设置一个Codec(ServerConnection)的Decoder和Encoder。这边的Encoder即为ServerConnection自己(注意,ServerConnection持有了网络层的ConnectionImpl实例,可以用以进行响应回写,后面会进一步提及),Decoder即为ActiveStream。ActiveStream会持有ServerConnection(有点绕)。

代码语言:javascript
复制
void ServerConnectionImpl::onMessageBegin() {
  if (!resetStreamCalled()) {
    ASSERT(!active_request_);
    active_request_.reset(new ActiveRequest(*this));
    active_request_->request_decoder_ = &callbacks_.newStream(active_request_->response_encoder_);
  }
}

总而言之,我们记住一点,在这个阶段,会完成Downstream请求的Decoder和Encoder的初始化,并且提前塞好各种回调信息。便于在各个环节被使用。

6. 请求头处理阶段

在请求头阶段,一言以蔽之,做的事情即进行各类的初始化。

6.1 解析Http_Method

onHeaderCompleteBase阶段。当url和头信息都读取完了后,会调用到这个阶段。

代码语言:javascript
复制
int ServerConnectionImpl::onHeadersComplete(HeaderMapImplPtr&& headers) {

  if (active_request_) {
    // 解析出http方法,并插入header中
    const char* method_string = http_method_str(static_cast<http_method>(parser_.method));
    handlePath(*headers, parser_.method);
    ASSERT(active_request_->request_url_.empty());
    headers->insertMethod().value(method_string, strlen(method_string));

    if (parser_.flags & F_CHUNKED ||
        (parser_.content_length > 0 && parser_.content_length != ULLONG_MAX) || handling_upgrade_) {

      // 此处,将在上一阶段初始化出来的请求解码器——ActiveStream,拿出,进行头解析。
      active_request_->request_decoder_->decodeHeaders(std::move(headers), false);
      if (connection_.state() != Network::Connection::State::Open) {
        http_parser_pause(&parser_, 1);
      }

    } else {
      deferred_end_stream_headers_ = std::move(headers);
    }
  }

  return 0;
}

6.2 生成&遍历FilterChain、Router匹配

进一步来看,代码很长,我们截取重要的说:

代码语言:javascript
复制
void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) {

  // 产生StreamDecoderFilter Chain。用来进行实际上的各种decode操作
  const bool upgrade_rejected = createFilterChain() == false;

  //......

  // 这边会进行route的匹配。route->match
  refreshCachedRoute();

  // 设定route级别的timeout
  if (cached_route_.value()) {
    const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
    if (route_entry != nullptr && route_entry->idleTimeout()) {
      idle_timeout_ms_ = route_entry->idleTimeout().value();
      if (idle_timeout_ms_.count()) {
        if (idle_timer_ == nullptr) {
          idle_timer_ = connection_manager_.read_callbacks_->connection().dispatcher().createTimer(
              [this]() -> void { onIdleTimeout(); });
        }
      } else if (idle_timer_ != nullptr) {
        idle_timer_->disableTimer();
        idle_timer_ = nullptr;
      }
    }
  }

  // 进行trace埋点
  if (connection_manager_.config_.tracingConfig()) {
    traceRequest();
  }

  // 进行头解析
  decodeHeaders(nullptr, *request_headers_, end_stream);

  // 重置一下Idle定时任务,将请求超时的Stream进行回收。
  resetIdleTimer();
}

void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilter* filter,
                                                        HeaderMap& headers, bool end_stream) {

  //......

  for (; entry != decoder_filters_.end(); entry++) {
    ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
    state_.filter_call_state_ |= FilterCallState::DecodeHeaders;

    // 可以看到,会遍历decoder_filters进行解析
    FilterHeadersStatus status = (*entry)->decodeHeaders(
        headers, end_stream && continue_data_entry == decoder_filters_.end());
    state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
    if (!(*entry)->commonHandleAfterHeadersCallback(status) &&
        std::next(entry) != decoder_filters_.end()) {
      return;
    }
    //......
  }

 //......
}

// ActiveStreamDecodeFilter
FilterHeadersStatus decodeHeaders(HeaderMap& headers, bool end_stream) {
      is_grpc_request_ = Grpc::Common::hasGrpcContentType(headers);
      // 最终调用的是StreamDecoderFilterSharedPtr类型的handler进行实际处理。这些filter通过前面ActiveStream进行DecodeHeaders的最开始的时候注入到了stream里面。
      return handle_->decodeHeaders(headers, end_stream);
}

6.3 负载均衡

这时候,我们以最核心的一个Decoder Filter —— Route:Filter终于登场了。我们来看下他是如何应对Header解析阶段的:

代码语言:javascript
复制
Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool end_stream) {  

  //......

  // 此处是另外一个关键点,即进行负载均衡挑选到一个目标机器的连接池。
  Http::ConnectionPool::Instance* conn_pool = getConnPool();
  if (!conn_pool) {
    sendNoHealthyUpstreamResponse();
    return Http::FilterHeadersStatus::StopIteration;
  }

  // 初始化超时定时器,但还没启动(在请求完成后启动)
  timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_,
                                         grpc_request_);

  // 初始化重试器。
  retry_state_ =
      createRetryState(route_entry_->retryPolicy(), headers, *cluster_, config_.runtime_,
                       config_.random_, callbacks_->dispatcher(), route_entry_->priority());
  do_shadowing_ = FilterUtility::shouldShadow(route_entry_->shadowPolicy(), config_.runtime_,
                                              callbacks_->streamId());

  // UpstreamRequest即进行路由的核心类
  upstream_request_.reset(new UpstreamRequest(*this, *conn_pool));
  upstream_request_->encodeHeaders(end_stream);

  //......

  return Http::FilterHeadersStatus::StopIteration;
}

我们先来看其核心的功能——如何挑选目标机器。

代码语言:javascript
复制
// ClusterManagerImpl
Http::ConnectionPool::Instance*
ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourcePriority priority,
                                           Http::Protocol protocol, LoadBalancerContext* context) {
  ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>();

  // 确认这个cluster是否存在
  auto entry = cluster_manager.thread_local_clusters_.find(cluster);
  if (entry == cluster_manager.thread_local_clusters_.end()) {
    return nullptr;
  }

  // 挑选一个host以及对应的连接池,如果不存在就创建一个
  return entry->second->connPool(priority, protocol, context);
}

// ClusterEntry
Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
    ResourcePriority priority, Http::Protocol protocol, LoadBalancerContext* context) {

  // 通过负载均衡器进行选择
  HostConstSharedPtr host = lb_->chooseHost(context);
  if (!host) {
    ENVOY_LOG(debug, "no healthy host for HTTP connection pool");
    cluster_info_->stats().upstream_cx_none_healthy_.inc();
    return nullptr;
  }

  // protocol和priority组成唯一键,即同一协议(如http1.1),同一优先级共享同一个连接池。
  std::vector<uint8_t> hash_key = {uint8_t(protocol), uint8_t(priority)};

  //......

  // 从全局cache中找到host对应的连接池容器,然后找到对应key的连接池,如果不存在则创建一个。这个逻辑和常规连接池的做法相类似。
  ConnPoolsContainer& container = parent_.host_http_conn_pool_map_[host];
  if (!container.pools_[hash_key]) {
    container.pools_[hash_key] = parent_.parent_.factory_.allocateConnPool(
        parent_.thread_local_dispatcher_, host, priority, protocol,
        have_options ? context->downstreamConnection()->socketOptions() : nullptr);
  }

  return container.pools_[hash_key].get();
}

其中loadbalancer,我们以随机策略为例来进行分析,随机策略继承自ZoneAwareLoadBalancer我们需要关注,zone的信息并不会直接反应到代码层面上,这一切都已经在XDS返回时屏蔽掉了。

代码语言:javascript
复制
HostConstSharedPtr RandomLoadBalancer::chooseHost(LoadBalancerContext*) {
  // 此处进行主机列表的选择
  const HostVector& hosts_to_use = hostSourceToHosts(hostSourceToUse());
  if (hosts_to_use.empty()) {
    return nullptr;
  }

  return hosts_to_use[random_.random() % hosts_to_use.size()];
}
// 此处即进行host的筛选
ZoneAwareLoadBalancerBase::HostsSource ZoneAwareLoadBalancerBase::hostSourceToUse() {

  // 基于优先级先挑出被选中的主机集合,此处不展开说明
  HostSet& host_set = chooseHostSet();
  HostsSource hosts_source;
  hosts_source.priority_ = host_set.priority();

  // 此处有一个全局恐慌阈值的设定。即如果有半数以上的机器不健康的话,为了避免雪崩,则捞取全部主机,这个扛雪崩挺有用。比eureka的自我保护模式更进一步。
  if (isGlobalPanic(host_set)) {
    stats_.lb_healthy_panic_.inc();
    hosts_source.source_type_ = HostsSource::SourceType::AllHosts;
    return hosts_source;
  }

  // 基于zone的本地权重直接进行选择
  const absl::optional<uint32_t> locality = host_set.chooseLocality();
  if (locality.has_value()) {
    hosts_source.source_type_ = HostsSource::SourceType::LocalityHealthyHosts;
    hosts_source.locality_index_ = locality.value();
    return hosts_source;
  }

  // 如果不需要或者无法进行区域相关的路由,如只有一个区域,如发现所有host都不是所在地加权路由机器,等等的,就无需进行额外处理,直接用所有的健康节点。
  if (per_priority_state_[host_set.priority()]->locality_routing_state_ ==
      LocalityRoutingState::NoLocalityRouting) {
    hosts_source.source_type_ = HostsSource::SourceType::HealthyHosts;
    return hosts_source;
  }

  // 如果没有开启区域感知路由,则返回健康节点
  if (!runtime_.snapshot().featureEnabled(RuntimeZoneEnabled, routing_enabled_)) {
    hosts_source.source_type_ = HostsSource::SourceType::HealthyHosts;
    return hosts_source;
  }

  // 如果本地集群处于恐慌状态,即一半以上不健康,则不要采用基于zone的lb,这个是个啥原因?
  if (isGlobalPanic(localHostSet())) {
    stats_.lb_local_cluster_not_ok_.inc();
    hosts_source.source_type_ = HostsSource::SourceType::HealthyHosts;
    return hosts_source;
  }

  // 进行区域感知路由,在在此会有比较复杂的判断,如发现源机器数量少于目标机器数量,这时候就会对溢出部分进行一定概率的跨zone处理,等等。具体见tryChooseLocalLocalityHosts方法
  hosts_source.source_type_ = HostsSource::SourceType::LocalityHealthyHosts;
  hosts_source.locality_index_ = tryChooseLocalLocalityHosts(host_set);
  return hosts_source;
}

以上即已完成了目标主机的第一层筛选,即,已经确定了如下三个元素:

  1. 优先级
  2. 区域
  3. 是否需要健康节点

具体代码见:

代码语言:javascript
复制
const HostVector& ZoneAwareLoadBalancerBase::hostSourceToHosts(HostsSource hosts_source) {
  const HostSet& host_set = *priority_set_.hostSetsPerPriority()[hosts_source.priority_];
  switch (hosts_source.source_type_) {
  case HostsSource::SourceType::AllHosts:
    return host_set.hosts();
  case HostsSource::SourceType::HealthyHosts:
    return host_set.healthyHosts();
  case HostsSource::SourceType::LocalityHealthyHosts:
    return host_set.healthyHostsPerLocality().get()[hosts_source.locality_index_];
  default:
    NOT_REACHED_GCOVR_EXCL_LINE;
  }
}

OK,经过以上几道工序,目标主机列表即已被挑选出来,然后经过随机策略的hosts_to_use[random_.random() % hosts_to_use.size()];即完成了目标主机的选择。然后如上文开始所述,通过主机找到连接池,并返回给getConnPool调用方。至此,负载均衡已经全部完成。

6.4 获取/创建Upstream连接并绑定响应回调

我们回顾下,完成负载均衡之后,即会进行另外一个非常重要的类UpstreamRequest的构造:

代码语言:javascript
复制
  upstream_request_.reset(new UpstreamRequest(*this, *conn_pool));
  upstream_request_->encodeHeaders(end_stream);

为什么这边叫encodeHeaders呢?即downstream的解码,其实就应对了upstream的编码。下来,即传统套路——获取或者创建连接,发送请求。由于Envoy是全异步化的,所以如果是新连接,还需要在连接上注册好回调处理函数。

代码语言:javascript
复制
void Filter::UpstreamRequest::encodeHeaders(bool end_stream) {
  ASSERT(!encode_complete_);
  encode_complete_ = end_stream;
  // 用上面负载均衡出来的连接池去获取或者创建连接
  Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(*this, *this);
  if (handle) {
    conn_pool_stream_handle_ = handle;
  }
}

我们来看下连接池获取连接的主体逻辑

代码语言:javascript
复制
ConnectionPool::Cancellable* ConnPoolImpl::newStream(StreamDecoder& response_decoder,
                                                     ConnectionPool::Callbacks& callbacks) {
  // 如果有可用client,直接用。同时将各种回调重新绑定。
  if (!ready_clients_.empty()) {
    ready_clients_.front()->moveBetweenLists(ready_clients_, busy_clients_);
    attachRequestToClient(*busy_clients_.front(), response_decoder, callbacks);
    return nullptr;
  }

  // 如果请求挂起队列还没堆满
  if (host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
    // 如果链接数已经达到上限,已经不能接着创建连接,则状态记录下来,然后添加到pending队列里等待空闲连接出来供使用。
    bool can_create_connection =
        host_->cluster().resourceManager(priority_).connections().canCreate();
    if (!can_create_connection) {
      host_->cluster().stats().upstream_cx_overflow_.inc();
    }

    // 如果一个连接都没有,或者还没达到连接数上限,那么就开始创建连接
    if ((ready_clients_.size() == 0 && busy_clients_.size() == 0) || can_create_connection) {
      createNewConnection();
    }

    // 将请求挂到pending队列里,等待处理。
    PendingRequestPtr pending_request(new PendingRequest(*this, response_decoder, callbacks));
    pending_request->moveIntoList(std::move(pending_request), pending_requests_);
    return pending_requests_.front().get();
  } else {
    // 如果连队列都满了,那就没救了
    ENVOY_LOG(debug, "max pending requests overflow");
    callbacks.onPoolFailure(ConnectionPool::PoolFailureReason::Overflow, nullptr);
    host_->cluster().stats().upstream_rq_pending_overflow_.inc();
    return nullptr;
  }
}

创建新连接的逻辑如下:

代码语言:javascript
复制
// 创建一个ActiveClient,并将client添加到busy_client中,代表其已经被占用。ActiveClient是什么呢?往下看
void ConnPoolImpl::createNewConnection() {
  ActiveClientPtr client(new ActiveClient(*this));
  client->moveIntoList(std::move(client), busy_clients_);
}

// 可以看到,初始化时候,会创建连接超时控制器并启动,当后续收到连接成功事件后,会进行定时器的取消操作。
ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent)
    : parent_(parent),
      connect_timer_(parent_.dispatcher_.createTimer([this]() -> void { onConnectTimeout(); })),
      remaining_requests_(parent_.host_->cluster().maxRequestsPerConnection()) {

  // 此处会进行Upstream网络层连接ConnectionImpl的创建
  Upstream::Host::CreateConnectionData data =
      parent_.host_->createConnection(parent_.dispatcher_, parent_.socket_options_);
  // 此处会进行Upstream ClientConnection的创建,并监听read buffer区,对响应数据进行绑定CodecReadFilter,Filter会进行响应数据的处理。我们这章暂不展开,到后面分析响应处理的时候再来过一遍
  codec_client_ = parent_.createCodecClient(data);
  // 此处进行连接事件回调的监听。监听网络事件如连接成功事件、关闭事件等,并可在收到高低水位事件时候进行处理。
  codec_client_->addConnectionCallbacks(*this);
  //......
  connect_timer_->enableTimer(parent_.host_->cluster().connectTimeout());
  //......
}

我们这时候会看到,当连接池有可用连接的时候,将会进行线程与连接的绑定——attachRequestToClient,这也是我们进行各种超时、重试处理,以及响应处理的前提。

但是,当没有可用连接要创建新连接的时候,并没有进行attach操作,那我们难道不需要绑定么?答案是否定的,我们看下envoy是如何处理的:

我们从上面的介绍可以看到,我们在产生upstream连接(ClientConnection)的时候,有一个小操作——codec_client_->addConnectionCallbacks(*this);他会将ActiveClient注册监听各类连接事件。当有事件到达,则会唤起onEvent方法,进一步触发以下方法:

代码语言:javascript
复制
void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEvent event) {
  //......
  if (event == Network::ConnectionEvent::Connected) {
    conn_connect_ms_->complete();
    processIdleClient(client, false);
  }
}

void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) {
  client.stream_wrapper_.reset();
  if (pending_requests_.empty() || delay) {
    client.moveBetweenLists(busy_clients_, ready_clients_);
  } else {
    // 此处即进行了绑定
    attachRequestToClient(client, pending_requests_.back()->decoder_,
                          pending_requests_.back()->callbacks_);
    // 从阻塞请求队列里面摘除,代表已经受到处理
    pending_requests_.pop_back();
  }

  if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) {
    upstream_ready_enabled_ = true;
    upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0));
  }

  checkForDrained();
}

大家读源码的时候,可能会感到很奇怪,为什么连接获取的逻辑中,充斥着各种直接操作连接池成员变量如ready_clients,busy_clients等等,而不用担心线程安全问题?为了回答这个问题,我们需要再往回追溯,我们是从哪里拿到的连接池?ThreadlocalClusterManager里面!即每个线程会在cluster初始化的时候也一起绑定一份镜像到自己线程上,使用的时候则可以以线程安全的方式来使用。那我们如何进行各类资源限制呢?Envoy提供了一个Resource类来进行资源限制,这个Resource类会进行各类原子级别(atomic)的增减,来维持全局的资源计数视图的一致性。

好了,至此请求头阶段完成了处理。我们来看发出请求的最后一个环节。

7. 请求体处理

onBody阶段。会解析出请求体数据,并且一路调用到Router:Filter:decodeData,并最终调用在header阶段初始化出来的UpstreamRequest的如下方法:

代码语言:javascript
复制
void Filter::UpstreamRequest::encodeData(Buffer::Instance& data, bool end_stream) {
  ASSERT(!encode_complete_);
  encode_complete_ = end_stream;

  if (!request_encoder_) {
    ENVOY_STREAM_LOG(trace, "buffering {} bytes", *parent_.callbacks_, data.length());
    if (!buffered_request_body_) {
      buffered_request_body_.reset(
          new Buffer::WatermarkBuffer([this]() -> void { this->enableDataFromDownstream(); },
                                      [this]() -> void { this->disableDataFromDownstream(); }));
      buffered_request_body_->setWatermarks(parent_.buffer_limit_);
    }

    buffered_request_body_->move(data);
  } else {
    ENVOY_STREAM_LOG(trace, "proxying {} bytes", *parent_.callbacks_, data.length());
    request_info_.addBytesSent(data.length());
    // 发送数据
    request_encoder_->encodeData(data, end_stream);
    if (end_stream) {
      request_info_.onLastUpstreamTxByteSent();
      parent_.callbacks_->requestInfo().onLastUpstreamTxByteSent();
    }
  }
}

void StreamEncoderImpl::encodeData(Buffer::Instance& data, bool end_stream) {
  if (data.length() > 0) {
    if (chunk_encoding_) {
      connection_.buffer().add(fmt::format("{:x}\r\n", data.length()));
    }
    // 将数据放入发送缓冲区
    connection_.buffer().move(data);

    if (chunk_encoding_) {
      connection_.buffer().add(CRLF);
    }
  }

  // 最后无论是否endstream,都会进行底层网络连接的flush操作
  if (end_stream) {
    endEncode();
  } else {
    connection_.flushOutput();
  }
}

至此,请求发送完成。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018年09月05日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 4. 请求解析
  • 5. 请求的编解码器初始化阶段
  • 6. 请求头处理阶段
    • 6.1 解析Http_Method
      • 6.2 生成&遍历FilterChain、Router匹配
        • 6.3 负载均衡
          • 6.4 获取/创建Upstream连接并绑定响应回调
          • 7. 请求体处理
          相关产品与服务
          负载均衡
          负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档