From 332a0eef8d72151e456c8a2374c4a5b9e74d09e2 Mon Sep 17 00:00:00 2001
From: 24sama
Date: Wed, 22 Sep 2021 15:25:51 +0800
Subject: [PATCH] Experiment: add list_certs pipeline and renew_certs pipeline

Signed-off-by: 24sama
---
 cmd/list_cert.go                    |  13 +-
 cmd/renew_cert.go                   |  13 +-
 pkg/cluster/install/install_test.go |   2 +-
 pkg/pipelines/certs/module.go       | 139 +++++++++++++++++-
 pkg/pipelines/certs/tasks.go        | 214 ++++++++++++++++++++++++++--
 pkg/pipelines/check_certs.go        |  10 +-
 pkg/pipelines/common/common.go      |   6 +-
 pkg/pipelines/common/loader.go      |   9 +-
 pkg/pipelines/renew_certs.go        |  45 ++++++
 9 files changed, 407 insertions(+), 44 deletions(-)
 create mode 100644 pkg/pipelines/renew_certs.go

diff --git a/cmd/list_cert.go b/cmd/list_cert.go
index 9cc45142..552c0a19 100644
--- a/cmd/list_cert.go
+++ b/cmd/list_cert.go
@@ -1,17 +1,20 @@
 package cmd
 
 import (
-	"github.com/kubesphere/kubekey/pkg/cluster/certs"
-	"github.com/kubesphere/kubekey/pkg/util"
+	"github.com/kubesphere/kubekey/pkg/pipelines"
+	"github.com/kubesphere/kubekey/pkg/pipelines/common"
 	"github.com/spf13/cobra"
 )
 
 var listClusterCertsCmd = &cobra.Command{
 	Use:   "check-expiration",
 	Short: "Check certificates expiration for a Kubernetes cluster",
-	Run: func(cmd *cobra.Command, args []string) {
-		logger := util.InitLogger(opt.Verbose)
-		_ = certs.ListCluster(opt.ClusterCfgFile, logger, opt.Verbose)
+	RunE: func(cmd *cobra.Command, args []string) error {
+		arg := common.Argument{
+			FilePath: opt.ClusterCfgFile,
+			Debug:    opt.Verbose,
+		}
+		return pipelines.CheckCerts(arg)
 	},
 }
diff --git a/cmd/renew_cert.go b/cmd/renew_cert.go
index 28b16744..f6bbe605 100644
--- a/cmd/renew_cert.go
+++ b/cmd/renew_cert.go
@@ -1,17 +1,20 @@
 package cmd
 
 import (
-	"github.com/kubesphere/kubekey/pkg/cluster/certs"
-	"github.com/kubesphere/kubekey/pkg/util"
+	"github.com/kubesphere/kubekey/pkg/pipelines"
+	"github.com/kubesphere/kubekey/pkg/pipelines/common"
 	"github.com/spf13/cobra"
 )
 
 var renewClusterCertsCmd = &cobra.Command{
 	Use:   "renew",
 	Short: "renew a cluster certs",
-	Run: func(cmd *cobra.Command, args []string) {
-		logger := util.InitLogger(opt.Verbose)
-		_ = certs.RenewClusterCerts(opt.ClusterCfgFile, logger, opt.Verbose)
+	RunE: func(cmd *cobra.Command, args []string) error {
+		arg := common.Argument{
+			FilePath: opt.ClusterCfgFile,
+			Debug:    opt.Verbose,
+		}
+		return pipelines.RenewCerts(arg)
 	},
 }
diff --git a/pkg/cluster/install/install_test.go b/pkg/cluster/install/install_test.go
index 866c53f6..749dcb3d 100644
--- a/pkg/cluster/install/install_test.go
+++ b/pkg/cluster/install/install_test.go
@@ -94,7 +94,7 @@ func assertEqual(t *testing.T, actual interface{}, expected interface{}) {
 
 func Test_install(t *testing.T) {
 	cfg, objName := GenTestClusterCfg("Test_install")
-	executor := executor.NewExecutor(&cfg.Spec, objName, logger, "", true, true, true, false, false, nil)
+	executor := executor.NewExecutor(&cfg.Spec, objName, logger, "", true, true, true, false, false, nil, "")
 	connections := make(map[string]*MockConnection)
 	executor.Connector = &MockConnector{connections: connections}
diff --git a/pkg/pipelines/certs/module.go b/pkg/pipelines/certs/module.go
index 4be50110..df336db0 100644
--- a/pkg/pipelines/certs/module.go
+++ b/pkg/pipelines/certs/module.go
@@ -1,7 +1,14 @@
 package certs
 
 import (
+	"fmt"
+
 	"github.com/kubesphere/kubekey/pkg/core/modules"
+	"github.com/kubesphere/kubekey/pkg/core/prepare"
 	"github.com/kubesphere/kubekey/pkg/pipelines/common"
+	"github.com/kubesphere/kubekey/pkg/pipelines/kubernetes"
+	"github.com/pkg/errors"
+	"os"
+	"text/tabwriter"
 )
 
@@ -11,11 +18,129 @@ type CheckCertsModule struct {
 	common.KubeModule
 }
 
 func (c *CheckCertsModule) Init() {
 	c.Name = "CheckCertsModule"
 
-	//check := &modules.Task{
-	//	Name:     "CheckClusterCerts",
-	//	Desc:     "check cluster certs",
-	//	Hosts:    c.Runtime.GetHostsByRole(common.Master),
-	//	Action:   ,
-	//	Parallel: true,
-	//}
+	check := &modules.Task{
+		Name:     "CheckClusterCerts",
+		Desc:     "check cluster certs",
+		Hosts:    c.Runtime.GetHostsByRole(common.Master),
+		Action:   new(ListClusterCerts),
+		Parallel: true,
+	}
+
+	c.Tasks = []*modules.Task{
+		check,
+	}
+}
+
+type PrintClusterCertsModule struct {
+	common.KubeModule
+}
+
+func (p *PrintClusterCertsModule) Init() {
+	p.Name = "PrintClusterCertsModule"
+	p.Desc = "display cluster certs in table form"
+}
+
+func (p *PrintClusterCertsModule) Run() error {
+	certificates := make([]*Certificate, 0)
+	caCertificates := make([]*CaCertificate, 0)
+
+	for _, host := range p.Runtime.GetHostsByRole(common.Master) {
+		certs, ok := host.GetCache().Get(common.Certificate)
+		if !ok {
+			return errors.New("failed to get certificate from host cache")
+		}
+		ca, ok := host.GetCache().Get(common.CaCertificate)
+		if !ok {
+			return errors.New("failed to get ca certificate from host cache")
+		}
+		hostCertificates := certs.([]*Certificate)
+		hostCaCertificates := ca.([]*CaCertificate)
+		certificates = append(certificates, hostCertificates...)
+		caCertificates = append(caCertificates, hostCaCertificates...)
+	}
+
+	w := tabwriter.NewWriter(os.Stdout, 10, 4, 3, ' ', 0)
+	_, _ = fmt.Fprintln(w, "CERTIFICATE\tEXPIRES\tRESIDUAL TIME\tCERTIFICATE AUTHORITY\tNODE")
+	for _, cert := range certificates {
+		s := fmt.Sprintf("%s\t%s\t%s\t%s\t%-8v",
+			cert.Name,
+			cert.Expires,
+			cert.Residual,
+			cert.AuthorityName,
+			cert.NodeName,
+		)
+
+		_, _ = fmt.Fprintln(w, s)
+		continue
+	}
+	_, _ = fmt.Fprintln(w)
+	_, _ = fmt.Fprintln(w, "CERTIFICATE AUTHORITY\tEXPIRES\tRESIDUAL TIME\tNODE")
+	for _, caCert := range caCertificates {
+		c := fmt.Sprintf("%s\t%s\t%s\t%-8v",
+			caCert.AuthorityName,
+			caCert.Expires,
+			caCert.Residual,
+			caCert.NodeName,
+		)
+
+		_, _ = fmt.Fprintln(w, c)
+		continue
+	}
+
+	_ = w.Flush()
+	return nil
+}
+
+type RenewCertsModule struct {
+	common.KubeModule
+}
+
+func (r *RenewCertsModule) Init() {
+	r.Name = "RenewCertsModule"
+
+	renew := &modules.Task{
+		Name:     "RenewCerts",
+		Desc:     "Renew control-plane certs",
+		Hosts:    r.Runtime.GetHostsByRole(common.Master),
+		Action:   new(RenewCerts),
+		Parallel: false,
+		Retry:    5,
+	}
+
+	copyKubeConfig := &modules.Task{
+		Name:     "CopyKubeConfig",
+		Desc:     "Copy admin.conf to ~/.kube/config",
+		Hosts:    r.Runtime.GetHostsByRole(common.Master),
+		Action:   new(kubernetes.CopyKubeConfigForControlPlane),
+		Parallel: true,
+		Retry:    2,
+	}
+
+	fetchKubeConfig := &modules.Task{
+		Name:     "FetchKubeConfig",
+		Desc:     "Fetch kube config file from control-plane",
+		Hosts:    r.Runtime.GetHostsByRole(common.Master),
+		Prepare:  new(common.OnlyFirstMaster),
+		Action:   new(FetchKubeConfig),
+		Parallel: true,
+	}
+
+	syncKubeConfig := &modules.Task{
+		Name:  "SyncKubeConfig",
+		Desc:  "synchronize kube config to worker",
+		Hosts: r.Runtime.GetHostsByRole(common.Worker),
+		Prepare: &prepare.PrepareCollection{
+			new(common.OnlyWorker),
+		},
+		Action:   new(SyncKubeConfigToWorker),
+		Parallel: true,
+		Retry:    3,
+	}
+
+	r.Tasks = []*modules.Task{
+		renew,
+		copyKubeConfig,
+		fetchKubeConfig,
+		syncKubeConfig,
+	}
+}
diff --git a/pkg/pipelines/certs/tasks.go b/pkg/pipelines/certs/tasks.go
index 40343fed..67ad2696 100644
--- a/pkg/pipelines/certs/tasks.go
+++ b/pkg/pipelines/certs/tasks.go
@@ -1,8 +1,18 @@
 package certs
 
 import (
+	"encoding/base64"
+	"fmt"
 	"github.com/kubesphere/kubekey/pkg/core/connector"
 	"github.com/kubesphere/kubekey/pkg/pipelines/common"
+	"github.com/pkg/errors"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
+	clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
+	certutil "k8s.io/client-go/util/cert"
+	"path/filepath"
+	"strings"
+	"time"
 )
 
 type Certificate struct {
@@ -35,8 +45,6 @@ var (
 		"controller-manager.conf",
 		"scheduler.conf",
 	}
-	certificates   []*Certificate
-	caCertificates []*CaCertificate
 )
 
 type ListClusterCerts struct {
@@ -44,17 +52,195 @@ type ListClusterCerts struct {
 }
 
 func (l *ListClusterCerts) Execute(runtime connector.Runtime) error {
-	//for _, certFileName := range certificateList {
-	//	certPath := filepath.Join(common.KubeCertDir, certFileName)
-	//	certContext, err := runtime.GetRunner().SudoCmd(fmt.Sprintf("cat %s", certPath), false)
-	//	if err != nil {
-	//		return errors.Wrap(err, "get cluster certs failed")
-	//	}
-	//	if cert, err := getCertInfo(certContext, certFileName, node.Name); err != nil {
-	//		return err
-	//	} else {
-	//		certificates = append(certificates, cert)
-	//	}
-	//}
+	host := runtime.RemoteHost()
+
+	certificates := make([]*Certificate, 0)
+	caCertificates := make([]*CaCertificate, 0)
+
+	for _, certFileName := range certificateList {
+		certPath := filepath.Join(common.KubeCertDir, certFileName)
+		certContext, err := runtime.GetRunner().SudoCmd(fmt.Sprintf("cat %s", certPath), false)
+		if err != nil {
+			return errors.Wrap(err, "get cluster certs failed")
+		}
+		if cert, err := getCertInfo(certContext, certFileName, host.GetName()); err != nil {
+			return err
+		} else {
+			certificates = append(certificates, cert)
+		}
+	}
+
+	for _, kubeConfigFileName := range kubeConfigList {
+		kubeConfigPath := filepath.Join(common.KubeConfigDir, kubeConfigFileName)
+		newConfig := clientcmdapi.NewConfig()
+		kubeconfigBytes, err := runtime.GetRunner().SudoCmd(fmt.Sprintf("cat %s", kubeConfigPath), false)
+		decoded, _, err := clientcmdlatest.Codec.Decode([]byte(kubeconfigBytes), &schema.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"}, newConfig)
+		if err != nil {
+			return err
+		}
+		newConfig = decoded.(*clientcmdapi.Config)
+		for _, a := range newConfig.AuthInfos {
+			certContextBase64 := a.ClientCertificateData
+			tmp := base64.StdEncoding.EncodeToString(certContextBase64)
+			certContext, err := base64.StdEncoding.DecodeString(tmp)
+			if err != nil {
+				return err
+			}
+			if cert, err := getCertInfo(string(certContext), kubeConfigFileName, host.GetName()); err != nil {
+				return err
+			} else {
+				certificates = append(certificates, cert)
+			}
+		}
+	}
+
+	for _, caCertFileName := range caCertificateList {
+		certPath := filepath.Join(common.KubeCertDir, caCertFileName)
+		caCertContext, err := runtime.GetRunner().SudoCmd(fmt.Sprintf("cat %s", certPath), false)
+		if err != nil {
+			return errors.Wrap(err, "Failed to get cluster certs")
+		}
+		if cert, err := getCaCertInfo(caCertContext, caCertFileName, host.GetName()); err != nil {
+			return err
+		} else {
+			caCertificates = append(caCertificates, cert)
+		}
+	}
+
+	host.GetCache().Set(common.Certificate, certificates)
+	host.GetCache().Set(common.CaCertificate, caCertificates)
+	return nil
+}
+
+func getCertInfo(certContext, certFileName, nodeName string) (*Certificate, error) {
+	certs, err1 := certutil.ParseCertsPEM([]byte(certContext))
+	if err1 != nil {
+		return nil, errors.Wrap(err1, "Failed to parse cluster certs")
+	}
+	var authorityName string
+	switch certFileName {
+	case "apiserver.crt":
+		authorityName = "ca"
+	case "apiserver-kubelet-client.crt":
+		authorityName = "ca"
+	case "front-proxy-client.crt":
+		authorityName = "front-proxy-ca"
+	default:
+		authorityName = ""
+	}
+	cert := Certificate{
+		Name:          certFileName,
+		Expires:       certs[0].NotAfter.Format("Jan 02, 2006 15:04 MST"),
+		Residual:      ResidualTime(certs[0].NotAfter),
+		AuthorityName: authorityName,
+		NodeName:      nodeName,
+	}
+	return &cert, nil
+}
+
+func getCaCertInfo(certContext, certFileName, nodeName string) (*CaCertificate, error) {
+	certs, err := certutil.ParseCertsPEM([]byte(certContext))
+	if err != nil {
+		return nil, errors.Wrap(err, "Failed to parse cluster certs")
+	}
+	cert1 := CaCertificate{
+		AuthorityName: certFileName,
+		Expires:       certs[0].NotAfter.Format("Jan 02, 2006 15:04 MST"),
+		Residual:      ResidualTime(certs[0].NotAfter),
+		NodeName:      nodeName,
+	}
+	return &cert1, nil
+}
+
+func ResidualTime(t time.Time) string {
+	d := time.Until(t)
+	if seconds := int(d.Seconds()); seconds < -1 {
+		return ""
+	} else if seconds < 0 {
+		return "0s"
+	} else if seconds < 60 {
+		return fmt.Sprintf("%ds", seconds)
+	} else if minutes := int(d.Minutes()); minutes < 60 {
+		return fmt.Sprintf("%dm", minutes)
+	} else if hours := int(d.Hours()); hours < 24 {
+		return fmt.Sprintf("%dh", hours)
+	} else if hours < 24*365 {
+		return fmt.Sprintf("%dd", hours/24)
+	}
+	return fmt.Sprintf("%dy", int(d.Hours()/24/365))
+}
+
+type RenewCerts struct {
+	common.KubeAction
+}
+
+func (r *RenewCerts) Execute(runtime connector.Runtime) error {
+	var kubeadmList = []string{
+		"cd /etc/kubernetes",
+		"/usr/local/bin/kubeadm alpha certs renew apiserver",
+		"/usr/local/bin/kubeadm alpha certs renew apiserver-kubelet-client",
+		"/usr/local/bin/kubeadm alpha certs renew front-proxy-client",
+		"/usr/local/bin/kubeadm alpha certs renew admin.conf",
+		"/usr/local/bin/kubeadm alpha certs renew controller-manager.conf",
+		"/usr/local/bin/kubeadm alpha certs renew scheduler.conf",
+	}
+
+	var restartList = []string{
+		"docker ps -af name=k8s_kube-apiserver* -q | xargs --no-run-if-empty docker rm -f",
+		"docker ps -af name=k8s_kube-scheduler* -q | xargs --no-run-if-empty docker rm -f",
+		"docker ps -af name=k8s_kube-controller-manager* -q | xargs --no-run-if-empty docker rm -f",
+		"systemctl restart kubelet",
+	}
+
+	_, err := runtime.GetRunner().SudoCmd(strings.Join(kubeadmList, " && "), false)
+	if err != nil {
+		return errors.Wrap(err, "kubeadm alpha certs renew failed")
+	}
+	_, err = runtime.GetRunner().SudoCmd(strings.Join(restartList, " && "), false)
+	if err != nil {
+		return errors.Wrap(err, "kube-apiserver, kube-scheduler, kube-controller-manager or kubelet restart failed")
+	}
+	return nil
+}
+
+type FetchKubeConfig struct {
+	common.KubeAction
+}
+
+func (f *FetchKubeConfig) Execute(runtime connector.Runtime) error {
+	tmpConfigFile := filepath.Join(common.TmpDir, "admin.conf")
+	if _, err := runtime.GetRunner().SudoCmd(fmt.Sprintf("cp /etc/kubernetes/admin.conf %s", tmpConfigFile), false); err != nil {
+		return errors.Wrap(errors.WithStack(err), "copy kube config to /tmp/ failed")
+	}
+
+	host := runtime.RemoteHost()
+	if err := runtime.GetRunner().Fetch(filepath.Join(runtime.GetWorkDir(), host.GetName(), "admin.conf"), tmpConfigFile); err != nil {
+		return errors.Wrap(errors.WithStack(err), "fetch kube config file failed")
+	}
+	return nil
+}
+
+type SyncKubeConfigToWorker struct {
+	common.KubeAction
+}
+
+func (s *SyncKubeConfigToWorker) Execute(runtime connector.Runtime) error {
+	createConfigDirCmd := "mkdir -p /root/.kube && mkdir -p $HOME/.kube"
+	if _, err := runtime.GetRunner().SudoCmd(createConfigDirCmd, false); err != nil {
+		return errors.Wrap(errors.WithStack(err), "create .kube dir failed")
+	}
+
+	firstMaster := runtime.GetHostsByRole(common.Master)[0]
+	if err := runtime.GetRunner().SudoScp(filepath.Join(runtime.GetWorkDir(), firstMaster.GetName(), "admin.conf"), "/root/.kube/config"); err != nil {
+		return errors.Wrap(errors.WithStack(err), "sudo scp config file to worker /root/.kube/config failed")
+	}
+
+	if err := runtime.GetRunner().SudoScp(filepath.Join(runtime.GetWorkDir(), firstMaster.GetName(), "admin.conf"), "$HOME/.kube/config"); err != nil {
+		return errors.Wrap(errors.WithStack(err), "sudo scp config file to worker $HOME/.kube/config failed")
+	}
+
+	chownKubeConfig := "chown $(id -u):$(id -g) -R $HOME/.kube"
+	if _, err := runtime.GetRunner().SudoCmd(chownKubeConfig, false); err != nil {
+		return errors.Wrap(errors.WithStack(err), "chown .kube dir failed")
+	}
 	return nil
 }
diff --git a/pkg/pipelines/check_certs.go b/pkg/pipelines/check_certs.go
index 6e8927f1..820e62d3 100644
--- a/pkg/pipelines/check_certs.go
+++ b/pkg/pipelines/check_certs.go
@@ -3,18 +3,14 @@ package pipelines
 import (
 	"github.com/kubesphere/kubekey/pkg/core/modules"
 	"github.com/kubesphere/kubekey/pkg/core/pipeline"
-	"github.com/kubesphere/kubekey/pkg/pipelines/bootstrap/config"
-	"github.com/kubesphere/kubekey/pkg/pipelines/bootstrap/confirm"
+	"github.com/kubesphere/kubekey/pkg/pipelines/certs"
 	"github.com/kubesphere/kubekey/pkg/pipelines/common"
-	"github.com/kubesphere/kubekey/pkg/pipelines/kubernetes"
 )
 
 func CheckCertsPipeline(runtime *common.KubeRuntime) error {
 	m := []modules.Module{
-		&confirm.DeleteNodeConfirmModule{},
-		&config.ModifyConfigModule{},
-		&kubernetes.CompareConfigAndClusterInfoModule{},
-		&kubernetes.DeleteKubeNodeModule{},
+		&certs.CheckCertsModule{},
+		&certs.PrintClusterCertsModule{},
 	}
 
 	p := pipeline.Pipeline{
diff --git a/pkg/pipelines/common/common.go b/pkg/pipelines/common/common.go
index 4be328b7..3ee7ba89 100644
--- a/pkg/pipelines/common/common.go
+++ b/pkg/pipelines/common/common.go
@@ -50,6 +50,10 @@
 	ETCDExist = "etcdExist"
 
 	// KubernetesModule
-	ClusterStatus = "ClusterStatus"
+	ClusterStatus = "clusterStatus"
 	ClusterExist  = "clusterExist"
+
+	// CertsModule
+	Certificate   = "certificate"
+	CaCertificate = "caCertificate"
 )
diff --git a/pkg/pipelines/common/loader.go b/pkg/pipelines/common/loader.go
index f63f0ff5..59a54a6e 100644
--- a/pkg/pipelines/common/loader.go
+++ b/pkg/pipelines/common/loader.go
@@ -139,9 +139,10 @@ func (f FileLoader) Load() (*kubekeyapiv1alpha1.Cluster, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "Failed to look up current directory")
 	}
-	if len(f.KubernetesVersion) != 0 {
-		_ = exec.Command("/bin/sh", "-c", fmt.Sprintf("sed -i \"/version/s/\\:.*/\\: %s/g\" %s", f.KubernetesVersion, fp)).Run()
-	}
+	// Skipped for now: it will lead to a nil pointer error.
+	//if len(f.KubernetesVersion) != 0 {
+	//	_ = exec.Command("/bin/sh", "-c", fmt.Sprintf("sed -i \"/version/s/\\:.*/\\: %s/g\" %s", f.KubernetesVersion, fp)).Run()
+	//}
 	file, err := os.Open(fp)
 	if err != nil {
 		return nil, errors.Wrap(err, "Unable to open the given cluster configuration file")
@@ -190,7 +191,7 @@ func (f FileLoader) Load() (*kubekeyapiv1alpha1.Cluster, error) {
 			clusterCfg.Spec.KubeSphere.Configurations = "---\n" + string(content)
 			clusterCfg.Spec.KubeSphere.Version = "v2.1.1"
 		default:
-			return nil, errors.Wrap(err, fmt.Sprintf("Unsupported version: %s", labels["version"]))
+			return nil, errors.New(fmt.Sprintf("Unsupported version: %s", labels["version"]))
 		}
 	}
 }
diff --git a/pkg/pipelines/renew_certs.go b/pkg/pipelines/renew_certs.go
new file mode 100644
index 00000000..64870d64
--- /dev/null
+++ b/pkg/pipelines/renew_certs.go
@@ -0,0 +1,45 @@
+package pipelines
+
+import (
+	"github.com/kubesphere/kubekey/pkg/core/modules"
+	"github.com/kubesphere/kubekey/pkg/core/pipeline"
+	"github.com/kubesphere/kubekey/pkg/pipelines/certs"
+	"github.com/kubesphere/kubekey/pkg/pipelines/common"
+)
+
+func RenewCertsPipeline(runtime *common.KubeRuntime) error {
+	m := []modules.Module{
+		&certs.RenewCertsModule{},
+		&certs.CheckCertsModule{},
+		&certs.PrintClusterCertsModule{},
+	}
+
+	p := pipeline.Pipeline{
+		Name:    "RenewCertsPipeline",
+		Modules: m,
+		Runtime: runtime,
+	}
+	if err := p.Start(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func RenewCerts(args common.Argument) error {
+	var loaderType string
+	if args.FilePath != "" {
+		loaderType = common.File
+	} else {
+		loaderType = common.AllInOne
+	}
+
+	runtime, err := common.NewKubeRuntime(loaderType, args)
+	if err != nil {
+		return err
+	}
+
+	if err := RenewCertsPipeline(runtime); err != nil {
+		return err
+	}
+	return nil
+}
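
For reference (not part of the patch): a minimal sketch of how the two exported entry points added above could be driven outside the cobra commands. It assumes only pipelines.CheckCerts, pipelines.RenewCerts, and the common.Argument fields shown in the diff; the config file path is a placeholder.

package main

import (
	"log"

	"github.com/kubesphere/kubekey/pkg/pipelines"
	"github.com/kubesphere/kubekey/pkg/pipelines/common"
)

func main() {
	// Build the same argument struct the cobra commands assemble from CLI flags.
	arg := common.Argument{
		FilePath: "config-sample.yaml", // hypothetical cluster configuration file
		Debug:    true,
	}

	// List certificate expiration, then renew; each call runs the corresponding pipeline.
	if err := pipelines.CheckCerts(arg); err != nil {
		log.Fatal(err)
	}
	if err := pipelines.RenewCerts(arg); err != nil {
		log.Fatal(err)
	}
}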