diff --git a/api/core/v1alpha1/task_types.go b/api/core/v1alpha1/task_types.go index 2896699a..f05fc894 100644 --- a/api/core/v1alpha1/task_types.go +++ b/api/core/v1alpha1/task_types.go @@ -74,10 +74,16 @@ type TaskStatus struct { // TaskHostResult each host result for task type TaskHostResult struct { - Host string `json:"host,omitempty"` - Stdout string `json:"stdout,omitempty"` - StdErr string `json:"stdErr,omitempty"` - Error string `json:"error,omitempty"` + Host string `json:"host,omitempty"` + Error string `json:"error,omitempty"` + RegisterResults []RegisterResult `json:"registerResults,omitempty"` +} + +type RegisterResult struct { + Item runtime.RawExtension `json:"item,omitempty"` + Stdout string `json:"stdout,omitempty"` + Stderr string `json:"stdErr,omitempty"` + Error string `json:"error,omitempty"` } // +genclient diff --git a/api/core/v1alpha1/zz_generated.deepcopy.go b/api/core/v1alpha1/zz_generated.deepcopy.go index b00b302a..e092cf8b 100644 --- a/api/core/v1alpha1/zz_generated.deepcopy.go +++ b/api/core/v1alpha1/zz_generated.deepcopy.go @@ -40,6 +40,22 @@ func (in *Module) DeepCopy() *Module { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RegisterResult) DeepCopyInto(out *RegisterResult) { + *out = *in + in.Item.DeepCopyInto(&out.Item) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegisterResult. +func (in *RegisterResult) DeepCopy() *RegisterResult { + if in == nil { + return nil + } + out := new(RegisterResult) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Task) DeepCopyInto(out *Task) { *out = *in @@ -70,6 +86,13 @@ func (in *Task) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TaskHostResult) DeepCopyInto(out *TaskHostResult) { *out = *in + if in.RegisterResults != nil { + in, out := &in.RegisterResults, &out.RegisterResults + *out = make([]RegisterResult, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskHostResult. @@ -157,7 +180,9 @@ func (in *TaskStatus) DeepCopyInto(out *TaskStatus) { if in.HostResults != nil { in, out := &in.HostResults, &out.HostResults *out = make([]TaskHostResult, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } } diff --git a/builtin/core/roles/native/init/tasks/main.yaml b/builtin/core/roles/native/init/tasks/main.yaml index 47bdce48..62adfbc8 100644 --- a/builtin/core/roles/native/init/tasks/main.yaml +++ b/builtin/core/roles/native/init/tasks/main.yaml @@ -3,7 +3,7 @@ hostnamectl set-hostname {{ .inventory_hostname }} \ && sed -i '/^127.0.1.1/s/.*/127.0.1.1 {{ .inventory_hostname }}/g' {{ .item }} when: - - .set_hostname + - .native.set_hostname - .inventory_hostname | ne "localhost" loop: "{{ .native.localDNS | toJson }}" - name: OS | Synchronize initialization script to remote node diff --git a/builtin/core/roles/precheck/os/tasks/main.yaml b/builtin/core/roles/precheck/os/tasks/main.yaml index ae318e3e..a27f6dc9 100644 --- a/builtin/core/roles/precheck/os/tasks/main.yaml +++ b/builtin/core/roles/precheck/os/tasks/main.yaml @@ -2,13 +2,13 @@ - name: OS | Assert valid system hostname format block: - name: OS | Validate inventory hostname is RFC-compliant - when: .set_hostname + when: .native.set_hostname assert: that: .inventory_hostname | regexMatch "^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$" fail_msg: >- The hostname "{{ .inventory_hostname }}" is invalid. Hostnames must use only lowercase alphanumeric characters, '.', or '-', and must start and end with an alphanumeric character. - name: OS | Validate current host system hostname is RFC-compliant - when: .set_hostname | not + when: .native.set_hostname | not assert: that: .hostname | regexMatch "^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$" fail_msg: >- diff --git a/pkg/executor/task_executor.go b/pkg/executor/task_executor.go index b8f3a8e7..ed044896 100644 --- a/pkg/executor/task_executor.go +++ b/pkg/executor/task_executor.go @@ -2,7 +2,6 @@ package executor import ( "context" - "encoding/json" "fmt" "os" "strings" @@ -12,9 +11,10 @@ import ( kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1" "github.com/schollz/progressbar/v3" "gopkg.in/yaml.v3" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" - "k8s.io/utils/ptr" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" _const "github.com/kubesphere/kubekey/v4/pkg/const" @@ -60,12 +60,46 @@ func (e *taskExecutor) Exec(ctx context.Context) error { // exit when task run failed if e.task.IsFailed() { failedMsg := "\n" + for _, result := range e.task.Status.HostResults { - if result.Error != "" { - failedMsg += fmt.Sprintf("[%s]: %s: %s\n", result.Host, result.StdErr, result.Error) + // 1. Print executor-level (host-level) error first, if exists + if strings.TrimSpace(result.Error) != "" { + failedMsg += fmt.Sprintf( + "[%s][executor]: %s\n", + result.Host, + result.Error, + ) + } + + // 2. Then print item-level errors (only items with error) + for idx, r := range result.RegisterResults { + if strings.TrimSpace(r.Error) == "" { + continue + } + + itemInfo := "item=" + if len(r.Item.Raw) > 0 { + itemInfo = "item=" + string(r.Item.Raw) + } else if r.Item.Object != nil { + itemInfo = fmt.Sprintf("item=%#v", r.Item.Object) + } + + failedMsg += fmt.Sprintf( + "[%s][%s][%d]: %s\n", + result.Host, + itemInfo, + idx, + r.Error, + ) } } - return errors.Errorf("task [%s](%s) run failed: %s", e.task.Spec.Name, ctrlclient.ObjectKeyFromObject(e.task), failedMsg) + + return errors.Errorf( + "task [%s](%s) run failed: %s", + e.task.Spec.Name, + ctrlclient.ObjectKeyFromObject(e.task), + failedMsg, + ) } return nil @@ -133,59 +167,69 @@ func (e *taskExecutor) execTask(ctx context.Context) { // condition checking, and module execution. It runs in parallel for each host. func (e *taskExecutor) execTaskHost(i int, h string) func(ctx context.Context) { return func(ctx context.Context) { - // task result var resErr error - var stdout, stderr, errMsg string + var registerResult []kkcorev1alpha1.RegisterResult + // task log - deferFunc := e.execTaskHostLogs(ctx, h, &stdout, &stderr, &errMsg) + deferFunc := e.execTaskHostLogs(ctx, h, ®isterResult) defer deferFunc() + defer func() { + resErr = errors.Join(resErr, e.dealRegister(h, registerResult)) + + var errMsg string if resErr != nil { errMsg = resErr.Error() - klog.V(5).ErrorS(resErr, "task run failed", "host", h, "stdout", stdout, "stderr", stderr, "error", errMsg, "task", ctrlclient.ObjectKeyFromObject(e.task)) } - resErr = errors.Join(resErr, e.dealRegister(h, stdout, stderr, errMsg)) - // fill result e.task.Status.HostResults[i] = kkcorev1alpha1.TaskHostResult{ - Host: h, - Stdout: stdout, - StdErr: stderr, - Error: errMsg, + Host: h, + Error: errMsg, + RegisterResults: registerResult, } }() - // task execute + ha, err := e.variable.Get(variable.GetAllVariable(h)) if err != nil { resErr = err return } - // convert hostVariable to map had, ok := ha.(map[string]any) if !ok { resErr = errors.Errorf("host: %s variable is not a map", h) return } - // execute module in loop with loop item. - // if loop is empty. execute once, and the item is null + for _, item := range e.dealLoop(had) { - resSkip, exeErr := e.executeModule(ctx, e.task, item, h, &stdout, &stderr) + stdout, stderr, rendered, exeErr := e.executeModule(ctx, e.task, item, h) + + var rawItem runtime.RawExtension + if rendered != nil { + if bs, err := json.Marshal(rendered); err == nil { + rawItem = runtime.RawExtension{Raw: bs} + } + } + + var errMsg string if exeErr != nil { - resErr = exeErr - break + errMsg = exeErr.Error() } - // loop execute once, skip task - if item == nil && resSkip { - stdout = modules.StdoutSkip - return + + r := kkcorev1alpha1.RegisterResult{ + Item: rawItem, + Stdout: stdout, + Stderr: stderr, + Error: errMsg, } + + registerResult = append(registerResult, r) } } } // execTaskHostLogs sets up and manages progress bar logging for task execution on a host. // It returns a cleanup function to be called when execution completes. -func (e *taskExecutor) execTaskHostLogs(ctx context.Context, h string, stdout, _, errMsg *string) func() { +func (e *taskExecutor) execTaskHostLogs(ctx context.Context, h string, registerResult *[]kkcorev1alpha1.RegisterResult) func() { // placeholder format task log var placeholder string if hostnameMaxLen, err := e.variable.Get(variable.GetHostMaxLength()); err == nil { @@ -227,51 +271,69 @@ func (e *taskExecutor) execTaskHostLogs(ctx context.Context, h string, stdout, _ }() return func() { + var failed bool + var skipped bool + + // determine overall status by scanning all register results + skipped = true // assume skip until we find a non-skip stdout + for _, r := range *registerResult { + if r.Error != "" { + failed = true + break + } + if r.Stdout != modules.StdoutSkip { + skipped = false + } + } + switch { - case ptr.Deref(errMsg, "") != "": - if e.task.Spec.IgnoreError != nil && *e.task.Spec.IgnoreError { // ignore + case failed: + // failed or ignore + if e.task.Spec.IgnoreError != nil && *e.task.Spec.IgnoreError { + // ignore bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mignore \033[0m", h, placeholder)) if e.logOutput != os.Stdout { fmt.Fprintf(e.logOutput, "[%s]%s ignore \n", h, placeholder) } - } else { // failed + } else { + // failed bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[31mfailed \033[0m", h, placeholder)) if e.logOutput != os.Stdout { fmt.Fprintf(e.logOutput, "[%s]%s failed \n", h, placeholder) } } - case *stdout == modules.StdoutSkip: // skip + case skipped: + // skip bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mskip \033[0m", h, placeholder)) if e.logOutput != os.Stdout { fmt.Fprintf(e.logOutput, "[%s]%s skip \n", h, placeholder) } - default: //success + default: + // success bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34msuccess\033[0m", h, placeholder)) if e.logOutput != os.Stdout { fmt.Fprintf(e.logOutput, "[%s]%s success\n", h, placeholder) } } - if err := bar.Finish(); err != nil { - klog.ErrorS(err, "finish bar error") - } + + _ = bar.Finish() } } // executeModule executes a single module task on a specific host. -func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.Task, item any, host string, stdout, stderr *string) (resSkip bool, resErr error) { +func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.Task, item any, host string) (stdout string, stderr string, rendered any, resErr error) { // Set loop item variable if one was provided if item != nil { // Convert item to runtime variable node, err := converter.ConvertMap2Node(map[string]any{_const.VariableItem: item}) if err != nil { - return false, errors.Wrap(err, "failed to convert loop item") + return modules.StdoutFailed, "", nil, err } // Merge item into host's runtime variables if err := e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{node}, host)); err != nil { - return false, errors.Wrap(err, "failed to set loop item to variable") + return modules.StdoutFailed, "", nil, err } - // Clean up loop item variable after execution defer func() { if item == nil { @@ -280,11 +342,11 @@ func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.T // Reset item to null resetNode, err := converter.ConvertMap2Node(map[string]any{_const.VariableItem: nil}) if err != nil { - resErr = errors.Wrap(err, "failed to convert loop item") + resErr = err return } if err := e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{resetNode}, host)); err != nil { - resErr = errors.Wrap(err, "failed to clean loop item to variable") + resErr = err return } }() @@ -293,24 +355,23 @@ func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.T // Get all variables for this host, including any loop item ha, err := e.variable.Get(variable.GetAllVariable(host)) if err != nil { - return false, errors.Wrapf(err, "failed to get host %s variable", host) + return modules.StdoutFailed, "", nil, err } // Convert host variables to map type had, ok := ha.(map[string]any) if !ok { - return false, errors.Wrapf(err, "host %s variable is not a map", host) + return modules.StdoutFailed, "", nil, err } - // check when condition if skip, err := e.dealWhen(had); err != nil { - return false, err + return modules.StdoutFailed, "", nil, err } else if skip { - return true, nil + return modules.StdoutSkip, "", nil, nil } // Execute the actual module with the prepared context - *stdout, *stderr, resErr = modules.FindModule(task.Spec.Module.Name)(ctx, modules.ExecOptions{ + stdout, stderr, resErr = modules.FindModule(task.Spec.Module.Name)(ctx, modules.ExecOptions{ Args: e.task.Spec.Module.Args, Host: host, Variable: e.variable, @@ -318,7 +379,10 @@ func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.T Playbook: *e.playbook, LogOutput: e.logOutput, }) - return false, e.dealFailedWhen(had, resErr) + if ferr := e.dealFailedWhen(had, resErr); ferr != nil { + return stdout, stderr, had[_const.VariableItem], ferr + } + return stdout, stderr, had[_const.VariableItem], nil } // dealLoop parses the loop specification into a slice of items to iterate over. @@ -371,42 +435,103 @@ func (e *taskExecutor) dealFailedWhen(had map[string]any, err error) error { return nil } -// dealRegister handles storing task output in a registered variable if specified. -// The output can be stored as raw string, JSON, or YAML based on the register type. -func (e *taskExecutor) dealRegister(host string, stdout, stderr, errMsg string) error { - if e.task.Spec.Register != "" { - var stdoutResult any = stdout - var stderrResult any = stderr - switch e.task.Spec.RegisterType { // if failed the stdout may not be json or yaml - case "json": - if err := json.Unmarshal([]byte(stdout), &stdoutResult); err != nil { - klog.V(5).ErrorS(err, "failed to register json value") - } - case "yaml", "yml": - if err := yaml.Unmarshal([]byte(stdout), &stdoutResult); err != nil { - klog.V(5).ErrorS(err, "failed to register yaml value") - } - default: - // store by string - if s, ok := stdoutResult.(string); ok { - stdoutResult = strings.TrimRight(s, "\n") - } - } - // set variable to parent location - node, err := converter.ConvertMap2Node(map[string]any{ - e.task.Spec.Register: map[string]any{ - "stdout": stdoutResult, - "stderr": stderrResult, - "error": errMsg, - }, - }) - if err != nil { - return err - } - if err := e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{node}, host)); err != nil { - return err - } +// dealRegister merges registerResult into global variables for the given host and processes "register" logic. +// If the task specifies a Register name, it composes the values from the registerResult slice, +// normalizes stdout data according to the RegisterType (json, yaml, or plain string), +// detects errors in any of the items, and merges the composed data into the task's runtime variables. +// It returns an error if any items are in error or if variable merge fails. +func (e *taskExecutor) dealRegister(host string, registerResult []kkcorev1alpha1.RegisterResult) (resErr error) { + if e.task.Spec.Register == "" { + // No register variable specified; nothing to merge. + return nil } - return nil + // parseStdout parses stdout according to the RegisterType. + parseStdout := func(s string) (any, error) { + var out any = s + + switch e.task.Spec.RegisterType { + case "json": + // Attempt to unmarshal as JSON. + if err := json.Unmarshal([]byte(s), &out); err != nil { + klog.V(5).ErrorS(err, "failed to register json value") + return s, err + } + case "yaml", "yml": + // Attempt to unmarshal as YAML. + if err := yaml.Unmarshal([]byte(s), &out); err != nil { + klog.V(5).ErrorS(err, "failed to register yaml value") + return s, err + } + default: + // Remove trailing newline by default. + if str, ok := out.(string); ok { + out = strings.TrimRight(str, "\n") + } + } + + return out, nil + } + + var value any + var hasItemError, hasParsedError bool + + // If there is exactly one registerResult with no Item data, use the flat representation. + if len(registerResult) == 1 && len(registerResult[0].Item.Raw) == 0 && registerResult[0].Item.Object == nil { + r := registerResult[0] + parsedStdout, perr := parseStdout(r.Stdout) + if perr != nil { + hasParsedError = true + } + value = map[string]any{ + "stdout": parsedStdout, + "stderr": r.Stderr, + "error": r.Error, + } + + // If there is any error at the module level, set the global error flag. + if strings.TrimSpace(r.Error) != "" { + hasItemError = true + } + } else { + // Otherwise, collect all items as an array of results. + var arr []any + for _, r := range registerResult { + parsedStdout, perr := parseStdout(r.Stdout) + if perr != nil { + hasParsedError = true + } + arr = append(arr, map[string]any{ + "item": string(r.Item.Raw), + "stdout": parsedStdout, + "stderr": r.Stderr, + "error": r.Error, + }) + + // If any item has error, set the global error flag. + if strings.TrimSpace(r.Error) != "" { + hasItemError = true + } + } + value = arr + } + + // If any item failed, return a unified task failure error sentinel. + if hasItemError { + resErr = errors.Join(resErr, errors.New("module run failed")) + } + if hasParsedError { + resErr = errors.Join(resErr, errors.New("parse register by register type failed")) + } + + // Convert the register mapping to a YAML node. + node, err := converter.ConvertMap2Node(map[string]any{ + e.task.Spec.Register: value, + }) + if err != nil { + return errors.Join(resErr, err) + } + + // Merge the YAML node into the runtime variable store for the current host. + return errors.Join(resErr, e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{node}, host))) }