Envoy 是一款由 Lyft 开源的,使用 C++ 编写的 L7 代理和通信总线,目前是 CNCF 旗下的开源项目且已经毕业,代码托管在 GitHub 上,它也是 Istio 服务网格中默认的数据平面。关于 Envoy 的详情请阅读 Envoy 中文文档。Envoy 本身无法构成一个完整的 Service Mesh,但是它可以作为 service mesh 中的应用间流量的代理,负责 service mesh 中的数据层。
Envoy架构中的一些重要概念:
Envoy正常的工作流程为Host A(下游主机)发送请求至上游主机(Host B、Host C、Host D等),Envoy通过Listener监听到有下游主机的请求,收到请求后的Envoy将所有请求流量劫持至Envoy内部,并将请求内容抽象为Filter Chains路由至某个上游主机中从而实现路由转发及负载均衡能力。
Envoy为了实现流量代理能力通常需要一个统一的配置文件来记录信息以便启动时加载,在Envoy中启动配置文件有静态配置和动态配置两种方式。静态配置是将配置信息写入文件中,启动时直接加载,动态配置通过xDS实现一个Envoy的服务端(可以理解为以API接口对外实现服务发现能力)。
xDS模块的功能是通过Envoy API V1(基于HTTP)或V2(基于gRPC)实现一个服务端将配置信息暴露给上游主机,等待上游主机的拉取。
xDS API 在envoy中被称为 Data plane API
。其代码保存在 https://github.com/envoyproxy/envoy/tree/master/api/envoy/api/v2,用户可以根据proto文件自行生成相对应语言的GRPC代码文件。
Envoy 官方提供了两份 xDS Server 的实现,分别是:
另外,官方还把 api 的定义代码从 Envoy 的源码库中提取出来,放在了 https://github.com/envoyproxy/data-plane-api
要实现一个简单的控制面,一般就是基于xDS api来管理动态配置的能力,如下几步
type callbacks struct {
test.Callbacks
simpleCache cache.SnapshotCache
hash cache.NodeHash
datasource service.Datasource
config *Config
}
func (c *callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryRequest) error {
nodeId := c.hash.ID(request.Node)
logrus.Debugf("node: %s on stream request version_info: %s resource_names: %s type_url: %s response_nonce: %s error: %+v", nodeId, request.VersionInfo, request.ResourceNames, request.TypeUrl, request.ResponseNonce, request.ErrorDetail)
if !containsString(c.simpleCache.GetStatusKeys(), nodeId) {
c.SetSnapshot(nodeId)
}
return c.Callbacks.OnStreamRequest(id, request)
}
根据业务需要,实现相应的回调方法。
server := server.NewServer(ctx, simpleCache, cb)
var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions,
grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: grpcKeepaliveTime,
Timeout: grpcKeepaliveTimeout,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: grpcKeepaliveMinTime,
PermitWithoutStream: true,
}),
)
grpcServer := grpc.NewServer(grpcOptions...)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
logrus.Fatal(err)
}
discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, server)
routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, server)
listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, server)
secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, server)
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, server)
logrus.Infof("management server listening on %d\n", port)
if err = grpcServer.Serve(lis); err != nil {
logrus.Println(err)
}
上面的流程大致就是:
snap, _ := cache.NewSnapshot(time.Now().Format(time.RFC3339),
map[resource.Type][]types.Resource{
resource.ClusterType: clusters,
resource.RouteType: {makeRoute(RouteName, routes)},
resource.ListenerType: {makeHTTPListener(ListenerName, RouteName, listenerPort)},
},
)
根据xDS 数据格式,构造所需的业务数据,也就是最后存储在envoy中的动态配置
if err := snapshot.Consistent(); err != nil {
logrus.Errorf("snapshot inconsistency: %+v\n%+v", snapshot, err)
return
}
if err := c.simpleCache.SetSnapshot(context.Background(), nodeId, snapshot); err != nil {
logrus.Errorf("snapshot error %q for %+v", err, snapshot)
return
}
将业务数据动态注入envoy配置中