前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Client Go四种交互模式之 DynamicClient实战案例详解

Client Go四种交互模式之 DynamicClient实战案例详解

作者头像
用户1413827
发布2024-03-09 09:12:51
2950
发布2024-03-09 09:12:51
举报
文章被收录于专栏:站长运维站长运维

Client Go四种交互模式之 DynamicClient实战案例详解

引子

Kubernetes赢得了云原生平台之争,同时在绝大多数云原生场景中都凭借其高扩展性担任了重要角色。通过kube-apiserver提供的开放的模块,在不需要切分一个内外部的接口情况下,让我们具备了在同一应用前提下(Controller)与集群及其他系统交互的能力,甚至是自定义的资源描述我们特殊的操作,被称之为Operator Pattern

尽管我们可以使用HTTP ClientAPI Server之间进行交互,但这其实并不简单。如果我们只考虑kubernetes的核心resources,那么这种交互方式恐怕不是最完美的,因为许多resources请求后响应回的数据结构和可能操作都是不一样的。作为加强,Kubernetes本身提供了一系列更加方便整合进kubernetes系统的方式,被称之为client-go

这个项目中被最广泛使用客户端是ClientSet,它是交互客户端的类型之一。这意味着这个接口给kubernetes的每种资源都提供了可用的扩展方法和操作,这也就是为什么但凡要实现扩展,我们首先就应该考虑这种标准操作kubernetes内置接口的方式。

但是这也肯定不能满足全部场景,很多情况下我们要自己声明CRD(Custom Resources Definition),ClientSet无法满足我们我们的需求了。这个时候,我们就需要DynamicClient,也就是k8s.io/client-go/dynamic.Interface接口,来让我们有能力入侵系统,并实现自定义的功能和操作。

这种方式的好处在于,首先,它避免了对kubernetes系统的强依赖,让我们可以操作自定义resources。如果你想通过其他操作作为构建块来建构自动化或者工作流,比如ExternalDNS,CertManager,Prometheus Operator等,通常就是把这些项目,使用Go语言数据类型,把它们注册到你的Client实例当中,作为依赖。操作的自由度越大,管控的难度显然就会越大,你需要管理这些扩展功能的迭代升级版本,同时又要保持你已经在集群上安装的版本与go.mod文件中声明的版本相一致。

第二,你可以协调多种或未知的resources。当你的operator实现了一个通用逻辑,它就可以与任何通用的Kubernetes resources(RBACPods)交互,custom resources也不在话下,这个时候,dynamicClient就是你唯一的解决的方案。DynamicClient中有些例子对garbage collection controller是有重度依赖的,假如你打算对项目里任意的custom resources提供支持DynamicClient类型扩展也必不可少,比如KubeWatch

因此,深入client-go项目的组成模块和弄清楚如何使用好它就是要具备的技能。

# 通过DynamicClient做基础操作

以下的代码都预设你在`kubernetes cluster`中进行使用。

许多DynamicClient相关的操作都与TypedClient类似,比如创建新实例,都可以通过给构造方法提供配置文件来完成。

代码语言:javascript
复制
func newClient() (dynamic.Interface, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	dynClient, err := dynamic.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	return dynClient, nil
}

尽管DynamicClient对于你通过resources声明的意图一无所知,它也没提供类似于CoreV1().Pod类似的方法。所以得告诉它怎么来解析和处理你提供的resources,这时候首先要做的,你需要提供一个schema.GroupVersionResource,它是一个提供了必要信息的golang数据结构,用它可以来构造针对于API ServerHTTP请求。

举例,如果你想实现一个从MongoDB Community Operator拉取到所有Mongodb resources列表的功能,你可以这么做:

代码语言:javascript
复制
var monboDBResource = schema.GroupVersionResource{Group: "mongodbcommunity.mongodb.com", Version: "v1", Resource: "mongodbcommunity"}

