Indexer 原理
简介
上一篇详细讲解了 Deltafifo 的原理,Pop()
获取 Deltafifo 的事件进行处理时,最终都是在操作 Indexer。
Indexer 是一个索引缓存,用于缓存 Reflector 从 Kube-apiserver List/Watch
到的资源对象,可以理解是一个带索引查询的内存型存储。
下面通过走读源码的形式来了解 Indexer 的原理,本篇基于 k8s.io/client-go v0.23.4 源码讲解
Indexer 理解
Indexer 是一个内存存储组件,但是除了存储功能,还包含索引查询的功能,所以涉及到几个比较难理解的关键词,在介绍 Indexer 之前先讲解这几个关键词的功能和区别。
1
2
3
4
5
6
7
8
9
10
11
// Indexers包含了所有索引器(索引分类)及其索引器函数IndexFunc,IndexFunc为计算某个索引键下的所有对象键列表的方法
type Indexers map [ string ] IndexFunc
// 计算索引键
type IndexFunc func ( obj interface {}) ([] string , error )
// Indices包含了所有索引器(索引分类)及其所有的索引数据Index;而Index则包含了索引键以及索引键下的所有对象键的列表;
type Indices map [ string ] Index
// Index 存储一个索引键的所有对象
type Index map [ string ] sets . String
上面有几个比较难以理解的名词:Indexers 、IndexFunc 、Indices 、Index ,这四个名词都是关于索引功能的,下面通过例子来阐述。
Indexers、IndexFunc
Indexers 包含了所有索引器 (索引分类)及其索引器函数 IndexFunc,IndexFunc 为计算某个索引键 下的所有对象键 列表的方法;
Indexers:索引器
IndexFunc:索引器函数
1
2
3
4
Indexers: {
"索引器1" : 索引函数 1 ,
"索引器2" : 索引函数 2 ,
}
示例:
1
2
3
4
Indexers: {
"namespace" : MetaNamespaceIndexFunc ,
"nodeName" : NodeNameIndexFunc ,
}
MetaNamespaceIndexFunc
获取对象的 namespace,该 namespace 作为索引键
NodeNameIndexFunc
获取对象的 nodeName,该 nodeName 作为索引键
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func MetaNamespaceIndexFunc ( obj interface {}) ([] string , error ) {
meta , err := meta . Accessor ( obj )
if err != nil {
return [] string { "" }, fmt . Errorf ( "object has no meta: %v" , err )
}
return [] string { meta . GetNamespace ()}, nil
}
func NodeNameIndexFunc ( obj interface {}) ([] string , error ) {
pod , ok := obj .( * v1 . Pod )
if ! ok {
return [] string { "" }, fmt . Errorf ( " object is not a pod )
}
return [] string { pod . Spec . NodeName }, nil
}
Indices、Index
Indices 包含了所有索引器 (索引分类)及其所有的索引数据 Index;而 Index 则包含了索引键 以及索引键下的所有对象键 的列表;
1
2
3
4
5
6
7
8
9
10
11
Indices: {
# index
"索引器1" : {
"索引键1" : [ "对象键1" , "对象键2" ],
"索引键2" : [ "对象键3" ],
},
"索引器2" : {
"索引键3" : [ "对象键1" ],
"索引键4" : [ "对象键2" , "对象键3" ],
}
}
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
pod1 := & v1 . Pod {
ObjectMeta : metav1 . ObjectMeta {
Name : "pod-1" ,
Namespace : "default" ,
},
Spec : v1 . PodSpec {
NodeName : "node1" ,
}
}
pod2 := & v1 . Pod {
ObjectMeta : metav1 . ObjectMeta {
Name : "pod-2" ,
Namespace : "default" ,
},
Spec : v1 . PodSpec {
NodeName : "node2" ,
}
}
pod3 := & v1 . Pod {
ObjectMeta : metav1 . ObjectMeta {
Name : "pod-3" ,
Namespace : "kube-system" ,
},
Spec : v1 . PodSpec {
NodeName : "node2" ,
}
}
1
2
3
4
5
6
7
8
9
10
Indices: {
"namespace" : {
"default" : [ "pod-1" , "pod-2" ],
"kube-system" : [ "pod-3" ],
},
"nodeName" : {
"node1" : [ "pod-1" ],
"node2" : [ "pod-2" , "pod-3" ],
}
}
总结
Indexer、IndexFunc、Indices、Index 弄清这四个组件逻辑非常重要,下图简单做个总结。
这个图想了很久,始终没想到很丝滑的方式呈现出来,大家讲究看吧~
indexer 原理图
Indexer 定义
下面具体看看 Indexer 具体结构体定义和实现的 interface。
Indexer Interface
我们通常所说的 Indexer 理解是一个存储组件,在 Client-go 定义中,Indexer
是一个 Interface。
Indexer
Interface 继承了 Store
Interface,上一篇 Reflcetor 也继承了 Store
Interface,所以说明 Indexer 也是一个存储;
除了 Store
之外,Indexer
Interface 还有几个关于索引功能的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// k8s.io/client-go/tools/cache/index.go:35
type Indexer interface {
// Store Interface
Store
// Index 通过给定索引名称返回其索引值集
Index ( indexName string , obj interface {}) ([] interface {}, error )
// 该方法通过索引名称和索引值获取所有值的 key
IndexKeys ( indexName , indexedValue string ) ([] string , error )
// 通过索引名称获取所有值
ListIndexFuncValues ( indexName string ) [] string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex ( indexName , indexedValue string ) ([] interface {}, error )
// 获取 Indexers
GetIndexers () Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers ( newIndexers Indexers ) error
}
Cache Interface
Indexer
Interface 继承了 cache
Interface
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Store interface {
// 添加数据
Add ( obj interface {}) error
// 更新数据
Update ( obj interface {}) error
// 删除数据
Delete ( obj interface {}) error
// 获取所有数据
List () [] interface {}
// 获取所有数据的 key, 因为数据都是通过 items 存储的,是一个 map
ListKeys () [] string
// 获取与 obj 相同 key 的数据
Get ( obj interface {}) ( item interface {}, exists bool , err error )
// 通过指定 key 获取对应数据
GetByKey ( key string ) ( item interface {}, exists bool , err error )
// 用 []interface 替换 store 所有数据
Replace ([] interface {}, string ) error
// 重新同步,每个 store 实现的逻辑不一样,需要具体分析
Resync () error
Cache struct
cache
struct 实现了上面的 Indexer
Interface,自然也实现了 Store
Interface,看看 cache
struct 的定义
1
2
3
4
5
6
7
8
// k8s.io/client-go/tools/cache/store.go:139
type cache struct {
// 是一个 ThreadSafeStore 接口的实现,threadSafeMap 结构体实现这个接口
cacheStorage ThreadSafeStore
// 计算对象的 key
keyFunc KeyFunc
}
cache,有两个字段,一个是 ThreadSafeStore
接口的实现,还有一个计算对象 key 的函数。
ThreadSafeStore
接口的实现是 threadSafeMap
结构体,这个结构体包含了真正缓存数据的 items,以及一些索引功能相关的字段
ThreadSafeStore Interface
cache
struct 内的 cacheStorage
字段是 ThreadSafeStore
Interface 的一个实现,下面看看该 Interface 的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type ThreadSafeStore interface {
Add ( key string , obj interface {})
Update ( key string , obj interface {})
Delete ( key string )
Get ( key string ) ( item interface {}, exists bool )
List () [] interface {}
ListKeys () [] string
Replace ( map [ string ] interface {}, string )
Index ( indexName string , obj interface {}) ([] interface {}, error )
IndexKeys ( indexName , indexKey string ) ([] string , error )
ListIndexFuncValues ( name string ) [] string
ByIndex ( indexName , indexKey string ) ([] interface {}, error )
GetIndexers () Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers ( newIndexers Indexers ) error
// Resync is a no-op and is deprecated
Resync () error
}
发现 ThreadSafeStore interface 与上面的 Indexer interface 方法基本一致,不同的是 Indexer 方法的入参都是 obj 对象,而 ThreadSafeStore 方法的入参多了一个 key,这个 key 就是索引键。
threadSafeMap struct
threadSafeMap
struct 实现了上面的 ThreadSafeStore
Interface,这个结构体包含了真正缓存数据的 items,以及一些索引功能相关的字段。
1
2
3
4
5
6
7
8
9
10
11
12
// k8s.io/client-go/tools/cache/thread_safe_store.go:63
type threadSafeMap struct {
lock sync . RWMutex
// 真正存储数据的地方
items map [ string ] interface {}
// indexers 索引器映射 IndexFunc,IndexFunc 用于获取对象的索引键
indexers Indexers
// indices 索引器映射 Index,Index 则包含了索引键以及索引键下的所有对象键的列表
indices Indices
}
Indexer 实例化
下面所说的 Indexer 都表示 cache
struct
介绍了 Indexer 的接口以及结构体定义,接下来看看 Indexer 是如何实例化的。上一篇 Deltafifo 原理讲 KeyListerGetter
章节说到,Indexer 的实例化是作为 newInformer()
的入参传进来的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// k8s.io/client-go/tools/cache/controller.go:316
func NewInformer (
lw ListerWatcher ,
objType runtime . Object ,
resyncPeriod time . Duration ,
h ResourceEventHandler ,
) ( Store , Controller ) {
// 实例化 store,也就是 indexer
// DeletionHandlingMetaNamespaceKeyFunc 是 keyFunc 的实现
clientState := NewStore ( DeletionHandlingMetaNamespaceKeyFunc )
return clientState , newInformer ( lw , objType , resyncPeriod , h , clientState , nil )
}
// k8s.io/client-go/tools/cache/store.go:258
func NewStore ( keyFunc KeyFunc ) Store {
return & cache {
// threadSafeMap 实例化
cacheStorage : NewThreadSafeStore ( Indexers {}, Indices {}),
keyFunc : keyFunc ,
}
}
同上 Indexer 的实例化,也就知道了 Indexer 两个字段:KeyFunc,cacheStorage 的实例化。
KeyFunc
KeyFunc
是 Indxer 的一个字段,用于计算指定对象的 Key(对象键)
通过上面 Indexer 初始化知道 KeyFunc
是 DeletionHandlingMetaNamespaceKeyFunc
1
2
3
4
5
6
7
8
9
10
// k8s.io/client-go/tools/cache/controller.go:294
func DeletionHandlingMetaNamespaceKeyFunc ( obj interface {}) ( string , error ) {
// 校验对象删除状态是否 Unknown
if d , ok := obj .( DeletedFinalStateUnknown ); ok {
return d . Key , nil
}
// 计算 key
return MetaNamespaceKeyFunc ( obj )
}
上一篇讲解 Deltafifo 的 KeyFunc
章节知道 Deltafifo 存储的数据的 Key 是通过 MetaNamespaceKeyFunc
计算的,为什么 Indexer 的 Key 需要先校验 DeletedFinalStateUnknown
呢?
cacheStorage
cacheStorage
是 Indxer 第二个字段,实现了 ThreadSafeStore
Interface
1
2
3
4
5
6
7
8
9
// k8s.io/client-go/tools/cache/thread_safe_store.go:333
func NewThreadSafeStore ( indexers Indexers , indices Indices ) ThreadSafeStore {
return & threadSafeMap {
items : map [ string ] interface {}{},
indexers : indexers ,
indices : indices ,
}
}
NewThreadSafeStore
方法就是初始化上面的 threadSafeMap
struct
Indexer 操作数据
Deltafifo 的 Pop()
方法从 Deltafifo 消费事件然后调用 Indexer 的一些方法来操作数据,下面具体看看 Indexer 操作数据的方法。
cache
cache 就是对 ThreadSafeStore 的一个再次封装,很多操作都是直接调用的 ThreadSafeStore 的操作实现的,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// k8s.io/client-go/tools/cache/store.go
// Add 插入一个元素到 cache 中
func ( c * cache ) Add ( obj interface {}) error {
key , err := c . keyFunc ( obj ) // 生成对象键
if err != nil {
return KeyError { obj , err }
}
// 将对象添加到底层的 ThreadSafeStore 中
c . cacheStorage . Add ( key , obj )
return nil
}
// 更新cache中的对象
func ( c * cache ) Update ( obj interface {}) error {
key , err := c . keyFunc ( obj )
if err != nil {
return KeyError { obj , err }
}
c . cacheStorage . Update ( key , obj )
return nil
}
// 删除cache中的对象
func ( c * cache ) Delete ( obj interface {}) error {
key , err := c . keyFunc ( obj )
if err != nil {
return KeyError { obj , err }
}
c . cacheStorage . Delete ( key )
return nil
}
// 得到cache中所有的对象
func ( c * cache ) List () [] interface {} {
return c . cacheStorage . List ()
}
// 得到cache中所有的对象键
func ( c * cache ) ListKeys () [] string {
return c . cacheStorage . ListKeys ()
}
// 得到cache中的Indexers
func ( c * cache ) GetIndexers () Indexers {
return c . cacheStorage . GetIndexers ()
}
// 得到对象obj与indexName索引器关联的所有对象
func ( c * cache ) Index ( indexName string , obj interface {}) ([] interface {}, error ) {
return c . cacheStorage . Index ( indexName , obj )
}
func ( c * cache ) IndexKeys ( indexName , indexKey string ) ([] string , error ) {
return c . cacheStorage . IndexKeys ( indexName , indexKey )
}
func ( c * cache ) ListIndexFuncValues ( indexName string ) [] string {
return c . cacheStorage . ListIndexFuncValues ( indexName )
}
func ( c * cache ) ByIndex ( indexName , indexKey string ) ([] interface {}, error ) {
return c . cacheStorage . ByIndex ( indexName , indexKey )
}
func ( c * cache ) AddIndexers ( newIndexers Indexers ) error {
return c . cacheStorage . AddIndexers ( newIndexers )
}
func ( c * cache ) Get ( obj interface {}) ( item interface {}, exists bool , err error ) {
key , err := c . keyFunc ( obj )
if err != nil {
return nil , false , KeyError { obj , err }
}
return c . GetByKey ( key )
}
func ( c * cache ) GetByKey ( key string ) ( item interface {}, exists bool , err error ) {
item , exists = c . cacheStorage . Get ( key )
return item , exists , nil
}
// 替换cache中所有的对象
func ( c * cache ) Replace ( list [] interface {}, resourceVersion string ) error {
items := make ( map [ string ] interface {}, len ( list ))
for _ , item := range list {
key , err := c . keyFunc ( item )
if err != nil {
return KeyError { item , err }
}
items [ key ] = item
}
c . cacheStorage . Replace ( items , resourceVersion )
return nil
}
func ( c * cache ) Resync () error {
return nil
}
可以看到 cache 没有自己独特的实现方式,都是调用的包含的 ThreadSafeStore
操作接口。
Add、Update、Delete、Replace
Indexer 的这四个方法最终调用的是 ThreadSafeStore
的 Add、Update、Delete、Replace*。*
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// k8s.io/client-go/tools/cache/thread_safe_store.go:73
func ( c * threadSafeMap ) Add ( key string , obj interface {}) {
// 加锁,保证并发安全
c . lock . Lock ()
defer c . lock . Unlock ()
// 获取旧的数据,不存在为零值
oldObject := c . items [ key ]
// 更新当前 key 的值,即存储数据
c . items [ key ] = obj
// 更新索引,因为 Indexer 具有索引功能
c . updateIndices ( oldObject , obj , key )
}
func ( c * threadSafeMap ) Update ( key string , obj interface {}) {
c . lock . Lock ()
defer c . lock . Unlock ()
oldObject := c . items [ key ]
c . items [ key ] = obj
c . updateIndices ( oldObject , obj , key )
}
func ( c * threadSafeMap ) Delete ( key string ) {
c . lock . Lock ()
defer c . lock . Unlock ()
if obj , exists := c . items [ key ]; exists {
c . updateIndices ( obj , nil , key )
delete ( c . items , key )
}
}
// 替换所有对象,相当于重新构建索引
// Replace 方法用于 Reflector 第一次 List 的数据进行同步
func ( c * threadSafeMap ) Replace ( items map [ string ] interface {}, resourceVersion string ) {
c . lock . Lock ()
defer c . lock . Unlock ()
// 直接覆盖之前的对象
c . items = items
// 重新构建索引
c . indices = Indices {}
for key , item := range c . items {
// 更新元素的索引
c . updateIndices ( nil , item , key )
}
}
发现 threadSafeMap 的 Add
、Update
、Delete
、Replace
最终都是调用 threadSafeMap 的 updateIndices
来实现数据操作,下面看看 updateIndices
的具体实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// k8s.io/client-go/tools/cache/thread_safe_store.go:259
func ( c * threadSafeMap ) updateIndices ( oldObj interface {}, newObj interface {}, key string ) {
var oldIndexValues , indexValues [] string
var err error
for name , indexFunc := range c . indexers {
// 如果旧对象存在,获取该对象下的
if oldObj != nil {
oldIndexValues , err = indexFunc ( oldObj )
} else {
oldIndexValues = oldIndexValues [: 0 ]
}
if err != nil {
panic ( fmt . Errorf ( "unable to calculate an index entry for key %q on index %q: %v" , key , name , err ))
}
// 如果新对象存在,计算其索引器
if newObj != nil {
indexValues , err = indexFunc ( newObj )
} else {
indexValues = indexValues [: 0 ]
}
if err != nil {
panic ( fmt . Errorf ( "unable to calculate an index entry for key %q on index %q: %v" , key , name , err ))
}
// 获取该索引下的所有对象列表
index := c . indices [ name ]
// 如果对象为空,则初始化一个
if index == nil {
index = Index {}
c . indices [ name ] = index
}
for _ , value := range oldIndexValues {
// We optimize for the most common case where index returns a single value.
if len ( indexValues ) == 1 && value == indexValues [ 0 ] {
continue
}
// 删除索引键
c . deleteKeyFromIndex ( key , value , index )
}
for _ , value := range indexValues {
// We optimize for the most common case where index returns a single value.
if len ( oldIndexValues ) == 1 && value == oldIndexValues [ 0 ] {
continue
}
// 将当前 key 加入到索引器中
c . addKeyToIndex ( key , value , index )
}
}
}
添加索引和删除索引的实现都挺简单的,其实主要还是要对 indices、indexs 这些数据结构非常了解,这样就非常容易了,我们可以将 indexFunc
当成当前对象的命名空间来看待,这样对于上面的索引更新和删除的理解就肯定没问题了。
Get、List、ListKeys
然后接下来就是几个查询相关的接口实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// k8s.io/client-go/tools/cache/thread_safe_store.go:98
// 获取对象
func ( c * threadSafeMap ) Get ( key string ) ( item interface {}, exists bool ) {
// 只需要读锁
c . lock . RLock ()
defer c . lock . RUnlock ()
// 直接从 map 中读取值
item , exists = c . items [ key ]
return item , exists
}
// 对象列举
func ( c * threadSafeMap ) List () [] interface {} {
// 只需要读锁
c . lock . RLock ()
defer c . lock . RUnlock ()
list := make ([] interface {}, 0 , len ( c . items ))
// 将对象添加到 list 中
for _ , item := range c . items {
list = append ( list , item )
}
return list
}
// 返回 threadSafeMap 中所有的对象键列表
func ( c * threadSafeMap ) ListKeys () [] string {
c . lock . RLock ()
defer c . lock . RUnlock ()
list := make ([] string , 0 , len ( c . items ))
for key := range c . items {
list = append ( list , key )
}
return list
}
Index
Index 方法用于获取指定索引的所有对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// k8s.io/client-go/tools/cache/thread_safe_store.go
// 通过指定的索引器和对象获取符合这个对象特征的所有对象
func ( c * threadSafeMap ) Index ( indexName string , obj interface {}) ([] interface {}, error ) {
c . lock . RLock ()
defer c . lock . RUnlock ()
// 获得索引器 indexName 的索引键计算函数
indexFunc := c . indexers [ indexName ]
if indexFunc == nil {
return nil , fmt . Errorf ( "Index with name %s does not exist" , indexName )
}
// 获取指定 obj 对象的索引键
indexedValues , err := indexFunc ( obj )
if err != nil {
return nil , err
}
// 获得索引器 indexName 的所有索引
index := c . indices [ indexName ]
// 用来存储对象键的集合
var storeKeySet sets . String
if len ( indexedValues ) == 1 {
// 大多数情况下只有一个值匹配(默认获取的索引键就是对象的 namespace)
// 直接拿到这个索引键的对象键集合
storeKeySet = index [ indexedValues [ 0 ]]
} else {
// 由于有多个索引键,则可能有重复的对象键出现,索引需要去重
storeKeySet = sets . String {}
// 循环索引键
for _ , indexedValue := range indexedValues {
// 循环索引键下面的对象键,因为要去重
for key := range index [ indexedValue ] {
storeKeySet . Insert ( key )
}
}
}
// 拿到了所有的对象键集合过后,循环拿到所有的对象集合
list := make ([] interface {}, 0 , storeKeySet . Len ())
for storeKey := range storeKeySet {
list = append ( list , c . items [ storeKey ])
}
return list , nil
}
这个 Index 函数就是获取一个指定对象的索引键,然后把这个索引键下面的所有的对象全部获取到,比如我们要获取一个 Pod 所在命名空间下面的所有 Pod,如果更抽象一点,就是符合对象某些特征 的所有对象,而这个特征就是我们指定的索引键函数计算出来的。
ByIndex
然后接下来就是一个比较重要的 ByIndex
函数的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// k8s.io/client-go/tools/cache/thread_safe_store.go
// 和上面的 Index 函数类似,只是是直接指定的索引键
func ( c * threadSafeMap ) ByIndex ( indexName , indexedValue string ) ([] interface {}, error ) {
c . lock . RLock ()
defer c . lock . RUnlock ()
// 获得索引器 indexName 的索引键计算函数
indexFunc := c . indexers [ indexName ]
if indexFunc == nil {
return nil , fmt . Errorf ( "Index with name %s does not exist" , indexName )
}
// 获得索引器 indexName 的所有索引
index := c . indices [ indexName ]
// 获取指定索引键的所有所有对象键
set := index [ indexedValue ]
// 然后根据对象键遍历获取对象
list := make ([] interface {}, 0 , set . Len ())
for key := range set {
list = append ( list , c . items [ key ])
}
return list , nil
}
可以很清楚地看到 ByIndex
函数和 Index 函数比较类似,但是更简单了,因为不需要通过索引函数计算索引键了,直接获取一个指定的索引键的全部资源对象。
IndexKeys、ListIndexFuncValues、GetIndexers、AddIndexers
然后是其他几个索引相关的函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// k8s.io/client-go/tools/cache/thread_safe_store.go
// IndexKeys 和上面的 ByIndex 几乎是一样的,只是这里是直接返回对象键列表
func ( c * threadSafeMap ) IndexKeys ( indexName , indexedValue string ) ([] string , error ) {
c . lock . RLock ()
defer c . lock . RUnlock ()
// 获取索引器 indexName 的索引键计算函数
indexFunc := c . indexers [ indexName ]
if indexFunc == nil {
return nil , fmt . Errorf ( "Index with name %s does not exist" , indexName )
}
// 获取索引器 indexName 的所有索引
index := c . indices [ indexName ]
// 直接获取指定索引键的对象键集合
set := index [ indexedValue ]
return set . List (), nil
}
// 获取索引器下面的所有索引键
func ( c * threadSafeMap ) ListIndexFuncValues ( indexName string ) [] string {
c . lock . RLock ()
defer c . lock . RUnlock ()
// 获取索引器 indexName 的所有索引
index := c . indices [ indexName ]
names := make ([] string , 0 , len ( index ))
// 遍历索引得到索引键
for key := range index {
names = append ( names , key )
}
return names
}
// 直接返回 indexers
func ( c * threadSafeMap ) GetIndexers () Indexers {
return c . indexers
}
// 添加一个新的 Indexers
func ( c * threadSafeMap ) AddIndexers ( newIndexers Indexers ) error {
c . lock . Lock ()
defer c . lock . Unlock ()
if len ( c . items ) > 0 {
return fmt . Errorf ( "cannot add indexers to running index" )
}
// 获取旧的索引器和新的索引器keys
oldKeys := sets . StringKeySet ( c . indexers )
newKeys := sets . StringKeySet ( newIndexers )
// 如果包含新的索引器,则提示冲突
if oldKeys . HasAny ( newKeys . List () ... ) {
return fmt . Errorf ( "indexer conflict: %v" , oldKeys . Intersection ( newKeys ))
}
// 将新的索引器添加到 Indexers 中
for k , v := range newIndexers {
c . indexers [ k ] = v
}
return nil
}
// 没有真正实现 Resync 操作
func ( c * threadSafeMap ) Resync () error {
return nil
}
这里我们就将 ThreadSafeMap 的实现进行了分析说明。整体来说比较方便,一个就是将对象数据存入到一个 map 中,然后就是维护索引,方便根据索引来查找到对应的对象。
总结
经过上面的分析,Indexer 主要两个功能:缓存数据、数据具有索引功能,Client-go 原理图再添加 Indexer 的部分。
client-go 架构图
前面我们已经知道了 Reflector 通过 ListAndWatch
把数据传入 DeltaFIFO 后,经过 DeltaFIFO 的 Pop
函数将资源对象存入到了本地的一个存储 Indexer 中,而这个底层真正的存储其实就是上面的 ThreadSafeStore。
要理解 Indexer 组件,最主要就是要把索引、索引器(索引分类)、索引键、对象键这几个概念弄清楚,有时候确实容易混乱,我们将上面的示例理解了应该就很好理解了,我们可以简单的理解为 Indexer 就是简单的把相同命名空间的对象放在一个集合中,然后基于命名空间来查找对象。
介绍完 Indexer 的内容后,Client-go 的核心几个组件就介绍完了,但是还缺少一个组件把 Reflector、Deltafifo、Indexer 这三个组件串联起来工作,这个组件就是 Informer,下一章介绍 Informer 的内容。
Indexer 在实现多索引的本地缓存的思路,特别在代码可复用方面,值得我们学习借鉴。
关注微信公众号,可了解更多云原生详情~