The kubelet reports node status periodically through `syncNodeStatus`, which ultimately calls `defaultNodeStatusFuncs`. In `defaultNodeStatusFuncs`, the node information is filled in by a series of setter funcs.
```go
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	var setters []func(n *v1.Node) error
	setters = append(setters,
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore,
			kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity,
			kl.containerManager.GetNodeAllocatableReservation,
			kl.recordEvent),
	)
}
```
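Conceptually, `syncNodeStatus` just walks this slice and applies each setter to the node object in order. A minimal sketch of that pattern, with a toy setter that is purely illustrative (this is not the kubelet code itself):

```go
// A toy illustration of the setter pattern: each setter mutates one part of the
// node status, and they are applied in order.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// Setter mirrors the func(*v1.Node) error shape used above.
type Setter func(node *v1.Node) error

func applySetters(node *v1.Node, setters []Setter) error {
	for _, set := range setters {
		if err := set(node); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	node := &v1.Node{}
	setters := []Setter{
		// a made-up setter that fills in a single capacity entry
		func(n *v1.Node) error {
			n.Status.Capacity = v1.ResourceList{
				v1.ResourceCPU: resource.MustParse("8"),
			}
			return nil
		},
	}
	if err := applySetters(node, setters); err != nil {
		fmt.Println("setter failed:", err)
	}
	fmt.Println(node.Status.Capacity)
}
```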
Here we focus on the `MachineInfo` setter. It gathers information from two sources:

- cadvisor: provides cpu, memory and hugepages-*; the corresponding method is `GetCachedMachineInfo`
- containerManager: provides systemReserved, kubeReserved and extended resources such as nvidia.com/gpu

kl.GetCachedMachineInfo
```go
// GetCachedMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapiv1.MachineInfo, error) {
	kl.machineInfoLock.RLock()
	defer kl.machineInfoLock.RUnlock()
	return kl.machineInfo, nil
}

func (kl *Kubelet) setCachedMachineInfo(info *cadvisorapiv1.MachineInfo) {
	kl.machineInfoLock.Lock()
	defer kl.machineInfoLock.Unlock()
	kl.machineInfo = info
}
```
```go
func NewMainKubelet() {
	...
	machineInfo, err := klet.cadvisor.MachineInfo()
	machineInfo.Timestamp = time.Time{}
	klet.setCachedMachineInfo(machineInfo)
	...
}
```
The implementation of `MachineInfo`:
```go
func MachineInfo() Setter {
	return func(node *v1.Node) error {
		// update cpu, memory and hugepages-* in status.capacity
		for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
			node.Status.Capacity[rName] = rCap
		}
	}
}
```
`cachedMachineInfo` is read from the kubelet's `machineInfo` cache. That cache is filled exactly once, via `setCachedMachineInfo`, when the kubelet object is created and started: the machine information is fetched from cadvisor and stored in the kubelet's `machineInfo` cache object, and later status reports only read from this cache.

NOTE: `NewMainKubelet` runs only when the kubelet is created, so the cache is set only once.

This is how machine information such as cpu, memory and hugepages-* ends up in `node.Status`.
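The conversion from the cached cadvisor `MachineInfo` to resource quantities is what `cadvisor.CapacityFromMachineInfo` does. Below is a hedged sketch of that conversion; the hugepages resource-name construction and the sample numbers in `main` are simplifications for illustration:

```go
// A sketch of turning cadvisor MachineInfo into a v1.ResourceList:
// cores -> cpu, MemoryCapacity -> memory, one hugepages-<size> entry per page size.
package main

import (
	"fmt"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func capacityFromMachineInfo(info *cadvisorapi.MachineInfo) v1.ResourceList {
	c := v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(int64(info.NumCores)*1000, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(int64(info.MemoryCapacity), resource.BinarySI),
	}
	for _, hp := range info.HugePages {
		pageSize := resource.NewQuantity(int64(hp.PageSize)*1024, resource.BinarySI) // PageSize is reported in KB
		totalBytes := int64(hp.PageSize) * 1024 * int64(hp.NumPages)
		// simplified name construction, e.g. "hugepages-2Mi"
		name := v1.ResourceName(fmt.Sprintf("hugepages-%s", pageSize.String()))
		c[name] = *resource.NewQuantity(totalBytes, resource.BinarySI)
	}
	return c
}

func main() {
	// made-up machine: 8 cores, 16Gi memory, 512 x 2Mi huge pages
	info := &cadvisorapi.MachineInfo{
		NumCores:       8,
		MemoryCapacity: 16 << 30,
		HugePages:      []cadvisorapi.HugePagesInfo{{PageSize: 2048, NumPages: 512}},
	}
	fmt.Println(capacityFromMachineInfo(info))
}
```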
The remaining resources are obtained through the containerManager. The cm implementation is the `containerManagerImpl` object in pkg/kubelet/cm/container_manager_linux.go; its relevant methods are described one by one below.
kl.containerManager.GetCapacity

The cm's capacity refers to ephemeral-storage. Presumably this is because the cm is responsible for container lifecycle management and container data lives under /var/lib/kubelet, so it needs a certain amount of storage. It is called ephemeral storage in contrast to persistent storage: when a pod is deleted, the space that pod occupied is reclaimed.
```go
func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
	return cm.capacity
}
```
pkg/kubelet/cm/container_manager_linux.go

The cm computes its capacity at startup; this capacity is the size of the partition that the /var/lib/kubelet mount point resides on, usually the node's root partition.
```go
func (cm *containerManagerImpl) Start() {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LocalStorageCapacityIsolation) {
		rootfs, err := cm.cadvisorInterface.RootFsInfo()
		if err != nil {
			return fmt.Errorf("failed to get rootfs info: %v", err)
		}
		for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
			cm.capacity[rName] = rCap
		}
	}
}

func EphemeralStorageCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
	c := v1.ResourceList{
		v1.ResourceEphemeralStorage: *resource.NewQuantity(
			int64(info.Capacity),
			resource.BinarySI),
	}
	return c
}
```
Fetching the filesystem info for `rootPath`:
```go
func (cc *cadvisorClient) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
	return cc.GetDirFsInfo(cc.rootPath)
}
```
So what is `rootPath` here? It is `s.RootDirectory`, set when the `CAdvisorInterface` object is initialized; the default is /var/lib/kubelet. Code: cmd/kubelet/app/server.go
```go
func run() (err error) {
	if kubeDeps.CAdvisorInterface == nil {
		// image filesystem info provider
		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots,
			cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
		if err != nil {
			return err
		}
	}
}
```
As you can see, filesystem-related data, such as the capacity of /var/lib/kubelet or the image filesystem info, is provided by cadvisor.
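As an aside (this is not kubelet code), the partition capacity that ends up in capacity.ephemeral-storage can be cross-checked on a Linux node with a plain statfs call against /var/lib/kubelet:

```go
// Quick cross-check of the number that becomes capacity.ephemeral-storage:
// the size of the filesystem that /var/lib/kubelet lives on (Linux only).
package main

import (
	"fmt"
	"log"

	"golang.org/x/sys/unix"
)

func main() {
	var st unix.Statfs_t
	if err := unix.Statfs("/var/lib/kubelet", &st); err != nil {
		log.Fatalf("statfs /var/lib/kubelet: %v", err)
	}
	// total capacity of the backing partition, in bytes
	capacityBytes := st.Blocks * uint64(st.Bsize)
	fmt.Printf("ephemeral-storage capacity: %d bytes\n", capacityBytes)
}
```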
kl.containerManager.GetDevicePluginResourceCapacity

This capacity refers to the resources provided by devices. The cm delegates to the deviceManager (pkg/kubelet/cm/devicemanager/manager.go):
```go
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	return cm.deviceManager.GetCapacity()
}
```
That GetCapacity method is invoked whenever the kubelet updates the node status:
```go
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	needsUpdateCheckpoint := false
	var capacity = v1.ResourceList{}
	var allocatable = v1.ResourceList{}
	deletedResources := sets.NewString()

	// iterate over all healthy devices; the key is the resource name, the value is the device set
	for resourceName, devices := range m.healthyDevices {
		// the number of devices is the resource quantity: extended resources are counted by number
		capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
		allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
	}
	for resourceName, devices := range m.unhealthyDevices {
		// is the resource still present in the endpoints list?
		eI, ok := m.endpoints[resourceName]
		if (ok && eI.e.stopGracePeriodExpired()) || !ok {
			if !ok {
				klog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
			}
			// not in the endpoints list, or present but already stopping:
			// remove the resource and update the checkpoint file
			delete(m.endpoints, resourceName)
			delete(m.unhealthyDevices, resourceName)
			deletedResources.Insert(resourceName)
			needsUpdateCheckpoint = true
		} else {
			// add previously unhealthy devices back into capacity
			capacityCount := capacity[v1.ResourceName(resourceName)]
			unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
			capacityCount.Add(unhealthyCount)
			capacity[v1.ResourceName(resourceName)] = capacityCount
		}
	}
	return capacity, allocatable, deletedResources.UnsortedList()
}
```
Return values:

- capacity: contains the registered device plugin resource capacity.
- allocatable: contains the registered device plugin resource allocatable. In practice the two are identical.
- deletedResources.UnsortedList(): contains previously registered resources that are no longer active, i.e. resources in the inactive state.

The first two are maps and can hold multiple resource types; the last return value is a list.
kl.containerManager.GetNodeAllocatableReservation

Located in pkg/kubelet/cm/node_container_manager_linux.go
```go
func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
	evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
	result := make(v1.ResourceList)
	for k := range cm.capacity {
		value := resource.NewQuantity(0, resource.DecimalSI)
		if cm.NodeConfig.SystemReserved != nil {
			value.Add(cm.NodeConfig.SystemReserved[k])
		}
		if cm.NodeConfig.KubeReserved != nil {
			value.Add(cm.NodeConfig.KubeReserved[k])
		}
		if evictionReservation != nil {
			value.Add(evictionReservation[k])
		}
		if !value.IsZero() {
			result[k] = *value
		}
	}
	return result
}
```
```go
func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.ResourceList) v1.ResourceList {
	ret := v1.ResourceList{}
	for _, threshold := range thresholds {
		switch threshold.Signal {
		case evictionapi.SignalMemoryAvailable:
			memoryCapacity := capacity[v1.ResourceMemory]
			// multiply by the threshold percentage
			value := evictionapi.GetThresholdQuantity(threshold.Value, &memoryCapacity)
			ret[v1.ResourceMemory] = *value
		case evictionapi.SignalNodeFsAvailable:
			storageCapacity := capacity[v1.ResourceEphemeralStorage]
			// multiply by the threshold percentage
			value := evictionapi.GetThresholdQuantity(threshold.Value, &storageCapacity)
			ret[v1.ResourceEphemeralStorage] = *value
		}
	}
	return ret
}
```
The resources reserved for eviction consist of two parts:

- memory.available, corresponding to the memory part of capacity
- nodefs.available, corresponding to the ephemeral-storage part of capacity

Multiplying the capacity by the threshold percentage (typically 5%) gives the eviction reservation, as the sketch below illustrates.
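A small sketch of the percentage arithmetic with made-up numbers (a node with 16Gi of memory and a memory.available < 5% hard threshold); it mirrors what `GetThresholdQuantity` does for a percentage-based threshold but is not the kubelet code itself:

```go
// Made-up numbers, for illustration only: eviction reservation for a
// percentage-based memory.available threshold.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	memoryCapacity := resource.MustParse("16Gi")
	percentage := 0.05 // memory.available < 5%

	// reservation = capacity * percentage
	reservation := resource.NewQuantity(int64(float64(memoryCapacity.Value())*percentage), resource.BinarySI)
	fmt.Printf("memory eviction reservation: %s\n", reservation.String())
}
```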
The execution logic of MachineInfo

It writes the resources into the `capacity` and `allocatable` fields of `node.Status`.
```go
func MachineInfo() {
	// capacity: cpu, memory and hugepages-*
	info, err := machineInfoFunc()
	for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
		node.Status.Capacity[rName] = rCap
	}

	// capacity: ephemeral-storage
	if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
		initialCapacity := capacityFunc()
		if initialCapacity != nil {
			if v, exists := initialCapacity[v1.ResourceEphemeralStorage]; exists {
				node.Status.Capacity[v1.ResourceEphemeralStorage] = v
			}
		}
	}

	// capacity: extended resources from device plugins
	devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = devicePluginResourceCapacityFunc()
	// set the capacity field of node status
	for k, v := range devicePluginCapacity {
		if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() {
		}
		node.Status.Capacity[k] = v
	}
	for _, removedResource := range removedDevicePlugins {
		// inactive devices: set their capacity in node status to 0
		node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
	}

	// allocatable: cpu and memory
	// allocatable = capacity - reservation
	// The reservation has three parts: kubeReserved, systemReserved, and the
	// eviction reservation for memory and nodefs.
	allocatableReservation := nodeAllocatableReservationFunc()
	for k, v := range node.Status.Capacity {
		value := v.DeepCopy()
		if res, exists := allocatableReservation[k]; exists {
			// subtracting the reservation gives allocatable
			value.Sub(res)
		}
		node.Status.Allocatable[k] = value
	}

	// allocatable: extended resources; in fact capacity always equals allocatable here
	for k, v := range devicePluginAllocatable {
		if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() {
			klog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
		}
		node.Status.Allocatable[k] = v
	}

	// with hugepages enabled, the hugepage bytes must also be subtracted from allocatable memory
	for k, v := range node.Status.Capacity {
		if v1helper.IsHugePageResourceName(k) {
			allocatableMemory := node.Status.Allocatable[v1.ResourceMemory]
			value := v.DeepCopy()
			allocatableMemory.Sub(value)
			if allocatableMemory.Sign() < 0 {
				// Negative Allocatable resources don't make sense.
				allocatableMemory.Set(0)
			}
			node.Status.Allocatable[v1.ResourceMemory] = allocatableMemory
		}
	}
}
```
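To make the allocatable arithmetic concrete, here is a worked example with made-up numbers (16Gi memory, 1Gi kubeReserved, 512Mi systemReserved, 100Mi hard eviction, 2Gi of hugepages-2Mi); this is an illustration, not kubelet code:

```go
// Worked example: allocatable memory = capacity - kubeReserved - systemReserved
// - hard eviction threshold; with hugepages enabled, the hugepage bytes are
// subtracted as well.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	memCapacity := resource.MustParse("16Gi")
	kubeReserved := resource.MustParse("1Gi")
	systemReserved := resource.MustParse("512Mi")
	evictionHard := resource.MustParse("100Mi")
	hugepages2Mi := resource.MustParse("2Gi") // capacity of hugepages-2Mi

	allocatable := memCapacity.DeepCopy()
	allocatable.Sub(kubeReserved)
	allocatable.Sub(systemReserved)
	allocatable.Sub(evictionHard)

	// hugepages are pre-allocated, so they are carved out of allocatable memory too
	allocatable.Sub(hugepages2Mi)
	if allocatable.Sign() < 0 {
		// negative allocatable doesn't make sense
		allocatable.Set(0)
	}
	fmt.Printf("allocatable memory: %s\n", allocatable.String())
}
```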
device plugin Register

Every device plugin that starts up sends a registration request to the `DeviceManager` inside the containerManager, i.e. it calls this `Register` method:
```go
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
	// version check
	// resource name check
	go m.addEndpoint(r)
	// never returns an error
	return &pluginapi.Empty{}, nil
}
```
A goroutine is created to do the actual work, so registrations from multiple plugins do not block each other, which improves registration concurrency.
The request sent by the device plugin is defined in
vendor/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.pb.go
```go
type RegisterRequest struct {
	Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"`
	// the socket the device plugin listens on, as a string
	// PATH = path.Join(DevicePluginPath, endpoint)
	Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
	// resource name
	ResourceName string `protobuf:"bytes,3,opt,name=resource_name,json=resourceName,proto3" json:"resource_name,omitempty"`
	// Options to be communicated with Device Manager
	Options              *DevicePluginOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"`
	XXX_NoUnkeyedLiteral struct{}             `json:"-"`
	XXX_sizecache        int32                `json:"-"`
}
```
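For context, a hedged sketch of what the plugin side of this registration might look like; the socket name `example.sock` and resource name `example.com/foo` are made up for illustration, and `pluginapi.KubeletSocket` is the kubelet registration socket from the same v1beta1 API package:

```go
// A sketch of a device plugin registering itself with the kubelet (not a real plugin).
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func main() {
	// dial the kubelet registration socket (/var/lib/kubelet/device-plugins/kubelet.sock by default)
	conn, err := grpc.Dial("unix://"+pluginapi.KubeletSocket,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial kubelet: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// this RPC ends up in the ManagerImpl.Register shown above
	_, err = pluginapi.NewRegistrationClient(conn).Register(ctx, &pluginapi.RegisterRequest{
		Version:      pluginapi.Version, // "v1beta1"
		Endpoint:     "example.sock",    // the plugin's own socket under DevicePluginPath (made up)
		ResourceName: "example.com/foo", // made-up resource name
	})
	if err != nil {
		log.Fatalf("register: %v", err)
	}
}
```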
addEndpoint
```go
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
	new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
	if err != nil {
		klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
		return
	}
	// associate the resourceName with the endpoint object
	m.registerEndpoint(r.ResourceName, r.Options, new)
	go func() {
		m.runEndpoint(r.ResourceName, new)
	}()
}
```
- A new endpoint is created: the device plugin manager's socketdir is joined with the Endpoint sent by the device plugin to form the socket path, and the device plugin manager's callback function is passed in as well.
runEndpoint
```go
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
	// run loops forever; when it returns, ListAndWatch has failed, i.e. the
	// connection dropped and the devices can be considered unhealthy
	e.run()
	e.stop()

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// mark the resource unhealthy
	if old, ok := m.endpoints[resourceName]; ok && old.e == e {
		m.markResourceUnhealthy(resourceName)
	}

	klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}
```
markResourceUnhealthy

Moves the devices of the resource into the device plugin manager's unhealthyDevices map.
```go
func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
	klog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)
	healthyDevices := sets.NewString()
	if _, ok := m.healthyDevices[resourceName]; ok {
		healthyDevices = m.healthyDevices[resourceName]
		m.healthyDevices[resourceName] = sets.NewString()
	}
	if _, ok := m.unhealthyDevices[resourceName]; !ok {
		m.unhealthyDevices[resourceName] = sets.NewString()
	}
	m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
}
```
run

Inside run, the callback is what completes the registration of the resources.
```go
func (e *endpointImpl) run() {
	stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
	if err != nil {
		klog.Errorf(errListAndWatch, e.resourceName, err)
		return
	}

	for {
		response, err := stream.Recv()
		devs := response.Devices
		klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

		var newDevs []pluginapi.Device
		for _, d := range devs {
			newDevs = append(newDevs, *d)
		}

		e.callback(e.resourceName, newDevs)
	}
}
```
ListAndWatch returns the device information; see
vendor/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.pb.go
```go
// E.g:
// struct Device {
//    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//    Health: "Healthy",
//    Topology:
//      Node:
```
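For reference, a hedged fragment of what the plugin side of ListAndWatch might look like; the device IDs are made up, only this one method is shown, and the remaining DevicePluginServer methods are omitted:

```go
// A fragment of a hypothetical plugin's ListAndWatch: it streams the current
// device list once and keeps the stream open.
package exampleplugin

import (
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

type examplePlugin struct{}

func (p *examplePlugin) ListAndWatch(_ *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	devs := []*pluginapi.Device{
		{ID: "dev-0", Health: pluginapi.Healthy},
		{ID: "dev-1", Health: pluginapi.Unhealthy},
	}
	// every Send triggers one stream.Recv() in endpointImpl.run above, which in
	// turn invokes the manager's callback with the full device list
	if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: devs}); err != nil {
		return err
	}
	// a real plugin would keep the stream open and re-Send on health changes
	select {}
}
```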
The callback that completes resource registration is wired up when the manager is created:
```go
func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
	klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
	manager.callback = manager.genericDeviceUpdateCallback
}
```
callback: genericDeviceUpdateCallback
pkg/kubelet/cm/devicemanager/manager.go
```go
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	m.mutex.Lock()
	m.healthyDevices[resourceName] = sets.NewString()
	m.unhealthyDevices[resourceName] = sets.NewString()
	m.allDevices[resourceName] = make(map[string]pluginapi.Device)
	for _, dev := range devices {
		// record each device in the (resourceName, device.ID) matrix kept in
		// the device plugin manager's allDevices
		m.allDevices[resourceName][dev.ID] = dev
		if dev.Health == pluginapi.Healthy {
			// only the device ID is recorded
			m.healthyDevices[resourceName].Insert(dev.ID)
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
		}
	}
	m.mutex.Unlock()
	if err := m.writeCheckpoint(); err != nil {
		klog.Errorf("writing checkpoint encountered %v", err)
	}
}
```
Depending on each device's health state, the resourceName and its device set (a map keyed by device.ID with Empty{} values) are added to the device plugin manager's healthyDevices and unhealthyDevices maps.

When the node status sync runs periodically, it calls the device plugin manager's GetCapacity method, which derives the extended resource capacity from these two maps: the quantity is simply the size of the map, as the sketch below shows.
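A small sketch of that derivation with made-up device IDs, using the same set and quantity types; it mirrors the logic of GetCapacity but is not the kubelet code itself:

```go
// How the two device sets become quantities: the quantity is the set size.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	healthyDevices := map[string]sets.String{
		"nvidia.com/gpu": sets.NewString("GPU-aaa", "GPU-bbb"), // made-up IDs
	}
	unhealthyDevices := map[string]sets.String{
		"nvidia.com/gpu": sets.NewString("GPU-ccc"),
	}

	capacity := v1.ResourceList{}
	allocatable := v1.ResourceList{}
	for name, devs := range healthyDevices {
		capacity[v1.ResourceName(name)] = *resource.NewQuantity(int64(devs.Len()), resource.DecimalSI)
		allocatable[v1.ResourceName(name)] = *resource.NewQuantity(int64(devs.Len()), resource.DecimalSI)
	}
	// unhealthy devices whose endpoint is still alive count toward capacity only
	for name, devs := range unhealthyDevices {
		q := capacity[v1.ResourceName(name)]
		q.Add(*resource.NewQuantity(int64(devs.Len()), resource.DecimalSI))
		capacity[v1.ResourceName(name)] = q
	}
	fmt.Printf("capacity=%v\nallocatable=%v\n", capacity, allocatable)
}
```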
kubelet_internal_checkpoint

For healthy devices, i.e. devices that have completed registration, the ListAndWatch flow finally builds a data object and records it in the /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint file.
```go
func (m *ManagerImpl) writeCheckpoint() error {
	registeredDevs := make(map[string][]string)
	for resource, devices := range m.healthyDevices {
		registeredDevs[resource] = devices.UnsortedList()
	}
	// associate pod info with device info
	data := checkpoint.New(m.podDevices.toCheckpointData(), registeredDevs)
	// persist the data to disk
	err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
}
```
Summary

The capacity part

- The machine information in node.status.capacity (cpu, memory, hugepages-*, etc.) is read from the kubelet cache and is updated only once, at kubelet startup; it really is fixed, and every periodic syncNodeStatus reads it from the cache.
- capacity.ephemeral-storage is read from the containerManager's cache. It is also fixed, because the storage capacity of the mount point the kubelet works on does not change.
- The extended resources in capacity come from the device plugin manager. They do change, since device plugins can be added, removed, or fail, so they are refreshed on every syncNodeStatus; the relevant code was shown above.
The allocatable part

- allocatable is computed from capacity, i.e. capacity minus the reserved resources. The reservation has three parts: kubeReserved and systemReserved (covering resources such as cpu), plus the eviction reservation for memory and nodefs. So apart from changes to the reserved-resource configuration, the allocatable part does not change.
- For the extended resources, we saw in the code that capacity always equals allocatable, so that part does not change either.
The device part

- The device plugin manager is a sub-manager of the cm.
- The dpm's most important objects are the healthyDevices and unhealthyDevices maps, whose keys are resource names and whose values are device sets (keyed by device.ID), plus the callback function.
Device info
```go
// E.g:
// struct Device {
//    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//    Health: "Healthy",
//    Topology:
//      Node:
```
On a GPU test environment:
```
# cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint | jq .
{
  "Data": {
    "PodDeviceEntries": [
      {
        "PodUID": "e956d5f9-abb2-49a5-b9bf-78c5e1064a0f",
        "ContainerName": "xiamu-gpu-instance-1693382814-8ogoiqca",
        "ResourceName": "nvidia.com/gpu",
        "DeviceIDs": {
          "0": [
            "GPU-6f89ddc0-0967-bfb2-d004-ea8538bb312a"
          ]
        },
        "AllocResp": "CkIKFk5WSURJQV9WSVNJQkxFX0RFVklDRVMSKEdQVS02Zjg5ZGRjMC0wOTY3LWJmYjItZDAwNC1lYTg1MzhiYjMxMmE="
      }
    ],
    "RegisteredDevices": {
      "nvidia.com/gpu": [            // resource name: device.ID
        "GPU-6f89ddc0-0967-bfb2-d004-ea8538bb312a"
      ]
    }
  },
  "Checksum": 3431416169
}
```