前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apisix-Ingress服务发现详解

Apisix-Ingress服务发现详解

作者头像
tunsuy
发布2023-10-23 17:47:10
6070
发布2023-10-23 17:47:10
举报
文章被收录于专栏:有文化的技术人

apisix

Apache APISIX 是一个基于微服务 API 网关,其不仅可以处理南北向的流量,也可以处理东西向的流量即服务之间的流量。Apache APISIX 集成了控制面板和数据面,与其他 API 网关相比,Apache APISIX 的上游、路由、插件全是动态的,修改这些东西时都不用重启。并且 Apache APISIX 的插件也是热加载,可以随时插拔、修改插件。

Apache APISIX 其设计理念是基于API 网关的数据平面和控制平面分离。控制平面不仅仅能够控制 Apache APISIX ,同时其还能够控制其他组件;数据平面不仅仅能够被自身的控制平面控制,还能被其他组件所控制。由于其基于ETCD 来存储和分发路由数据,默认具备高可用,无单点故障风险。除此之外,其能够友好地支持 Prometheus、SkyWalking 动态追踪、流量复制、故障注入等相关功能。

apisix ingress

在 K8s 生态中,Ingress 作为表示 K8s 流量入口的一种资源,想要让其生效,就需要有一个 Ingress Controller 去监听 K8s 中的 Ingress 资源,并对这些资源进行相应规则的解析和实际承载流量。在当下趋势中,像 Kubernetes Ingress Nginx 就是使用最广泛的 Ingress Controller 实现。

而 APISIX Ingress 则是另一种 Ingress Controller 的实现。跟 Kubernetes Ingress Nginx 的区别主要在于 APISIX Ingress 是以 Apache APISIX 作为实际承载业务流量的数据面。

Apache APISIX Ingress Controller 除了覆盖 NGINX Ingress Controller 已有的能力外,还解决了一些 Nginx Ingress Controller 的痛点。具体如下:

  • 1、配置的动态化加载 通常情况下,作为接入层的 Ingress Controller ,其承载着服务的入口流量引入,在生产环境中,我们的业务对系统的可靠性有着更高的要求,然而,基于 Apache APISIX Ingress Controller 其能够支持动态配置,即时生效,降低生产事故的意外及风险,有助于提高运维可维护性。
  • 2、较强的灰度能力 在实际的业务场景中,有的时候,往往会依据某些特定的需求进行权重调整,结合业务需求按比例进行流量控制,Apache APISIX Ingress Controller 可以支持 Service和 Pod 级别的权重调整,配置清晰而且可读性更强。 除此,相对于NGINX Ingress Controller 中通过 Annotation 的方式提供 Canary 灰度方案,Apache APISIX Ingress Controller 能够解决其缺陷,从而能够更好的提供灰度策略。
  • 3、较好的扩展能力 基于 Apache APISIX 强大的插件能力,Apache APISIX Ingress Controller 通过动态绑定插件来增强功能。Apache APISIX 通过插件封装逻辑,易于管理;完善的文档,易于使用和理解。Apache APISIX Ingress Controller 通过配置即可绑定和解绑插件,无需操作脚本。

APISIX Ingress 目前已经支持的自定义资源主要是以下 5 类,涉及到路由、上游、消费者、证书相关和集群公共配置的相关类别。

内置服务发现

APISIX 内置了下面这些服务发现机制:

  • 基于 Eureka 的服务发现
  • 基于 Nacos 的服务发现
  • 基于 Consul 的服务发现
  • 基于 Consul KV 的服务发现
  • 基于 DNS 的服务发现
  • 基于 APISIX-Seed 架构的控制面服务发现
  • 基于 Kubernetes 的服务发现

上面介绍的这些都是基于数据面apisix配置信息手动变更操作的方案集成。其实,在基于k8s的云原生场景下,apisix还提供了一个控制面组件来对apisix的服务发现进行自动管理,那就是apisix ingress

下面就从源码的角度来看看apisix ingress是怎么做到自动的服务发现的

apisix ingress启动

main.go启动入口一路跟踪,进入providers/controller.gorun方法:

