目录

Client-go Deltafifo 源码解析

简介

上一篇介绍了 Reflector 的原理,发现 Reflector 从 Kube-apiserver 获取到的数据最终都是流向 Deltafifo。

Deltafifo: delta first in first out,从字面意思上看是一个先进先出的队列,Delta 表示变化的资源,包含操作资源对象的类型和数据。

下面就详细解释下 Deltafifo 是如何存储数据的,以及实现原理。从 Client-go 架构可以看出 Reflector 是 Deltafifo 的生产者。

Deltafifo 源码解析

本篇基于 k8s.io/client-go v0.23.4 源码讲解

结构体定义

先通过 Deltafifo 结构体定义来大概看看作用和其字段的作用,在看结构体定义之前,通过下图先看看 Deltafifo 通过什么数据结构来存储数据的

/client-go-deltafifo/deltafifo.png
deltafifo 存储结构

从上图可以看出,queue 是一个切片,存放资源对象的 key;Items 是一个 map,key 为 queue 的元素,value 为一个事件的切片,且切片内的对象是同一个,只是事件类型不同。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// k8s.io/client-go/tools/cache/delta_fifo.go:
type DeltaFIFO struct {
	// lock/cond 保护 items 的并发访问
	lock sync.RWMutex
	cond sync.Cond

	// items 存储数据, map 的 value 是 Deltas,Deltas 是一个 Delta 切片
	// 说明一个资源会存在多个事件,Add/Update/Delete/Sync
	// 对应上图的 items
	items map[string]Deltas

	// queue 存放的是 items 的key,且该 key 在 queue 中有顺序且唯一
	// 对应上图的 queue
	queue []string

	// Delete/Add/Update/AddIfNotPresent 被调用,或者 Reflector 第一次全量 List 时调用 Replace() 时设置为 true
	// populated 用于判断 DletaFIFO 是否为初始化状态(即没有处理过任何对象)
	populated bool
	// Reflector 第一次全量 List 时调用 Replace() 时插入的 items 数
	initialPopulationCount int

	// 用于创建 queue 中的 key 函数
	keyFunc KeyFunc

	// knownObjects 从缓存里获取数据的接口, 其实就是对应上图的 indexer
	knownObjects KeyListerGetter

	// 用于表示队列是否被关闭了,当队列为空时,可以退出
	closed bool

	// emitDeltaTypeReplaced是发出Replaced还是Sync
	emitDeltaTypeReplaced bool
}

// Deltas 是一个 Delta 切片
// 说明一个资源会存在多个事件,Add/Update/Delete/Sync
type Delta struct {
	// 事件类型
	Type   DeltaType
	// 具体资源
	Object interface{}
}

type Deltas []Delta

再来看看 Deltafifo 的初始化是如何实现的。

主要逻辑:

  • informer 启动函数会调用 NewReflector() 初始化 Reflector
  • Reflector 初始化调用 NewDeltaFIFOWithOptions() 初始化 Deltafifo,并赋值于 StoreStore 其实就是 Deltafifo

Deltafifo 初始化只需要传入 KeyFuncKeyListerGetter 两个入参,分别是获取 queue 的 key 的函数以及获取具体数据的接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// k8s.io/client-go/tools/cache/delta_fifo.go:209

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KeyFunction:  keyFunc,
		KnownObjects: knownObjects,
	})
}

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc
	}

	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      opts.KeyFunction,
		knownObjects: opts.KnownObjects,

		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
	f.cond.L = &f.lock
	return f
}

上一篇 Relfector 中讲到,Reflector 结构体中有个字段是 *Store ,*该 Store Client-go 架构中就是 Deltafifo,所以 Deltafifo 的初始化就是 Reflector 调用的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// k8s.io/client-go/tools/cache/controller.go:127

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	// 初始化 Reflector
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

上面 Run 方法是 informer 的启动函数,里面有个 NewReflector 会初始化 Reflector,其函数入参 c.config.Queue 就是 Deltafifo。

那么 c.config.Queue 是怎么初始化的?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// k8s.io/client-go/tools/cache/controller.go:127:423

func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
) Controller {
	// 初始化 Deltafifo,并设置 *KeyListerGetter,可以发现并没有传 keyFunc*
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})
	// 用上面的 fifo 初始化 Queue
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		...
	return New(cfg)
}

