mirror of
https://github.com/kubesphere/kubekey.git
synced 2025-12-26 01:22:51 +00:00
277 lines
8.3 KiB
Go
277 lines
8.3 KiB
Go
/*
|
|
Copyright 2024 The KubeSphere Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package executor
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
|
|
kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1"
|
|
kkprojectv1 "github.com/kubesphere/kubekey/api/project/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/klog/v2"
|
|
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
"github.com/kubesphere/kubekey/v4/pkg/converter"
|
|
"github.com/kubesphere/kubekey/v4/pkg/project"
|
|
"github.com/kubesphere/kubekey/v4/pkg/variable"
|
|
"github.com/kubesphere/kubekey/v4/pkg/variable/source"
|
|
)
|
|
|
|
// NewPlaybookExecutor return a new playbookExecutor
|
|
func NewPlaybookExecutor(ctx context.Context, client ctrlclient.Client, playbook *kkcorev1.Playbook, logOutput io.Writer) Executor {
|
|
// get variable
|
|
v, err := variable.New(ctx, client, *playbook, source.FileSource)
|
|
if err != nil {
|
|
klog.V(5).ErrorS(err, "get variable error", "playbook", ctrlclient.ObjectKeyFromObject(playbook))
|
|
|
|
return nil
|
|
}
|
|
|
|
return &playbookExecutor{
|
|
option: &option{
|
|
client: client,
|
|
playbook: playbook,
|
|
variable: v,
|
|
logOutput: logOutput,
|
|
},
|
|
}
|
|
}
|
|
|
|
// executor for playbook
|
|
type playbookExecutor struct {
|
|
*option
|
|
}
|
|
|
|
// Exec playbook. covert playbook to block and executor it.
|
|
func (e playbookExecutor) Exec(ctx context.Context) (retErr error) {
|
|
old := e.playbook.DeepCopy()
|
|
defer func() {
|
|
e.syncStatus(ctx, old, retErr)
|
|
}()
|
|
fmt.Fprint(e.logOutput, `
|
|
|
|
_ __ _ _ __
|
|
| | / / | | | | / /
|
|
| |/ / _ _| |__ ___| |/ / ___ _ _
|
|
| \| | | | '_ \ / _ \ \ / _ \ | | |
|
|
| |\ \ |_| | |_) | __/ |\ \ __/ |_| |
|
|
\_| \_/\__,_|_.__/ \___\_| \_/\___|\__, |
|
|
__/ |
|
|
|___/
|
|
|
|
`)
|
|
fmt.Fprintf(e.logOutput, "%s [Playbook %s] start\n", time.Now().Format(time.TimeOnly+" MST"), ctrlclient.ObjectKeyFromObject(e.playbook))
|
|
klog.V(5).InfoS("deal project", "playbook", ctrlclient.ObjectKeyFromObject(e.playbook))
|
|
pj, err := project.New(ctx, *e.playbook, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// convert to transfer.Playbook struct
|
|
pb, err := pj.MarshalPlaybook()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, play := range pb.Play {
|
|
// hosts should contain all host's name. hosts should not be empty.
|
|
var hosts []string
|
|
if err := e.dealHosts(play.PlayHost, &hosts); err != nil {
|
|
klog.V(4).ErrorS(err, "deal hosts error, skip this playbook", "hosts", play.PlayHost)
|
|
|
|
continue
|
|
}
|
|
// check tags
|
|
if play.Taggable.IsEnabled(e.playbook.Spec.Tags, e.playbook.Spec.SkipTags) {
|
|
// when gather_fact is set. get host's information from remote.
|
|
if err := e.dealGatherFacts(ctx, play.GatherFacts, hosts); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Batch execution, with each batch being a group of hosts run in serial.
|
|
var batchHosts [][]string
|
|
if err := e.dealSerial(play.Serial.Data, hosts, &batchHosts); err != nil {
|
|
return err
|
|
}
|
|
e.dealRunOnce(play.RunOnce, hosts, &batchHosts)
|
|
// exec playbook in each BatchHosts
|
|
if err := e.execBatchHosts(ctx, play, batchHosts); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e playbookExecutor) syncStatus(ctx context.Context, old *kkcorev1.Playbook, err error) {
|
|
if err != nil {
|
|
e.playbook.Status.Phase = kkcorev1.PlaybookPhaseFailed
|
|
e.playbook.Status.FailureReason = kkcorev1.PlaybookFailedReasonTaskFailed
|
|
e.playbook.Status.FailureMessage = err.Error()
|
|
} else {
|
|
e.playbook.Status.Phase = kkcorev1.PlaybookPhaseSucceeded
|
|
}
|
|
|
|
fmt.Fprintf(e.logOutput, "%s [Playbook %s] finish. total: %v,success: %v,ignored: %v,failed: %v\n", time.Now().Format(time.TimeOnly+" MST"), ctrlclient.ObjectKeyFromObject(e.playbook),
|
|
e.playbook.Status.Statistics.Total, e.playbook.Status.Statistics.Success, e.playbook.Status.Statistics.Ignored, e.playbook.Status.Statistics.Failed)
|
|
|
|
// fill results from variable
|
|
rv, err := e.variable.Get(variable.GetResultVariable())
|
|
if err != nil {
|
|
klog.ErrorS(err, "failed to get playbook results detail", "playbook", ctrlclient.ObjectKeyFromObject(e.playbook))
|
|
}
|
|
e.playbook.Status.Result.Raw, err = json.Marshal(rv)
|
|
if err != nil {
|
|
klog.ErrorS(err, "failed to marshal playbook results detail", "playbook", ctrlclient.ObjectKeyFromObject(e.playbook))
|
|
}
|
|
|
|
// update playbook status
|
|
if err := e.client.Status().Patch(ctx, e.playbook, ctrlclient.MergeFrom(old)); err != nil {
|
|
klog.ErrorS(err, "update playbook error", "playbook", ctrlclient.ObjectKeyFromObject(e.playbook))
|
|
}
|
|
}
|
|
|
|
// execBatchHosts executor block in play order by: "pre_tasks" > "roles" > "tasks" > "post_tasks"
|
|
func (e playbookExecutor) execBatchHosts(ctx context.Context, play kkprojectv1.Play, batchHosts [][]string) error {
|
|
// generate and execute task.
|
|
for _, serials := range batchHosts {
|
|
// each batch hosts should not be empty.
|
|
if len(serials) == 0 {
|
|
return errors.Errorf("host is empty")
|
|
}
|
|
|
|
if err := e.variable.Merge(variable.MergeRuntimeVariable(play.Vars.Nodes, serials...)); err != nil {
|
|
return err
|
|
}
|
|
// generate task from pre tasks
|
|
if err := (blockExecutor{
|
|
option: e.option,
|
|
hosts: serials,
|
|
ignoreErrors: play.IgnoreErrors,
|
|
blocks: play.PreTasks,
|
|
tags: play.Taggable,
|
|
}.Exec(ctx)); err != nil {
|
|
return err
|
|
}
|
|
// generate task from role
|
|
for _, role := range play.Roles {
|
|
// use the most closely configuration
|
|
ignoreErrors := role.IgnoreErrors
|
|
if ignoreErrors == nil {
|
|
ignoreErrors = play.IgnoreErrors
|
|
}
|
|
|
|
// role has block.
|
|
if err := (roleExecutor{
|
|
option: e.option,
|
|
hosts: serials,
|
|
ignoreErrors: ignoreErrors,
|
|
role: role,
|
|
when: role.When.Data,
|
|
tags: kkprojectv1.JoinTag(role.Taggable, play.Taggable),
|
|
}.Exec(ctx)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// generate task from tasks
|
|
if err := (blockExecutor{
|
|
option: e.option,
|
|
hosts: serials,
|
|
ignoreErrors: play.IgnoreErrors,
|
|
blocks: play.Tasks,
|
|
tags: play.Taggable,
|
|
}.Exec(ctx)); err != nil {
|
|
return err
|
|
}
|
|
// generate task from post tasks
|
|
if err := (blockExecutor{
|
|
option: e.option,
|
|
hosts: serials,
|
|
ignoreErrors: play.IgnoreErrors,
|
|
blocks: play.PostTasks,
|
|
tags: play.Taggable,
|
|
}.Exec(ctx)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// dealHosts "hosts" argument in playbook. get hostname from kkprojectv1.PlayHost
|
|
func (e playbookExecutor) dealHosts(host kkprojectv1.PlayHost, i *[]string) error {
|
|
ahn, err := e.variable.Get(variable.GetHostnames(host.Hosts))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if h, ok := ahn.([]string); ok {
|
|
*i = h
|
|
}
|
|
if len(*i) == 0 { // if hosts is empty skip this playbook
|
|
return errors.New("hosts is empty")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// dealGatherFacts "gather_facts" argument in playbook. get host remote info and merge to variable
|
|
func (e playbookExecutor) dealGatherFacts(ctx context.Context, gatherFacts bool, hosts []string) error {
|
|
if !gatherFacts {
|
|
// skip
|
|
return nil
|
|
}
|
|
// run setup task
|
|
return (&taskExecutor{option: e.option, task: &kkcorev1alpha1.Task{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
GenerateName: e.playbook.Name + "-",
|
|
Namespace: e.playbook.Namespace,
|
|
},
|
|
Spec: kkcorev1alpha1.TaskSpec{
|
|
Name: "gather_facts",
|
|
Hosts: hosts,
|
|
Module: kkcorev1alpha1.Module{
|
|
Name: "setup",
|
|
},
|
|
},
|
|
}}).Exec(ctx)
|
|
}
|
|
|
|
// dealSerial "serial" argument in playbook.
|
|
func (e playbookExecutor) dealSerial(serial []any, hosts []string, batchHosts *[][]string) error {
|
|
var err error
|
|
*batchHosts, err = converter.GroupHostBySerial(hosts, serial)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// dealRunOnce argument in playbook. if RunOnce is true. it's always only run in the first hosts.
|
|
func (e playbookExecutor) dealRunOnce(runOnce bool, hosts []string, batchHosts *[][]string) {
|
|
if runOnce {
|
|
// runOnce only run in first node
|
|
*batchHosts = [][]string{{hosts[0]}}
|
|
}
|
|
}
|