k8s multi-attach error for volume 测试 分别创建3个使用同一个pvc的pod, 1和2调度到master1节点,3调度到master2节点
1 2 3 4 5 6 7 8 9 10 11 12 13 # kubectl get pod -o wide NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES demo-csi-1-6f5975b88f-khdw6 0/1 ContainerCreating 0 2m20s <none> master1 <none> <none> demo-csi-2-6f5975b88f-n9p8z 0/1 ContainerCreating 0 2m4s <none> master1 <none> <none> demo-csi-3-7cdfc7786-gb7bt 0/1 ContainerCreating 0 69s <none> master2 <none> <none> # kubectl describe pod demo-csi-3-7cdfc7786-gb7bt ... Events: Type Reason Age From Message ---- ------ ---- ---- ------- Normal Scheduled 42s ake-scheduler Successfully assigned default/demo-csi-3-7cdfc7786-gb7bt to master2 Warning FailedAttachVolume 42s attachdetach-controller Multi-Attach error for volume "pvc-f3a38946-74ee-4eda-835c-fab5ce76c6fd" Volume is already used by pod(s) demo-csi-2-6f5975b88f-n9p8z, demo-csi-1-6f5975b88f-khdw6
可以看到只有3号pod报multi-attach
告警事件。 attach是node级别的,multi attach
表示该卷被attach到不同的节点。如果是attach到相同节点不会报错误。
疑问:globalmount
被bind mount到多个pod挂载点,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # kubectl get node master1 -o yaml ... volumesAttached: - devicePath: "" name: kubernetes.io/csi/arstor.csi.ake.com^arstor--0bf5059875921e668a5bdf2c7fc48445--pvcf3a3894674ee4eda835cfab5ce76c6fd--iqn.2010-01.com.example:k8s--kataDirect:false volumesInUse: - kubernetes.io/csi/arstor.csi.ake.com^arstor--0bf5059875921e668a5bdf2c7fc48445--pvcf3a3894674ee4eda835cfab5ce76c6fd--iqn.2010-01.com.example:k8s--kataDirect:false # kubectl get node master2 -o yaml ... volumesInUse: - kubernetes.io/csi/arstor.csi.ake.com^arstor--0bf5059875921e668a5bdf2c7fc48445--pvcf3a3894674ee4eda835cfab5ce76c6fd--iqn.2010-01.com.example:k8s--kataDirect:false # kubectl get node master3 -o yaml no volumesInUse && volumesAttached信息
volumesAttached
表示volume已经attach到node节点了,因为multi attach error所以在master2上没有该attach信息。
和volumesInUse
应该是调度的结果,也就是dsw
的期望的,而asw
是维护实际的状态,当前pod使用的卷因为网络原因,未attach到node上来,导致容器处于pending
状态
代码分析 在kube-controller-manager
中的adc(attach detach controller)
会有reconcile
循环
位置 pkg/controller/volume/attachdetach/attach_detach_controller.go
1 2 3 4 5 6 7 8 9 10 11 12 func (adc *attachDetachController) Run(stopCh <-chan struct{}) { err := adc.populateActualStateOfWorld() ... err = adc.populateDesiredStateOfWorld() ... // 周期性调谐,保证volume attach/detach是期望的状态 go adc.reconciler.Run(stopCh) go adc.desiredStateOfWorldPopulator.Run(stopCh) go wait.Until(adc.pvcWorker, time.Second, stopCh) }
主要运行4个方法
(1)adc.populateActualStateOfWorld() (2)adc.populateDesiredStateOfWorld() (3)adc.reconciler.Run() (4)adc.desiredStateOfWorldPopulator.Run()
adc.populateActualStateOfWorld() asw 对象分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 type actualStateOfWorld struct { // 记录attach到node的volume, attached修饰volumes attachedVolumes map[v1.UniqueVolumeName]attachedVolume // 要更新的node map, 实际是更新node VolumesAttached Status field. map记录 node和node上attached的volume // 要去更新status的nodes nodesToUpdateStatusFor map[types.NodeName]nodeToUpdateStatusFor volumePluginMgr *volume.VolumePluginMgr sync.RWMutex } type attachedVolume struct { // volume信息 volumeName v1.UniqueVolumeName // volume spec spec *volume.Spec // volume attach到的节点集合, attachedto是修饰nodes nodesAttachedTo map[types.NodeName]nodeAttachedTo // volume被attach到node上的设备路径 devicePath string }
asw关心volume和node对象的关系;volume 管理 attached部分; node 管理要更新status的部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (adc *attachDetachController) populateActualStateOfWorld() error { // 拿到所有的nodes对象 nodes, err := adc.nodeLister.List(labels.Everything()) // 遍历每一个node for _, node := range nodes { nodeName := types.NodeName(node.Name) // 遍历当前node对象中的VolumesAttached属性,谁给它赋值? for _, attachedVolume := range node.Status.VolumesAttached { err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath) if err != nil { klog.Errorf("Failed to mark the volume as attached: %v", err) continue } adc.processVolumesInUse(nodeName, node.Status.VolumesInUse) adc.addNodeToDswp(node, types.NodeName(node.Name)) } } }
可以看到,对于每个node对象,已经附着在其上的卷都做下面三个流程的处理:
MarkVolumeAsAttached()
此处volumeSpec是nil, 该流程是将持久化到etcd的node.status上的attached的volume和在控制器内存中的asw.attachedVolumes进行对齐,如果该步骤出现问题,后面步骤不会进行。避免控制器面与集群真实不一致。从而避免出现管理混乱的局面
processVolumesInUse
addNodeToDswp
MarkVolumeAsAttached node.Status.VolumesAttached是由asw.MarkVolumeAsAttached赋值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 func (asw *actualStateOfWorld) MarkVolumeAsAttached( uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error { 显示设置isAttached为true. 即触发 _, err := asw.AddVolumeNode(uniqueName, volumeSpec, nodeName, devicePath, true) return err } func (asw *actualStateOfWorld) AddVolumeNode(uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, ...) (v1.UniqueVolumeName, error) { volumeName := uniqueName // 如果卷名为空,则生成一个 if volumeName == "" { // 必须要有pv信息,因为后面流程获取volume name要用到,所以强制检查 if volumeSpec == nil { return volumeName, fmt.Errorf("volumeSpec cannot be nil if volumeName is empty") } // 命名规则是plugin name + volume name. // 对于csi插件就是"kubernetes.io/csi..." // fetches a persistent volume plugin by spec. attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) volumeName, err = util.GetUniqueVolumeNameFromSpec(attachableVolumePlugin, volumeSpec) } // 从asw的attachedVolumes获取node已经attached的卷,如果没有就创建一个volumeObj, 如果之前有就更新devicePath和spec字段。 volumeObj, volumeExists := asw.attachedVolumes[volumeName] // 获取volumeObj卷被attached到的节点对象,没有就创建,之前有的话更新node.attachedConfirmed node, nodeExists := volumeObj.nodesAttachedTo[nodeName] if !nodeExists { // Create object if it doesn't exist. node = nodeAttachedTo{ nodeName: nodeName, mountedByNode: true, // Assume mounted, until proven otherwise attachedConfirmed: isAttached, detachRequestedTime: time.Time{}, } } else { // node确认,volume确认已经被attach上了 node.attachedConfirmed = isAttached } // 将上面生成的node和volumeObj对象,更新到asw的attachedVolumes volumeObj.nodesAttachedTo[nodeName] = node asw.attachedVolumes[volumeName] = volumeObj // 卷和node确定绑定关系,则确认volume已经attached了 if isAttached { // 这里report的意思是调用client-go持久化到node对象上 asw.addVolumeToReportAsAttached(volumeName, nodeName) } }
asw中卷分成已经attachedVolume和其他,肯定是个map => asw.attachedVolumes
对于attached的卷,需要记录被attach to
哪个节点, 也是个map => volumeObj.nodesAttachedTo
isAttached若为true, 表卷已经被attach到节点;若为false, volume绑定到该node还不确定,即绑定可能发生更改
volumeSpec里包含很多重要信息,volumeName卷名,volume plugin的信息,plugin name,比如csi, nfs, plugin是否是attachable
1 2 3 4 5 6 7 8 func (asw *actualStateOfWorld) addVolumeToReportAsAttached( volumeName v1.UniqueVolumeName, nodeName types.NodeName) { // 确保asw的nodesToUpdateStatusFor中要更新的node及volume信息是期望的。 nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName] _, nodeToUpdateVolumeExists := nodeToUpdate.volumesToReportAsAttached[volumeName] }
该方法最重要的是更新asw.nodesToUpdateStatusFor
属性。 命名规则xToxxx
值得学习, 即x要去做xxx, 但是还没有去做
addVolumeToReportAsAttached: 将volume上报为attached
nodesToUpdateStatusFor: node要去更新它的status字段
nodeToUpdate: node对象要去更新的部分
将node
写入到asw
的nodesToUpdateStatusFor
后如何处理呢? 答案是在asw对象的GetVolumesToReportAttached()
方法完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error { // 获取asw.nodesToUpdateStatusFor字段 nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached() for nodeName, attachedVolumes := range nodesToUpdate { if err := asw.updateNodeStatusUpdateNeeded(nodeName, false); err != nil { klog.Errorf("Failed to update statusUpdateNeeded field when getting volumes: %v", err) } } } func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error { // 遍历nodesToUpdateStatusFor域,map结构, key是node, value是attachedVolumes nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached() for nodeName, attachedVolumes := range nodesToUpdate { ... if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil { ... } } } func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error { node := nodeObj.DeepCopy() // 将attached到该节点上的volume更新在node.status.VolumesAttached上 node.Status.VolumesAttached = attachedVolumes // 调用client-go node的patch方法完成持久化 _, patchBytes, err := nodeutil.PatchNodeStatus(nsu.kubeClient.CoreV1(), nodeName, nodeObj, node) if err != nil { return err } klog.V(4).Infof("Updating status %q for node %q succeeded. VolumesAttached: %v", patchBytes, nodeName, attachedVolumes) return nil }
当我们将要更新的卷及node信息写入asw
后就返回了 在reconciler.reconcile
方法中,会调用GetVolumesToReportAttached
获取node要更新的部分,然后调用patch node
方法,更新node
对象。
asw
和dsw
分别将关心的数据记录到各自对象缓存种,最后reconciler
将调谐aws和dsw.
小结:
该流程是将持久化到etcd
的node.status
上的attached
的volume
和在控制器内存中的asw.attachedVolumes
进行对齐,如果该步骤出现问题,后面步骤不会进行。避免控制器面与集群真实不一致。从而避免出现管理混乱的局面
在整个流程处理完后,会将特定node上attachedVolume
通过updateNodeStatus()
重新更新到etcd
,表明是经过本次reconcile
调谐过的了。
processVolumesInUse 将Node's Status.VolumesInUse
中的卷更新,表示已经mounted
状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (adc *attachDetachController) processVolumesInUse( nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) { // 对于每一个attached到本node的卷 for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) { 检查是否mount了,即如果在volumeInUse中有,表示mounted了 mounted := false for _, volumeInUse := range volumesInUse { if attachedVolume.VolumeName == volumeInUse { mounted = true break } } // 设置卷的MountedByNode属性为true, 表明该volume被attached到该节点其已经被mounted的,做detach是不安全的 err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted) } }
小结:
管理asw中volume被attached到某个节点,其是否被使用, 即是否mounted
疑问
node.Status.VolumesInUse
是在哪里被设置的?
1 2 3 4 kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse) pkg/kubelet/kubelet_node_status.go nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse)
kubelet中volume manager
完成mount操作。通过setter
对象及defaultNodeStatusFuncs()
方法更新node对象
volumesInUse
中存在但是在attachedVolume中不存在的怎么处理,当前逻辑是不会管理到的
addNodeToDswp 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) { // 如果是controller-manager, attach/detach管理,该值为false。不运行的pod,volume也没必要存在 if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists { keepTerminatedPodVolumes := false // 如果显示设置,则按实际设置的来 if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok { keepTerminatedPodVolumes = (t == "true") } // Node specifies annotation indicating it should be managed by attach // detach controller. Add it to desired state of world. adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes) } } func (dsw *desiredStateOfWorld) AddNode(nodeName k8stypes.NodeName, keepTerminatedPodVolumes bool) { dsw.Lock() defer dsw.Unlock() if _, nodeExists := dsw.nodesManaged[nodeName]; !nodeExists { dsw.nodesManaged[nodeName] = nodeManaged{ nodeName: nodeName, volumesToAttach: make(map[v1.UniqueVolumeName]volumeToAttach), keepTerminatedPodVolumes: keepTerminatedPodVolumes, } } }
该方法比较简单,仅保证asw中的node添加到dsw的nodesManaged map中,asw有的要对齐到dsw中。
keepTerminatedPodVolumes字段,为true 表示对于terminatedPod中的volume不进行umount和detach。
如何判断pod是terminated pod?
1 2 3 4 // IsPodTerminated checks if pod is terminated func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool { return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses)) }
podStatus.Phase
v1.PodSucceeded, pod中的容器都运行完了,退出码是0
v1.PodFailed, 有一个容器退出码是非0.
pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses) pod要被删除并且pod没有在运行态
pod running?
1 2 3 4 5 6 7 8 func notRunning(statuses []v1.ContainerStatus) bool { for _, status := range statuses { if status.State.Terminated == nil && status.State.Waiting == nil { return false } } return true }
pod默认状态是not running, 但 只要pod中有一个容器没给出结果(没有更新Terminated且也没有更新Waiting),我们就认为pod是running的。
小结:
populateasw是根据实际状态,也就是node对象上的volumesAttached和volumesInUse实际状态,并结合etcd中的最新状态,来维护asw对象中volume、node绑定及使用情况
adc.populateDesiredStateOfWorld() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 // pkg/controller/volume/attachdetach/attach_detach_controller.go func (adc *attachDetachController) populateDesiredStateOfWorld() error { // pods, err := adc.podLister.List(labels.Everything()) if err != nil { return err } for _, pod := range pods { podToAdd := pod // 将pod的volume加到dsw对象中,这里没有使用返回值 adc.podAdd(podToAdd) for _, podVolume := range podToAdd.Spec.Volumes { ... volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator) plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec) // 调用asw方法,判断volume和node绑定状态 attachState := adc.actualStateOfWorld.GetAttachState(volumeName, nodeName) if attachState == cache.AttachStateAttached { klog.V(10).Infof("Volume %q is attached to node %q. Marking as attached in ActualStateOfWorld", volumeName, nodeName, ) devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName) if err != nil { klog.Errorf("Failed to find device path: %v", err) continue } // 在asw也会主动做一次MarkVolumeAsAttached,只是volume是nil,那么attachedVolume里的volumeObj.spec就是nil // 本次MarkVolumeAsAttached传入的是volume才是真实的 err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath) if err != nil { klog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err) } } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 func (adc *attachDetachController) podAdd(obj interface{}) { pod, ok := obj.(*v1.Pod) if pod == nil || !ok { return } // pod还没有调度,不进行后面流程的处理 if pod.Spec.NodeName == "" { // Ignore pods without NodeName, indicating they are not scheduled. return } // 确定volume是加入dswp还是从dswp中删除; // 默认是加入,但是对于Terminated pod则是根据keepTerminatedPodVolume标志位。 // 即pod因为异常终止了,它依赖的卷是否要做detach动作;默认是false,即pod中止,它依赖卷做detach。这是合理的,因为pod会重建,volume也会重建 // 如果pod总是调度到同一个节点,为了加速,volume可以不做detach. 方法是在node annotataion设置volumes.kubernetes.io/keep-terminated-pod-volumes:true. volumeActionFlag := util.DetermineVolumeAction(pod, adc.desiredStateOfWorld, true /* default volume action */) // 对于给定pod,是否要将volume加入dsw,还是从dsw中删除 util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator) } func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld, ...) { 1. pod没有volume,忽略 2. pod没有node, 忽略 3. pod调度到的node, 没有在dsw.nodesManaged中,忽略 for _, podVolume := range pod.Spec.Volumes { // volumeSpec, err := CreateVolumeSpec(...) // 从pv找到使用的plugin, 验证plugin要支持attach操作。 // csi, iscsi, rbd支持attach, 而nfs, hostpath不需要支持attach,直接mount即可。 attachableVolumePlugin, err := volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) if addVolumes { // 将该pod加入到引用volumeSpec的pods列表里 _, err := desiredStateOfWorld.AddPod(uniquePodName, pod, volumeSpec, nodeName) } else { // Remove volume from desired state of world uniqueVolumeName, err := util.GetUniqueVolumeNameFromSpec(attachableVolumePlugin, volumeSpec) // 对于terminated pod且对于volume的策略是不keep了,则将该pod从引用volume的pods列表里移 desiredStateOfWorld.DeletePod(uniquePodName, uniqueVolumeName, nodeName) } } func (dsw *desiredStateOfWorld) AddPod() (v1.UniqueVolumeName, error) { // 从dsw获取node对象 nodeObj, nodeExists := dsw.nodesManaged[nodeName] // 从node里获取需要attach到本节点的卷,这里是还没有attach,也符合dsw的语义。 volumeObj, volumeExists := nodeObj.volumesToAttach[volumeName] // node上没有volume,说明是新attach过来的,则添加到volumesToAttach // 保证volumesToAttach有新的卷 if !volumeExists { volumeObj = volumeToAttach{ multiAttachErrorReported: false, volumeName: volumeName, spec: volumeSpec, scheduledPods: make(map[types.UniquePodName]pod), } dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj } // 引用该卷的pod列表中,没有本pod,则添加 if _, podExists := volumeObj.scheduledPods[podName]; !podExists { dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods[podName] = pod{ podName: podName, podObj: podToAdd, } } }
dsw
是将集群中所有pod按照期望的状态维护到dsw对象上,即 pod被调度到node, pod引用的volume期望attach到特定node上,以及该volume会被哪个pod引用到。
dsw, asw是描述的volume
dsw对象解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 type desiredStateOfWorld struct { // 描述adc管理的node组成的map。 nodesManaged map[k8stypes.NodeName]nodeManaged // volumePluginMgr is the volume plugin manager used to create volume // plugin objects. volumePluginMgr *volume.VolumePluginMgr sync.RWMutex } type nodeManaged struct { // 节点的名字 nodeName k8stypes.NodeName // map, 应该attach到该节点的volume对象 volumesToAttach map[v1.UniqueVolumeName]volumeToAttach // keepTerminatedPodVolumes determines if for terminated pods(on this node) - volumes // should be kept mounted and attached. keepTerminatedPodVolumes bool } type volumeToAttach struct { // 这个卷的multiAttachErrorReported是否上报过,这种错误,一个volume只上报一次即可 multiAttachErrorReported bool // 卷信息 volumeName v1.UniqueVolumeName // volume spec spec *volume.Spec // reference 该卷的pods对象的集合 scheduledPods map[types.UniquePodName]pod }
至此 asw和dsw的populate已经完成
adc.reconciler.Run 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (rc *reconciler) reconcile() { // 保证detach卷都被detach了 // 从asw里获取已经attach的卷,对每一个卷做detach对齐,及对于不应该attach卷,去做detach for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes(){ // 卷在asw有,但是dsw里已经没有了 if !rc.desiredStateOfWorld.VolumeExists( attachedVolume.VolumeName, attachedVolume.NodeName){ ... } ... } // 一定是从dsw里获取 rc.attachDesiredVolumes() // asw和dsw对象各域更新好后,最后更新 node的status rc.nodeStatusUpdater.UpdateNodeStatuses() }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 func (rc *reconciler) attachDesiredVolumes() { // 对于dsw中的每一个卷,进行检查 for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() { ... // 该卷不允许多挂 if !util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) { // 挂载该卷的node nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName) if len(nodes) > 0 { // 该卷挂载到某节点了,可能是本次相同的节点,也可能是other node, 而且没有上报过multiattach error, 则reportMultiAttachError if !volumeToAttach.MultiAttachErrorReported { rc.reportMultiAttachError(volumeToAttach, nodes) rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName) } continue } // 如果该卷没有attach到node过,不会上报multi attach,这是最常见的情况了 } // 触发volume attach动作 err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld) ... } } func (og *operationGenerator) GenerateAttachVolumeFunc( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { attachVolumeFunc := func() (error, error) { attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() // Execute attach 调用具体plugin进行attach devicePath, attachErr := volumeAttacher.Attach(volumeToAttach.VolumeSpec, volumeToAttach.NodeName) // Update actual state of world // 更新asw.attachedVolumes addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 入参是volumeToAttach.VolumeName func (asw *actualStateOfWorld) GetNodesForAttachedVolume(volumeName v1.UniqueVolumeName) []types.NodeName { asw.RLock() defer asw.RUnlock() // 在asw中的`attachedVolumes`中找到`volumeObj`, volumeObj, volumeExists := asw.attachedVolumes[volumeName] // 如果该卷没有attached过,或者attach过但是记录未空,直接返回空节点列表 if !volumeExists || len(volumeObj.nodesAttachedTo) == 0 { return []types.NodeName{} } // 如果之前attach过,则返回attached过的节点列表 nodes := []types.NodeName{} for nodeName, nodesAttached := range volumeObj.nodesAttachedTo { if nodesAttached.attachedConfirmed { nodes = append(nodes, nodeName) } } return nodes }
主要是查看asw.attachedVolumes
对象的状态; nodes会有3种情况
0, 表第一次attach
非0,attach过且和本次节点相同
非0,attach过且和本次节点不相同
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (rc *reconciler) reportMultiAttachError(volumeToAttach cache.VolumeToAttach, nodes []types.NodeName) { // 对于dsw中的这个未attach到nodde的volume otherNodes := []types.NodeName{} otherNodesStr := []string{} // 根据volume对象当前绑定的节点,筛选出other nodes for _, node := range nodes { if node != volumeToAttach.NodeName { otherNodes = append(otherNodes, node) otherNodesStr = append(otherNodesStr, string(node)) } } // 在other nodes上统计引用该volume的pod pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName) // 在其他节点找不到使用该volume的pod,说明pod已经被删掉了 // asw.attachedVolumes中残留这volume之前绑定的信息,而此时volume绑到其他node上,触发multi attach告警,volume只能唯一绑定某个node // 可能是清理不全导致,所以每一步状态记录都要慎重,因为你要维护好它的状态啊 if len(pods) == 0 { simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another") return } // 到这里,说明在其他节点上有引用该volume的pod, // 通常是使用者的问题,比如replicaset使用同一个pvc,但是有多个副本。因为dp通常是// 无状态的,不该开启多副本还使用pvc. 需要改成sts来使用多副本, // 需要上报错误信息提醒用户 // 统计出使用重复使用volume的pod信息,即 // 遍历本次volume被分配到的pods列表 for _, scheduledPod := range volumeToAttach.ScheduledPods { // 与其他接地点上,使用该volume的pod比对 for _, pod := range pods { if pod.Namespace == scheduledPod.Namespace { localPodNames = append(localPodNames, pod.Name) } else { otherPods++ } } } // Log all pods for system admin podNames := []string{} for _, pod := range pods { podNames = append(podNames, pod.Namespace+"/"+pod.Name) } 将已经引用该卷的pod及所在node的信息以事件形式上报到本次pod中,提醒用户 detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already used by pods %s on node %s", strings.Join(podNames, ", "), strings.Join(otherNodesStr, ", "))) }
volume在node1上attach,又被attach到node2上,但是node1上status volumeattached还没有清理完,就会报multi-attach,等node1清理完成就不会报该错了
csi plugin attach pkg/volume/csi/csi_attacher.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) { ... // 创建attachment对象 attachment := &storage.VolumeAttachment{ ObjectMeta: meta.ObjectMeta{ Name: attachID, }, Spec: storage.VolumeAttachmentSpec{ NodeName: node, Attacher: pvSrc.Driver, Source: vaSrc, }, } _, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) // 一致等待volume attachment对象status状态 // 2mins 超时时间 // csi-attacher sidecar 会watch该对象, 完成具体的附着动作 if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil { return "", err } }
总结
asw是从当前node节点的volumeattached信息,该信息在reconcile后,本次调谐完成,由UpdateNodeStatuses()更新到node中
dsw是遍历所有pod中的每个volume,来更新asw.volumeattached这个map,key是卷的名字,value是卷的信息,包括attach到哪个节点。
adc的Attach/Detach
操作,只是创建/删除VolumeAttachment
对象;csi-attacher sidecar
会watch该对象,并向csi driver发起controllerpublishvolume/controllerUnpublishvolume request
完成后更新VolumeAttachment status
.
controller-manager只是完成了attach的操作,mount的调谐是在kubelet的volume manager
完成,其也有一套asw
、dsw
、reconcile
的过程。