到这基本就知道 Deltafifo 是如何初始化的了,下面看看 NewDeltaFIFO 函数两个入参 ** KeyFuncKeyListerGetter 是如何实现的

KeyFunc

keyFunc 用于计算资源的 key,存放到 Deltafifo 的 queue 中,以及作为 items 的 key

主要逻辑:

  • 如果没有指定自定义 keyFunc,则调用 MetaNamespaceKeyFunc 生成默认 keyFunc
  • 如果资源有 namespace,则拼接 namespace + "/" + name 作为 key
  • 如果资源没有 namespace,则直接返回 name 作为 key
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// k8s.io/client-go/tools/cache/delta_fifo.go:218

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	// 如果没有自定义 KeyFunc,则使用MetaNamespaceKeyFunc 
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc
	}

	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      opts.KeyFunction,
		knownObjects: opts.KnownObjects,

		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
	f.cond.L = &f.lock
	return f
}

// k8s.io/client-go/tools/cache/store.go:104

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	meta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("object has no meta: %v", err)
	}
	// 如果资源有 namespace,则拼接 namespace + "/" + name
	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	// 如果资源没有 namespace,则直接返回 name
	return meta.GetName(), nil
}

KeyListerGetter

KeyListerGetter 是在 newInformer() 传入的,对应着函数参数 clientState , 其实就是 indexer,用于从缓存获取具体数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// k8s.io/client-go/tools/cache/controller.go:316

func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// 初始化 ClientState
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}

// k8s.io/client-go/tools/cache/store.go:258

func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		// 初始化 Indexer
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc,
	}
}

可以看出来 KeyListerGetter 就是 Indexer

到这基本就知道 Deltafifo 初始化的过程了,以及重要参数的作用。

Deltafifo 操作

上面讲 Dletafifo 初始化的时候,知道 c.config.Queue 就是 Deltafifo,这个 Queue 是一个接口

上一篇讲解 Reflector 原理时,第一次 List 全量数据时调用 SyncWith() 来同步数据到 Deltafifo,最终调用就是 Deltafifo 的 Resync() 方法;

后面 Watch 监听的数据都是根据数据类型来调用 Deltafifo 的 Add()Update()Delete() 实现同步。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// k8s.io/client-go/tools/cache/fifo.go:52

type Queue interface {
	// Store 是一个 fifo 的接口,包含 fifo 的基本操作
	Store

	// Pop 一直阻塞,直到至少有一个key要处理或队列被关闭,队列被关闭会返回一个错误。
  // 在前面的情况下 Pop 原子性地选择一个 key 进行处理,从 Store 中删除关联(key、accumulator)的数据,
  // 并处理 accumulator。Pop 会返回被处理的 accumulator 和处理的结果。
	
	// PopProcessFunc 函数可以返回一个 ErrRequeue{inner},在这种情况下,Pop 将
  //(a)把那个(key,accumulator)关联作为原子处理的一部分返回到 Queue 中
  // (b) 从 Pop 返回内部错误。
	Pop(PopProcessFunc) (interface{}, error)

	// 如果元素不存在,则添加到队列中
	AddIfNotPresent(interface{}) error

	// 如果第一批元素都被 pop 了,设置为 true
	HasSynced() bool

	// 关闭队列
	Close()
}

// k8s.io/client-go/tools/cache/store.go:39

type Store interface {

	// 将给定元素添加到队列中
	Add(obj interface{}) error

	// 更新元素
	Update(obj interface{}) error

	// 删除元素
	Delete(obj interface{}) error

	// 获取元素列表
	List() []interface{}

	// 获取元素 key 的列表
	ListKeys() []string

	// 查看给定元素
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// 通过 key 查看给定元素
	GetByKey(key string) (item interface{}, exists bool, err error)

	// 替换元素
	Replace([]interface{}, string) error

	// 重新同步
	Resync() error
}

下面具体看看这些方法,Deltafifo 是如何实现的。先把简单的几个方法说完,比较复杂的放在后面

List

List() 用于获取每个资源最新的数据,并通过列表来返回

主要逻辑:

  • 加锁
  • 调用 listLocked() 获取数据,遍历 Deltafifo 的 items,将每个 key 的最新 delta 追加到切片返回
  • 解锁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// k8s.io/client-go/tools/cache/delta_fifo.go:442

