The kubelet reports node status periodically via syncNodeStatus; what ultimately gets called is defaultNodeStatusFuncs.
defaultNodeStatusFuncs
The node information is populated by a series of setter funcs.
```go
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	var setters []func(n *v1.Node) error
	setters = append(setters,
		nodestatus.MachineInfo(string(kl.nodeName),
			kl.maxPods,
			kl.podsPerCore,
			kl.GetCachedMachineInfo,
			kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity,
			kl.containerManager.GetNodeAllocatableReservation,
			kl.recordEvent),
		// ... more setters elided ...
	)
	...
}
```
 
Here we focus on the MachineInfo setter.
The information comes from two sources:
- cadvisor: provides cpu, memory and hugepages-*; the corresponding method is GetCachedMachineInfo.
- containerManager: provides the system-reserved and kube-reserved reservations plus extended resources such as nvidia.com/gpu.
kl.GetCachedMachineInfo
```go
// GetCachedMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapiv1.MachineInfo, error) {
	kl.machineInfoLock.RLock()
	defer kl.machineInfoLock.RUnlock()
	return kl.machineInfo, nil
}

func (kl *Kubelet) setCachedMachineInfo(info *cadvisorapiv1.MachineInfo) {
	kl.machineInfoLock.Lock()
	defer kl.machineInfoLock.Unlock()
	kl.machineInfo = info
}
```
 
```go
func NewMainKubelet() {
	...
	machineInfo, err := klet.cadvisor.MachineInfo()
	machineInfo.Timestamp = time.Time{}
	klet.setCachedMachineInfo(machineInfo)
	...
}
```
 
The MachineInfo implementation
```go
func MachineInfo() Setter {
	return func(node *v1.Node) error {
		// write cpu, memory and hugepages-* into status.capacity
		for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
			node.Status.Capacity[rName] = rCap
		}
	}
}
```
 
cachedMachineInfo is read from the kubelet's machineInfo cache. That cache is populated once, via setCachedMachineInfo, when the kubelet object is created and started: the machine information is fetched from cadvisor and stored in the kubelet's cached machineInfo object, and every later status report reads only from this cache.
NOTE: NewMainKubelet runs only when the kubelet is created, so the cache is set only once.
This is how cpu, memory, hugepages-* and the other machine resources end up in node.Status; a rough sketch of the MachineInfo-to-ResourceList conversion follows.
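For illustration, the conversion performed by cadvisor.CapacityFromMachineInfo looks roughly like the sketch below. It is rewritten against plain values instead of cadvisor's MachineInfo struct, so treat the helper name capacityFrom and its parameters as assumptions, not the verbatim kubelet source:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// capacityFrom sketches how cpu, memory and hugepages-* capacity can be
// derived from machine facts: cpu in cores, memory in bytes, and one
// hugepages-<size> resource per huge page size.
func capacityFrom(numCores int, memoryBytes uint64, hugePagesKiB map[uint64]uint64) v1.ResourceList {
	c := v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(int64(numCores)*1000, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(int64(memoryBytes), resource.BinarySI),
	}
	// each huge page size becomes its own resource whose quantity is
	// pageSize * numPages bytes
	for pageSizeKiB, numPages := range hugePagesKiB {
		pageSize := resource.NewQuantity(int64(pageSizeKiB)*1024, resource.BinarySI)
		name := v1.ResourceName("hugepages-" + pageSize.String())
		c[name] = *resource.NewQuantity(pageSize.Value()*int64(numPages), resource.BinarySI)
	}
	return c
}

func main() {
	// 8 cores, 32 GiB of memory, 1024 x 2Mi huge pages (illustrative values)
	for name, qty := range capacityFrom(8, 32<<30, map[uint64]uint64{2048: 1024}) {
		fmt.Printf("%s: %s\n", name, qty.String())
	}
}
```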
The remaining resources are obtained through the containerManager. cm is implemented by the containerManagerImpl object in pkg/kubelet/cm/container_manager_linux.go; its relevant methods are covered one by one below.
kl.containerManager.GetCapacity
The cm's capacity refers to ephemeral-storage. My take is that this is because cm is responsible for container lifecycle management and container data lives under /var/lib/kubelet, so it needs a certain amount of storage. It is called ephemeral storage in contrast to persistent storage: when a pod is deleted, the space it occupied is reclaimed.
```go
func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
	return cm.capacity
}
```
 
pkg/kubelet/cm/container_manager_linux.go
When cm starts it computes its capacity, which is the capacity of the partition backing the /var/lib/kubelet mount point, usually the node's root partition.
```go
func (cm *containerManagerImpl) Start() {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LocalStorageCapacityIsolation) {
		rootfs, err := cm.cadvisorInterface.RootFsInfo()
		if err != nil {
			return fmt.Errorf("failed to get rootfs info: %v", err)
		}
		for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
			cm.capacity[rName] = rCap
		}
	}
}

func EphemeralStorageCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
	c := v1.ResourceList{
		v1.ResourceEphemeralStorage: *resource.NewQuantity(
			int64(info.Capacity),
			resource.BinarySI),
	}
	return c
}
```
 
Getting the filesystem info for rootPath:
```go
func (cc *cadvisorClient) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
	return cc.GetDirFsInfo(cc.rootPath)
}
```
 
So what is rootPath here?
It is s.RootDirectory, set when the CAdvisorInterface object is initialized; the default is /var/lib/kubelet.
Code: cmd/kubelet/app/server.go
```go
func run() (err error) {
	if kubeDeps.CAdvisorInterface == nil {
		// image filesystem info
		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)

		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
		if err != nil {
			return err
		}
	}
}
```
 
As you can see, filesystem-related information, such as the capacity of /var/lib/kubelet and the image filesystem info, is provided by cadvisor. A quick way to approximate the reported capacity is sketched below.
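As a rough cross-check (a Linux-only sketch, not kubelet or cadvisor code), the value that lands in capacity.ephemeral-storage is essentially the size of the filesystem backing /var/lib/kubelet:

```go
package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// Report the size of the filesystem that backs /var/lib/kubelet.
// cadvisor's GetDirFsInfo does considerably more (device lookup, usage,
// inodes); this only reproduces the total capacity number.
func main() {
	var st unix.Statfs_t
	if err := unix.Statfs("/var/lib/kubelet", &st); err != nil {
		panic(err)
	}
	capacityBytes := uint64(st.Bsize) * st.Blocks
	fmt.Printf("ephemeral-storage capacity ~= %d bytes (%.1f GiB)\n",
		capacityBytes, float64(capacityBytes)/(1<<30))
}
```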
kl.containerManager.GetDevicePluginResourceCapacity
This capacity refers to the resource capacity contributed by devices.
cm obtains it from the deviceManager.
pkg/kubelet/cm/devicemanager/manager.go
```go
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	return cm.deviceManager.GetCapacity()
}
```
 
This method is called whenever the kubelet updates the node status.
```go
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {

	// iterate over all healthy devices; the key is the resource name, the
	// value is the set of device IDs
	for resourceName, devices := range m.healthyDevices {
		// the number of devices is the resource quantity: extended resources
		// are expressed purely as a count
		capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
		allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
	}

	for resourceName, devices := range m.unhealthyDevices {
		// is the resource still in the endpoints list?
		eI, ok := m.endpoints[resourceName]
		if (ok && eI.e.stopGracePeriodExpired()) || !ok {
			if !ok {
				klog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
			}
			// either not in the endpoints list, or present but its stop grace
			// period has expired: remove the resource and update the
			// checkpoint file
			delete(m.endpoints, resourceName)
			delete(m.unhealthyDevices, resourceName)
			deletedResources.Insert(resourceName)
			needsUpdateCheckpoint = true
		} else {
			// add the previously unhealthy devices back into capacity
			capacityCount := capacity[v1.ResourceName(resourceName)]
			unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
			capacityCount.Add(unhealthyCount)
			capacity[v1.ResourceName(resourceName)] = capacityCount
		}
	}

	return capacity, allocatable, deletedResources.UnsortedList()
}
```
 
Return values:
- capacity: contains the registered device plugin resource capacity.
- allocatable: contains the registered device plugin resource allocatable; in practice the two are identical.
- deletedResources.UnsortedList(): contains previously registered resources that are no longer active, i.e. inactive resources.

The first two are maps and can hold several resource types; the last return value is a list.
kl.containerManager.GetNodeAllocatableReservation
Located in pkg/kubelet/cm/node_container_manager_linux.go:
```go
func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
	evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
	result := make(v1.ResourceList)
	for k := range cm.capacity {
		value := resource.NewQuantity(0, resource.DecimalSI)
		if cm.NodeConfig.SystemReserved != nil {
			value.Add(cm.NodeConfig.SystemReserved[k])
		}
		if cm.NodeConfig.KubeReserved != nil {
			value.Add(cm.NodeConfig.KubeReserved[k])
		}
		if evictionReservation != nil {
			value.Add(evictionReservation[k])
		}
		if !value.IsZero() {
			result[k] = *value
		}
	}
	return result
}
```
 
```go
func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.ResourceList) v1.ResourceList {
	ret := v1.ResourceList{}
	for _, threshold := range thresholds {
		switch threshold.Signal {
		case evictionapi.SignalMemoryAvailable:
			memoryCapacity := capacity[v1.ResourceMemory]
			// multiply by the percentage (or take the absolute quantity)
			value := evictionapi.GetThresholdQuantity(threshold.Value, &memoryCapacity)
			ret[v1.ResourceMemory] = *value
		case evictionapi.SignalNodeFsAvailable:
			storageCapacity := capacity[v1.ResourceEphemeralStorage]
			// multiply by the percentage (or take the absolute quantity)
			value := evictionapi.GetThresholdQuantity(threshold.Value, &storageCapacity)
			ret[v1.ResourceEphemeralStorage] = *value
		}
	}
	return ret
}
```
 
The resources reserved for eviction consist of two parts:
- memory.available, which maps to the memory part of capacity
- nodefs.available, which maps to the ephemeral-storage part of capacity

Multiplying capacity by the configured percentage (e.g. 5%) gives the reservation set aside for eviction; a worked example follows.
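A worked example of the percentage case (not kubelet code; the capacities and the 5% thresholds below are made up for illustration):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// How percentage-style hard eviction thresholds turn into a reservation,
// mirroring the percentage branch of GetThresholdQuantity.
func main() {
	memCapacity := resource.MustParse("16Gi")     // capacity[memory]
	nodefsCapacity := resource.MustParse("100Gi") // capacity[ephemeral-storage]

	const pct = 0.05 // e.g. memory.available<5%, nodefs.available<5%

	memReserved := resource.NewQuantity(int64(float64(memCapacity.Value())*pct), resource.BinarySI)
	fsReserved := resource.NewQuantity(int64(float64(nodefsCapacity.Value())*pct), resource.BinarySI)

	fmt.Println("memory eviction reservation:", memReserved.String()) // ~0.8Gi (858993459 bytes)
	fmt.Println("nodefs eviction reservation:", fsReserved.String())  // 5Gi
}
```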
Execution logic of MachineInfo
Resources are written into node.Status.Capacity and node.Status.Allocatable.
```go
func MachineInfo() {
	// capacity: cpu, memory and hugepages-*
	info, err := machineInfoFunc()
	for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
		node.Status.Capacity[rName] = rCap
	}

	// capacity: ephemeral-storage
	if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
		initialCapacity := capacityFunc()
		if initialCapacity != nil {
			if v, exists := initialCapacity[v1.ResourceEphemeralStorage]; exists {
				node.Status.Capacity[v1.ResourceEphemeralStorage] = v
			}
		}
	}

	// capacity: extended resources
	devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = devicePluginResourceCapacityFunc()

	// fill the capacity field of node status
	for k, v := range devicePluginCapacity {
		if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() {
			// (logging elided)
		}
		node.Status.Capacity[k] = v
	}

	for _, removedResource := range removedDevicePlugins {
		// inactive devices: set their capacity in node status to 0
		node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
	}

	// allocatable: cpu and memory
	// allocatable = capacity - reserved
	// the reservation has three parts: kube-reserved and system-reserved (cpu
	// etc.) plus the eviction reservation for memory and nodefs
	allocatableReservation := nodeAllocatableReservationFunc()
	for k, v := range node.Status.Capacity {
		value := v.DeepCopy()
		if res, exists := allocatableReservation[k]; exists {
			// subtracting the reservation gives allocatable
			value.Sub(res)
		}
		node.Status.Allocatable[k] = value
	}

	// allocatable: extended resources; here capacity always equals allocatable
	for k, v := range devicePluginAllocatable {
		if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() {
			klog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
		}
		node.Status.Allocatable[k] = v
	}

	// if huge pages are enabled, allocatable memory must also have the huge
	// page memory subtracted
	for k, v := range node.Status.Capacity {
		if v1helper.IsHugePageResourceName(k) {
			allocatableMemory := node.Status.Allocatable[v1.ResourceMemory]
			value := v.DeepCopy()
			allocatableMemory.Sub(value)
			if allocatableMemory.Sign() < 0 {
				// Negative Allocatable resources don't make sense.
				allocatableMemory.Set(0)
			}
			node.Status.Allocatable[v1.ResourceMemory] = allocatableMemory
		}
	}
}
```
 
device plugin Register
Every device plugin that starts sends a registration request to the DeviceManager inside the containerManager, i.e. calls this Register method.
```go
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
	// version check
	// resource name check

	go m.addEndpoint(r)

	// no error is ever returned
	return &pluginapi.Empty{}, nil
}
```
 
A goroutine is spawned to do the actual work, so registrations from multiple plugins do not block one another, which increases registration concurrency.
The request sent by the device plugin is defined in vendor/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.pb.go; a hedged client-side sketch follows the struct.
```go
type RegisterRequest struct {
	Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"`
	// the socket the device plugin listens on (a string);
	// PATH = path.Join(DevicePluginPath, endpoint)
	Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
	// the resource name
	ResourceName string `protobuf:"bytes,3,opt,name=resource_name,json=resourceName,proto3" json:"resource_name,omitempty"`
	// Options to be communicated with Device Manager
	Options              *DevicePluginOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"`
	XXX_NoUnkeyedLiteral struct{}             `json:"-"`
	XXX_sizecache        int32                `json:"-"`
}
```
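For illustration, the plugin side of this exchange can look roughly like the sketch below. The socket name example.sock and the resource name example.com/foo are invented, and a real plugin must already be serving the DevicePlugin gRPC service on that socket before registering:

```go
package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// register dials the kubelet's registration socket and sends a
// RegisterRequest; error handling and retries are minimal on purpose.
func register() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, "unix://"+pluginapi.KubeletSocket,
		grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	_, err = client.Register(ctx, &pluginapi.RegisterRequest{
		Version:      pluginapi.Version, // "v1beta1"
		Endpoint:     "example.sock",    // socket file name under the device-plugins dir
		ResourceName: "example.com/foo",
	})
	return err
}

func main() {
	if err := register(); err != nil {
		panic(err)
	}
}
```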
 
addEndpoint
```go
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
	new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
	if err != nil {
		klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
		return
	}
	// associate the resourceName with the endpoint object
	m.registerEndpoint(r.ResourceName, r.Options, new)
	go func() {
		m.runEndpoint(r.ResourceName, new)
	}()
}
```
 
- A new endpoint is created: the device plugin manager's socketdir is joined with the Endpoint sent by the device plugin to form the socket path (see the path sketch below), and the device plugin manager's callback function is passed along.
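A minimal sketch of the resulting path, assuming the manager's socketdir is the default device-plugins directory and the plugin registered with the invented Endpoint "example.sock":

```go
package main

import (
	"fmt"
	"path/filepath"

	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func main() {
	// DevicePluginPath is the default /var/lib/kubelet/device-plugins/ directory
	sock := filepath.Join(pluginapi.DevicePluginPath, "example.sock")
	fmt.Println(sock) // /var/lib/kubelet/device-plugins/example.sock
}
```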
 
runEndpoint
```go
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
	// run() blocks in a loop; returning means ListAndWatch failed, i.e. the
	// connection dropped and the devices can be considered unhealthy
	e.run()
	e.stop()

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// mark the resource unhealthy
	if old, ok := m.endpoints[resourceName]; ok && old.e == e {
		m.markResourceUnhealthy(resourceName)
	}

	klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}
```
 
markResourceUnhealthy
Moves the unhealthy devices into the device plugin manager's unhealthyDevices set.
```go
func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
	klog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)
	healthyDevices := sets.NewString()
	if _, ok := m.healthyDevices[resourceName]; ok {
		healthyDevices = m.healthyDevices[resourceName]
		m.healthyDevices[resourceName] = sets.NewString()
	}
	if _, ok := m.unhealthyDevices[resourceName]; !ok {
		m.unhealthyDevices[resourceName] = sets.NewString()
	}
	m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
}
```
 
run
The callback completes the registration of the resources.
```go
func (e *endpointImpl) run() {
	stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
	if err != nil {
		klog.Errorf(errListAndWatch, e.resourceName, err)
		return
	}

	for {
		response, err := stream.Recv()
		if err != nil {
			klog.Errorf(errListAndWatch, e.resourceName, err)
			return
		}
		devs := response.Devices
		klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

		var newDevs []pluginapi.Device
		for _, d := range devs {
			newDevs = append(newDevs, *d)
		}

		e.callback(e.resourceName, newDevs)
	}
}
```
 
ListAndWatch returns the device information; see vendor/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.pb.go. A server-side sketch follows the struct below.
```go
// E.g:
// struct Device {
//    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//    Health: "Healthy",
//    Topology:
//      Node:
```
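For illustration, the plugin side of ListAndWatch can be sketched as below. examplePlugin, its device slice and the healthEvents channel are invented; a real plugin would also handle stream errors and shutdown:

```go
package exampleplugin

import (
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// examplePlugin holds the devices it advertises plus a channel that fires
// whenever a device's health changes.
type examplePlugin struct {
	devices      []*pluginapi.Device // e.g. {ID: "dev-0", Health: pluginapi.Healthy}
	healthEvents chan struct{}
}

// ListAndWatch streams the full device list once, then again on every health
// change; this is what the kubelet-side run() loop above keeps receiving.
func (p *examplePlugin) ListAndWatch(_ *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil {
		return err
	}
	for range p.healthEvents {
		if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil {
			return err
		}
	}
	return nil
}
```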
 
The callback that completes resource registration is wired up when the manager is created:
```go
func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
	klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
	...
	manager.callback = manager.genericDeviceUpdateCallback
	...
}
```
 
callback: genericDeviceUpdateCallback
pkg/kubelet/cm/devicemanager/manager.go
```go
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	m.mutex.Lock()
	m.healthyDevices[resourceName] = sets.NewString()
	m.unhealthyDevices[resourceName] = sets.NewString()
	m.allDevices[resourceName] = make(map[string]pluginapi.Device)
	for _, dev := range devices {
		// record each device in a two-level map keyed by resourceName and
		// device ID, stored in the device plugin manager's allDevices
		m.allDevices[resourceName][dev.ID] = dev
		if dev.Health == pluginapi.Healthy {
			// only the device ID is recorded
			m.healthyDevices[resourceName].Insert(dev.ID)
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
		}
	}
	m.mutex.Unlock()
	if err := m.writeCheckpoint(); err != nil {
		klog.Errorf("writing checkpoint encountered %v", err)
	}
}
```
 
Depending on each device's health, the resourceName and the device info (a set whose keys are device IDs) are added to the device plugin manager's healthyDevices and unhealthyDevices maps.
When syncNodeStatus runs periodically, it calls the device plugin manager's GetCapacity, which derives the extended-resource capacity from these two maps: the resource quantity is simply the length of the set, as sketched below.
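A minimal sketch of that counting, mirroring GetCapacity's use of devices.Len(); the second GPU ID is invented and this is not kubelet code:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/sets"
)

// The quantity of an extended resource is simply the size of its device-ID set.
func main() {
	healthyDevices := map[string]sets.String{
		"nvidia.com/gpu": sets.NewString(
			"GPU-6f89ddc0-0967-bfb2-d004-ea8538bb312a",
			"GPU-00000000-0000-0000-0000-000000000000",
		),
	}
	for resourceName, devices := range healthyDevices {
		qty := resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
		fmt.Printf("%s capacity = %s\n", resourceName, qty.String()) // nvidia.com/gpu capacity = 2
	}
}
```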
kubelet_internal_checkpoint
For healthy devices, i.e. devices that have completed registration, the ListAndWatch flow finally builds a data object and records it in /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint.
```go
func (m *ManagerImpl) writeCheckpoint() error {
	registeredDevs := make(map[string][]string)
	for resource, devices := range m.healthyDevices {
		registeredDevs[resource] = devices.UnsortedList()
	}

	// associate the pod info with the device info
	data := checkpoint.New(m.podDevices.toCheckpointData(), registeredDevs)
	// persist the data to disk
	err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
	return err
}
```
 
Summary
The capacity part
- In node.Status.Capacity, the machine information (cpu, memory, hugepages-*, etc.) is read from the kubelet cache, which is filled only once at kubelet startup. These values really are fixed; every periodic syncNodeStatus reads them from the cache.
- capacity.ephemeral-storage is read from the containerManager's cache. It is also fixed, since the storage capacity of the mount point the kubelet works under does not change.
- capacity for extended resources comes from the device plugin manager and does change, because device plugins can be added, removed, or fail at any time; it is refreshed on every syncNodeStatus (see the related code above).
 
The allocatable part
- allocatable is derived from capacity: capacity minus the reserved resources. The reservation has three parts: kube-reserved and system-reserved (cpu and friends) plus the eviction reservation for memory and nodefs. So apart from changes to the reserved configuration, the allocatable part does not change.
- For the remaining extended resources, the code shows that capacity always equals allocatable, so they do not change either.
 
The device part
- The device plugin manager is a sub-manager of cm.
- The dpm's key state consists of the healthyDevices and unhealthyDevices maps, whose keys are resource names and whose values are device sets (essentially just the device IDs), plus the callback function.
Device information
```go
// E.g:
// struct Device {
//    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//    Health: "Healthy",
//    Topology:
//      Node:
```
 
On a GPU test environment (in RegisteredDevices, the resource name maps to the device IDs):
```
# cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint | jq .
{
  "Data": {
    "PodDeviceEntries": [
      {
        "PodUID": "e956d5f9-abb2-49a5-b9bf-78c5e1064a0f",
        "ContainerName": "xiamu-gpu-instance-1693382814-8ogoiqca",
        "ResourceName": "nvidia.com/gpu",
        "DeviceIDs": {
          "0": [
            "GPU-6f89ddc0-0967-bfb2-d004-ea8538bb312a"
          ]
        },
        "AllocResp": "CkIKFk5WSURJQV9WSVNJQkxFX0RFVklDRVMSKEdQVS02Zjg5ZGRjMC0wOTY3LWJmYjItZDAwNC1lYTg1MzhiYjMxMmE="
      }
    ],
    "RegisteredDevices": {
      "nvidia.com/gpu": [
        "GPU-6f89ddc0-0967-bfb2-d004-ea8538bb312a"
      ]
    }
  },
  "Checksum": 3431416169
}
```