k8s multi-attach error for volume

测试

分别创建3个使用同一个pvc的pod, 1和2调度到master1节点,3调度到master2节点

1
2
3
4
5
6
7
8
9
10
11
12
13
# kubectl get pod -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
demo-csi-1-6f5975b88f-khdw6 0/1 ContainerCreating 0 2m20s <none> master1 <none> <none>
demo-csi-2-6f5975b88f-n9p8z 0/1 ContainerCreating 0 2m4s <none> master1 <none> <none>
demo-csi-3-7cdfc7786-gb7bt 0/1 ContainerCreating 0 69s <none> master2 <none> <none>

# kubectl describe pod demo-csi-3-7cdfc7786-gb7bt
...
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 42s ake-scheduler Successfully assigned default/demo-csi-3-7cdfc7786-gb7bt to master2
Warning FailedAttachVolume 42s attachdetach-controller Multi-Attach error for volume "pvc-f3a38946-74ee-4eda-835c-fab5ce76c6fd" Volume is already used by pod(s) demo-csi-2-6f5975b88f-n9p8z, demo-csi-1-6f5975b88f-khdw6

可以看到只有3号pod报multi-attach告警事件。
attach是node级别的,multi attach表示该卷被attach到不同的节点。如果是attach到相同节点不会报错误。

疑问:globalmount被bind mount到多个pod挂载点,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# kubectl get node master1 -o yaml
...
volumesAttached:
- devicePath: ""
name: kubernetes.io/csi/arstor.csi.ake.com^arstor--0bf5059875921e668a5bdf2c7fc48445--pvcf3a3894674ee4eda835cfab5ce76c6fd--iqn.2010-01.com.example:k8s--kataDirect:false
volumesInUse:
- kubernetes.io/csi/arstor.csi.ake.com^arstor--0bf5059875921e668a5bdf2c7fc48445--pvcf3a3894674ee4eda835cfab5ce76c6fd--iqn.2010-01.com.example:k8s--kataDirect:false

# kubectl get node master2 -o yaml
...
volumesInUse:
- kubernetes.io/csi/arstor.csi.ake.com^arstor--0bf5059875921e668a5bdf2c7fc48445--pvcf3a3894674ee4eda835cfab5ce76c6fd--iqn.2010-01.com.example:k8s--kataDirect:false

# kubectl get node master3 -o yaml
no volumesInUse && volumesAttached信息

volumesAttached表示volume已经attach到node节点了,因为multi attach error所以在master2上没有该attach信息。

volumesInUse
应该是调度的结果,也就是dsw的期望的,而asw是维护实际的状态,当前pod使用的卷因为网络原因,未attach到node上来,导致容器处于pending状态

代码分析

kube-controller-manager中的adc(attach detach controller)会有reconcile 循环

位置 pkg/controller/volume/attachdetach/attach_detach_controller.go

1
2
3
4
5
6
7
8
9
10
11
12
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {

err := adc.populateActualStateOfWorld()
...
err = adc.populateDesiredStateOfWorld()
...
// 周期性调谐,保证volume attach/detach是期望的状态
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)
go wait.Until(adc.pvcWorker, time.Second, stopCh)

}

主要运行4个方法

(1)adc.populateActualStateOfWorld()
(2)adc.populateDesiredStateOfWorld()
(3)adc.reconciler.Run()
(4)adc.desiredStateOfWorldPopulator.Run()

adc.populateActualStateOfWorld()

asw 对象分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type actualStateOfWorld struct {
// 记录attach到node的volume, attached修饰volumes
attachedVolumes map[v1.UniqueVolumeName]attachedVolume

// 要更新的node map, 实际是更新node VolumesAttached Status field. map记录 node和node上attached的volume
// 要去更新status的nodes
nodesToUpdateStatusFor map[types.NodeName]nodeToUpdateStatusFor

volumePluginMgr *volume.VolumePluginMgr

sync.RWMutex
}

type attachedVolume struct {
// volume信息
volumeName v1.UniqueVolumeName

// volume spec
spec *volume.Spec

// volume attach到的节点集合, attachedto是修饰nodes
nodesAttachedTo map[types.NodeName]nodeAttachedTo

// volume被attach到node上的设备路径
devicePath string
}

