
Operator Example: Automating Deployment with an Operator + CRD

Author: Yuyy · Published 2024-01-22 · From the yuyy.info tech column

Preface

The previous post, Exposing In-Cluster Services Automatically with an Operator, left a problem open: developers (or upstream business systems) still had to understand Kubernetes built-in resources, e.g. how to define a Deployment, which runs counter to the goal of automating Kubernetes.

This post uses a CRD (CustomResourceDefinition) to hide the underlying Kubernetes resources: developers only need to define a CR following the rules we lay down, and the Operator takes care of creating the Deployment, Service, Ingress, and so on, automating the deployment.

This automation can then be wired into business systems to deliver business value, for example: provision the product services a tenant has purchased based on their license, and automatically delete the corresponding resources when the license expires.

Main steps

  1. Start from the official Kubernetes sample-controller and modify it
  2. Define the CRD's structs
  3. Generate the CR's client, informers, listers, and deep-copy functions with code-generator
  4. Register the CR with Kubernetes
  5. Register CR event handlers with the informer
  6. In the CR event handlers, create the product service's Deployment, Service, and Ingress
  7. Generate the CRD manifest with controller-gen and deploy it
  8. Run the Operator
  9. Create a CR

Implementation

Complete code: https://github.com/EchoGroot/operator-demo

Start from the official Kubernetes sample-controller and modify it

git clone https://github.com/kubernetes/sample-controller.git

Change the module name in go.mod.

Change the project name and the output-base path in hack/update-codegen.sh.

Run go mod vendor to download dependencies.
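
The rest of this post assumes the module was renamed to app-controller (this is the import path the code below uses), so go.mod begins with:

module app-controller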

Directory structure

.
├── README.md
├── artifacts
│   └── crd
├── controller.go
├── go.mod
├── go.sum
├── hack
│   ├── boilerplate.go.txt
│   ├── custom-boilerplate.go.txt
│   ├── tools.go
│   ├── update-codegen.sh # script that generates code
│   └── verify-codegen.sh
├── main.go
├── pkg
│   ├── apis # CR struct definitions
│   ├── generated # client, informers, listers generated from the CR
│   └── signals
└── vendor

Define the CRD's structs

File: pkg/apis/appcontroller/v1alpha1/types.go

package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// App is a specification for an App resource
type App struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   AppSpec   `json:"spec"`
    Status AppStatus `json:"status"`
}

// AppSpec is the spec for an App resource
type AppSpec struct {
    Deployment DeploymentSpec `json:"deployment"`
    Service    ServiceSpec    `json:"service"`
    Ingress    IngressSpec    `json:"ingress"`
}

type DeploymentSpec struct {
    Name     string `json:"name"`
    Image    string `json:"image"`
    Replicas int32  `json:"replicas"`
}

type ServiceSpec struct {
    Name string `json:"name"`
}

type IngressSpec struct {
    Name string `json:"name"`
}

// AppStatus is the status for an App resource
type AppStatus struct {
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// AppList is a list of App resources
type AppList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata"`

    Items []App `json:"items"`
}
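
Alongside types.go, the package's doc.go keeps the package-level codegen markers from sample-controller, with the group renamed; without the +groupName marker, code-generator would not know which API group the types belong to:

// +k8s:deepcopy-gen=package
// +groupName=appcontroller.yuyy.com

// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1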

Generate the CR's client, informers, listers, and deep-copy functions with code-generator

Run ./hack/update-codegen.sh and the code under pkg/generated will be regenerated.

Register the CR with Kubernetes

The generated code registers the CR's types with the scheme automatically via init functions.
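
For reference, the generated pkg/apis/appcontroller/v1alpha1/register.go looks roughly like the trimmed sketch below (the real file is produced by code-generator). The generated clientset's scheme package calls AddToScheme from an init function, so importing the clientset is enough to get the CR types registered:

package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion is the group/version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: "appcontroller.yuyy.com", Version: "v1alpha1"}

var (
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    AddToScheme   = SchemeBuilder.AddToScheme
)

// addKnownTypes registers App and AppList under the group/version above.
func addKnownTypes(scheme *runtime.Scheme) error {
    scheme.AddKnownTypes(SchemeGroupVersion,
        &App{},
        &AppList{},
    )
    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
    return nil
}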

Register CR event handlers with the informer

main.go initializes the clientset for operating on built-in Kubernetes resources, along with the informers that query resources and watch CR events.

package main

import (
    "flag"
    "time"

    "app-controller/pkg/signals"
    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"
    // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
    // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

    clientset "app-controller/pkg/generated/clientset/versioned"
    informers "app-controller/pkg/generated/informers/externalversions"
)

var (
    masterURL  string
    kubeconfig string
)

func main() {
    klog.InitFlags(nil)
    flag.Parse()

    // set up signals so we handle the shutdown signal gracefully
    ctx := signals.SetupSignalHandler()
    logger := klog.FromContext(ctx)

    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    if err != nil {
        logger.Error(err, "Error building kubeconfig")
        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    }

    // clientset for operating on built-in k8s resources
    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        logger.Error(err, "Error building kubernetes clientset")
        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    }

    // clientset for operating on the custom resource
    appClient, err := clientset.NewForConfig(cfg)
    if err != nil {
        logger.Error(err, "Error building app clientset")
        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    }
    }

    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    appInformerFactory := informers.NewSharedInformerFactory(appClient, time.Second*30)

    controller := NewController(ctx, kubeClient, appClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        kubeInformerFactory.Core().V1().Services(),
        kubeInformerFactory.Networking().V1().Ingresses(),
        appInformerFactory.Appcontroller().V1alpha1().Apps())

    // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(ctx.Done()))
    // Start method is non-blocking and runs all registered informers in a dedicated goroutine.
    kubeInformerFactory.Start(ctx.Done())
    appInformerFactory.Start(ctx.Done())

    if err = controller.Run(ctx, 2); err != nil {
        logger.Error(err, "Error running controller")
        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    }
}

func init() {
    flag.StringVar(&kubeconfig, "kubeconfig", "/Users/xxx/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
    flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
  • Register handlers for CR create and update events:
appInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.enqueueApp,
    UpdateFunc: func(old, new interface{}) {
        controller.enqueueApp(new)
    },
})
  • CR deletion needs no handler: the product service's Deployment, Service, and Ingress get OwnerReferences pointing at the CR, so when the CR is deleted, Kubernetes garbage collection deletes them as well (see the metadata sketch below).
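
As an illustration (not code from the repo), the Deployment created by the controller ends up with metadata along these lines; the names match the example CR later in the post, and the uid is assigned by the API server. metav1.NewControllerRef sets both controller and blockOwnerDeletion to true:

metadata:
  name: app-deployment-demo
  namespace: default
  ownerReferences:
  - apiVersion: appcontroller.yuyy.com/v1alpha1
    kind: App
    name: app-demo
    uid: <assigned by the API server>
    controller: true
    blockOwnerDeletion: true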

Create the product service's Deployment, Service, and Ingress in the CR event handler

deployment, err := c.deploymentsLister.Deployments(namespace).Get(app.Spec.Deployment.Name)
if errors.IsNotFound(err) {
    deployment, err = c.kubeclientset.AppsV1().Deployments(app.Namespace).Create(ctx, newDeployment(app), metav1.CreateOptions{})
}
if err != nil {
    return err
}
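
This is the standard get-or-create pattern: the lister reads from the local informer cache, and only when the Deployment is missing does the controller call the API server to create it, which keeps syncHandler idempotent and safe to requeue. The Service and Ingress are handled the same way in the full controller.go below.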
func newDeployment(app *appv1alpha1.App) *appsv1.Deployment {
    labels := map[string]string{
        "app":        "app-deployment",
        "controller": app.Name,
    }
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      app.Spec.Deployment.Name,
            Namespace: app.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(app, appv1alpha1.SchemeGroupVersion.WithKind("App")),
            },
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &app.Spec.Deployment.Replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  app.Spec.Deployment.Name,
                            Image: app.Spec.Deployment.Image,
                        },
                    },
                },
            },
        },
    }
}
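
Note that Selector.MatchLabels must match the pod template's labels, or the API server will reject the Deployment; reusing the same labels map for both guarantees this.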

Generate the CRD manifest with controller-gen and deploy it

  1. Run controller-gen crd paths=./... output:crd:dir=artifacts/crd to generate artifacts/crd/appcontroller.yuyy.com_apps.yaml:
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.14.0
  name: apps.appcontroller.yuyy.com
spec:
  group: appcontroller.yuyy.com
  names:
    kind: App
    listKind: AppList
    plural: apps
    singular: app
  scope: Namespaced
  versions:
  - name: v1alpha1
    schema:
      openAPIV3Schema:
        description: App is a specification for an App resource
        properties:
          apiVersion:
            description: |-
              APIVersion defines the versioned schema of this representation of an object.
              Servers should convert recognized schemas to the latest internal value, and
              may reject unrecognized values.
              More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
            type: string
          kind:
            description: |-
              Kind is a string value representing the REST resource this object represents.
              Servers may infer this from the endpoint the client submits requests to.
              Cannot be updated.
              In CamelCase.
              More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
            type: string
          metadata:
            type: object
          spec:
            description: AppSpec is the spec for an App resource
            properties:
              deployment:
                properties:
                  image:
                    type: string
                  name:
                    type: string
                  replicas:
                    format: int32
                    type: integer
                required:
                - image
                - name
                - replicas
                type: object
              ingress:
                properties:
                  name:
                    type: string
                required:
                - name
                type: object
              service:
                properties:
                  name:
                    type: string
                required:
                - name
                type: object
            required:
            - deployment
            - ingress
            - service
            type: object
          status:
            description: AppStatus is the status for an App resource
            type: object
        required:
        - spec
        - status
        type: object
    served: true
    storage: true
  2. Deploy it with kubectl apply -f artifacts/crd/appcontroller.yuyy.com_apps.yaml

Run the Operator
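
With the CRD installed, the Operator can run out-of-cluster against the kubeconfig configured in main.go. A minimal way to start it (the binary name is arbitrary):

go build -o app-controller .
./app-controller -kubeconfig=$HOME/.kube/config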

Create a CR

File: artifacts/crd/example.app.yaml

apiVersion: appcontroller.yuyy.com/v1alpha1
kind: App
metadata:
  name: app-demo
spec:
  deployment:
    image: "nginx"
    name: app-deployment-demo
    replicas: 2
  service:
    name: app-service-demo
  ingress:
    name: app-ingress-demo
status: {}

Apply it with kubectl apply -f artifacts/crd/example.app.yaml

The Operator starts working, and the Deployment, Service, and Ingress are created successfully.
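
To confirm, something like the following should list the created objects (names come from the example CR above):

kubectl get apps.appcontroller.yuyy.com
kubectl get deployment,service,ingress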

The complete controller.go

package main

import (
    "context"
    "fmt"
    "k8s.io/api/networking/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
    coreinformers "k8s.io/client-go/informers/core/v1"
    networkinformers "k8s.io/client-go/informers/networking/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
    networklisters "k8s.io/client-go/listers/networking/v1"
    "time"

    "golang.org/x/time/rate"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    appsinformers "k8s.io/client-go/informers/apps/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    appslisters "k8s.io/client-go/listers/apps/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"

    appv1alpha1 "app-controller/pkg/apis/appcontroller/v1alpha1"
    clientset "app-controller/pkg/generated/clientset/versioned"
    appscheme "app-controller/pkg/generated/clientset/versioned/scheme"
    informers "app-controller/pkg/generated/informers/externalversions/appcontroller/v1alpha1"
    listers "app-controller/pkg/generated/listers/appcontroller/v1alpha1"
)

const controllerAgentName = "app-controller"

const (
    // SuccessSynced is used as part of the Event 'reason' when an App is synced
    SuccessSynced = "Synced"
    // ErrResourceExists is used as part of the Event 'reason' when an App fails
    // to sync due to a Deployment of the same name already existing.
    ErrResourceExists = "ErrResourceExists"

    // MessageResourceExists is the message used for Events when a resource
    // fails to sync due to a Deployment already existing
    MessageResourceExists = "Resource %q already exists and is not managed by App"
    // MessageResourceSynced is the message used for an Event fired when an App
    // is synced successfully
    MessageResourceSynced = "App synced successfully"
)

// Controller is the controller implementation for App resources
type Controller struct {
    // kubeclientset is a standard kubernetes clientset
    kubeclientset kubernetes.Interface
    // appclientset is a clientset for our own API group
    appclientset clientset.Interface

    deploymentsLister appslisters.DeploymentLister
    deploymentsSynced cache.InformerSynced
    servicesLister    corelisters.ServiceLister
    servicesSynced    cache.InformerSynced
    ingressesLister   networklisters.IngressLister
    ingressesSynced   cache.InformerSynced
    appsLister        listers.AppLister
    appsSynced        cache.InformerSynced

    // workqueue is a rate limited work queue. This is used to queue work to be
    // processed instead of performing it as soon as a change happens. This
    // means we can ensure we only process a fixed amount of resources at a
    // time, and makes it easy to ensure we are never processing the same item
    // simultaneously in two different workers.
    workqueue workqueue.RateLimitingInterface
    // recorder is an event recorder for recording Event resources to the
    // Kubernetes API.
    recorder record.EventRecorder
}

// NewController returns a new app controller
func NewController(ctx context.Context, kubeclientset kubernetes.Interface, appclientset clientset.Interface, deploymentInformer appsinformers.DeploymentInformer, serviceInformer coreinformers.ServiceInformer, ingressInformer networkinformers.IngressInformer, appInformer informers.AppInformer) *Controller {
    logger := klog.FromContext(ctx)

    // Create event broadcaster
    // Add app-controller types to the default Kubernetes Scheme so Events can be
    // logged for app-controller types.
    utilruntime.Must(appscheme.AddToScheme(scheme.Scheme))
    logger.V(4).Info("Creating event broadcaster")

    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
    ratelimiter := workqueue.NewMaxOfRateLimiter(
        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
    )

    controller := &Controller{
        kubeclientset:     kubeclientset,
        appclientset:      appclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        servicesLister:    serviceInformer.Lister(),
        servicesSynced:    serviceInformer.Informer().HasSynced,
        ingressesLister:   ingressInformer.Lister(),
        ingressesSynced:   ingressInformer.Informer().HasSynced,
        appsLister:        appInformer.Lister(),
        appsSynced:        appInformer.Informer().HasSynced,
        workqueue:         workqueue.NewRateLimitingQueue(ratelimiter),
        recorder:          recorder,
    }

    logger.Info("Setting up event handlers")
    // Set up an event handler for when App resources change
    appInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueApp,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueApp(new)
        },
    })

    return controller
}

// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until the
// context is cancelled, at which point it will shutdown the workqueue and
// wait for workers to finish processing their current work items.
func (c *Controller) Run(ctx context.Context, workers int) error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()
    logger := klog.FromContext(ctx)

    // Start the informer factories to begin populating the informer caches
    logger.Info("Starting App controller")

    // Wait for the caches to be synced before starting workers
    logger.Info("Waiting for informer caches to sync")

    if ok := cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.servicesSynced, c.ingressesSynced, c.appsSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    logger.Info("Starting workers", "count", workers)
    // Launch workers to process App resources
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }

    logger.Info("Started workers")
    <-ctx.Done()
    logger.Info("Shutting down workers")

    return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextWorkItem(ctx) {
    }
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    obj, shutdown := c.workqueue.Get()
    logger := klog.FromContext(ctx)

    if shutdown {
        return false
    }

    // We wrap this block in a func so we can defer c.workqueue.Done.
    err := func(obj interface{}) error {
        // We call Done here so the workqueue knows we have finished
        // processing this item. We also must remember to call Forget if we
        // do not want this work item being re-queued. For example, we do
        // not call Forget if a transient error occurs, instead the item is
        // put back on the workqueue and attempted again after a back-off
        // period.
        defer c.workqueue.Done(obj)
        var key string
        var ok bool
        // We expect strings to come off the workqueue. These are of the
        // form namespace/name. We do this as the delayed nature of the
        // workqueue means the items in the informer cache may actually be
        // more up to date than when the item was initially put onto the
        // workqueue.
        if key, ok = obj.(string); !ok {
            // As the item in the workqueue is actually invalid, we call
            // Forget here else we'd go into a loop of attempting to
            // process a work item that is invalid.
            c.workqueue.Forget(obj)
            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }
        // Run the syncHandler, passing it the namespace/name string of the
        // App resource to be synced.
        if err := c.syncHandler(ctx, key); err != nil {
            // Put the item back on the workqueue to handle any transient errors.
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
        }
        // Finally, if no error occurs we Forget this item so it does not
        // get queued again until another change happens.
        c.workqueue.Forget(obj)
        logger.Info("Successfully synced", "resourceName", key)
        return nil
    }(obj)

    if err != nil {
        utilruntime.HandleError(err)
        return true
    }

    return true
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the App resource
// with the current status of the resource.
func (c *Controller) syncHandler(ctx context.Context, key string) error {
    // Convert the namespace/name string into a distinct namespace and name

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    // Get the App resource with this namespace/name
    app, err := c.appsLister.Apps(namespace).Get(name)
    if err != nil {
        // The App resource may no longer exist, in which case we stop
        // processing.
        if errors.IsNotFound(err) {
            utilruntime.HandleError(fmt.Errorf("app '%s' in work queue no longer exists", key))
            return nil
        }

        return err
    }

    deployment, err := c.deploymentsLister.Deployments(namespace).Get(app.Spec.Deployment.Name)
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(app.Namespace).Create(ctx, newDeployment(app), metav1.CreateOptions{})
    }
    if err != nil {
        return err
    }

    // service
    service, err := c.servicesLister.Services(namespace).Get(app.Spec.Service.Name)
    if errors.IsNotFound(err) {
        service, err = c.kubeclientset.CoreV1().Services(namespace).Create(ctx, newService(app), metav1.CreateOptions{})
    }
    if err != nil {
        return err
    }

    // ingress
    ingress, err := c.ingressesLister.Ingresses(namespace).Get(app.Spec.Ingress.Name)
    if errors.IsNotFound(err) {
        ingress, err = c.kubeclientset.NetworkingV1().Ingresses(namespace).Create(ctx, newIngress(app), metav1.CreateOptions{})
    }

    // If an error occurs during Get/Create, we'll requeue the item so we can
    // attempt processing again later. This could have been caused by a
    // temporary network failure, or any other transient reason.
    if err != nil {
        return err
    }

    // If the Deployment is not controlled by this App resource, we should log
    // a warning to the event recorder and return error msg.
    if !metav1.IsControlledBy(deployment, app) {
        msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
        c.recorder.Event(app, corev1.EventTypeWarning, ErrResourceExists, msg)
        return fmt.Errorf("%s", msg)
    }
    if !metav1.IsControlledBy(service, app) {
        msg := fmt.Sprintf(MessageResourceExists, service.Name)
        c.recorder.Event(app, corev1.EventTypeWarning, ErrResourceExists, msg)
        return fmt.Errorf("%s", msg)
    }
    if !metav1.IsControlledBy(ingress, app) {
        msg := fmt.Sprintf(MessageResourceExists, ingress.Name)
        c.recorder.Event(app, corev1.EventTypeWarning, ErrResourceExists, msg)
        return fmt.Errorf("%s", msg)
    }

    c.recorder.Event(app, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    return nil
}

// enqueueApp takes an App resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than App.
func (c *Controller) enqueueApp(obj interface{}) {
    var key string
    var err error
    if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}

// handleObject will take any resource implementing metav1.Object and attempt
// to find the App resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that App resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
    var object metav1.Object
    var ok bool
    logger := klog.FromContext(context.Background())
    if object, ok = obj.(metav1.Object); !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
            return
        }
        object, ok = tombstone.Obj.(metav1.Object)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
            return
        }
        logger.V(4).Info("Recovered deleted object", "resourceName", object.GetName())
    }
    logger.V(4).Info("Processing object", "object", klog.KObj(object))
    if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
        // If this object is not owned by a App, we should not do anything more
        // with it.
        if ownerRef.Kind != "App" {
            return
        }

        app, err := c.appsLister.Apps(object.GetNamespace()).Get(ownerRef.Name)
        if err != nil {
            logger.V(4).Info("Ignore orphaned object", "object", klog.KObj(object), "app", ownerRef.Name)
            return
        }

        c.enqueueApp(app)
        return
    }
}

// newDeployment creates a new Deployment for a App resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the App resource that 'owns' it.
func newDeployment(app *appv1alpha1.App) *appsv1.Deployment {
    labels := map[string]string{
        "app":        "app-deployment",
        "controller": app.Name,
    }
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      app.Spec.Deployment.Name,
            Namespace: app.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(app, appv1alpha1.SchemeGroupVersion.WithKind("App")),
            },
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &app.Spec.Deployment.Replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  app.Spec.Deployment.Name,
                            Image: app.Spec.Deployment.Image,
                        },
                    },
                },
            },
        },
    }
}

func newIngress(app *appv1alpha1.App) *v1.Ingress {
    prefix := v1.PathTypePrefix
    return &v1.Ingress{
        ObjectMeta: metav1.ObjectMeta{
            Name:      app.Spec.Ingress.Name,
            Namespace: app.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(app, appv1alpha1.SchemeGroupVersion.WithKind("App")),
            },
        },
        Spec: v1.IngressSpec{
            Rules: []v1.IngressRule{
                {
                    IngressRuleValue: v1.IngressRuleValue{
                        HTTP: &v1.HTTPIngressRuleValue{
                            Paths: []v1.HTTPIngressPath{
                                {
                                    PathType: &prefix,
                                    Path:     "/",
                                    Backend: v1.IngressBackend{
                                        Service: &v1.IngressServiceBackend{
                                            Name: app.Spec.Service.Name,
                                            Port: v1.ServiceBackendPort{Number: 80},
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    }

}

func newService(app *appv1alpha1.App) *corev1.Service {
    labels := map[string]string{
        "app":        "app-deployment",
        "controller": app.Name,
    }
    return &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      app.Spec.Service.Name,
            Namespace: app.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(app, appv1alpha1.SchemeGroupVersion.WithKind("App")),
            },
        },
        Spec: corev1.ServiceSpec{
            Selector: labels,
            Ports: []corev1.ServicePort{
                {
                    Protocol: corev1.ProtocolTCP,
                    Port:     80,
                    TargetPort: intstr.IntOrString{
                        IntVal: 80,
                    },
                },
            },
        },
    }
}
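
One note on the code above: handleObject is kept from sample-controller but is never registered with any informer here. Wiring it up as the event handler for the Deployment, Service, and Ingress informers would make the controller re-reconcile an App whenever one of the objects it owns is modified or deleted out from under it.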
