img.png

kubelet call cni

kubelet.run

启动kubelet的各个Manager,

注意syncLoop()这个pod状态同步是个for{}形的死循环。

因为需要将依赖的各个控制器先启动起来,才能进行pod的syncLoop()控制。所以syncLoop()是在最后启动

相关代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
kl.initializeModules()
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

下面这三个的启动条件是kubeClient!=nil, 因为涉及到与kube-apiserver的通信,上报节点状态,node not ready是这里设置的
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
go kl.nodeLeaseController.Run(wait.NeverStop)

// 调用运行时状态handler,containerRuntime.Status(), 运行时的状态包括两部分, runtime is up(可以创建容器) + runtime network is up(可以为容器配置网络), 写入到runtimeState属性中
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
=> initializeRuntimeDependentModules 当运行时正常,则初始化运行时所需要的模块(单例模式,值得学习),
这里包括cadvisor, StatsProvider,containerManager,evictionManager,pluginManager(csi and device plugin), 运行时好了,就可以创建pod、containers及volume, device, stats等信息。
kl.initNetworkUtil()
// watch channel, 写入端时在syncloop收到删除pod事件
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

// Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()
kl.runtimeClassManager.Start(wait.NeverStop)

// Start the pod lifecycle event generator.
kl.pleg.Start()

最后才是启动pod的synchronization状态同步
kl.syncLoop(updates, kl)

syncLoop 调度核心

主体逻辑是在for循环中的syncLoopIteration,它有5个参数

  • updates: 改动即updates,所以也叫config channel,watches for changes from three channels (file, apiserver, and http)。
    • file对static pod的改动
    • apiserver是apply新的改动或者kubectl edit xxx
    • http更新,不依赖etcd或者apiserver来创建pod
      • 在Kubernetes集群之外管理Pod: 如果需要从非Kubernetes系统(如自定义工具或其他云平台)管理Pod,则可以使用HTTP方式来提交Pod定义。这样做不需要直接与Kubernetes API服务器交互,也不需要在节点上部署额外的组件。
      • 动态生成Pod定义, 不用生成yaml或者json的file再apply这样了,直接内存数据发请求就可以。
      • 应对临时性存储后端:某些存储后端(如etcd)可以将Pod定义作为HTTP响应提供。在这种情况下,使用HTTP可以使kubelet无需连接到etcd实例,而是可以直接从HTTP端点获取Pod定义。这种方法还允许更容易地切换到另一个存储后端,因为只需要简单地更改HTTP端点的URL地址即可
  • handler:就是kubelet本身,调用kubelet的方法来处理事件。
  • syncTicker
    • 周期任务,1s,
    • 保证pod的asw始终是dsw
  • housekeepingTicker,
    • 周期任务,2s
    • 为pod打扫卫生,pod依赖的资源需要清理,比如volume
  • plegCh
    • pod生命周期(创建、启动、停止、删除等)产生事件,事件驱动流程,这非常适合用channel去实现。
    • 主动触发pod的同步,asw和dsw对齐, 不依赖周期性的sync
    • 周期性向container runtime查询当前所有容器的状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.Info("Starting kubelet main sync loop.")

// 同步定时器,1s一次,为了保证pod的asw始终是是dsw状态
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()

// 全局清理定时器,2s一次,Period for performing global cleanup tasks.
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()

// 返回channle,让订阅者可以接收到PodLifecycleEvent事件
// 主角pleg, 定时relist容器变化
// 是主动上报的,
// pod生命周期包括,Pod的创建、启动、停止、删除等
// 当发生这些事件,主动触发pod的同步,asw和dsw对齐。
plegCh := kl.pleg.Watch()

for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
// 为什么要先检查底层运行时状态?
// 因为要检查依赖条件是否满足,如果异常,需要忽略
// 底层运行时的状态都是通过kl.runtimeState这个对象维护的
klog.Errorf("skipping pod synchronization - %v", err)
// exponential backoff
// 指数型推迟,100ms->200ms->400ms,最大是5s
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
// reset backoff if we have a success
// 重置推迟时间
duration = base

kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}

对syncloop模型的深入理解:

  • k8s的消息,不需要rabbitmq, 而是事件驱动的,有定时任务的事件,也有主动上报的事件。而事件机制非常符合go语言的channel特性
  • 周期任务 + 主动触发,两种相互配合, 相互相成,缺一不可。
  • syncloop的目的是为了维护单个pod的aws始终是dsw. 注意是单个pod,那如果是多个pod呢,即pod组
  • 那就要通过pod控制器了,保证多个pod始终是期望的个数,它们只是控制面真正的实体还是kubelet里的syncLoop来保证,具体就是修改etcd中pod数据(pod多了要删除,少了要增加),有了数据后,通过watch机制,传入到syncLoop的updates channel, 从而触发syncLoop来真正执行的。
  • select case channel保证单个进程事件的通知,那跨进程如何保证,比如kubelet监听apiserver, 那不就是client-go的watch机制么
  • 不管是单进程(组件内部)还是多进程(组件之间)都体现了k8s是通过事件而非消息队列来通信的。

syncLoopIteration

syncLoopIteration名字中Iteration体现在是在syncLoop后面执行的

根据上面4个channel来的事件,调用handler.HandlePodxxx()

updates ch, etcd中配置更新

1
2
3
4
5
6
7
8
handler.HandlePodAdditions //一样,同步类型:SyncPodCreate

update和delet事件:
handler.HandlePodUpdates //一样,同步类型:SyncPodUpdate

handler.HandlePodRemoves() => podManager.DeletePod(pod) + podKiller.KillPod()

handler.HandlePodReconcile: // -> kl.dispatchWork() -> podworker.managePodLoop kl.podWorkers.UpdatePod() -> podWorkers.managePodLoop()(起了一个协程) -> kubelet.syncPod() -> 创建pod流程了。