asw关心volume和node对象的关系;volume 管理 attached部分; node 管理要更新status的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (adc *attachDetachController) populateActualStateOfWorld() error {
// 拿到所有的nodes对象
nodes, err := adc.nodeLister.List(labels.Everything())
// 遍历每一个node
for _, node := range nodes {
nodeName := types.NodeName(node.Name)
// 遍历当前node对象中的VolumesAttached属性,谁给它赋值?
for _, attachedVolume := range node.Status.VolumesAttached {
err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
if err != nil {
klog.Errorf("Failed to mark the volume as attached: %v", err)
continue
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.addNodeToDswp(node, types.NodeName(node.Name))
}
}

}

可以看到,对于每个node对象,已经附着在其上的卷都做下面三个流程的处理:

  • MarkVolumeAsAttached()
    • 此处volumeSpec是nil, 该流程是将持久化到etcd的node.status上的attached的volume和在控制器内存中的asw.attachedVolumes进行对齐,如果该步骤出现问题,后面步骤不会进行。避免控制器面与集群真实不一致。从而避免出现管理混乱的局面
  • processVolumesInUse
  • addNodeToDswp

MarkVolumeAsAttached

node.Status.VolumesAttached是由asw.MarkVolumeAsAttached赋值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error {
显示设置isAttached为true. 即触发
_, err := asw.AddVolumeNode(uniqueName, volumeSpec, nodeName, devicePath, true)
return err
}

func (asw *actualStateOfWorld) AddVolumeNode(uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, ...) (v1.UniqueVolumeName, error) {
volumeName := uniqueName

// 如果卷名为空,则生成一个
if volumeName == "" {
// 必须要有pv信息,因为后面流程获取volume name要用到,所以强制检查
if volumeSpec == nil {
return volumeName, fmt.Errorf("volumeSpec cannot be nil if volumeName is empty")
}

// 命名规则是plugin name + volume name.
// 对于csi插件就是"kubernetes.io/csi..."
// fetches a persistent volume plugin by spec.
attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
volumeName, err = util.GetUniqueVolumeNameFromSpec(attachableVolumePlugin, volumeSpec)


}

// 从asw的attachedVolumes获取node已经attached的卷,如果没有就创建一个volumeObj, 如果之前有就更新devicePath和spec字段。
volumeObj, volumeExists := asw.attachedVolumes[volumeName]


// 获取volumeObj卷被attached到的节点对象,没有就创建,之前有的话更新node.attachedConfirmed
node, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
// Create object if it doesn't exist.
node = nodeAttachedTo{
nodeName: nodeName,
mountedByNode: true, // Assume mounted, until proven otherwise
attachedConfirmed: isAttached,
detachRequestedTime: time.Time{},
}
} else {
// node确认,volume确认已经被attach上了
node.attachedConfirmed = isAttached
}

// 将上面生成的node和volumeObj对象,更新到asw的attachedVolumes
volumeObj.nodesAttachedTo[nodeName] = node
asw.attachedVolumes[volumeName] = volumeObj

// 卷和node确定绑定关系,则确认volume已经attached了
if isAttached {
// 这里report的意思是调用client-go持久化到node对象上
asw.addVolumeToReportAsAttached(volumeName, nodeName)
}
}

asw中卷分成已经attachedVolume和其他,肯定是个map => asw.attachedVolumes
对于attached的卷,需要记录被attach to哪个节点, 也是个map => volumeObj.nodesAttachedTo

isAttached若为true, 表卷已经被attach到节点;若为false, volume绑定到该node还不确定,即绑定可能发生更改

volumeSpec里包含很多重要信息,volumeName卷名,volume plugin的信息,plugin name,比如csi, nfs, plugin是否是attachable

1
2
3
4
5
6
7
8
func (asw *actualStateOfWorld) addVolumeToReportAsAttached(
volumeName v1.UniqueVolumeName, nodeName types.NodeName) {

// 确保asw的nodesToUpdateStatusFor中要更新的node及volume信息是期望的。
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]

