[toc]
There are already plenty of articles on the web introducing pod cgroups, but pure prose always feels like a sheet of frosted glass in front of the actual meaning. So I want to deepen my understanding of this area from the source code.

On the kubelet side, a pod's underlying runtime state is managed by the ContainerManager; everything that touches container state, such as cgroups and devices, is handled by it. The code lives in pkg/kubelet/cm/container_manager_linux.go.

cgroup management supports both cgroupfs (v1 & v2) and the systemd driver, so the first thing to determine is which cgroup version the node is using.

## How to tell which cgroup version is in use

```bash
stat -fc %T /sys/fs/cgroup/
```

- For cgroup v2 the output is `cgroup2fs`, i.e. the unified hierarchy that the code refers to as "unified".
- For cgroup v1 the output is `tmpfs` (a programmatic equivalent of this check is sketched below).
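
The same check can be done in code by comparing the filesystem magic of /sys/fs/cgroup, which is essentially what libcontainer's cgroup v2 detection boils down to. A minimal sketch (not kubelet code; assumes golang.org/x/sys/unix):

```go
package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	// statfs /sys/fs/cgroup and look at the filesystem magic number,
	// the programmatic equivalent of `stat -fc %T /sys/fs/cgroup/`.
	var st unix.Statfs_t
	if err := unix.Statfs("/sys/fs/cgroup", &st); err != nil {
		panic(err)
	}
	if st.Type == unix.CGROUP2_SUPER_MAGIC {
		fmt.Println("cgroup v2 (unified hierarchy)")
	} else {
		fmt.Println("cgroup v1") // /sys/fs/cgroup is a tmpfs holding the per-subsystem mounts
	}
}
```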

## NewContainerManager

```go
func NewContainerManager() (ContainerManager, error) {

	cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)

	// One cgroup tier per pod QoS class.
	// The default root cgroup name is "kubepods"; it becomes the root of all pod cgroups.
	if nodeConfig.CgroupsPerQOS {
		// the "kubepods" group
		cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
	}
	cm := &containerManagerImpl{}

	// topology manager
	cm.topologyManager, err = topologymanager.NewManager()

	// device manager
	cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)

	// cpu manager
	cm.cpuManager, err = cpumanager.NewManager()
}
```
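
One detail worth calling out is the CgroupDriver argument passed to NewCgroupManager: the same logical cgroup name is rendered differently on disk depending on whether the driver is cgroupfs or systemd. A simplified sketch of the two conversions (mirrors kubelet's CgroupName rules but skips systemd name escaping, so it is illustrative rather than the exact code):

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// CgroupName is a hierarchical cgroup name, e.g. ["kubepods", "besteffort"].
type CgroupName []string

// ToCgroupfs joins the components with "/" (cgroupfs driver).
func (c CgroupName) ToCgroupfs() string {
	return "/" + path.Join(c...)
}

// ToSystemd turns every level into a .slice unit whose name embeds all ancestors.
func (c CgroupName) ToSystemd() string {
	parts := make([]string, 0, len(c))
	prefix := ""
	for _, name := range c {
		if prefix == "" {
			prefix = name
		} else {
			prefix = prefix + "-" + name
		}
		parts = append(parts, prefix+".slice")
	}
	return "/" + strings.Join(parts, "/")
}

func main() {
	name := CgroupName{"kubepods", "besteffort"}
	fmt.Println(name.ToCgroupfs()) // /kubepods/besteffort
	fmt.Println(name.ToSystemd())  // /kubepods.slice/kubepods-besteffort.slice
}
```

With the systemd driver each level becomes a slice unit nested under its parent slice, which is why kubepods/besteffort shows up on disk as kubepods.slice/kubepods-besteffort.slice.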

This is also reflected in the directory layout:

```
pkg/kubelet/cm/cpumanager
pkg/kubelet/cm/devicemanager
pkg/kubelet/cm/topologymanager
```

The cpu manager supports two CPU pinning policies:

```go
func NewManager() (Manager, error) {
	switch policyName(cpuPolicyName) {

	case PolicyNone:
		policy = NewNonePolicy()

	case PolicyStatic:
		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
	}

	return &manager{policy: policy}, nil
}
```
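
Under the static policy, only containers of Guaranteed pods that request an integer number of CPUs get exclusive cores; everything else runs in the shared pool. A simplified sketch of that eligibility check (the helper name is mine and the qos helper import path may differ between Kubernetes versions):

```go
package cpupolicy

import (
	v1 "k8s.io/api/core/v1"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
)

// wantsExclusiveCPUs is a simplified version of the static policy's test:
// the pod must be Guaranteed and the container must ask for whole CPUs.
func wantsExclusiveCPUs(pod *v1.Pod, container *v1.Container) bool {
	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
		return false
	}
	cpu, ok := container.Resources.Requests[v1.ResourceCPU]
	if !ok || cpu.IsZero() {
		return false
	}
	// integer number of CPUs, i.e. no fractional millicores
	return cpu.Value()*1000 == cpu.MilliValue()
}
```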

## containerManagerImpl

The concrete implementation behind the ContainerManager interface is containerManagerImpl.

```go
func (cm *containerManagerImpl) Start() error {

	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		// Start the cpu manager.
		// With the static policy this also starts its reconcile loop.
		err = cm.cpuManager.Start()
	}

	if err := cm.setupNode(activePods); err != nil {
		return err
	}

	// Starts device manager.
	if err := cm.deviceManager.Start(); err != nil {
		return err
	}
}
```

The main steps are analyzed below.

## setupNode

```go
func (cm *containerManagerImpl) setupNode() error {

	// Enable the QoS-based cgroup hierarchy for containers:
	// Guaranteed pods live at the top level (directly under kubepods),
	// while Burstable and BestEffort pods each get their own cgroup one level below.
	if cm.NodeConfig.CgroupsPerQOS {
		// create the /sys/fs/cgroup/<subsystem>/kubepods/ directories
		if err := cm.createNodeAllocatableCgroups(); err != nil {
			return err
		}

		// Start the QoS container manager.
		// getNodeAllocatableAbsolute is capacity minus kube-reserved and system-reserved.
		err = cm.qosContainerManager.Start(cm.getNodeAllocatableAbsolute, activePods)
		if err != nil {
			return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
		}
	}

	// Enforce Node Allocatable (if required)
	if err := cm.enforceNodeAllocatableCgroups(); err != nil {
		return err
	}
}
```

## qosContainerManager.Start

pkg/kubelet/cm/qos_container_manager_linux.go

```go
func (m *qosContainerManagerImpl) Start(getNodeAllocatable func() v1.ResourceList, activePods ActivePodsFunc) error {

	// Define the cgroup names for the burstable and besteffort tiers:
	// kubepods/burstable
	// kubepods/besteffort
	qosClasses := map[v1.PodQOSClass]CgroupName{
		v1.PodQOSBurstable:  NewCgroupName(rootContainer, strings.ToLower(string(v1.PodQOSBurstable))),
		v1.PodQOSBestEffort: NewCgroupName(rootContainer, strings.ToLower(string(v1.PodQOSBestEffort))),
	}

	// cm here is m.cgroupManager
	for qosClass, containerName := range qosClasses {
		// the besteffort tier gets CpuShares = 2 (MinShares)
		if qosClass == v1.PodQOSBestEffort {
			minShares := uint64(MinShares)
			resourceParameters.CpuShares = &minShares
		}

		containerConfig := &CgroupConfig{
			Name:               containerName, // the cgroup this tier's containers live in
			ResourceParameters: resourceParameters,
		}

		// Apply the cgroup config once at startup: create it if missing, otherwise update it.
		if !cm.Exists(containerName) {
			if err := cm.Create(containerConfig); err != nil {
				return fmt.Errorf("failed to create top level %v QOS cgroup : %v", qosClass, err)
			}
		} else {
			// to ensure we actually have the right state, we update the config on startup
			if err := cm.Update(containerConfig); err != nil {
				return fmt.Errorf("failed to update top level %v QOS cgroup : %v", qosClass, err)
			}
		}
	}

	// Kick off a periodic task that keeps the QoS cgroups up to date. Only the
	// burstable and besteffort tiers appear here because Guaranteed pods sit
	// directly under kubepods and need no tier of their own.
	go wait.Until(func() {
		err := m.UpdateCgroups()
		if err != nil {
			klog.Warningf("[ContainerManager] Failed to reserve QoS requests: %v", err)
		}
	}, periodicQOSCgroupUpdateInterval, wait.NeverStop)

}
```
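
With CgroupsPerQOS enabled and the cgroupfs driver on cgroup v1, the hierarchy produced by this startup pass looks like this under each subsystem mount (e.g. /sys/fs/cgroup/cpu/):

```
kubepods/              <- node allocatable enforcement; Guaranteed pod cgroups sit directly here
kubepods/burstable/    <- top-level Burstable tier
kubepods/besteffort/   <- top-level BestEffort tier (cpu.shares = 2)
```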

## enforceNodeAllocatableCgroups: creating the top-level cgroups

This is where the node's top-level cgroups are created and enforced.

```go
func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {

	// capacity - SystemReserved - KubeReserved
	if cm.CgroupsPerQOS && nc.EnforceNodeAllocatable.Has(kubetypes.NodeAllocatableEnforcementKey) {
		nodeAllocatable = cm.getNodeAllocatableInternalAbsolute()
	}

	// convert it; cgroupRoot is the kubepods cgroup
	cgroupConfig := &CgroupConfig{
		Name:               cm.cgroupRoot,
		ResourceParameters: getCgroupConfig(nodeAllocatable),
	}

	// the kubepods cgroup, which also hosts the Guaranteed pods
	if len(cm.cgroupRoot) > 0 {
		go func() {
			// retry updating the kubepods cgroup (the real loop returns once the update succeeds)
			for {
				err := cm.cgroupManager.Update(cgroupConfig)
			}
		}()
	}

	// system-reserved: implemented via cgroupManager.Update(cgroupConfig)
	if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedEnforcementKey) {
		enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved)
	}

	// kube-reserved: implemented via cgroupManager.Update(cgroupConfig)
	if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedEnforcementKey) {
		enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved)
	}
}
```

The kubelet flag --enforce-node-allocatable=pods,kube-reserved,system-reserved controls which of these three cgroups get enforced:

- The kubepods cgroup is written by a retry loop with the node allocatable values; its CPU value is capacity minus the two reserved amounts (see the worked example after this list).
- The --kube-reserved-cgroup and --system-reserved-cgroup cgroups are updated only once. Their names are configurable:
  - --system-reserved-cgroup, commonly "/system.slice"
  - --kube-reserved-cgroup, e.g. "/kubelet.service"
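
A worked example with made-up numbers of what ends up in the kubepods cgroup; the CPU conversion mirrors kubelet's MilliCPUToShares helper:

```go
// Hypothetical node: capacity 8 CPU / 16Gi, with
// --kube-reserved=cpu=500m,memory=1Gi and --system-reserved=cpu=500m,memory=1Gi.
package main

import "fmt"

const (
	minShares     = 2
	sharesPerCPU  = 1024
	milliCPUToCPU = 1000
)

// milliCPUToShares mirrors kubelet's MilliCPUToShares.
func milliCPUToShares(milliCPU int64) uint64 {
	if milliCPU == 0 {
		return minShares
	}
	shares := milliCPU * sharesPerCPU / milliCPUToCPU
	if shares < minShares {
		return minShares
	}
	return uint64(shares)
}

func main() {
	allocatableCPU := int64(8000 - 500 - 500)            // 7000m left for the kubepods cgroup
	allocatableMem := int64(16-1-1) * 1024 * 1024 * 1024 // 14Gi in bytes

	fmt.Println("kubepods cpu.shares            =", milliCPUToShares(allocatableCPU)) // 7168
	fmt.Println("kubepods memory.limit_in_bytes =", allocatableMem)                   // 15032385536
}
```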

The parameters written into these cgroups are computed by the following function:

```go
func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
	// TODO(vishh): Set CPU Quota if necessary.
	if rl == nil {
		return nil
	}
	var rc ResourceConfig
	if q, exists := rl[v1.ResourceMemory]; exists {
		// Memory is defined in bytes.
		val := q.Value()
		rc.Memory = &val
	}
	if q, exists := rl[v1.ResourceCPU]; exists {
		// CPU is defined in milli-cores.
		val := MilliCPUToShares(q.MilliValue())
		rc.CpuShares = &val
	}
	if q, exists := rl[pidlimit.PIDs]; exists {
		val := q.Value()
		rc.PidsLimit = &val
	}
	rc.HugePageLimit = HugePageLimits(rl)

	return &rc
}
```

So the parameters updated in these cgroups are memory, cpu shares, the pids limit, and hugepage limits.
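
For reference, on cgroup v1 the ResourceConfig fields map onto the following control files (cgroup v2 uses cpu.weight, cpu.max, memory.max, pids.max and hugetlb.<size>.max instead):

```
CpuShares             -> cpu.shares
CpuQuota / CpuPeriod  -> cpu.cfs_quota_us / cpu.cfs_period_us
Memory                -> memory.limit_in_bytes
PidsLimit             -> pids.max
HugePageLimit         -> hugetlb.<pagesize>.limit_in_bytes
```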

## syncPod: updating each pod's cgroup

During the pod creation/sync flow, the QoS-tier cgroups are refreshed and the pod's own cgroup is ensured.

```go
func (kl *Kubelet) syncPod(o syncPodOptions) error {

	pcm := kl.containerManager.NewPodContainerManager()

	if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
		if !pcm.Exists(pod) {
			if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
				klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
			}
			if err := pcm.EnsureExists(pod); err != nil {
				kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
				return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
			}
		}
	}
}
```

pkg/kubelet/cm/pod_container_manager_linux.go

```go
func (m *podContainerManagerImpl) EnsureExists(pod *v1.Pod) error {
	// The pod's cgroup name:
	// Guaranteed pods go directly under kubepods,
	// the others under kubepods/burstable or kubepods/besteffort.
	podContainerName, _ := m.GetPodContainerName(pod)

	// check if container already exist
	alreadyExists := m.Exists(pod)
	if !alreadyExists {
		// Build the cgroup config.
		// m.enforceCPULimits is a bool, normally true.
		// m.cpuCFSQuotaPeriod defaults to 100ms.
		containerConfig := &CgroupConfig{
			Name:               podContainerName,
			ResourceParameters: ResourceConfigForPod(pod, m.enforceCPULimits, m.cpuCFSQuotaPeriod),
		}
		if m.podPidsLimit > 0 {
			containerConfig.ResourceParameters.PidsLimit = &m.podPidsLimit
		}
		if err := m.cgroupManager.Create(containerConfig); err != nil {
			return fmt.Errorf("failed to create container for %v : %v", podContainerName, err)
		}
	}

	if err := m.applyLimits(pod); err != nil {
		return fmt.Errorf("failed to apply resource limits on container for %v : %v", podContainerName, err)
	}
	return nil
}
```
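
GetPodContainerName places each pod under its QoS tier, so with the cgroupfs driver the per-pod cgroups end up at paths like the following (<uid> stands for the pod UID; the container-level cgroups underneath are created by the container runtime, not by kubelet):

```
kubepods/pod<uid>/              <- Guaranteed pod
kubepods/burstable/pod<uid>/    <- Burstable pod
kubepods/besteffort/pod<uid>/   <- BestEffort pod
```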

ResourceConfigForPod converts the resources declared in the pod spec into the cgroup's resource configuration.

pcm.EnsureExists guarantees that the pod's cgroup exists and that the pod's limits are applied to it:

```go
func ResourceConfigForPod(pod *v1.Pod, enforceCPULimits bool, cpuPeriod uint64) *ResourceConfig {
	// sum requests and limits.
	reqs, limits := resource.PodRequestsAndLimits(pod)

	// (cpuRequests, cpuLimits, memoryLimits, the *Declared flags and qosClass are
	// derived from reqs/limits and the pod's QoS class; elided here)
	cpuShares := MilliCPUToShares(cpuRequests)
	cpuQuota := MilliCPUToQuota(cpuLimits, int64(cpuPeriod))

	// build the result
	result := &ResourceConfig{}
	if qosClass == v1.PodQOSGuaranteed {
		result.CpuShares = &cpuShares
		result.CpuQuota = &cpuQuota
		result.CpuPeriod = &cpuPeriod
		result.Memory = &memoryLimits
	} else if qosClass == v1.PodQOSBurstable {
		result.CpuShares = &cpuShares
		if cpuLimitsDeclared {
			result.CpuQuota = &cpuQuota
			result.CpuPeriod = &cpuPeriod
		}
		if memoryLimitsDeclared {
			result.Memory = &memoryLimits
		}
	} else {
		shares := uint64(MinShares)
		result.CpuShares = &shares
	}
}
```

- cpuPeriod defaults to 100ms and corresponds to the --cpu-cfs-quota-period=100ms flag.
- cpuShares is computed from the pod's CPU request. It does not reserve whole cores; it is the relative weight used when processes have to share CPU time, i.e. when the node is under CPU contention the pod's processes get a slice of CPU time proportional to their shares.
- cpuQuota is computed from the CPU limit and is enforced through cpu.cfs_quota_us / cpu.cfs_period_us.
- memoryLimits comes from the pod's memory limit.

For pods of the different QoS classes (a worked example follows the list):

- Guaranteed pods get all of the parameters above.
- Burstable pods always get CpuShares; CPU quota, CPU period and the memory limit are only set when the corresponding Resources.Limits are declared.
- BestEffort pods only get CpuShares, fixed at the minimum value of 2.
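
As a concrete, hypothetical example: a Burstable pod whose single container requests cpu: 250m and is limited to cpu: 500m, memory: 256Mi, with the default 100ms CFS period, ends up with roughly these values (the conversions mirror MilliCPUToShares / MilliCPUToQuota):

```go
package main

import "fmt"

func main() {
	// Hypothetical Burstable pod: requests cpu=250m; limits cpu=500m, memory=256Mi.
	const cpuRequestMilli, cpuLimitMilli, periodUs = 250, 500, 100000 // period = --cpu-cfs-quota-period=100ms

	cpuShares := cpuRequestMilli * 1024 / 1000  // cpu.shares
	cpuQuota := cpuLimitMilli * periodUs / 1000 // cpu.cfs_quota_us
	memoryLimit := int64(256) * 1024 * 1024     // memory.limit_in_bytes

	fmt.Println(cpuShares, cpuQuota, memoryLimit) // 256 50000 268435456
}
```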

The three QoS parent cgroups, the per-pod child cgroups, and the kube-reserved / system-reserved cgroups are all ultimately configured through the same call: cgroupManager.Update().

## cgroupManager.Update does the actual work

```go
func (m *cgroupManagerImpl) Update(cgroupConfig *CgroupConfig) error {
	// Convert the cm resource object into runc's libcontainer format.
	resourceConfig := cgroupConfig.ResourceParameters
	resources := m.toResources(resourceConfig)

	libcontainerCgroupConfig := &libcontainerconfigs.Cgroup{
		Resources: resources,
	}

	// For the cgroup name, build the per-subsystem mount paths (cpu, memory, cpuset, ...).
	libcontainerCgroupConfig.Paths = m.buildCgroupPaths(cgroupConfig.Name)

	setSupportedSubsystemsV1(libcontainerCgroupConfig)
}
```

setSupportedSubsystemsV1 then calls each resource subsystem's Set() method:

```go
func setSupportedSubsystemsV1(cgroupConfig *libcontainerconfigs.Cgroup) error {
	for sys, required := range getSupportedSubsystems() {
		if _, ok := cgroupConfig.Paths[sys.Name()]; !ok {
			if required {
				return fmt.Errorf("failed to find subsystem mount for required subsystem: %v", sys.Name())
			}
			// the cgroup is not mounted, but its not required so continue...
			klog.V(6).Infof("Unable to find subsystem mount for optional subsystem: %v", sys.Name())
			continue
		}
		if err := sys.Set(cgroupConfig.Paths[sys.Name()], cgroupConfig); err != nil {
			return fmt.Errorf("failed to set config for supported subsystems : %v", err)
		}
	}
	return nil
}
```

Take cpuset as an example; my cluster uses cgroup v1 and runc as the runtime.

代码位于vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs/cpuset.go

```go
func (s *CpusetGroup) Set(path string, cgroup *configs.Cgroup) error {
	if cgroup.Resources.CpusetCpus != "" {
		if err := fscommon.WriteFile(path, "cpuset.cpus", cgroup.Resources.CpusetCpus); err != nil {
			return err
		}
	}
	if cgroup.Resources.CpusetMems != "" {
		if err := fscommon.WriteFile(path, "cpuset.mems", cgroup.Resources.CpusetMems); err != nil {
			return err
		}
	}
	return nil
}

func WriteFile(dir, file, data string) error {
	if dir == "" {
		return errors.Errorf("no directory specified for %s", file)
	}
	path, err := securejoin.SecureJoin(dir, file)
	if err != nil {
		return err
	}
	if err := retryingWriteFile(path, []byte(data), 0700); err != nil {
		return errors.Wrapf(err, "failed to write %q to %q", data, path)
	}
	return nil
}

func retryingWriteFile(filename string, data []byte, perm os.FileMode) error {
	for {
		err := ioutil.WriteFile(filename, data, perm)
		if errors.Is(err, unix.EINTR) {
			logrus.Infof("interrupted while writing %s to %s", string(data), filename)
			continue
		}
		return err
	}
}
```

kubelet delegates to the runtime library, runc's libcontainer, which already wraps the update logic. The mechanism is simple: it writes the cgroup files with Go's built-in ioutil.WriteFile, retrying on EINTR, for example as shown below.
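
For instance, pinning a container to cores 2 and 3 on cgroup v1 ultimately boils down to a single write like this (the path is illustrative; the leaf container cgroup is created by the runtime, and its exact shape depends on the cgroup driver):

```go
package main

import "io/ioutil"

func main() {
	// Equivalent effect of CpusetGroup.Set for a container pinned to cores 2-3.
	// Path uses placeholders for the pod UID and container ID.
	_ = ioutil.WriteFile(
		"/sys/fs/cgroup/cpuset/kubepods/pod<uid>/<container-id>/cpuset.cpus",
		[]byte("2-3"), 0644)
}
```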

## device manager

Besides maintaining cgroups, the ContainerManager is also responsible for container device management, which is delegated to the device manager.

代码位于pkg/kubelet/cm/devicemanager/manager.go

```go
func (m *ManagerImpl) Start() error {

	err := m.readCheckpoint()

	s, err := net.Listen("unix", socketPath)

	// Register the Registration service; ManagerImpl is the service backend.
	// Device plugins send their registration requests here.
	m.wg.Add(1)
	m.server = grpc.NewServer([]grpc.ServerOption{}...)

	pluginapi.RegisterRegistrationServer(m.server, m)
	go func() {
		defer m.wg.Done()
		m.server.Serve(s)
	}()

}
```

Here the device manager creates a gRPC server and registers m as the service backend. It receives RegisterRequest calls from device plugins and registers their devices with the kubelet.
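
To make the registration direction concrete, here is a hedged sketch of the device-plugin side: it dials the kubelet's registration socket and sends a RegisterRequest. The resource name and endpoint are made up, and the pluginapi import path may differ across Kubernetes versions:

```go
package main

import (
	"context"
	"net"
	"time"

	"google.golang.org/grpc"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func main() {
	// Dial the kubelet registration socket (a unix socket under /var/lib/kubelet/device-plugins/).
	conn, err := grpc.Dial(pluginapi.KubeletSocket, grpc.WithInsecure(), grpc.WithBlock(),
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
		}))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Tell kubelet which socket to call back on and which extended resource this plugin serves.
	_, err = client.Register(ctx, &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     "example-device-plugin.sock", // hypothetical plugin socket name
		ResourceName: "example.com/foo",            // hypothetical extended resource
	})
	if err != nil {
		panic(err)
	}
}
```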

The ListAndWatch and Allocate paths will be analyzed in detail in a separate article.

## Summary

1. With the none CPU policy there is no pinning: containers may run on any core, and even Guaranteed pods are not pinned.
2. With the static CPU policy core pinning is enabled. The shared CPU pool is all of the node's CPUs minus the cores exclusively assigned to Guaranteed pods; note that the reserved CPUs stay in the shared pool. Cores pinned to a Guaranteed pod are never handed to other pods.
3. `kubectl describe node` shows how the node's resources are carved up. It accounts for resources occupied by pods, and a terminating pod still counts as occupying resources.
4. BestEffort pods count for 0 node resources, so they are not guaranteed to keep running. Burstable pods count against the node by their request values and Guaranteed pods by their request (= limit) values; because those amounts are subtracted when the node's remaining allocatable is computed, it is as if a slot has been reserved for them on the node, which is why their quality of service is assured. BestEffort pods are accounted as 0 resources, so no slot is reserved for them at all, and they are the first to be evicted when resources run short.