func ListMongoDB(ctx context.Context, client dynamic.Interface, namespace string) ([]unstructured.Unstructured, error)  {
	list, err := client.Resource(monboDBResource).Namespace(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	return list.Items, nil
}

这里要注意,如果你处理带有namespaceresources,那.Namespace(namespace)就是强制的,而使用空字符就是列出所有命名空间下的。

这段代码片段,我们可以看到DynamicClient的主要组成unstructured.Unstructured。这是一个封装任意JSON结构的特别类型,同时也符合标准的Kubernetes interface,比如runtime.Object,但是最为重要的是它提供了在unstructure package下一系列helpers用于操作这些数据。

扩充下我们上面的例子,如果我们想按一定比例来扩容MongoDB,我们可以像如下方式使用:

代码语言:javascript
复制
// ScaleMongoDB changes the number of members by the given proportion,
// which should be 0 =< proportion < 1.
func ScaleMongoDB(ctx context.Context, client dynamic.Interface, name string, namespace string, proportion uint) error {
	if proportion > 1 {
		return fmt.Errorf("proportion should be between 0 =< proportion < 1")
	}

	mongoDBClient := client.Resource(monboDBResource).Namespace(namespace)
	mdb, err := mongoDBClient.Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	members, found, err := unstructured.NestedInt64(mdb.UnstructuredContent(), "spec", "members")
	if err != nil {
		return err
	}

	if !found {
		return fmt.Errorf("members field not found on MongoDB spec")
	}

	scaled := int(members) * (1 + int(proportion))

	patch := []interface{}{
		map[string]interface{}{
			"op":    "replace",
			"path":  "/spec/members",
			"value": scaled,
		},
	}

	payload, err := json.Marshal(patch)
	if err != nil {
		return err
	}

	_, err = mongoDBClient.Patch(ctx, name, types.JSONPatchType, payload, metav1.PatchOptions{})
	if err != nil {
		return err
	}

	return nil
}

这里我们利用unstructured.NestedInt64只访问我们要操作的字段,保证我们与MongoDB CRD间的耦合最小,同时又可以在安全类型之下来操作resource data

unstructured package有很多像这种的helpers,不只用于读取,也可以在resource上的任意字段上进行写入。

Kubernetes上执行所有常规操作(get,list,watch,create,patch,delete等)也遵循同样规则:即提供scheme.GroupVersionResource,同时处理unstructured.Unstructured结果。

# 带有DynamicClientController

更高级和对Kubernetes client的使用是创建一个controller,它可对实际集群状态的变化进行响应,把它调整到期望状态。

通常,我们使用Informer,一个由k8s.io/client-go提供的模块,状态发生改变、创建就会在指定类型的client运行一个handler。幸运的是Dynamic package提供了一个我们可以使用的Informer模块。

举例,如果MongoDB被检测到清除了相关PersistentVolumeClaims,我们进行捕获,如下:

代码语言:javascript
复制
package main

import (
	"fmt"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"time"
)

const maxRetries = 3

var monboDBResource = schema.GroupVersionResource{Group: "mongodbcommunity.mongodb.com", Version: "v1", Resource: "mongodbcommunity"}

type MongoDBController struct {
	informer cache.SharedIndexInformer
	stopper  chan struct{}
	queue    workqueue.RateLimitingInterface
}

func NewMongoDBController(client dynamic.Interface) (*MongoDBController, error) {
	dynInformer := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)
	informer := dynInformer.ForResource(monboDBResource).Informer()
	stopper := make(chan struct{})

	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	})

	return &MongoDBController{
		informer: informer,
		queue: queue,
		stopper: stopper,
	}, nil
}

func (m *MongoDBController) Stop() {
	close(m.stopper)
}

func (m *MongoDBController) Run() {
	defer utilruntime.HandleCrash()

	defer m.queue.ShutDown()

	go m.informer.Run(m.stopper)

	if !cache.WaitForCacheSync(m.stopper, m.informer.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	wait.Until(m.runWorker, time.Second, m.stopper)
}

func (m *MongoDBController) runWorker() {
	for {
		key, quit := m.queue.Get()
		if quit {
			return
		}

		err := m.processItem(key.(string))

		if err == nil {
			m.queue.Forget(key)
		} else if m.queue.NumRequeues(key) < maxRetries {
			m.queue.AddRateLimited(key)
		} else {
			m.queue.Forget(key)
			utilruntime.HandleError(err)
		}

		m.queue.Done(key)
	}
}

func (m *MongoDBController) processItem(mongodb string) error {
	return nil
}

这段绝大多数代码都非常标准,例如使用特定化类型client controllerwork queue,informer event handler,item processing

此外,使用由DynamicClient所提供的解耦方式,在terms complexity上确实让人少了好多头痛事。

# 用DynamicClient进行测试

如果我们想用DynamicClient来进行伸缩,还要让其具备像TypedClient一样易于测试,这样才能保证整体扩展的灵活性和健壮性。

在上面给出的Controller例子中,dynamic package提供了一个等效的伪客户端,允许使用它做存根对象验证及断言行为执行。这里多说两句,如果对于单元测试了解不深入,可能对于我说的概念有点糊涂,模拟通过具有验证调用和交互的能力,而存根通常没有,存根通常用于提供固定数据或控制测试环境,而模拟则用于代码和外部环境间的交互。

代码语言:javascript
复制
package main

import (
	"context"
	"k8s.io/apimachinery/pkg/runtime"
	dynamicfake "k8s.io/client-go/dynamic/fake"
)

func TestDynamicClient(t *testing.T) {
	mdb := &unstructured.Unstructured{}
	mdb.SetUnstructuredContent(map[string]interface{}{
		"apiVersion": "mongodbcommunity.mongodb.com/v1",
		"kind": "MongoDBCommunity",
		"metadata": map[string]interface{} {
			"name":      "mongodb-test",
			"namespace": "default",
		},
		"spec": map[string]interface{}{
			"members": 3,
		},
	})

	dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), mdb)
  NotifyMongoDBs(context.Background(), dynamicClient)

	AssertActions(t,  dynamicClient.Actions(), []ExpectedAction{
		{
			Verb: "list",
			Namespace: "default",
			Resource: "mongodbcommunity",
		},
	})

}