_, nodeToUpdateVolumeExists := nodeToUpdate.volumesToReportAsAttached[volumeName]
}

该方法最重要的是更新asw.nodesToUpdateStatusFor属性。
命名规则xToxxx值得学习, 即x要去做xxx, 但是还没有去做

  • addVolumeToReportAsAttached: 将volume上报为attached
  • nodesToUpdateStatusFor: node要去更新它的status字段
  • nodeToUpdate: node对象要去更新的部分

node写入到aswnodesToUpdateStatusFor后如何处理呢?
答案是在asw对象的GetVolumesToReportAttached()方法完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
// 获取asw.nodesToUpdateStatusFor字段
nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
for nodeName, attachedVolumes := range nodesToUpdate {

if err := asw.updateNodeStatusUpdateNeeded(nodeName, false); err != nil {
klog.Errorf("Failed to update statusUpdateNeeded field when getting volumes: %v", err)
}
}
}

func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
// 遍历nodesToUpdateStatusFor域,map结构, key是node, value是attachedVolumes

nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
for nodeName, attachedVolumes := range nodesToUpdate {
...
if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil {
...
}
}

}

func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error {
node := nodeObj.DeepCopy()
// 将attached到该节点上的volume更新在node.status.VolumesAttached上
node.Status.VolumesAttached = attachedVolumes
// 调用client-go node的patch方法完成持久化
_, patchBytes, err := nodeutil.PatchNodeStatus(nsu.kubeClient.CoreV1(), nodeName, nodeObj, node)
if err != nil {
return err
}

klog.V(4).Infof("Updating status %q for node %q succeeded. VolumesAttached: %v", patchBytes, nodeName, attachedVolumes)
return nil
}

当我们将要更新的卷及node信息写入asw后就返回了
reconciler.reconcile方法中,会调用GetVolumesToReportAttached获取node要更新的部分,然后调用patch node方法,更新node对象。

aswdsw分别将关心的数据记录到各自对象缓存种,最后reconciler将调谐aws和dsw.

小结:

  1. 该流程是将持久化到etcdnode.status上的attachedvolume和在控制器内存中的asw.attachedVolumes进行对齐,如果该步骤出现问题,后面步骤不会进行。避免控制器面与集群真实不一致。从而避免出现管理混乱的局面
  2. 在整个流程处理完后,会将特定node上attachedVolume通过updateNodeStatus()重新更新到etcd,表明是经过本次reconcile 调谐过的了。

processVolumesInUse

Node's Status.VolumesInUse中的卷更新,表示已经mounted状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (adc *attachDetachController) processVolumesInUse(
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {

// 对于每一个attached到本node的卷
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
检查是否mount了,即如果在volumeInUse中有,表示mounted了
mounted := false
for _, volumeInUse := range volumesInUse {
if attachedVolume.VolumeName == volumeInUse {
mounted = true
break
}
}
// 设置卷的MountedByNode属性为true, 表明该volume被attached到该节点其已经被mounted的,做detach是不安全的
err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted)
}
}

小结:

  1. 管理asw中volume被attached到某个节点,其是否被使用, 即是否mounted

疑问

  1. node.Status.VolumesInUse是在哪里被设置的?

    1
    2
    3
    4
    kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)

    pkg/kubelet/kubelet_node_status.go
    nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse)

    kubelet中volume manager完成mount操作。通过setter对象及defaultNodeStatusFuncs()方法更新node对象

  2. volumesInUse中存在但是在attachedVolume中不存在的怎么处理,当前逻辑是不会管理到的

addNodeToDswp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {

// 如果是controller-manager, attach/detach管理,该值为false。不运行的pod,volume也没必要存在
if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
keepTerminatedPodVolumes := false
// 如果显示设置,则按实际设置的来
if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
keepTerminatedPodVolumes = (t == "true")
}

// Node specifies annotation indicating it should be managed by attach
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
}
}
func (dsw *desiredStateOfWorld) AddNode(nodeName k8stypes.NodeName, keepTerminatedPodVolumes bool) {
dsw.Lock()
defer dsw.Unlock()

if _, nodeExists := dsw.nodesManaged[nodeName]; !nodeExists {
dsw.nodesManaged[nodeName] = nodeManaged{
nodeName: nodeName,
volumesToAttach: make(map[v1.UniqueVolumeName]volumeToAttach),
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
}
}
}