func (f *DeltaFIFO) List() []interface{} {
	// 读锁
	f.lock.RLock()
	defer f.lock.RUnlock()
	return f.listLocked()
}

func (f *DeltaFIFO) listLocked() []interface{} {
	list := make([]interface{}, 0, len(f.items))
	// 遍历 items,将每个资源的最新数据添加到切片中返回
	for _, item := range f.items {
		list = append(list, item.Newest().Object)
	}
	return list
}

// 获取切片最后一个元素
// 因为切片 append 从尾部添加,所以倒数第一个就是最新的数据
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}

ListKeys

ListKeys() 用于获取 Deltafifo 中所有元素的 key

主要逻辑:

  • 加锁
  • 遍历 Deltafifo 的 queue ,queue 中的元素放到一个切片中返回
  • 释放锁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// k8s.io/client-go/tools/cache/delta_fifo.go:458

func (f *DeltaFIFO) ListKeys() []string {
	f.lock.RLock()
	defer f.lock.RUnlock()
	list := make([]string, 0, len(f.queue))
	// 将 queue 中所有元素添加到切片中返回
	for _, key := range f.queue {
		list = append(list, key)
	}
	return list
}

Get、GetByKey

Get() 通过一个对象返回该对象所有事件,GetByKey() 通过对象的 Key 返回该对象所有事件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// k8s.io/client-go/tools/cache/delta_fifo.go:471

func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
	// 获得当前资源的 key, namespace/name
	key, err := f.KeyOf(obj)
	if err != nil {
		return nil, false, KeyError{obj, err}
	}
	return f.GetByKey(key)
}

func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
	f.lock.RLock()
	defer f.lock.RUnlock()
	// 从 items 获取 Deltas
	d, exists := f.items[key]
	// 如果存在,拷贝 Deltas 返回,防止被更新影响其他调用者
	if exists {
		d = copyDeltas(d)
	}
	return d, exists, nil
}

可以发现 Get() 最终还是调用 GetByKey() 来实现的。

AddIfNotPresent

addIfNotPresent() 将不存在 Deltafifo 的事件添加进来

主要逻辑:

  • 如果当前资源已存在 Deltafifo,则忽略
  • 将当前资源计算出的 key 添加到 queue
  • 将当前资源 key 和 具体对象保存到 items
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// k8s.io/client-go/tools/cache/delta_fifo.go:357

func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
	f.populated = true
	// 如果该元素已存在于 Deltafifo 中,则直接退出
	if _, exists := f.items[id]; exists {
		return
	}
	// 添加元素 key 到 queue
	f.queue = append(f.queue, id)
	// 添加数据到 items
	f.items[id] = deltas
	f.cond.Broadcast()
}

HasSynced

hasSynced() 判断 Deltafifo 是否已经同步完成

主要逻辑:

  • 判断 Deltafifo 是否已经存在事件且 Reflector 第一次 List 全量数据是否都已经被 pop 完了
  • initialPopulationCount 字段在 Replace() 会加,在 Pop() 会减,所以根据这个字段来判断第一次 List 的数据是否全部消费完,消费完则说明已经 cache 完成
1
2
3
4
5
6
7
8
9
// k8s.io/client-go/tools/cache/delta_fifo.go:271

func (f *DeltaFIFO) HasSynced() bool {
	f.lock.Lock()
	defer f.lock.Unlock()
	// populated 为 true 以及 initialPopulationCount = 0 表示第一全量 List 数据都被 pop 完了
	
	return f.populated && f.initialPopulationCount == 0
}

Close

Close() 作用是关闭 Deltafifo,将 Deltafifo 的 closed 字段设置为 true

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// k8s.io/client-go/tools/cache/delta_fifo.go:247

func (f *DeltaFIFO) Close() {
	f.lock.Lock()
	defer f.lock.Unlock()
	// 设置 Closed 字段为 true
	f.closed = true
	// 唤醒所有协程
	f.cond.Broadcast()
}

以上几个方法主要是一些辅助性的方法,比较容易理解,下面这几个就是 Deltafifo 的核心功能了。

Add、Update、Delete

Add() 的作用就是将元素添加到队列到中,Update() 的作用就是更新元素 ,Delete() 的作用删除元素