代码语言:javascript
复制
func (c *Controller) run(ctx context.Context) {
   log.Infow("controller tries to leading ...",
      zap.String("namespace", c.namespace),
      zap.String("pod", c.name),
   )

   var cancelFunc context.CancelFunc
   ctx, cancelFunc = context.WithCancel(ctx)
   defer cancelFunc()

   // give up leader
   defer c.leaderContextCancelFunc()

   clusterOpts := &apisix.ClusterOptions{
      AdminAPIVersion: c.cfg.APISIX.AdminAPIVersion,
      Name: c.cfg.APISIX.DefaultClusterName,
      AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
      BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
      MetricsCollector: c.MetricsCollector,
   }
   err := c.apisix.AddCluster(ctx, clusterOpts)
   if err != nil && err != apisix.ErrDuplicatedCluster {
      // TODO give up the leader role
      log.Errorf("failed to add default cluster: %s", err)
      return
   }

   if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
      // TODO give up the leader role
      log.Errorf("failed to wait the default cluster to be ready: %s", err)

      // re-create apisix cluster, used in next c.run
      if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {
         log.Errorf("failed to update default cluster: %s", err)
         return
      }
      return
   }

   // Creation Phase

   c.informers = c.initSharedInformers()
   common := &providertypes.Common{
      ControllerNamespace: c.namespace,
      ListerInformer: c.informers,
      Config: c.cfg,
      APISIX: c.apisix,
      KubeClient: c.kubeClient,
      MetricsCollector: c.MetricsCollector,
      Recorder: c.recorder,
   }

   c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg)
   if err != nil {
      ctx.Done()
      return
   }

   c.podProvider, err = pod.NewProvider(common, c.namespaceProvider)
   if err != nil {
      ctx.Done()
      return
   }

   c.translator = translation.NewTranslator(&translation.TranslatorOptions{
      APIVersion: c.cfg.Kubernetes.APIVersion,
      EndpointLister: c.informers.EpLister,
      ServiceLister: c.informers.SvcLister,
      SecretLister: c.informers.SecretLister,
      PodLister: c.informers.PodLister,
      ApisixUpstreamLister: c.informers.ApisixUpstreamLister,
      PodProvider: c.podProvider,
   })

   c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator)
   if err != nil {
      ctx.Done()
      return
   }

   c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator)
   if err != nil {
      ctx.Done()
      return
   }

   c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider)
   if err != nil {
      ctx.Done()
      return
   }

   if c.cfg.Kubernetes.EnableGatewayAPI {
      c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{
         Cfg: c.cfg,
         APISIX: c.apisix,
         APISIXClusterName: c.cfg.APISIX.DefaultClusterName,
         KubeTranslator: c.translator,
         RestConfig: nil,
         KubeClient: c.kubeClient.Client,
         MetricsCollector: c.MetricsCollector,
         NamespaceProvider: c.namespaceProvider,
      })
      if err != nil {
         ctx.Done()
         return
      }
   }

   // Init Phase

   if err = c.namespaceProvider.Init(ctx); err != nil {
      ctx.Done()
      return
   }
   if err = c.apisixProvider.Init(ctx); err != nil {
      ctx.Done()
      return
   }

   // Run Phase

   e := utils.ParallelExecutor{}

   e.Add(func() {
      c.checkClusterHealth(ctx, cancelFunc)
   })

   e.Add(func() {
      c.informers.Run(ctx)
   })

   e.Add(func() {
      c.namespaceProvider.Run(ctx)
   })

   e.Add(func() {
      c.kubeProvider.Run(ctx)
   })

   e.Add(func() {
      c.apisixProvider.Run(ctx)
   })

   e.Add(func() {
      c.ingressProvider.Run(ctx)
   })

   if c.cfg.Kubernetes.EnableGatewayAPI {
      e.Add(func() {
         c.gatewayProvider.Run(ctx)
      })
   }

   e.Add(func() {
      c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration)
   })
   c.MetricsCollector.ResetLeader(true)

   log.Infow("controller now is running as leader",
      zap.String("namespace", c.namespace),
      zap.String("pod", c.name),
   )

   <-ctx.Done()
   e.Wait()

   for _, execErr := range e.Errors() {
      log.Error(execErr.Error())
   }
   if len(e.Errors()) > 0 {
      log.Error("Start failed, abort...")
      cancelFunc()
   }
}