该方法比较简单,仅保证asw中的node添加到dsw的nodesManaged map中,asw有的要对齐到dsw中。

keepTerminatedPodVolumes字段,为true 表示对于terminatedPod中的volume不进行umount和detach。

如何判断pod是terminated pod?

1
2
3
4
// IsPodTerminated checks if pod is terminated
func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool {
return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
}

podStatus.Phase

  • v1.PodSucceeded, pod中的容器都运行完了,退出码是0
  • v1.PodFailed, 有一个容器退出码是非0.

pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses)
pod要被删除并且pod没有在运行态

pod running?

1
2
3
4
5
6
7
8
func notRunning(statuses []v1.ContainerStatus) bool {
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return false
}
}
return true
}

pod默认状态是not running, 但
只要pod中有一个容器没给出结果(没有更新Terminated且也没有更新Waiting),我们就认为pod是running的。

小结:

populateasw是根据实际状态,也就是node对象上的volumesAttached和volumesInUse实际状态,并结合etcd中的最新状态,来维护asw对象中volume、node绑定及使用情况

adc.populateDesiredStateOfWorld()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// pkg/controller/volume/attachdetach/attach_detach_controller.go

func (adc *attachDetachController) populateDesiredStateOfWorld() error {
//
pods, err := adc.podLister.List(labels.Everything())
if err != nil {
return err
}
for _, pod := range pods {
podToAdd := pod
// 将pod的volume加到dsw对象中,这里没有使用返回值
adc.podAdd(podToAdd)
for _, podVolume := range podToAdd.Spec.Volumes {
...
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)

plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)

volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)

// 调用asw方法,判断volume和node绑定状态
attachState := adc.actualStateOfWorld.GetAttachState(volumeName, nodeName)
if attachState == cache.AttachStateAttached {
klog.V(10).Infof("Volume %q is attached to node %q. Marking as attached in ActualStateOfWorld",
volumeName,
nodeName,
)
devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
if err != nil {
klog.Errorf("Failed to find device path: %v", err)
continue
}
// 在asw也会主动做一次MarkVolumeAsAttached,只是volume是nil,那么attachedVolume里的volumeObj.spec就是nil
// 本次MarkVolumeAsAttached传入的是volume才是真实的
err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
if err != nil {
klog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
}
}

}

}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (adc *attachDetachController) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if pod == nil || !ok {
return
}
// pod还没有调度,不进行后面流程的处理
if pod.Spec.NodeName == "" {
// Ignore pods without NodeName, indicating they are not scheduled.
return
}
// 确定volume是加入dswp还是从dswp中删除;
// 默认是加入,但是对于Terminated pod则是根据keepTerminatedPodVolume标志位。
// 即pod因为异常终止了,它依赖的卷是否要做detach动作;默认是false,即pod中止,它依赖卷做detach。这是合理的,因为pod会重建,volume也会重建
// 如果pod总是调度到同一个节点,为了加速,volume可以不做detach. 方法是在node annotataion设置volumes.kubernetes.io/keep-terminated-pod-volumes:true.
volumeActionFlag := util.DetermineVolumeAction(pod, adc.desiredStateOfWorld, true /* default volume action */)

// 对于给定pod,是否要将volume加入dsw,还是从dsw中删除
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
}

func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld, ...) {
1. pod没有volume,忽略
2. pod没有node, 忽略
3. pod调度到的node, 没有在dsw.nodesManaged中,忽略
for _, podVolume := range pod.Spec.Volumes {
//
volumeSpec, err := CreateVolumeSpec(...)

// 从pv找到使用的plugin, 验证plugin要支持attach操作。
// csi, iscsi, rbd支持attach, 而nfs, hostpath不需要支持attach,直接mount即可。
attachableVolumePlugin, err := volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)

if addVolumes {
// 将该pod加入到引用volumeSpec的pods列表里
_, err := desiredStateOfWorld.AddPod(uniquePodName, pod, volumeSpec, nodeName)
} else {
// Remove volume from desired state of world
uniqueVolumeName, err := util.GetUniqueVolumeNameFromSpec(attachableVolumePlugin, volumeSpec)
// 对于terminated pod且对于volume的策略是不keep了,则将该pod从引用volume的pods列表里移
desiredStateOfWorld.DeletePod(uniquePodName, uniqueVolumeName, nodeName)
}
}

