fix: change magic string "ownerReferences:pipeline" to constant

fix: add some comment.

Signed-off-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
joyceliu 2024-02-20 13:56:20 +08:00
parent c70cd6576e
commit 426e13dd79
11 changed files with 226 additions and 395 deletions

View File

@ -22,6 +22,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
)
// TaskOwnerField is the field name of the owner reference in the task.
// It defined in proxy transport. Not applicable in kube-apiserver.
const TaskOwnerField = "ownerReferences:pipeline"
// AddConversionFuncs adds the conversion functions to the given scheme.
// NOTE: ownerReferences:pipeline is valid in proxy client.
func AddConversionFuncs(scheme *runtime.Scheme) error {
@ -29,7 +33,7 @@ func AddConversionFuncs(scheme *runtime.Scheme) error {
SchemeGroupVersion.WithKind("Task"),
func(label, value string) (string, string, error) {
switch label {
case "metadata.name", "metadata.namespace", "ownerReferences:pipeline":
case "metadata.name", "metadata.namespace", TaskOwnerField:
return label, value, nil
default:
return "", "", fmt.Errorf("field label %q not supported for Task", label)

View File

@ -33,6 +33,7 @@ import (
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/task"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)
const (
@ -124,9 +125,7 @@ func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *
}
}()
if err := r.TaskController.AddTasks(ctx, task.AddTaskOptions{
Pipeline: pipeline,
}); err != nil {
if err := r.TaskController.AddTasks(ctx, pipeline); err != nil {
klog.V(5).ErrorS(err, "add task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err)
@ -141,10 +140,16 @@ func (r *PipelineReconciler) clean(ctx context.Context, pipeline *kubekeyv1.Pipe
klog.V(5).InfoS("clean runtimeDir", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
// delete reference task
taskList := &kubekeyv1alpha1.TaskList{}
if err := r.Client.List(ctx, taskList, ctrlclient.MatchingFields{}); err != nil {
if err := r.Client.List(ctx, taskList, ctrlclient.InNamespace(pipeline.Namespace), ctrlclient.MatchingFields{
kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(pipeline).String(),
}); err != nil {
klog.V(5).ErrorS(err, "list task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return
}
// clean variable cache
variable.CleanVariable(pipeline)
if err := os.RemoveAll(_const.GetRuntimeDir()); err != nil {
klog.V(5).ErrorS(err, "clean runtime directory error", "runtime dir", _const.GetRuntimeDir(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
}

View File

@ -24,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
cgcache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"
ctrl "sigs.k8s.io/controller-runtime"
@ -41,8 +40,6 @@ import (
type TaskReconciler struct {
// Client to resources
ctrlclient.Client
// VariableCache to store variable
VariableCache cgcache.Store
}
type taskReconcileOptions struct {
@ -89,30 +86,14 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
}
// get variable
var v variable.Variable
vars, ok, err := r.VariableCache.GetByKey(string(pipeline.UID))
v, err := variable.GetVariable(variable.Options{
Ctx: ctx,
Client: r.Client,
Pipeline: *pipeline,
})
if err != nil {
klog.V(5).ErrorS(err, "get variable error", "task", request.String())
return ctrl.Result{}, err
}
if ok {
v = vars.(variable.Variable)
} else {
nv, err := variable.New(variable.Options{
Ctx: ctx,
Client: r.Client,
Pipeline: *pipeline,
})
if err != nil {
klog.V(5).ErrorS(err, "create variable error", "task", request.String())
return ctrl.Result{}, err
}
if err := r.VariableCache.Add(nv); err != nil {
klog.V(5).ErrorS(err, "add variable to store error", "task", request.String())
return ctrl.Result{}, err
}
v = nv
}
defer func() {
if task.IsComplete() {
@ -120,8 +101,8 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
}
var nsTasks = &kubekeyv1alpha1.TaskList{}
klog.V(5).InfoS("update pipeline status", "task", request.String(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline).String())
if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(task.Namespace), ctrlclient.MatchingFields{
"ownerReferences:pipeline": ctrlclient.ObjectKeyFromObject(pipeline).String(),
if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(pipeline.Namespace), ctrlclient.MatchingFields{
kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(pipeline).String(),
}); err != nil {
klog.V(5).ErrorS(err, "list task error", "task", request.String())
return
@ -179,8 +160,8 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc
}
var nsTasks = &kubekeyv1alpha1.TaskList{}
if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Task.Namespace), ctrlclient.MatchingFields{
"ownerReferences:pipeline": ctrlclient.ObjectKeyFromObject(options.Pipeline).String(),
if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Pipeline.Namespace), ctrlclient.MatchingFields{
kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(options.Pipeline).String(),
}); err != nil {
klog.V(5).ErrorS(err, "list task error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String(), err)
return ctrl.Result{}, err

View File

@ -85,8 +85,7 @@ func (m *commandManager) Run(ctx context.Context) error {
VariableCache: variable.Cache,
Client: m.Client,
TaskReconciler: &controllers.TaskReconciler{
Client: m.Client,
VariableCache: variable.Cache,
Client: m.Client,
},
})
if err != nil {
@ -97,9 +96,7 @@ func (m *commandManager) Run(ctx context.Context) error {
}
// init pipeline status
m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseRunning
if err := kd.AddTasks(ctx, task.AddTaskOptions{
Pipeline: m.Pipeline,
}); err != nil {
if err := kd.AddTasks(ctx, m.Pipeline); err != nil {
klog.ErrorS(err, "Add task error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
m.Pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err)

View File

@ -63,8 +63,7 @@ func (c controllerManager) Run(ctx context.Context) error {
MaxConcurrent: c.MaxConcurrentReconciles,
Client: mgr.GetClient(),
TaskReconciler: &controllers.TaskReconciler{
Client: mgr.GetClient(),
VariableCache: variable.Cache,
Client: mgr.GetClient(),
},
})
if err != nil {

View File

@ -98,7 +98,7 @@ func NewStorage(optsGetter apigeneric.RESTOptionsGetter) (TaskStorage, error) {
options := &apigeneric.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: GetAttrs,
TriggerFunc: map[string]apistorage.IndexerFunc{"ownerReferences:pipeline": OwnerPipelineTriggerFunc},
TriggerFunc: map[string]apistorage.IndexerFunc{kubekeyv1alpha1.TaskOwnerField: OwnerPipelineTriggerFunc},
Indexers: Indexers(),
}
if err := store.CompleteWithOptions(options); err != nil {

View File

@ -136,7 +136,7 @@ func OwnerPipelineIndexFunc(obj interface{}) ([]string, error) {
// Indexers returns the indexers for pod storage.
func Indexers() *cache.Indexers {
return &cache.Indexers{
apistorage.FieldIndex("ownerReferences:pipeline"): OwnerPipelineIndexFunc,
apistorage.FieldIndex(kubekeyv1alpha1.TaskOwnerField): OwnerPipelineIndexFunc,
}
}
@ -146,7 +146,7 @@ func MatchTask(label labels.Selector, field fields.Selector) apistorage.Selectio
Label: label,
Field: field,
GetAttrs: GetAttrs,
IndexFields: []string{"ownerReferences:pipeline"},
IndexFields: []string{kubekeyv1alpha1.TaskOwnerField},
}
}
@ -169,7 +169,7 @@ func ToSelectableFields(task *kubekeyv1alpha1.Task) fields.Set {
taskSpecificFieldsSet := make(fields.Set, 10)
for _, reference := range task.OwnerReferences {
if reference.Kind == "Pipeline" {
taskSpecificFieldsSet["ownerReferences:pipeline"] = types.NamespacedName{
taskSpecificFieldsSet[kubekeyv1alpha1.TaskOwnerField] = types.NamespacedName{
Namespace: task.Namespace,
Name: reference.Name,
}.String()

View File

@ -1,150 +0,0 @@
/*
Copyright 2024 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"context"
"os"
"strings"
"testing"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
)
func TestTransport(t *testing.T) {
cwd, err := os.Getwd()
assert.NoError(t, err)
_const.SetWorkDir(cwd)
// create runtimeDir
if _, err := os.Stat(_const.RuntimeDir); err != nil && os.IsNotExist(err) {
err = os.Mkdir(_const.RuntimeDir, os.ModePerm)
assert.NoError(t, err)
}
defer os.RemoveAll(_const.RuntimeDir)
cli, err := NewLocalClient()
assert.NoError(t, err)
testcases := []struct {
name string
fn func() error
}{
{
name: "create task",
fn: func() error {
return cli.Create(context.Background(), &kubekeyv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: kubekeyv1alpha1.KubeKeyTaskSpec{
Name: "test",
},
})
},
},
{
name: "get task",
fn: func() error {
task := &kubekeyv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: kubekeyv1alpha1.KubeKeyTaskSpec{
Name: "test",
},
}
cli.Create(context.Background(), task)
return cli.Get(context.Background(), ctrlclient.ObjectKeyFromObject(task), task)
},
},
{
name: "list task",
fn: func() error {
cli.Create(context.Background(), &kubekeyv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: kubekeyv1alpha1.KubeKeyTaskSpec{
Name: "test",
},
})
tasklist := &kubekeyv1alpha1.TaskList{}
return cli.List(context.Background(), tasklist)
},
},
{
name: "update task",
fn: func() error {
cli.Create(context.Background(), &kubekeyv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: kubekeyv1alpha1.KubeKeyTaskSpec{
Name: "test",
},
})
if err := cli.Update(context.Background(), &kubekeyv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: kubekeyv1alpha1.KubeKeyTaskSpec{
Name: "test1",
},
}); err != nil && !strings.Contains(err.Error(), "spec is immutable") {
return err
}
return nil
},
},
{
name: "delete task",
fn: func() error {
cli.Create(context.Background(), &kubekeyv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: kubekeyv1alpha1.KubeKeyTaskSpec{
Name: "test",
},
})
return cli.Delete(context.Background(), &kubekeyv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
})
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
assert.NoError(t, tc.fn())
})
}
}

View File

@ -28,7 +28,6 @@ import (
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
"github.com/kubesphere/kubekey/v4/pkg/proxy"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)
// Controller is the interface for running tasks
@ -36,13 +35,7 @@ type Controller interface {
// Start the controller
Start(ctx context.Context) error
// AddTasks adds tasks to the controller
AddTasks(ctx context.Context, o AddTaskOptions) error
}
type AddTaskOptions struct {
*kubekeyv1.Pipeline
// set by AddTask function
variable variable.Variable
AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipeline) error
}
type ControllerOptions struct {
@ -68,9 +61,8 @@ func NewController(o ControllerOptions) (Controller, error) {
return &taskController{
schema: o.Scheme,
MaxConcurrent: o.MaxConcurrent,
wq: workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}),
taskqueue: workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}),
client: o.Client,
taskReconciler: o.TaskReconciler,
variableCache: o.VariableCache,
}, nil
}

View File

@ -24,7 +24,6 @@ import (
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
cgcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@ -33,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/converter"
@ -46,193 +46,177 @@ type taskController struct {
client ctrlclient.Client
taskReconciler reconcile.Reconciler
variableCache cgcache.Store
wq workqueue.RateLimitingInterface
taskqueue workqueue.RateLimitingInterface
MaxConcurrent int
}
func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
// AddTasks to taskqueue if Tasks is not completed
func (c *taskController) AddTasks(ctx context.Context, pipeline *kubekeyv1.Pipeline) error {
var nsTasks = &kubekeyv1alpha1.TaskList{}
if err := c.client.List(ctx, nsTasks, ctrlclient.InNamespace(o.Pipeline.Namespace)); err != nil {
klog.V(4).ErrorS(err, "List tasks error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
if err := c.client.List(ctx, nsTasks, ctrlclient.InNamespace(pipeline.Namespace), ctrlclient.MatchingFields{
kubekeyv1alpha1.TaskOwnerField: ctrlclient.ObjectKeyFromObject(pipeline).String(),
}); err != nil {
klog.V(4).ErrorS(err, "List tasks error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return err
}
defer func() {
// add task to workqueue
for _, task := range nsTasks.Items {
c.wq.Add(ctrl.Request{ctrlclient.ObjectKeyFromObject(&task)})
c.taskqueue.Add(ctrl.Request{NamespacedName: ctrlclient.ObjectKeyFromObject(&task)})
}
converter.CalculatePipelineStatus(nsTasks, o.Pipeline)
converter.CalculatePipelineStatus(nsTasks, pipeline)
}()
// filter by ownerReference
for i := len(nsTasks.Items) - 1; i >= 0; i-- {
var hasOwner bool
for _, ref := range nsTasks.Items[i].OwnerReferences {
if ref.UID == o.Pipeline.UID && ref.Kind == "Pipeline" {
hasOwner = true
}
}
if !hasOwner {
nsTasks.Items = append(nsTasks.Items[:i], nsTasks.Items[i+1:]...)
}
if len(nsTasks.Items) != 0 {
// task has generated. add exist generated task to taskqueue.
return nil
}
// generate tasks
v, err := variable.GetVariable(variable.Options{
Ctx: ctx,
Client: c.client,
Pipeline: *pipeline,
})
if err != nil {
return err
}
if len(nsTasks.Items) == 0 {
vars, ok, err := c.variableCache.GetByKey(string(o.Pipeline.UID))
if err != nil {
klog.V(4).ErrorS(err, "Get variable from store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
klog.V(6).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
projectFs, err := project.New(project.Options{Pipeline: pipeline}).FS(ctx, true)
if err != nil {
klog.V(4).ErrorS(err, "Deal project error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return err
}
// convert to transfer.Playbook struct
pb, err := converter.MarshalPlaybook(projectFs, pipeline.Spec.Playbook)
if err != nil {
return err
}
for _, play := range pb.Play {
if !play.Taggable.IsEnabled(pipeline.Spec.Tags, pipeline.Spec.SkipTags) {
// if not match the tags. skip
continue
}
// if tasks has not generated. generate tasks from pipeline
//vars, ok := cache.LocalVariable.Get(string(o.Pipeline.UID))
if ok {
o.variable = vars.(variable.Variable)
} else {
nv, err := variable.New(variable.Options{
Ctx: ctx,
Client: c.client,
Pipeline: *o.Pipeline,
})
if err != nil {
klog.V(4).ErrorS(err, "Create variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
if err := c.variableCache.Add(nv); err != nil {
klog.V(4).ErrorS(err, "Add variable to store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
o.variable = nv
// hosts should contain all host's name. hosts should not be empty.
var hosts []string
if ahn, err := v.Get(variable.Hostnames{Name: play.PlayHost.Hosts}); err == nil {
hosts = ahn.([]string)
}
if len(hosts) == 0 {
klog.V(4).ErrorS(nil, "Hosts is empty", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return fmt.Errorf("hosts is empty")
}
klog.V(6).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
projectFs, err := project.New(project.Options{Pipeline: o.Pipeline}).FS(ctx, true)
if err != nil {
klog.V(4).ErrorS(err, "Deal project error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
// convert to transfer.Playbook struct
pb, err := converter.MarshalPlaybook(projectFs, o.Pipeline.Spec.Playbook)
if err != nil {
return err
}
for _, play := range pb.Play {
if !play.Taggable.IsEnabled(o.Pipeline.Spec.Tags, o.Pipeline.Spec.SkipTags) {
continue
}
// convert Hosts (group or host) to all hosts
ahn, err := o.variable.Get(variable.Hostnames{Name: play.PlayHost.Hosts})
if err != nil {
klog.V(4).ErrorS(err, "Get all host name error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
// gather_fact
if play.GatherFacts {
for _, h := range ahn.([]string) {
gfv, err := getGatherFact(ctx, h, o.variable)
if err != nil {
klog.V(4).ErrorS(err, "Get gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h)
return err
}
if err := o.variable.Merge(variable.HostMerge{
HostNames: []string{h},
LocationUID: "",
Data: gfv,
}); err != nil {
klog.V(4).ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h)
return err
}
}
}
var hs [][]string
if play.RunOnce {
// runOnce only run in first node
hs = [][]string{{ahn.([]string)[0]}}
} else {
// group hosts by serial. run the playbook by serial
hs, err = converter.GroupHostBySerial(ahn.([]string), play.Serial.Data)
// when gather_fact is set. get host's information from remote.
if play.GatherFacts {
for _, h := range hosts {
gfv, err := getGatherFact(ctx, h, v)
if err != nil {
klog.V(4).ErrorS(err, "Group host by serial error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
klog.V(4).ErrorS(err, "Get gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "host", h)
return err
}
}
// split play by hosts group
for _, h := range hs {
puid := uuid.NewString()
if err := o.variable.Merge(variable.LocationMerge{
UID: puid,
Name: play.Name,
Type: variable.BlockLocation,
Vars: play.Vars,
// merge host information to runtime variable
if err := v.Merge(variable.HostMerge{
HostNames: []string{h},
LocationUID: "",
Data: gfv,
}); err != nil {
klog.V(4).ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "host", h)
return err
}
if len(h) == 0 {
err := fmt.Errorf("host is empty")
klog.V(4).ErrorS(err, "Host is empty", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
hctx := context.WithValue(ctx, _const.CtxBlockHosts, h)
// generate task from pre tasks
preTasks, err := c.createTasks(hctx, o, play.PreTasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get pre task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, preTasks...)
// generate task from role
for _, role := range play.Roles {
ruid := uuid.NewString()
if err := o.variable.Merge(variable.LocationMerge{
ParentUID: puid,
UID: ruid,
Name: play.Name,
Type: variable.BlockLocation,
Vars: role.Vars,
}); err != nil {
return err
}
roleTasks, err := c.createTasks(context.WithValue(hctx, _const.CtxBlockRole, role.Role), o, role.Block, role.When.Data, ruid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get role task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name, "role", role.Role)
return err
}
nsTasks.Items = append(nsTasks.Items, roleTasks...)
}
// generate task from tasks
tasks, err := c.createTasks(hctx, o, play.Tasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, tasks...)
// generate task from post tasks
postTasks, err := c.createTasks(hctx, o, play.Tasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get post task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, postTasks...)
}
}
// Batch execution, with each batch being a group of hosts run in serial.
var batchHosts [][]string
if play.RunOnce {
// runOnce only run in first node
batchHosts = [][]string{{hosts[0]}}
} else {
// group hosts by serial. run the playbook by serial
batchHosts, err = converter.GroupHostBySerial(hosts, play.Serial.Data)
if err != nil {
klog.V(4).ErrorS(err, "Group host by serial error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return err
}
}
// generate task by each batch.
for _, serials := range batchHosts {
// each batch hosts should not be empty.
if len(serials) == 0 {
klog.V(4).ErrorS(nil, "Host is empty", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return fmt.Errorf("host is empty")
}
hctx := context.WithValue(ctx, _const.CtxBlockHosts, serials)
// generate playbook uid which set in location variable.
puid := uuid.NewString()
// merge play's vars in location variable.
if err := v.Merge(variable.LocationMerge{
UID: puid,
Name: play.Name,
Type: variable.BlockLocation,
Vars: play.Vars,
}); err != nil {
klog.V(4).ErrorS(err, "Merge play to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name)
return err
}
// generate task from pre tasks
preTasks, err := c.createTasks(hctx, v, pipeline, play.PreTasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get pre task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, preTasks...)
// generate task from role
for _, role := range play.Roles {
ruid := uuid.NewString()
if err := v.Merge(variable.LocationMerge{
ParentUID: puid,
UID: ruid,
Name: play.Name,
Type: variable.BlockLocation,
Vars: role.Vars,
}); err != nil {
klog.V(4).ErrorS(err, "Merge role to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name, "role", role.Role)
return err
}
roleTasks, err := c.createTasks(context.WithValue(hctx, _const.CtxBlockRole, role.Role), v, pipeline, role.Block, role.When.Data, ruid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get role task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name, "role", role.Role)
return err
}
nsTasks.Items = append(nsTasks.Items, roleTasks...)
}
// generate task from tasks
tasks, err := c.createTasks(hctx, v, pipeline, play.Tasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, tasks...)
// generate task from post tasks
postTasks, err := c.createTasks(hctx, v, pipeline, play.Tasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get post task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, postTasks...)
}
}
return nil
}
// createTasks convert ansible block to task
func (k *taskController) createTasks(ctx context.Context, o AddTaskOptions, ats []kkcorev1.Block, when []string, puid string, locationType variable.LocationType) ([]kubekeyv1alpha1.Task, error) {
func (k *taskController) createTasks(ctx context.Context, v variable.Variable, pipeline *kubekeyv1.Pipeline, ats []kkcorev1.Block, when []string, puid string, locationType variable.LocationType) ([]kubekeyv1alpha1.Task, error) {
var tasks []kubekeyv1alpha1.Task
for _, at := range ats {
if !at.Taggable.IsEnabled(o.Pipeline.Spec.Tags, o.Pipeline.Spec.SkipTags) {
if !at.Taggable.IsEnabled(pipeline.Spec.Tags, pipeline.Spec.SkipTags) {
continue
}
@ -240,25 +224,25 @@ func (k *taskController) createTasks(ctx context.Context, o AddTaskOptions, ats
atWhen := append(when, at.When.Data...)
if len(at.Block) != 0 {
// add block
block, err := k.createTasks(ctx, o, at.Block, atWhen, uid, variable.BlockLocation)
block, err := k.createTasks(ctx, v, pipeline, at.Block, atWhen, uid, variable.BlockLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
klog.V(4).ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name)
return nil, err
}
tasks = append(tasks, block...)
if len(at.Always) != 0 {
always, err := k.createTasks(ctx, o, at.Always, atWhen, uid, variable.AlwaysLocation)
always, err := k.createTasks(ctx, v, pipeline, at.Always, atWhen, uid, variable.AlwaysLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
klog.V(4).ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name)
return nil, err
}
tasks = append(tasks, always...)
}
if len(at.Rescue) != 0 {
rescue, err := k.createTasks(ctx, o, at.Rescue, atWhen, uid, variable.RescueLocation)
rescue, err := k.createTasks(ctx, v, pipeline, at.Rescue, atWhen, uid, variable.RescueLocation)
if err != nil {
klog.V(4).ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
klog.V(4).ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name)
return nil, err
}
tasks = append(tasks, rescue...)
@ -266,17 +250,17 @@ func (k *taskController) createTasks(ctx context.Context, o AddTaskOptions, ats
} else {
task := converter.MarshalBlock(context.WithValue(ctx, _const.CtxBlockWhen, atWhen), at)
// complete by pipeline
task.GenerateName = o.Pipeline.Name + "-"
task.Namespace = o.Pipeline.Namespace
if err := controllerutil.SetControllerReference(o.Pipeline, task, k.schema); err != nil {
klog.V(4).ErrorS(err, "Set controller reference error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
task.GenerateName = pipeline.Name + "-"
task.Namespace = pipeline.Namespace
if err := controllerutil.SetControllerReference(pipeline, task, k.schema); err != nil {
klog.V(4).ErrorS(err, "Set controller reference error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name)
return nil, err
}
// complete module by unknown field
for n, a := range at.UnknownFiled {
data, err := json.Marshal(a)
if err != nil {
klog.V(4).ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name, "field", n)
klog.V(4).ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name, "field", n)
return nil, err
}
if m := modules.FindModule(n); m != nil {
@ -286,39 +270,39 @@ func (k *taskController) createTasks(ctx context.Context, o AddTaskOptions, ats
}
}
if task.Spec.Module.Name == "" { // action is necessary for a task
klog.V(4).ErrorS(nil, "No module/action detected in task", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
klog.V(4).ErrorS(nil, "No module/action detected in task", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name)
return nil, fmt.Errorf("no module/action detected in task: %s", task.Name)
}
// create task
if err := k.client.Create(ctx, task); err != nil {
klog.V(4).ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
klog.V(4).ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name)
return nil, err
}
uid = string(task.UID)
tasks = append(tasks, *task)
}
// add location to variable
if err := o.variable.Merge(variable.LocationMerge{
// add block vars to location variable
if err := v.Merge(variable.LocationMerge{
UID: uid,
ParentUID: puid,
Type: locationType,
Name: at.Name,
Vars: at.Vars,
}); err != nil {
klog.V(4).ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
klog.V(4).ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline), "block", at.Name)
return nil, err
}
}
return tasks, nil
}
// Start task controller, deal task in work queue
// Start task controller, deal task in work taskqueue
func (k *taskController) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
k.wq.ShutDown()
k.taskqueue.ShutDown()
}()
// deal work queue
// deal work taskqueue
wg := &sync.WaitGroup{}
for i := 0; i < k.MaxConcurrent; i++ {
wg.Add(1)
@ -334,19 +318,19 @@ func (k *taskController) Start(ctx context.Context) error {
}
func (k *taskController) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := k.wq.Get()
obj, shutdown := k.taskqueue.Get()
if shutdown {
return false
}
defer k.wq.Done(obj)
defer k.taskqueue.Done(obj)
req, ok := obj.(ctrl.Request)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
k.wq.Forget(obj)
k.taskqueue.Forget(obj)
klog.V(4).ErrorS(nil, "Queue item was not a Request", "request", req)
// Return true, don't take a break
return true
@ -355,21 +339,21 @@ func (k *taskController) processNextWorkItem(ctx context.Context) bool {
result, err := k.taskReconciler.Reconcile(ctx, req)
switch {
case err != nil:
k.wq.AddRateLimited(req)
k.taskqueue.AddRateLimited(req)
klog.V(4).ErrorS(err, "Reconciler error", "request", req)
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
k.wq.Forget(obj)
k.wq.AddAfter(req, result.RequeueAfter)
k.taskqueue.Forget(obj)
k.taskqueue.AddAfter(req, result.RequeueAfter)
case result.Requeue:
k.wq.AddRateLimited(req)
k.taskqueue.AddRateLimited(req)
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
k.wq.Forget(obj)
k.taskqueue.Forget(obj)
}
return true
}

View File

@ -271,15 +271,6 @@ func (g Hostnames) filter(data value) (any, error) {
return hs, nil
}
const (
// FailedExecute If dependency tasks has failed. execute current task. otherwise skip it
FailedExecute = "failed-exec"
// SucceedExecute If dependency tasks succeeded. execute current task. otherwise skip it
SucceedExecute = "succeed-exec"
// AlwaysExecute always execute current task.
AlwaysExecute = "always-exec"
)
type DependencyTasks struct {
LocationUID string
}
@ -563,3 +554,31 @@ var Cache = cgcache.NewStore(func(obj interface{}) (string, error) {
}
return v.Key(), nil
})
func GetVariable(o Options) (Variable, error) {
vars, ok, err := Cache.GetByKey(string(o.Pipeline.UID))
if err != nil {
klog.V(5).ErrorS(err, "get variable error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
if ok {
return vars.(Variable), nil
}
// add new variable to cache
nv, err := New(o)
if err != nil {
klog.V(5).ErrorS(err, "create variable error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
if err := Cache.Add(nv); err != nil {
klog.V(5).ErrorS(err, "add variable to store error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
return nv, nil
}
func CleanVariable(p *kubekeyv1.Pipeline) {
if _, ok, err := Cache.GetByKey(string(p.UID)); err == nil && ok {
Cache.Delete(string(p.UID))
}
}