fix: more clear loop out

Signed-off-by: redscholar <blacktiledhouse@gmail.com>
This commit is contained in:
redscholar 2025-12-04 18:53:47 +08:00
parent 2b5f060fea
commit 3a29a28e20
No known key found for this signature in database
GPG Key ID: 5A4D7C7DB5D38D49
3 changed files with 245 additions and 93 deletions

View File

@ -74,10 +74,16 @@ type TaskStatus struct {
// TaskHostResult each host result for task
type TaskHostResult struct {
Host string `json:"host,omitempty"`
Stdout string `json:"stdout,omitempty"`
StdErr string `json:"stdErr,omitempty"`
Error string `json:"error,omitempty"`
Host string `json:"host,omitempty"`
Error string `json:"error,omitempty"`
RegisterResults []RegisterResult `json:"registerResults,omitempty"`
}
type RegisterResult struct {
Item runtime.RawExtension `json:"item,omitempty"`
Stdout string `json:"stdout,omitempty"`
Stderr string `json:"stdErr,omitempty"`
Error string `json:"error,omitempty"`
}
// +genclient

View File

@ -40,6 +40,22 @@ func (in *Module) DeepCopy() *Module {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RegisterResult) DeepCopyInto(out *RegisterResult) {
*out = *in
in.Item.DeepCopyInto(&out.Item)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegisterResult.
func (in *RegisterResult) DeepCopy() *RegisterResult {
if in == nil {
return nil
}
out := new(RegisterResult)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Task) DeepCopyInto(out *Task) {
*out = *in
@ -70,6 +86,13 @@ func (in *Task) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TaskHostResult) DeepCopyInto(out *TaskHostResult) {
*out = *in
if in.RegisterResults != nil {
in, out := &in.RegisterResults, &out.RegisterResults
*out = make([]RegisterResult, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskHostResult.
@ -157,7 +180,9 @@ func (in *TaskStatus) DeepCopyInto(out *TaskStatus) {
if in.HostResults != nil {
in, out := &in.HostResults, &out.HostResults
*out = make([]TaskHostResult, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}

View File

@ -2,7 +2,6 @@ package executor
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
@ -12,9 +11,10 @@ import (
kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1"
"github.com/schollz/progressbar/v3"
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
@ -60,12 +60,46 @@ func (e *taskExecutor) Exec(ctx context.Context) error {
// exit when task run failed
if e.task.IsFailed() {
failedMsg := "\n"
for _, result := range e.task.Status.HostResults {
if result.Error != "" {
failedMsg += fmt.Sprintf("[%s]: %s: %s\n", result.Host, result.StdErr, result.Error)
// 1. Print executor-level (host-level) error first, if exists
if strings.TrimSpace(result.Error) != "" {
failedMsg += fmt.Sprintf(
"[%s][executor]: %s\n",
result.Host,
result.Error,
)
}
// 2. Then print item-level errors (only items with error)
for idx, r := range result.RegisterResults {
if strings.TrimSpace(r.Error) == "" {
continue
}
itemInfo := "item=<nil>"
if len(r.Item.Raw) > 0 {
itemInfo = "item=" + string(r.Item.Raw)
} else if r.Item.Object != nil {
itemInfo = fmt.Sprintf("item=%#v", r.Item.Object)
}
failedMsg += fmt.Sprintf(
"[%s][%s][%d]: %s\n",
result.Host,
itemInfo,
idx,
r.Error,
)
}
}
return errors.Errorf("task [%s](%s) run failed: %s", e.task.Spec.Name, ctrlclient.ObjectKeyFromObject(e.task), failedMsg)
return errors.Errorf(
"task [%s](%s) run failed: %s",
e.task.Spec.Name,
ctrlclient.ObjectKeyFromObject(e.task),
failedMsg,
)
}
return nil
@ -133,63 +167,69 @@ func (e *taskExecutor) execTask(ctx context.Context) {
// condition checking, and module execution. It runs in parallel for each host.
func (e *taskExecutor) execTaskHost(i int, h string) func(ctx context.Context) {
return func(ctx context.Context) {
// task result
var resErr error
var stdout, stderr, errMsg string
var registerResult []kkcorev1alpha1.RegisterResult
// task log
deferFunc := e.execTaskHostLogs(ctx, h, &stdout, &stderr, &errMsg)
deferFunc := e.execTaskHostLogs(ctx, h, &registerResult)
defer deferFunc()
defer func() {
resErr = errors.Join(resErr, e.dealRegister(h, registerResult))
var errMsg string
if resErr != nil {
errMsg = resErr.Error()
klog.V(5).ErrorS(resErr, "task run failed", "host", h, "stdout", stdout, "stderr", stderr, "error", errMsg, "task", ctrlclient.ObjectKeyFromObject(e.task))
}
resErr = errors.Join(resErr, e.dealRegister(h, stdout, stderr, errMsg))
// fill result
e.task.Status.HostResults[i] = kkcorev1alpha1.TaskHostResult{
Host: h,
Stdout: stdout,
StdErr: stderr,
Error: errMsg,
Host: h,
Error: errMsg,
RegisterResults: registerResult,
}
}()
// task execute
ha, err := e.variable.Get(variable.GetAllVariable(h))
if err != nil {
resErr = err
return
}
// convert hostVariable to map
had, ok := ha.(map[string]any)
if !ok {
resErr = errors.Errorf("host: %s variable is not a map", h)
return
}
// execute module in loop with loop item.
// if loop is empty. execute once, and the item is null
allLoopSkipped := true
for _, item := range e.dealLoop(had) {
resSkip, exeErr := e.executeModule(ctx, e.task, item, h, &stdout, &stderr)
if exeErr != nil || !resSkip {
allLoopSkipped = false
stdout, stderr, rendered, exeErr := e.executeModule(ctx, e.task, item, h)
var rawItem runtime.RawExtension
if rendered != nil {
if bs, err := json.Marshal(rendered); err == nil {
rawItem = runtime.RawExtension{Raw: bs}
}
}
var errMsg string
if exeErr != nil {
resErr = exeErr
break
errMsg = exeErr.Error()
}
}
// when all loop skipped, skip task
if allLoopSkipped {
stdout = modules.StdoutSkip
return
r := kkcorev1alpha1.RegisterResult{
Item: rawItem,
Stdout: stdout,
Stderr: stderr,
Error: errMsg,
}
registerResult = append(registerResult, r)
}
}
}
// execTaskHostLogs sets up and manages progress bar logging for task execution on a host.
// It returns a cleanup function to be called when execution completes.
func (e *taskExecutor) execTaskHostLogs(ctx context.Context, h string, stdout, _, errMsg *string) func() {
func (e *taskExecutor) execTaskHostLogs(ctx context.Context, h string, registerResult *[]kkcorev1alpha1.RegisterResult) func() {
// placeholder format task log
var placeholder string
if hostnameMaxLen, err := e.variable.Get(variable.GetHostMaxLength()); err == nil {
@ -231,51 +271,69 @@ func (e *taskExecutor) execTaskHostLogs(ctx context.Context, h string, stdout, _
}()
return func() {
var failed bool
var skipped bool
// determine overall status by scanning all register results
skipped = true // assume skip until we find a non-skip stdout
for _, r := range *registerResult {
if r.Error != "" {
failed = true
break
}
if r.Stdout != modules.StdoutSkip {
skipped = false
}
}
switch {
case ptr.Deref(errMsg, "") != "":
if e.task.Spec.IgnoreError != nil && *e.task.Spec.IgnoreError { // ignore
case failed:
// failed or ignore
if e.task.Spec.IgnoreError != nil && *e.task.Spec.IgnoreError {
// ignore
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mignore \033[0m", h, placeholder))
if e.logOutput != os.Stdout {
fmt.Fprintf(e.logOutput, "[%s]%s ignore \n", h, placeholder)
}
} else { // failed
} else {
// failed
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[31mfailed \033[0m", h, placeholder))
if e.logOutput != os.Stdout {
fmt.Fprintf(e.logOutput, "[%s]%s failed \n", h, placeholder)
}
}
case *stdout == modules.StdoutSkip: // skip
case skipped:
// skip
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mskip \033[0m", h, placeholder))
if e.logOutput != os.Stdout {
fmt.Fprintf(e.logOutput, "[%s]%s skip \n", h, placeholder)
}
default: //success
default:
// success
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34msuccess\033[0m", h, placeholder))
if e.logOutput != os.Stdout {
fmt.Fprintf(e.logOutput, "[%s]%s success\n", h, placeholder)
}
}
if err := bar.Finish(); err != nil {
klog.ErrorS(err, "finish bar error")
}
_ = bar.Finish()
}
}
// executeModule executes a single module task on a specific host.
func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.Task, item any, host string, stdout, stderr *string) (resSkip bool, resErr error) {
func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.Task, item any, host string) (stdout string, stderr string, rendered any, resErr error) {
// Set loop item variable if one was provided
if item != nil {
// Convert item to runtime variable
node, err := converter.ConvertMap2Node(map[string]any{_const.VariableItem: item})
if err != nil {
return false, errors.Wrap(err, "failed to convert loop item")
return modules.StdoutFailed, "", nil, err
}
// Merge item into host's runtime variables
if err := e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{node}, host)); err != nil {
return false, errors.Wrap(err, "failed to set loop item to variable")
return modules.StdoutFailed, "", nil, err
}
// Clean up loop item variable after execution
defer func() {
if item == nil {
@ -284,11 +342,11 @@ func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.T
// Reset item to null
resetNode, err := converter.ConvertMap2Node(map[string]any{_const.VariableItem: nil})
if err != nil {
resErr = errors.Wrap(err, "failed to convert loop item")
resErr = err
return
}
if err := e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{resetNode}, host)); err != nil {
resErr = errors.Wrap(err, "failed to clean loop item to variable")
resErr = err
return
}
}()
@ -297,24 +355,23 @@ func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.T
// Get all variables for this host, including any loop item
ha, err := e.variable.Get(variable.GetAllVariable(host))
if err != nil {
return false, errors.Wrapf(err, "failed to get host %s variable", host)
return modules.StdoutFailed, "", nil, err
}
// Convert host variables to map type
had, ok := ha.(map[string]any)
if !ok {
return false, errors.Wrapf(err, "host %s variable is not a map", host)
return modules.StdoutFailed, "", nil, err
}
// check when condition
if skip, err := e.dealWhen(had); err != nil {
return false, err
return modules.StdoutFailed, "", nil, err
} else if skip {
return true, nil
return modules.StdoutSkip, "", nil, nil
}
// Execute the actual module with the prepared context
*stdout, *stderr, resErr = modules.FindModule(task.Spec.Module.Name)(ctx, modules.ExecOptions{
stdout, stderr, resErr = modules.FindModule(task.Spec.Module.Name)(ctx, modules.ExecOptions{
Args: e.task.Spec.Module.Args,
Host: host,
Variable: e.variable,
@ -322,7 +379,10 @@ func (e *taskExecutor) executeModule(ctx context.Context, task *kkcorev1alpha1.T
Playbook: *e.playbook,
LogOutput: e.logOutput,
})
return false, e.dealFailedWhen(had, resErr)
if ferr := e.dealFailedWhen(had, resErr); ferr != nil {
return stdout, stderr, had[_const.VariableItem], ferr
}
return stdout, stderr, had[_const.VariableItem], nil
}
// dealLoop parses the loop specification into a slice of items to iterate over.
@ -375,42 +435,103 @@ func (e *taskExecutor) dealFailedWhen(had map[string]any, err error) error {
return nil
}
// dealRegister handles storing task output in a registered variable if specified.
// The output can be stored as raw string, JSON, or YAML based on the register type.
func (e *taskExecutor) dealRegister(host string, stdout, stderr, errMsg string) error {
if e.task.Spec.Register != "" {
var stdoutResult any = stdout
var stderrResult any = stderr
switch e.task.Spec.RegisterType { // if failed the stdout may not be json or yaml
case "json":
if err := json.Unmarshal([]byte(stdout), &stdoutResult); err != nil {
klog.V(5).ErrorS(err, "failed to register json value")
}
case "yaml", "yml":
if err := yaml.Unmarshal([]byte(stdout), &stdoutResult); err != nil {
klog.V(5).ErrorS(err, "failed to register yaml value")
}
default:
// store by string
if s, ok := stdoutResult.(string); ok {
stdoutResult = strings.TrimRight(s, "\n")
}
}
// set variable to parent location
node, err := converter.ConvertMap2Node(map[string]any{
e.task.Spec.Register: map[string]any{
"stdout": stdoutResult,
"stderr": stderrResult,
"error": errMsg,
},
})
if err != nil {
return err
}
if err := e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{node}, host)); err != nil {
return err
}
// dealRegister merges registerResult into global variables for the given host and processes "register" logic.
// If the task specifies a Register name, it composes the values from the registerResult slice,
// normalizes stdout data according to the RegisterType (json, yaml, or plain string),
// detects errors in any of the items, and merges the composed data into the task's runtime variables.
// It returns an error if any items are in error or if variable merge fails.
func (e *taskExecutor) dealRegister(host string, registerResult []kkcorev1alpha1.RegisterResult) (resErr error) {
if e.task.Spec.Register == "" {
// No register variable specified; nothing to merge.
return nil
}
return nil
// parseStdout parses stdout according to the RegisterType.
parseStdout := func(s string) (any, error) {
var out any = s
switch e.task.Spec.RegisterType {
case "json":
// Attempt to unmarshal as JSON.
if err := json.Unmarshal([]byte(s), &out); err != nil {
klog.V(5).ErrorS(err, "failed to register json value")
return s, err
}
case "yaml", "yml":
// Attempt to unmarshal as YAML.
if err := yaml.Unmarshal([]byte(s), &out); err != nil {
klog.V(5).ErrorS(err, "failed to register yaml value")
return s, err
}
default:
// Remove trailing newline by default.
if str, ok := out.(string); ok {
out = strings.TrimRight(str, "\n")
}
}
return out, nil
}
var value any
var hasItemError, hasParsedError bool
// If there is exactly one registerResult with no Item data, use the flat representation.
if len(registerResult) == 1 && len(registerResult[0].Item.Raw) == 0 && registerResult[0].Item.Object == nil {
r := registerResult[0]
parsedStdout, perr := parseStdout(r.Stdout)
if perr != nil {
hasParsedError = true
}
value = map[string]any{
"stdout": parsedStdout,
"stderr": r.Stderr,
"error": r.Error,
}
// If there is any error at the module level, set the global error flag.
if strings.TrimSpace(r.Error) != "" {
hasItemError = true
}
} else {
// Otherwise, collect all items as an array of results.
var arr []any
for _, r := range registerResult {
parsedStdout, perr := parseStdout(r.Stdout)
if perr != nil {
hasParsedError = true
}
arr = append(arr, map[string]any{
"item": string(r.Item.Raw),
"stdout": parsedStdout,
"stderr": r.Stderr,
"error": r.Error,
})
// If any item has error, set the global error flag.
if strings.TrimSpace(r.Error) != "" {
hasItemError = true
}
}
value = arr
}
// If any item failed, return a unified task failure error sentinel.
if hasItemError {
resErr = errors.Join(resErr, errors.New("module run failed"))
}
if hasParsedError {
resErr = errors.Join(resErr, errors.New("parse register by register type failed"))
}
// Convert the register mapping to a YAML node.
node, err := converter.ConvertMap2Node(map[string]any{
e.task.Spec.Register: value,
})
if err != nil {
return errors.Join(resErr, err)
}
// Merge the YAML node into the runtime variable store for the current host.
return errors.Join(resErr, e.variable.Merge(variable.MergeRuntimeVariable([]yaml.Node{node}, host)))
}