[toc]

There are already plenty of articles on the internet about pod cgroups, but pure prose always feels like a sheet of frosted glass placed between the reader and the actual meaning. So I want to deepen my understanding of this area from the source code.
On the kubelet side, the pod's underlying runtime state is managed by `ContainerManager`; anything that touches container state, such as cgroups and devices, is managed by it. The code lives in `pkg/kubelet/cm/container_manager_linux.go`.

cgroups can be managed through cgroupfs (v1 & v2) or through systemd, so the first step is to determine which cgroup version is in use.
## How to tell which cgroup version is in use
```bash
stat -fc %T /sys/fs/cgroup/
```

- For cgroup v2 the output is `cgroup2fs`, which is the "unified" hierarchy referred to in the code.
- For cgroup v1 the output is `tmpfs`.
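The same check can be done programmatically by looking at the filesystem magic of `/sys/fs/cgroup`. A minimal sketch, assuming `golang.org/x/sys/unix` is available:

```go
package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	var st unix.Statfs_t
	if err := unix.Statfs("/sys/fs/cgroup", &st); err != nil {
		panic(err)
	}
	// CGROUP2_SUPER_MAGIC means the unified (cgroup v2) hierarchy is mounted here;
	// on cgroup v1 nodes /sys/fs/cgroup is a tmpfs holding the per-subsystem mounts.
	if st.Type == unix.CGROUP2_SUPER_MAGIC {
		fmt.Println("cgroup v2 (unified)")
	} else {
		fmt.Println("cgroup v1")
	}
}
```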
## NewContainerManager

```go
func NewContainerManager() (ContainerManager, error) {
	cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
	// One cgroup per pod QoS class.
	// The default cgroup name is "kubepods", which becomes the root of the pod cgroups.
	if nodeConfig.CgroupsPerQOS {
		// the kubepods group
		cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
	}
	cm := &containerManagerImpl{}
	// topology manager
	cm.topologyManager, err = topologymanager.NewManager()
	// device manager
	cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
	// cpu manager
	cm.cpuManager, err = cpumanager.NewManager()
}
```
This is also reflected in the directory layout:

```
kubelet/cm/
    cpumanager
    devicemanager
    topologymanager
```
The CPU manager supports two CPU-pinning policies (none and static):

```go
func NewManager() (Manager, error) {
	switch policyName(cpuPolicyName) {
	case PolicyNone:
		policy = NewNonePolicy()
	case PolicyStatic:
		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
	}
	return &manager{policy: policy}
}
```
## containerManagerImpl

The concrete implementation behind `ContainerManager` is `containerManagerImpl`:
```go
func (cm *containerManagerImpl) Start() error {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		// Start the cpu manager.
		// With the static policy this also starts the reconcile loop.
		err = cm.cpuManager.Start()
	}
	if err := cm.setupNode(activePods); err != nil {
		return err
	}
	// Starts device manager.
	if err := cm.deviceManager.Start(); err != nil {
		return err
	}
}
```
The main flow is analyzed below.
## setupNode

```go
func (cm *containerManagerImpl) setupNode() error {
	// Split containers into QoS-based cgroup levels:
	// Guaranteed pods sit at the top level,
	// Burstable and BestEffort pods sit one level lower, each in its own cgroup.
	if cm.NodeConfig.CgroupsPerQOS {
		// Create the /sys/fs/cgroup/kubepods/ directory.
		if err := cm.createNodeAllocatableCgroups(); err != nil {
			return err
		}
		// Start the QoS container manager.
		// getNodeAllocatableAbsolute is allocatable - kube-reserved - system-reserved.
		err = cm.qosContainerManager.Start(cm.getNodeAllocatableAbsolute, activePods)
		if err != nil {
			return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
		}
	}
	// Enforce Node Allocatable (if required)
	if err := cm.enforceNodeAllocatableCgroups(); err != nil {
		return err
	}
}
```
## qosContainerManager.Start

The code is in `pkg/kubelet/cm/qos_container_manager_linux.go`:
```go
func (m *qosContainerManagerImpl) Start(getNodeAllocatable func() v1.ResourceList, activePods ActivePodsFunc) error {
	// Define the cgroup names for Burstable and BestEffort:
	// kubepods/burstable
	// kubepods/besteffort
	qosClasses := map[v1.PodQOSClass]CgroupName{
		v1.PodQOSBurstable:  NewCgroupName(rootContainer, strings.ToLower(string(v1.PodQOSBurstable))),
		v1.PodQOSBestEffort: NewCgroupName(rootContainer, strings.ToLower(string(v1.PodQOSBestEffort))),
	}
	for qosClass, containerName := range qosClasses {
		// BestEffort pods get CpuShares of 2.
		if qosClass == v1.PodQOSBestEffort {
			minShares := uint64(MinShares)
			resourceParameters.CpuShares = &minShares
		}
		containerConfig := &CgroupConfig{
			Name:               containerName, // the cgroup the containers live in
			ResourceParameters: resourceParameters,
		}
		// Proactively sync the cgroup config once: create it if missing, update it otherwise.
		if !cm.Exists(containerName) {
			if err := cm.Create(containerConfig); err != nil {
				return fmt.Errorf("failed to create top level %v QOS cgroup : %v", qosClass, err)
			}
		} else {
			// to ensure we actually have the right state, we update the config on startup
			if err := cm.Update(containerConfig); err != nil {
				return fmt.Errorf("failed to update top level %v QOS cgroup : %v", qosClass, err)
			}
		}
	}
	// Start a periodic task that keeps these cgroups updated. Only the Burstable and
	// BestEffort classes appear here because Guaranteed pods live directly under kubepods.
	go wait.Until(func() {
		err := m.UpdateCgroups()
		if err != nil {
			klog.Warningf("[ContainerManager] Failed to reserve QoS requests: %v", err)
		}
	}, periodicQOSCgroupUpdateInterval, wait.NeverStop)
}
```
## enforceNodeAllocatableCgroups

This creates the top-level cgroups on the node.
```go
func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
	// allocatable - SystemReserved - KubeReserved
	if cm.CgroupsPerQOS && nc.EnforceNodeAllocatable.Has(kubetypes.NodeAllocatableEnforcementKey) {
		nodeAllocatable = cm.getNodeAllocatableInternalAbsolute()
	}
	// Convert it; cgroupRoot is the kubepods cgroup.
	cgroupConfig := &CgroupConfig{
		Name:               cm.cgroupRoot,
		ResourceParameters: getCgroupConfig(nodeAllocatable),
	}
	// The cgroup that holds Guaranteed pods, i.e. the kubepods cgroup.
	if len(cm.cgroupRoot) > 0 {
		go func() {
			// Keep updating the kubepods cgroup.
			for {
				err := cm.cgroupManager.Update(cgroupConfig)
			}
		}()
	}
	// system-reserved: implemented by calling cgroupManager.Update(cgroupConfig)
	enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved)
	// kube-reserved: implemented by calling cgroupManager.Update(cgroupConfig)
	enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved)
}
```
The kubelet flag `--enforce-node-allocatable=pods,kube-reserved,system-reserved` controls the creation of three cgroups:

- `kubepods`: a control loop keeps updating it with the node allocatable values, where CPU is the capacity minus the two reserved amounts.
- the `--kube-reserved` and `--system-reserved` cgroups: these two are only updated once. Their names can be customized: the `--system-reserved-cgroup` flag defaults to `/system.slice`, and the `--kube-reserved-cgroup` flag defaults to `/kubelet.service`.
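To make the arithmetic concrete, here is a minimal sketch of the value enforced on the `kubepods` cgroup; the node capacity and flag values are hypothetical, and eviction thresholds are ignored:

```go
package main

import "fmt"

func main() {
	// Hypothetical node and flags:
	//   capacity:          8 CPUs, 32 GiB memory
	//   --kube-reserved:   cpu=500m,memory=1Gi
	//   --system-reserved: cpu=1000m,memory=2Gi
	capacityCPU, capacityMem := int64(8000), int64(32<<30)
	kubeReservedCPU, kubeReservedMem := int64(500), int64(1<<30)
	systemReservedCPU, systemReservedMem := int64(1000), int64(2<<30)

	// Node allocatable enforced on the kubepods cgroup
	// (the real kubelet also subtracts hard eviction thresholds for memory).
	allocCPU := capacityCPU - kubeReservedCPU - systemReservedCPU
	allocMem := capacityMem - kubeReservedMem - systemReservedMem

	fmt.Printf("kubepods gets cpu=%dm, memory=%d bytes\n", allocCPU, allocMem)
}
```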
The parameters configured in the cgroup are obtained through the following method:
```go
func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
	// TODO(vishh): Set CPU Quota if necessary.
	if rl == nil {
		return nil
	}
	var rc ResourceConfig
	if q, exists := rl[v1.ResourceMemory]; exists {
		// Memory is defined in bytes.
		val := q.Value()
		rc.Memory = &val
	}
	if q, exists := rl[v1.ResourceCPU]; exists {
		// CPU is defined in milli-cores.
		val := MilliCPUToShares(q.MilliValue())
		rc.CpuShares = &val
	}
	if q, exists := rl[pidlimit.PIDs]; exists {
		val := q.Value()
		rc.PidsLimit = &val
	}
	rc.HugePageLimit = HugePageLimits(rl)
	return &rc
}
```
So the parameters updated in the cgroup are the memory limit, cpu shares, pids limit, and hugepage limits.
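For orientation, these fields roughly correspond to the following cgroup v1 control files. This is an illustrative mapping rather than kubelet code; cgroup v2 uses different file names (e.g. `memory.max`, `cpu.weight`):

```go
package main

import "fmt"

// Approximate mapping from ResourceConfig fields to cgroup v1 control files.
var cgroupV1Files = map[string]string{
	"Memory":        "memory.limit_in_bytes",
	"CpuShares":     "cpu.shares",
	"CpuQuota":      "cpu.cfs_quota_us",
	"CpuPeriod":     "cpu.cfs_period_us",
	"PidsLimit":     "pids.max",
	"HugePageLimit": "hugetlb.<pagesize>.limit_in_bytes",
}

func main() {
	for field, file := range cgroupV1Files {
		fmt.Printf("%-14s -> %s\n", field, file)
	}
}
```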
## syncPod: updating each pod's cgroup

The pod creation/sync flow also updates the node-level cgroup information.
```go
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	pcm := kl.containerManager.NewPodContainerManager()
	if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
		if !pcm.Exists(pod) {
			if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
				klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
			}
			if err := pcm.EnsureExists(pod); err != nil {
				kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
				return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
			}
		}
	}
}
```
The pod container manager lives in `pkg/kubelet/cm/pod_container_manager_linux.go`:
```go
func (m *podContainerManagerImpl) EnsureExists(pod *v1.Pod) error {
	// The pod's cgroup name:
	// Guaranteed pods sit directly under the kubepods cgroup,
	// the others under kubepods/burstable or kubepods/besteffort.
	podContainerName, _ := m.GetPodContainerName(pod)
	// check if container already exist
	alreadyExists := m.Exists(pod)
	if !alreadyExists {
		// Build the cgroup config.
		// m.enforceCPULimits is a bool, usually true.
		// m.cpuCFSQuotaPeriod defaults to 100ms.
		containerConfig := &CgroupConfig{
			Name:               podContainerName,
			ResourceParameters: ResourceConfigForPod(pod, m.enforceCPULimits, m.cpuCFSQuotaPeriod),
		}
		if m.podPidsLimit > 0 {
			containerConfig.ResourceParameters.PidsLimit = &m.podPidsLimit
		}
		if err := m.cgroupManager.Create(containerConfig); err != nil {
			return fmt.Errorf("failed to create container for %v : %v", podContainerName, err)
		}
	}
	if err := m.applyLimits(pod); err != nil {
		return fmt.Errorf("failed to apply resource limits on container for %v : %v", podContainerName, err)
	}
	return nil
}
```
`pcm.EnsureExists` makes sure the pod's cgroup exists and applies its limits. The resources defined in the pod spec are converted into the cgroup's settings by `ResourceConfigForPod`:
```go
func ResourceConfigForPod(pod *v1.Pod, enforceCPULimits bool, cpuPeriod uint64) *ResourceConfig {
	// sum requests and limits.
	reqs, limits := resource.PodRequestsAndLimits(pod)
	cpuShares := MilliCPUToShares(cpuRequests)
	cpuQuota := MilliCPUToQuota(cpuLimits, int64(cpuPeriod))
	// build the result
	result := &ResourceConfig{}
	if qosClass == v1.PodQOSGuaranteed {
		result.CpuShares = &cpuShares
		result.CpuQuota = &cpuQuota
		result.CpuPeriod = &cpuPeriod
		result.Memory = &memoryLimits
	} else if qosClass == v1.PodQOSBurstable {
		result.CpuShares = &cpuShares
		if cpuLimitsDeclared {
			result.CpuQuota = &cpuQuota
			result.CpuPeriod = &cpuPeriod
		}
		if memoryLimitsDeclared {
			result.Memory = &memoryLimits
		}
	} else {
		shares := uint64(MinShares)
		result.CpuShares = &shares
	}
}
```
- `cpuPeriod` defaults to 100ms; the corresponding kubelet flag is `--cpu-cfs-quota-period=100ms`.
- `cpuShares` is computed from the pod's CPU request. It is not a hard cap on cores but the relative weight used to divide CPU time when cores are shared: the way I understand it, when node resources are tight and processes have to share cores, the shares decide how big a slice of CPU time each container gets. The "share" concept only comes into play when a core is shared.
- `cpuQuota` is computed from the CPU limit and shows up as `cpu.cfs_quota_us` / `cpu.cfs_period_us`.
- `memoryLimits` comes from the pod's memory limit.

For pods of different QoS classes:

- Guaranteed pods get all of the parameters above.
- Burstable pods always get `CpuShares`; the CPU quota, CPU period, and memory limit are only set if the corresponding `Resources.Limits` are declared.
- BestEffort pods only get `CpuShares`, fixed at the minimum value 2.
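To illustrate these conversions, here is a minimal sketch of the millicore-to-shares and millicore-to-quota math, mirroring kubelet's `MilliCPUToShares` / `MilliCPUToQuota` helpers (the constants are assumptions based on cgroup v1 defaults, and the real helpers do some extra rounding):

```go
package main

import "fmt"

const (
	sharesPerCPU  = 1024   // cpu.shares granted per full core
	milliCPUToCPU = 1000   // millicores per core
	minShares     = 2      // what BestEffort pods get
	quotaPeriod   = 100000 // 100ms in microseconds, the --cpu-cfs-quota-period default
)

// milliCPUToShares: CPU request (millicores) -> cpu.shares.
func milliCPUToShares(milliCPU int64) uint64 {
	if milliCPU == 0 {
		return minShares
	}
	shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
	if shares < minShares {
		return minShares
	}
	return uint64(shares)
}

// milliCPUToQuota: CPU limit (millicores) -> cpu.cfs_quota_us for the given period.
func milliCPUToQuota(milliCPU, period int64) int64 {
	if milliCPU == 0 {
		return 0
	}
	return (milliCPU * period) / milliCPUToCPU
}

func main() {
	// A hypothetical Burstable pod: requests cpu=250m, limits cpu=500m.
	fmt.Println(milliCPUToShares(250))             // 256   -> cpu.shares
	fmt.Println(milliCPUToQuota(500, quotaPeriod)) // 50000 -> cpu.cfs_quota_us (period 100000)
}
```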
The three QoS-level parent cgroups, the per-pod child cgroups, and the kube-reserved / system-reserved cgroups all have their parameters applied through `cgroupManager.Update()`.
## cgroupManager.Update does the real work

```go
func (m *cgroupManagerImpl) Update(cgroupConfig *CgroupConfig) error {
	// Convert the cm resource object into runc's libcontainer format.
	resourceConfig := cgroupConfig.ResourceParameters
	resources := m.toResources(resourceConfig)
	libcontainerCgroupConfig := &libcontainerconfigs.Cgroup{
		Resources: resources,
	}
	// For the cgroup name, build the mount paths of each subsystem (cpu, memory, cpuset, ...).
	libcontainerCgroupConfig.Paths = m.buildCgroupPaths(cgroupConfig.Name)
	setSupportedSubsystemsV1(libcontainerCgroupConfig)
}
```

Each resource is then applied by calling its `Set()` method:

```go
func setSupportedSubsystemsV1(cgroupConfig *libcontainerconfigs.Cgroup) error {
	for sys, required := range getSupportedSubsystems() {
		if _, ok := cgroupConfig.Paths[sys.Name()]; !ok {
			if required {
				return fmt.Errorf("failed to find subsystem mount for required subsystem: %v", sys.Name())
			}
			// the cgroup is not mounted, but its not required so continue...
			klog.V(6).Infof("Unable to find subsystem mount for optional subsystem: %v", sys.Name())
			continue
		}
		if err := sys.Set(cgroupConfig.Paths[sys.Name()], cgroupConfig); err != nil {
			return fmt.Errorf("failed to set config for supported subsystems : %v", err)
		}
	}
	return nil
}
```
Take `cpuset` as an example; my cluster environment uses cgroup v1 and the runtime is runc. The code is in `vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs/cpuset.go`:
```go
func (s *CpusetGroup) Set(path string, cgroup *configs.Cgroup) error {
	if cgroup.Resources.CpusetCpus != "" {
		if err := fscommon.WriteFile(path, "cpuset.cpus", cgroup.Resources.CpusetCpus); err != nil {
			return err
		}
	}
	if cgroup.Resources.CpusetMems != "" {
		if err := fscommon.WriteFile(path, "cpuset.mems", cgroup.Resources.CpusetMems); err != nil {
			return err
		}
	}
	return nil
}

func WriteFile(dir, file, data string) error {
	if dir == "" {
		return errors.Errorf("no directory specified for %s", file)
	}
	path, err := securejoin.SecureJoin(dir, file)
	if err != nil {
		return err
	}
	if err := retryingWriteFile(path, []byte(data), 0700); err != nil {
		return errors.Wrapf(err, "failed to write %q to %q", data, path)
	}
	return nil
}

func retryingWriteFile(filename string, data []byte, perm os.FileMode) error {
	for {
		err := ioutil.WriteFile(filename, data, perm)
		if errors.Is(err, unix.EINTR) {
			logrus.Infof("interrupted while writing %s to %s", string(data), filename)
			continue
		}
		return err
	}
}
```
kubelet delegates to the runtime library, here runc's `libcontainer`, which already wraps the update logic. The method itself is simple: it calls Go's built-in `ioutil.WriteFile` to write the cgroup files.
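To check the result, you can read the values back from the cgroup filesystem yourself. A minimal sketch, assuming cgroup v1 mounted at `/sys/fs/cgroup` and the cgroupfs driver (with the systemd driver the hierarchy is `kubepods.slice/kubepods-burstable.slice/...` instead):

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

func main() {
	// Hypothetical paths; adjust to your node's cgroup driver and pod cgroup names.
	files := []string{
		"/sys/fs/cgroup/cpu/kubepods/cpu.shares",
		"/sys/fs/cgroup/cpu/kubepods/burstable/cpu.shares",
		"/sys/fs/cgroup/memory/kubepods/memory.limit_in_bytes",
	}
	for _, f := range files {
		data, err := os.ReadFile(f)
		if err != nil {
			fmt.Printf("%s: %v\n", f, err)
			continue
		}
		fmt.Printf("%s = %s\n", f, strings.TrimSpace(string(data)))
	}
}
```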
## device manager

Besides maintaining cgroups, `cm` is also responsible for managing container devices, which is handled by the device manager. The code is in `pkg/kubelet/cm/devicemanager/manager.go`:
```go
func (m *ManagerImpl) Start() error {
	err := m.readCheckpoint()
	s, err := net.Listen("unix", socketPath)
	// Register the service; the service provider is ManagerImpl.
	// Device plugins will send registration requests here.
	m.wg.Add(1)
	m.server = grpc.NewServer([]grpc.ServerOption{}...)
	pluginapi.RegisterRegistrationServer(m.server, m)
	go func() {
		defer m.wg.Done()
		m.server.Serve(s)
	}()
}
```
Here `cm` creates a gRPC server and registers `m` as the service backend. The device manager receives `RegisterRequest`s from the device plugin side and registers the devices with the kubelet. The `ListAndWatch` and `Allocate` methods will be analyzed in detail in another article.
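For context, a device plugin registers itself by dialing the kubelet socket and calling `Register` on this gRPC service. A minimal sketch, assuming the `k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1` API; the endpoint and resource name below are made up for illustration:

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func main() {
	// Dial the kubelet's registration socket
	// (pluginapi.KubeletSocket is /var/lib/kubelet/device-plugins/kubelet.sock).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, "unix://"+pluginapi.KubeletSocket,
		grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("failed to dial kubelet: %v", err)
	}
	defer conn.Close()

	// Endpoint is the plugin's own socket under /var/lib/kubelet/device-plugins/;
	// the resource name here is hypothetical.
	client := pluginapi.NewRegistrationClient(conn)
	_, err = client.Register(context.Background(), &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     "example-device-plugin.sock",
		ResourceName: "example.com/foo",
	})
	if err != nil {
		log.Fatalf("registration failed: %v", err)
	}
	log.Println("registered with the kubelet device manager")
}
```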
## Summary
- When the CPU policy is `none`, containers are bound to all cores, which is effectively no pinning; Guaranteed pods' CPUs are not pinned either.
- When the CPU policy is `static`, CPU pinning is enabled. The shared CPU pool is all of the node's CPUs minus those exclusively bound to Guaranteed pods; note that the reserved CPUs stay in the pool. A core bound to a Guaranteed pod is never bound to another pod.
- `kubectl describe node` shows how the node's resources are divided up. What it counts are the resources occupied by pods; a terminating pod, for example, still occupies resources.
- BestEffort pods count as occupying zero node resources, so they are not guaranteed to keep running; Burstable pods count by their request values, and Guaranteed pods count by their request/limit (which are equal). Because the latter two are subtracted when the node's resource split is computed, it is as if a slot has been reserved for them on the node, so their quality of service is assured. BestEffort pods are counted as zero resources, so no slot is reserved for them at all; when resources run short they are easily evicted.