The kubelet reports node status periodically via syncNodeStatus, which ultimately calls defaultNodeStatusFuncs.

defaultNodeStatusFuncs

Node information is populated by a series of setter funcs.

func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
    var setters []func(n *v1.Node) error
    setters = append(setters,
        nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore,
            kl.GetCachedMachineInfo,
            kl.containerManager.GetCapacity,
            kl.containerManager.GetDevicePluginResourceCapacity,
            kl.containerManager.GetNodeAllocatableReservation,
            kl.recordEvent),
        // ... other setters omitted
    )
    return setters
}
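
For orientation: every setter has the same shape, a function that mutates the passed-in node and returns an error, and the kubelet simply applies them in order. A minimal sketch of that pattern (illustration only, not the kubelet's exact code):

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// Setter mirrors the signature used above: it fills in part of the node status.
type Setter func(node *v1.Node) error

// applySetters runs every setter against the same node object, in order.
func applySetters(node *v1.Node, setters []Setter) error {
    for _, set := range setters {
        if err := set(node); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    node := &v1.Node{}
    node.Status.Capacity = v1.ResourceList{}

    err := applySetters(node, []Setter{
        func(n *v1.Node) error { n.Status.NodeInfo.KubeletVersion = "example"; return nil },
    })
    fmt.Println(err, node.Status.NodeInfo.KubeletVersion)
}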

Here we focus on one setter: MachineInfo.

The information comes from two sources:

  • cadvisor: provides cpu, memory and hugepages-*; the corresponding method is GetCachedMachineInfo
  • containerManager: provides systemReserved, kubeReserved and extended resources such as nvidia.com/gpu

kl.GetCachedMachineInfo

// GetCachedMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapiv1.MachineInfo, error) {
    kl.machineInfoLock.RLock()
    defer kl.machineInfoLock.RUnlock()
    return kl.machineInfo, nil
}

func (kl *Kubelet) setCachedMachineInfo(info *cadvisorapiv1.MachineInfo) {
    kl.machineInfoLock.Lock()
    defer kl.machineInfoLock.Unlock()
    kl.machineInfo = info
}
func NewMainKubelet() {
    ...
    machineInfo, err := klet.cadvisor.MachineInfo()
    machineInfo.Timestamp = time.Time{}
    klet.setCachedMachineInfo(machineInfo)
    ...
}

Implementation of MachineInfo

func MachineInfo() Setter {
    return func(node *v1.Node) error {
        // write cpu, memory and hugepages-* into status.capacity
        for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
            node.Status.Capacity[rName] = rCap
        }
    }
}

cachedMachineInfo is read from the kubelet's cached machineInfo field; the value is cached once, via setCachedMachineInfo, when the kubelet object is created and started.

cadvisor collects the machine information and stores it in the kubelet's cached machineInfo object; later status reports only read from this kubelet cache.

NOTE: NewMainKubelet only runs when the kubelet is created, so the cache is set exactly once.

So machine information such as cpu, memory and hugepages-* is added to node.Status.Capacity.

The remaining resources are obtained through the containerManager; cm is implemented by the containerManagerImpl object in pkg/kubelet/cm/container_manager_linux.go. The relevant methods are described below.

kl.containerManager.GetCapacity

The capacity that cm reports is ephemeral-storage. Presumably this is because cm manages the container lifecycle, and container data lives under /var/lib/kubelet, which needs a certain amount of storage. It is called ephemeral as opposed to persistent storage: when a pod is deleted, the space it used is reclaimed.

func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
    return cm.capacity
}

pkg/kubelet/cm/container_manager_linux.go
cm computes its capacity at startup; it is the capacity of the partition that the /var/lib/kubelet mount point lives on, which is usually the node's root partition.

func (cm *containerManagerImpl) Start( /* ... */ ) error {
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LocalStorageCapacityIsolation) {
        rootfs, err := cm.cadvisorInterface.RootFsInfo()
        if err != nil {
            return fmt.Errorf("failed to get rootfs info: %v", err)
        }
        for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
            cm.capacity[rName] = rCap
        }
    }
    // ...
    return nil
}

func EphemeralStorageCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
    c := v1.ResourceList{
        v1.ResourceEphemeralStorage: *resource.NewQuantity(
            int64(info.Capacity),
            resource.BinarySI),
    }
    return c
}

Getting the filesystem info for rootPath

func (cc *cadvisorClient) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
    return cc.GetDirFsInfo(cc.rootPath)
}

What is rootPath here?

It is s.RootDirectory, set when the CAdvisorInterface object is initialized; the default value is /var/lib/kubelet.

Code: cmd/kubelet/app/server.go

func run() (err error) {
    if kubeDeps.CAdvisorInterface == nil {
        // provider for image filesystem info
        imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)

        kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
        if err != nil {
            return err
        }
    }
}

As you can see, filesystem-related data, such as the capacity of /var/lib/kubelet or image information, is provided by cadvisor.
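
Under the hood, GetDirFsInfo amounts to asking the OS which filesystem the directory lives on and how big it is. A minimal illustration of that idea with statfs(2) on Linux (this is not the cadvisor code; golang.org/x/sys/unix is assumed):

package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

// dirCapacityBytes returns the total capacity of the filesystem backing dir.
func dirCapacityBytes(dir string) (uint64, error) {
    var st unix.Statfs_t
    if err := unix.Statfs(dir, &st); err != nil {
        return 0, err
    }
    // total capacity = block count * block size
    return st.Blocks * uint64(st.Bsize), nil
}

func main() {
    capBytes, err := dirCapacityBytes("/var/lib/kubelet")
    if err != nil {
        panic(err)
    }
    fmt.Printf("ephemeral-storage capacity: %d bytes\n", capBytes)
}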

kl.containerManager.GetDevicePluginResourceCapacity

The capacity here refers to the resource capacity provided by devices.

cm obtains it by calling the deviceManager.

pkg/kubelet/cm/devicemanager/manager.go

func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
    return cm.deviceManager.GetCapacity()
}

This method is called whenever the kubelet updates the node status.

func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {

    // iterate over all healthy devices; the key is the resource name, the value is the device set
    for resourceName, devices := range m.healthyDevices {
        // the number of devices is the resource quantity: extended resources are counted
        capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
        allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
    }

    for resourceName, devices := range m.unhealthyDevices {
        // is the resource still present in the endpoints map?
        eI, ok := m.endpoints[resourceName]
        if (ok && eI.e.stopGracePeriodExpired()) || !ok {
            if !ok {
                klog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
            }
            // the endpoint is gone, or it exists but its stop grace period expired:
            // drop the resource and update the checkpoint file
            delete(m.endpoints, resourceName)
            delete(m.unhealthyDevices, resourceName)
            deletedResources.Insert(resourceName)
            needsUpdateCheckpoint = true
        } else {
            // add the unhealthy devices back into capacity
            capacityCount := capacity[v1.ResourceName(resourceName)]
            unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
            capacityCount.Add(unhealthyCount)
            capacity[v1.ResourceName(resourceName)] = capacityCount
        }
    }

    return capacity, allocatable, deletedResources.UnsortedList()
}

Return values:

  • capacity: the registered device plugin resource capacity.
  • allocatable: the registered device plugin resource allocatable; it only counts healthy devices, so it equals capacity only when every device is healthy.
  • deletedResources.UnsortedList(): previously registered resources that are no longer active (inactive resources).

The first two return values are maps and can contain several resource types; the last one is a list.
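
A hypothetical example of what those maps translate to for one extended resource (device counts are made up): with 7 healthy and 1 unhealthy nvidia.com/gpu device, capacity reports 8 while allocatable reports 7:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    healthy, unhealthy := 7, 1 // hypothetical device counts for nvidia.com/gpu

    capacity := v1.ResourceList{
        "nvidia.com/gpu": *resource.NewQuantity(int64(healthy+unhealthy), resource.DecimalSI),
    }
    allocatable := v1.ResourceList{
        "nvidia.com/gpu": *resource.NewQuantity(int64(healthy), resource.DecimalSI),
    }

    c := capacity["nvidia.com/gpu"]
    a := allocatable["nvidia.com/gpu"]
    fmt.Println(c.Value(), a.Value()) // 8 7
}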

kl.containerManager.GetNodeAllocatableReservation

Located in pkg/kubelet/cm/node_container_manager_linux.go

func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
    evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
    result := make(v1.ResourceList)
    for k := range cm.capacity {
        value := resource.NewQuantity(0, resource.DecimalSI)
        if cm.NodeConfig.SystemReserved != nil {
            value.Add(cm.NodeConfig.SystemReserved[k])
        }
        if cm.NodeConfig.KubeReserved != nil {
            value.Add(cm.NodeConfig.KubeReserved[k])
        }
        if evictionReservation != nil {
            value.Add(evictionReservation[k])
        }
        if !value.IsZero() {
            result[k] = *value
        }
    }
    return result
}
func hardEvictionReservation() v1.ResourceList {
    for _, threshold := range thresholds {
        switch threshold.Signal {
        case evictionapi.SignalMemoryAvailable:
            memoryCapacity := capacity[v1.ResourceMemory]
            // multiply by the percentage (or take the absolute quantity)
            value := evictionapi.GetThresholdQuantity(threshold.Value, &memoryCapacity)
            ret[v1.ResourceMemory] = *value
        case evictionapi.SignalNodeFsAvailable:
            storageCapacity := capacity[v1.ResourceEphemeralStorage]
            // multiply by the percentage (or take the absolute quantity)
            value := evictionapi.GetThresholdQuantity(threshold.Value, &storageCapacity)
            ret[v1.ResourceEphemeralStorage] = *value
        }
    }
}

The resources reserved for eviction consist of two parts:

  • memory.available, which corresponds to the memory part of capacity
  • nodefs.available, which corresponds to the ephemeral-storage part of capacity

Multiplying capacity by the threshold percentage gives the amount reserved for eviction (thresholds can also be absolute quantities; the default hard threshold for memory.available, for instance, is 100Mi).
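
A small sketch of how such a threshold is turned into a reserved amount, mirroring what evictionapi.GetThresholdQuantity does; the helper and type below are local stand-ins, not kubelet code:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

// thresholdValue mimics the idea of evictionapi.ThresholdValue: either an
// absolute quantity or a percentage of capacity (hypothetical local type).
type thresholdValue struct {
    Quantity   *resource.Quantity
    Percentage float32
}

// reservedForEviction returns the amount of capacity set aside for a hard
// eviction threshold: the absolute quantity if set, else percentage * capacity.
func reservedForEviction(v thresholdValue, capacity *resource.Quantity) *resource.Quantity {
    if v.Quantity != nil {
        return v.Quantity
    }
    return resource.NewQuantity(int64(float64(capacity.Value())*float64(v.Percentage)), resource.BinarySI)
}

func main() {
    nodefs := resource.NewQuantity(100*1024*1024*1024, resource.BinarySI) // 100Gi root partition
    memory := resource.MustParse("64Gi")

    // nodefs.available < 10%  -> reserve 10Gi of ephemeral-storage
    fmt.Println(reservedForEviction(thresholdValue{Percentage: 0.10}, nodefs))
    // memory.available < 100Mi -> reserve 100Mi of memory
    fmt.Println(reservedForEviction(thresholdValue{Quantity: resource.NewQuantity(100*1024*1024, resource.BinarySI)}, &memory))
}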

Execution logic of MachineInfo

It writes the resources into node.Status.Capacity and node.Status.Allocatable.

func MachineInfo() {

    // capacity: cpu, memory and hugepages-* from the cadvisor machine info
    info, err := machineInfoFunc()
    for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
        node.Status.Capacity[rName] = rCap
    }

    // capacity: ephemeral-storage
    if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
        initialCapacity := capacityFunc()
        if initialCapacity != nil {
            if v, exists := initialCapacity[v1.ResourceEphemeralStorage]; exists {
                node.Status.Capacity[v1.ResourceEphemeralStorage] = v
            }
        }
    }

    // capacity: extended resources from device plugins
    devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins := devicePluginResourceCapacityFunc()

    // write the device plugin capacity into node.Status.Capacity
    for k, v := range devicePluginCapacity {
        if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() {
            // (the real code logs the change here)
        }
        node.Status.Capacity[k] = v
    }

    for _, removedResource := range removedDevicePlugins {
        // inactive devices: set their capacity in node.Status to 0
        node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
    }

    // allocatable for cpu, memory and ephemeral-storage:
    // allocatable = capacity - reservation
    // the reservation has three parts: kube-reserved, system-reserved, and the
    // hard eviction thresholds for memory and nodefs
    allocatableReservation := nodeAllocatableReservationFunc()
    for k, v := range node.Status.Capacity {
        value := v.DeepCopy()
        if res, exists := allocatableReservation[k]; exists {
            // subtract the reservation to get allocatable
            value.Sub(res)
        }
        node.Status.Allocatable[k] = value
    }

    // allocatable for extended resources comes straight from the device manager
    // (healthy devices only, so it can be lower than capacity)
    for k, v := range devicePluginAllocatable {
        if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() {
            klog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
        }
        node.Status.Allocatable[k] = v
    }

    // if hugepages are enabled, allocatable memory must additionally subtract
    // the memory set aside for hugepages
    for k, v := range node.Status.Capacity {
        if v1helper.IsHugePageResourceName(k) {
            allocatableMemory := node.Status.Allocatable[v1.ResourceMemory]
            value := v.DeepCopy()
            allocatableMemory.Sub(value)
            if allocatableMemory.Sign() < 0 {
                // Negative Allocatable resources don't make sense.
                allocatableMemory.Set(0)
            }
            node.Status.Allocatable[v1.ResourceMemory] = allocatableMemory
        }
    }
}
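
A worked example of the allocatable arithmetic above, with made-up numbers (64Gi memory capacity, 1Gi kube-reserved, 512Mi system-reserved, a 100Mi memory.available eviction threshold, and 2Gi of hugepages):

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // hypothetical node: 64Gi memory capacity, 2Gi pre-allocated as hugepages-2Mi
    memCapacity := resource.MustParse("64Gi")
    hugepages := resource.MustParse("2Gi")

    // hypothetical reservations
    kubeReserved := resource.MustParse("1Gi")
    systemReserved := resource.MustParse("512Mi")
    evictionHard := resource.MustParse("100Mi") // memory.available hard threshold

    // allocatable = capacity - (kube-reserved + system-reserved + eviction) - hugepages
    allocatable := memCapacity.DeepCopy()
    allocatable.Sub(kubeReserved)
    allocatable.Sub(systemReserved)
    allocatable.Sub(evictionHard)
    allocatable.Sub(hugepages)

    // prints the remaining allocatable memory, e.g. 61852Mi
    fmt.Println(allocatable.String())
}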

device plugin Register

Every device plugin that starts up sends a registration request to the DeviceManager inside the containerManager, i.e. it calls this Register method.

func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {

    // version check
    // resource name check

    go m.addEndpoint(r)

    // never returns an error to the plugin
    return &pluginapi.Empty{}, nil
}

A goroutine is spawned to do the actual work, so registrations from multiple plugins do not block one another and can run concurrently.

The request sent by the device plugin is defined in
vendor/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.pb.go

type RegisterRequest struct {
    Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"`
    // the socket the device plugin listens on (a file name);
    // the full path is path.Join(DevicePluginPath, endpoint)
    Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
    // the resource name
    ResourceName string `protobuf:"bytes,3,opt,name=resource_name,json=resourceName,proto3" json:"resource_name,omitempty"`
    // Options to be communicated with Device Manager
    Options *DevicePluginOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_sizecache int32 `json:"-"`
}
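
For reference, the plugin side of this handshake is small: dial the kubelet's registration socket and send one RegisterRequest. A minimal sketch, with an example socket name and resource name (not taken from this article):

package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // the kubelet's device manager listens on /var/lib/kubelet/device-plugins/kubelet.sock
    conn, err := grpc.DialContext(ctx, "unix://"+pluginapi.KubeletSocket,
        grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        log.Fatalf("cannot connect to kubelet: %v", err)
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)
    _, err = client.Register(ctx, &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,     // "v1beta1"
        Endpoint:     "example-device.sock", // plugin's own socket under the same directory (example)
        ResourceName: "example.com/foo",     // extended resource name (example)
    })
    if err != nil {
        log.Fatalf("registration failed: %v", err)
    }
    log.Println("registered with kubelet device manager")
}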

addEndpoint

func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
    new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
    if err != nil {
        klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
        return
    }
    // associate the resourceName with the endpoint object for this device plugin
    m.registerEndpoint(r.ResourceName, r.Options, new)
    go func() {
        m.runEndpoint(r.ResourceName, new)
    }()
}

  1. Create a new endpoint: join the device manager's socket dir with the endpoint name sent by the device plugin to form the socket path, and pass in the device manager's callback.
  2. Register the endpoint under its resource name and start it in a separate goroutine via runEndpoint.

runEndpoint

func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
    // run() blocks in a loop; when it returns, ListAndWatch has failed, i.e. the
    // connection to the plugin is gone and its devices can be considered unhealthy
    e.run()
    e.stop()

    m.mutex.Lock()
    defer m.mutex.Unlock()

    // mark the resource unhealthy
    if old, ok := m.endpoints[resourceName]; ok && old.e == e {
        m.markResourceUnhealthy(resourceName)
    }

    klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}

markResourceUnhealthy

All of the resource's devices are moved from the device plugin manager's healthyDevices set into its unhealthyDevices set.

func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
    klog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)
    healthyDevices := sets.NewString()
    if _, ok := m.healthyDevices[resourceName]; ok {
        healthyDevices = m.healthyDevices[resourceName]
        m.healthyDevices[resourceName] = sets.NewString()
    }
    if _, ok := m.unhealthyDevices[resourceName]; !ok {
        m.unhealthyDevices[resourceName] = sets.NewString()
    }
    m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
}

run

run streams device updates via ListAndWatch; each update invokes the callback, which completes the resource registration.

func (e *endpointImpl) run() {
    stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
    if err != nil {
        klog.Errorf(errListAndWatch, e.resourceName, err)
        return
    }

    for {
        response, err := stream.Recv()
        if err != nil {
            // the stream is broken: return, and runEndpoint marks the resource unhealthy
            klog.Errorf(errListAndWatch, e.resourceName, err)
            return
        }
        devs := response.Devices
        klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

        var newDevs []pluginapi.Device
        for _, d := range devs {
            newDevs = append(newDevs, *d)
        }

        e.callback(e.resourceName, newDevs)
    }
}

ListAndWatch returns the device information; the Device type is defined in
vendor/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.pb.go

// E.g:
// struct Device {
//     ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//     Health: "Healthy",
//     Topology:
//         Node:
// ...
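
For completeness, a minimal sketch of the plugin side of ListAndWatch: the plugin streams its current device list, and every message received by the endpoint flows into the callback described next. The plugin type and device IDs are made up, and the other DevicePlugin methods are omitted:

package main

import (
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// examplePlugin is a hypothetical device plugin; only ListAndWatch is shown,
// the other DevicePlugin methods (Allocate, GetDevicePluginOptions, ...) are omitted.
type examplePlugin struct{}

func (p *examplePlugin) ListAndWatch(_ *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
    // advertise two (made-up) healthy devices; the kubelet counts them as capacity
    devs := []*pluginapi.Device{
        {ID: "example-dev-0", Health: pluginapi.Healthy},
        {ID: "example-dev-1", Health: pluginapi.Healthy},
    }
    if err := stream.Send(&pluginapi.ListAndWatchResponse{Devices: devs}); err != nil {
        return err
    }
    // a real plugin re-sends the full list whenever device health changes,
    // and otherwise blocks here
    select {}
}

func main() {}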

The callback that completes resource registration is set when the device manager is created:

func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
    klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

    manager.callback = manager.genericDeviceUpdateCallback
}

callback: genericDeviceUpdateCallback

pkg/kubelet/cm/devicemanager/manager.go

func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
    m.mutex.Lock()
    m.healthyDevices[resourceName] = sets.NewString()
    m.unhealthyDevices[resourceName] = sets.NewString()
    m.allDevices[resourceName] = make(map[string]pluginapi.Device)
    for _, dev := range devices {
        // record the full device info, keyed by resource name and device ID,
        // in the device plugin manager's allDevices map
        m.allDevices[resourceName][dev.ID] = dev
        if dev.Health == pluginapi.Healthy {
            // only the device ID is recorded here
            m.healthyDevices[resourceName].Insert(dev.ID)
        } else {
            m.unhealthyDevices[resourceName].Insert(dev.ID)
        }
    }
    m.mutex.Unlock()
    if err := m.writeCheckpoint(); err != nil {
        klog.Errorf("writing checkpoint encountered %v", err)
    }
}

Depending on each device's health, the resource name and its device IDs are added to the device plugin manager's healthyDevices or unhealthyDevices map (string sets that record only the device IDs).

When syncNodeStatus runs periodically, it calls the device plugin manager's GetCapacity method, which derives the extended resource capacities from these two maps: the resource quantity is simply the size of the set.

kubelet_internal_checkpoint

For healthy devices, i.e. devices that have completed registration, the ListAndWatch flow finally builds a data object and records it in /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint.

func (m *ManagerImpl) writeCheckpoint() error {

    registeredDevs := make(map[string][]string)
    for resource, devices := range m.healthyDevices {
        registeredDevs[resource] = devices.UnsortedList()
    }

    // associate pod/device allocations with the registered devices
    data := checkpoint.New(m.podDevices.toCheckpointData(), registeredDevs)
    // persist the data to disk
    return m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
}

Summary

The capacity part

  1. The cpu, memory and hugepages-* machine information in node.status.capacity is read from the kubelet cache, which is populated only once at kubelet startup. It really is fixed, and every periodic syncNodeStatus reads it from that cache.
  2. capacity.ephemeral-storage is read from the containerManager's cache. It is also fixed, because the storage capacity of the mount point the kubelet works from does not change.
  3. The extended-resource part of capacity comes from the device plugin manager and does change, since device plugins can be added, removed or fail; it is refreshed on every syncNodeStatus (see the code walked through above).

The allocatable part

  1. allocatable is computed from capacity, i.e. capacity minus the reserved resources. The reservation has three parts: kube-reserved and system-reserved, plus the eviction reservation for memory and nodefs. So apart from changes to the reserved configuration, the allocatable part does not change.
  2. For extended resources, allocatable is taken directly from the device manager and counts only healthy devices, while capacity also includes unhealthy ones; the two are equal as long as every device is healthy.

The device part

  1. The device plugin manager is a sub-manager of cm.
  2. Its key state is the healthyDevices and unhealthyDevices maps, whose keys are resource names and whose values are sets of device IDs, plus the callback function.

Device info

// E.g:
// struct Device {
//     ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//     Health: "Healthy",
//     Topology:
//         Node:
// ...

On a GPU test node:

# cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint | jq .
{
  "Data": {
    "PodDeviceEntries": [
      {
        "PodUID": "e956d5f9-abb2-49a5-b9bf-78c5e1064a0f",
        "ContainerName": "xiamu-gpu-instance-1693382814-8ogoiqca",
        "ResourceName": "nvidia.com/gpu",
        "DeviceIDs": {
          "0": [
            "GPU-6f89ddc0-0967-bfb2-d004-ea8538bb312a"
          ]
        },
        "AllocResp": "CkIKFk5WSURJQV9WSVNJQkxFX0RFVklDRVMSKEdQVS02Zjg5ZGRjMC0wOTY3LWJmYjItZDAwNC1lYTg1MzhiYjMxMmE="
      }
    ],
    "RegisteredDevices": {
      "nvidia.com/gpu": [          // resource name: [device.ID, ...]
        "GPU-6f89ddc0-0967-bfb2-d004-ea8538bb312a"
      ]
    }
  },
  "Checksum": 3431416169
}
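
The AllocResp field appears to be the proto-serialized pluginapi.ContainerAllocateResponse saved when the device was allocated (that is what the device manager checkpoints). A small sketch decoding the value above shows the environment variable it carries:

package main

import (
    "encoding/base64"
    "fmt"

    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func main() {
    // the AllocResp value from the checkpoint shown above
    raw, err := base64.StdEncoding.DecodeString(
        "CkIKFk5WSURJQV9WSVNJQkxFX0RFVklDRVMSKEdQVS02Zjg5ZGRjMC0wOTY3LWJmYjItZDAwNC1lYTg1MzhiYjMxMmE=")
    if err != nil {
        panic(err)
    }

    resp := &pluginapi.ContainerAllocateResponse{}
    if err := resp.Unmarshal(raw); err != nil {
        panic(err)
    }
    // prints the env the NVIDIA plugin asked the runtime to set:
    // map[NVIDIA_VISIBLE_DEVICES:GPU-6f89ddc0-0967-bfb2-d004-ea8538bb312a]
    fmt.Println(resp.Envs)
}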