dev-v2.0.0: add ignore-err option

Signed-off-by: 24sama <leo@kubesphere.io>
24sama 2021-11-04 17:21:45 +08:00
parent 6efac5a38e
commit 89adf28ae3
16 changed files with 179 additions and 72 deletions

View File

@@ -30,6 +30,7 @@ var addNodesCmd = &cobra.Command{
FilePath: opt.ClusterCfgFile,
KsEnable: false,
Debug: opt.Verbose,
IgnoreErr: opt.IgnoreErr,
SkipConfirmCheck: opt.SkipConfirmCheck,
SkipPullImages: opt.SkipPullImages,
InCluster: opt.InCluster,

View File

@@ -46,6 +46,7 @@ var clusterCmd = &cobra.Command{
InCluster: opt.InCluster,
DeployLocalStorage: opt.LocalStorage,
Debug: opt.Verbose,
IgnoreErr: opt.IgnoreErr,
SkipConfirmCheck: opt.SkipConfirmCheck,
ContainerManager: opt.ContainerManager,
}

View File

@@ -23,6 +23,7 @@ import (
type Options struct {
Verbose bool
IgnoreErr bool
Addons string
Name string
ClusterCfgPath string
@@ -75,6 +76,7 @@ func init() {
rootCmd.PersistentFlags().BoolVar(&opt.InCluster, "in-cluster", false, "Running inside the cluster")
rootCmd.PersistentFlags().BoolVar(&opt.Verbose, "debug", true, "Print detailed information")
rootCmd.PersistentFlags().BoolVarP(&opt.SkipConfirmCheck, "yes", "y", false, "Skip confirm check")
rootCmd.PersistentFlags().BoolVar(&opt.IgnoreErr, "ignore-err", false, "Ignore the error message, remove the host which reported the error and force to continue")
// Cobra also supports local flags, which will only run
// when this action is called directly.
}
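For context, the new flag follows the usual cobra persistent-flag pattern. Below is a minimal standalone sketch; only the flag wiring mirrors the diff, the command name and output are illustrative:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// Options mirrors the fields touched in this commit; only the flag wiring is shown.
type Options struct {
	Verbose   bool
	IgnoreErr bool
}

func main() {
	opt := &Options{}
	rootCmd := &cobra.Command{
		Use: "kk",
		Run: func(cmd *cobra.Command, args []string) {
			// Downstream code (e.g. building the runtime Argument) reads opt.IgnoreErr.
			fmt.Printf("debug=%v ignore-err=%v\n", opt.Verbose, opt.IgnoreErr)
		},
	}
	rootCmd.PersistentFlags().BoolVar(&opt.Verbose, "debug", true, "Print detailed information")
	rootCmd.PersistentFlags().BoolVar(&opt.IgnoreErr, "ignore-err", false,
		"Ignore the error message, remove the host which reported the error and force to continue")
	_ = rootCmd.Execute()
}
```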

View File

@@ -23,10 +23,10 @@ spec:
control-plane: controller-manager
spec:
securityContext:
runAsNonRoot: true
runAsUser: 1000
containers:
- command:
- /manager
- ./manager
args:
- --leader-elect
image: controller:latest

View File

@@ -125,6 +125,22 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
nodes, err := clusterDiff(r, ctx, cluster)
if err != nil {
return ctrl.Result{RequeueAfter: 2 * time.Second}, err
}
// If the CR cluster defines the current cluster
if len(nodes) != 0 {
if err := adaptCurrentCluster(nodes, cluster); err != nil {
return ctrl.Result{RequeueAfter: 2 * time.Second}, err
}
if err := r.Status().Update(context.TODO(), cluster); err != nil {
return ctrl.Result{RequeueAfter: 2 * time.Second}, err
}
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
if err := updateRunJob(r, req, ctx, cluster, jobFound, log, CreateCluster); err != nil {
return ctrl.Result{RequeueAfter: 2 * time.Second}, err
}
@@ -324,7 +340,7 @@ func (r *ClusterReconciler) jobForCluster(c *kubekeyv1alpha2.Cluster, action str
args = []string{"create", "cluster", "-f", "/home/kubekey/config/cluster.yaml", "-y", "--in-cluster", "true"}
} else if action == AddNodes {
name = fmt.Sprintf("%s-add-nodes", c.Name)
args = []string{"add", "nodes", "-f", "/home/kubekey/config/cluster.yaml", "-y", "--in-cluster", "true"}
args = []string{"add", "nodes", "-f", "/home/kubekey/config/cluster.yaml", "-y", "--in-cluster", "true", "--ignore-err", "true"}
}
podlist := &corev1.PodList{}
@@ -517,29 +533,6 @@ func updateRunJob(r *ClusterReconciler, req ctrl.Request, ctx context.Context, c
// Check if the job already exists, if not create a new one
if err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: "kubekey-system"}, jobFound); err != nil && !kubeErr.IsNotFound(err) {
return nil
} else if err == nil && (jobFound.Status.Failed != 0 || jobFound.Status.Succeeded != 0) {
// delete old pods
podlist := &corev1.PodList{}
listOpts := []client.ListOption{
client.InNamespace("kubekey-system"),
client.MatchingLabels{"job-name": name},
}
if err := r.List(context.TODO(), podlist, listOpts...); err == nil && len(podlist.Items) != 0 {
for _, pod := range podlist.Items {
_ = r.Delete(ctx, &pod)
}
}
if err := r.Delete(ctx, jobFound); err != nil {
log.Error(err, "Failed to delete old Job", "Job.Namespace", jobFound.Namespace, "Job.Name", jobFound.Name)
return err
}
jobCluster := r.jobForCluster(cluster, action)
log.Info("Creating a new Job to create cluster", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
if err := r.Create(ctx, jobCluster); err != nil {
log.Error(err, "Failed to create new Job", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
return err
}
} else if kubeErr.IsNotFound(err) {
jobCluster := r.jobForCluster(cluster, action)
log.Info("Creating a new Job to create cluster", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
@@ -585,3 +578,78 @@ func sendHostsAction(action int, hosts []kubekeyv1alpha2.HostCfg, log logr.Logge
}
}
}
type NodeInfo struct {
Address string
Master bool
Worker bool
}
func clusterDiff(r *ClusterReconciler, ctx context.Context, c *kubekeyv1alpha2.Cluster) ([]kubekeyv1alpha2.NodeStatus, error) {
nodes := &corev1.NodeList{}
newNodes := make([]kubekeyv1alpha2.NodeStatus, 0)
if err := r.List(ctx, nodes, &client.ListOptions{}); err != nil {
return newNodes, err
}
m := make(map[string]NodeInfo)
for _, node := range nodes.Items {
var info NodeInfo
if _, ok := node.Labels["node-role.kubernetes.io/control-plane"]; ok {
info.Master = true
}
if _, ok := node.Labels["node-role.kubernetes.io/master"]; ok {
info.Master = true
}
if _, ok := node.Labels["node-role.kubernetes.io/worker"]; ok {
info.Worker = true
}
for _, address := range node.Status.Addresses {
if address.Type == corev1.NodeInternalIP {
info.Address = address.Address
}
}
m[node.Name] = info
}
for _, host := range c.Spec.Hosts {
if info, ok := m[host.Name]; ok {
if info.Address == host.InternalAddress {
newNodes = append(newNodes, kubekeyv1alpha2.NodeStatus{
InternalIP: host.InternalAddress,
Hostname: host.Name,
Roles: map[string]bool{"master": info.Master, "worker": info.Worker},
})
}
}
}
return newNodes, nil
}
func adaptCurrentCluster(newNodes []kubekeyv1alpha2.NodeStatus, c *kubekeyv1alpha2.Cluster) error {
var newMaster, newWorker []string
for _, node := range newNodes {
//if node.Roles["etcd"] {
// newEtcd = append(newEtcd, node.Hostname)
//}
if node.Roles["master"] {
newMaster = append(newMaster, node.Hostname)
}
if node.Roles["worker"] {
newWorker = append(newWorker, node.Hostname)
}
}
c.Status.NodesCount = len(newNodes)
c.Status.MasterCount = len(newMaster)
//c.Status.EtcdCount = len(newEtcd)
c.Status.WorkerCount = len(newWorker)
c.Status.Nodes = newNodes
c.Status.Version = c.Spec.Kubernetes.Version
c.Status.NetworkPlugin = c.Spec.Network.Plugin
return nil
}
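clusterDiff matches the hosts in the Cluster CR against the live nodes by host name and internal IP, and derives their roles from the standard node-role labels. A rough standalone sketch of that label-to-role mapping, using a hypothetical nodeRoles helper that is not part of the commit:

```go
package main

import "fmt"

// nodeRoles is a hypothetical extraction of the label checks in clusterDiff:
// a node counts as a master if it carries either the control-plane or the
// legacy master role label, and as a worker if it carries the worker label.
func nodeRoles(labels map[string]string) (master, worker bool) {
	if _, ok := labels["node-role.kubernetes.io/control-plane"]; ok {
		master = true
	}
	if _, ok := labels["node-role.kubernetes.io/master"]; ok {
		master = true
	}
	if _, ok := labels["node-role.kubernetes.io/worker"]; ok {
		worker = true
	}
	return master, worker
}

func main() {
	labels := map[string]string{
		"node-role.kubernetes.io/control-plane": "",
		"node-role.kubernetes.io/worker":        "",
	}
	master, worker := nodeRoles(labels)
	fmt.Printf("master=%v worker=%v\n", master, worker) // master=true worker=true
}
```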

View File

@@ -16,7 +16,6 @@ func (k *KubeAction) AutoAssert(runtime connector.Runtime) {
ClusterHosts: kubeRuntime.ClusterHosts,
Cluster: kubeRuntime.Cluster,
Kubeconfig: kubeRuntime.Kubeconfig,
Conditions: kubeRuntime.Conditions,
ClientSet: kubeRuntime.ClientSet,
Arg: kubeRuntime.Arg,
}

View File

@@ -11,7 +11,6 @@ type KubeConf struct {
ClusterName string
Cluster *kubekeyapiv1alpha2.ClusterSpec
Kubeconfig string
Conditions []kubekeyapiv1alpha2.Condition
ClientSet *kubekeyclientset.Clientset
Arg Argument
}
@@ -32,7 +31,6 @@ func (k *KubeModule) AutoAssert() {
ClusterName: kubeRuntime.ClusterName,
Cluster: kubeRuntime.Cluster,
Kubeconfig: kubeRuntime.Kubeconfig,
Conditions: kubeRuntime.Conditions,
ClientSet: kubeRuntime.ClientSet,
Arg: kubeRuntime.Arg,
}
@@ -52,7 +50,6 @@ func (k *KubeCustomModule) AutoAssert() {
ClusterName: kubeRuntime.ClusterName,
Cluster: kubeRuntime.Cluster,
Kubeconfig: kubeRuntime.Kubeconfig,
Conditions: kubeRuntime.Conditions,
ClientSet: kubeRuntime.ClientSet,
Arg: kubeRuntime.Arg,
}

View File

@@ -16,7 +16,6 @@ func (k *KubePrepare) AutoAssert(runtime connector.Runtime) {
ClusterHosts: kubeRuntime.ClusterHosts,
Cluster: kubeRuntime.Cluster,
Kubeconfig: kubeRuntime.Kubeconfig,
Conditions: kubeRuntime.Conditions,
ClientSet: kubeRuntime.ClientSet,
Arg: kubeRuntime.Arg,
}

View File

@@ -13,7 +13,6 @@ type KubeRuntime struct {
ClusterName string
Cluster *kubekeyapiv1alpha2.ClusterSpec
Kubeconfig string
Conditions []kubekeyapiv1alpha2.Condition
ClientSet *kubekeyclientset.Clientset
Arg Argument
}
@@ -25,6 +24,7 @@ type Argument struct {
KsEnable bool
KsVersion string
Debug bool
IgnoreErr bool
SkipPullImages bool
AddImagesRepo bool
DeployLocalStorage bool
@@ -53,16 +53,7 @@ func NewKubeRuntime(flag string, arg Argument) (*KubeRuntime, error) {
defaultCluster.Kubernetes.ContainerManager = arg.ContainerManager
}
//var clientset *kubekeyclientset.Clientset
//if arg.InCluster {
// c, err := kubekeycontroller.NewKubekeyClient()
// if err != nil {
// return nil, err
// }
// clientset = c
//}
base := connector.NewBaseRuntime(cluster.Name, connector.NewDialer(), arg.Debug)
base := connector.NewBaseRuntime(cluster.Name, connector.NewDialer(), arg.Debug, arg.IgnoreErr)
for _, v := range hostGroups.All {
host := ToHosts(v)
if v.IsMaster {
@@ -87,8 +78,7 @@ func NewKubeRuntime(flag string, arg Argument) (*KubeRuntime, error) {
ClusterHosts: generateHosts(hostGroups, defaultCluster),
Cluster: defaultCluster,
ClusterName: cluster.Name,
//ClientSet: clientset,
Arg: arg,
Arg: arg,
}
r.BaseRuntime = base

View File

@@ -19,9 +19,9 @@ type BaseHost struct {
func NewHost() *BaseHost {
return &BaseHost{
Roles: make([]string, 0, 0),
RoleTable: make(map[string]bool),
Cache: cache.NewCache(),
Roles: make([]string, 0, 0),
RoleTable: make(map[string]bool),
Cache: cache.NewCache(),
}
}

View File

@@ -28,10 +28,12 @@ type ModuleRuntime interface {
GenerateWorkDir() error
GetHostWorkDir() string
GetWorkDir() string
GetIgnoreErr() bool
GetAllHosts() []Host
SetAllHosts([]Host)
GetHostsByRole(role string) []Host
DeleteHost(host Host)
HostIsDeprecated(host Host) bool
InitLogger() error
}

View File

@@ -10,22 +10,26 @@ import (
)
type BaseRuntime struct {
ObjName string
connector Connector
runner *Runner
workDir string
verbose bool
allHosts []Host
roleHosts map[string][]Host
ObjName string
connector Connector
runner *Runner
workDir string
verbose bool
ignoreErr bool
allHosts []Host
roleHosts map[string][]Host
deprecatedHosts map[string]string
}
func NewBaseRuntime(name string, connector Connector, verbose bool) BaseRuntime {
func NewBaseRuntime(name string, connector Connector, verbose bool, ignoreErr bool) BaseRuntime {
return BaseRuntime{
ObjName: name,
connector: connector,
verbose: verbose,
allHosts: make([]Host, 0, 0),
roleHosts: make(map[string][]Host),
ObjName: name,
connector: connector,
verbose: verbose,
ignoreErr: ignoreErr,
allHosts: make([]Host, 0, 0),
roleHosts: make(map[string][]Host),
deprecatedHosts: make(map[string]string),
}
}
@@ -87,6 +91,10 @@ func (b *BaseRuntime) GetWorkDir() string {
return b.workDir
}
func (b *BaseRuntime) GetIgnoreErr() bool {
return b.ignoreErr
}
func (b *BaseRuntime) GetAllHosts() []Host {
return b.allHosts
}
@@ -105,13 +113,22 @@ func (b *BaseRuntime) RemoteHost() Host {
func (b *BaseRuntime) DeleteHost(host Host) {
i := 0
for i = range b.allHosts {
if b.GetAllHosts()[i].GetName() == host.GetName() {
break
for j := range b.allHosts {
if b.GetAllHosts()[j].GetName() != host.GetName() {
b.allHosts[i] = b.allHosts[j]
i++
}
}
b.allHosts = append(b.allHosts[:i], b.allHosts[i+1:]...)
b.allHosts = b.allHosts[:i]
b.RoleMapDelete(host)
b.deprecatedHosts[host.GetName()] = ""
}
func (b *BaseRuntime) HostIsDeprecated(host Host) bool {
if _, ok := b.deprecatedHosts[host.GetName()]; ok {
return true
}
return false
}
func (b *BaseRuntime) InitLogger() error {
@@ -154,13 +171,15 @@ func (b *BaseRuntime) AppendRoleMap(host Host) {
}
func (b *BaseRuntime) RoleMapDelete(host Host) {
for k, role := range b.roleHosts {
for role, hosts := range b.roleHosts {
i := 0
for i = range role {
if role[i].GetName() == host.GetName() {
role = append(role[:i], role[i+1:]...)
for j := range hosts {
if hosts[j].GetName() != host.GetName() {
hosts[i] = hosts[j]
i++
}
}
b.roleHosts[k] = role
hosts = hosts[:i]
b.roleHosts[role] = hosts
}
}
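DeleteHost and RoleMapDelete are rewritten from an append-based splice inside a range loop to an in-place filter: keep every non-matching element, then truncate. A minimal sketch of the same idiom on a plain string slice (names are illustrative); unlike the old splice, it removes every match in one pass and needs no index bookkeeping after the removal:

```go
package main

import "fmt"

// removeByName shows the in-place filter idiom used by the rewritten
// DeleteHost and RoleMapDelete: copy every element that does not match
// towards the front, then truncate the slice to the kept length.
func removeByName(names []string, target string) []string {
	i := 0
	for j := range names {
		if names[j] != target {
			names[i] = names[j]
			i++
		}
	}
	return names[:i]
}

func main() {
	hosts := []string{"node1", "node2", "node3", "node2"}
	fmt.Println(removeByName(hosts, "node2")) // [node1 node3]
}
```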

View File

@@ -35,7 +35,22 @@ func (b *BaseTaskModule) Run(result *ending.ModuleResult) {
ac := res.ActionResults[j]
logger.Log.Infof("%s: [%s]", ac.Status.String(), ac.Host.GetName())
result.AppendHostResult(ac)
if _, ok := t.(*task.RemoteTask); ok {
if b.Runtime.GetIgnoreErr() {
if len(b.Runtime.GetAllHosts()) > 0 {
if ac.GetStatus() == ending.FAILED {
res.Status = ending.SUCCESS
b.Runtime.DeleteHost(ac.Host)
}
} else {
result.ErrResult(errors.Wrapf(res.CombineErr(), "Module[%s] exec failed", b.Name))
return
}
}
}
}
if res.IsFailed() {
result.ErrResult(errors.Wrapf(res.CombineErr(), "Module[%s] exec failed", b.Name))
return
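The new branch in BaseTaskModule.Run only applies to remote tasks when ignore-err is set: a failed host is dropped from the runtime and the action result is forced to success, but once no hosts remain the module fails outright. A hedged reduction of that decision into a standalone helper (the types and names are illustrative, not the actual kubekey API):

```go
package main

import "fmt"

// Possible outcomes of the illustrative decision helper below.
const (
	ContinueWithoutHost = "drop host and continue"
	FailModule          = "fail the module"
	KeepResult          = "keep the original result"
)

// onHostFailure sketches the ignore-err branch: a failed remote task only
// fails the module outright when there are no hosts left to continue with.
func onHostFailure(ignoreErr bool, remainingHosts int, actionFailed bool) string {
	if !ignoreErr {
		return KeepResult
	}
	if remainingHosts == 0 {
		return FailModule
	}
	if actionFailed {
		return ContinueWithoutHost
	}
	return KeepResult
}

func main() {
	fmt.Println(onHostFailure(true, 3, true))  // drop host and continue
	fmt.Println(onHostFailure(true, 0, true))  // fail the module
	fmt.Println(onHostFailure(false, 3, true)) // keep the original result
}
```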

View File

@@ -29,6 +29,7 @@ type Pipeline struct {
Name string
Modules []module.Module
Runtime connector.Runtime
SpecHosts int
PipelineCache *cache.Cache
ModuleCachePool sync.Pool
ModulePostHooks []module.PostHookInterface
@@ -37,6 +38,7 @@ type Pipeline struct {
func (p *Pipeline) Init() error {
fmt.Print(logo)
p.PipelineCache = cache.NewCache()
p.SpecHosts = len(p.Runtime.GetAllHosts())
if err := p.Runtime.GenerateWorkDir(); err != nil {
return err
}
@@ -48,7 +50,7 @@ func (p *Pipeline) Init() error {
func (p *Pipeline) Start() error {
if err := p.Init(); err != nil {
return errors.Wrapf(err, "Pipeline[%s] exec failed", p.Name)
return errors.Wrapf(err, "Pipeline[%s] execute failed", p.Name)
}
for i := range p.Modules {
m := p.Modules[i]
@@ -60,13 +62,16 @@ func (p *Pipeline) Start() error {
res := p.RunModule(m)
err := m.CallPostHook(res)
if res.IsFailed() {
return errors.Wrapf(res.CombineResult, "Pipeline[%s] exec failed", p.Name)
return errors.Wrapf(res.CombineResult, "Pipeline[%s] execute failed", p.Name)
}
if err != nil {
return errors.Wrapf(err, "Pipeline[%s] exec failed", p.Name)
return errors.Wrapf(err, "Pipeline[%s] execute failed", p.Name)
}
}
p.releasePipelineCache()
if p.SpecHosts != len(p.Runtime.GetAllHosts()) {
return errors.Errorf("Pipeline[%s] execute failed: there are some errors in your spec hosts", p.Name)
}
logger.Log.Infof("Pipeline[%s] execute successful", p.Name)
return nil
}

View File

@@ -59,6 +59,9 @@ func (t *RemoteTask) Execute() *ending.TaskResult {
wg := &sync.WaitGroup{}
for i := range t.Hosts {
if t.Runtime.HostIsDeprecated(t.Hosts[i]) {
continue
}
selfRuntime := t.Runtime.Copy()
selfHost := t.Hosts[i].Copy()

View File

@@ -51,6 +51,9 @@ func NewAddNodesPipeline(runtime *common.KubeRuntime) error {
if err := kubekeycontroller.PatchNodeImportStatus(runtime, kubekeycontroller.Failed); err != nil {
return err
}
if err := kubekeycontroller.UpdateStatus(runtime); err != nil {
return err
}
}
return err
}
@@ -94,6 +97,9 @@ func NewK3sAddNodesPipeline(runtime *common.KubeRuntime) error {
if err := kubekeycontroller.PatchNodeImportStatus(runtime, kubekeycontroller.Failed); err != nil {
return err
}
if err := kubekeycontroller.UpdateStatus(runtime); err != nil {
return err
}
}
return err
}