上面代码逻辑大致如下:

  • 初始化apisix集群配置信息:用于跟数据面服务apisix通信进行相关的配置操作
  • 初始化k8s资源inforrmers信息:用户对k8s各个资源进行监听及获取资源信息
  • 监听k8s集群namespace资源并处理
  • 监听k8s集群pod资源并处理
  • 监听k8s集群ingress资源并处理
  • 监听k8s集群中apisix自定义资源并处理,比如:apisixRoute等
  • 监听k8s集群endpoint资源并处理
  • 监听k8s集群secret资源并处理
  • 监听k8s集群configmap资源并处理
  • 监听k8s集群gateway资源并处理

下面以处理endpoint资源为例进行说明,其他资源的监听处理类似,就不一一讲解了。

服务发现

进入到k8s/endpoint/provider.go中,我们先来看看实例初始化方法:

代码语言:javascript
复制
func NewProvider(common *providertypes.Common, translator translation.Translator, namespaceProvider namespace.WatchingNamespaceProvider) (Provider, error) {
   p := &endpointProvider{
      cfg: common.Config,
   }

   base := &baseEndpointController{
      Common: common,
      translator: translator,

      svcLister: common.SvcLister,
      apisixUpstreamLister: common.ApisixUpstreamLister,
   }

   if common.Kubernetes.WatchEndpointSlices {
      p.endpointSliceController = newEndpointSliceController(base, namespaceProvider)
   } else {
      p.endpointsController = newEndpointsController(base, namespaceProvider)
   }

   return p, nil
}
func newEndpointsController(base *baseEndpointController, namespaceProvider namespace.WatchingNamespaceProvider) *endpointsController {
   ctl := &endpointsController{
      baseEndpointController: base,

      workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "endpoints"),
      workers: 1,

      namespaceProvider: namespaceProvider,

      epLister: base.EpLister,
      epInformer: base.EpInformer,
   }

   ctl.epInformer.AddEventHandler(
      cache.ResourceEventHandlerFuncs{
         AddFunc: ctl.onAdd,
         UpdateFunc: ctl.onUpdate,
         DeleteFunc: ctl.onDelete,
      },
   )

   return ctl
}

注意到最后代码的AddEventHandler,这里就是我们经常见到的informer的处理回调方法设置的地方。

我们看到针对endpoint资源的增删改,设置了对应的回调方法。这里来看看onAdd方法:

代码语言:javascript
复制
func (c *endpointsController) onAdd(obj interface{}) {
   key, err := cache.MetaNamespaceKeyFunc(obj)
   if err != nil {
      log.Errorf("found endpoints object with bad namespace/name: %s, ignore it", err)
      return
   }
   if !c.namespaceProvider.IsWatchingNamespace(key) {
      return
   }
   log.Debugw("endpoints add event arrived",
      zap.String("object-key", key))

   c.workqueue.Add(&types.Event{
      Type: types.EventAdd,
      // TODO pass key.
      Object: kube.NewEndpoint(obj.(*corev1.Endpoints)),
   })

   c.MetricsCollector.IncrEvents("endpoints", "add")
}

该方法的参数表示增加的endpoint资源对象信息。该方法主要是向endpoint的队列workqueue中增加一个事件对象:包含事件类型、增加的endpoint对象

在最开始main我们介绍provider的启动方法中提到:执行了每个provider的run方法,下面我们来看下endpoint的provider的run方法:

