Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >client-go 源码分析(5) - informer机制中的本地存储indexer

client-go 源码分析(5) - informer机制中的本地存储indexer

作者头像
后端云
发布于 2023-02-10 06:17:07
发布于 2023-02-10 06:17:07
61900
代码可运行
举报
文章被收录于专栏:后端云后端云
运行总次数:0
代码可运行

informer机制中的本地存储(local cache),对应的结构体是下面的cache struct。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	keyFunc KeyFunc
}

该结构体包含有一个KeyFunc函数属性(一个cache对象,或者说一个indexer,或者说一个本地存储,只有一个KeyFunc,作用是为items的value:obj生成对应的key,这里的KeyFunc和下面的IndexFunc不一样,别搞混了)和ThreadSafeStore接口,下面的threadSafeMap struct实现了ThreadSafeStore接口的所有方法。

KeyFunc函数的作用是算出一个obj对象的不重复的key,将算出的key作为items的key,obj作为items的value。而items map就是实际的存储本地存储数据的地方。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// index implements the indexing functionality
	index *storeIndex
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type storeIndex struct {
	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

在进入threadSafeMap结构体中的storeIndex结构体前,需要了解几个概念。

storeIndex是items map的索引,且设计的比较精妙,但是理解起来有点绕,下面的图是索引的简单例子。接下来的索引的代码要结合下面的图理解会清晰很多。

在local store中最主要的是有4个概念需要理解:

  1. Indexers: type Indexers map[string]IndexFunc
  2. IndexFunc: type IndexFunc func(obj interface{}) ([]string, error)
  3. Indices: type Indices map[string]Index
  4. Index: type Index map[string]sets.String

结合上面的图理解:

  1. Indexers:索引函数的集合,它为一个map,其key为索引器的名字IndexName(自定义,但要唯一),value为对应的索引函数IndexFunc
  2. IndexFunc: 索引函数,它接收一个obj,并实现逻辑来取出/算出该obj的索引数组。需要注意是索引数组,具体取什么或算出什么作为索引完全是我们可以自定义的。
  3. Indices: 索引数据集合,它为一个map,其key和Indexers中的key对应,表示索引器的名字。Value为当前到达数据通过该索引函数计算出来的Index。
  4. Index: 索引与数据key集合,它的key为索引器计算出来的索引数组中的每一项,value为对应的资源的key(默认namespace/name)集合。

通读下client-go\tools\cache 包下的 store.go 和 thread_safe_store.go 的代码,其中索引有点绕,多次反复对照上图可以获得更好理解。再结合下面实际的建立和查询索引的例子更加有助理解代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

const (
	NamespaceIndexName = "namespace"
	NodeNameIndexName  = "nodeName"
)

func NamespaceIndexFunc(obj interface{}) ([]string, error) {
	m, err := meta.Accessor(obj)
	if err != nil {
		return []string{""}, fmt.Errorf("object has no meta: %v", err)
	}
	return []string{m.GetNamespace()}, nil
}

func NodeNameIndexFunc(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	return []string{pod.Spec.NodeName}, nil
}

func main() {
	index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		NamespaceIndexName: NamespaceIndexFunc,
		NodeNameIndexName:  NodeNameIndexFunc,
	})

	pod1 := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "index-pod-1",
			Namespace: "default",
		},
		Spec: v1.PodSpec{NodeName: "node1"},
	}
	pod2 := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "index-pod-2",
			Namespace: "default",
		},
		Spec: v1.PodSpec{NodeName: "node2"},
	}
	pod3 := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "index-pod-3",
			Namespace: "kube-system",
		},
		Spec: v1.PodSpec{NodeName: "node2"},
	}

	_ = index.Add(pod1)
	_ = index.Add(pod2)
	_ = index.Add(pod3)

	// ByIndex 两个参数:IndexName(索引器名称)和 indexKey(需要检索的key)
	pods, err := index.ByIndex(NamespaceIndexName, "default")
	if err != nil {
		panic(err)
	}
	for _, pod := range pods {
		fmt.Println(pod.(*v1.Pod).Name)
	}

	fmt.Println("==========================")

	pods, err = index.ByIndex(NodeNameIndexName, "node2")
	if err != nil {
		panic(err)
	}
	for _, pod := range pods {
		fmt.Println(pod.(*v1.Pod).Name)
	}

}

上面的代码可以说是最常见的最简单的索引example。创建了两个索引函数,分别是NamespaceIndexFunc和NodeNameIndexFunc,函数的作用分别是获取obj的namespace信息和获取obj的所在节点信息。创建了3个pod,其中pod1,pod2在default namespace中,pod3在另一个namespace中;pod2,pod3在node2上,pod1在另一个node上。

