kubekey/pkg/executor/pipeline_executor.go
liujian 3e56b095de
fix: test kk push image to harbor. (#2457)
Co-authored-by: joyceliu <joyceliu@yunify.com>
2024-11-19 14:23:29 +08:00

281 lines
8.4 KiB
Go

/*
Copyright 2024 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"context"
"errors"
"fmt"
"io"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1"
kkprojectv1 "github.com/kubesphere/kubekey/v4/pkg/apis/project/v1"
"github.com/kubesphere/kubekey/v4/pkg/connector"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/converter"
"github.com/kubesphere/kubekey/v4/pkg/project"
"github.com/kubesphere/kubekey/v4/pkg/variable"
"github.com/kubesphere/kubekey/v4/pkg/variable/source"
)
// NewPipelineExecutor return a new pipelineExecutor
func NewPipelineExecutor(ctx context.Context, client ctrlclient.Client, pipeline *kkcorev1.Pipeline, logOutput io.Writer) Executor {
// get variable
v, err := variable.New(ctx, client, *pipeline, source.FileSource)
if err != nil {
klog.V(5).ErrorS(nil, "convert playbook error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return nil
}
return &pipelineExecutor{
option: &option{
client: client,
pipeline: pipeline,
variable: v,
logOutput: logOutput,
},
}
}
// executor for pipeline
type pipelineExecutor struct {
*option
}
// Exec pipeline. covert playbook to block and executor it.
func (e pipelineExecutor) Exec(ctx context.Context) error {
klog.V(5).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline))
pj, err := project.New(ctx, *e.pipeline, true)
if err != nil {
return fmt.Errorf("deal project error: %w", err)
}
// convert to transfer.Playbook struct
pb, err := pj.MarshalPlaybook()
if err != nil {
return fmt.Errorf("convert playbook error: %w", err)
}
for _, play := range pb.Play {
// check tags
if !play.Taggable.IsEnabled(e.pipeline.Spec.Tags, e.pipeline.Spec.SkipTags) {
// if not match the tags. skip
continue
}
// hosts should contain all host's name. hosts should not be empty.
var hosts []string
if err := e.dealHosts(play.PlayHost, &hosts); err != nil {
klog.V(4).ErrorS(err, "deal hosts error, skip this playbook", "hosts", play.PlayHost)
continue
}
// when gather_fact is set. get host's information from remote.
if err := e.dealGatherFacts(ctx, play.GatherFacts, hosts); err != nil {
return fmt.Errorf("deal gather_facts argument error: %w", err)
}
// Batch execution, with each batch being a group of hosts run in serial.
var batchHosts [][]string
if err := e.dealSerial(play.Serial.Data, hosts, &batchHosts); err != nil {
return fmt.Errorf("deal serial argument error: %w", err)
}
e.dealRunOnce(play.RunOnce, hosts, &batchHosts)
// exec pipeline in each BatchHosts
if err := e.execBatchHosts(ctx, play, batchHosts); err != nil {
return fmt.Errorf("exec batch hosts error: %v", err)
}
}
return nil
}
// execBatchHosts executor block in play order by: "pre_tasks" > "roles" > "tasks" > "post_tasks"
func (e pipelineExecutor) execBatchHosts(ctx context.Context, play kkprojectv1.Play, batchHosts [][]string) any {
// generate and execute task.
for _, serials := range batchHosts {
// each batch hosts should not be empty.
if len(serials) == 0 {
klog.V(5).ErrorS(nil, "Host is empty", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline))
return errors.New("host is empty")
}
if err := e.variable.Merge(variable.MergeRuntimeVariable(play.Vars, serials...)); err != nil {
return fmt.Errorf("merge variable error: %w", err)
}
// generate task from pre tasks
if err := (blockExecutor{
option: e.option,
hosts: serials,
ignoreErrors: play.IgnoreErrors,
blocks: play.PreTasks,
tags: play.Taggable,
}.Exec(ctx)); err != nil {
return fmt.Errorf("execute pre-tasks from play error: %w", err)
}
// generate task from role
for _, role := range play.Roles {
if !kkprojectv1.JoinTag(role.Taggable, play.Taggable).IsEnabled(e.pipeline.Spec.Tags, e.pipeline.Spec.SkipTags) {
// if not match the tags. skip
continue
}
if err := e.variable.Merge(variable.MergeRuntimeVariable(role.Vars, serials...)); err != nil {
return fmt.Errorf("merge variable error: %w", err)
}
// use the most closely configuration
ignoreErrors := role.IgnoreErrors
if ignoreErrors == nil {
ignoreErrors = play.IgnoreErrors
}
// role is block.
if err := (blockExecutor{
option: e.option,
hosts: serials,
ignoreErrors: ignoreErrors,
blocks: role.Block,
role: role.Role,
when: role.When.Data,
tags: kkprojectv1.JoinTag(role.Taggable, play.Taggable),
}.Exec(ctx)); err != nil {
return fmt.Errorf("execute role-tasks error: %w", err)
}
}
// generate task from tasks
if err := (blockExecutor{
option: e.option,
hosts: serials,
ignoreErrors: play.IgnoreErrors,
blocks: play.Tasks,
tags: play.Taggable,
}.Exec(ctx)); err != nil {
return fmt.Errorf("execute tasks error: %w", err)
}
// generate task from post tasks
if err := (blockExecutor{
option: e.option,
hosts: serials,
ignoreErrors: play.IgnoreErrors,
blocks: play.PostTasks,
tags: play.Taggable,
}.Exec(ctx)); err != nil {
return fmt.Errorf("execute post-tasks error: %w", err)
}
}
return nil
}
// dealHosts "hosts" argument in playbook. get hostname from kkprojectv1.PlayHost
func (e pipelineExecutor) dealHosts(host kkprojectv1.PlayHost, i *[]string) error {
ahn, err := e.variable.Get(variable.GetHostnames(host.Hosts))
if err != nil {
return fmt.Errorf("getHostnames error: %w", err)
}
if h, ok := ahn.([]string); ok {
*i = h
}
if len(*i) == 0 { // if hosts is empty skip this playbook
return errors.New("hosts is empty")
}
return nil
}
// dealGatherFacts "gather_facts" argument in playbook. get host remote info and merge to variable
func (e pipelineExecutor) dealGatherFacts(ctx context.Context, gatherFacts bool, hosts []string) error {
if !gatherFacts {
// skip
return nil
}
dealGatherFactsInHost := func(hostname string) error {
v, err := e.variable.Get(variable.GetParamVariable(hostname))
if err != nil {
klog.V(5).ErrorS(err, "get host variable error", "hostname", hostname)
return err
}
connectorVars := make(map[string]any)
if c1, ok := v.(map[string]any)[_const.VariableConnector]; ok {
if c2, ok := c1.(map[string]any); ok {
connectorVars = c2
}
}
// get host connector
conn, err := connector.NewConnector(hostname, connectorVars)
if err != nil {
klog.V(5).ErrorS(err, "new connector error", "hostname", hostname)
return err
}
if err := conn.Init(ctx); err != nil {
klog.V(5).ErrorS(err, "init connection error", "hostname", hostname)
return err
}
defer conn.Close(ctx)
if gf, ok := conn.(connector.GatherFacts); ok {
remoteInfo, err := gf.HostInfo(ctx)
if err != nil {
klog.V(5).ErrorS(err, "gatherFacts from connector error", "hostname", hostname)
return err
}
if err := e.variable.Merge(variable.MergeRemoteVariable(remoteInfo, hostname)); err != nil {
klog.V(5).ErrorS(err, "merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(e.pipeline), "host", hostname)
return fmt.Errorf("merge gather fact error: %w", err)
}
}
return nil
}
for _, hostname := range hosts {
if err := dealGatherFactsInHost(hostname); err != nil {
return err
}
}
return nil
}
// dealSerial "serial" argument in playbook.
func (e pipelineExecutor) dealSerial(serial []any, hosts []string, batchHosts *[][]string) error {
var err error
*batchHosts, err = converter.GroupHostBySerial(hosts, serial)
if err != nil {
return fmt.Errorf("group host by serial error: %w", err)
}
return nil
}
// dealRunOnce argument in playbook. if RunOnce is true. it's always only run in the first hosts.
func (e pipelineExecutor) dealRunOnce(runOnce bool, hosts []string, batchHosts *[][]string) {
if runOnce {
// runOnce only run in first node
*batchHosts = [][]string{{hosts[0]}}
}
}