[toc]

There are already plenty of articles on the web describing pod cgroups, but plain prose always feels like a sheet of frosted glass in front of the actual meaning, so this post digs into the topic from the source code.
On the kubelet side, a pod's underlying runtime state is managed by the ContainerManager; anything touching container state, such as cgroups and devices, goes through it. The code lives in pkg/kubelet/cm/container_manager_linux.go.
Cgroup management supports both the cgroupfs (v1 & v2) and the systemd drivers, so the first step is to determine which cgroup version the node is using.
## How to tell which cgroup version is in use
 
```bash
stat -fc %T /sys/fs/cgroup/
```

- For cgroup v2 the output is `cgroup2fs`, i.e. the "unified" hierarchy referred to in the code.
- For cgroup v1 the output is `tmpfs`.
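The same check can be done programmatically. Below is a minimal sketch of my own (not kubelet code) that inspects the filesystem magic of /sys/fs/cgroup via statfs; it assumes golang.org/x/sys/unix is available.

```go
package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	var st unix.Statfs_t
	if err := unix.Statfs("/sys/fs/cgroup", &st); err != nil {
		panic(err)
	}
	// cgroup v2 mounts the unified hierarchy (cgroup2fs) at the root;
	// with cgroup v1 the root is a tmpfs holding the per-controller mounts.
	switch st.Type {
	case unix.CGROUP2_SUPER_MAGIC:
		fmt.Println("cgroup v2 (unified)")
	case unix.TMPFS_MAGIC:
		fmt.Println("cgroup v1")
	default:
		fmt.Printf("unexpected filesystem type 0x%x\n", st.Type)
	}
}
```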
 
## NewContainerManager

```go
func NewContainerManager() (ContainerManager, error) {
	cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)

	// one cgroup subtree per pod QoS class;
	// the default root cgroup is named "kubepods" and is used as the root of all pod cgroups
	if nodeConfig.CgroupsPerQOS {
		// the kubepods group
		cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
	}
	cm := &containerManagerImpl{}

	// topology manager
	cm.topologyManager, err = topologymanager.NewManager()

	// device manager
	cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)

	// cpu manager
	cm.cpuManager, err = cpumanager.NewManager()
}
```
 
This is also reflected in the directory layout:

```
kubelet/cm/cpumanager
           devicemanager
           topologymanager
```
 
The cpu manager supports two CPU policies:
```go
func NewManager() (Manager, error) {
	switch policyName(cpuPolicyName) {
	case PolicyNone:
		policy = NewNonePolicy()

	case PolicyStatic:
		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
	}

	return &manager{policy: policy}
}
```
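As a side note on what pinning means for the static policy: only containers of Guaranteed pods whose CPU request is a whole number of cores get exclusive CPUs. The helper below is my own simplified sketch of that check (the real logic lives in pkg/kubelet/cm/cpumanager/policy_static.go):

```go
package cpupolicy

import v1 "k8s.io/api/core/v1"

// exclusiveCPUs sketches the static policy's eligibility check: a container is
// only pinned to exclusive CPUs if its pod is Guaranteed and its CPU request is
// an integer number of cores; everything else runs in the shared pool.
func exclusiveCPUs(qosClass v1.PodQOSClass, c *v1.Container) int {
	if qosClass != v1.PodQOSGuaranteed {
		return 0
	}
	cpu := c.Resources.Requests[v1.ResourceCPU]
	if cpu.Value()*1000 != cpu.MilliValue() {
		// fractional request such as 1500m: stays in the shared pool
		return 0
	}
	return int(cpu.Value())
}
```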
 
## containerManagerImpl

The concrete implementation behind ContainerManager is containerManagerImpl.

```go
func (cm *containerManagerImpl) Start() error {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		// start the cpu manager;
		// with the static policy this also starts the reconcile loop
		err = cm.cpuManager.Start()
	}

	if err := cm.setupNode(activePods); err != nil {
		return err
	}

	// Starts device manager.
	if err := cm.deviceManager.Start(); err != nil {
		return err
	}
}
```
The main flow is analyzed below.
## setupNode

```go
func (cm *containerManagerImpl) setupNode() error {
	// lay out container cgroups by QoS level:
	// Guaranteed pods live at the top level (directly under kubepods),
	// Burstable and BestEffort one level lower, each in its own cgroup
	if cm.NodeConfig.CgroupsPerQOS {
		// create the /sys/fs/cgroup/kubepods/ directory
		if err := cm.createNodeAllocatableCgroups(); err != nil {
			return err
		}

		// start the QoS container manager;
		// getNodeAllocatableAbsolute is capacity minus kube-reserved and system-reserved
		err = cm.qosContainerManager.Start(cm.getNodeAllocatableAbsolute, activePods)
		if err != nil {
			return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
		}
	}

	// Enforce Node Allocatable (if required)
	if err := cm.enforceNodeAllocatableCgroups(); err != nil {
		return err
	}
}
```
 
## qosContainerManager.Start

The code lives in pkg/kubelet/cm/qos_container_manager_linux.go.
```go
func (m *qosContainerManagerImpl) Start(getNodeAllocatable func() v1.ResourceList, activePods ActivePodsFunc) error {
	// define the cgroup names for burstable and besteffort:
	// kubepods/burstable
	// kubepods/besteffort
	qosClasses := map[v1.PodQOSClass]CgroupName{
		v1.PodQOSBurstable:  NewCgroupName(rootContainer, strings.ToLower(string(v1.PodQOSBurstable))),
		v1.PodQOSBestEffort: NewCgroupName(rootContainer, strings.ToLower(string(v1.PodQOSBestEffort))),
	}

	for qosClass, containerName := range qosClasses {
		// best-effort pods get CpuShares = 2
		if qosClass == v1.PodQOSBestEffort {
			minShares := uint64(MinShares)
			resourceParameters.CpuShares = &minShares
		}

		containerConfig := &CgroupConfig{
			Name:               containerName, // the cgroup these containers live in
			ResourceParameters: resourceParameters,
		}

		// sync the cgroup config once up front: create it if missing, update it otherwise
		if !cm.Exists(containerName) {
			if err := cm.Create(containerConfig); err != nil {
				return fmt.Errorf("failed to create top level %v QOS cgroup : %v", qosClass, err)
			}
		} else {
			// to ensure we actually have the right state, we update the config on startup
			if err := cm.Update(containerConfig); err != nil {
				return fmt.Errorf("failed to update top level %v QOS cgroup : %v", qosClass, err)
			}
		}
	}

	// start a periodic task to keep these cgroups updated; only the burstable and
	// besteffort cgroups appear here because Guaranteed pods get their own pod-level
	// cgroups directly under kubepods
	go wait.Until(func() {
		err := m.UpdateCgroups()
		if err != nil {
			klog.Warningf("[ContainerManager] Failed to reserve QoS requests: %v", err)
		}
	}, periodicQOSCgroupUpdateInterval, wait.NeverStop)
}
```
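Why only burstable and besteffort here? Guaranteed pods get their own pod-level cgroups directly under kubepods, while kubepods/burstable's cpu.shares has to track the CPU requests of whatever Burstable pods are currently running, which is what the periodic UpdateCgroups loop recomputes. A simplified sketch of that computation (my own helper, mirroring my reading of setCPUCgroupConfig in the same file):

```go
// burstableCPUShares sketches what the periodic update keeps in sync for the
// kubepods/burstable cgroup: cpu.shares proportional to the summed CPU requests
// (in millicores) of all running Burstable pods. BestEffort stays pinned at the
// minimum of 2 shares and needs no recomputation.
func burstableCPUShares(burstablePodCPURequestsMilli []int64) uint64 {
	var totalMilli int64
	for _, m := range burstablePodCPURequestsMilli {
		totalMilli += m
	}
	// same arithmetic as the kubelet's MilliCPUToShares: 1024 shares per core, floor of 2
	shares := totalMilli * 1024 / 1000
	if shares < 2 {
		return 2
	}
	return uint64(shares)
}
```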
 
## enforceNodeAllocatableCgroups

This creates the top-level cgroups on the node.
```go
func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
	// capacity - SystemReserved - KubeReserved
	if cm.CgroupsPerQOS && nc.EnforceNodeAllocatable.Has(kubetypes.NodeAllocatableEnforcementKey) {
		nodeAllocatable = cm.getNodeAllocatableInternalAbsolute()
	}

	// convert; cgroupRoot is the kubepods cgroup
	cgroupConfig := &CgroupConfig{
		Name:               cm.cgroupRoot,
		ResourceParameters: getCgroupConfig(nodeAllocatable),
	}

	// the pods cgroup, i.e. the kubepods cgroup
	if len(cm.cgroupRoot) > 0 {
		go func() {
			// keep updating the kubepods cgroup
			for {
				err := cm.cgroupManager.Update(cgroupConfig)
			}
		}()
	}

	// system-reserved
	// implemented by calling cgroupManager.Update(cgroupConfig)
	enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved)

	// kube-reserved
	// implemented by calling cgroupManager.Update(cgroupConfig)
	enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved)
}
```
 
The kubelet flag --enforce-node-allocatable='pods','kube-reserved','system-reserved' controls which of these three cgroups are enforced:

- The kubepods cgroup is kept up to date by a control loop with the node allocatable values, where CPU is capacity minus the two reserved amounts (a worked example follows this list).
- The --kube-reserved and --system-reserved cgroups are only updated once. Their names can be customized: the --system-reserved-cgroup flag, which defaults to "/system.slice", and the --kube-reserved-cgroup flag, which defaults to "/kubelet.service".
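A quick worked example of how the numbers relate (the figures are made up): the kubepods cgroup limit subtracts only the two reserved amounts, while the Allocatable reported to the scheduler additionally subtracts the hard eviction threshold.

```go
package main

import "fmt"

func main() {
	const (
		capacityMiB       = 64 * 1024 // node memory capacity: 64Gi
		kubeReservedMiB   = 1024      // --kube-reserved=memory=1Gi
		systemReservedMiB = 1024      // --system-reserved=memory=1Gi
		evictionHardMiB   = 100       // --eviction-hard=memory.available<100Mi
	)

	// memory limit written to the kubepods cgroup (enforce-node-allocatable=pods)
	kubepodsLimit := capacityMiB - kubeReservedMiB - systemReservedMiB
	// the Allocatable the scheduler sees also reserves room for the eviction threshold
	allocatable := kubepodsLimit - evictionHardMiB

	fmt.Println("kubepods memory limit (MiB):", kubepodsLimit) // 63488
	fmt.Println("node Allocatable (MiB):     ", allocatable)   // 63388
}
```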
 
The parameters written into the cgroup are derived by the following method:
```go
func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
	// TODO(vishh): Set CPU Quota if necessary.
	if rl == nil {
		return nil
	}
	var rc ResourceConfig
	if q, exists := rl[v1.ResourceMemory]; exists {
		// Memory is defined in bytes.
		val := q.Value()
		rc.Memory = &val
	}
	if q, exists := rl[v1.ResourceCPU]; exists {
		// CPU is defined in milli-cores.
		val := MilliCPUToShares(q.MilliValue())
		rc.CpuShares = &val
	}
	if q, exists := rl[pidlimit.PIDs]; exists {
		val := q.Value()
		rc.PidsLimit = &val
	}
	rc.HugePageLimit = HugePageLimits(rl)
	return &rc
}
```
 
So the parameters updated in the cgroup are memory, cpu shares, the pids limit, and hugepage limits.
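The CPU-to-shares conversion is the standard cgroup v1 mapping of 1 core = 1024 cpu.shares. A small sketch mirroring MilliCPUToShares (constants copied from the kubelet helpers as I understand them):

```go
const (
	minShares     = 2
	sharesPerCPU  = 1024
	milliCPUToCPU = 1000
)

// milliCPUToShares converts a CPU request in millicores to cpu.shares,
// with a floor of minShares so even a zero request keeps a tiny weight.
func milliCPUToShares(milliCPU int64) uint64 {
	if milliCPU == 0 {
		return minShares
	}
	shares := milliCPU * sharesPerCPU / milliCPUToCPU
	if shares < minShares {
		return minShares
	}
	return uint64(shares)
}

// milliCPUToShares(500) == 512, milliCPUToShares(2000) == 2048
```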
## syncPod: updating each pod's cgroup

The pod creation/sync flow also updates the QoS and pod-level cgroups.
```go
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	pcm := kl.containerManager.NewPodContainerManager()

	if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
		if !pcm.Exists(pod) {
			if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
				klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
			}
			if err := pcm.EnsureExists(pod); err != nil {
				kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
				return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
			}
		}
	}
}
```
 
pkg/kubelet/cm/pod_container_manager_linux.go
```go
func (m *podContainerManagerImpl) EnsureExists(pod *v1.Pod) error {
	// the pod's cgroup name:
	// Guaranteed pods go directly under the kubepods cgroup,
	// everything else under kubepods/burstable or kubepods/besteffort
	podContainerName, _ := m.GetPodContainerName(pod)

	// check if container already exist
	alreadyExists := m.Exists(pod)
	if !alreadyExists {
		// build the cgroup config:
		// m.enforceCPULimits is a bool, normally true
		// m.cpuCFSQuotaPeriod defaults to 100ms
		containerConfig := &CgroupConfig{
			Name:               podContainerName,
			ResourceParameters: ResourceConfigForPod(pod, m.enforceCPULimits, m.cpuCFSQuotaPeriod),
		}
		if m.podPidsLimit > 0 {
			containerConfig.ResourceParameters.PidsLimit = &m.podPidsLimit
		}
		if err := m.cgroupManager.Create(containerConfig); err != nil {
			return fmt.Errorf("failed to create container for %v : %v", podContainerName, err)
		}
	}
	if err := m.applyLimits(pod); err != nil {
		return fmt.Errorf("failed to apply resource limits on container for %v : %v", podContainerName, err)
	}
	return nil
}
```
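For intuition, the resulting cgroupfs locations look roughly like the sketch below (illustrative only: it assumes the cgroupfs driver on cgroup v1; the systemd driver would produce .slice names instead):

```go
package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// podCgroupPath is an illustrative helper, not kubelet code: it shows where a
// pod's cpu cgroup ends up for each QoS class under the cgroupfs driver.
func podCgroupPath(qosClass v1.PodQOSClass, uid types.UID) string {
	const base = "/sys/fs/cgroup/cpu/kubepods"
	switch qosClass {
	case v1.PodQOSGuaranteed:
		// Guaranteed pods sit directly under kubepods
		return fmt.Sprintf("%s/pod%s", base, uid)
	case v1.PodQOSBurstable:
		return fmt.Sprintf("%s/burstable/pod%s", base, uid)
	default: // BestEffort
		return fmt.Sprintf("%s/besteffort/pod%s", base, uid)
	}
}
```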
 
pcm.EnsureExists makes sure the pod's cgroup exists and applies its limits; the resources declared in the pod spec are converted into the cgroup resource configuration by ResourceConfigForPod:
```go
func ResourceConfigForPod(pod *v1.Pod, enforceCPULimits bool, cpuPeriod uint64) *ResourceConfig {
	// sum requests and limits.
	reqs, limits := resource.PodRequestsAndLimits(pod)

	cpuShares := MilliCPUToShares(cpuRequests)
	cpuQuota := MilliCPUToQuota(cpuLimits, int64(cpuPeriod))

	// build the result
	result := &ResourceConfig{}
	if qosClass == v1.PodQOSGuaranteed {
		result.CpuShares = &cpuShares
		result.CpuQuota = &cpuQuota
		result.CpuPeriod = &cpuPeriod
		result.Memory = &memoryLimits
	} else if qosClass == v1.PodQOSBurstable {
		result.CpuShares = &cpuShares
		if cpuLimitsDeclared {
			result.CpuQuota = &cpuQuota
			result.CpuPeriod = &cpuPeriod
		}
		if memoryLimitsDeclared {
			result.Memory = &memoryLimits
		}
	} else {
		shares := uint64(MinShares)
		result.CpuShares = &shares
	}
}
```
 
- cpuPeriod defaults to 100ms and corresponds to the --cpu-cfs-quota-period=100ms flag.
- cpuShares is computed from the pod's CPU requests. It is not a hard cap on cores: it is the relative weight used to split CPU time when processes have to share cores, so on a node under CPU pressure it decides how big a time slice the container's processes get. The "share" only matters when a core is actually contended.
- cpuQuota is computed from the CPU limits and ends up in cpu.cfs_quota_us / cpu.cfs_period_us.
- memoryLimits comes from the pod's memory limits.
 
For pods of the different QoS classes (a worked example follows this list):

- Guaranteed pods get all of the parameters above.
- Burstable pods always get CpuShares; the CPU quota, CPU period, and memory limit are only set if Resources.Limits is declared.
- BestEffort pods only get cpu shares, fixed at the minimum value of 2.
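A worked example for a hypothetical Burstable pod requesting 500m CPU with a 1-core CPU limit and a 256Mi memory limit, using the default 100ms period (the numbers follow the MilliCPUToShares / MilliCPUToQuota formulas above):

```go
package main

import "fmt"

func main() {
	const (
		cpuRequestMilli = 500               // requests.cpu: 500m
		cpuLimitMilli   = 1000              // limits.cpu: 1
		memLimitBytes   = 256 * 1024 * 1024 // limits.memory: 256Mi
		cpuPeriodUs     = 100000            // --cpu-cfs-quota-period=100ms
	)

	// Burstable: shares come from requests; quota/period and memory are set
	// only because limits are declared.
	cpuShares := cpuRequestMilli * 1024 / 1000     // 512
	cpuQuota := cpuLimitMilli * cpuPeriodUs / 1000 // 100000

	fmt.Println("cpu.shares            =", cpuShares)
	fmt.Println("cpu.cfs_quota_us      =", cpuQuota)
	fmt.Println("cpu.cfs_period_us     =", cpuPeriodUs)
	fmt.Println("memory.limit_in_bytes =", memLimitBytes)
}
```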
 
The three QoS parent cgroups, the per-pod child cgroups, and the kube-reserved / system-reserved cgroups are all written through the same call: cgroupManager.Update().
## cgroupManager.Update: where the work actually happens

```go
func (m *cgroupManagerImpl) Update(cgroupConfig *CgroupConfig) error {
	// convert the container manager's resource config into runc libcontainer's format
	resourceConfig := cgroupConfig.ResourceParameters
	resources := m.toResources(resourceConfig)
	libcontainerCgroupConfig := &libcontainerconfigs.Cgroup{
		Resources: resources,
	}

	// for the cgroup name, build the mount paths of its subsystems, e.g. cpu, memory, cpuset
	libcontainerCgroupConfig.Paths = m.buildCgroupPaths(cgroupConfig.Name)

	setSupportedSubsystemsV1(libcontainerCgroupConfig)
}
```

It then calls each subsystem's Set() method to apply the settings:

```go
func setSupportedSubsystemsV1(cgroupConfig *libcontainerconfigs.Cgroup) error {
	for sys, required := range getSupportedSubsystems() {
		if _, ok := cgroupConfig.Paths[sys.Name()]; !ok {
			if required {
				return fmt.Errorf("failed to find subsystem mount for required subsystem: %v", sys.Name())
			}
			// the cgroup is not mounted, but its not required so continue...
			klog.V(6).Infof("Unable to find subsystem mount for optional subsystem: %v", sys.Name())
			continue
		}
		if err := sys.Set(cgroupConfig.Paths[sys.Name()], cgroupConfig); err != nil {
			return fmt.Errorf("failed to set config for supported subsystems : %v", err)
		}
	}
	return nil
}
```
 
Take cpuset as an example; my cluster uses cgroup v1 with runc as the runtime. The code lives in vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs/cpuset.go.
```go
func (s *CpusetGroup) Set(path string, cgroup *configs.Cgroup) error {
	if cgroup.Resources.CpusetCpus != "" {
		if err := fscommon.WriteFile(path, "cpuset.cpus", cgroup.Resources.CpusetCpus); err != nil {
			return err
		}
	}
	if cgroup.Resources.CpusetMems != "" {
		if err := fscommon.WriteFile(path, "cpuset.mems", cgroup.Resources.CpusetMems); err != nil {
			return err
		}
	}
	return nil
}

func WriteFile(dir, file, data string) error {
	if dir == "" {
		return errors.Errorf("no directory specified for %s", file)
	}
	path, err := securejoin.SecureJoin(dir, file)
	if err != nil {
		return err
	}
	if err := retryingWriteFile(path, []byte(data), 0700); err != nil {
		return errors.Wrapf(err, "failed to write %q to %q", data, path)
	}
	return nil
}

func retryingWriteFile(filename string, data []byte, perm os.FileMode) error {
	for {
		err := ioutil.WriteFile(filename, data, perm)
		if errors.Is(err, unix.EINTR) {
			logrus.Infof("interrupted while writing %s to %s", string(data), filename)
			continue
		}
		return err
	}
}
```
 
So the kubelet delegates to the runtime library, runc's libcontainer in this case, which already wraps the update logic. The method is simple: it calls Go's built-in ioutil.WriteFile (retrying on EINTR) to write the cgroup files.
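Stripped of the libcontainer plumbing, the whole write path is just a file write under the cgroup mount. A minimal standalone sketch of the same idea (my own example; the test cgroup directory is hypothetical, cgroup v1 paths are assumed, and it needs root):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeCgroupFile mimics what libcontainer's WriteFile ultimately does:
// write a value into a control file inside an existing cgroup directory.
func writeCgroupFile(cgroupDir, file, value string) error {
	return os.WriteFile(filepath.Join(cgroupDir, file), []byte(value), 0700)
}

func main() {
	// e.g. restrict a (hypothetical) test cgroup to CPUs 0-1
	if err := writeCgroupFile("/sys/fs/cgroup/cpuset/test", "cpuset.cpus", "0-1"); err != nil {
		fmt.Println("write failed:", err)
	}
}
```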
## device manager

Besides maintaining cgroups, the container manager is also responsible for container devices; this is handled by the device manager.
The code lives in pkg/kubelet/cm/devicemanager/manager.go.

```go
func (m *ManagerImpl) Start() error {
	err := m.readCheckpoint()

	s, err := net.Listen("unix", socketPath)

	// register the Registration service, with ManagerImpl as the service backend;
	// device plugins will send their registration requests here
	m.wg.Add(1)
	m.server = grpc.NewServer([]grpc.ServerOption{}...)
	pluginapi.RegisterRegistrationServer(m.server, m)
	go func() {
		defer m.wg.Done()
		m.server.Serve(s)
	}()
}
```
Here the container manager creates a gRPC server and registers m as the service backend. The device manager accepts RegisterRequest calls from device plugins and registers their devices with the kubelet.
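From the device plugin side, registration is a single RPC against this server. Below is a minimal sketch of that client call, assuming the v1beta1 device plugin API, a plugin already serving on my-plugin.sock under the device-plugins directory, and a made-up resource name:

```go
package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// dial the kubelet's registration socket (/var/lib/kubelet/device-plugins/kubelet.sock)
	conn, err := grpc.DialContext(ctx, "unix://"+pluginapi.KubeletSocket,
		grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// tell the kubelet which resource we provide and which socket to call back on
	client := pluginapi.NewRegistrationClient(conn)
	if _, err := client.Register(ctx, &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,       // "v1beta1"
		Endpoint:     "my-plugin.sock",        // hypothetical plugin socket name
		ResourceName: "example.com/my-device", // hypothetical extended resource
	}); err != nil {
		panic(err)
	}
}
```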
The ListAndWatch and Allocate methods will be analyzed in detail in a separate article.
## Summary
- With the none cpu policy there is no pinning: every container can run on all cores, and Guaranteed pods' CPUs are not pinned either.
- With the static cpu policy, CPU pinning is enabled. The shared CPU pool is all CPUs on the node minus those exclusively assigned to Guaranteed pods (note that reserved CPUs stay in the pool). Cores pinned to a Guaranteed pod are never handed to other pods.
- kubectl describe node shows how the node's resources are carved up; it counts the resources occupied by pods, so for example a terminating pod still counts as occupying resources.
- BestEffort pods are counted as occupying zero node resources, so there is no guarantee they keep running. Burstable pods are counted by their request values and Guaranteed pods by their request/limit values; because those amounts are subtracted when the node's resources are divided up, a slot is effectively reserved for them on the node, so their quality of service is assured. A BestEffort pod, being counted as zero resources, has no slot reserved at all and is easily evicted when resources run short.