diff --git a/pkg/executor/task_executor.go b/pkg/executor/task_executor.go index 8d5b94bd..fdf1584f 100644 --- a/pkg/executor/task_executor.go +++ b/pkg/executor/task_executor.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "strings" - "sync" "time" "github.com/cockroachdb/errors" @@ -16,7 +15,6 @@ import ( "gopkg.in/yaml.v3" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" _const "github.com/kubesphere/kubekey/v4/pkg/const" @@ -30,8 +28,6 @@ import ( type taskExecutor struct { *option task *kkcorev1alpha1.Task - // runOnce only executor task once - runOnce sync.Once // taskRunTimeout is the timeout for task executor taskRunTimeout time.Duration } @@ -96,55 +92,25 @@ func (e *taskExecutor) runTaskLoop(ctx context.Context) error { } fmt.Fprintf(e.logOutput, "%s %s%s\n", time.Now().Format(time.TimeOnly+" MST"), roleLog, e.task.Spec.Name) - // Create ticker for periodic reconciliation - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - - // reconcile handles task state transitions and execution - reconcile := func(ctx context.Context, request ctrl.Request) (_ ctrl.Result, err error) { - task := &kkcorev1alpha1.Task{} - if err = e.client.Get(ctx, request.NamespacedName, task); err != nil { - return ctrl.Result{Requeue: true}, err - } - defer func() { - err = e.client.Status().Patch(ctx, e.task, ctrlclient.MergeFrom(task.DeepCopy())) - }() - - // Handle task phase transitions - switch task.Status.Phase { - case "", kkcorev1alpha1.TaskPhasePending: - e.task.Status.Phase = kkcorev1alpha1.TaskPhaseRunning - case kkcorev1alpha1.TaskPhaseRunning: - // Execute task only once when it enters running phase - e.runOnce.Do(func() { - e.execTask(ctx) - }) - case kkcorev1alpha1.TaskPhaseFailed, kkcorev1alpha1.TaskPhaseIgnored, kkcorev1alpha1.TaskPhaseSuccess: - return ctrl.Result{}, nil - } - - return ctrl.Result{Requeue: true}, nil - } - - // Main loop to handle task execution and timeout for { - select { - case <-ctx.Done(): - return nil - case <-time.After(e.taskRunTimeout): - return errors.Errorf("task %q execution timeout", e.task.Spec.Name) - case <-ticker.C: - result, err := reconcile(ctx, ctrl.Request{NamespacedName: ctrlclient.ObjectKeyFromObject(e.task)}) - if err != nil { - klog.V(5).ErrorS(err, "failed to reconcile task", "task", ctrlclient.ObjectKeyFromObject(e.task), "playbook", ctrlclient.ObjectKeyFromObject(e.playbook)) - } - if result.Requeue { - continue - } - - return nil + if e.task.IsComplete() { + break + } + task := e.task.DeepCopy() + if e.task.Status.Phase == kkcorev1alpha1.TaskPhaseFailed { + e.task.Status.RestartCount++ + } + e.task.Status.Phase = kkcorev1alpha1.TaskPhaseRunning + if err := e.client.Status().Patch(ctx, e.task, ctrlclient.MergeFrom(task)); err != nil { + return errors.Wrap(err, "failed to patch task status") + } + task = e.task.DeepCopy() + e.execTask(ctx) + if err := e.client.Status().Patch(ctx, e.task, ctrlclient.MergeFrom(task)); err != nil { + return errors.Wrap(err, "failed to patch task status") } } + return nil } // execTask executes the task across all specified hosts in parallel and updates the task status.