func (dsw *desiredStateOfWorld) AddPod() (v1.UniqueVolumeName, error) {
// 从dsw获取node对象
nodeObj, nodeExists := dsw.nodesManaged[nodeName]

// 从node里获取需要attach到本节点的卷,这里是还没有attach,也符合dsw的语义。
volumeObj, volumeExists := nodeObj.volumesToAttach[volumeName]

// node上没有volume,说明是新attach过来的,则添加到volumesToAttach
// 保证volumesToAttach有新的卷
if !volumeExists {
volumeObj = volumeToAttach{
multiAttachErrorReported: false,
volumeName: volumeName,
spec: volumeSpec,
scheduledPods: make(map[types.UniquePodName]pod),
}
dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj
}

// 引用该卷的pod列表中,没有本pod,则添加
if _, podExists := volumeObj.scheduledPods[podName]; !podExists {
dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods[podName] =
pod{
podName: podName,
podObj: podToAdd,
}
}
}

dsw是将集群中所有pod按照期望的状态维护到dsw对象上,即
pod被调度到node, pod引用的volume期望attach到特定node上,以及该volume会被哪个pod引用到。

dsw, asw是描述的volume

dsw对象解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type desiredStateOfWorld struct {
// 描述adc管理的node组成的map。
nodesManaged map[k8stypes.NodeName]nodeManaged
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
sync.RWMutex
}

type nodeManaged struct {
// 节点的名字
nodeName k8stypes.NodeName

// map, 应该attach到该节点的volume对象
volumesToAttach map[v1.UniqueVolumeName]volumeToAttach

// keepTerminatedPodVolumes determines if for terminated pods(on this node) - volumes
// should be kept mounted and attached.
keepTerminatedPodVolumes bool
}

type volumeToAttach struct {
// 这个卷的multiAttachErrorReported是否上报过,这种错误,一个volume只上报一次即可
multiAttachErrorReported bool

// 卷信息
volumeName v1.UniqueVolumeName

// volume spec
spec *volume.Spec

// reference 该卷的pods对象的集合
scheduledPods map[types.UniquePodName]pod
}

至此 asw和dsw的populate已经完成

adc.reconciler.Run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (rc *reconciler) reconcile() {
// 保证detach卷都被detach了
// 从asw里获取已经attach的卷,对每一个卷做detach对齐,及对于不应该attach卷,去做detach
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes(){
// 卷在asw有,但是dsw里已经没有了
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName){
...
}
...
}
// 一定是从dsw里获取
rc.attachDesiredVolumes()

// asw和dsw对象各域更新好后,最后更新 node的status
rc.nodeStatusUpdater.UpdateNodeStatuses()

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (rc *reconciler) attachDesiredVolumes() {
// 对于dsw中的每一个卷,进行检查
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
...
// 该卷不允许多挂
if !util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
// 挂载该卷的node
nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 {
// 该卷挂载到某节点了,可能是本次相同的节点,也可能是other node, 而且没有上报过multiattach error, 则reportMultiAttachError
if !volumeToAttach.MultiAttachErrorReported {
rc.reportMultiAttachError(volumeToAttach, nodes)
rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
// 如果该卷没有attach到node过,不会上报multi attach,这是最常见的情况了
}

// 触发volume attach动作
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
...

}
}