主要逻辑:

  • 加锁
  • 将 polulated 设置为 true,表明 Deltafifo 已经不是初始化状态了,已经存在事件了
  • 调用 queueActionLocked() 完成事件添加
  • 释放锁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// k8s.io/client-go/tools/cache/delta_fifo.go:279

func (f *DeltaFIFO) Add(obj interface{}) error {
	// 添加元素,需要上锁
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

// 更新元素
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}

// 删除事件
func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	if f.knownObjects == nil {
		// 如果 knownObjects(indexer) 为空且事件已被删除,则直接返回
		if _, exists := f.items[id]; !exists {
			return nil
		}
	} else {
		// 如果存在 indexer 且数据不存在,则直接返回
		_, exists, err := f.knownObjects.GetByKey(id)
		_, itemsExist := f.items[id]
		if err == nil && !exists && !itemsExist {
			return nil
		}
	}

	// 调用 queueActionLocked 将删除事件添加到 Deltafifo 中
	return f.queueActionLocked(Deleted, obj)
}

发现 Add()Update()Delete() 逻辑都一样,最终都是根据不同事件类型调用 queueActionLocked()

queueActionLocked 主要逻辑:

  • 获取当前事件的 key,调用前面讲的 keyFunc
  • 将 Deleted 事件进行去重
  • 将事件的 key 追加到 queue
  • 将事件保存到 items
  • 唤醒消费者
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// k8s.io/client-go/tools/cache/delta_fifo.go:409

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	// 先获取元素的 key,这里就会调用前面的 keyFunc 来生成 key
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	// 如果是 Add 类型,oldDeltas 为 nil,如果是 Update 类型,oldDeltas 不为 nil
	oldDeltas := f.items[id]
	// 将当前 Delta 添加到 oldDeltas 
	newDeltas := append(oldDeltas, Delta{actionType, obj})
	// 去重
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		// 如果元素不存在,则将 key 添加到队列中
		// 对于更新事件,则不需要添加到队列中了,因为队列中已存在该 key,这样就保证了队列中的 key 是唯一的
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		// Add 事件直接添加到 items 中
		// Update 事件则更新当前 key 的 value
		f.items[id] = newDeltas
		// 唤醒正在的 wait 的 goroutine
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
		if oldDeltas == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
	return nil
}

// 去重 Delta
func dedupDeltas(deltas Deltas) Deltas {
	n := len(deltas)
	// 如果 Delta 切小于两个元素,则不需要去重
	if n < 2 {
		return deltas
	}
	// 因为 append 从切片尾部添加的,所以比较最后两个元素是否重复即可
	a := &deltas[n-1]
	b := &deltas[n-2]
	if out := isDup(a, b); out != nil {
		deltas[n-2] = *out
		return deltas[:n-1]
	}
	return deltas
}

// 目前只支持 Delete 类型的去重
func isDup(a, b *Delta) *Delta {
	if out := isDeletionDup(a, b); out != nil {
		return out
	}
	// TODO: Detect other duplicate situations? Are there any?
	return nil
}

// 判断两个是否都是 delete 类型, 都是 delete 类型肯定是有重复,只需返回一个
func isDeletionDup(a, b *Delta) *Delta {
	if b.Type != Deleted || a.Type != Deleted {
		return nil
	}
	// Do more sophisticated checks, or is this sufficient?
	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
		return a
	}
	return b
}

Replace

Replace() 作用是全量更新 Deltafifo 数据,这个方法在 Reflector 第一次 List 时调用,以及后续 Watch 出现于 kube-apiserver 连接中断了,导致 Reflector 重新 List 也会调用 Deltafifo 的 Replace()

主要逻辑:

  • 将 list 的数据的 key 保存到一个 set 中,并同步这些 list 事件到 Deltafifo
  • 如果没有 indexer 缓存,只需将 Deltafifo 中存在,但 List 不存在的事件标记为删除事件,同步到 Deltafifo,将 initialPopulationCount 加一
  • 如果有 indexer 缓存,需要判断 indexer 存在,list 不存在的事件标记为删除事件,同步到 Deltafifo,将 initialPopulationCount 加一
  • Replace 事件类型为 Replaced
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// k8s.io/client-go/tools/cache/delta_fifo.go:575