使用unstructured.Unstructured类型,我们可以像使用YAML文件中相同标签一样来创建Kubernetes对象,但要带有maps

在执行测试逻辑之后,我们可以使用dynamicClient.Actions()来查看我们代码里执行的所有操作。然而,在每个测试上面手动断言这些actions,通常会导致代码可读性下降和不够完善的断言。

与此同时,我经常使用特定的断言功能AssertActions来校验是不是每个预期的action都可以在可执行actions中被查找到。关键点在于,此功能无法提供一个准确的列表匹配,举例,如果正在使用的client执行了一个删除操作,测试无法进行中断,这里仅有的用于失败的AssertAction条件是列表提供的operation在预期的列表中查询不到。可以在,仅当预期的actions被执行到时,来改变断言功能或者创建一个sibling function来做验证。

尽管当前的实现比较麻烦,但是此功能与DynamicClientTypedClient均可协调使用。

代码语言:javascript
复制
type ExpectedAction struct {
	Verb string
	Name string
	Namespace string
	Resource string

	// Patch action
	PatchType types.PatchType
	PatchPayload []map[string]interface{}
}

func AssertActions(t *testing.T, got []kubetesting.Action, expected []ExpectedAction) {
	if len(expected) > len(got) {
		t.Fatalf("executed actions too short, expected %d, got %d", len(expected), len(got))
		return
	}

	for i, expectedAction := range expected {
		if !AssertExpectedAction(got, expectedAction) {
			t.Fatalf("action %d does not match any of the got actions", i)
		}
	}
}

func AssertExpectedAction(got []kubetesting.Action, expectedAction ExpectedAction) bool {
	for _, gotAction := range got {
		switch expectedAction.Verb {
		case "get":
			getAction, ok := gotAction.(kubetesting.GetAction)
			if !ok {
				continue
			}

			if getAction.GetName() != expectedAction.Name {
				continue
			}

			if !validateNamespaceAndResource(getAction, expectedAction) {
				continue
			}

			return true
		case "list":
			listAction, ok := gotAction.(kubetesting.ListAction)
			if !ok {
				continue
			}

			if !validateNamespaceAndResource(listAction, expectedAction) {
				continue
			}

			return true
		case "watch":
			watchAction, ok := gotAction.(kubetesting.WatchAction)
			if !ok {
				continue
			}

			if !validateNamespaceAndResource(watchAction, expectedAction) {
				continue
			}

			return true
		case "create":
			createAction, ok := gotAction.(kubetesting.CreateAction)
			if !ok {
				continue
			}

			if !validateNamespaceAndResource(createAction, expectedAction) {
				continue
			}

			return true
		case "update":
			updateAction, ok := gotAction.(kubetesting.UpdateAction)
			if !ok {
				continue
			}

			if !validateNamespaceAndResource(updateAction, expectedAction) {
				continue
			}

			return true
		case "delete":
			deleteAction, ok := gotAction.(kubetesting.DeleteAction)
			if !ok {
				continue
			}

			if deleteAction.GetName() != expectedAction.Name {
				continue
			}

			if !validateNamespaceAndResource(deleteAction, expectedAction) {
				continue
			}

			return true
		case "patch":
			patchAction, ok := gotAction.(kubetesting.PatchAction)
			if !ok {
				continue
			}

			if patchAction.GetName() != expectedAction.Name {
				continue
			}

			if !validateNamespaceAndResource(patchAction, expectedAction) {
				continue
			}

			if patchAction.GetPatchType() != expectedAction.PatchType {
				continue
			}

			patchBytes, err := json.Marshal(expectedAction.PatchPayload)
			if err != nil {
				continue
			}

			if !bytes.Equal(patchAction.GetPatch(), patchBytes) {
				continue
			}

			return true
		}
	}

	return false
}

func validateNamespaceAndResource(action kubetesting.Action, expectedAction ExpectedAction) bool {
	return action.GetNamespace() == expectedAction.Namespace && action.GetResource().Resource == expectedAction.Resource
}

这个断言功能允许你随便往里加判断条件,比如验证list/watch限制以及create/update主体。

总结

Kubernetes的生态系统非常丰富,我们时不时会发现这样的宝藏。我强烈大家阅读k8s.io/client-go源码,当然仅仅读它也不够,sigs.k8s.io/controller-runtime项目和Kubernetes Reference API都是极好的。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-03-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Client Go四种交互模式之 DynamicClient实战案例详解
  • 引子
  • 总结
相关产品与服务
腾讯云服务器利旧
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档