From 98112e86270d9b6c0904529145f928da7e14756f Mon Sep 17 00:00:00 2001 From: Reijer Copier Date: Mon, 21 Jun 2021 16:56:35 +0200 Subject: [PATCH] fix(controller): dehydrate workflow before deleting offloaded node status (#6112) Signed-off-by: uturunku1 --- .../explosive_offload_node_status_repo.go | 2 +- persist/sqldb/mocks/OffloadNodeStatusRepo.go | 8 +-- persist/sqldb/offload_node_status_repo.go | 10 ++- workflow/controller/controller.go | 71 ++++++++++++------- workflow/controller/indexes/indexes.go | 1 + workflow/controller/indexes/uid_index.go | 14 ++++ 6 files changed, 74 insertions(+), 32 deletions(-) create mode 100644 workflow/controller/indexes/uid_index.go diff --git a/persist/sqldb/explosive_offload_node_status_repo.go b/persist/sqldb/explosive_offload_node_status_repo.go index 85716a972a48..d9e1816af2cb 100644 --- a/persist/sqldb/explosive_offload_node_status_repo.go +++ b/persist/sqldb/explosive_offload_node_status_repo.go @@ -33,6 +33,6 @@ func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error { return OffloadNotSupportedError } -func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) ([]UUIDVersion, error) { +func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]string, error) { return nil, OffloadNotSupportedError } diff --git a/persist/sqldb/mocks/OffloadNodeStatusRepo.go b/persist/sqldb/mocks/OffloadNodeStatusRepo.go index 06934e7e333a..f8981efc2bd1 100644 --- a/persist/sqldb/mocks/OffloadNodeStatusRepo.go +++ b/persist/sqldb/mocks/OffloadNodeStatusRepo.go @@ -89,15 +89,15 @@ func (_m *OffloadNodeStatusRepo) List(namespace string) (map[sqldb.UUIDVersion]v } // ListOldOffloads provides a mock function with given fields: namespace -func (_m *OffloadNodeStatusRepo) ListOldOffloads(namespace string) ([]sqldb.UUIDVersion, error) { +func (_m *OffloadNodeStatusRepo) ListOldOffloads(namespace string) (map[string][]string, error) { ret := _m.Called(namespace) - var r0 []sqldb.UUIDVersion - if rf, ok := ret.Get(0).(func(string) []sqldb.UUIDVersion); ok { + var r0 map[string][]string + if rf, ok := ret.Get(0).(func(string) map[string][]string); ok { r0 = rf(namespace) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]sqldb.UUIDVersion) + r0 = ret.Get(0).(map[string][]string) } } diff --git a/persist/sqldb/offload_node_status_repo.go b/persist/sqldb/offload_node_status_repo.go index 2daa4de7ddda..667da1aa082d 100644 --- a/persist/sqldb/offload_node_status_repo.go +++ b/persist/sqldb/offload_node_status_repo.go @@ -26,7 +26,7 @@ type OffloadNodeStatusRepo interface { Save(uid, namespace string, nodes wfv1.Nodes) (string, error) Get(uid, version string) (wfv1.Nodes, error) List(namespace string) (map[UUIDVersion]wfv1.Nodes, error) - ListOldOffloads(namespace string) ([]UUIDVersion, error) + ListOldOffloads(namespace string) (map[string][]string, error) Delete(uid, version string) error IsEnabled() bool } @@ -178,7 +178,7 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes, return res, nil } -func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, error) { +func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]string, error) { log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes") var records []UUIDVersion err := wdc.session. @@ -191,7 +191,11 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, er if err != nil { return nil, err } - return records, nil + x := make(map[string][]string) + for _, r := range records { + x[r.UID] = append(x[r.UID], r.Version) + } + return x, nil } func (wdc *nodeOffloadRepo) Delete(uid, version string) error { diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 1e93b88f250e..5e5a10d2eb6d 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -187,6 +187,7 @@ var indexers = cache.Indexers{ indexes.SemaphoreConfigIndexName: indexes.WorkflowSemaphoreKeysIndexFunc(), indexes.WorkflowPhaseIndex: indexes.MetaWorkflowPhaseIndexFunc(), indexes.ConditionsIndex: indexes.ConditionsIndexFunc, + indexes.UIDIndex: indexes.MetaUIDFunc, } // Run starts an Workflow resource controller @@ -553,37 +554,59 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{}) log.WithField("err", err).Error("Failed to list old offloaded nodes") continue } - if len(oldRecords) == 0 { - log.Info("Zero old offloads, nothing to do") - continue - } - // get every lives workflow (1000s) into a map - liveOffloadNodeStatusVersions := make(map[types.UID]string) - workflows, err := util.NewWorkflowLister(wfc.wfInformer).List() - if err != nil { - log.WithField("err", err).Error("Failed to list incomplete workflows") - continue - } - for _, wf := range workflows { - // this could be the empty string - as it is no longer offloaded - liveOffloadNodeStatusVersions[wf.UID] = wf.Status.OffloadNodeStatusVersion - } - log.WithFields(log.Fields{"len_wfs": len(liveOffloadNodeStatusVersions), "len_old_offloads": len(oldRecords)}).Info("Deleting old offloads that are not live") - for _, record := range oldRecords { - // this could be empty string - nodeStatusVersion, ok := liveOffloadNodeStatusVersions[types.UID(record.UID)] - if !ok || nodeStatusVersion != record.Version { - err := wfc.offloadNodeStatusRepo.Delete(record.UID, record.Version) - if err != nil { - log.WithField("err", err).Error("Failed to delete offloaded nodes") - } + log.WithField("len_wfs", len(oldRecords)).Info("Deleting old offloads that are not live") + for uid, versions := range oldRecords { + if err := wfc.deleteOffloadedNodesForWorkflow(uid, versions); err != nil { + log.WithError(err).WithField("uid", uid).Error("Failed to delete old offloaded nodes") } } + log.Info("Workflow GC finished") } } } } +func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versions []string) error { + workflows, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.UIDIndex, uid) + if err != nil { + return err + } + var wf *wfv1.Workflow + switch l := len(workflows); l { + case 0: + log.WithField("uid", uid).Info("Workflow missing, probably deleted") + case 1: + un := workflows[0].(*unstructured.Unstructured) + wf, err = util.FromUnstructured(un) + if err != nil { + return err + } + key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name + wfc.workflowKeyLock.Lock(key) + defer wfc.workflowKeyLock.Unlock(key) + // workflow might still be hydrated + if wfc.hydrator.IsHydrated(wf) { + log.WithField("uid", wf.UID).Info("Hydrated workflow encountered") + err = wfc.hydrator.Dehydrate(wf) + if err != nil { + return err + } + } + default: + return fmt.Errorf("expected no more than 1 workflow, got %d", l) + } + for _, version := range versions { + // skip delete if offload is live + if wf != nil && wf.Status.OffloadNodeStatusVersion == version { + continue + } + if err := wfc.offloadNodeStatusRepo.Delete(uid, version); err != nil { + return err + } + } + return nil +} + func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan struct{}) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) diff --git a/workflow/controller/indexes/indexes.go b/workflow/controller/indexes/indexes.go index 98758e6661b8..ea8d7250deed 100644 --- a/workflow/controller/indexes/indexes.go +++ b/workflow/controller/indexes/indexes.go @@ -15,4 +15,5 @@ const ( PodPhaseIndex = "pod.phase" ConditionsIndex = "status.conditions" SemaphoreConfigIndexName = "bySemaphoreConfigMap" + UIDIndex = "uid" ) diff --git a/workflow/controller/indexes/uid_index.go b/workflow/controller/indexes/uid_index.go new file mode 100644 index 000000000000..555571025af9 --- /dev/null +++ b/workflow/controller/indexes/uid_index.go @@ -0,0 +1,14 @@ +package indexes + +import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/tools/cache" +) + +var MetaUIDFunc cache.IndexFunc = func(obj interface{}) ([]string, error) { + v, err := meta.Accessor(obj) + if err != nil { + return nil, nil + } + return []string{string(v.GetUID())}, nil +}