func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	keys := make(sets.String, len(list))

	// 第一次全量同步类型是 Replaced
	// 以前版本是 Sync,这里做了兼容
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// 之前的版本是 sync
	// 将传进来的数据赋予类型(sync/replaced)保存到 Deltafifo 中
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		// 将 key 插入一个 set 中
		keys.Insert(key)
		// 同步事件
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}
	// 如果没有 indexer 缓存,就对自身队列的元素进行检查,检查自身队列中是否有不在全量 List 的事件,如果有就标记为删除事件
	if f.knownObjects == nil {
		queuedDeletions := 0
		for k, oldItem := range f.items {
			// 如果自身队列已存在输入的 key,则忽略
			if keys.Has(k) {
				continue
			}
			// 如果自身队列不存在,则删除
			var deletedObj interface{}
			if n := oldItem.Newest(); n != nil {
				deletedObj = n.Object
			}
			// 将待删除元素数量加一
			queuedDeletions++
			// 将该资源封装成 DeletedFinalStateUnknown 删除事件同步到 Deltafifo
			// 
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}

		if !f.populated {
			// 如果 populated 为 false,说明还没有设置
			f.populated = true
			// 设置 initialPopulationCount 等于 List 数据量总量加上队列已存在的删除事件
			f.initialPopulationCount = keys.Len() + queuedDeletions
		}

		return nil
	}

	// 检测 indexer 中存在,但是全量 list 不存在的 key,说明这个 key 需要被删除
	// 从 indexer 获取资源 key
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		// 如果 indexer 中的 key 存在于输入的 list,则忽略
		if keys.Has(k) {
			continue
		}
		// 如果不存在于 List,从 indexer 获取当前 key 的具体资源
		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		// 同步当前 key 的删除事件
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}

	if !f.populated {
		// 如果 populated 为 false,说明还没有设置
		f.populated = true
		// 设置 initialPopulationCount 等于 List 数据量总量加上队列已存在的删除事件
		f.initialPopulationCount = keys.Len() + queuedDeletions
	}

	return nil
}

Replace(client-go/tools/cache/delta_fifo.go)函数来对 DeltaFIFO 进行全量更新:

  • Sync 所有 DeltaFIFO 中的对象,将输入对象全部加入到 DeltaFIFO
  • 如果 knowObjects 为空,则删除 DeltaFIFO 中不存在于 list 的对象,使 DeltaFIFO 中的有效对象(非 DeletedFinalStateUnknown)等同于输入对象
  • 如果 knownObjects 非空,获取 knowObjects 中不存在于 list 的对象,并在 DeltaFIFO 中删除这些对象

第2步好理解,knownObjects 为空,只需要更新 DeltaFIFO 即可。第3步中,当 knownObjects 非空时,需要以 knowObjects 为基准进行对象的删除,否则会造成 indexer 中的数据与 apiserver 的数据不一致,举个例子,比如 knownObjects 中的对象为 {obj1, obj2, obj3},而 DeltaFIFO中待处理的对象为 {obj2, obj3,obj4},如果仅按照2步骤进行处理,会导致 knownObjects 中残留 obj1,因此需要在 DeltaFIFO 中添加删除 obj1 的变更消息。从下面 ShareInformer 的图中可以看出,knownObjects(即Indexer)的数据只能通过 DeltaFIFO 变更。

Resync

resync() 从 knownObject(indexer) 中同步全量元素到 Deltafifo,Resync() 会被定期调用,用于保证一些处理失败的事件能够被重新执行

下面代码说的“缓存”都指的 indexer,下一篇会详细讲解

主要逻辑:

  • 如果没有 indexer,就不存在 resync,直接退出
  • 如果有 indexer,就从 indexer 获取所有数据的 key,以 Sync 类型同步到 Deltafifo
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// k8s.io/client-go/tools/cache/delta_fifo.go:661
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()
	// 如果没有后端缓存(indexer),直接退出
	if f.knownObjects == nil {
		return nil
	}
	// 从缓存中获取所有数据的 key
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		// 同步每个元素
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	// 先查看该元素是否在缓存中
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

	// 获取元素的 key
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	// 当前对象已存在 Deltafifo 中,无需同步
	if len(f.items[id]) > 0 {
		return nil
	}
	//  
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

