From 6a7dbabd49fea3c6b381efd0eaa68c0d4eba84b1 Mon Sep 17 00:00:00 2001 From: joyceliu Date: Fri, 8 Mar 2024 21:22:00 +0800 Subject: [PATCH] fix: Adjust the location structure of the playbook to facilitate sequential execution of the playbook Signed-off-by: joyceliu --- pkg/const/{context.go => common.go} | 9 -- pkg/controllers/task_controller.go | 52 +++---- pkg/converter/converter.go | 14 +- pkg/task/internal.go | 217 +++++++++++++++++++--------- pkg/variable/helper.go | 26 +++- pkg/variable/internal.go | 10 +- pkg/variable/variable.go | 189 +++++++++++------------- 7 files changed, 284 insertions(+), 233 deletions(-) rename pkg/const/{context.go => common.go} (84%) diff --git a/pkg/const/context.go b/pkg/const/common.go similarity index 84% rename from pkg/const/context.go rename to pkg/const/common.go index 0dfaf824..18849e7a 100644 --- a/pkg/const/context.go +++ b/pkg/const/common.go @@ -16,15 +16,6 @@ limitations under the License. package _const -// key in context - -// use in marshal playbook.Block -const ( - CtxBlockHosts = "block-hosts" - CtxBlockRole = "block-role" - CtxBlockWhen = "block-when" -) - // LocalHostName is the default local host name in inventory. const LocalHostName = "localhost" diff --git a/pkg/controllers/task_controller.go b/pkg/controllers/task_controller.go index 905cc3ea..64aa45cc 100644 --- a/pkg/controllers/task_controller.go +++ b/pkg/controllers/task_controller.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" - "k8s.io/utils/strings/slices" ctrl "sigs.k8s.io/controller-runtime" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -148,37 +147,27 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c } func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconcileOptions) (ctrl.Result, error) { - // find dependency tasks - dl, err := options.Variable.Get(variable.DependencyTasks{ + var nsTasks = &kubekeyv1alpha1.TaskList{} + if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Pipeline.Namespace), ctrlclient.MatchingFields{ + kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(options.Pipeline).String(), + }); err != nil { + klog.V(5).ErrorS(err, "list task error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String(), err) + return ctrl.Result{}, err + } + + // Infer the current task's phase from its dependent tasks. + dl, err := options.Variable.Get(variable.InferPhase{ LocationUID: string(options.Task.UID), + Tasks: nsTasks.Items, }) + klog.InfoS("infer phase", "phase", dl, "task-name", options.Task.Spec.Name) if err != nil { klog.V(5).ErrorS(err, "find dependency error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String()) return ctrl.Result{}, err } - dt, ok := dl.(variable.DependencyTask) - if !ok { - klog.V(5).ErrorS(err, "failed to convert dependency", "task", ctrlclient.ObjectKeyFromObject(options.Task).String()) - return ctrl.Result{}, fmt.Errorf("[Task %s] failed to convert dependency", ctrlclient.ObjectKeyFromObject(options.Task).String()) - } - var nsTasks = &kubekeyv1alpha1.TaskList{} - if len(dt.Tasks) != 0 { - if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Pipeline.Namespace), ctrlclient.MatchingFields{ - kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(options.Pipeline).String(), - }); err != nil { - klog.V(5).ErrorS(err, "list task error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String(), err) - return ctrl.Result{}, err - } - } - var dts []kubekeyv1alpha1.Task - for _, t := range nsTasks.Items { // if dependency tasks is empty, skip - if slices.Contains(dt.Tasks, string(t.UID)) { - dts = append(dts, t) - } - } // Based on the results of the executed tasks dependent on, infer the next phase of the current task. - switch dt.Strategy(dts) { + switch dl.(kubekeyv1alpha1.TaskPhase) { case kubekeyv1alpha1.TaskPhasePending: return ctrl.Result{Requeue: true}, nil case kubekeyv1alpha1.TaskPhaseRunning: @@ -231,7 +220,7 @@ func (r *TaskReconciler) prepareTask(ctx context.Context, options taskReconcileO return err } - var curVariable = lg.(variable.VariableData) + var curVariable = lg.(variable.VariableData).DeepCopy() if pt := variable.BoolVar(curVariable, "prepareTask"); pt != nil && *pt { klog.InfoS("prepareTask is true, skip", "task", ctrlclient.ObjectKeyFromObject(options.Task), "host", host) continue @@ -255,11 +244,13 @@ func (r *TaskReconciler) prepareTask(ctx context.Context, options taskReconcileO // set prepareTask to true curVariable["prepareTask"] = true - options.Variable.Merge(variable.HostMerge{ + if err := options.Variable.Merge(variable.HostMerge{ HostNames: []string{h}, LocationUID: string(options.Task.UID), Data: curVariable, - }) + }); err != nil { + return err + } } return nil } @@ -434,13 +425,16 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO return } // set item to runtime variable - options.Variable.Merge(variable.HostMerge{ + if err := options.Variable.Merge(variable.HostMerge{ HostNames: []string{h}, LocationUID: string(options.Task.UID), Data: variable.VariableData{ "item": item, }, - }) + }); err != nil { + stderr = "set loop item to variable error" + return + } stdout, stderr = r.executeModule(ctx, options.Task, modules.ExecOptions{ Args: options.Task.Spec.Module.Args, Host: host, diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index 187bbc95..ebe173d9 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -229,19 +229,7 @@ func fileToBlock(baseFS fs.FS, baseDir string, blocks []kkcorev1.Block) error { } // MarshalBlock marshal block to task -func MarshalBlock(ctx context.Context, block kkcorev1.Block) *kubekeyv1alpha1.Task { - var role string - if v := ctx.Value(_const.CtxBlockRole); v != nil { - role = v.(string) - } - hosts := ctx.Value(_const.CtxBlockHosts).([]string) - if block.RunOnce { // if run_once. execute on the first task - hosts = hosts[:1] - } - var when []string - if v := ctx.Value(_const.CtxBlockWhen); v != nil { - when = v.([]string) - } +func MarshalBlock(ctx context.Context, role string, hosts []string, when []string, block kkcorev1.Block) *kubekeyv1alpha1.Task { task := &kubekeyv1alpha1.Task{ TypeMeta: metav1.TypeMeta{ Kind: "Task", diff --git a/pkg/task/internal.go b/pkg/task/internal.go index 417d8953..7af05839 100644 --- a/pkg/task/internal.go +++ b/pkg/task/internal.go @@ -34,7 +34,6 @@ import ( kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1" kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1" - _const "github.com/kubesphere/kubekey/v4/pkg/const" "github.com/kubesphere/kubekey/v4/pkg/converter" "github.com/kubesphere/kubekey/v4/pkg/modules" "github.com/kubesphere/kubekey/v4/pkg/project" @@ -94,6 +93,15 @@ func (c *taskController) AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipel return err } + // set pipeline location + if err := v.Merge(variable.LocationMerge{ + UID: string(pipeline.UID), + Name: pipeline.Name, + }); err != nil { + klog.V(4).ErrorS(err, "set top location for pipeline", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) + return err + } + for _, play := range pb.Play { if !play.Taggable.IsEnabled(pipeline.Spec.Tags, pipeline.Spec.SkipTags) { // if not match the tags. skip @@ -119,9 +127,8 @@ func (c *taskController) AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipel } // merge host information to runtime variable if err := v.Merge(variable.HostMerge{ - HostNames: []string{h}, - LocationUID: "", - Data: gfv, + HostNames: []string{h}, + Data: gfv, }); err != nil { klog.V(4).ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "host", h) return err @@ -150,22 +157,31 @@ func (c *taskController) AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipel klog.V(4).ErrorS(nil, "Host is empty", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) return fmt.Errorf("host is empty") } - hctx := context.WithValue(ctx, _const.CtxBlockHosts, serials) - // generate playbook uid which set in location variable. - puid := uuid.NewString() - // merge play's vars in location variable. + // generate playbook uid. + uid := uuid.NewString() + // set play location if err := v.Merge(variable.LocationMerge{ - UID: puid, - Name: play.Name, - Type: variable.BlockLocation, - Vars: play.Vars, + ParentUID: string(pipeline.UID), + UID: uid, + Name: play.Name, + Type: variable.BlockLocation, + Vars: play.Vars, }); err != nil { - klog.V(4).ErrorS(err, "Merge play to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) + klog.V(4).ErrorS(err, "set block location for play", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) return err } // generate task from pre tasks - preTasks, err := c.createTasks(hctx, v, pipeline, play.PreTasks, nil, puid, variable.BlockLocation) + preTasks, err := c.createTasks(ctx, createTasksOptions{ + variable: v, + pipeline: pipeline, + hosts: serials, + blocks: play.PreTasks, + uid: uid, + role: "", + when: nil, + locationType: variable.BlockLocation, + }) if err != nil { klog.V(4).ErrorS(err, "Get pre task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) return err @@ -174,18 +190,27 @@ func (c *taskController) AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipel nsTasks.Items = append(nsTasks.Items, preTasks...) // generate task from role for _, role := range play.Roles { - ruid := uuid.NewString() + roleuid := uuid.NewString() if err := v.Merge(variable.LocationMerge{ - ParentUID: puid, - UID: ruid, - Name: play.Name, + ParentUID: uid, + UID: roleuid, + Name: role.Role, Type: variable.BlockLocation, Vars: role.Vars, }); err != nil { - klog.V(4).ErrorS(err, "Merge role to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name, "role", role.Role) + klog.V(4).ErrorS(err, "set block location for role", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name, "role", role.Role) return err } - roleTasks, err := c.createTasks(context.WithValue(hctx, _const.CtxBlockRole, role.Role), v, pipeline, role.Block, role.When.Data, ruid, variable.BlockLocation) + roleTasks, err := c.createTasks(ctx, createTasksOptions{ + variable: v, + pipeline: pipeline, + hosts: serials, + blocks: role.Block, + uid: roleuid, + role: role.Role, + when: role.When.Data, + locationType: variable.BlockLocation, + }) if err != nil { klog.V(4).ErrorS(err, "Get role task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name, "role", role.Role) return err @@ -193,14 +218,32 @@ func (c *taskController) AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipel nsTasks.Items = append(nsTasks.Items, roleTasks...) } // generate task from tasks - tasks, err := c.createTasks(hctx, v, pipeline, play.Tasks, nil, puid, variable.BlockLocation) + tasks, err := c.createTasks(ctx, createTasksOptions{ + variable: v, + pipeline: pipeline, + hosts: serials, + blocks: play.Tasks, + uid: uid, + role: "", + when: nil, + locationType: variable.BlockLocation, + }) if err != nil { klog.V(4).ErrorS(err, "Get task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) return err } nsTasks.Items = append(nsTasks.Items, tasks...) // generate task from post tasks - postTasks, err := c.createTasks(hctx, v, pipeline, play.Tasks, nil, puid, variable.BlockLocation) + postTasks, err := c.createTasks(ctx, createTasksOptions{ + variable: v, + pipeline: pipeline, + hosts: serials, + blocks: play.Tasks, + uid: uid, + role: "", + when: nil, + locationType: variable.BlockLocation, + }) if err != nil { klog.V(4).ErrorS(err, "Get post task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) return err @@ -212,83 +255,121 @@ func (c *taskController) AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipel return nil } +type createTasksOptions struct { + // pipeline level config + variable variable.Variable + pipeline *kubekeyv1.Pipeline + // playbook level config + hosts []string // which hosts will run playbook + // blocks level config + blocks []kkcorev1.Block + uid string // the parent location uid for blocks + role string // role name of blocks + when []string // when condition for blocks + locationType variable.LocationType // location type for blocks +} + // createTasks convert ansible block to task -func (k *taskController) createTasks(ctx context.Context, v variable.Variable, pipeline *kubekeyv1.Pipeline, ats []kkcorev1.Block, when []string, puid string, locationType variable.LocationType) ([]kubekeyv1alpha1.Task, error) { +func (k *taskController) createTasks(ctx context.Context, options createTasksOptions) ([]kubekeyv1alpha1.Task, error) { var tasks []kubekeyv1alpha1.Task - for _, at := range ats { - if !at.Taggable.IsEnabled(pipeline.Spec.Tags, pipeline.Spec.SkipTags) { + for _, at := range options.blocks { + if !at.Taggable.IsEnabled(options.pipeline.Spec.Tags, options.pipeline.Spec.SkipTags) { continue } - uid := uuid.NewString() - atWhen := append(when, at.When.Data...) switch { - case len(at.Block) != 0: // block - // add location variable - if err := v.Merge(variable.LocationMerge{ + case len(at.Block) != 0: + uid := uuid.NewString() + // set block location + if err := options.variable.Merge(variable.LocationMerge{ UID: uid, - ParentUID: puid, - Type: locationType, + ParentUID: options.uid, + Type: options.locationType, Name: at.Name, Vars: at.Vars, }); err != nil { - klog.V(4).ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "set block location for block", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, err } // add block - block, err := k.createTasks(ctx, v, pipeline, at.Block, atWhen, uid, variable.BlockLocation) + blockTasks, err := k.createTasks(ctx, createTasksOptions{ + hosts: options.hosts, + role: options.role, + variable: options.variable, + pipeline: options.pipeline, + blocks: at.Block, + when: append(options.when, at.When.Data...), + uid: uid, + locationType: variable.BlockLocation, + }) if err != nil { - klog.V(4).ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, err } - tasks = append(tasks, block...) + tasks = append(tasks, blockTasks...) if len(at.Always) != 0 { - always, err := k.createTasks(ctx, v, pipeline, at.Always, atWhen, uid, variable.AlwaysLocation) + alwaysTasks, err := k.createTasks(ctx, createTasksOptions{ + variable: options.variable, + pipeline: options.pipeline, + hosts: options.hosts, + blocks: at.Always, + uid: uid, + role: options.role, + when: append(options.when, at.When.Data...), + locationType: variable.AlwaysLocation, + }) if err != nil { - klog.V(4).ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, err } - tasks = append(tasks, always...) + tasks = append(tasks, alwaysTasks...) } if len(at.Rescue) != 0 { - rescue, err := k.createTasks(ctx, v, pipeline, at.Rescue, atWhen, uid, variable.RescueLocation) + rescueTasks, err := k.createTasks(ctx, createTasksOptions{ + variable: options.variable, + pipeline: options.pipeline, + hosts: options.hosts, + blocks: at.Rescue, + uid: uid, + role: options.role, + when: append(options.when, at.When.Data...), + locationType: variable.RescueLocation, + }) if err != nil { - klog.V(4).ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, err } - tasks = append(tasks, rescue...) + tasks = append(tasks, rescueTasks...) } - case at.IncludeTasks != "": // include_task + case at.IncludeTasks != "": // do nothing - at.Name = at.IncludeTasks - - // add location variable - if err := v.Merge(variable.LocationMerge{ - UID: uid, - ParentUID: puid, - Type: locationType, - Name: at.Name, + // set includeTask location + if err := options.variable.Merge(variable.LocationMerge{ + UID: uuid.NewString(), + ParentUID: options.uid, + Type: options.locationType, + Name: at.IncludeTasks, Vars: at.Vars, }); err != nil { - klog.V(4).ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "set block location for includeTask", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, err } - default: // task - task := converter.MarshalBlock(context.WithValue(ctx, _const.CtxBlockWhen, atWhen), at) + default: + task := converter.MarshalBlock(ctx, options.role, options.hosts, append(options.when, at.When.Data...), at) // complete by pipeline - task.GenerateName = pipeline.Name + "-" - task.Namespace = pipeline.Namespace - if err := controllerutil.SetControllerReference(pipeline, task, k.schema); err != nil { - klog.V(4).ErrorS(err, "Set controller reference error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + task.GenerateName = options.pipeline.Name + "-" + task.Namespace = options.pipeline.Namespace + if err := controllerutil.SetControllerReference(options.pipeline, task, k.schema); err != nil { + klog.V(4).ErrorS(err, "Set controller reference error", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, err } // complete module by unknown field for n, a := range at.UnknownFiled { data, err := json.Marshal(a) if err != nil { - klog.V(4).ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name, "field", n) + klog.V(4).ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name, "field", n) return nil, err } if m := modules.FindModule(n); m != nil { @@ -298,30 +379,28 @@ func (k *taskController) createTasks(ctx context.Context, v variable.Variable, p } } if task.Spec.Module.Name == "" { // action is necessary for a task - klog.V(4).ErrorS(nil, "No module/action detected in task", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + klog.V(4).ErrorS(nil, "No module/action detected in task", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, fmt.Errorf("no module/action detected in task: %s", task.Name) } // create task if err := k.client.Create(ctx, task); err != nil { - klog.V(4).ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, err } - uid = string(task.UID) tasks = append(tasks, *task) - // add location variable - if err := v.Merge(variable.LocationMerge{ - UID: uid, - ParentUID: puid, - Type: locationType, + // set task location + if err := options.variable.Merge(variable.LocationMerge{ + UID: string(task.UID), + ParentUID: options.uid, + Type: options.locationType, Name: at.Name, Vars: at.Vars, }); err != nil { - klog.V(4).ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "set block location for task", "pipeline", ctrlclient.ObjectKeyFromObject(options.pipeline), "block", at.Name) return nil, err } } - } return tasks, nil } diff --git a/pkg/variable/helper.go b/pkg/variable/helper.go index fba2113f..9bee54fb 100644 --- a/pkg/variable/helper.go +++ b/pkg/variable/helper.go @@ -41,14 +41,26 @@ func mergeVariables(v1, v2 VariableData) VariableData { return mergedVars } -func findLocation(loc []location, uid string) *location { - for i := range loc { - if uid == loc[i].UID { - return &loc[i] +func findLocation(loc *location, uid string) *location { + if uid == loc.UID { + return loc + } + // find from block + for i := range loc.Block { + if r := findLocation(&loc.Block[i], uid); r != nil { + return r } - // find in block,always,rescue - if l := findLocation(append(append(loc[i].Block, loc[i].Always...), loc[i].Rescue...), uid); l != nil { - return l + } + // find from always + for i := range loc.Always { + if r := findLocation(&loc.Always[i], uid); r != nil { + return r + } + } + // find from rescue + for i := range loc.Rescue { + if r := findLocation(&loc.Rescue[i], uid); r != nil { + return r } } return nil diff --git a/pkg/variable/internal.go b/pkg/variable/internal.go index 5915f67e..1d4b832c 100644 --- a/pkg/variable/internal.go +++ b/pkg/variable/internal.go @@ -50,7 +50,7 @@ type value struct { // Location is the complete location index. // This index can help us determine the specific location of the task, // enabling us to retrieve the task's parameters and establish the execution order. - Location []location `json:"location"` + Location *location `json:"location"` } func (v value) deepCopy() value { @@ -120,6 +120,14 @@ func (v VariableData) String() string { return string(data) } +func (v VariableData) DeepCopy() VariableData { + nv := make(VariableData) + for k, vv := range v { + nv[k] = vv + } + return nv +} + type host struct { Vars VariableData `json:"vars"` RuntimeVars map[string]VariableData `json:"runtime"` diff --git a/pkg/variable/variable.go b/pkg/variable/variable.go index 6c03e817..a2f56f4c 100644 --- a/pkg/variable/variable.go +++ b/pkg/variable/variable.go @@ -272,99 +272,92 @@ func (g Hostnames) filter(data value) (any, error) { return hs, nil } -type DependencyTasks struct { +// Infer the next phase of the task.by it dependency. +// NOTE: To optimize performance, check only one dependency of a task instead of all dependencies. +// Therefore, do not assign roles without tasks. +type InferPhase struct { LocationUID string + Tasks []kubekeyv1alpha1.Task } -type DependencyTask struct { - Tasks []string - Strategy func([]kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase -} - -func (f DependencyTasks) filter(data value) (any, error) { +func (f InferPhase) filter(data value) (any, error) { loc := findLocation(data.Location, f.LocationUID) if loc == nil { return nil, fmt.Errorf("cannot found location %s", f.LocationUID) - } return f.getDependencyLocationUIDS(data, loc) } -func (f DependencyTasks) getDependencyLocationUIDS(data value, loc *location) (DependencyTask, error) { - if loc.PUID == "" { - return DependencyTask{ - Strategy: func([]kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase { - return kubekeyv1alpha1.TaskPhaseRunning - }, - }, nil +// If dependency tasks is not complete. waiting. +var succeedExecuteStrategy = func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase { + if len(tasks) == 0 { // non-dependency + return kubekeyv1alpha1.TaskPhaseRunning } - - // if tasks has failed. execute current task. - failedExecuteStrategy := func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase { - if len(tasks) == 0 { // non-dependency - return kubekeyv1alpha1.TaskPhaseRunning + skip := true + for _, t := range tasks { + if !t.IsComplete() { + return kubekeyv1alpha1.TaskPhasePending } - skip := true - for _, t := range tasks { - if !t.IsComplete() { - return kubekeyv1alpha1.TaskPhasePending - } - if t.IsFailed() { - return kubekeyv1alpha1.TaskPhaseRunning - } - if !t.IsSkipped() { - skip = false - } + if t.IsFailed() { + return kubekeyv1alpha1.TaskPhaseSkipped } - if skip { - return kubekeyv1alpha1.TaskPhaseRunning + if !t.IsSkipped() { + skip = false } + } + if skip { return kubekeyv1alpha1.TaskPhaseSkipped } + return kubekeyv1alpha1.TaskPhaseRunning +} - // If dependency tasks has failed. skip it. - succeedExecuteStrategy := func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase { - if len(tasks) == 0 { // non-dependency - return kubekeyv1alpha1.TaskPhaseRunning - } - skip := true - for _, t := range tasks { - if !t.IsComplete() { - return kubekeyv1alpha1.TaskPhasePending - } - if t.IsFailed() { - return kubekeyv1alpha1.TaskPhaseSkipped - } - if !t.IsSkipped() { - skip = false - } - } - if skip { - return kubekeyv1alpha1.TaskPhaseSkipped - } +// if tasks has failed. execute current task. +var failedExecuteStrategy = func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase { + if len(tasks) == 0 { // non-dependency return kubekeyv1alpha1.TaskPhaseRunning } - - // If dependency tasks is not complete. waiting. - // If dependency tasks is skipped. skip. - alwaysExecuteStrategy := func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase { - if len(tasks) == 0 { // non-dependency + skip := true + for _, t := range tasks { + if !t.IsComplete() { + return kubekeyv1alpha1.TaskPhasePending + } + if t.IsFailed() { return kubekeyv1alpha1.TaskPhaseRunning } - skip := true - for _, t := range tasks { - if !t.IsComplete() { - return kubekeyv1alpha1.TaskPhasePending - } - if !t.IsSkipped() { - skip = false - } - } - if skip { - return kubekeyv1alpha1.TaskPhaseSkipped + if !t.IsSkipped() { + skip = false } + } + if skip { return kubekeyv1alpha1.TaskPhaseRunning } + return kubekeyv1alpha1.TaskPhaseSkipped +} + +// If dependency tasks is skipped. skip. +var alwaysExecuteStrategy = func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase { + if len(tasks) == 0 { // non-dependency + return kubekeyv1alpha1.TaskPhaseRunning + } + skip := true + for _, t := range tasks { + if !t.IsComplete() { + return kubekeyv1alpha1.TaskPhasePending + } + if !t.IsSkipped() { + skip = false + } + } + if skip { + return kubekeyv1alpha1.TaskPhaseSkipped + } + return kubekeyv1alpha1.TaskPhaseRunning +} + +func (f InferPhase) getDependencyLocationUIDS(data value, loc *location) (kubekeyv1alpha1.TaskPhase, error) { + if loc.PUID == "" { + return kubekeyv1alpha1.TaskPhaseRunning, nil + } // Find the parent location and, based on where the current location is within the parent location, retrieve the dependent tasks. ploc := findLocation(data.Location, loc.PUID) @@ -374,17 +367,11 @@ func (f DependencyTasks) getDependencyLocationUIDS(data value, loc *location) (D if l.UID == loc.UID { // When location is the first element, it is necessary to check the dependency of its parent location. if i == 0 { - if data, err := f.getDependencyLocationUIDS(data, ploc); err != nil { - return DependencyTask{}, err - } else { - return data, nil - } + return f.getDependencyLocationUIDS(data, ploc) } + // When location is not the first element, dependency location is the preceding element in the same array. - return DependencyTask{ - Tasks: f.findAllTasks(ploc.Block[i-1]), - Strategy: succeedExecuteStrategy, - }, nil + return succeedExecuteStrategy(f.findAllTasks(ploc.Block[i-1], f.Tasks)), nil } } @@ -393,15 +380,10 @@ func (f DependencyTasks) getDependencyLocationUIDS(data value, loc *location) (D if l.UID == loc.UID { // When location is the first element, dependency location is all task of sibling block array. if i == 0 { - return DependencyTask{ - Tasks: f.findAllTasks(ploc.Block[len(ploc.Block)-1]), - Strategy: failedExecuteStrategy, - }, nil + return failedExecuteStrategy(f.findAllTasks(ploc.Block[len(ploc.Block)-1], f.Tasks)), nil } // When location is not the first element, dependency location is the preceding element in the same array - return DependencyTask{ - Tasks: f.findAllTasks(ploc.Rescue[i-1]), - Strategy: succeedExecuteStrategy}, nil + return failedExecuteStrategy(f.findAllTasks(ploc.Rescue[i-1], f.Tasks)), nil } } @@ -410,36 +392,33 @@ func (f DependencyTasks) getDependencyLocationUIDS(data value, loc *location) (D if l.UID == loc.UID { // When location is the first element, dependency location is all task of sibling block array if i == 0 { - return DependencyTask{ - Tasks: f.findAllTasks(ploc.Block[len(ploc.Block)-1]), - Strategy: alwaysExecuteStrategy, - }, nil + return alwaysExecuteStrategy(f.findAllTasks(ploc.Block[len(ploc.Block)-1], f.Tasks)), nil } // When location is not the first element, dependency location is the preceding element in the same array - return DependencyTask{ - Tasks: f.findAllTasks(ploc.Always[i-1]), - Strategy: alwaysExecuteStrategy, - }, nil - + return alwaysExecuteStrategy(f.findAllTasks(ploc.Always[i-1], f.Tasks)), nil } } - return DependencyTask{}, fmt.Errorf("connot find location %s in parent %s", loc.UID, loc.PUID) + return "", fmt.Errorf("connot find location %s in parent %s", loc.UID, loc.PUID) } -func (f DependencyTasks) findAllTasks(loc location) []string { - if len(loc.Block) == 0 { - return []string{loc.UID} +func (f InferPhase) findAllTasks(loc location, allTasks []kubekeyv1alpha1.Task) []kubekeyv1alpha1.Task { + if len(loc.Block) == 0 { // if block is empty the location is task graph + for _, task := range allTasks { + if string(task.UID) == loc.UID { + return []kubekeyv1alpha1.Task{task} + } + } } - var result = make([]string, 0) + var result = make([]kubekeyv1alpha1.Task, 0) for _, l := range loc.Block { - result = append(result, f.findAllTasks(l)...) + result = append(result, f.findAllTasks(l, allTasks)...) } for _, l := range loc.Rescue { - result = append(result, f.findAllTasks(l)...) + result = append(result, f.findAllTasks(l, allTasks)...) } for _, l := range loc.Always { - result = append(result, f.findAllTasks(l)...) + result = append(result, f.findAllTasks(l, allTasks)...) } return result @@ -494,13 +473,13 @@ type LocationMerge struct { } func (t LocationMerge) mergeTo(v *value) error { - if t.ParentUID == "" { - v.Location = append(v.Location, location{ + if t.ParentUID == "" { // set the top location + v.Location = &location{ Name: t.Name, PUID: t.ParentUID, UID: t.UID, Vars: t.Vars, - }) + } return nil } // find parent graph