一 簡介
二 pvController 初始化
-
初始化 eventRecorder
-
初始化 PersistentVolumeController 物件
-
呼叫 VolumePluginMgr.InitPlugins() 方法 初始化儲存外掛,程式碼存在於 pkg/volume/plugins.go 檔案中
-
開始建立 informer 監聽叢集內的資源,初始化了如下 informer
-
PersistentVolumeInformer -
PersistentVolumeClaimInformer -
StorageClassInformer -
PodInformer -
NodeInformer
-
將 PV & PVC 的 event 分別放入 volumeQueue & claimQueue
-
為了不每次都迭代 pods ,自定義一個透過 pvc 鍵索引 pod 的索引器
-
初始化 intree 儲存 -> csi 遷移相關功能的 manager
三 開始執行
// Run starts the persistent volume controller and blocks until a value is
// received from (or the close of) stopCh. If the informer caches fail to
// sync it returns immediately without starting any worker. On return both
// work queues are shut down by the deferred calls.
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ctrl.claimQueue.ShutDown()
	defer ctrl.volumeQueue.ShutDown()

	klog.Infof("Starting persistent volume controller")
	defer klog.Infof("Shutting down persistent volume controller")

	// All of these informer caches have to be synced before the controller
	// seeds its own caches and starts processing.
	informersSynced := []cache.InformerSynced{
		ctrl.volumeListerSynced,
		ctrl.claimListerSynced,
		ctrl.classListerSynced,
		ctrl.podListerSynced,
		ctrl.NodeListerSynced,
	}
	if synced := cache.WaitForNamedCacheSync("persistent volume", stopCh, informersSynced...); !synced {
		return
	}

	ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

	// Periodic loops, launched in the listed order: the resync first, then
	// the volume worker, then the claim worker.
	loops := []struct {
		run    func()
		period time.Duration
	}{
		{ctrl.resync, ctrl.resyncPeriod},
		{ctrl.volumeWorker, time.Second},
		{ctrl.claimWorker, time.Second},
	}
	for _, l := range loops {
		go wait.Until(l.run, l.period, stopCh)
	}

	metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)

	<-stopCh
}
// initializeCaches seeds the controller's own volume and claim caches with
// deep copies of every object returned by the two listers. A failure to list
// is logged and aborts the initialization; a failure to store a single
// object is only logged and the loop continues.
func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
	// The listers do not hit the API server: the objects come out of the
	// local cache and must not be modified by outside code.
	pvs, err := volumeLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
		return
	}
	for i := range pvs {
		// The listed volume must stay untouched, so a fresh copy is what
		// gets stored.
		if _, storeErr := ctrl.storeVolumeUpdate(pvs[i].DeepCopy()); storeErr != nil {
			klog.Errorf("error updating volume cache: %v", storeErr)
		}
	}

	pvcs, err := claimLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
		return
	}
	for i := range pvcs {
		if _, storeErr := ctrl.storeClaimUpdate(pvcs[i].DeepCopy()); storeErr != nil {
			klog.Errorf("error updating claim cache: %v", storeErr)
		}
	}

	klog.V(4).Infof("controller initialized")
}
// persistentVolumeOrderedIndex wraps a cache.Indexer.
// NOTE(review): presumably this is the type behind ctrl.volumes, whose
// store field is read via ctrl.volumes.store elsewhere in the file -
// confirm against the controller struct definition.
type persistentVolumeOrderedIndex struct {
	// store is the wrapped indexer.
	store cache.Indexer
}
1 resync
// resync re-enqueues every claim known to the claim lister and then every
// volume known to the volume lister. If listing claims fails the function
// logs a warning and returns before touching the volumes; if listing volumes
// fails the already enqueued claims stay enqueued.
func (ctrl *PersistentVolumeController) resync() {
	klog.V(4).Infof("resyncing PV controller")

	claims, listErr := ctrl.claimLister.List(labels.NewSelector())
	if listErr != nil {
		klog.Warningf("cannot list claims: %s", listErr)
		return
	}
	for i := range claims {
		ctrl.enqueueWork(ctrl.claimQueue, claims[i])
	}

	volumes, listErr := ctrl.volumeLister.List(labels.NewSelector())
	if listErr != nil {
		klog.Warningf("cannot list persistent volumes: %s", listErr)
		return
	}
	for i := range volumes {
		ctrl.enqueueWork(ctrl.volumeQueue, volumes[i])
	}
}
2 volumeWorker
// workFunc processes one key from the volume queue. It returns true only
// when the queue reports that it is shutting down; every other outcome,
// including errors (which are just logged), returns false.
workFunc := func() bool {
	item, shutdown := ctrl.volumeQueue.Get()
	if shutdown {
		return true
	}
	defer ctrl.volumeQueue.Done(item)

	key := item.(string)
	klog.V(5).Infof("volumeWorker[%s]", key)

	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
		return false
	}

	volume, err := ctrl.volumeLister.Get(name)
	switch {
	case err == nil:
		// Still present in the informer cache, so the event was an
		// add/update/sync.
		ctrl.updateVolume(volume)
		return false
	case !errors.IsNotFound(err):
		klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
		return false
	}

	// Gone from the informer cache, so the event was a delete. The last
	// known state is looked up in the controller's own cache.
	cached, exists, err := ctrl.volumes.store.GetByKey(key)
	if err != nil {
		klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
		return false
	}
	if !exists {
		// The delete event was handled earlier and the volume has already
		// been dropped from the controller cache.
		klog.V(2).Infof("deletion of volume %q was already processed", key)
		return false
	}

	deleted, isVolume := cached.(*v1.PersistentVolume)
	if !isVolume {
		klog.Errorf("expected volume, got %+v", cached)
		return false
	}
	ctrl.deleteVolume(deleted)
	return false
}
updateVolume(核心邏輯在下方的 syncVolume)
// syncVolume reconciles a single PersistentVolume with the claim it
// references (part of the body is elided in this excerpt, see the "..."
// line below).
//
// Visible decision tree:
//   - no ClaimRef: the phase is set to Available.
//   - ClaimRef without UID (pre-bound): the phase is set to Available and
//     the claim sync is expected to finish the binding.
//   - ClaimRef with UID: the claim is looked up in the controller cache,
//     then (unless the volume is Released/Failed) in the informer cache and
//     finally at the API server. Depending on the result the volume is
//     released and reclaimed, its claim is re-queued, its phase is set to
//     Bound, or it is unbound.
//
// It returns an error whenever a lookup or an update fails, so that the
// caller can retry; nil otherwise.
func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {
	klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))
	...
	// [Unit test set 4]
	if volume.Spec.ClaimRef == nil {
		// Volume is unused
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
		if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
			// Nothing was saved; we will fall back into the same
			// condition in the next call to this method
			return err
		}
		return nil
	} else /* pv.Spec.ClaimRef != nil */ {
		// Volume is bound to a claim.
		if volume.Spec.ClaimRef.UID == "" {
			// The PV is reserved for a PVC; that PVC has not yet been
			// bound to this PV; the PVC sync will handle it.
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
			if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
				// Nothing was saved; we will fall back into the same
				// condition in the next call to this method
				return err
			}
			return nil
		}
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
		// Get the PVC by _name_
		var claim *v1.PersistentVolumeClaim
		claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
		obj, found, err := ctrl.claims.GetByKey(claimName)
		if err != nil {
			return err
		}
		if !found {
			// Miss in the controller cache. For volumes that are not yet
			// Released/Failed, double check the informer cache and then the
			// API server before concluding that the claim is gone.
			if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
				obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
				if err != nil && !apierrors.IsNotFound(err) {
					return err
				}
				found = !apierrors.IsNotFound(err)
				if !found {
					obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(ctx, volume.Spec.ClaimRef.Name, metav1.GetOptions{})
					if err != nil && !apierrors.IsNotFound(err) {
						return err
					}
					found = !apierrors.IsNotFound(err)
				}
			}
		}
		if !found {
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
			// Fall through with claim = nil
		} else {
			var ok bool
			claim, ok = obj.(*v1.PersistentVolumeClaim)
			if !ok {
				// claim is nil when the assertion fails, so it must not be
				// dereferenced here; identify the object by its cache key.
				return fmt.Errorf("cannot convert object from claim cache to claim %q!?: %#v", claimName, obj)
			}
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
		}
		if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
			// Same name but different UID: the cached claim may be stale,
			// so ask the API server for the current object.
			klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))
			claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(ctx, volume.Spec.ClaimRef.Name, metav1.GetOptions{})
			if err != nil && !apierrors.IsNotFound(err) {
				return err
			} else if claim != nil {
				// Treat the volume as bound to a missing claim.
				if claim.UID != volume.Spec.ClaimRef.UID {
					klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
					claim = nil
				} else {
					klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
				}
			}
		}
		if claim == nil {
			// The referenced claim does not exist (any more): mark the
			// volume Released (once) and hand it to reclaimVolume.
			if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
				// Also, log this only once:
				klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
				if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
					// Nothing was saved; we will fall back into the same condition
					// in the next call to this method
					return err
				}
			}
			if err = ctrl.reclaimVolume(volume); err != nil {
				// Release failed, we will fall back into the same condition
				// in the next call to this method
				return err
			}
			if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
				// volume is being retained, it references a claim that does not exist now.
				klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)
			}
			return nil
		} else if claim.Spec.VolumeName == "" {
			// The claim exists but does not point at any volume yet.
			if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
				volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
				ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
				claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
				ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
				// Skipping syncClaim
				return nil
			}
			if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
				// The binding is not completed; let PVC sync handle it
				klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
			} else {
				// Dangling PV; try to re-establish the link in the PVC sync
				klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
			}
			// Either way the claim worker has to look at this claim again.
			ctrl.claimQueue.Add(claimToClaimKey(claim))
			return nil
		} else if claim.Spec.VolumeName == volume.Name {
			// Volume is bound to a claim properly, update status if necessary
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
			if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
				// Nothing was saved; we will fall back into the same
				// condition in the next call to this method
				return err
			}
			return nil
		} else {
			// Volume is bound to a claim, but the claim is bound elsewhere
			if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
				// Dynamically provisioned volume with the Delete policy:
				// release it and let reclaimVolume deal with it.
				if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
					// Also, log this only once:
					klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
					if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
						// Nothing was saved; we will fall back into the same condition
						// in the next call to this method
						return err
					}
				}
				if err = ctrl.reclaimVolume(volume); err != nil {
					return err
				}
				return nil
			} else {
				if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
					klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
					if err = ctrl.unbindVolume(volume); err != nil {
						return err
					}
					return nil
				} else {
					// The PV must have been created with this ptr; leave it alone.
					// NOTE(review): despite "leave it alone", unbindVolume is
					// still called here - presumably it keeps the pre-bound
					// reference for user-bound volumes; confirm in unbindVolume.
					klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
					if err = ctrl.unbindVolume(volume); err != nil {
						return err
					}
					return nil
				}
			}
		}
	}
}
-
當Spec.ClaimRef.UID 為空的時候,說明 pvc 還未繫結 pv, 呼叫ctrl.updateVolumePhase 使得 pv 進入 Available 狀態, 方法返回,等待 pvc syncClaim 方法處理
-
使用 Spec.ClaimRef 相關的 pvc 資訊獲取 pv_controller快取的pvc
-
如果 pvc 沒有找到
-
有可能是叢集壓力過大快取沒有更新,則進一步從 informercache 中找,如果 informercache裡面還是沒有的話則進一步從apiserver中去找
-
這裡如果發現 非 Released & 非 Failed 的pv 經過上述步驟仍然找不到 pvc 的話,說明 pvc 被刪除。此時會先將 pv 置為 Released 狀態,再呼叫 reclaimVolume 依照 reclaimPolicy 對 pv 進行處理
-
找到 pvc 之後
-
檢查 pvc的 volumeMode 和 pv 的 volumeMode是否一致,不一致報 event 出來
-
如果發現這個 pv 帶有 AnnBoundByController = "pv.kubernetes.io/bound-by-controller" 這個 annotation,說明 pvc/pv 的繫結流程尚未完成(正在繫結中)
-
將 pvc 放到 claimQueue 裡面, 讓 claimWorker 進行處理
-
如果是 pv 是動態建立的情況下,並且 pv 的 ReclaimPolicy 是 delete 的情況下, 說明 pvc 已經綁定了其他pv, 將 pv 置為 released 的狀態, 等待deleters 刪除
-
如果 pv 不是動態建立的情況下,將 pv 的 ClaimRef 欄位置為空,將其 unbound 掉
3 claimWorker
// workFunc processes one key from the claim queue. It returns true only
// when the queue reports that it is shutting down; every other outcome,
// including errors (which are just logged), returns false.
workFunc := func() bool {
	item, shutdown := ctrl.claimQueue.Get()
	if shutdown {
		return true
	}
	defer ctrl.claimQueue.Done(item)

	key := item.(string)
	klog.V(5).Infof("claimWorker[%s]", key)

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
		return false
	}

	claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
	switch {
	case err == nil:
		// Still present in the informer cache, so the event was an
		// add/update/sync.
		ctrl.updateClaim(claim)
		return false
	case !errors.IsNotFound(err):
		klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
		return false
	}

	// Gone from the informer cache, so the event was a delete. The last
	// known state is looked up in the controller's own cache.
	cached, exists, err := ctrl.claims.GetByKey(key)
	if err != nil {
		klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
		return false
	}
	if !exists {
		// The delete event was handled earlier and the claim has already
		// been dropped from the controller cache.
		klog.V(2).Infof("deletion of claim %q was already processed", key)
		return false
	}

	deleted, isClaim := cached.(*v1.PersistentVolumeClaim)
	if !isClaim {
		klog.Errorf("expected claim, got %+v", cached)
		return false
	}
	ctrl.deleteClaim(deleted)
	return false
}
syncUnboundClaim
// syncUnboundClaim handles a claim that is not bound yet.
//
// When the claim does not name a volume, the best matching volume is looked
// up and bound; if there is none the claim is either left for delayed
// binding, handed to provisionClaim (when it has a storage class), or just
// reported via an event, and is then marked Pending (the provisioning case
// returns before the Pending update).
//
// When the claim names a volume, that volume is looked up in the controller
// cache and bound if it is free or already reserved for this claim;
// otherwise the claim stays Pending or, for a controller-made binding, an
// error is returned.
//
// ctx is only forwarded to provisionClaim. A non-nil error means the sync
// could not be completed and is expected to be retried by the caller.
func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
	if claim.Spec.VolumeName == "" {
		// User did not care which PV they get.
		delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
		if err != nil {
			return err
		}
		// [Unit test set 1]
		volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
		if err != nil {
			klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
			return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)
		}
		if volume == nil {
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
			switch {
			case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
				// Delayed binding that has not been triggered yet: only
				// emit an event on the claim.
				if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
					return err
				}
			case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
				// The claim has a storage class: try dynamic provisioning.
				// Note the early return - the claim status is not updated
				// here.
				if err = ctrl.provisionClaim(ctx, claim); err != nil {
					return err
				}
				return nil
			default:
				ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
			}
			// Mark the claim as Pending and try to find a match in the next
			// periodic syncClaim
			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
				return err
			}
			return nil
		} else /* pv != nil */ {
			// Found a PV for this claim
			// OBSERVATION: pvc is "Pending", pv is "Available"
			claimKey := claimToClaimKey(claim)
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
			// The metric is recorded for both the failed and the successful
			// bind, carrying the bind error (or nil).
			if err = ctrl.bind(volume, claim); err != nil {
				metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
				return err
			}
			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
			return nil
		}
	} else /* pvc.Spec.VolumeName != "" */ {
		// The user asked for a specific volume by name.
		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
		obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
		if err != nil {
			return err
		}
		if !found {
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
				return err
			}
			return nil
		} else {
			volume, ok := obj.(*v1.PersistentVolume)
			if !ok {
				return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
			}
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
			if volume.Spec.ClaimRef == nil {
				klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
				if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
					klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
					// send an event
					msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
					// volume does not satisfy the requirements of the claim
					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
						return err
					}
				} else if err = ctrl.bind(volume, claim); err != nil {
					// On any error saving the volume or the claim, subsequent
					// syncClaim will finish the binding.
					return err
				}
				// OBSERVATION: pvc is "Bound", pv is "Bound"
				// (this point is also reached after the mismatch branch
				// above, where the claim was set to Pending instead)
				return nil
			} else if pvutil.IsVolumeBoundToClaim(volume, claim) {
				// User asked for a PV that is claimed by this PVC
				// OBSERVATION: pvc is "Pending", pv is "Bound"
				klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
				// Finish the volume binding by adding claim UID.
				if err = ctrl.bind(volume, claim); err != nil {
					return err
				}
				// OBSERVATION: pvc is "Bound", pv is "Bound"
				return nil
			} else {
				// User asked for a PV that is claimed by someone else
				// OBSERVATION: pvc is "Pending", pv is "Bound"
				if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
					klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
					claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
					// User asked for a specific PV, retry later
					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
						return err
					}
					return nil
				} else {
					// The claim carries the bound-by-controller annotation
					// yet the volume belongs to another claim: report it as
					// an error.
					klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
					claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
					return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
				}
			}
		}
	}
}
-
如果當前 pvc 的 volumeName 為空
-
判斷當前pvc 是否是延遲繫結的
-
呼叫 volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) 找出對應的 pv
-
如果找到 volume 的話
-
呼叫 ctrl.bind(volume, claim) 方法進行繫結
-
如果沒有找到 volume 的話
-
如果是延遲繫結, 並且還未觸發(pod 未引用)則 emit event 到 pvc 上
-
如果 pvc 綁定了 sc, 呼叫 ctrl.provisionClaim(ctx, claim) 方法
-
分析 pvc yaml, 找到 provisioner driver
-
啟動一個 goroutine
-
呼叫 ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) 進行建立工作
provisionClaimOperation
// provisionClaimOperation provisions a volume for claim with the given
// plugin and storage class and saves the resulting PersistentVolume object.
//
// Steps visible in this function:
//  1. Reject a claim with a data source when the plugin is not the CSI
//     plugin.
//  2. Record the provisioner on the claim via setClaimProvisioner.
//  3. Skip provisioning when a PV with the expected name already exists.
//  4. Build the provisioning options, refuse mount options the plugin does
//     not support, create the provisioner and call Provision (synchronously).
//  5. Pre-bind the new volume to the claim and try to create the PV object
//     up to createProvisionedPVRetryCount times.
//  6. If the PV object could not be saved, try to delete the provisioned
//     volume again; a failed cleanup is reported through an event only.
//
// It returns the plugin name together with an error for failures in steps
// 1-4. Failures in steps 5-6 are reported through events and logs only; the
// function still returns a nil error in that case.
func (ctrl *PersistentVolumeController) provisionClaimOperation(
	ctx context.Context,
	claim *v1.PersistentVolumeClaim,
	plugin vol.ProvisionableVolumePlugin,
	storageClass *storage.StorageClass) (string, error) {
	claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)
	klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)
	pluginName := plugin.GetPluginName()
	if pluginName != "kubernetes.io/csi" && claim.Spec.DataSource != nil {
		// Format directly instead of passing a pre-built string as the
		// format argument, so a '%' in the plugin name cannot be
		// misinterpreted as a verb.
		return pluginName, fmt.Errorf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName)
	}
	provisionerName := storageClass.Provisioner
	// Add provisioner annotation to be consistent with external provisioner workflow
	newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)
	if err != nil {
		// Save failed, the controller will retry in the next sync
		klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
		return pluginName, err
	}
	claim = newClaim
	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
	volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err)
		return pluginName, err
	}
	if err == nil && volume != nil {
		// Volume has been already provisioned, nothing to do.
		klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
		return pluginName, nil
	}
	// Prepare a claimRef to the claim early (to fail before a volume is
	// provisioned)
	claimRef, err := ref.GetReference(scheme.Scheme, claim)
	if err != nil {
		klog.V(3).Infof("unexpected error getting claim reference: %v", err)
		return pluginName, err
	}
	// Gather provisioning options
	tags := make(map[string]string)
	tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
	tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
	tags[CloudVolumeCreatedForVolumeNameTag] = pvName
	// NOTE(review): storageClass.ReclaimPolicy is dereferenced without a nil
	// check - presumably API defaulting guarantees it is set; confirm.
	options := vol.VolumeOptions{
		PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
		MountOptions:                  storageClass.MountOptions,
		CloudTags:                     &tags,
		ClusterName:                   ctrl.clusterName,
		PVName:                        pvName,
		PVC:                           claim,
		Parameters:                    storageClass.Parameters,
	}
	// Refuse to provision if the plugin doesn't support mount options, creation
	// of PV would be rejected by validation anyway
	if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
		strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
		klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
	}
	// Provision the volume
	provisioner, err := plugin.NewProvisioner(options)
	if err != nil {
		strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
		klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, err
	}
	// selectedNode stays nil unless the claim carries the selected-node
	// annotation.
	var selectedNode *v1.Node
	if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
		selectedNode, err = ctrl.NodeLister.Get(nodeName)
		if err != nil {
			strerr := fmt.Sprintf("Failed to get target node: %v", err)
			klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
			return pluginName, err
		}
	}
	allowedTopologies := storageClass.AllowedTopologies
	opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
	volume, err = provisioner.Provision(selectedNode, allowedTopologies)
	opComplete(volumetypes.CompleteFuncParam{Err: &err})
	if err != nil {
		ctrl.rescheduleProvisioning(claim)
		strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
		klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, err
	}
	klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))
	// Create Kubernetes PV object for the volume.
	if volume.Name == "" {
		volume.Name = pvName
	}
	// Bind it to the claim
	volume.Spec.ClaimRef = claimRef
	volume.Status.Phase = v1.VolumeBound
	volume.Spec.StorageClassName = claimClass
	// Add AnnBoundByController (used in deleting the volume)
	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes")
	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())
	// Try to create the PV object several times
	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
		klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
		var newVol *v1.PersistentVolume
		if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(ctx, volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) {
			// Save succeeded.
			if err != nil {
				// AlreadyExists is treated as success; err is cleared so
				// that the cleanup branch below is not taken.
				klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
				err = nil
			} else {
				klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
				_, updateErr := ctrl.storeVolumeUpdate(newVol)
				if updateErr != nil {
					// We will get an "volume added" event soon, this is not a big error
					klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
				}
			}
			break
		}
		// Save failed, try again after a while.
		klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
		time.Sleep(ctrl.createProvisionedPVInterval)
	}
	if err != nil {
		// The PV object could not be saved although the volume itself was
		// provisioned: report it and try to delete the volume again.
		strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
		klog.V(3).Info(strerr)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		var deleteErr error
		var deleted bool
		for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
			_, deleted, deleteErr = ctrl.doDeleteVolume(volume)
			if deleteErr == nil && deleted {
				// Delete succeeded
				klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
				break
			}
			if !deleted {
				klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
				break
			}
			// Delete failed, try again after a while.
			klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
			time.Sleep(ctrl.createProvisionedPVInterval)
		}
		if deleteErr != nil {
			strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
			klog.V(2).Info(strerr)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
		}
	} else {
		klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
		msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
		ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
	}
	return pluginName, nil
}
-
檢查driver,只有 csi 型別的 driver 才允許使用 dataSource 欄位
-
為 pvc 加 claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner annotation
-
根據規則拼出 pv Name = "pvc-" + pvc.UID
-
如果找到了 pv, 則說明 pv已經存在,跳過 provision
-
收集pvc/pv 基本資訊封裝到 options 中
-
對 plugin 進行校驗, 如果 plugin 不支援 mount options 而 StorageClass 又設定了 mountOptions,則直接拒絕 provision 請求
-
呼叫plugin.NewProvisioner(options) 建立 provisioner, 介面實現了Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) 方法,注意,該方法為同步方法
-
Provision 方法返回了 PersistentVolume例項
-
為創建出來的 pv 關聯 pvc 物件(ClaimRef),嘗試建立 pv 物件 (重複多次)
-
如果建立 pv 失敗,則嘗試呼叫 Delete 方法刪除建立的volume資源
// syncBoundClaim handles a claim that has been bound before.
//
// The claim is moved to the Lost phase (with a warning event) when it no
// longer names a volume, when the named volume is not in the controller
// cache, or when the volume references a different claim. When the volume
// has no ClaimRef, or its ClaimRef UID matches the claim, bind is called to
// (re-)establish the binding. Any error from the status update, the cache
// lookup or bind is returned to the caller.
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
	if claim.Spec.VolumeName == "" {
		// Claim was bound before but not any more.
		_, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!")
		return err
	}

	cached, exists, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
	if err != nil {
		return err
	}
	if !exists {
		// Claim is bound to a non-existing volume.
		_, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!")
		return err
	}

	volume, isVolume := cached.(*v1.PersistentVolume)
	if !isVolume {
		return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, cached)
	}
	klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))

	switch claimRef := volume.Spec.ClaimRef; {
	case claimRef == nil:
		// Either the volume has come unbound, or the controller has not
		// seen the updated volume yet; the two cases cannot be told apart.
		// Bind again and set all states to Bound. On error nothing was
		// saved and the next syncPV or syncClaim will retry.
		klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
		return ctrl.bind(volume, claim)
	case claimRef.UID == claim.UID:
		// All is well. syncPV could handle this as well, and bind will do
		// nothing in most cases because everything should be set already.
		klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
		return ctrl.bind(volume, claim)
	default:
		// The volume has a different claimant: move the claim to the
		// terminal 'Lost' phase.
		_, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly")
		return err
	}
}
-
如果沒找到, 說明 pvc 綁定了一個不存在的pv,報 event 並返回
-
如果找到了pv
-
檢查 pv.Spec.ClaimRef 欄位, 如果 為空,說明 pv 還沒有繫結 pvc, 呼叫 ctrl.bind(volume, claim); 方法進行繫結 -
pv.ClaimRef.UID == pvc.UID, 呼叫 bind 方法,但是大多數情況會直接返回(因為所有的操作都已經做完了) -
其他情況說明 volume 綁定了其他的 pvc, 更新pvc 的狀態 為 lost 並報出 event
四 總結

《三步贏大禮,跟著冰河學Java》
由乘風者專家博主——冰河傾力打造的《Java8從入門到精通》電子書已在全網受到數十萬追捧,熱度空前。這本最全Java8新特性知識全解可謂是程式設計師快速進階的標配手冊。即日起,下載電子書,發文並分享,三步完成即可領取活動獎勵,更有機會獲得冰河親筆簽名的新書《深入理解分散式事務:原理與實戰》。福利多多,趕快拉動你的小夥伴一起參與進來給自己一個年終獎吧!
活動詳情如下:步驟一:點選下載電子書;步驟二:在開發者社群發表一篇Java相關的文章並將連結填在問卷中;步驟三:邀請三位好友下載電子書,即可完成打卡領取小米無線滑鼠一個(限量100個)活動結束由冰河挑選十篇優質文章,作者獲冰河親筆簽名圖書一本以及阿里雲定製藍牙音箱一個。(活動時間:12月23日-1月14日,優質文章將於1月17日公佈,獎品將在活動結束後7個工作日內發放)
點選閱讀原文檢視詳情頁!
關鍵詞
方法
物件
狀態
快取
步驟