kubekey/pkg/executor/task_executor.go
redscholar 3a29a28e20
fix: more clear loop out
Signed-off-by: redscholar <blacktiledhouse@gmail.com>
2025-12-04 19:13:56 +08:00

538 lines
16 KiB
Go

package executor
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/cockroachdb/errors"
kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1"
"github.com/schollz/progressbar/v3"
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/converter"
"github.com/kubesphere/kubekey/v4/pkg/converter/tmpl"
"github.com/kubesphere/kubekey/v4/pkg/modules"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)
// taskExecutor handles the execution of a single task across multiple hosts.
type taskExecutor struct {
*option
task *kkcorev1alpha1.Task
// taskRunTimeout is the timeout for task executor
taskRunTimeout time.Duration
}
// Exec creates and executes a task, updating its status and the parent playbook's status.
// It returns an error if the task creation or execution fails.
func (e *taskExecutor) Exec(ctx context.Context) error {
if e.taskRunTimeout == time.Duration(0) {
e.taskRunTimeout = 60 * time.Minute
}
// create task
if err := e.client.Create(ctx, e.task); err != nil {
return errors.Wrapf(err, "failed to create task %v", e.task)
}
defer func() {
e.playbook.Status.Statistics.Total++
switch e.task.Status.Phase {
case kkcorev1alpha1.TaskPhaseSuccess:
e.playbook.Status.Statistics.Success++
case kkcorev1alpha1.TaskPhaseIgnored:
e.playbook.Status.Statistics.Ignored++
case kkcorev1alpha1.TaskPhaseFailed:
e.playbook.Status.Statistics.Failed++
}
}()
// run task
if err := e.runTaskLoop(ctx); err != nil {
return err
}
// exit when task run failed
if e.task.IsFailed() {
failedMsg := "\n"
for _, result := range e.task.Status.HostResults {
// 1. Print executor-level (host-level) error first, if exists
if strings.TrimSpace(result.Error) != "" {
failedMsg += fmt.Sprintf(
"[%s][executor]: %s\n",
result.Host,
result.Error,
)
}
// 2. Then print item-level errors (only items with error)
for idx, r := range result.RegisterResults {
if strings.TrimSpace(r.Error) == "" {
continue
}
itemInfo := "item=<nil>"
if len(r.Item.Raw) > 0 {
itemInfo = "item=" + string(r.Item.Raw)
} else if r.Item.Object != nil {
itemInfo = fmt.Sprintf("item=%#v", r.Item.Object)
}
failedMsg += fmt.Sprintf(
"[%s][%s][%d]: %s\n",
result.Host,
itemInfo,
idx,
r.Error,
)
}
}
return errors.Errorf(
"task [%s](%s) run failed: %s",
e.task.Spec.Name,
ctrlclient.ObjectKeyFromObject(e.task),
failedMsg,
)
}
return nil
}
// runTaskLoop runs a task in a loop until it completes or times out.
// It periodically reconciles the task status and executes the task when it enters the running phase.
func (e *taskExecutor) runTaskLoop(ctx context.Context) error {
klog.V(3).InfoS("begin run task", "task", ctrlclient.ObjectKeyFromObject(e.task))
defer klog.V(3).InfoS("end run task", "task", ctrlclient.ObjectKeyFromObject(e.task))
// Add role prefix to log output if role annotation exists
var roleLog string
if e.task.Annotations[kkcorev1alpha1.TaskAnnotationRelativePath] != "" {
roleLog = "[" + e.task.Annotations[kkcorev1alpha1.TaskAnnotationRelativePath] + "] "
}
fmt.Fprintf(e.logOutput, "%s %s%s\n", time.Now().Format(time.TimeOnly+" MST"), roleLog, e.task.Spec.Name)
for {
if e.task.IsComplete() {
break
}
task := e.task.DeepCopy()
if e.task.Status.Phase == kkcorev1alpha1.TaskPhaseFailed {
e.task.Status.RestartCount++
}
e.task.Status.Phase = kkcorev1alpha1.TaskPhaseRunning
if err := e.client.Status().Patch(ctx, e.task, ctrlclient.MergeFrom(task)); err != nil {
return errors.Wrapf(err, "failed to patch task status of %s", ctrlclient.ObjectKeyFromObject(task))
}
task = e.task.DeepCopy()
e.execTask(ctx)
if err := e.client.Status().Patch(ctx, e.task, ctrlclient.MergeFrom(task)); err != nil {
return errors.Wrapf(err, "failed to patch task status of %s", ctrlclient.ObjectKeyFromObject(task))
}
}
return nil
}
// execTask executes the task across all specified hosts in parallel and updates the task status.
func (e *taskExecutor) execTask(ctx context.Context) {
// check task host results
wg := &wait.Group{}
e.task.Status.HostResults = make([]kkcorev1alpha1.TaskHostResult, len(e.task.Spec.Hosts))
for i, h := range e.task.Spec.Hosts {
wg.StartWithContext(ctx, e.execTaskHost(i, h))
}
wg.Wait()
// host result for task
e.task.Status.Phase = kkcorev1alpha1.TaskPhaseSuccess
for _, data := range e.task.Status.HostResults {
if data.Error != "" {
if e.task.Spec.IgnoreError != nil && *e.task.Spec.IgnoreError {
e.task.Status.Phase = kkcorev1alpha1.TaskPhaseIgnored
} else {
e.task.Status.Phase = kkcorev1alpha1.TaskPhaseFailed
}
break
}
}
}
// execTaskHost handles executing a task on a single host, including variable setup,
// condition checking, and module execution. It runs in parallel for each host.
func (e *taskExecutor) execTaskHost(i int, h string) func(ctx context.Context) {
return func(ctx context.Context) {
var resErr error
var registerResult []kkcorev1alpha1.RegisterResult
// task log
deferFunc := e.execTaskHostLogs(ctx, h, &registerResult)
defer deferFunc()
defer func() {
resErr = errors.Join(resErr, e.dealRegister(h, registerResult))
var errMsg string
if resErr != nil {
errMsg = resErr.Error()
}
e.task.Status.HostResults[i] = kkcorev1alpha1.TaskHostResult{
Host: h,
Error: errMsg,
RegisterResults: registerResult,
}
}()
ha, err := e.variable.Get(variable.GetAllVariable(h))
if err != nil {
resErr = err
return
}
had, ok := ha.(map[string]any)
if !ok {
resErr = errors.Errorf("host: %s variable is not a map", h)
return
}
for _, item := range e.dealLoop(had) {
stdout, stderr, rendered, exeErr := e.executeModule(ctx, e.task, item, h)
var rawItem runtime.RawExtension
if rendered != nil {
if bs, err := json.Marshal(rendered); err == nil {
rawItem = runtime.RawExtension{Raw: bs}
}
}
var errMsg string
if exeErr != nil {
errMsg = exeErr.Error()
}
r := kkcorev1alpha1.RegisterResult{
Item: rawItem,
Stdout: stdout,
Stderr: stderr,
Error: errMsg,
}
registerResult = append(registerResult, r)
}
}
}
// execTaskHostLogs sets up and manages progress bar logging for task execution on a host.
// It returns a cleanup function to be called when execution completes.
func (e *taskExecutor) execTaskHostLogs(ctx context.Context, h string, registerResult *[]kkcorev1alpha1.RegisterResult) func() {
// placeholder format task log
var placeholder string
if hostnameMaxLen, err := e.variable.Get(variable.GetHostMaxLength()); err == nil {
if hl, ok := hostnameMaxLen.(int); ok {
placeholder = strings.Repeat(" ", hl-len(h))
}
}
// progress bar for task
options := []progressbar.Option{
progressbar.OptionSetWriter(os.Stdout),
// progressbar.OptionSpinnerCustom([]string{" "}),
progressbar.OptionSpinnerType(14),
progressbar.OptionEnableColorCodes(true),
progressbar.OptionSetDescription(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[36mrunning\033[0m", h, placeholder)),
progressbar.OptionOnCompletion(func() {
if _, err := os.Stdout.WriteString("\n"); err != nil {
klog.ErrorS(err, "failed to write output", "host", h)
}
}),
}
if e.logOutput != os.Stdout {
options = append(options, progressbar.OptionSetVisibility(false))
}
bar := progressbar.NewOptions(-1, options...)
// run progress
go func() {
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(context.Context) (bool, error) {
if bar.IsFinished() {
return true, nil
}
if err := bar.Add(1); err != nil {
return false, errors.Wrap(err, "failed to process bar")
}
return false, nil
}); err != nil {
klog.ErrorS(err, "failed to wait for task run to finish", "host", h)
}
}()
return func() {
var failed bool
var skipped bool
// determine overall status by scanning all register results
skipped = true // assume skip until we find a non-skip stdout
for _, r := range *registerResult {
if r.Error != "" {
failed = true
break
}
if r.Stdout != modules.StdoutSkip {
skipped = false
}
}
switch {
case failed:
// failed or ignore
if e.task.Spec.IgnoreError != nil && *e.task.Spec.IgnoreError {
// ignore
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mignore \033[0m", h, placeholder))
if e.logOutput != os.Stdout {
fmt.Fprintf(e.logOutput, "[%s]%s ignore \n", h, placeholder)
}
} else {
// failed
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[31mfailed \033[0m", h, placeholder))
if e.logOutput != os.Stdout {
fmt.Fprintf(e.logOutput, "[%s]%s failed \n", h, placeholder)
}
}
case skipped:
// skip
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mskip \033[0m", h, placeholder))
if e.logOutput != os.Stdout {
fmt.Fprintf(e.logOutput, "[%s]%s skip \n", h, placeholder)
}
default:
// success
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34msuccess\033[0m", h, placeholder))
if e.logOutput != os.Stdout {
fmt.Fprintf(e.logOutput, "[%s]%s success\n", h, placeholder)
}
}
_ = bar.Finish()
}
}
// executeModule executes a single module task on a specific host.
func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.Task, item any, host string) (stdout string, stderr string, rendered any, resErr error) {
// Set loop item variable if one was provided
if item != nil {
// Convert item to runtime variable
node, err := converter.ConvertMap2Node(map[string]any{_const.VariableItem: item})
if err != nil {
return modules.StdoutFailed, "", nil, err
}
// Merge item into host's runtime variables
if err := e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{node}, host)); err != nil {
return modules.StdoutFailed, "", nil, err
}
// Clean up loop item variable after execution
defer func() {
if item == nil {
return
}
// Reset item to null
resetNode, err := converter.ConvertMap2Node(map[string]any{_const.VariableItem: nil})
if err != nil {
resErr = err
return
}
if err := e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{resetNode}, host)); err != nil {
resErr = err
return
}
}()
}
// Get all variables for this host, including any loop item
ha, err := e.variable.Get(variable.GetAllVariable(host))
if err != nil {
return modules.StdoutFailed, "", nil, err
}
// Convert host variables to map type
had, ok := ha.(map[string]any)
if !ok {
return modules.StdoutFailed, "", nil, err
}
// check when condition
if skip, err := e.dealWhen(had); err != nil {
return modules.StdoutFailed, "", nil, err
} else if skip {
return modules.StdoutSkip, "", nil, nil
}
// Execute the actual module with the prepared context
stdout, stderr, resErr = modules.FindModule(task.Spec.Module.Name)(ctx, modules.ExecOptions{
Args: e.task.Spec.Module.Args,
Host: host,
Variable: e.variable,
Task: *e.task,
Playbook: *e.playbook,
LogOutput: e.logOutput,
})
if ferr := e.dealFailedWhen(had, resErr); ferr != nil {
return stdout, stderr, had[_const.VariableItem], ferr
}
return stdout, stderr, had[_const.VariableItem], nil
}
// dealLoop parses the loop specification into a slice of items to iterate over.
// If no loop is specified, returns a single nil item. Otherwise converts the loop
// specification from JSON into a slice of values.
func (e *taskExecutor) dealLoop(ha map[string]any) []any {
var items []any
switch {
case e.task.Spec.Loop.Raw == nil:
// loop is not set. add one element to execute once module.
items = []any{nil}
default:
items = variable.Extension2Slice(ha, e.task.Spec.Loop)
}
return items
}
// dealWhen evaluates the "when" conditions for a task to determine if it should be skipped.
// Returns true if the task should be skipped, false if it should proceed.
func (e *taskExecutor) dealWhen(had map[string]any) (bool, error) {
if len(e.task.Spec.When) > 0 {
ok, err := tmpl.ParseBool(had, e.task.Spec.When...)
if err != nil {
return false, err
}
if !ok {
return true, nil
}
}
return false, nil
}
// dealFailedWhen evaluates the "failed_when" conditions for a task to determine if it should fail.
// Returns true if the task should be marked as failed, false if it should proceed.
func (e *taskExecutor) dealFailedWhen(had map[string]any, err error) error {
if err != nil {
return err
}
if len(e.task.Spec.FailedWhen) > 0 {
ok, err := tmpl.ParseBool(had, e.task.Spec.FailedWhen...)
if err != nil {
return errors.Wrap(err, "failed to parse failed_when condition")
}
if ok {
return errors.New("reach failed_when, failed")
}
}
return nil
}
// dealRegister merges registerResult into global variables for the given host and processes "register" logic.
// If the task specifies a Register name, it composes the values from the registerResult slice,
// normalizes stdout data according to the RegisterType (json, yaml, or plain string),
// detects errors in any of the items, and merges the composed data into the task's runtime variables.
// It returns an error if any items are in error or if variable merge fails.
func (e *taskExecutor) dealRegister(host string, registerResult []kkcorev1alpha1.RegisterResult) (resErr error) {
if e.task.Spec.Register == "" {
// No register variable specified; nothing to merge.
return nil
}
// parseStdout parses stdout according to the RegisterType.
parseStdout := func(s string) (any, error) {
var out any = s
switch e.task.Spec.RegisterType {
case "json":
// Attempt to unmarshal as JSON.
if err := json.Unmarshal([]byte(s), &out); err != nil {
klog.V(5).ErrorS(err, "failed to register json value")
return s, err
}
case "yaml", "yml":
// Attempt to unmarshal as YAML.
if err := yaml.Unmarshal([]byte(s), &out); err != nil {
klog.V(5).ErrorS(err, "failed to register yaml value")
return s, err
}
default:
// Remove trailing newline by default.
if str, ok := out.(string); ok {
out = strings.TrimRight(str, "\n")
}
}
return out, nil
}
var value any
var hasItemError, hasParsedError bool
// If there is exactly one registerResult with no Item data, use the flat representation.
if len(registerResult) == 1 && len(registerResult[0].Item.Raw) == 0 && registerResult[0].Item.Object == nil {
r := registerResult[0]
parsedStdout, perr := parseStdout(r.Stdout)
if perr != nil {
hasParsedError = true
}
value = map[string]any{
"stdout": parsedStdout,
"stderr": r.Stderr,
"error": r.Error,
}
// If there is any error at the module level, set the global error flag.
if strings.TrimSpace(r.Error) != "" {
hasItemError = true
}
} else {
// Otherwise, collect all items as an array of results.
var arr []any
for _, r := range registerResult {
parsedStdout, perr := parseStdout(r.Stdout)
if perr != nil {
hasParsedError = true
}
arr = append(arr, map[string]any{
"item": string(r.Item.Raw),
"stdout": parsedStdout,
"stderr": r.Stderr,
"error": r.Error,
})
// If any item has error, set the global error flag.
if strings.TrimSpace(r.Error) != "" {
hasItemError = true
}
}
value = arr
}
// If any item failed, return a unified task failure error sentinel.
if hasItemError {
resErr = errors.Join(resErr, errors.New("module run failed"))
}
if hasParsedError {
resErr = errors.Join(resErr, errors.New("parse register by register type failed"))
}
// Convert the register mapping to a YAML node.
node, err := converter.ConvertMap2Node(map[string]any{
e.task.Spec.Register: value,
})
if err != nil {
return errors.Join(resErr, err)
}
// Merge the YAML node into the runtime variable store for the current host.
return errors.Join(resErr, e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{node}, host)))
}