feat: more fast execute task (#2603)

Signed-off-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
liujian 2025-06-05 14:07:39 +08:00 committed by GitHub
parent 9ec3aa7472
commit 72680b80be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -6,7 +6,6 @@ import (
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/cockroachdb/errors"
@ -16,7 +15,6 @@ import (
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
@ -30,8 +28,6 @@ import (
type taskExecutor struct {
*option
task *kkcorev1alpha1.Task
// runOnce only executor task once
runOnce sync.Once
// taskRunTimeout is the timeout for task executor
taskRunTimeout time.Duration
}
@ -96,55 +92,25 @@ func (e *taskExecutor) runTaskLoop(ctx context.Context) error {
}
fmt.Fprintf(e.logOutput, "%s %s%s\n", time.Now().Format(time.TimeOnly+" MST"), roleLog, e.task.Spec.Name)
// Create ticker for periodic reconciliation
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
// reconcile handles task state transitions and execution
reconcile := func(ctx context.Context, request ctrl.Request) (_ ctrl.Result, err error) {
task := &kkcorev1alpha1.Task{}
if err = e.client.Get(ctx, request.NamespacedName, task); err != nil {
return ctrl.Result{Requeue: true}, err
}
defer func() {
err = e.client.Status().Patch(ctx, e.task, ctrlclient.MergeFrom(task.DeepCopy()))
}()
// Handle task phase transitions
switch task.Status.Phase {
case "", kkcorev1alpha1.TaskPhasePending:
e.task.Status.Phase = kkcorev1alpha1.TaskPhaseRunning
case kkcorev1alpha1.TaskPhaseRunning:
// Execute task only once when it enters running phase
e.runOnce.Do(func() {
e.execTask(ctx)
})
case kkcorev1alpha1.TaskPhaseFailed, kkcorev1alpha1.TaskPhaseIgnored, kkcorev1alpha1.TaskPhaseSuccess:
return ctrl.Result{}, nil
}
return ctrl.Result{Requeue: true}, nil
}
// Main loop to handle task execution and timeout
for {
select {
case <-ctx.Done():
return nil
case <-time.After(e.taskRunTimeout):
return errors.Errorf("task %q execution timeout", e.task.Spec.Name)
case <-ticker.C:
result, err := reconcile(ctx, ctrl.Request{NamespacedName: ctrlclient.ObjectKeyFromObject(e.task)})
if err != nil {
klog.V(5).ErrorS(err, "failed to reconcile task", "task", ctrlclient.ObjectKeyFromObject(e.task), "playbook", ctrlclient.ObjectKeyFromObject(e.playbook))
}
if result.Requeue {
continue
}
return nil
if e.task.IsComplete() {
break
}
task := e.task.DeepCopy()
if e.task.Status.Phase == kkcorev1alpha1.TaskPhaseFailed {
e.task.Status.RestartCount++
}
e.task.Status.Phase = kkcorev1alpha1.TaskPhaseRunning
if err := e.client.Status().Patch(ctx, e.task, ctrlclient.MergeFrom(task)); err != nil {
return errors.Wrap(err, "failed to patch task status")
}
task = e.task.DeepCopy()
e.execTask(ctx)
if err := e.client.Status().Patch(ctx, e.task, ctrlclient.MergeFrom(task)); err != nil {
return errors.Wrap(err, "failed to patch task status")
}
}
return nil
}
// execTask executes the task across all specified hosts in parallel and updates the task status.