index.ByIndex是通过索引查找,调用了两次,分别查找default namespace的pod对象 和 在node2节点的pod对象。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
GOROOT=C:\go\go1.19 #gosetup
GOPATH=C:\Users\hanwei\go #gosetup
C:\go\go1.19\bin\go.exe build -o C:\Users\hanwei\AppData\Local\Temp\GoLand\___11go_build_lab.exe lab #gosetup
C:\Users\hanwei\AppData\Local\Temp\GoLand\___11go_build_lab.exe
index-pod-1
index-pod-2
==========================
index-pod-2
index-pod-3

Process finished with the exit code 0
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ByIndex(indexName, indexedValue string) ([]interface{}, error)

ByIndex 方法主要工作是:根据索引器名称,比如上面main方法例子中的nodeName索引器名称,获取索引函数NodeNameIndexFunc,所根据索引器名称获得的索引函数为nil,则往上层报错索引器不存在。并通过map indices(map indices的key是索引器名称,value是index)根据索引器名称nodeName获取真正的索引index,index对应上图的右下角的表格,ByIndex 方法的第二个参数对应上图右下角表格的第一列,set := index[indexedValue]中的set对应第二列,set对应items的key值,items map是实际存储obj的map。通过set对应items的key值可以获取实际的obj,即main方法中的pod list。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	index := c.indices[indexName]

	set := index[indexedValue]
	list := make([]interface{}, 0, set.Len())
	for key := range set {
		list = append(list, c.items[key])
	}

	return list, nil
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
_ = index.Add(pod1)
_ = index.Add(pod2)
_ = index.Add(pod3)

index.Add的方法是对obj对象建立索引。上面的main方法对三个pod对象创建索引。注意,在对obj创建索引之前需要先创建索引器,否则会报错。也可换一种说法,在创建索引器的时候会检查items是否为空,若不为空会报错。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Add(key, obj)
	return nil
}

index.Add 先通过KeyFunc算出obj的key,然后将key/value对存入items中。并调用c.cacheStorage.Add方法,该方法会调用 updateIndices 方法。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.Update(key, obj)
}

func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
}

updateIndices中的oldObj为nil(以main方法为例),下面的代码主要处理的工作是:

  1. 遍历所有indexers,获得key/value对,即索引器名称和索引函数,用索引函数算出obj的indexValues,那上面的main方法中的 nodeName索引器名称,获取索引函数NodeNameIndexFunc 这一对map indexers的key/value对举例,indexValues, err = indexFunc(newObj)这句代码算出三个pod的所在节点。
  2. 执行 c.addKeyToIndex(key, value, index) 这一句代码。三个参数分别为:第一个参数为 pod obj的存储在items map中的key值,第二个参数为pod obj的pod所在节点的信息,第三个参数为index map,即上图右下角的表格。
  3. addKeyToIndex方法的就是更新index map,index map的key是对应的上面的node信息,key对应的value是pod obj的item中的key值。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	var oldIndexValues, indexValues []string
	var err error
	for name, indexFunc := range c.indexers {
		if oldObj != nil {
			oldIndexValues, err = indexFunc(oldObj)
		} else {
			oldIndexValues = oldIndexValues[:0]
		}
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}

		if newObj != nil {
			indexValues, err = indexFunc(newObj)
		} else {
			indexValues = indexValues[:0]
		}
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}

		index := c.indices[name]
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}

		if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
			// We optimize for the most common case where indexFunc returns a single value which has not been changed
			continue
		}

		for _, value := range oldIndexValues {
			c.deleteKeyFromIndex(key, value, index)
		}
		for _, value := range indexValues {
			c.addKeyToIndex(key, value, index)
		}
	}
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) {
	set := index[indexValue]
	if set == nil {
		set = sets.String{}
		index[indexValue] = set
	}
	set.Insert(key)
}

总结下,上面的main方法生成的索引相关的map如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# Indexers 就是包含的所有索引器(分类)以及对应实现
indexers: {  
  "namespace": NamespaceIndexFunc,
  "nodeName": NodeNameIndexFunc,
}
# Indices 就是包含的所有索引分类中所有的索引数据
indices: {
 "namespace": {  #namespace 这个索引分类下的所有索引数据
  "default": ["pod-1", "pod-2"],  # Index 就是一个索引键下所有的对象键列表
  "kube-system": ["pod-3"]   # Index
 },
 "nodeName": {  # nodeName 这个索引分类下的所有索引数据(对象键列表)
  "node1": ["pod-1"],  # Index
  "node2": ["pod-2", "pod-3"]  # Index
 }
}
  • 这两个map 的 key 数量和 名称完全一致。 key是索引器名称,value分别是索引器函数和index map。
  • map indices的value也是map,是index map。
  • map index 的key/value是 map Indexers的value函数 通过入参obj算出来后插入的。
  • mapindex 的value不是的obj,而是 map items 中的key,通过map items[key]可以获取obj。