Resync() 主要有以下几个作用:

  • 保证事件处理失败了能够被重新处理
  • 假如一直没有事件进来,则 Watch 会超时,所以 resync 保证 relector Watch 的正常运行

Pop

Pop() 用于获取 Deltafifo 中第一个元素,即最先入队的元素,然后调用 process 函数处理该元素

主要逻辑:

  • 获取 queue 中第一个元素,并将剩下的元素作为新的 queue
  • 将 initialPopulationCount 减一
  • 从 queue 中删除对应的 key,从 items 删除对应具体资源
  • 调用 process 处理该事件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// k8s.io/client-go/tools/cache/delta_fifo.go:515

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// 如果 Deltafifo 已关闭,则退出
			if f.closed {
				return nil, ErrFIFOClosed
			}
			// 队列为空,则一直等待生产者生产元素
			f.cond.Wait()
		}
		// 获取第一个元素的 key
		id := f.queue[0]
		// 将剩下元素的作为新的队列
		f.queue = f.queue[1:]
		depth := len(f.queue)
		// 每次 pod 完一个元素,都要将 initialPopulationCount 减一,该字段表示 Reflector 表示第一次全量 List 的数据个数 
		// 该字段在 Replace 时设置
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		// 从 queue 中获取的 key,理论上 items 都存在
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		// 从 items 删除数据
		delete(f.items, id)
		// 只有当 queue 的长度大于 10, 才会开启日志追踪
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		// 调用 proccess 处理该元素
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			// 如果处理失败,则重新添加到 Deltafifo 
			f.addIfNotPresent(id, item)
			err = e.Err
		}

		return item, err
	}
}

上面 Pop() 在获取一个元素时,就直接调用 process() 来处理这个元素了,这个 process() 是 Pop() 的入参,说明是调用者实现的,下面这个 process() 具体做了什么。

sharedIndexInformer 的 Run 函数中就初始化了 DeltaFIFO,也配置了用于 Pop 回调处理的函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// k8s.io/client-go/tools/cache/shared_informer.go:368

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
  // 初始化 DeltaFIFO,这里就可以看出来 KnownObjects 就是一个 Indexer
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,  // 指定 Pop 函数的回调处理函数
	}
	......
}

// 真正的 Pop 回调处理函数
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				......
			} else {
        // 将对象添加到 Indexer 中
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				......
			}
		case Deleted:
      // 删除 Indexer 中的对象
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			......
		}
	}
	return nil
}

从上面可以看出 DeltaFIFO 中的元素被 Pop() 后被同步到了 Indexer 存储中,而在 DeltaFIFO 中的 KnownObjects 也就是这个指定的 Indexer。

思考

1、为什么 queueActionLocked() 中只有 Delete 类型需要去重,Add/Update 类型不需要去重?

2、为什么 Replace() 需要添加删除事件逻辑?

3、ReplaceResync 的区别?

总结

思考题会等到 Client-go 全部组件讲解完再解答比较合适,因为会涉及到组件之间的关系,以及特殊情况的举例,可能现在不大好理解。

至此我们可以在上一篇 Reflector 原理的图中增加一些 Deltafifo 的细节了

/client-go-deltafifo/client-go-deltafifo.png
deltafifo 原理

主要流程:

  • Reflector 用 List() 从 Kube-apiserver 获取全量对应资源对象
  • Reflector 调用 Deltafifo 的 Replace()list 对象同步到 Detafifo,这里所有对象事件类型都是 Replaced
  • Reflector 调用 Watch() 监听 Kube-apiserver 的资源对象事件
  • Reflector 根据事件类型(Add/Update/Delete) 调用 Deltafifo 的 Add()/Update()/Delete() 来同步当前事件
  • Informer 调用 Reflector 的 Pop() 处理事件,当事件类型为 Add/Update/Delete/Sync/Replaced 时先添加到 Indexer,当事件类型为 Delete 时,将当前资源对象从 indexer 删除
  • 这里的 Sync 类型事件到 Indexer 讲解

经过上面的分析,可以直到从 Reflector List() 和 Watch() 的数据都会流入 Deltafifo,Deltafifo 保存了数据的类型和具体资源,通过 Pop() 弹出每个元素最终同步到了 Indexer 中。下面就详解讲解 Indexer 的原理。


WeChat Pay
关注微信公众号,可了解更多云原生详情~

相关文章