kubekey/pkg/converter/converter.go

/*
Copyright 2023 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package converter

import (
"context"
"fmt"
"io/fs"
"math"
"path/filepath"
"strconv"
"strings"
"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/project"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)

// MarshalPlaybook parses the playbook file at pbPath from baseFS into a kkcorev1.Playbook,
// resolving imported playbooks, role tasks, and included task files, then validates the result.
func MarshalPlaybook(baseFS fs.FS, pbPath string) (*kkcorev1.Playbook, error) {
// load the playbook and merge any imported playbooks into it
pb := &kkcorev1.Playbook{}
if err := loadPlaybook(baseFS, pbPath, pb); err != nil {
klog.Errorf("load playbook %s failed: %v", pbPath, err)
return nil, err
}
// resolve role references into task blocks and default variables
if err := convertRoles(baseFS, pbPath, pb); err != nil {
klog.Errorf("convertRoles error %v", err)
return nil, err
}
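// resolve include_tasks references into nested blocks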
if err := convertIncludeTasks(baseFS, pbPath, pb); err != nil {
klog.Errorf("convertIncludeTasks error %v", err)
return nil, err
}
if err := pb.Validate(); err != nil {
klog.Errorf("validate playbook %s failed: %v", pbPath, err)
return nil, err
}
return pb, nil
}

// loadPlaybook reads the playbook file at pbPath and recursively merges any
// import_playbook references, so that pb ends up containing every play in order.
func loadPlaybook(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
// read the playbook file from the project filesystem
pbData, err := fs.ReadFile(baseFS, pbPath)
if err != nil {
klog.Errorf("read playbook %s failed: %v", pbPath, err)
return err
}
var plays []kkcorev1.Play
if err := yaml.Unmarshal(pbData, &plays); err != nil {
klog.Errorf("unmarshal playbook %s failed: %v", pbPath, err)
return err
}
for _, p := range plays {
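// a play may import another playbook; merge the imported playbook's plays first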
if p.ImportPlaybook != "" {
importPlaybook := project.GetPlaybookBaseFromPlaybook(baseFS, pbPath, p.ImportPlaybook)
if importPlaybook == "" {
return fmt.Errorf("cannot find import playbook %s", p.ImportPlaybook)
}
if err := loadPlaybook(baseFS, importPlaybook, pb); err != nil {
return err
}
}
// load each role's main task file into its block list
for i, r := range p.Roles {
roleBase := project.GetRoleBaseFromPlaybook(baseFS, pbPath, r.Role)
if roleBase == "" {
return fmt.Errorf("cannot found role %s", r.Role)
}
mainTask := project.GetYamlFile(baseFS, filepath.Join(roleBase, _const.ProjectRolesTasksDir, _const.ProjectRolesTasksMainFile))
if mainTask == "" {
return fmt.Errorf("cannot found main task for role %s", r.Role)
}
rdata, err := fs.ReadFile(baseFS, mainTask)
if err != nil {
klog.Errorf("read role %s failed: %v", mainTask, err)
return err
}
var blocks []kkcorev1.Block
if err := yaml.Unmarshal(rdata, &blocks); err != nil {
klog.Errorf("unmarshal role %s failed: %v", r.Role, err)
return err
}
p.Roles[i].Block = blocks
}
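// append the play itself once its imports and role blocks are resolved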
pb.Play = append(pb.Play, p)
}
return nil
}

// convertRoles resolves every role referenced in the playbook: it loads the role's
// main task file into blocks and, if present, its default variables into the role's vars.
func convertRoles(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
for i, p := range pb.Play {
for j, r := range p.Roles {
roleBase := project.GetRoleBaseFromPlaybook(baseFS, pbPath, r.Role)
if roleBase == "" {
return fmt.Errorf("cannot find role %s", r.Role)
}
// load the role's task blocks
mainTask := project.GetYamlFile(baseFS, filepath.Join(roleBase, _const.ProjectRolesTasksDir, _const.ProjectRolesTasksMainFile))
if mainTask == "" {
return fmt.Errorf("cannot find main task for role %s", r.Role)
}
rdata, err := fs.ReadFile(baseFS, mainTask)
if err != nil {
klog.Errorf("read role task file %s failed: %v", mainTask, err)
return err
}
var blocks []kkcorev1.Block
if err := yaml.Unmarshal(rdata, &blocks); err != nil {
klog.Errorf("unmarshal tasks of role %s failed: %v", r.Role, err)
return err
}
p.Roles[j].Block = blocks
// load the role's default variables (optional)
mainDefault := project.GetYamlFile(baseFS, filepath.Join(roleBase, _const.ProjectRolesDefaultsDir, _const.ProjectRolesDefaultsMainFile))
if mainDefault != "" {
mainData, err := fs.ReadFile(baseFS, mainDefault)
if err != nil {
klog.Errorf("read defaults variable for role %s failed: %v", r.Role, err)
return err
}
var vars variable.VariableData
if err := yaml.Unmarshal(mainData, &vars); err != nil {
klog.Errorf("unmarshal defaults variable for role %s failed: %v", r.Role, err)
return err
}
p.Roles[j].Vars = vars
}
}
pb.Play[i] = p
}
return nil
}

// convertIncludeTasks walks every play and resolves include_tasks references in its
// pre_tasks, tasks, post_tasks, and role blocks.
func convertIncludeTasks(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
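// include_tasks paths in a play are resolved relative to the project base directory,
// which is assumed to be two levels above the playbook file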
var pbBase = filepath.Dir(filepath.Dir(pbPath))
for _, play := range pb.Play {
if err := fileToBlock(baseFS, pbBase, play.PreTasks); err != nil {
return err
}
if err := fileToBlock(baseFS, pbBase, play.Tasks); err != nil {
return err
}
if err := fileToBlock(baseFS, pbBase, play.PostTasks); err != nil {
return err
}
for _, r := range play.Roles {
roleBase := project.GetRoleBaseFromPlaybook(baseFS, pbPath, r.Role)
if err := fileToBlock(baseFS, filepath.Join(roleBase, _const.ProjectRolesTasksDir), r.Block); err != nil {
return err
}
}
}
return nil
}
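
// fileToBlock recursively resolves include_tasks references in the given blocks:
// each referenced file is read and unmarshalled into child blocks, and the resolution
// descends into the block, rescue, and always sections.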
func fileToBlock(baseFS fs.FS, baseDir string, blocks []kkcorev1.Block) error {
for i, b := range blocks {
if b.IncludeTasks != "" {
data, err := fs.ReadFile(baseFS, filepath.Join(baseDir, b.IncludeTasks))
if err != nil {
klog.Errorf("readFile %s error %v", filepath.Join(baseDir, b.IncludeTasks), err)
return err
}
var bs []kkcorev1.Block
if err := yaml.Unmarshal(data, &bs); err != nil {
klog.Errorf("unmarshal data %s to []Block error %v", filepath.Join(baseDir, b.IncludeTasks), err)
return err
}
b.Block = bs
blocks[i] = b
}
if err := fileToBlock(baseFS, baseDir, b.Block); err != nil {
return err
}
if err := fileToBlock(baseFS, baseDir, b.Rescue); err != nil {
return err
}
if err := fileToBlock(baseFS, baseDir, b.Always); err != nil {
return err
}
}
return nil
}

// MarshalBlock converts a single block into a kubekeyv1alpha1.Task owned by the given object.
// The target hosts, role name, task UID, and when conditions are taken from the context.
func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Object) *kubekeyv1alpha1.Task {
var role string
if v := ctx.Value(_const.CtxBlockRole); v != nil {
role = v.(string)
}
hosts := ctx.Value(_const.CtxBlockHosts).([]string)
if block.RunOnce { // run_once: execute the task on the first host only
hosts = hosts[:1]
}
var uid string
if v := ctx.Value(_const.CtxBlockTaskUID); v != nil {
uid = v.(string)
}
var when []string
if v := ctx.Value(_const.CtxBlockWhen); v != nil {
when = v.([]string)
}
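// build the Task with an owner reference so it is garbage collected together with its owner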
task := &kubekeyv1alpha1.Task{
TypeMeta: metav1.TypeMeta{
Kind: "Task",
APIVersion: "kubekey.kubesphere.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", owner.GetName(), rand.String(12)),
Namespace: owner.GetNamespace(),
UID: types.UID(uid),
CreationTimestamp: metav1.Now(),
Annotations: map[string]string{
kubekeyv1alpha1.TaskAnnotationRole: role,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: owner.GetObjectKind().GroupVersionKind().GroupVersion().String(),
Kind: owner.GetObjectKind().GroupVersionKind().Kind,
Name: owner.GetName(),
UID: owner.GetUID(),
Controller: pointer.Bool(true),
BlockOwnerDeletion: pointer.Bool(true),
},
},
},
Spec: kubekeyv1alpha1.KubeKeyTaskSpec{
Name: block.Name,
Hosts: hosts,
IgnoreError: block.IgnoreErrors,
Retries: block.Retries,
//Loop: block.Loop,
When: when,
FailedWhen: block.FailedWhen.Data,
Register: block.Register,
},
Status: kubekeyv1alpha1.TaskStatus{
Phase: kubekeyv1alpha1.TaskPhasePending,
},
}
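// if the block defines a loop, store it as raw JSON on the task spec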
if len(block.Loop) != 0 {
data, err := json.Marshal(block.Loop)
if err != nil {
klog.Errorf("marshal loop %v failed: %v", block.Loop, err)
} else {
task.Spec.Loop = runtime.RawExtension{Raw: data}
}
}
return task
}

// GroupHostBySerial groups hosts into batches according to serial.
// Each serial entry is either an int or a string holding an int or a percentage
// of the total host count (e.g. "30%"); percentages are rounded up.
// If the serial entries do not cover all hosts, the last entry is repeated until
// every host is assigned. For example, 5 hosts with serial [1, "50%"] yields
// batches of 1, 3 and 1 hosts.
func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
if len(serial) == 0 {
return [][]string{hosts}, nil
}
// batchSize converts a single serial entry into a concrete host count.
batchSize := func(entry any) (int, error) {
switch v := entry.(type) {
case int:
return v, nil
case string:
if strings.HasSuffix(v, "%") {
percent, err := strconv.Atoi(strings.TrimSuffix(v, "%"))
if err != nil {
klog.Errorf("convert serial %s to int failed: %v", v, err)
return 0, err
}
return int(math.Ceil(float64(len(hosts)*percent) / 100.0)), nil
}
count, err := strconv.Atoi(v)
if err != nil {
klog.Errorf("convert serial %s to int failed: %v", v, err)
return 0, err
}
return count, nil
default:
return 0, fmt.Errorf("unknown serial type. only support int or percent")
}
}
result := make([][]string, 0)
sp := 0
for i := 0; sp < len(hosts); i++ {
// when serial does not cover all hosts, reuse the last entry
entry := serial[len(serial)-1]
if i < len(serial) {
entry = serial[i]
}
size, err := batchSize(entry)
if err != nil {
return nil, err
}
if size <= 0 {
return nil, fmt.Errorf("serial entry %v results in an empty batch", entry)
}
if sp+size > len(hosts) {
size = len(hosts) - sp
}
result = append(result, hosts[sp:sp+size])
sp += size
}
return result, nil
}

// CalculatePipelineStatus aggregates the phases of the pipeline's tasks into the
// pipeline status. Only a running pipeline is updated.
func CalculatePipelineStatus(nsTasks *kubekeyv1alpha1.TaskList, pipeline *kubekeyv1.Pipeline) {
if pipeline.Status.Phase != kubekeyv1.PipelinePhaseRunning {
// nothing to do unless the pipeline is running
return
}
pipeline.Status.TaskResult = kubekeyv1.PipelineTaskResult{
Total: len(nsTasks.Items),
}
var failedDetail []kubekeyv1.PipelineFailedDetail
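// count terminal task phases; a task counts as failed only once its retries are exhausted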
for _, t := range nsTasks.Items {
switch t.Status.Phase {
case kubekeyv1alpha1.TaskPhaseSuccess:
pipeline.Status.TaskResult.Success++
case kubekeyv1alpha1.TaskPhaseIgnored:
pipeline.Status.TaskResult.Ignored++
case kubekeyv1alpha1.TaskPhaseSkipped:
pipeline.Status.TaskResult.Skipped++
}
if t.Status.Phase == kubekeyv1alpha1.TaskPhaseFailed && t.Spec.Retries <= t.Status.RestartCount {
var hostReason []kubekeyv1.PipelineFailedDetailHost
for _, tr := range t.Status.FailedDetail {
hostReason = append(hostReason, kubekeyv1.PipelineFailedDetailHost{
Host: tr.Host,
Stdout: tr.Stdout,
StdErr: tr.StdErr,
})
}
failedDetail = append(failedDetail, kubekeyv1.PipelineFailedDetail{
Task: t.Name,
Hosts: hostReason,
})
pipeline.Status.TaskResult.Failed++
}
}
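// the pipeline fails if any task failed; it succeeds once every task has finished without failure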
if pipeline.Status.TaskResult.Failed != 0 {
pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
pipeline.Status.Reason = "task failed"
pipeline.Status.FailedDetail = failedDetail
} else if pipeline.Status.TaskResult.Total == pipeline.Status.TaskResult.Success+pipeline.Status.TaskResult.Ignored+pipeline.Status.TaskResult.Skipped {
pipeline.Status.Phase = kubekeyv1.PipelinePhaseSucceed
}
}