代码语言:javascript
复制
func (c *endpointsController) run(ctx context.Context) {
   log.Info("endpoints controller started")
   defer log.Info("endpoints controller exited")
   defer c.workqueue.ShutDown()

   if ok := cache.WaitForCacheSync(ctx.Done(), c.epInformer.HasSynced); !ok {
      log.Error("informers sync failed")
      return
   }

   handler := func() {
      for {
         obj, shutdown := c.workqueue.Get()
         if shutdown {
            return
         }

         err := c.sync(ctx, obj.(*types.Event))
         c.workqueue.Done(obj)
         c.handleSyncErr(obj, err)
      }
   }

   for i := 0; i < c.workers; i++ {
      go handler()
   }

   <-ctx.Done()
}
func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
   ep := ev.Object.(kube.Endpoint)
   ns, err := ep.Namespace()
   if err != nil {
      return err
   }
   newestEp, err := c.epLister.GetEndpoint(ns, ep.ServiceName())
   if err != nil {
      if errors.IsNotFound(err) {
         return c.syncEmptyEndpoint(ctx, ep)
      }
      return err
   }
   return c.syncEndpoint(ctx, newestEp)
}
func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
   log.Debugw("endpoint controller syncing endpoint",
      zap.Any("endpoint", ep),
   )

   namespace, err := ep.Namespace()
   if err != nil {
      return err
   }
   svcName := ep.ServiceName()
   svc, err := c.svcLister.Services(namespace).Get(svcName)
   if err != nil {
      if k8serrors.IsNotFound(err) {
         return c.syncEmptyEndpoint(ctx, ep)
      }
      log.Errorf("failed to get service %s/%s: %s", namespace, svcName, err)
      return err
   }

   switch c.Kubernetes.APIVersion {
   case config.ApisixV2beta3:
      var subsets []configv2beta3.ApisixUpstreamSubset
      subsets = append(subsets, configv2beta3.ApisixUpstreamSubset{})
      auKube, err := c.apisixUpstreamLister.V2beta3(namespace, svcName)
      if err != nil {
         if !k8serrors.IsNotFound(err) {
            log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)
            return err
         }
      } else if auKube.V2beta3().Spec != nil && len(auKube.V2beta3().Spec.Subsets) > 0 {
         subsets = append(subsets, auKube.V2beta3().Spec.Subsets...)
      }
      clusters := c.APISIX.ListClusters()
      for _, port := range svc.Spec.Ports {
         for _, subset := range subsets {
            nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)
            if err != nil {
               log.Errorw("failed to translate upstream nodes",
                  zap.Error(err),
                  zap.Any("endpoints", ep),
                  zap.Int32("port", port.Port),
               )
            }
            name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)
            for _, cluster := range clusters {
               if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
                  return err
               }
            }
         }
      }
   case config.ApisixV2:
      var subsets []configv2.ApisixUpstreamSubset
      subsets = append(subsets, configv2.ApisixUpstreamSubset{})
      auKube, err := c.apisixUpstreamLister.V2(namespace, svcName)
      if err != nil {
         if !k8serrors.IsNotFound(err) {
            log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)
            return err
         }
      } else if auKube.V2().Spec != nil && len(auKube.V2().Spec.Subsets) > 0 {
         subsets = append(subsets, auKube.V2().Spec.Subsets...)
      }
      clusters := c.APISIX.ListClusters()
      for _, port := range svc.Spec.Ports {
         for _, subset := range subsets {
            nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)
            if err != nil {
               log.Errorw("failed to translate upstream nodes",
                  zap.Error(err),
                  zap.Any("endpoints", ep),
                  zap.Int32("port", port.Port),
               )
            }
            name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)
            for _, cluster := range clusters {
               if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
                  return err
               }
            }
         }
      }
   default:
      panic(fmt.Errorf("unsupported ApisixUpstream version %v", c.Kubernetes.APIVersion))
   }
   return nil
}

上面代码主要逻辑就是:

  • 从endpoint的队列workqueue中获取事件对象
  • 根据endpoint信息从k8s集群中获取最新的namespace和service等信息
  • 根据namespace和servicename从k8s集群中获取apisix upstream资源信息
  • 对每一个service端口,向数据面服务apisix发送配置更新请求

https://xiaorui.cc/archives/7369

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-10-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有文化的技术人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • apisix
  • apisix ingress
  • 内置服务发现
  • apisix ingress启动
  • 服务发现
相关产品与服务
API 网关
腾讯云 API 网关(API Gateway)是腾讯云推出的一种 API 托管服务,能提供 API 的完整生命周期管理,包括创建、维护、发布、运行、下线等。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档