kubekey/pkg/variable/variable.go
joyceliu 2a676185e2 feat: kubekey gitops
Signed-off-by: joyceliu <joyceliu@yunify.com>
2024-01-05 15:14:36 +08:00

552 lines
14 KiB
Go

/*
Copyright 2023 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package variable
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"regexp"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/variable/source"
)
type Variable interface {
Get(option GetOption) (any, error)
Merge(option ...MergeOption) error
}
type Options struct {
Ctx context.Context
Client ctrlclient.Client
Pipeline kubekeyv1.Pipeline
}
// New variable. generate value from config args. and render to source.
func New(o Options) (Variable, error) {
// new source
s, err := source.New(filepath.Join(_const.RuntimeDirFromObject(&o.Pipeline), _const.RuntimePipelineVariableDir))
if err != nil {
klog.Errorf("create file source failed: %v", err)
return nil, err
}
// get config
var config = &kubekeyv1.Config{}
if err := o.Client.Get(o.Ctx, types.NamespacedName{o.Pipeline.Spec.ConfigRef.Namespace, o.Pipeline.Spec.ConfigRef.Name}, config); err != nil {
klog.Errorf("get config from pipeline error %v", err)
return nil, err
}
// get inventory
var inventory = &kubekeyv1.Inventory{}
if err := o.Client.Get(o.Ctx, types.NamespacedName{o.Pipeline.Spec.InventoryRef.Namespace, o.Pipeline.Spec.InventoryRef.Name}, inventory); err != nil {
klog.Errorf("get inventory from pipeline error %v", err)
return nil, err
}
v := &variable{
source: s,
value: &value{
Config: *config,
Inventory: *inventory,
Hosts: make(map[string]host),
},
}
// read data from source
data, err := v.source.Read()
if err != nil {
klog.Errorf("read data from source error %v", err)
return nil, err
}
for k, d := range data {
if k == _const.RuntimePipelineVariableLocationFile {
// set location
if err := json.Unmarshal(d, &v.value.Location); err != nil {
klog.Errorf("unmarshal location error %v", err)
return nil, err
}
} else {
// set hosts
h := host{}
if err := json.Unmarshal(d, &h); err != nil {
klog.Errorf("unmarshal host error %v", err)
return nil, err
}
v.value.Hosts[strings.TrimSuffix(k, ".json")] = h
}
}
return v, nil
}
type GetOption interface {
filter(data value) (any, error)
}
// KeyPath get a key path variable
type KeyPath struct {
// HostName which host obtain the variable
HostName string
// LocationUID locate which variable belong to
LocationUID string
// Path base top variable.
Path []string
}
func (k KeyPath) filter(data value) (any, error) {
// find value from location
var getLocationFunc func(uid string) any
getLocationFunc = func(uid string) any {
if loc := findLocation(data.Location, uid); loc != nil {
// find value from task
if v, ok := data.Hosts[k.HostName].RuntimeVars[uid]; ok {
if result := k.getValue(v, k.Path...); result != nil {
return result
}
}
if result := k.getValue(loc.Vars, k.Path...); result != nil {
return result
}
if loc.PUID != "" {
return getLocationFunc(loc.PUID)
}
}
return nil
}
if result := getLocationFunc(k.LocationUID); result != nil {
return result, nil
}
// find value from host
if result := k.getValue(data.Hosts[k.HostName].Vars, k.Path...); result != nil {
return result, nil
}
// find value from global
if result := k.getValue(data.getGlobalVars(k.HostName), k.Path...); result != nil {
return result, nil
}
return nil, nil
}
// getValue from variable.VariableData use key path. if key path is empty return nil
func (k KeyPath) getValue(value VariableData, key ...string) any {
if len(key) == 0 {
return nil
}
var result any
result = value
for _, s := range key {
result = result.(VariableData)[s]
}
return result
}
// ParentLocation UID for current location
type ParentLocation struct {
LocationUID string
}
func (p ParentLocation) filter(data value) (any, error) {
loc := findLocation(data.Location, p.LocationUID)
if loc != nil {
return loc.PUID, nil
}
return nil, fmt.Errorf("cannot find location %s", p.LocationUID)
}
// LocationVars get all variable for location
type LocationVars struct {
// HostName which host obtain the variable
HostName string
// LocationUID locate which variable belong to
LocationUID string
}
func (b LocationVars) filter(data value) (any, error) {
var result VariableData
// find from host runtime
if v, ok := data.Hosts[b.HostName].RuntimeVars[b.LocationUID]; ok {
result = mergeVariables(result, v)
}
// find
// merge location variable
var mergeLocationVarsFunc func(uid string)
mergeLocationVarsFunc = func(uid string) {
// find value from task
if v, ok := data.Hosts[b.HostName].RuntimeVars[uid]; ok {
result = mergeVariables(result, v)
}
if loc := findLocation(data.Location, uid); loc != nil {
result = mergeVariables(result, loc.Vars)
if loc.PUID != "" {
mergeLocationVarsFunc(loc.PUID)
}
}
}
mergeLocationVarsFunc(b.LocationUID)
// get value from host
result = mergeVariables(result, data.Hosts[b.HostName].Vars)
// get value from global
result = mergeVariables(result, data.getGlobalVars(b.HostName))
return result, nil
}
// HostVars get all top variable for a host
type HostVars struct {
HostName string
}
func (k HostVars) filter(data value) (any, error) {
return mergeVariables(data.getGlobalVars(k.HostName), data.Hosts[k.HostName].Vars), nil
}
// Hostnames from array contains group name or host name
type Hostnames struct {
Name []string
}
func (g Hostnames) filter(data value) (any, error) {
var hs []string
for _, n := range g.Name {
// add host to hs
if _, ok := data.Hosts[n]; ok {
hs = append(hs, n)
}
// add group's host to gs
for gn, gv := range convertGroup(data.Inventory) {
if gn == n {
hs = mergeSlice(hs, gv.([]string))
break
}
}
// Add the specified host in the specified group to the hs.
regex := regexp.MustCompile(`^(.*)\[\d\]$`)
if match := regex.FindStringSubmatch(n); match != nil {
index, err := strconv.Atoi(match[2])
if err != nil {
klog.Errorf("convert index %s to int failed: %v", match[2], err)
return nil, err
}
for gn, gv := range data.Inventory.Spec.Groups {
if gn == match[1] {
hs = append(hs, gv.Hosts[index])
break
}
}
}
}
return hs, nil
}
const (
// FailedExecute If dependency tasks has failed. execute current task. otherwise skip it
FailedExecute = "failed-exec"
// SucceedExecute If dependency tasks succeeded. execute current task. otherwise skip it
SucceedExecute = "succeed-exec"
// AlwaysExecute always execute current task.
AlwaysExecute = "always-exec"
)
type DependencyTasks struct {
LocationUID string
}
type DependencyTask struct {
Tasks []string
Strategy func([]kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase
}
func (f DependencyTasks) filter(data value) (any, error) {
loc := findLocation(data.Location, f.LocationUID)
if loc == nil {
return nil, fmt.Errorf("cannot found location %s", f.LocationUID)
}
return f.getDependencyLocationUIDS(data, loc)
}
func (f DependencyTasks) getDependencyLocationUIDS(data value, loc *location) (DependencyTask, error) {
if loc.PUID == "" {
return DependencyTask{
Strategy: func([]kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase {
return kubekeyv1alpha1.TaskPhaseRunning
},
}, nil
}
// if tasks has failed. execute current task.
failedExecuteStrategy := func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase {
skip := true
for _, t := range tasks {
if !t.IsComplete() {
return kubekeyv1alpha1.TaskPhasePending
}
if t.IsFailed() {
return kubekeyv1alpha1.TaskPhaseRunning
}
if !t.IsSkipped() {
skip = false
}
}
if skip {
return kubekeyv1alpha1.TaskPhaseRunning
}
return kubekeyv1alpha1.TaskPhaseSkipped
}
// If dependency tasks has failed. skip it.
succeedExecuteStrategy := func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase {
skip := true
for _, t := range tasks {
if !t.IsComplete() {
return kubekeyv1alpha1.TaskPhasePending
}
if t.IsFailed() {
return kubekeyv1alpha1.TaskPhaseSkipped
}
if !t.IsSkipped() {
skip = false
}
}
if skip {
return kubekeyv1alpha1.TaskPhaseSkipped
}
return kubekeyv1alpha1.TaskPhaseRunning
}
// If dependency tasks is not complete. waiting.
// If dependency tasks is skipped. skip.
alwaysExecuteStrategy := func(tasks []kubekeyv1alpha1.Task) kubekeyv1alpha1.TaskPhase {
skip := true
for _, t := range tasks {
if !t.IsComplete() {
return kubekeyv1alpha1.TaskPhasePending
}
if !t.IsSkipped() {
skip = false
}
}
if skip {
return kubekeyv1alpha1.TaskPhaseSkipped
}
return kubekeyv1alpha1.TaskPhaseRunning
}
// Find the parent location and, based on where the current location is within the parent location, retrieve the dependent tasks.
ploc := findLocation(data.Location, loc.PUID)
// location in Block.
for i, l := range ploc.Block {
if l.UID == loc.UID {
// When location is the first element, it is necessary to check the dependency of its parent location.
if i == 0 {
if data, err := f.getDependencyLocationUIDS(data, ploc); err != nil {
return DependencyTask{}, err
} else {
return data, nil
}
}
// When location is not the first element, dependency location is the preceding element in the same array.
return DependencyTask{
Tasks: f.findAllTasks(ploc.Block[i-1]),
Strategy: succeedExecuteStrategy,
}, nil
}
}
// location in Rescue
for i, l := range ploc.Rescue {
if l.UID == loc.UID {
// When location is the first element, dependency location is all task of sibling block array.
if i == 0 {
return DependencyTask{
Tasks: f.findAllTasks(ploc.Block[len(ploc.Block)-1]),
Strategy: failedExecuteStrategy,
}, nil
}
// When location is not the first element, dependency location is the preceding element in the same array
return DependencyTask{
Tasks: f.findAllTasks(ploc.Rescue[i-1]),
Strategy: succeedExecuteStrategy}, nil
}
}
// If location in Always
for i, l := range ploc.Always {
if l.UID == loc.UID {
// When location is the first element, dependency location is all task of sibling block array
if i == 0 {
return DependencyTask{
Tasks: f.findAllTasks(ploc.Block[len(ploc.Block)-1]),
Strategy: alwaysExecuteStrategy,
}, nil
}
// When location is not the first element, dependency location is the preceding element in the same array
return DependencyTask{
Tasks: f.findAllTasks(ploc.Always[i-1]),
Strategy: alwaysExecuteStrategy,
}, nil
}
}
return DependencyTask{}, fmt.Errorf("connot find location %s in parent %s", loc.UID, loc.PUID)
}
func (f DependencyTasks) findAllTasks(loc location) []string {
if len(loc.Block) == 0 {
return []string{loc.UID}
}
var result = make([]string, 0)
for _, l := range loc.Block {
result = append(result, f.findAllTasks(l)...)
}
for _, l := range loc.Rescue {
result = append(result, f.findAllTasks(l)...)
}
for _, l := range loc.Always {
result = append(result, f.findAllTasks(l)...)
}
return result
}
type MergeOption interface {
mergeTo(data *value) error
}
// HostMerge merge variable to host
type HostMerge struct {
// HostName of host
HostNames []string
// LocationVars to find block. Only merge the last level block.
//LocationVars []string
LocationUID string
// Data to merge
Data VariableData
}
func (h HostMerge) mergeTo(v *value) error {
for _, name := range h.HostNames {
hv := v.Hosts[name]
if h.LocationUID == "" { // merge to host var
hv.Vars = mergeVariables(h.Data, v.Hosts[name].Vars)
} else { // merge to host runtime
if hv.RuntimeVars == nil {
hv.RuntimeVars = make(map[string]VariableData)
}
hv.RuntimeVars[h.LocationUID] = mergeVariables(v.Hosts[name].RuntimeVars[h.LocationUID], h.Data)
}
v.Hosts[name] = hv
}
return nil
}
type LocationType string
const (
BlockLocation LocationType = "block"
AlwaysLocation LocationType = "always"
RescueLocation LocationType = "rescue"
)
// LocationMerge merge variable to location
type LocationMerge struct {
Uid string
ParentID string
Type LocationType
Name string
Vars VariableData
}
func (t LocationMerge) mergeTo(v *value) error {
if t.ParentID == "" {
v.Location = append(v.Location, location{
Name: t.Name,
PUID: t.ParentID,
UID: t.Uid,
Vars: t.Vars,
})
return nil
}
// find parent graph
parentLocation := findLocation(v.Location, t.ParentID)
if parentLocation == nil {
return fmt.Errorf("cannot find parent location %s", t.ParentID)
}
switch t.Type {
case BlockLocation:
for _, loc := range parentLocation.Block {
if loc.UID == t.Uid {
klog.Warningf("task graph %s already exist", t.Uid)
return nil
}
}
parentLocation.Block = append(parentLocation.Block, location{
Name: t.Name,
PUID: t.ParentID,
UID: t.Uid,
Vars: t.Vars,
})
case AlwaysLocation:
for _, loc := range parentLocation.Always {
if loc.UID == t.Uid {
klog.Warningf("task graph %s already exist", t.Uid)
return nil
}
}
parentLocation.Always = append(parentLocation.Always, location{
Name: t.Name,
PUID: t.ParentID,
UID: t.Uid,
Vars: t.Vars,
})
case RescueLocation:
for _, loc := range parentLocation.Rescue {
if loc.UID == t.Uid {
klog.Warningf("task graph %s already exist", t.Uid)
return nil
}
}
parentLocation.Rescue = append(parentLocation.Rescue, location{
Name: t.Name,
PUID: t.ParentID,
UID: t.Uid,
Vars: t.Vars,
})
default:
return fmt.Errorf("unknown LocationType. only support block,always,rescue ")
}
return nil
}