From ff1a5d12d1d69fe11882f4d3bbd5e7131b0577ec Mon Sep 17 00:00:00 2001 From: pixiake Date: Sat, 30 May 2020 01:07:37 +0800 Subject: [PATCH] refine etcd steps --- pkg/cluster/etcd/etcd.go | 204 +++++++++++++++++++++++----------- pkg/cluster/etcd/tmpl/etcd.go | 22 ++-- pkg/install/install.go | 4 +- 3 files changed, 153 insertions(+), 77 deletions(-) diff --git a/pkg/cluster/etcd/etcd.go b/pkg/cluster/etcd/etcd.go index 9b1736b8..c59d69a1 100644 --- a/pkg/cluster/etcd/etcd.go +++ b/pkg/cluster/etcd/etcd.go @@ -37,6 +37,9 @@ var ( etcdConfigDir = "/etc/ssl/etcd" etcdCertDir = "/etc/ssl/etcd/ssl" etcdBinDir = "/usr/local/bin" + accessAddresses = "" + peerAddresses = []string{} + etcdStatus = "" ) func GenerateEtcdCerts(mgr *manager.Manager) error { @@ -139,7 +142,7 @@ func syncEtcdCertsToMaster(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn } func GenerateEtcdService(mgr *manager.Manager) error { - mgr.Logger.Infoln("Starting etcd cluster") + mgr.Logger.Infoln("Creating etcd service") return mgr.RunTaskOnEtcdNodes(generateEtcdService, true) } @@ -155,16 +158,6 @@ func generateEtcdService(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ss return errors.Wrap(errors.WithStack(err1), "Failed to generate etcd service") } - etcdEnv, err := tmpl.GenerateEtcdEnv(mgr, node, mgr.Runner.Index) - if err != nil { - return err - } - etcdEnvBase64 := base64.StdEncoding.EncodeToString([]byte(etcdEnv)) - _, err2 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/etcd.env\"", etcdEnvBase64)) - if err2 != nil { - return errors.Wrap(errors.WithStack(err2), "Failed to generate etcd env") - } - etcdBin, err := tmpl.GenerateEtcdBinary(mgr, mgr.Runner.Index) if err != nil { return err @@ -181,73 +174,156 @@ func generateEtcdService(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ss return errors.Wrap(errors.WithStack(err4), "Failed to get etcdctl") } - _, err5 := mgr.Runner.RunCmd("sudo -E /bin/sh -c \"systemctl daemon-reload && systemctl restart etcd\"") - if err5 != nil { - return errors.Wrap(errors.WithStack(err5), "Failed to start etcd") + if err := restartEtcd(mgr); err != nil { + return err } addrList := []string{} for _, host := range mgr.EtcdNodes { addrList = append(addrList, fmt.Sprintf("https://%s:2379", host.InternalAddress)) } - checkHealthCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s cluster-health | grep -q 'cluster is healthy'\"", node.Name, node.Name, etcdBinDir, strings.Join(addrList, ",")) - if mgr.Runner.Index == 0 { - for i := 20; i > 0; i-- { - _, err := mgr.Runner.RunCmd(checkHealthCmd) - if err != nil { - fmt.Println("Waiting for etcd to start") - if i == 1 { - return errors.Wrap(errors.WithStack(err), "Failed to start etcd cluster") + + accessAddresses = strings.Join(addrList, ",") + + return nil +} + +func SetupEtcdCluster(mgr *manager.Manager) error { + mgr.Logger.Infoln("Starting etcd cluster") + + return mgr.RunTaskOnEtcdNodes(setupEtcdCluster, false) +} + +func setupEtcdCluster(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { + localPeerAddresses := []string{} + output, _ := mgr.Runner.RunCmd("sudo -E /bin/sh -c \"[ -f /etc/etcd.env ] && echo 'Configuration file already exists' || echo 'Configuration file will be created'\"") + if strings.TrimSpace(output) == "Configuration file already exists" { + if err := helthCheck(mgr, node); err != nil { + return err + } + etcdStatus = "existing" + for i := 0; i <= mgr.Runner.Index; i++ { + localPeerAddresses = append(localPeerAddresses, fmt.Sprintf("etcd%d=https://%s:2380", i+1, mgr.EtcdNodes[i].InternalAddress)) + } + if mgr.Runner.Index == len(mgr.EtcdNodes)-1 { + peerAddresses = localPeerAddresses + } + } else { + for i := 0; i <= mgr.Runner.Index; i++ { + localPeerAddresses = append(localPeerAddresses, fmt.Sprintf("etcd%d=https://%s:2380", i+1, mgr.EtcdNodes[i].InternalAddress)) + } + if mgr.Runner.Index == len(mgr.EtcdNodes)-1 { + peerAddresses = localPeerAddresses + } + if mgr.Runner.Index == 0 { + if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "new"); err != nil { + return err + } + etcdStatus = "new" + } else { + switch etcdStatus { + case "new": + if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "new"); err != nil { + return err } - } else { - break + case "existing": + if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "existing"); err != nil { + return err + } + joinMemberCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s member add %s %s\"", node.Name, node.Name, etcdBinDir, accessAddresses, fmt.Sprintf("etcd%d", mgr.Runner.Index+1), fmt.Sprintf("https://%s:2380", node.InternalAddress)) + _, err := mgr.Runner.RunCmd(joinMemberCmd) + if err != nil { + return errors.Wrap(errors.WithStack(err), "Failed to add etcd member") + } + if err := restartEtcd(mgr); err != nil { + return err + } + if err := helthCheck(mgr, node); err != nil { + return err + } + checkMemberCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --no-sync --endpoints=%s member list\"", node.Name, node.Name, etcdBinDir, accessAddresses) + memberList, err := mgr.Runner.RunCmd(checkMemberCmd) + if err != nil { + return errors.Wrap(errors.WithStack(err), "Failed to list etcd member") + } + if !strings.Contains(memberList, fmt.Sprintf("https://%s:2379", node.InternalAddress)) { + return errors.Wrap(errors.WithStack(err), "Failed to add etcd member") + } + default: + return errors.New("Failed to get etcd cluster status") } - time.Sleep(time.Second * 5) + } + + } + return nil +} + +func RefreshEtcdConfig(mgr *manager.Manager) error { + mgr.Logger.Infoln("Refreshing etcd configuration") + + return mgr.RunTaskOnEtcdNodes(refreshEtcdConfig, true) +} + +func refreshEtcdConfig(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { + + if etcdStatus == "new" { + if err := refreshConfig(mgr, node, mgr.Runner.Index, peerAddresses, "new"); err != nil { + return err + } + if err := restartEtcd(mgr); err != nil { + return err + } + if err := helthCheck(mgr, node); err != nil { + return err } } - //else { - // checkMemberCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --no-sync --endpoints=%s member list | grep -q %s", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","), fmt.Sprintf("https://%s:2379", node.InternalAddress)) - // _, err := mgr.Runner.RunCmd(checkMemberCmd) - // if err != nil { - // joinMemberCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s member add %s %s", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","), fmt.Sprintf("etcd%d", mgr.Runner.Index+1), fmt.Sprintf("https://%s:2380", node.InternalAddress)) - // _, err := mgr.Runner.RunCmd(joinMemberCmd) - // if err != nil { - // fmt.Println("Failed to add etcd member") - // } - // } - //} - for i := 20; i > 0; i-- { - _, err := mgr.Runner.RunCmd(checkHealthCmd) - if err != nil { - fmt.Println("Waiting for etcd to start") - if i == 1 { - return errors.Wrap(errors.WithStack(err), "Failed to start etcd") - } - } else { - break - } - time.Sleep(time.Second * 5) + if err := refreshConfig(mgr, node, mgr.Runner.Index, peerAddresses, "existing"); err != nil { + return err } - reloadEtcdEnvCmd := "sed -i '/ETCD_INITIAL_CLUSTER_STATE/s/\\:.*/\\: existing/g' /etc/etcd.env && systemctl daemon-reload && systemctl restart etcd" - _, err6 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", reloadEtcdEnvCmd)) - if err6 != nil { - return errors.Wrap(errors.WithStack(err6), "Failed to reload etcd env") - } - - for i := 20; i > 0; i-- { - _, err := mgr.Runner.RunCmd(checkHealthCmd) - if err != nil { - fmt.Println("Waiting for etcd to start") - if i == 1 { - return errors.Wrap(errors.WithStack(err), "Failed to start etcd") - } - } else { - break - } - time.Sleep(time.Second * 5) + if err := helthCheck(mgr, node); err != nil { + return err } return nil } + +func helthCheck(mgr *manager.Manager, node *kubekeyapi.HostCfg) error { + checkHealthCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s cluster-health | grep -q 'cluster is healthy'\"", node.Name, node.Name, etcdBinDir, accessAddresses) +helthCheckLoop: + for i := 20; i > 0; i-- { + _, err := mgr.Runner.RunCmd(checkHealthCmd) + if err != nil { + fmt.Println("Waiting for etcd to start") + if i == 1 { + return errors.Wrap(errors.WithStack(err), "Failed to start etcd cluster") + } + } else { + break helthCheckLoop + } + time.Sleep(time.Second * 5) + } + return nil +} + +func refreshConfig(mgr *manager.Manager, node *kubekeyapi.HostCfg, index int, endpoints []string, state string) error { + etcdEnv, err := tmpl.GenerateEtcdEnv(mgr, node, index, endpoints, state) + if err != nil { + return err + } + etcdEnvBase64 := base64.StdEncoding.EncodeToString([]byte(etcdEnv)) + _, err2 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/etcd.env\"", etcdEnvBase64)) + if err2 != nil { + return errors.Wrap(errors.WithStack(err2), "Failed to generate etcd env") + } + return nil +} + +func restartEtcd(mgr *manager.Manager) error { + _, err5 := mgr.Runner.RunCmd("sudo -E /bin/sh -c \"systemctl daemon-reload && systemctl restart etcd && systemctl enable etcd\"") + if err5 != nil { + return errors.Wrap(errors.WithStack(err5), "Failed to start etcd") + } + return nil +} diff --git a/pkg/cluster/etcd/tmpl/etcd.go b/pkg/cluster/etcd/tmpl/etcd.go index 5ebd57ea..faf3bb89 100644 --- a/pkg/cluster/etcd/tmpl/etcd.go +++ b/pkg/cluster/etcd/tmpl/etcd.go @@ -54,7 +54,7 @@ WantedBy=multi-user.target ETCD_DATA_DIR=/var/lib/etcd ETCD_ADVERTISE_CLIENT_URLS=https://{{ .Ip }}:2379 ETCD_INITIAL_ADVERTISE_PEER_URLS=https://{{ .Ip }}:2380 -ETCD_INITIAL_CLUSTER_STATE=new +ETCD_INITIAL_CLUSTER_STATE={{ .State }} ETCD_METRICS=basic ETCD_LISTEN_CLIENT_URLS=https://{{ .Ip }}:2379,https://127.0.0.1:2379 ETCD_ELECTION_TIMEOUT=5000 @@ -63,7 +63,7 @@ ETCD_INITIAL_CLUSTER_TOKEN=k8s_etcd ETCD_LISTEN_PEER_URLS=https://{{ .Ip }}:2380 ETCD_NAME={{ .Name }} ETCD_PROXY=off -ETCD_INITIAL_CLUSTER={{ .Endpoints }} +ETCD_INITIAL_CLUSTER={{ .peerAddresses }} ETCD_AUTO_COMPACTION_RETENTION=8 ETCD_SNAPSHOT_COUNT=10000 @@ -116,17 +116,15 @@ func GenerateEtcdService(mgr *manager.Manager, index int) (string, error) { }) } -func GenerateEtcdEnv(mgr *manager.Manager, node *kubekeyapi.HostCfg, index int) (string, error) { - endpoints := []string{} - for index, host := range mgr.EtcdNodes { - endpoints = append(endpoints, fmt.Sprintf("etcd%d=https://%s:2380", index+1, host.InternalAddress)) - } +func GenerateEtcdEnv(mgr *manager.Manager, node *kubekeyapi.HostCfg, index int, endpoints []string, state string) (string, error) { return util.Render(EtcdEnvTempl, util.Data{ - "Tag": kubekeyapi.DefaultEtcdVersion, - "Name": fmt.Sprintf("etcd%d", index+1), - "Ip": node.InternalAddress, - "Hostname": node.Name, - "Endpoints": strings.Join(endpoints, ","), + "Tag": kubekeyapi.DefaultEtcdVersion, + "Name": fmt.Sprintf("etcd%d", index+1), + "Ip": node.InternalAddress, + "Hostname": node.Name, + "State": state, + "peerAddresses": strings.Join(endpoints, ","), }) + } diff --git a/pkg/install/install.go b/pkg/install/install.go index dae471d4..4b843006 100644 --- a/pkg/install/install.go +++ b/pkg/install/install.go @@ -52,7 +52,9 @@ func ExecTasks(mgr *manager.Manager) error { {Task: images.PreDownloadImages, ErrMsg: "Failed to pre-download images"}, {Task: etcd.GenerateEtcdCerts, ErrMsg: "Failed to generate etcd certs"}, {Task: etcd.SyncEtcdCertsToMaster, ErrMsg: "Failed to sync etcd certs"}, - {Task: etcd.GenerateEtcdService, ErrMsg: "Failed to start etcd cluster"}, + {Task: etcd.GenerateEtcdService, ErrMsg: "Failed to create etcd service"}, + {Task: etcd.SetupEtcdCluster, ErrMsg: "Failed to start etcd cluster"}, + {Task: etcd.RefreshEtcdConfig, ErrMsg: "Failed to refresh etcd configuration"}, {Task: kubernetes.GetClusterStatus, ErrMsg: "Failed to get cluster status"}, {Task: kubernetes.SyncKubeBinaries, ErrMsg: "Failed to sync kube binaries"}, {Task: kubernetes.InitKubernetesCluster, ErrMsg: "Failed to init kubernetes cluster"},