refine etcd steps

pixiake 2020-05-30 01:07:37 +08:00
parent 34c87f8bc3
commit ff1a5d12d1
3 changed files with 153 additions and 77 deletions

View File

@@ -37,6 +37,9 @@ var (
etcdConfigDir = "/etc/ssl/etcd"
etcdCertDir = "/etc/ssl/etcd/ssl"
etcdBinDir = "/usr/local/bin"
accessAddresses = ""
peerAddresses = []string{}
etcdStatus = ""
)
func GenerateEtcdCerts(mgr *manager.Manager) error {
@@ -139,7 +142,7 @@ func syncEtcdCertsToMaster(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn
}
func GenerateEtcdService(mgr *manager.Manager) error {
mgr.Logger.Infoln("Starting etcd cluster")
mgr.Logger.Infoln("Creating etcd service")
return mgr.RunTaskOnEtcdNodes(generateEtcdService, true)
}
@@ -155,16 +158,6 @@ func generateEtcdService(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ss
return errors.Wrap(errors.WithStack(err1), "Failed to generate etcd service")
}
etcdEnv, err := tmpl.GenerateEtcdEnv(mgr, node, mgr.Runner.Index)
if err != nil {
return err
}
etcdEnvBase64 := base64.StdEncoding.EncodeToString([]byte(etcdEnv))
_, err2 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/etcd.env\"", etcdEnvBase64))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "Failed to generate etcd env")
}
etcdBin, err := tmpl.GenerateEtcdBinary(mgr, mgr.Runner.Index)
if err != nil {
return err
@@ -181,73 +174,156 @@ func generateEtcdService(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ss
return errors.Wrap(errors.WithStack(err4), "Failed to get etcdctl")
}
_, err5 := mgr.Runner.RunCmd("sudo -E /bin/sh -c \"systemctl daemon-reload && systemctl restart etcd\"")
if err5 != nil {
return errors.Wrap(errors.WithStack(err5), "Failed to start etcd")
if err := restartEtcd(mgr); err != nil {
return err
}
addrList := []string{}
for _, host := range mgr.EtcdNodes {
addrList = append(addrList, fmt.Sprintf("https://%s:2379", host.InternalAddress))
}
checkHealthCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s cluster-health | grep -q 'cluster is healthy'\"", node.Name, node.Name, etcdBinDir, strings.Join(addrList, ","))
if mgr.Runner.Index == 0 {
for i := 20; i > 0; i-- {
_, err := mgr.Runner.RunCmd(checkHealthCmd)
if err != nil {
fmt.Println("Waiting for etcd to start")
if i == 1 {
return errors.Wrap(errors.WithStack(err), "Failed to start etcd cluster")
accessAddresses = strings.Join(addrList, ",")
return nil
}
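
For reference, accessAddresses ends up as the comma-joined list of client URLs (https://<internal-address>:2379) that the later etcdctl calls pass to --endpoints. A minimal standalone sketch of that assembly, using made-up addresses:

package main

import (
	"fmt"
	"strings"
)

// buildAccessAddresses joins each etcd node's client URL into the
// comma-separated --endpoints value used by etcdctl.
func buildAccessAddresses(internalAddresses []string) string {
	addrList := make([]string, 0, len(internalAddresses))
	for _, ip := range internalAddresses {
		addrList = append(addrList, fmt.Sprintf("https://%s:2379", ip))
	}
	return strings.Join(addrList, ",")
}

func main() {
	// Hypothetical node addresses, for illustration only.
	fmt.Println(buildAccessAddresses([]string{"172.16.0.2", "172.16.0.3", "172.16.0.4"}))
	// Output: https://172.16.0.2:2379,https://172.16.0.3:2379,https://172.16.0.4:2379
}
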
func SetupEtcdCluster(mgr *manager.Manager) error {
mgr.Logger.Infoln("Starting etcd cluster")
return mgr.RunTaskOnEtcdNodes(setupEtcdCluster, false)
}
func setupEtcdCluster(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
localPeerAddresses := []string{}
output, _ := mgr.Runner.RunCmd("sudo -E /bin/sh -c \"[ -f /etc/etcd.env ] && echo 'Configuration file already exists' || echo 'Configuration file will be created'\"")
if strings.TrimSpace(output) == "Configuration file already exists" {
if err := helthCheck(mgr, node); err != nil {
return err
}
etcdStatus = "existing"
for i := 0; i <= mgr.Runner.Index; i++ {
localPeerAddresses = append(localPeerAddresses, fmt.Sprintf("etcd%d=https://%s:2380", i+1, mgr.EtcdNodes[i].InternalAddress))
}
if mgr.Runner.Index == len(mgr.EtcdNodes)-1 {
peerAddresses = localPeerAddresses
}
} else {
for i := 0; i <= mgr.Runner.Index; i++ {
localPeerAddresses = append(localPeerAddresses, fmt.Sprintf("etcd%d=https://%s:2380", i+1, mgr.EtcdNodes[i].InternalAddress))
}
if mgr.Runner.Index == len(mgr.EtcdNodes)-1 {
peerAddresses = localPeerAddresses
}
if mgr.Runner.Index == 0 {
if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "new"); err != nil {
return err
}
etcdStatus = "new"
} else {
switch etcdStatus {
case "new":
if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "new"); err != nil {
return err
}
} else {
break
case "existing":
if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "existing"); err != nil {
return err
}
joinMemberCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s member add %s %s\"", node.Name, node.Name, etcdBinDir, accessAddresses, fmt.Sprintf("etcd%d", mgr.Runner.Index+1), fmt.Sprintf("https://%s:2380", node.InternalAddress))
_, err := mgr.Runner.RunCmd(joinMemberCmd)
if err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to add etcd member")
}
if err := restartEtcd(mgr); err != nil {
return err
}
if err := helthCheck(mgr, node); err != nil {
return err
}
checkMemberCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --no-sync --endpoints=%s member list\"", node.Name, node.Name, etcdBinDir, accessAddresses)
memberList, err := mgr.Runner.RunCmd(checkMemberCmd)
if err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to list etcd member")
}
if !strings.Contains(memberList, fmt.Sprintf("https://%s:2379", node.InternalAddress)) {
return errors.Wrap(errors.WithStack(err), "Failed to add etcd member")
}
default:
return errors.New("Failed to get etcd cluster status")
}
time.Sleep(time.Second * 5)
}
}
return nil
}
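
The join order above hinges on how localPeerAddresses is built: for the node at position N, ETCD_INITIAL_CLUSTER lists every member up to and including itself, named etcd1..etcdN in node order. A small sketch of that loop in isolation (the node addresses are hypothetical):

package main

import "fmt"

// peerAddressesUpTo mirrors the loop in setupEtcdCluster: for the node at
// position index, the initial-cluster list contains every member up to and
// including itself, named etcd1, etcd2, ... in node order.
func peerAddressesUpTo(internalAddresses []string, index int) []string {
	peers := []string{}
	for i := 0; i <= index; i++ {
		peers = append(peers, fmt.Sprintf("etcd%d=https://%s:2380", i+1, internalAddresses[i]))
	}
	return peers
}

func main() {
	// Hypothetical addresses, for illustration only.
	nodes := []string{"172.16.0.2", "172.16.0.3", "172.16.0.4"}
	for idx := range nodes {
		fmt.Println(peerAddressesUpTo(nodes, idx))
	}
	// etcd1 starts alone; each later node's ETCD_INITIAL_CLUSTER includes
	// all earlier members plus itself, matching the member-add join order.
}
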
func RefreshEtcdConfig(mgr *manager.Manager) error {
mgr.Logger.Infoln("Refreshing etcd configuration")
return mgr.RunTaskOnEtcdNodes(refreshEtcdConfig, true)
}
func refreshEtcdConfig(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
if etcdStatus == "new" {
if err := refreshConfig(mgr, node, mgr.Runner.Index, peerAddresses, "new"); err != nil {
return err
}
if err := restartEtcd(mgr); err != nil {
return err
}
if err := helthCheck(mgr, node); err != nil {
return err
}
}
//else {
// checkMemberCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --no-sync --endpoints=%s member list | grep -q %s", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","), fmt.Sprintf("https://%s:2379", node.InternalAddress))
// _, err := mgr.Runner.RunCmd(checkMemberCmd)
// if err != nil {
// joinMemberCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s member add %s %s", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","), fmt.Sprintf("etcd%d", mgr.Runner.Index+1), fmt.Sprintf("https://%s:2380", node.InternalAddress))
// _, err := mgr.Runner.RunCmd(joinMemberCmd)
// if err != nil {
// fmt.Println("Failed to add etcd member")
// }
// }
//}
for i := 20; i > 0; i-- {
_, err := mgr.Runner.RunCmd(checkHealthCmd)
if err != nil {
fmt.Println("Waiting for etcd to start")
if i == 1 {
return errors.Wrap(errors.WithStack(err), "Failed to start etcd")
}
} else {
break
}
time.Sleep(time.Second * 5)
if err := refreshConfig(mgr, node, mgr.Runner.Index, peerAddresses, "existing"); err != nil {
return err
}
reloadEtcdEnvCmd := "sed -i '/ETCD_INITIAL_CLUSTER_STATE/s/\\:.*/\\: existing/g' /etc/etcd.env && systemctl daemon-reload && systemctl restart etcd"
_, err6 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", reloadEtcdEnvCmd))
if err6 != nil {
return errors.Wrap(errors.WithStack(err6), "Failed to reload etcd env")
}
for i := 20; i > 0; i-- {
_, err := mgr.Runner.RunCmd(checkHealthCmd)
if err != nil {
fmt.Println("Waiting for etcd to start")
if i == 1 {
return errors.Wrap(errors.WithStack(err), "Failed to start etcd")
}
} else {
break
}
time.Sleep(time.Second * 5)
if err := helthCheck(mgr, node); err != nil {
return err
}
return nil
}
func helthCheck(mgr *manager.Manager, node *kubekeyapi.HostCfg) error {
checkHealthCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s cluster-health | grep -q 'cluster is healthy'\"", node.Name, node.Name, etcdBinDir, accessAddresses)
helthCheckLoop:
for i := 20; i > 0; i-- {
_, err := mgr.Runner.RunCmd(checkHealthCmd)
if err != nil {
fmt.Println("Waiting for etcd to start")
if i == 1 {
return errors.Wrap(errors.WithStack(err), "Failed to start etcd cluster")
}
} else {
break helthCheckLoop
}
time.Sleep(time.Second * 5)
}
return nil
}
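
helthCheck polls the etcdctl v2 cluster-health command up to 20 times, five seconds apart, before giving up. A generic sketch of that retry shape, with a stand-in probe instead of the real SSH command:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor retries probe until it succeeds, or gives up after attempts tries
// spaced interval apart -- the same 20 x 5s pattern used by helthCheck.
func waitFor(attempts int, interval time.Duration, probe func() error) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = probe(); lastErr == nil {
			return nil
		}
		fmt.Println("Waiting for etcd to start")
		time.Sleep(interval)
	}
	return fmt.Errorf("etcd cluster did not become healthy: %w", lastErr)
}

func main() {
	// Hypothetical probe that succeeds on the third attempt.
	calls := 0
	err := waitFor(20, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("cluster is unhealthy")
		}
		return nil
	})
	fmt.Println("result:", err)
}
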
func refreshConfig(mgr *manager.Manager, node *kubekeyapi.HostCfg, index int, endpoints []string, state string) error {
etcdEnv, err := tmpl.GenerateEtcdEnv(mgr, node, index, endpoints, state)
if err != nil {
return err
}
etcdEnvBase64 := base64.StdEncoding.EncodeToString([]byte(etcdEnv))
_, err2 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/etcd.env\"", etcdEnvBase64))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "Failed to generate etcd env")
}
return nil
}
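
refreshConfig ships the rendered env file by base64-encoding it locally and decoding it inside the remote sudo sh -c, which avoids escaping the newlines and quotes the file contains. A minimal sketch of building such a command (the path and contents here are placeholders):

package main

import (
	"encoding/base64"
	"fmt"
)

// remoteWriteCmd returns a shell command that recreates content at path on
// the remote host. Base64 keeps newlines and quotes in the payload from
// interfering with the surrounding sh -c quoting.
func remoteWriteCmd(path, content string) string {
	encoded := base64.StdEncoding.EncodeToString([]byte(content))
	return fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > %s\"", encoded, path)
}

func main() {
	// Hypothetical etcd.env fragment, for illustration only.
	env := "ETCD_NAME=etcd1\nETCD_INITIAL_CLUSTER_STATE=new\n"
	fmt.Println(remoteWriteCmd("/etc/etcd.env", env))
}
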
func restartEtcd(mgr *manager.Manager) error {
_, err5 := mgr.Runner.RunCmd("sudo -E /bin/sh -c \"systemctl daemon-reload && systemctl restart etcd && systemctl enable etcd\"")
if err5 != nil {
return errors.Wrap(errors.WithStack(err5), "Failed to start etcd")
}
return nil
}
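
The join path in setupEtcdCluster drives etcdctl v2 over SSH: member add registers the new peer, and member list confirms it afterwards. A sketch that factors the command construction into helpers; the helper names and the sample values in main are illustrative, not taken from the commit:

package main

import "fmt"

// etcdctlV2Env reproduces the ETCDCTL_API=2 environment block used by the
// commands above: client cert, key and CA for the admin-<node> identity.
func etcdctlV2Env(nodeName string) string {
	return fmt.Sprintf("export ETCDCTL_API=2;"+
		"export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';"+
		"export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';"+
		"export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';", nodeName, nodeName)
}

// memberAddCmd builds the v2 "member add <name> <peer-url>" invocation used
// when a node joins an existing cluster.
func memberAddCmd(nodeName, binDir, endpoints, memberName, peerURL string) string {
	return fmt.Sprintf("sudo -E /bin/sh -c \"%s%s/etcdctl --endpoints=%s member add %s %s\"",
		etcdctlV2Env(nodeName), binDir, endpoints, memberName, peerURL)
}

func main() {
	// Hypothetical values, for illustration only.
	fmt.Println(memberAddCmd("node2", "/usr/local/bin",
		"https://172.16.0.2:2379", "etcd2", "https://172.16.0.3:2380"))
}
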

View File

@@ -54,7 +54,7 @@ WantedBy=multi-user.target
ETCD_DATA_DIR=/var/lib/etcd
ETCD_ADVERTISE_CLIENT_URLS=https://{{ .Ip }}:2379
ETCD_INITIAL_ADVERTISE_PEER_URLS=https://{{ .Ip }}:2380
ETCD_INITIAL_CLUSTER_STATE=new
ETCD_INITIAL_CLUSTER_STATE={{ .State }}
ETCD_METRICS=basic
ETCD_LISTEN_CLIENT_URLS=https://{{ .Ip }}:2379,https://127.0.0.1:2379
ETCD_ELECTION_TIMEOUT=5000
@@ -63,7 +63,7 @@ ETCD_INITIAL_CLUSTER_TOKEN=k8s_etcd
ETCD_LISTEN_PEER_URLS=https://{{ .Ip }}:2380
ETCD_NAME={{ .Name }}
ETCD_PROXY=off
ETCD_INITIAL_CLUSTER={{ .Endpoints }}
ETCD_INITIAL_CLUSTER={{ .peerAddresses }}
ETCD_AUTO_COMPACTION_RETENTION=8
ETCD_SNAPSHOT_COUNT=10000
@@ -116,17 +116,15 @@ func GenerateEtcdService(mgr *manager.Manager, index int) (string, error) {
})
}
func GenerateEtcdEnv(mgr *manager.Manager, node *kubekeyapi.HostCfg, index int) (string, error) {
endpoints := []string{}
for index, host := range mgr.EtcdNodes {
endpoints = append(endpoints, fmt.Sprintf("etcd%d=https://%s:2380", index+1, host.InternalAddress))
}
func GenerateEtcdEnv(mgr *manager.Manager, node *kubekeyapi.HostCfg, index int, endpoints []string, state string) (string, error) {
return util.Render(EtcdEnvTempl, util.Data{
"Tag": kubekeyapi.DefaultEtcdVersion,
"Name": fmt.Sprintf("etcd%d", index+1),
"Ip": node.InternalAddress,
"Hostname": node.Name,
"Endpoints": strings.Join(endpoints, ","),
"Tag": kubekeyapi.DefaultEtcdVersion,
"Name": fmt.Sprintf("etcd%d", index+1),
"Ip": node.InternalAddress,
"Hostname": node.Name,
"State": state,
"peerAddresses": strings.Join(endpoints, ","),
})
}
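
GenerateEtcdEnv now takes the peer list and cluster state from the caller and feeds them to the template through the State and peerAddresses keys. Assuming util.Render is a thin wrapper around text/template, a stripped-down equivalent of that rendering looks like this:

package main

import (
	"os"
	"text/template"
)

// A cut-down version of the env template: only the two fields this commit
// changes, ETCD_INITIAL_CLUSTER_STATE and ETCD_INITIAL_CLUSTER.
const envTempl = `ETCD_INITIAL_CLUSTER_STATE={{ .State }}
ETCD_INITIAL_CLUSTER={{ .peerAddresses }}
`

func main() {
	tmpl := template.Must(template.New("etcdEnv").Parse(envTempl))
	// Hypothetical data mirroring the util.Data map passed in GenerateEtcdEnv.
	data := map[string]interface{}{
		"State":         "existing",
		"peerAddresses": "etcd1=https://172.16.0.2:2380,etcd2=https://172.16.0.3:2380",
	}
	if err := tmpl.Execute(os.Stdout, data); err != nil {
		panic(err)
	}
}
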

View File

@@ -52,7 +52,9 @@ func ExecTasks(mgr *manager.Manager) error {
{Task: images.PreDownloadImages, ErrMsg: "Failed to pre-download images"},
{Task: etcd.GenerateEtcdCerts, ErrMsg: "Failed to generate etcd certs"},
{Task: etcd.SyncEtcdCertsToMaster, ErrMsg: "Failed to sync etcd certs"},
{Task: etcd.GenerateEtcdService, ErrMsg: "Failed to start etcd cluster"},
{Task: etcd.GenerateEtcdService, ErrMsg: "Failed to create etcd service"},
{Task: etcd.SetupEtcdCluster, ErrMsg: "Failed to start etcd cluster"},
{Task: etcd.RefreshEtcdConfig, ErrMsg: "Failed to refresh etcd configuration"},
{Task: kubernetes.GetClusterStatus, ErrMsg: "Failed to get cluster status"},
{Task: kubernetes.SyncKubeBinaries, ErrMsg: "Failed to sync kube binaries"},
{Task: kubernetes.InitKubernetesCluster, ErrMsg: "Failed to init kubernetes cluster"},
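
The pipeline now brings etcd up in three ordered tasks: create the service, set up or join the cluster, then refresh the configuration. A simplified, hypothetical sketch of running such an ordered task list and stopping at the first failure (the step type here is illustrative, not KubeKey's actual Task struct):

package main

import (
	"errors"
	"fmt"
)

// step is a simplified stand-in for the {Task, ErrMsg} entries above.
type step struct {
	run    func() error
	errMsg string
}

// execSteps runs each step in order and stops at the first failure,
// wrapping it with the step's error message.
func execSteps(steps []step) error {
	for _, s := range steps {
		if err := s.run(); err != nil {
			return fmt.Errorf("%s: %w", s.errMsg, err)
		}
	}
	return nil
}

func main() {
	steps := []step{
		{func() error { return nil }, "Failed to create etcd service"},
		{func() error { return errors.New("boom") }, "Failed to start etcd cluster"},
		{func() error { return nil }, "Failed to refresh etcd configuration"},
	}
	fmt.Println(execSteps(steps))
}
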