每个pod都运行一个managePodLoop

pleg通道, 主动上报

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
// 容器启动了,记录最后一个容器的启动
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// 对于容器是删除,则不需要做pod sync
handler.HandlePodSyncs([]*v1.Pod{pod})
}
// 如果容器变为了Died,则清理pod中的容器
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}

sync chan 周期任务

1
handler.HandlePodSyncs(podsToSync)

livenessManager

主动杀死的
检测容器是否还活着,liveness

当probe结果是Failure时,HandlePodSyncs

1
2
3
4
5
6
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
...
handler.HandlePodSyncs([]*v1.Pod{pod})
}

housekeepingCh, 管家,清理家

1
2
3
4
5
6
7
8
9
10
11
12
13
case <-housekeepingCh:
// 等待资源ready后,才会做cleanup,
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
if !kl.sourcesReady.AllReady() {
...
} else {
klog.V(4).Infof("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.Errorf("Failed cleaning pods: %v", err)
}
}

对 pod 进行增删改操作, 其实最后都会跳到 dispatchWork 方法上.
img.png
该方法里主要定义了类型 kubetypes.SyncPodType, 然后调用 podWrokers.UpdatePod() 异步操作 pod.

对于每一个Pod,kubeletpodworker都会新起一个goroutine来运行updatePod -> managePodLoop, 最后是运行kubeletsyncPod(), 在这里根据podworker传入的更新类型,区分出来是删除,创建,还是更新。

在这些操作中,会设置pod的状态,没必要设置完成才能继续下一步,而是采用异步方式,写入一个channel, 而status manager 会一直监听这个channel,读取数据,client就可以继续后面流程了,而status manager会触发syncPod及syncBatch(),调用client-go来修改状态。

syncpod

kubelet启动的syncLoop监听事件, 根据不同的事件类型podworker提供syncpod()能力,实现单个pod的同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Kubelet.syncPod()
-> kl.containerRuntime.SyncPod() //调用容器的运行时 SyncPod 完成容器重建,即kubeGenericRuntimeManager
-> m.createPodSandbox() // m表示上面的manager
-> m.runtimeService.RunPodSandbox() // 是NewManager时的入参remoteRuntimeService
-> r.runtimeClient.RunPodSandbox() //r表示remoteRuntimeService, 在New它时,也初始化了client

=> GRPC 通过remoteRuntimeEndpoint(unix:///var/run/dockershim.sock")确定,CRI另一侧

dockerServer启动后,提供rpc service需求,此时请求是RunPodSandbox
-> dockerService.RunPodSandbox() // dockerService如何创建的呢?见下面 kubelet/dockershim/docker_sandbox.go是作为内置在kubelet里的CRI server实现。
-> ds.network.SetUpPod() // network见dockerService的构建, 这说明cni是传给了CRI。替换了docker原先的libnetwork库??
-> pm.plugin.SetUpPod() // plugin就是初始化Manager时传入的那个,也就是选中的那个
-> plugin.addToNetwork() // 在dockershim里
-> cniNet.AddNetworkList() // cniNet是cni/libcni里的对象,入参有多个plugin的配置,该对象定义在cni/libcni,还有sandbox id, namesapce等运行时返回的参数(resp),这样cni才可以进入到这个pod的空间去配置网络参数
-> ExecPlugin 调用cni plugin bin, 完成pod内的网络设置。
-> 下一步就是要找到plugin的binary的实现的逻辑了,比如bridge, portmap, calico等。

代码位置: github.com/containernetworking/cni/libcni/api.go

libcni api定义的接口规范
K8s-CNI-RPC.png)

infra容器创建后,但在其他容器创建前
pkg/kubelet/dockershim/network/cni/cni.go

// Step 5: Setup networking for the sandbox.
All pod networking is setup by a CNI plugin discovered at startup time.
This plugin assigns the pod ip, sets up routes inside the sandbox,
creates interfaces etc.

lib cni 库实现

vendor/github.com/containernetworking/cni/libcni/api.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (c *CNIConfig) AddNetworkList(ctx context.Context, list *NetworkConfigList, rt *RuntimeConf) (types.Result, error) {

for _, net := range list.Plugins {
result, err = c.addNetwork(ctx, list.Name, list.CNIVersion, net, result, rt)
if err != nil {
return nil, err
}
}
}

func (c *CNIConfig) addNetwork(ctx context.Context, name, cniVersion string, net *NetworkConfig, prevResult types.Result, rt *RuntimeConf) (types.Result, error) {
c.ensureExec()
//
pluginPath, err := c.exec.FindInPath(net.Network.Type, c.Path)
if err != nil {
return nil, err
}

newConf, err := buildOneConfig(name, cniVersion, net, prevResult, rt)
if err != nil {
return nil, err
}
// 通过二进制方式调用插件的ADD
return invoke.ExecPluginWithResult(ctx, pluginPath, newConf.Bytes, c.args("ADD", rt), c.exec)
}

入参list表示配置文件中,使用的cni列表。

遍历每一个cni插件,调用其ADD接口实现。

host-local 插件分析

删除 pod 流程 (HandlePodRemoves)

  1. podManager删除pod
  2. 调用containerRuntime.KillPod()
  3. probeManager将该pod的三种类型的探针worker都stop
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
    kl.podManager.DeletePod(pod)
    if kubetypes.IsMirrorPod(pod) {
    kl.handleMirrorPod(pod, start)
    continue
    }
    // Deletion is allowed to fail because the periodic cleanup routine
    // will trigger deletion again.
    if err := kl.deletePod(pod); err != nil {
    klog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
    }
    kl.probeManager.RemovePod(pod)
    }
    }