func (og *operationGenerator) GenerateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {

attachVolumeFunc := func() (error, error) {

attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)

volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()

// Execute attach 调用具体plugin进行attach
devicePath, attachErr := volumeAttacher.Attach(volumeToAttach.VolumeSpec, volumeToAttach.NodeName)

// Update actual state of world
// 更新asw.attachedVolumes
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
入参是volumeToAttach.VolumeName
func (asw *actualStateOfWorld) GetNodesForAttachedVolume(volumeName v1.UniqueVolumeName) []types.NodeName {
asw.RLock()
defer asw.RUnlock()

// 在asw中的`attachedVolumes`中找到`volumeObj`,
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
// 如果该卷没有attached过,或者attach过但是记录未空,直接返回空节点列表
if !volumeExists || len(volumeObj.nodesAttachedTo) == 0 {
return []types.NodeName{}
}
// 如果之前attach过,则返回attached过的节点列表
nodes := []types.NodeName{}
for nodeName, nodesAttached := range volumeObj.nodesAttachedTo {
if nodesAttached.attachedConfirmed {
nodes = append(nodes, nodeName)
}
}
return nodes
}

主要是查看asw.attachedVolumes对象的状态;
nodes会有3种情况

  • 0, 表第一次attach
  • 非0,attach过且和本次节点相同
  • 非0,attach过且和本次节点不相同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (rc *reconciler) reportMultiAttachError(volumeToAttach cache.VolumeToAttach, nodes []types.NodeName) {
// 对于dsw中的这个未attach到nodde的volume
otherNodes := []types.NodeName{}
otherNodesStr := []string{}

// 根据volume对象当前绑定的节点,筛选出other nodes
for _, node := range nodes {
if node != volumeToAttach.NodeName {
otherNodes = append(otherNodes, node)
otherNodesStr = append(otherNodesStr, string(node))
}
}


// 在other nodes上统计引用该volume的pod
pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName)

// 在其他节点找不到使用该volume的pod,说明pod已经被删掉了
// asw.attachedVolumes中残留这volume之前绑定的信息,而此时volume绑到其他node上,触发multi attach告警,volume只能唯一绑定某个node
// 可能是清理不全导致,所以每一步状态记录都要慎重,因为你要维护好它的状态啊
if len(pods) == 0 {
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
return
}

// 到这里,说明在其他节点上有引用该volume的pod, // 通常是使用者的问题,比如replicaset使用同一个pvc,但是有多个副本。因为dp通常是// 无状态的,不该开启多副本还使用pvc. 需要改成sts来使用多副本,
// 需要上报错误信息提醒用户

// 统计出使用重复使用volume的pod信息,即
// 遍历本次volume被分配到的pods列表
for _, scheduledPod := range volumeToAttach.ScheduledPods {
// 与其他接地点上,使用该volume的pod比对
for _, pod := range pods {
if pod.Namespace == scheduledPod.Namespace {
localPodNames = append(localPodNames, pod.Name)
} else {
otherPods++
}
}

}
// Log all pods for system admin
podNames := []string{}
for _, pod := range pods {
podNames = append(podNames, pod.Namespace+"/"+pod.Name)
}
将已经引用该卷的pod及所在node的信息以事件形式上报到本次pod中,提醒用户
detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already used by pods %s on node %s", strings.Join(podNames, ", "), strings.Join(otherNodesStr, ", ")))
}

volume在node1上attach,又被attach到node2上,但是node1上status volumeattached还没有清理完,就会报multi-attach,等node1清理完成就不会报该错了

csi plugin attach

pkg/volume/csi/csi_attacher.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
...

// 创建attachment对象
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: node,
Attacher: pvSrc.Driver,
Source: vaSrc,
},
}

_, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})

// 一致等待volume attachment对象status状态
// 2mins 超时时间
// csi-attacher sidecar 会watch该对象, 完成具体的附着动作
if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil {
return "", err
}
}

总结

  • asw是从当前node节点的volumeattached信息,该信息在reconcile后,本次调谐完成,由UpdateNodeStatuses()更新到node中
  • dsw是遍历所有pod中的每个volume,来更新asw.volumeattached这个map,key是卷的名字,value是卷的信息,包括attach到哪个节点。
  • adc的Attach/Detach操作,只是创建/删除VolumeAttachment对象;csi-attacher sidecar 会watch该对象,并向csi driver发起controllerpublishvolume/controllerUnpublishvolume request 完成后更新VolumeAttachment status.
  • controller-manager只是完成了attach的操作,mount的调谐是在kubelet的volume manager完成,其也有一套aswdswreconcile的过程。