kubekey/pkg/executor/executor.go
526e234594
fix: inventory_name is localhost, install failed (#2344)
* fix: misspelling

* feat: user go-template instance pongo2-template

* feat: not set incorrect

* feat: more beautiful progress bar

* feat: deal localhost node.

---------

Signed-off-by: joyceliu <joyceliu@yunify.com>
Co-authored-by: joyceliu <joyceliu@yunify.com>
2024-08-05 17:14:52 +08:00

/*
Copyright 2024 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/schollz/progressbar/v3"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
"github.com/kubesphere/kubekey/v4/pkg/connector"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/converter"
"github.com/kubesphere/kubekey/v4/pkg/converter/tmpl"
"github.com/kubesphere/kubekey/v4/pkg/modules"
"github.com/kubesphere/kubekey/v4/pkg/project"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)
// TaskExecutor executes all tasks in a pipeline.
type TaskExecutor interface {
Exec(ctx context.Context) error
}
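// NewTaskExecutor creates the default TaskExecutor for a pipeline. It returns nil when the
// runtime variable cannot be initialized, so callers should check the result before use.
// A minimal usage sketch (client, pipeline, and ctx are assumed to be prepared by the caller):
//
//	exec := NewTaskExecutor(client, pipeline, os.Stdout)
//	if exec == nil {
//		return errors.New("create task executor failed")
//	}
//	if err := exec.Exec(ctx); err != nil {
//		return err
//	}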
func NewTaskExecutor(client ctrlclient.Client, pipeline *kubekeyv1.Pipeline, logOutput io.Writer) TaskExecutor {
// get variable
v, err := variable.New(client, *pipeline)
if err != nil {
klog.V(5).ErrorS(err, "Create variable error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return nil
}
return &executor{
client: client,
pipeline: pipeline,
variable: v,
logOutput: logOutput,
}
}
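// executor is the default TaskExecutor implementation. It expands the project's playbook into
// tasks, stores them through the controller-runtime client, and writes per-task progress to logOutput.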
type executor struct {
client ctrlclient.Client
pipeline *kubekeyv1.Pipeline
variable variable.Variable
logOutput io.Writer
}
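// execBlockOptions carries the play- and block-level context (hosts, ignore-errors policy,
// role name, when conditions and tags) that execBlock needs to expand blocks into tasks.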
type execBlockOptions struct {
// playbook level config
hosts []string // which hosts will run playbook
ignoreErrors *bool // IgnoreErrors for playbook
// blocks level config
blocks []kkcorev1.Block
role string // role name of blocks
when []string // when condition for blocks
tags kkcorev1.Taggable
}
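// Exec marshals the project into a playbook and runs every matching play: it resolves hosts,
// optionally gathers facts, splits hosts into serial batches, and executes pre-tasks, roles,
// tasks and post-tasks for each batch.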
func (e executor) Exec(ctx context.Context) error {
klog.V(6).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline))
pj, err := project.New(*e.pipeline, true)
if err != nil {
return fmt.Errorf("deal project error: %w", err)
}
// convert to transfer.Playbook struct
pb, err := pj.MarshalPlaybook()
if err != nil {
return fmt.Errorf("convert playbook error: %w", err)
}
for _, play := range pb.Play {
if !play.Taggable.IsEnabled(e.pipeline.Spec.Tags, e.pipeline.Spec.SkipTags) {
// skip the play if its tags do not match
continue
}
// hosts should contain every matched host's name and must not be empty.
var hosts []string
if ahn, err := e.variable.Get(variable.GetHostnames(play.PlayHost.Hosts)); err == nil {
hosts = ahn.([]string)
}
if len(hosts) == 0 { // if hosts is empty skip this playbook
klog.V(5).InfoS("Hosts is empty", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline))
continue
}
// when gather_facts is set, collect host information from the remote host.
if play.GatherFacts {
for _, h := range hosts {
gfv, err := e.getGatherFact(ctx, h, e.variable)
if err != nil {
return fmt.Errorf("get gather fact error: %w", err)
}
// merge host information to runtime variable
if err := e.variable.Merge(variable.MergeRemoteVariable(h, gfv)); err != nil {
klog.V(5).ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "host", h)
return fmt.Errorf("merge gather fact error: %w", err)
}
}
}
// Batch execution, with each batch being a group of hosts run in serial.
var batchHosts [][]string
if play.RunOnce {
// run_once executes only on the first host
batchHosts = [][]string{{hosts[0]}}
} else {
// group hosts by serial and run the playbook batch by batch
batchHosts, err = converter.GroupHostBySerial(hosts, play.Serial.Data)
if err != nil {
return fmt.Errorf("group host by serial error: %w", err)
}
}
// generate and execute task.
for _, serials := range batchHosts {
// each batch of hosts should not be empty.
if len(serials) == 0 {
klog.V(5).ErrorS(nil, "Host is empty", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline))
return fmt.Errorf("host is empty")
}
if err := e.mergeVariable(ctx, e.variable, play.Vars, serials...); err != nil {
return fmt.Errorf("merge variable error: %w", err)
}
// generate task from pre tasks
if err := e.execBlock(ctx, execBlockOptions{
hosts: serials,
ignoreErrors: play.IgnoreErrors,
blocks: play.PreTasks,
tags: play.Taggable,
}); err != nil {
return fmt.Errorf("execute pre-tasks from play error: %w", err)
}
// generate task from role
for _, role := range play.Roles {
if err := e.mergeVariable(ctx, e.variable, role.Vars, serials...); err != nil {
return fmt.Errorf("merge variable error: %w", err)
}
// use the most closely scoped configuration
ignoreErrors := role.IgnoreErrors
if ignoreErrors == nil {
ignoreErrors = play.IgnoreErrors
}
if err := e.execBlock(ctx, execBlockOptions{
hosts: serials,
ignoreErrors: ignoreErrors,
blocks: role.Block,
role: role.Role,
when: role.When.Data,
tags: kkcorev1.JoinTag(role.Taggable, play.Taggable),
}); err != nil {
return fmt.Errorf("execute role-tasks error: %w", err)
}
}
// generate task from tasks
if err := e.execBlock(ctx, execBlockOptions{
hosts: serials,
ignoreErrors: play.IgnoreErrors,
blocks: play.Tasks,
tags: play.Taggable,
}); err != nil {
return fmt.Errorf("execute tasks error: %w", err)
}
// generate task from post tasks
if err := e.execBlock(ctx, execBlockOptions{
hosts: serials,
ignoreErrors: play.IgnoreErrors,
blocks: play.PostTasks,
tags: play.Taggable,
}); err != nil {
return fmt.Errorf("execute post-tasks error: %w", err)
}
}
}
return nil
}
// getGatherFact gets host information from the remote host.
func (e executor) getGatherFact(ctx context.Context, hostname string, vars variable.Variable) (map[string]any, error) {
v, err := vars.Get(variable.GetParamVariable(hostname))
if err != nil {
klog.V(5).ErrorS(err, "Get host variable error", "hostname", hostname)
return nil, err
}
connectorVars := make(map[string]any)
if c1, ok := v.(map[string]any)[_const.VariableConnector]; ok {
if c2, ok := c1.(map[string]any); ok {
connectorVars = c2
}
}
conn, err := connector.NewConnector(hostname, connectorVars)
if err != nil {
klog.V(5).ErrorS(err, "New connector error", "hostname", hostname)
return nil, err
}
if err := conn.Init(ctx); err != nil {
klog.V(5).ErrorS(err, "Init connection error", "hostname", hostname)
return nil, err
}
defer conn.Close(ctx)
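// only connectors that implement connector.GatherFacts can report host information.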
if gf, ok := conn.(connector.GatherFacts); ok {
return gf.Info(ctx)
}
klog.V(5).ErrorS(nil, "gather fact is not defined in this connector", "hostname", hostname)
return nil, nil
}
// execBlock loops over the blocks and generates tasks from them.
func (e executor) execBlock(ctx context.Context, options execBlockOptions) error {
for _, at := range options.blocks {
if !kkcorev1.JoinTag(at.Taggable, options.tags).IsEnabled(e.pipeline.Spec.Tags, e.pipeline.Spec.SkipTags) {
continue
}
hosts := options.hosts
if at.RunOnce { // only run in first host
hosts = []string{options.hosts[0]}
}
tags := kkcorev1.JoinTag(at.Taggable, options.tags)
// use the most closely scoped configuration
ignoreErrors := at.IgnoreErrors
if ignoreErrors == nil {
ignoreErrors = options.ignoreErrors
}
// merge variables defined in the block
if err := e.mergeVariable(ctx, e.variable, at.Vars, hosts...); err != nil {
klog.V(5).ErrorS(err, "merge variable error", "pipeline", e.pipeline, "block", at.Name)
return err
}
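// a block either nests child blocks (with optional rescue/always), was an include_tasks that
// has already been expanded, or maps to a single module task.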
switch {
case len(at.Block) != 0:
var errs error
// exec block
if err := e.execBlock(ctx, execBlockOptions{
hosts: hosts,
ignoreErrors: ignoreErrors,
role: options.role,
blocks: at.Block,
when: append(options.when, at.When.Data...),
tags: tags,
}); err != nil {
klog.V(5).ErrorS(err, "execute tasks from block error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "block", at.Name)
errs = errors.Join(errs, err)
}
// if the block failed, execute the rescue blocks
if e.pipeline.Status.Phase == kubekeyv1.PipelinePhaseFailed && len(at.Rescue) != 0 {
if err := e.execBlock(ctx, execBlockOptions{
hosts: hosts,
ignoreErrors: ignoreErrors,
blocks: at.Rescue,
role: options.role,
when: append(options.when, at.When.Data...),
tags: tags,
}); err != nil {
klog.V(5).ErrorS(err, "execute tasks from rescue error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "block", at.Name)
errs = errors.Join(errs, err)
}
}
// always blocks run after the block, regardless of its result
if len(at.Always) != 0 {
if err := e.execBlock(ctx, execBlockOptions{
hosts: hosts,
ignoreErrors: ignoreErrors,
blocks: at.Always,
role: options.role,
when: append(options.when, at.When.Data...),
tags: tags,
}); err != nil {
klog.V(5).ErrorS(err, "execute tasks from always error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "block", at.Name)
errs = errors.Join(errs, err)
}
}
// if any execution above failed, return the joined error
if errs != nil {
return errs
}
case at.IncludeTasks != "":
// include_tasks has already been converted to blocks; nothing to do here.
default:
task := converter.MarshalBlock(ctx, options.role, hosts, append(options.when, at.When.Data...), at)
// complete the task metadata from the pipeline
task.GenerateName = e.pipeline.Name + "-"
task.Namespace = e.pipeline.Namespace
if err := controllerutil.SetControllerReference(e.pipeline, task, e.client.Scheme()); err != nil {
klog.V(5).ErrorS(err, "Set controller reference error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "block", at.Name)
return err
}
// resolve the task's module from the block's unknown fields
for n, a := range at.UnknownFiled {
data, err := json.Marshal(a)
if err != nil {
klog.V(5).ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "block", at.Name, "field", n)
return err
}
if m := modules.FindModule(n); m != nil {
task.Spec.Module.Name = n
task.Spec.Module.Args = runtime.RawExtension{Raw: data}
break
}
}
if task.Spec.Module.Name == "" { // action is necessary for a task
klog.V(5).ErrorS(nil, "No module/action detected in task", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "block", at.Name)
return fmt.Errorf("no module/action detected in task: %s", task.Name)
}
// create task
if err := e.client.Create(ctx, task); err != nil {
klog.V(5).ErrorS(err, "create task error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "block", at.Name)
return err
}
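// run the task until it reaches a completed phase, printing a log line for each attempt.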
for {
var roleLog string
if task.Annotations[kubekeyv1alpha1.TaskAnnotationRole] != "" {
roleLog = "[" + task.Annotations[kubekeyv1alpha1.TaskAnnotationRole] + "] "
}
klog.V(5).InfoS("begin run task", "task", ctrlclient.ObjectKeyFromObject(task))
fmt.Fprintf(e.logOutput, "%s %s%s\n", time.Now().Format(time.TimeOnly+" MST"), roleLog, task.Spec.Name)
// exec task
task.Status.Phase = kubekeyv1alpha1.TaskPhaseRunning
if err := e.client.Status().Update(ctx, task); err != nil {
klog.V(5).ErrorS(err, "update task status error", "task", ctrlclient.ObjectKeyFromObject(task))
}
if err := e.executeTask(ctx, task, options); err != nil {
klog.V(5).ErrorS(err, "exec task error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "block", at.Name)
return err
}
if err := e.client.Status().Update(ctx, task); err != nil {
klog.V(5).ErrorS(err, "update task status error", "task", ctrlclient.ObjectKeyFromObject(task))
return err
}
if task.IsComplete() {
break
}
}
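// record the task outcome in the pipeline's task-result counters.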
e.pipeline.Status.TaskResult.Total++
switch task.Status.Phase {
case kubekeyv1alpha1.TaskPhaseSuccess:
e.pipeline.Status.TaskResult.Success++
case kubekeyv1alpha1.TaskPhaseIgnored:
e.pipeline.Status.TaskResult.Ignored++
case kubekeyv1alpha1.TaskPhaseFailed:
e.pipeline.Status.TaskResult.Failed++
}
// exit when the task failed
if task.IsFailed() {
var hostReason []kubekeyv1.PipelineFailedDetailHost
for _, tr := range task.Status.HostResults {
hostReason = append(hostReason, kubekeyv1.PipelineFailedDetailHost{
Host: tr.Host,
Stdout: tr.Stdout,
StdErr: tr.StdErr,
})
}
e.pipeline.Status.FailedDetail = append(e.pipeline.Status.FailedDetail, kubekeyv1.PipelineFailedDetail{
Task: task.Spec.Name,
Hosts: hostReason,
})
e.pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
return fmt.Errorf("task %s run failed", task.Spec.Name)
}
}
}
return nil
}
// executeTask executes the task on each host in parallel.
func (e executor) executeTask(ctx context.Context, task *kubekeyv1alpha1.Task, options execBlockOptions) error {
// check task host results
wg := &wait.Group{}
task.Status.HostResults = make([]kubekeyv1alpha1.TaskHostResult, len(task.Spec.Hosts))
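// run the task on every host concurrently; each goroutine fills its own slot in HostResults.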
for i, h := range task.Spec.Hosts {
wg.StartWithContext(ctx, func(ctx context.Context) {
// task result
var stdout, stderr string
defer func() {
if task.Spec.Register != "" {
var stdoutResult any = stdout
var stderrResult any = stderr
// try to convert stdout to structured data via json
_ = json.Unmarshal([]byte(stdout), &stdoutResult)
// try to convert stderr to structured data via json
_ = json.Unmarshal([]byte(stderr), &stderrResult)
// store the registered result in the runtime variable
if err := e.variable.Merge(variable.MergeRuntimeVariable(h, map[string]any{
task.Spec.Register: map[string]any{
"stdout": stdoutResult,
"stderr": stderrResult,
},
})); err != nil {
stderr = fmt.Sprintf("register task result to variable error: %v", err)
return
}
}
if stderr != "" && task.Spec.IgnoreError != nil && *task.Spec.IgnoreError {
klog.V(5).ErrorS(nil, "task run failed", "host", h, "stdout", stdout, "stderr", stderr, "task", ctrlclient.ObjectKeyFromObject(task))
} else if stderr != "" {
klog.ErrorS(nil, "task run failed", "host", h, "stdout", stdout, "stderr", stderr, "task", ctrlclient.ObjectKeyFromObject(task))
}
// fill result
task.Status.HostResults[i] = kubekeyv1alpha1.TaskHostResult{
Host: h,
Stdout: stdout,
StdErr: stderr,
}
}()
// task log: pad the hostname so per-host progress lines align
var placeholder string
if hostNameMaxLen, err := e.variable.Get(variable.GetHostMaxLength()); err == nil {
placeholder = strings.Repeat(" ", hostNameMaxLen.(int)-len(h))
}
// progress bar for task
var bar = progressbar.NewOptions(-1,
progressbar.OptionSetWriter(e.logOutput),
progressbar.OptionSpinnerCustom([]string{" "}),
progressbar.OptionEnableColorCodes(true),
progressbar.OptionSetDescription(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[36mrunning\033[0m", h, placeholder)),
progressbar.OptionOnCompletion(func() {
if _, err := os.Stdout.WriteString("\n"); err != nil {
klog.ErrorS(err, "failed to write output", "host", h)
}
}),
)
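// advance the spinner in the background until the bar is finished.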
go func() {
for !bar.IsFinished() {
if err := bar.Add(1); err != nil {
return
}
time.Sleep(100 * time.Millisecond)
}
}()
defer func() {
switch {
case stderr != "":
if task.Spec.IgnoreError != nil && *task.Spec.IgnoreError { // ignore
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mignore \033[0m", h, placeholder))
} else { // failed
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[31mfailed \033[0m", h, placeholder))
}
case stdout == "skip": // skip
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mskip \033[0m", h, placeholder))
default: //success
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34msuccess\033[0m", h, placeholder))
}
if err := bar.Finish(); err != nil {
klog.ErrorS(err, "finish bar error")
}
}()
// task execute
ha, err := e.variable.Get(variable.GetAllVariable(h))
if err != nil {
stderr = fmt.Sprintf("get variable error: %v", err)
return
}
// check when condition
if len(task.Spec.When) > 0 {
ok, err := tmpl.ParseBool(ha.(map[string]any), task.Spec.When)
if err != nil {
stderr = fmt.Sprintf("parse when condition error: %v", err)
return
}
if !ok {
stdout = "skip"
return
}
}
// execute the module once per loop item
// if loop is empty, execute once with a nil item
for _, item := range e.parseLoop(ctx, ha.(map[string]any), task) {
// set item to runtime variable
if err := e.variable.Merge(variable.MergeRuntimeVariable(h, map[string]any{
_const.VariableItem: item,
})); err != nil {
stderr = fmt.Sprintf("set loop item to variable error: %v", err)
return
}
stdout, stderr = e.executeModule(ctx, task, modules.ExecOptions{
Args: task.Spec.Module.Args,
Host: h,
Variable: e.variable,
Task: *task,
Pipeline: *e.pipeline,
})
// delete item
if err := e.variable.Merge(variable.MergeRuntimeVariable(h, map[string]any{
_const.VariableItem: nil,
})); err != nil {
stderr = fmt.Sprintf("clean loop item to variable error: %v", err)
return
}
}
})
}
wg.Wait()
// derive the task phase from the per-host results
task.Status.Phase = kubekeyv1alpha1.TaskPhaseSuccess
for _, data := range task.Status.HostResults {
if data.StdErr != "" {
if task.Spec.IgnoreError != nil && *task.Spec.IgnoreError {
task.Status.Phase = kubekeyv1alpha1.TaskPhaseIgnored
} else {
task.Status.Phase = kubekeyv1alpha1.TaskPhaseFailed
}
break
}
}
return nil
}
// parseLoop parses the loop field into a slice. If loop contains a template string, it is rendered first.
// If loop is a JSON string, it is converted to a string slice via json.
// If loop is a plain string, an empty slice is returned.
// If loop is already a string slice, it is returned as-is.
func (e executor) parseLoop(ctx context.Context, ha map[string]any, task *kubekeyv1alpha1.Task) []any {
switch {
case task.Spec.Loop.Raw == nil:
// loop is not set. add one nil element so the module executes once.
return []any{nil}
default:
return variable.Extension2Slice(ha, task.Spec.Loop)
}
}
// executeModule finds the registered module and executes it.
func (e executor) executeModule(ctx context.Context, task *kubekeyv1alpha1.Task, opts modules.ExecOptions) (string, string) {
// get all variables, which include the current loop item.
lg, err := opts.Variable.Get(variable.GetAllVariable(opts.Host))
if err != nil {
klog.V(5).ErrorS(err, "get location variable error", "task", ctrlclient.ObjectKeyFromObject(task))
return "", err.Error()
}
// check the failed_when condition
if len(task.Spec.FailedWhen) > 0 {
ok, err := tmpl.ParseBool(lg.(map[string]any), task.Spec.FailedWhen)
if err != nil {
klog.V(5).ErrorS(err, "validate FailedWhen condition error", "task", ctrlclient.ObjectKeyFromObject(task))
return "", err.Error()
}
if ok {
return "", "failed by failedWhen"
}
}
return modules.FindModule(task.Spec.Module.Name)(ctx, opts)
}
// mergeVariable merges the given variables into the runtime variable for each host.
func (e executor) mergeVariable(ctx context.Context, v variable.Variable, vd map[string]any, hosts ...string) error {
if len(vd) == 0 {
// skip
return nil
}
for _, host := range hosts {
if err := v.Merge(variable.MergeRuntimeVariable(host, vd)); err != nil {
return err
}
}
return nil
}