diff --git a/pkg/apis/kubekey/v1alpha1/conversion.go b/pkg/apis/kubekey/v1alpha1/conversion.go index 86fde9bf..66ec1711 100644 --- a/pkg/apis/kubekey/v1alpha1/conversion.go +++ b/pkg/apis/kubekey/v1alpha1/conversion.go @@ -22,6 +22,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// TaskOwnerField is the field name of the owner reference in the task. +// It defined in proxy transport. Not applicable in kube-apiserver. +const TaskOwnerField = "ownerReferences:pipeline" + // AddConversionFuncs adds the conversion functions to the given scheme. // NOTE: ownerReferences:pipeline is valid in proxy client. func AddConversionFuncs(scheme *runtime.Scheme) error { @@ -29,7 +33,7 @@ func AddConversionFuncs(scheme *runtime.Scheme) error { SchemeGroupVersion.WithKind("Task"), func(label, value string) (string, string, error) { switch label { - case "metadata.name", "metadata.namespace", "ownerReferences:pipeline": + case "metadata.name", "metadata.namespace", TaskOwnerField: return label, value, nil default: return "", "", fmt.Errorf("field label %q not supported for Task", label) diff --git a/pkg/controllers/pipeline_controller.go b/pkg/controllers/pipeline_controller.go index b5e4968b..3a16e688 100644 --- a/pkg/controllers/pipeline_controller.go +++ b/pkg/controllers/pipeline_controller.go @@ -33,6 +33,7 @@ import ( kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1" _const "github.com/kubesphere/kubekey/v4/pkg/const" "github.com/kubesphere/kubekey/v4/pkg/task" + "github.com/kubesphere/kubekey/v4/pkg/variable" ) const ( @@ -124,9 +125,7 @@ func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline * } }() - if err := r.TaskController.AddTasks(ctx, task.AddTaskOptions{ - Pipeline: pipeline, - }); err != nil { + if err := r.TaskController.AddTasks(ctx, pipeline); err != nil { klog.V(5).ErrorS(err, "add task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err) @@ -141,10 +140,16 @@ func (r *PipelineReconciler) clean(ctx context.Context, pipeline *kubekeyv1.Pipe klog.V(5).InfoS("clean runtimeDir", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) // delete reference task taskList := &kubekeyv1alpha1.TaskList{} - if err := r.Client.List(ctx, taskList, ctrlclient.MatchingFields{}); err != nil { + if err := r.Client.List(ctx, taskList, ctrlclient.InNamespace(pipeline.Namespace), ctrlclient.MatchingFields{ + kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(pipeline).String(), + }); err != nil { klog.V(5).ErrorS(err, "list task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) return } + + // clean variable cache + variable.CleanVariable(pipeline) + if err := os.RemoveAll(_const.GetRuntimeDir()); err != nil { klog.V(5).ErrorS(err, "clean runtime directory error", "runtime dir", _const.GetRuntimeDir(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) } diff --git a/pkg/controllers/task_controller.go b/pkg/controllers/task_controller.go index 24c424f7..3c222df1 100644 --- a/pkg/controllers/task_controller.go +++ b/pkg/controllers/task_controller.go @@ -24,7 +24,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - cgcache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/utils/strings/slices" ctrl "sigs.k8s.io/controller-runtime" @@ -41,8 +40,6 @@ import ( type TaskReconciler struct { // Client to resources ctrlclient.Client - // VariableCache to store variable - VariableCache cgcache.Store } type taskReconcileOptions struct { @@ -89,30 +86,14 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c } // get variable - var v variable.Variable - vars, ok, err := r.VariableCache.GetByKey(string(pipeline.UID)) + v, err := variable.GetVariable(variable.Options{ + Ctx: ctx, + Client: r.Client, + Pipeline: *pipeline, + }) if err != nil { - klog.V(5).ErrorS(err, "get variable error", "task", request.String()) return ctrl.Result{}, err } - if ok { - v = vars.(variable.Variable) - } else { - nv, err := variable.New(variable.Options{ - Ctx: ctx, - Client: r.Client, - Pipeline: *pipeline, - }) - if err != nil { - klog.V(5).ErrorS(err, "create variable error", "task", request.String()) - return ctrl.Result{}, err - } - if err := r.VariableCache.Add(nv); err != nil { - klog.V(5).ErrorS(err, "add variable to store error", "task", request.String()) - return ctrl.Result{}, err - } - v = nv - } defer func() { if task.IsComplete() { @@ -120,8 +101,8 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c } var nsTasks = &kubekeyv1alpha1.TaskList{} klog.V(5).InfoS("update pipeline status", "task", request.String(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline).String()) - if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(task.Namespace), ctrlclient.MatchingFields{ - "ownerReferences:pipeline": ctrlclient.ObjectKeyFromObject(pipeline).String(), + if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(pipeline.Namespace), ctrlclient.MatchingFields{ + kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(pipeline).String(), }); err != nil { klog.V(5).ErrorS(err, "list task error", "task", request.String()) return @@ -179,8 +160,8 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc } var nsTasks = &kubekeyv1alpha1.TaskList{} - if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Task.Namespace), ctrlclient.MatchingFields{ - "ownerReferences:pipeline": ctrlclient.ObjectKeyFromObject(options.Pipeline).String(), + if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Pipeline.Namespace), ctrlclient.MatchingFields{ + kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(options.Pipeline).String(), }); err != nil { klog.V(5).ErrorS(err, "list task error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String(), err) return ctrl.Result{}, err diff --git a/pkg/manager/command_manager.go b/pkg/manager/command_manager.go index b286a59a..46990dd7 100644 --- a/pkg/manager/command_manager.go +++ b/pkg/manager/command_manager.go @@ -85,8 +85,7 @@ func (m *commandManager) Run(ctx context.Context) error { VariableCache: variable.Cache, Client: m.Client, TaskReconciler: &controllers.TaskReconciler{ - Client: m.Client, - VariableCache: variable.Cache, + Client: m.Client, }, }) if err != nil { @@ -97,9 +96,7 @@ func (m *commandManager) Run(ctx context.Context) error { } // init pipeline status m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseRunning - if err := kd.AddTasks(ctx, task.AddTaskOptions{ - Pipeline: m.Pipeline, - }); err != nil { + if err := kd.AddTasks(ctx, m.Pipeline); err != nil { klog.ErrorS(err, "Add task error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed m.Pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err) diff --git a/pkg/manager/controller_manager.go b/pkg/manager/controller_manager.go index b1369345..8bad8791 100644 --- a/pkg/manager/controller_manager.go +++ b/pkg/manager/controller_manager.go @@ -63,8 +63,7 @@ func (c controllerManager) Run(ctx context.Context) error { MaxConcurrent: c.MaxConcurrentReconciles, Client: mgr.GetClient(), TaskReconciler: &controllers.TaskReconciler{ - Client: mgr.GetClient(), - VariableCache: variable.Cache, + Client: mgr.GetClient(), }, }) if err != nil { diff --git a/pkg/proxy/resources/task/storage.go b/pkg/proxy/resources/task/storage.go index 24bc7415..151e95f1 100644 --- a/pkg/proxy/resources/task/storage.go +++ b/pkg/proxy/resources/task/storage.go @@ -98,7 +98,7 @@ func NewStorage(optsGetter apigeneric.RESTOptionsGetter) (TaskStorage, error) { options := &apigeneric.StoreOptions{ RESTOptions: optsGetter, AttrFunc: GetAttrs, - TriggerFunc: map[string]apistorage.IndexerFunc{"ownerReferences:pipeline": OwnerPipelineTriggerFunc}, + TriggerFunc: map[string]apistorage.IndexerFunc{kubekeyv1alpha1.TaskOwnerField: OwnerPipelineTriggerFunc}, Indexers: Indexers(), } if err := store.CompleteWithOptions(options); err != nil { diff --git a/pkg/proxy/resources/task/strategy.go b/pkg/proxy/resources/task/strategy.go index a55d751b..c187b58f 100644 --- a/pkg/proxy/resources/task/strategy.go +++ b/pkg/proxy/resources/task/strategy.go @@ -136,7 +136,7 @@ func OwnerPipelineIndexFunc(obj interface{}) ([]string, error) { // Indexers returns the indexers for pod storage. func Indexers() *cache.Indexers { return &cache.Indexers{ - apistorage.FieldIndex("ownerReferences:pipeline"): OwnerPipelineIndexFunc, + apistorage.FieldIndex(kubekeyv1alpha1.TaskOwnerField): OwnerPipelineIndexFunc, } } @@ -146,7 +146,7 @@ func MatchTask(label labels.Selector, field fields.Selector) apistorage.Selectio Label: label, Field: field, GetAttrs: GetAttrs, - IndexFields: []string{"ownerReferences:pipeline"}, + IndexFields: []string{kubekeyv1alpha1.TaskOwnerField}, } } @@ -169,7 +169,7 @@ func ToSelectableFields(task *kubekeyv1alpha1.Task) fields.Set { taskSpecificFieldsSet := make(fields.Set, 10) for _, reference := range task.OwnerReferences { if reference.Kind == "Pipeline" { - taskSpecificFieldsSet["ownerReferences:pipeline"] = types.NamespacedName{ + taskSpecificFieldsSet[kubekeyv1alpha1.TaskOwnerField] = types.NamespacedName{ Namespace: task.Namespace, Name: reference.Name, }.String() diff --git a/pkg/proxy/transport_test.go b/pkg/proxy/transport_test.go deleted file mode 100644 index 7c9925e8..00000000 --- a/pkg/proxy/transport_test.go +++ /dev/null @@ -1,150 +0,0 @@ -/* -Copyright 2024 The KubeSphere Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package proxy - -import ( - "context" - "os" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" - - kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1" - _const "github.com/kubesphere/kubekey/v4/pkg/const" -) - -func TestTransport(t *testing.T) { - cwd, err := os.Getwd() - assert.NoError(t, err) - _const.SetWorkDir(cwd) - // create runtimeDir - if _, err := os.Stat(_const.RuntimeDir); err != nil && os.IsNotExist(err) { - err = os.Mkdir(_const.RuntimeDir, os.ModePerm) - assert.NoError(t, err) - } - defer os.RemoveAll(_const.RuntimeDir) - - cli, err := NewLocalClient() - assert.NoError(t, err) - - testcases := []struct { - name string - fn func() error - }{ - { - name: "create task", - fn: func() error { - return cli.Create(context.Background(), &kubekeyv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: kubekeyv1alpha1.KubeKeyTaskSpec{ - Name: "test", - }, - }) - }, - }, - { - name: "get task", - fn: func() error { - task := &kubekeyv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: kubekeyv1alpha1.KubeKeyTaskSpec{ - Name: "test", - }, - } - cli.Create(context.Background(), task) - return cli.Get(context.Background(), ctrlclient.ObjectKeyFromObject(task), task) - }, - }, - { - name: "list task", - fn: func() error { - cli.Create(context.Background(), &kubekeyv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: kubekeyv1alpha1.KubeKeyTaskSpec{ - Name: "test", - }, - }) - tasklist := &kubekeyv1alpha1.TaskList{} - return cli.List(context.Background(), tasklist) - }, - }, - { - name: "update task", - fn: func() error { - cli.Create(context.Background(), &kubekeyv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: kubekeyv1alpha1.KubeKeyTaskSpec{ - Name: "test", - }, - }) - if err := cli.Update(context.Background(), &kubekeyv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: kubekeyv1alpha1.KubeKeyTaskSpec{ - Name: "test1", - }, - }); err != nil && !strings.Contains(err.Error(), "spec is immutable") { - return err - } - return nil - }, - }, - { - name: "delete task", - fn: func() error { - cli.Create(context.Background(), &kubekeyv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: kubekeyv1alpha1.KubeKeyTaskSpec{ - Name: "test", - }, - }) - return cli.Delete(context.Background(), &kubekeyv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - }) - }, - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - assert.NoError(t, tc.fn()) - }) - } -} diff --git a/pkg/task/controller.go b/pkg/task/controller.go index 758d448b..ef1843e0 100644 --- a/pkg/task/controller.go +++ b/pkg/task/controller.go @@ -28,7 +28,6 @@ import ( kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" "github.com/kubesphere/kubekey/v4/pkg/proxy" - "github.com/kubesphere/kubekey/v4/pkg/variable" ) // Controller is the interface for running tasks @@ -36,13 +35,7 @@ type Controller interface { // Start the controller Start(ctx context.Context) error // AddTasks adds tasks to the controller - AddTasks(ctx context.Context, o AddTaskOptions) error -} - -type AddTaskOptions struct { - *kubekeyv1.Pipeline - // set by AddTask function - variable variable.Variable + AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipeline) error } type ControllerOptions struct { @@ -68,9 +61,8 @@ func NewController(o ControllerOptions) (Controller, error) { return &taskController{ schema: o.Scheme, MaxConcurrent: o.MaxConcurrent, - wq: workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}), + taskqueue: workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}), client: o.Client, taskReconciler: o.TaskReconciler, - variableCache: o.VariableCache, }, nil } diff --git a/pkg/task/internal.go b/pkg/task/internal.go index 525137f3..7174cbcc 100644 --- a/pkg/task/internal.go +++ b/pkg/task/internal.go @@ -24,7 +24,6 @@ import ( "github.com/google/uuid" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/json" - cgcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -33,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1" + kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1" _const "github.com/kubesphere/kubekey/v4/pkg/const" "github.com/kubesphere/kubekey/v4/pkg/converter" @@ -46,193 +46,177 @@ type taskController struct { client ctrlclient.Client taskReconciler reconcile.Reconciler - variableCache cgcache.Store - - wq workqueue.RateLimitingInterface + taskqueue workqueue.RateLimitingInterface MaxConcurrent int } -func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { +// AddTasks to taskqueue if Tasks is not completed +func (c *taskController) AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipeline) error { var nsTasks = &kubekeyv1alpha1.TaskList{} - if err := c.client.List(ctx, nsTasks, ctrlclient.InNamespace(o.Pipeline.Namespace)); err != nil { - klog.V(4).ErrorS(err, "List tasks error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) + if err := c.client.List(ctx, nsTasks, ctrlclient.InNamespace(pipeline.Namespace), ctrlclient.MatchingFields{ + kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(pipeline).String(), + }); err != nil { + klog.V(4).ErrorS(err, "List tasks error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) return err } defer func() { - // add task to workqueue for _, task := range nsTasks.Items { - c.wq.Add(ctrl.Request{ctrlclient.ObjectKeyFromObject(&task)}) + c.taskqueue.Add(ctrl.Request{NamespacedName: ctrlclient.ObjectKeyFromObject(&task)}) } - converter.CalculatePipelineStatus(nsTasks, o.Pipeline) + converter.CalculatePipelineStatus(nsTasks, pipeline) }() - // filter by ownerReference - for i := len(nsTasks.Items) - 1; i >= 0; i-- { - var hasOwner bool - for _, ref := range nsTasks.Items[i].OwnerReferences { - if ref.UID == o.Pipeline.UID && ref.Kind == "Pipeline" { - hasOwner = true - } - } - - if !hasOwner { - nsTasks.Items = append(nsTasks.Items[:i], nsTasks.Items[i+1:]...) - } + if len(nsTasks.Items) != 0 { + // task has generated. add exist generated task to taskqueue. + return nil + } + // generate tasks + v, err := variable.GetVariable(variable.Options{ + Ctx: ctx, + Client: c.client, + Pipeline: *pipeline, + }) + if err != nil { + return err } - if len(nsTasks.Items) == 0 { - vars, ok, err := c.variableCache.GetByKey(string(o.Pipeline.UID)) - if err != nil { - klog.V(4).ErrorS(err, "Get variable from store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) - return err + klog.V(6).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) + projectFs, err := project.New(project.Options{Pipeline: pipeline}).FS(ctx, true) + if err != nil { + klog.V(4).ErrorS(err, "Deal project error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) + return err + } + + // convert to transfer.Playbook struct + pb, err := converter.MarshalPlaybook(projectFs, pipeline.Spec.Playbook) + if err != nil { + return err + } + + for _, play := range pb.Play { + if !play.Taggable.IsEnabled(pipeline.Spec.Tags, pipeline.Spec.SkipTags) { + // if not match the tags. skip + continue } - // if tasks has not generated. generate tasks from pipeline - //vars, ok := cache.LocalVariable.Get(string(o.Pipeline.UID)) - if ok { - o.variable = vars.(variable.Variable) - } else { - nv, err := variable.New(variable.Options{ - Ctx: ctx, - Client: c.client, - Pipeline: *o.Pipeline, - }) - if err != nil { - klog.V(4).ErrorS(err, "Create variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) - return err - } - if err := c.variableCache.Add(nv); err != nil { - klog.V(4).ErrorS(err, "Add variable to store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) - return err - } - o.variable = nv + // hosts should contain all host's name. hosts should not be empty. + var hosts []string + if ahn, err := v.Get(variable.Hostnames{Name: play.PlayHost.Hosts}); err == nil { + hosts = ahn.([]string) + } + if len(hosts) == 0 { + klog.V(4).ErrorS(nil, "Hosts is empty", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) + return fmt.Errorf("hosts is empty") } - klog.V(6).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) - projectFs, err := project.New(project.Options{Pipeline: o.Pipeline}).FS(ctx, true) - if err != nil { - klog.V(4).ErrorS(err, "Deal project error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) - return err - } - - // convert to transfer.Playbook struct - pb, err := converter.MarshalPlaybook(projectFs, o.Pipeline.Spec.Playbook) - if err != nil { - return err - } - - for _, play := range pb.Play { - if !play.Taggable.IsEnabled(o.Pipeline.Spec.Tags, o.Pipeline.Spec.SkipTags) { - continue - } - // convert Hosts (group or host) to all hosts - ahn, err := o.variable.Get(variable.Hostnames{Name: play.PlayHost.Hosts}) - if err != nil { - klog.V(4).ErrorS(err, "Get all host name error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) - return err - } - - // gather_fact - if play.GatherFacts { - for _, h := range ahn.([]string) { - gfv, err := getGatherFact(ctx, h, o.variable) - if err != nil { - klog.V(4).ErrorS(err, "Get gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h) - return err - } - if err := o.variable.Merge(variable.HostMerge{ - HostNames: []string{h}, - LocationUID: "", - Data: gfv, - }); err != nil { - klog.V(4).ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h) - return err - } - } - } - - var hs [][]string - if play.RunOnce { - // runOnce only run in first node - hs = [][]string{{ahn.([]string)[0]}} - } else { - // group hosts by serial. run the playbook by serial - hs, err = converter.GroupHostBySerial(ahn.([]string), play.Serial.Data) + // when gather_fact is set. get host's information from remote. + if play.GatherFacts { + for _, h := range hosts { + gfv, err := getGatherFact(ctx, h, v) if err != nil { - klog.V(4).ErrorS(err, "Group host by serial error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) + klog.V(4).ErrorS(err, "Get gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "host", h) return err } - } - - // split play by hosts group - for _, h := range hs { - puid := uuid.NewString() - if err := o.variable.Merge(variable.LocationMerge{ - UID: puid, - Name: play.Name, - Type: variable.BlockLocation, - Vars: play.Vars, + // merge host information to runtime variable + if err := v.Merge(variable.HostMerge{ + HostNames: []string{h}, + LocationUID: "", + Data: gfv, }); err != nil { + klog.V(4).ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "host", h) return err } - if len(h) == 0 { - err := fmt.Errorf("host is empty") - klog.V(4).ErrorS(err, "Host is empty", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) - return err - } - hctx := context.WithValue(ctx, _const.CtxBlockHosts, h) - // generate task from pre tasks - preTasks, err := c.createTasks(hctx, o, play.PreTasks, nil, puid, variable.BlockLocation) - if err != nil { - klog.V(4).ErrorS(err, "Get pre task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name) - return err - } - nsTasks.Items = append(nsTasks.Items, preTasks...) - // generate task from role - for _, role := range play.Roles { - ruid := uuid.NewString() - if err := o.variable.Merge(variable.LocationMerge{ - ParentUID: puid, - UID: ruid, - Name: play.Name, - Type: variable.BlockLocation, - Vars: role.Vars, - }); err != nil { - return err - } - roleTasks, err := c.createTasks(context.WithValue(hctx, _const.CtxBlockRole, role.Role), o, role.Block, role.When.Data, ruid, variable.BlockLocation) - if err != nil { - klog.V(4).ErrorS(err, "Get role task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name, "role", role.Role) - return err - } - nsTasks.Items = append(nsTasks.Items, roleTasks...) - } - // generate task from tasks - tasks, err := c.createTasks(hctx, o, play.Tasks, nil, puid, variable.BlockLocation) - if err != nil { - klog.V(4).ErrorS(err, "Get task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name) - return err - } - nsTasks.Items = append(nsTasks.Items, tasks...) - // generate task from post tasks - postTasks, err := c.createTasks(hctx, o, play.Tasks, nil, puid, variable.BlockLocation) - if err != nil { - klog.V(4).ErrorS(err, "Get post task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name) - return err - } - nsTasks.Items = append(nsTasks.Items, postTasks...) } } + + // Batch execution, with each batch being a group of hosts run in serial. + var batchHosts [][]string + if play.RunOnce { + // runOnce only run in first node + batchHosts = [][]string{{hosts[0]}} + } else { + // group hosts by serial. run the playbook by serial + batchHosts, err = converter.GroupHostBySerial(hosts, play.Serial.Data) + if err != nil { + klog.V(4).ErrorS(err, "Group host by serial error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) + return err + } + } + + // generate task by each batch. + for _, serials := range batchHosts { + // each batch hosts should not be empty. + if len(serials) == 0 { + klog.V(4).ErrorS(nil, "Host is empty", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) + return fmt.Errorf("host is empty") + } + hctx := context.WithValue(ctx, _const.CtxBlockHosts, serials) + + // generate playbook uid which set in location variable. + puid := uuid.NewString() + // merge play's vars in location variable. + if err := v.Merge(variable.LocationMerge{ + UID: puid, + Name: play.Name, + Type: variable.BlockLocation, + Vars: play.Vars, + }); err != nil { + klog.V(4).ErrorS(err, "Merge play to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) + return err + } + // generate task from pre tasks + preTasks, err := c.createTasks(hctx, v, pipeline, play.PreTasks, nil, puid, variable.BlockLocation) + if err != nil { + klog.V(4).ErrorS(err, "Get pre task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) + return err + } + + nsTasks.Items = append(nsTasks.Items, preTasks...) + // generate task from role + for _, role := range play.Roles { + ruid := uuid.NewString() + if err := v.Merge(variable.LocationMerge{ + ParentUID: puid, + UID: ruid, + Name: play.Name, + Type: variable.BlockLocation, + Vars: role.Vars, + }); err != nil { + klog.V(4).ErrorS(err, "Merge role to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name, "role", role.Role) + return err + } + roleTasks, err := c.createTasks(context.WithValue(hctx, _const.CtxBlockRole, role.Role), v, pipeline, role.Block, role.When.Data, ruid, variable.BlockLocation) + if err != nil { + klog.V(4).ErrorS(err, "Get role task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name, "role", role.Role) + return err + } + nsTasks.Items = append(nsTasks.Items, roleTasks...) + } + // generate task from tasks + tasks, err := c.createTasks(hctx, v, pipeline, play.Tasks, nil, puid, variable.BlockLocation) + if err != nil { + klog.V(4).ErrorS(err, "Get task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) + return err + } + nsTasks.Items = append(nsTasks.Items, tasks...) + // generate task from post tasks + postTasks, err := c.createTasks(hctx, v, pipeline, play.Tasks, nil, puid, variable.BlockLocation) + if err != nil { + klog.V(4).ErrorS(err, "Get post task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name) + return err + } + nsTasks.Items = append(nsTasks.Items, postTasks...) + } } return nil } // createTasks convert ansible block to task -func (k *taskController) createTasks(ctx context.Context, o AddTaskOptions, ats []kkcorev1.Block, when []string, puid string, locationType variable.LocationType) ([]kubekeyv1alpha1.Task, error) { +func (k *taskController) createTasks(ctx context.Context, v variable.Variable, pipeline *kubekeyv1.Pipeline, ats []kkcorev1.Block, when []string, puid string, locationType variable.LocationType) ([]kubekeyv1alpha1.Task, error) { var tasks []kubekeyv1alpha1.Task for _, at := range ats { - if !at.Taggable.IsEnabled(o.Pipeline.Spec.Tags, o.Pipeline.Spec.SkipTags) { + if !at.Taggable.IsEnabled(pipeline.Spec.Tags, pipeline.Spec.SkipTags) { continue } @@ -240,25 +224,25 @@ func (k *taskController) createTasks(ctx context.Context, o AddTaskOptions, ats atWhen := append(when, at.When.Data...) if len(at.Block) != 0 { // add block - block, err := k.createTasks(ctx, o, at.Block, atWhen, uid, variable.BlockLocation) + block, err := k.createTasks(ctx, v, pipeline, at.Block, atWhen, uid, variable.BlockLocation) if err != nil { - klog.V(4).ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) return nil, err } tasks = append(tasks, block...) if len(at.Always) != 0 { - always, err := k.createTasks(ctx, o, at.Always, atWhen, uid, variable.AlwaysLocation) + always, err := k.createTasks(ctx, v, pipeline, at.Always, atWhen, uid, variable.AlwaysLocation) if err != nil { - klog.V(4).ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) return nil, err } tasks = append(tasks, always...) } if len(at.Rescue) != 0 { - rescue, err := k.createTasks(ctx, o, at.Rescue, atWhen, uid, variable.RescueLocation) + rescue, err := k.createTasks(ctx, v, pipeline, at.Rescue, atWhen, uid, variable.RescueLocation) if err != nil { - klog.V(4).ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) return nil, err } tasks = append(tasks, rescue...) @@ -266,17 +250,17 @@ func (k *taskController) createTasks(ctx context.Context, o AddTaskOptions, ats } else { task := converter.MarshalBlock(context.WithValue(ctx, _const.CtxBlockWhen, atWhen), at) // complete by pipeline - task.GenerateName = o.Pipeline.Name + "-" - task.Namespace = o.Pipeline.Namespace - if err := controllerutil.SetControllerReference(o.Pipeline, task, k.schema); err != nil { - klog.V(4).ErrorS(err, "Set controller reference error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) + task.GenerateName = pipeline.Name + "-" + task.Namespace = pipeline.Namespace + if err := controllerutil.SetControllerReference(pipeline, task, k.schema); err != nil { + klog.V(4).ErrorS(err, "Set controller reference error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) return nil, err } // complete module by unknown field for n, a := range at.UnknownFiled { data, err := json.Marshal(a) if err != nil { - klog.V(4).ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name, "field", n) + klog.V(4).ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name, "field", n) return nil, err } if m := modules.FindModule(n); m != nil { @@ -286,39 +270,39 @@ func (k *taskController) createTasks(ctx context.Context, o AddTaskOptions, ats } } if task.Spec.Module.Name == "" { // action is necessary for a task - klog.V(4).ErrorS(nil, "No module/action detected in task", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) + klog.V(4).ErrorS(nil, "No module/action detected in task", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) return nil, fmt.Errorf("no module/action detected in task: %s", task.Name) } // create task if err := k.client.Create(ctx, task); err != nil { - klog.V(4).ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) return nil, err } uid = string(task.UID) tasks = append(tasks, *task) } - // add location to variable - if err := o.variable.Merge(variable.LocationMerge{ + // add block vars to location variable + if err := v.Merge(variable.LocationMerge{ UID: uid, ParentUID: puid, Type: locationType, Name: at.Name, Vars: at.Vars, }); err != nil { - klog.V(4).ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) + klog.V(4).ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name) return nil, err } } return tasks, nil } -// Start task controller, deal task in work queue +// Start task controller, deal task in work taskqueue func (k *taskController) Start(ctx context.Context) error { go func() { <-ctx.Done() - k.wq.ShutDown() + k.taskqueue.ShutDown() }() - // deal work queue + // deal work taskqueue wg := &sync.WaitGroup{} for i := 0; i < k.MaxConcurrent; i++ { wg.Add(1) @@ -334,19 +318,19 @@ func (k *taskController) Start(ctx context.Context) error { } func (k *taskController) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := k.wq.Get() + obj, shutdown := k.taskqueue.Get() if shutdown { return false } - defer k.wq.Done(obj) + defer k.taskqueue.Done(obj) req, ok := obj.(ctrl.Request) if !ok { // As the item in the workqueue is actually invalid, we call // Forget here else we'd go into a loop of attempting to // process a work item that is invalid. - k.wq.Forget(obj) + k.taskqueue.Forget(obj) klog.V(4).ErrorS(nil, "Queue item was not a Request", "request", req) // Return true, don't take a break return true @@ -355,21 +339,21 @@ func (k *taskController) processNextWorkItem(ctx context.Context) bool { result, err := k.taskReconciler.Reconcile(ctx, req) switch { case err != nil: - k.wq.AddRateLimited(req) + k.taskqueue.AddRateLimited(req) klog.V(4).ErrorS(err, "Reconciler error", "request", req) case result.RequeueAfter > 0: // The result.RequeueAfter request will be lost, if it is returned // along with a non-nil error. But this is intended as // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter - k.wq.Forget(obj) - k.wq.AddAfter(req, result.RequeueAfter) + k.taskqueue.Forget(obj) + k.taskqueue.AddAfter(req, result.RequeueAfter) case result.Requeue: - k.wq.AddRateLimited(req) + k.taskqueue.AddRateLimited(req) default: // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. - k.wq.Forget(obj) + k.taskqueue.Forget(obj) } return true } diff --git a/pkg/variable/variable.go b/pkg/variable/variable.go index 44d457fe..680a3766 100644 --- a/pkg/variable/variable.go +++ b/pkg/variable/variable.go @@ -271,15 +271,6 @@ func (g Hostnames) filter(data value) (any, error) { return hs, nil } -const ( - // FailedExecute If dependency tasks has failed. execute current task. otherwise skip it - FailedExecute = "failed-exec" - // SucceedExecute If dependency tasks succeeded. execute current task. otherwise skip it - SucceedExecute = "succeed-exec" - // AlwaysExecute always execute current task. - AlwaysExecute = "always-exec" -) - type DependencyTasks struct { LocationUID string } @@ -563,3 +554,31 @@ var Cache = cgcache.NewStore(func(obj interface{}) (string, error) { } return v.Key(), nil }) + +func GetVariable(o Options) (Variable, error) { + vars, ok, err := Cache.GetByKey(string(o.Pipeline.UID)) + if err != nil { + klog.V(5).ErrorS(err, "get variable error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) + return nil, err + } + if ok { + return vars.(Variable), nil + } + // add new variable to cache + nv, err := New(o) + if err != nil { + klog.V(5).ErrorS(err, "create variable error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) + return nil, err + } + if err := Cache.Add(nv); err != nil { + klog.V(5).ErrorS(err, "add variable to store error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) + return nil, err + } + return nv, nil +} + +func CleanVariable(p *kubekeyv1.Pipeline) { + if _, ok, err := Cache.GetByKey(string(p.UID)); err == nil && ok { + Cache.Delete(string(p.UID)) + } +}