增删改查索引的实现都挺简单的,其实主要还是要对 indices、indexs 这些数据结构非常了解,这样就非常容易了。

主要难点就是 indices、indexs 这些数据结构,另外还有几个次要的点,不要概念搞混了,indexFunc,keyFunc,items。可以将 indexFunc 当成当前对象的命名空间来看待,对理解又有一定的帮助。

通过索引的设计,可以看出极大加快了查询obj的速度,并且可以自定义索引函数,实现快速个性化索引查询。数据库查询为了加快查询速度也会有索引的设计,上面也可以算是个数据库索引的本地存储的实现。

理解了上面的主线代码,理解任何informer 的 local cache的代码都容易理解了。比如下面Index 函数已经很好理解了,虽然是所有代码中剩下的最复杂的方法了:

Index 函数就是获取一个指定对象的指定索引下面的所有的对象全部获取到,比如我们要获取一个 Pod 所在命名空间下面的所有 Pod,如果更抽象一点,就是符合对象某些特征的所有对象。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	indexedValues, err := indexFunc(obj)
	if err != nil {
		return nil, err
	}
	index := c.indices[indexName]

	var storeKeySet sets.String
	if len(indexedValues) == 1 {
		// In majority of cases, there is exactly one value matching.
		// Optimize the most common path - deduping is not needed here.
		storeKeySet = index[indexedValues[0]]
	} else {
		// Need to de-dupe the return list.
		// Since multiple keys are allowed, this can happen.
		storeKeySet = sets.String{}
		for _, indexedValue := range indexedValues {
			for key := range index[indexedValue] {
				storeKeySet.Insert(key)
			}
		}
	}

	list := make([]interface{}, 0, storeKeySet.Len())
	for storeKey := range storeKeySet {
		list = append(list, c.items[storeKey])
	}
	return list, nil
}

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-12-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端云 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
client-go 之 Indexer 的理解
前面我们讲到 DeltaFIFO 中的元素通过 Pop 函数弹出后,在指定的回调函数中将元素添加到了 Indexer 中。Indexer 是什么?字面意思是索引器,它就是 Informer 中的 LocalStore 部分,我们可以和数据库进行类比,数据库是建立在存储之上的,索引也是构建在存储之上,只是和数据做了一个映射,使得按照某些条件查询速度会非常快,所以说 Indexer 本身也是一个存储,只是它在存储的基础上扩展了索引功能。从 Indexer 接口的定义可以证明这一点:
我是阳明
2020/09/04
2.8K0
client-go 之 Indexer 的理解
k8s源码分析- Informer机制
由于Informer这部分的源码比较复杂,调用链路也很长,后面的源码分析,都会围绕这一张图展开。
kinnylee
2020/10/15
5.6K0
k8s源码分析- Informer机制
client-go的Indexer三部曲之三:源码阅读
程序员欣宸
2023/07/10
3110
client-go的Indexer三部曲之三:源码阅读
Informer LocalStore源码解析
Store为最基本的存储接口,提供增删改查基本功能,要求对象有唯一键,键的计算方式由接口的具体实现决定,很好理解。
李鹤
2023/03/28
3260
Informer LocalStore源码解析
kubernetes client-go解析
Indexer保存了来自apiServer的资源。使用listWatch方式来维护资源的增量变化。通过这种方式可以减小对apiServer的访问,减轻apiServer端的压力
charlieroro
2020/03/24
1.3K1
kubernetes client-go解析
K8s源码分析(24)-ThreadSafeStore组件
上一篇文章里,我们主要介绍了和资源索引相关的一系列对象,其中包括了 indexer 对象,index 对象,以及 indices 对象等等。在本篇文章里我们主要来介绍和对象存储相关的组件 ThreadSafeStore 接口以及其实现。
TA码字
2022/10/30
3260
K8s源码分析(24)-ThreadSafeStore组件
浅谈 K8s Informer
进入 K8s 的世界,会发现有很多的 Controller,它们都是为了完成某类资源(如 pod 是通过 DeploymentController, ReplicaSetController 进行管理)的调谐,目标是保持用户期望的状态。
astraw99
2021/09/14
1.5K2
浅谈 K8s Informer
K8s源码分析(25)-Store组件和Indexer组件
上一篇文章里,我们主要介绍了和对象存储相关的组件 ThreadSafeStore 接口以及它的实现结构体 threadSafeMap,本质上来说该接口是并发安全的资源对象存储数据结构。在本篇文章里我们主要来介绍 Store 和 Indexer ,它们同样也是资源对象存储组件。
TA码字
2022/10/30
4800
K8s源码分析(25)-Store组件和Indexer组件
client-go 源码分析(10) - 使用client-go实现一个简单controller的例子
下面的example也是client-go官方的例子。通过这个简单的例子正好把之前的源码分析的一个个模块都串起来了。
后端云
2023/02/10
8520
client-go 源码分析(10) - 使用client-go实现一个简单controller的例子
Kubernetes Controller Manager 工作原理
在 Kubernetes Master 节点中,有三个重要组件:ApiServer、ControllerManager、Scheduler,它们一起承担了整个集群的管理工作。本文尝试梳理清楚 ControllerManager 的工作流程和原理。
CS实验室
2021/03/22
3.6K0
Kubernetes Controller Manager 工作原理
client-go实战之十:标签选择(labels.Selector),重要
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 本篇概览 本文是《client-go实战》系列的第十篇,kubernetes真是博大精深,尽管前面已有九篇实战,依然有个十分重要的基础知识点没覆盖到,这也是今天的重要内容:标签选择器labels.Selector 本文由以下内容组成 准备工作:部署nginx的deployment和service 按照官方文档,解读LabelSelector 什么是标签选择器(l
程序员欣宸
2023/03/12
2.8K0
client-go实战之十:标签选择(labels.Selector),重要
client-go 之 DeltaFIFO 实现原理
前文我们讲到 Reflector 中通过 ListAndWatch 获取到数据后传入到了本地的存储中,也就是 DeltaFIFO 中。从 DeltaFIFO 的名字可以看出它是一个 FIFO,也就是一个先进先出的队列,而 Delta 表示的是变化的资源对象存储,包含操作资源对象的类型和数据,Reflector 就是这个队列的生产者。
我是阳明
2020/09/04
3K0
K8s源码分析(23)-indexer及index和indices组件
上一篇文章里,我们主要介绍了 kubernetes 世界中的 clientset 对象,它的主要作用是用来获取所有资源操作对象的工厂,所以从本质上来说,clientset 就是资源操作对象工厂的工厂。本篇文章里我们主要来介绍和在 client go 组件中和索引相关的一系列对象,其中包括了 indexer,index,以及 indices。
TA码字
2022/10/30
7160
K8s源码分析(23)-indexer及index和indices组件
原 深入分析Kubernetes Sche
PriorityQueue PriorityQueue Struct 先看看PriorityQueue的结构定义。 type PriorityQueue struct { lock sync.RWM
Walton
2018/06/20
9550
使用 client-go 对 Kubernetes 进行自定义开发及源码分析
注意:这里 Kubernetes 集群搭建使用 Minikube 来完成,Minikube 启动的单节点 k8s Node 实例是需要运行在本机的 VM 虚拟机里面,所以需要提前安装好 VM,这里我选择 Oracle VirtualBox。k8s 运行底层使用 Docker 容器,所以本机需要安装好 Docker 环境,这里忽略 Docker、VirtualBox、Minikube、Kubectl 的安装过程,可以参考之前文章 Minikube & kubectl 升级并配置, 这里着重介绍下 client-go 安装以及如何自定义操作 k8s 各资源类型。
哎_小羊
2019/05/25
6.5K0
Kubernetes Informer基本原理
不论是 k8s 自身组件,还是自己编写 controller,都需要通过 apiserver 监听 etcd 事件来完成自己的控制循环逻辑。
政采云前端团队
2024/01/30
8930
Kubernetes Informer基本原理
16.深入k8s:Informer使用及其源码分析
这次讲解我用了很一些图,尽可能的把这个模块给描述清楚,如果感觉对你有所帮助不妨发一封邮件激励一下我~
luozhiyun
2020/10/28
2.7K0
16.深入k8s:Informer使用及其源码分析
如何高效掌控K8s资源变化?K8s Informer实现机制浅析
作者:腾讯云云巢团队研发工程师 王成 导语:本文通过分析 K8s 中 Reflector(反射器)、DeletaFIFO(增量队列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享资源通知器)、processorListener(事件监听处理器)、workqueue(事件处理工作队列) 等组件,对 Informer 实现机制进行了解析。 PART ONE 概述 进入 K8s 的世界,会发现有很多的 Controller,它们都是为了完成某类资源(如 pod
腾源会
2021/09/15
5670
kubernetes controller 解析
controller内部有个内存cache,cache 一般和lister/ indexer 一起配合使用, 用一个 Indexer interface进行的包装
王磊-字节跳动
2019/10/07
1.9K0
client-go的Indexer三部曲之一:基本功能
这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 《client-go的Indexer三部曲》全部链接
程序员欣宸
2023/07/10
3490
client-go的Indexer三部曲之一:基本功能
相关推荐
client-go 之 Indexer 的理解
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验