From 9b2f4172417801222e3d168524d8f33929ffb1ee Mon Sep 17 00:00:00 2001 From: pixiake Date: Sat, 18 Apr 2020 00:00:46 +0800 Subject: [PATCH] add install cluster --- apis/v1alpha1/cluster.go | 33 +++ cluster/container-engine/docker/docker.go | 2 +- cluster/etcd/etcd.go | 234 ++++++++++++++++++++++ cluster/etcd/tmpl/certs.go | 198 ++++++++++++++++++ cluster/etcd/tmpl/etcd.go | 116 +++++++++++ cluster/kubernetes/master.go | 30 ++- cluster/kubernetes/nodes.go | 13 +- cluster/kubernetes/tmpl/kubeadm.go | 113 +++++++++++ cluster/preinstall/initOS.go | 6 +- cluster/preinstall/tmpl/InitOS.go | 2 +- install/executor.go | 50 +---- install/install.go | 5 + util/manager/manager.go | 38 +--- util/manager/tasks.go | 71 ++++--- util/runner/runner.go | 38 ++-- util/runner/tee.go | 119 ----------- util/ssh/ssh.go | 56 +++--- util/util.go | 130 ++++++++++++ 18 files changed, 968 insertions(+), 286 deletions(-) create mode 100644 cluster/etcd/tmpl/certs.go create mode 100644 cluster/etcd/tmpl/etcd.go delete mode 100644 util/runner/tee.go diff --git a/apis/v1alpha1/cluster.go b/apis/v1alpha1/cluster.go index e9dcf1e9..e90fa08b 100644 --- a/apis/v1alpha1/cluster.go +++ b/apis/v1alpha1/cluster.go @@ -2,6 +2,7 @@ package v1alpha1 import ( "fmt" + "github.com/pixiake/kubekey/util" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -81,6 +82,13 @@ func addDefaultingFuncs(scheme *runtime.Scheme) error { return RegisterDefaults(scheme) } +type ExternalEtcd struct { + Endpoints []string + CaFile string + CertFile string + KeyFile string +} + func RegisterDefaults(scheme *runtime.Scheme) error { scheme.AddTypeDefaultingFunc(&ClusterCfg{}, func(obj interface{}) { SetDefaultClusterCfg(obj.(*ClusterCfg)) }) return nil @@ -106,3 +114,28 @@ func (cfg *ClusterCfg) GenerateHosts() []string { hostsList = append(hostsList, lbHost) return hostsList } + +func (cfg *ClusterCfg) GenerateCertSANs() []string { + clusterSvc := fmt.Sprintf("kubernetes.default.svc.%s", cfg.KubeClusterName) + defaultCertSANs := []string{"kubernetes", "kubernetes.default", "kubernetes.default.svc", clusterSvc, "localhost", "127.0.0.1"} + extraCertSANs := []string{} + + extraCertSANs = append(extraCertSANs, cfg.LBKubeApiserver.Domain) + extraCertSANs = append(extraCertSANs, cfg.LBKubeApiserver.Address) + + for _, host := range cfg.Hosts { + extraCertSANs = append(extraCertSANs, host.HostName) + extraCertSANs = append(extraCertSANs, fmt.Sprintf("%s.%s", host.HostName, cfg.KubeClusterName)) + if host.SSHAddress != cfg.LBKubeApiserver.Address { + extraCertSANs = append(extraCertSANs, host.SSHAddress) + } + if host.InternalAddress != host.SSHAddress && host.InternalAddress != cfg.LBKubeApiserver.Address { + extraCertSANs = append(extraCertSANs, host.InternalAddress) + } + } + + extraCertSANs = append(extraCertSANs, util.ParseIp(cfg.Network.KubeServiceCIDR)[0]) + + defaultCertSANs = append(defaultCertSANs, extraCertSANs...) + return defaultCertSANs +} diff --git a/cluster/container-engine/docker/docker.go b/cluster/container-engine/docker/docker.go index fab62faa..9b127fe5 100644 --- a/cluster/container-engine/docker/docker.go +++ b/cluster/container-engine/docker/docker.go @@ -15,7 +15,7 @@ func InstallerDocker(mgr *manager.Manager) error { func installDockerOnNode(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { cmd := "sudo sh -c \"[ -z $(which docker) ] && curl https://raw.githubusercontent.com/pixiake/kubeocean/master/scripts/docker-install.sh | sh ; systemctl enable docker\"" - _, err := mgr.Runner.RunRaw(cmd) + _, err := mgr.Runner.RunCmd(cmd) if err != nil { return errors.Wrap(errors.WithStack(err), "failed to install docker") } diff --git a/cluster/etcd/etcd.go b/cluster/etcd/etcd.go index 2db03675..88b715c4 100644 --- a/cluster/etcd/etcd.go +++ b/cluster/etcd/etcd.go @@ -1 +1,235 @@ package etcd + +import ( + "encoding/base64" + "fmt" + kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + "github.com/pixiake/kubekey/cluster/etcd/tmpl" + "github.com/pixiake/kubekey/util/manager" + "github.com/pixiake/kubekey/util/ssh" + "github.com/pkg/errors" + "strings" + "time" +) + +var ( + certsStr = make(chan map[string]string) + certsContent = map[string]string{} + etcdBackupPrefix = "/var/backups" + etcdDataDir = "/var/lib/etcd" + etcdConfigDir = "/etc/ssl/etcd" + etcdCertDir = "/etc/ssl/etcd/ssl" + etcdBinDir = "/usr/local/bin" +) + +func GenerateEtcdCerts(mgr *manager.Manager) error { + mgr.Logger.Infoln("Generate etcd certs") + + return mgr.RunTaskOnEtcdNodes(generateCerts, true) +} + +func generateCerts(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { + + if mgr.Runner.Index == 0 { + certsScript, err := tmpl.GenerateEtcdSslScript(mgr.Cluster) + if err != nil { + return err + } + certsScriptBase64 := base64.StdEncoding.EncodeToString([]byte(certsScript)) + _, err1 := mgr.Runner.RunCmd(fmt.Sprintf("echo %s | base64 -d > /tmp/kubekey/make-ssl-etcd.sh && chmod +x /tmp/kubekey/make-ssl-etcd.sh", certsScriptBase64)) + if err1 != nil { + return errors.Wrap(errors.WithStack(err1), "failed to generate etcd certs script") + } + certsOpensslCfg, err := tmpl.GenerateEtcdSslCfg(mgr.Cluster) + if err != nil { + return err + } + certsOpensslCfgBase64 := base64.StdEncoding.EncodeToString([]byte(certsOpensslCfg)) + _, err2 := mgr.Runner.RunCmd(fmt.Sprintf("echo %s | base64 -d > /tmp/kubekey/openssl.conf", certsOpensslCfgBase64)) + if err2 != nil { + return errors.Wrap(errors.WithStack(err2), "failed to generate etcd certs script") + } + + cmd := fmt.Sprintf("mkdir -p %s && /bin/bash -x %s/make-ssl-etcd.sh -f %s/openssl.conf -d %s", etcdCertDir, "/tmp/kubekey", "/tmp/kubekey", etcdCertDir) + + _, err3 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", cmd)) + if err3 != nil { + return errors.Wrap(errors.WithStack(err3), "failed to generate etcd certs") + } + + for _, cert := range generateCertsFiles(mgr) { + certsBase64Cmd := fmt.Sprintf("cat %s/%s | base64 --wrap=0", etcdCertDir, cert) + certsBase64, err4 := mgr.Runner.RunCmd(certsBase64Cmd) + if err4 != nil { + return errors.Wrap(errors.WithStack(err4), "failed to get etcd certs content") + } + certsContent[cert] = certsBase64 + } + + for i := 1; i <= len(mgr.EtcdNodes.Hosts)-1; i++ { + certsStr <- certsContent + } + + } else { + mgr.Runner.RunCmd(fmt.Sprintf("sudo mkdir -p %s", etcdCertDir)) + for file, cert := range <-certsStr { + writeCertCmd := fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > %s/%s\"", cert, etcdCertDir, file) + _, err4 := mgr.Runner.RunCmd(writeCertCmd) + if err4 != nil { + return errors.Wrap(errors.WithStack(err4), "failed to write etcd certs content") + } + } + } + + return nil +} + +func generateCertsFiles(mgr *manager.Manager) []string { + var certsList []string + certsList = append(certsList, "ca.pem") + certsList = append(certsList, "ca-key.pem") + for _, host := range mgr.EtcdNodes.Hosts { + certsList = append(certsList, fmt.Sprintf("admin-%s.pem", host.HostName)) + certsList = append(certsList, fmt.Sprintf("admin-%s-key.pem", host.HostName)) + certsList = append(certsList, fmt.Sprintf("member-%s.pem", host.HostName)) + certsList = append(certsList, fmt.Sprintf("member-%s-key.pem", host.HostName)) + } + for _, host := range mgr.MasterNodes.Hosts { + certsList = append(certsList, fmt.Sprintf("node-%s.pem", host.HostName)) + certsList = append(certsList, fmt.Sprintf("node-%s-key.pem", host.HostName)) + } + return certsList +} + +func SyncEtcdCertsToMaster(mgr *manager.Manager) error { + mgr.Logger.Infoln("Sync etcd certs") + + return mgr.RunTaskOnMasterNodes(syncEtcdCertsToMaster, true) +} + +func syncEtcdCertsToMaster(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { + if !node.IsEtcd { + mgr.Runner.RunCmd(fmt.Sprintf("sudo mkdir -p %s", etcdCertDir)) + for file, cert := range certsContent { + writeCertCmd := fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > %s/%s\"", cert, etcdCertDir, file) + _, err := mgr.Runner.RunCmd(writeCertCmd) + if err != nil { + return errors.Wrap(errors.WithStack(err), "failed to sync etcd certs to master") + } + } + } + return nil +} + +func GenerateEtcdService(mgr *manager.Manager) error { + mgr.Logger.Infoln("Start etcd cluster") + + return mgr.RunTaskOnEtcdNodes(generateEtcdService, true) +} + +func generateEtcdService(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { + etcdService, err := tmpl.GenerateEtcdService(mgr, mgr.Runner.Index) + if err != nil { + return err + } + etcdServiceBase64 := base64.StdEncoding.EncodeToString([]byte(etcdService)) + _, err1 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/systemd/system/etcd.service\"", etcdServiceBase64)) + if err1 != nil { + return errors.Wrap(errors.WithStack(err1), "failed to generate etcd service") + } + + etcdEnv, err := tmpl.GenerateEtcdEnv(mgr, node, mgr.Runner.Index) + if err != nil { + return err + } + etcdEnvBase64 := base64.StdEncoding.EncodeToString([]byte(etcdEnv)) + _, err2 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/etcd.env\"", etcdEnvBase64)) + if err2 != nil { + return errors.Wrap(errors.WithStack(err2), "failed to generate etcd env") + } + + etcdBin, err := tmpl.GenerateEtcdBinary(mgr, mgr.Runner.Index) + if err != nil { + return err + } + etcdBinBase64 := base64.StdEncoding.EncodeToString([]byte(etcdBin)) + _, err3 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /usr/local/bin/etcd && chmod +x /usr/local/bin/etcd\"", etcdBinBase64)) + if err3 != nil { + return errors.Wrap(errors.WithStack(err3), "failed to generate etcd bin") + } + + getEtcdCtlCmd := fmt.Sprintf("docker run --rm -v /usr/local/bin:/systembindir %s:%s /bin/cp /usr/local/bin/etcdctl /systembindir/etcdctl", kubekeyapi.DefaultEtcdRepo, kubekeyapi.DefaultEtcdVersion) + _, err4 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", getEtcdCtlCmd)) + if err4 != nil { + return errors.Wrap(errors.WithStack(err4), "failed to get etcdctl") + } + + _, err5 := mgr.Runner.RunCmd("sudo -E /bin/sh -c \"systemctl daemon-reload && systemctl restart etcd\"") + if err5 != nil { + return errors.Wrap(errors.WithStack(err5), "failed to start etcd") + } + + addrList := []string{} + for _, host := range mgr.EtcdNodes.Hosts { + addrList = append(addrList, fmt.Sprintf("https://%s:2379", host.InternalAddress)) + } + checkHealthCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s cluster-health | grep -q 'cluster is healthy'", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ",")) + if mgr.Runner.Index == 0 { + for i := 20; i > 0; i-- { + _, err := mgr.Runner.RunCmd(checkHealthCmd) + if err != nil { + fmt.Println("Waiting for etcd to start") + if i == 1 { + return errors.Wrap(errors.WithStack(err), "failed to start etcd") + } + } else { + break + } + time.Sleep(time.Second * 5) + } + } else { + checkMemberCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --no-sync --endpoints=%s member list | grep -q %s", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","), fmt.Sprintf("https://%s:2379", node.InternalAddress)) + _, err := mgr.Runner.RunCmd(checkMemberCmd) + if err != nil { + joinMemberCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s member add %s %s", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","), fmt.Sprintf("etcd%d", mgr.Runner.Index+1), fmt.Sprintf("https://%s:2380", node.InternalAddress)) + _, err := mgr.Runner.RunCmd(joinMemberCmd) + if err != nil { + fmt.Println("failed to add etcd member") + } + } + } + + for i := 20; i > 0; i-- { + _, err := mgr.Runner.RunCmd(checkHealthCmd) + if err != nil { + fmt.Println("Waiting for etcd to start") + if i == 1 { + return errors.Wrap(errors.WithStack(err), "failed to start etcd") + } + } else { + break + } + time.Sleep(time.Second * 5) + } + + reloadEtcdEnvCmd := "sed -i '/ETCD_INITIAL_CLUSTER_STATE/s/\\:.*/\\: existing/g' /etc/etcd.env && systemctl daemon-reload && systemctl restart etcd" + _, err6 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", reloadEtcdEnvCmd)) + if err6 != nil { + return errors.Wrap(errors.WithStack(err6), "failed to reload etcd env") + } + + for i := 20; i > 0; i-- { + _, err := mgr.Runner.RunCmd(checkHealthCmd) + if err != nil { + fmt.Println("Waiting for etcd to start") + if i == 1 { + return errors.Wrap(errors.WithStack(err), "failed to start etcd") + } + } else { + break + } + time.Sleep(time.Second * 5) + } + + return nil +} diff --git a/cluster/etcd/tmpl/certs.go b/cluster/etcd/tmpl/certs.go new file mode 100644 index 00000000..ec8974ca --- /dev/null +++ b/cluster/etcd/tmpl/certs.go @@ -0,0 +1,198 @@ +package tmpl + +import ( + "github.com/lithammer/dedent" + kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + "github.com/pixiake/kubekey/util" + "strings" + "text/template" +) + +func Add(a int, b int) int { + return a + b +} + +var ( + funcMap = template.FuncMap{"Add": Add} + EtcdSslCfgTempl = template.Must(template.New("etcdSslCfg").Funcs(funcMap).Parse( + dedent.Dedent(`[req] +req_extensions = v3_req +distinguished_name = req_distinguished_name + +[req_distinguished_name] + +[ v3_req ] +basicConstraints = CA:FALSE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment +subjectAltName = @alt_names + +[ ssl_client ] +extendedKeyUsage = clientAuth, serverAuth +basicConstraints = CA:FALSE +subjectKeyIdentifier=hash +authorityKeyIdentifier=keyid,issuer +subjectAltName = @alt_names + +[ v3_ca ] +basicConstraints = CA:TRUE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment +subjectAltName = @alt_names +authorityKeyIdentifier=keyid:always,issuer + +[alt_names] +{{- range $i, $v := .Dns }} +DNS.{{ Add $i 1 }} = {{ $v }} +{{- end }} +{{- range $i, $v := .Ips }} +IP.{{ Add $i 1 }} = {{ $v }} +{{- end }} + + `))) + + EtcdSslTempl = template.Must(template.New("etcdSsl").Parse( + dedent.Dedent(`#!/bin/bash + +# Author: Smana smainklh@gmail.com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o pipefail +usage() +{ + cat << EOF +Create self signed certificates + +Usage : $(basename $0) -f [-d ] + -h | --help : Show this message + -f | --config : Openssl configuration file + -d | --ssldir : Directory where the certificates will be installed + + ex : + $(basename $0) -f openssl.conf -d /srv/ssl +EOF +} + +# Options parsing +while (($#)); do + case "$1" in + -h | --help) usage; exit 0;; + -f | --config) CONFIG=${2}; shift 2;; + -d | --ssldir) SSLDIR="${2}"; shift 2;; + *) + usage + echo "ERROR : Unknown option" + exit 3 + ;; + esac +done + +if [ -z ${CONFIG} ]; then + echo "ERROR: the openssl configuration file is missing. option -f" + exit 1 +fi +if [ -z ${SSLDIR} ]; then + SSLDIR="/etc/ssl/etcd" +fi + +tmpdir=$(mktemp -d /tmp/etcd_cacert.XXXXXX) +trap 'rm -rf "${tmpdir}"' EXIT +cd "${tmpdir}" + +mkdir -p "${SSLDIR}" + +# Root CA +if [ -e "$SSLDIR/ca-key.pem" ]; then + # Reuse existing CA + cp $SSLDIR/{ca.pem,ca-key.pem} . +else + openssl genrsa -out ca-key.pem 2048 > /dev/null 2>&1 + openssl req -x509 -new -nodes -key ca-key.pem -days 36500 -out ca.pem -subj "/CN=etcd-ca" > /dev/null 2>&1 +fi + +MASTERS='{{ .Masters }}' +HOSTS='{{ .Hosts }}' + +# ETCD member +if [ -n "$MASTERS" ]; then + for host in $MASTERS; do + cn="${host%%.*}" + # Member key + openssl genrsa -out member-${host}-key.pem 2048 > /dev/null 2>&1 + openssl req -new -key member-${host}-key.pem -out member-${host}.csr -subj "/CN=etcd-member-${cn}" -config ${CONFIG} > /dev/null 2>&1 + openssl x509 -req -in member-${host}.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out member-${host}.pem -days 36500 -extensions ssl_client -extfile ${CONFIG} > /dev/null 2>&1 + + # Admin key + openssl genrsa -out admin-${host}-key.pem 2048 > /dev/null 2>&1 + openssl req -new -key admin-${host}-key.pem -out admin-${host}.csr -subj "/CN=etcd-admin-${cn}" > /dev/null 2>&1 + openssl x509 -req -in admin-${host}.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out admin-${host}.pem -days 36500 -extensions ssl_client -extfile ${CONFIG} > /dev/null 2>&1 + done +fi + +# Node keys +if [ -n "$HOSTS" ]; then + for host in $HOSTS; do + cn="${host%%.*}" + openssl genrsa -out node-${host}-key.pem 2048 > /dev/null 2>&1 + openssl req -new -key node-${host}-key.pem -out node-${host}.csr -subj "/CN=etcd-node-${cn}" > /dev/null 2>&1 + openssl x509 -req -in node-${host}.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out node-${host}.pem -days 36500 -extensions ssl_client -extfile ${CONFIG} > /dev/null 2>&1 + done +fi + +# Install certs +if [ -e "$SSLDIR/ca-key.pem" ]; then + # No pass existing CA + rm -f ca.pem ca-key.pem +fi + +mv *.pem ${SSLDIR}/ + `))) +) + +func GenerateEtcdSslCfg(cfg *kubekeyapi.ClusterCfg) (string, error) { + dnsList := []string{"localhost", "etcd.kube-system.svc.cluster.local", "etcd.kube-system.svc", "etcd.kube-system", "etcd"} + ipList := []string{"127.0.0.1"} + + if cfg.LBKubeApiserver.Domain == "" { + dnsList = append(dnsList, kubekeyapi.DefaultLBDomain) + } else { + dnsList = append(dnsList, cfg.LBKubeApiserver.Domain) + } + + for _, host := range cfg.Hosts { + dnsList = append(dnsList, host.HostName) + ipList = append(ipList, host.InternalAddress) + } + + return util.Render(EtcdSslCfgTempl, util.Data{ + "Dns": dnsList, + "Ips": ipList, + }) +} + +func GenerateEtcdSslScript(cfg *kubekeyapi.ClusterCfg) (string, error) { + var masters []string + var hosts []string + _, etcdNodes, masterNodes, _, _ := cfg.GroupHosts() + + for _, host := range etcdNodes.Hosts { + masters = append(masters, host.HostName) + } + for _, host := range masterNodes.Hosts { + hosts = append(hosts, host.HostName) + } + return util.Render(EtcdSslTempl, util.Data{ + "Masters": strings.Join(masters, " "), + "Hosts": strings.Join(hosts, " "), + }) +} diff --git a/cluster/etcd/tmpl/etcd.go b/cluster/etcd/tmpl/etcd.go new file mode 100644 index 00000000..442a58c3 --- /dev/null +++ b/cluster/etcd/tmpl/etcd.go @@ -0,0 +1,116 @@ +package tmpl + +import ( + "fmt" + "github.com/lithammer/dedent" + kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + "github.com/pixiake/kubekey/util" + "github.com/pixiake/kubekey/util/manager" + "strings" + "text/template" +) + +var ( + EtcdServiceTempl = template.Must(template.New("EtcdService").Parse( + dedent.Dedent(`[Unit] +Description=etcd docker wrapper +Wants=docker.socket +After=docker.service + +[Service] +User=root +PermissionsStartOnly=true +EnvironmentFile=-/etc/etcd.env +ExecStart=/usr/local/bin/etcd +ExecStartPre=-/usr/bin/docker rm -f {{ .Name }} +ExecStop=/usr/bin/docker stop {{ .Name }} +Restart=always +RestartSec=15s +TimeoutStartSec=30s + +[Install] +WantedBy=multi-user.target + `))) + + EtcdEnvTempl = template.Must(template.New("etcdEnv").Parse( + dedent.Dedent(`# Environment file for etcd {{ .Tag }} +ETCD_DATA_DIR=/var/lib/etcd +ETCD_ADVERTISE_CLIENT_URLS=https://{{ .Ip }}:2379 +ETCD_INITIAL_ADVERTISE_PEER_URLS=https://{{ .Ip }}:2380 +ETCD_INITIAL_CLUSTER_STATE=new +ETCD_METRICS=basic +ETCD_LISTEN_CLIENT_URLS=https://{{ .Ip }}:2379,https://127.0.0.1:2379 +ETCD_ELECTION_TIMEOUT=5000 +ETCD_HEARTBEAT_INTERVAL=250 +ETCD_INITIAL_CLUSTER_TOKEN=k8s_etcd +ETCD_LISTEN_PEER_URLS=https://{{ .Ip }}:2380 +ETCD_NAME={{ .Name }} +ETCD_PROXY=off +ETCD_INITIAL_CLUSTER={{ .Endpoints }} +ETCD_AUTO_COMPACTION_RETENTION=8 +ETCD_SNAPSHOT_COUNT=10000 + +# TLS settings +ETCD_TRUSTED_CA_FILE=/etc/ssl/etcd/ssl/ca.pem +ETCD_CERT_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}.pem +ETCD_KEY_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}-key.pem +ETCD_CLIENT_CERT_AUTH=true + +ETCD_PEER_TRUSTED_CA_FILE=/etc/ssl/etcd/ssl/ca.pem +ETCD_PEER_CERT_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}.pem +ETCD_PEER_KEY_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}-key.pem +ETCD_PEER_CLIENT_CERT_AUTH=True + +# CLI settings +ETCDCTL_ENDPOINTS=https://127.0.0.1:2379 +ETCDCTL_CA_FILE=/etc/ssl/etcd/ssl/ca.pem +ETCDCTL_KEY_FILE=/etc/ssl/etcd/ssl/admin-{{ .Hostname }}-key.pem +ETCDCTL_CERT_FILE=/etc/ssl/etcd/ssl/admin-{{ .Hostname }}.pem + `))) + + EtcdTempl = template.Must(template.New("etcd").Parse( + dedent.Dedent(`#!/bin/bash +/usr/bin/docker run \ + --restart=on-failure:5 \ + --env-file=/etc/etcd.env \ + --net=host \ + -v /etc/ssl/certs:/etc/ssl/certs:ro \ + -v /etc/ssl/etcd/ssl:/etc/ssl/etcd/ssl:ro \ + -v /var/lib/etcd:/var/lib/etcd:rw \ + --memory=512M \ + --blkio-weight=1000 \ + --name={{ .Name }} \ + {{ .Repo }}:{{ .Tag }} \ + /usr/local/bin/etcd \ + "$@" + `))) +) + +func GenerateEtcdBinary(mgr *manager.Manager, index int) (string, error) { + return util.Render(EtcdTempl, util.Data{ + "Name": fmt.Sprintf("etcd%d", index+1), + "Repo": kubekeyapi.DefaultEtcdRepo, + "Tag": kubekeyapi.DefaultEtcdVersion, + }) +} + +func GenerateEtcdService(mgr *manager.Manager, index int) (string, error) { + return util.Render(EtcdServiceTempl, util.Data{ + "Name": fmt.Sprintf("etcd%d", index+1), + }) +} + +func GenerateEtcdEnv(mgr *manager.Manager, node *kubekeyapi.HostCfg, index int) (string, error) { + endpoints := []string{} + for index, host := range mgr.EtcdNodes.Hosts { + endpoints = append(endpoints, fmt.Sprintf("etcd%d=https://%s:2380", index+1, host.InternalAddress)) + } + + return util.Render(EtcdEnvTempl, util.Data{ + "Tag": kubekeyapi.DefaultEtcdVersion, + "Name": fmt.Sprintf("etcd%d", index+1), + "Ip": node.InternalAddress, + "Hostname": node.HostName, + "Endpoints": strings.Join(endpoints, ","), + }) +} diff --git a/cluster/kubernetes/master.go b/cluster/kubernetes/master.go index cb00899d..1a5ea9e8 100644 --- a/cluster/kubernetes/master.go +++ b/cluster/kubernetes/master.go @@ -1,5 +1,33 @@ package kubernetes -func SyncKubeFiles() { +import ( + "encoding/base64" + "fmt" + kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + "github.com/pixiake/kubekey/cluster/kubernetes/tmpl" + "github.com/pixiake/kubekey/util/manager" + "github.com/pixiake/kubekey/util/ssh" + "github.com/pkg/errors" +) +func InitKubernetesCluster(mgr *manager.Manager) error { + mgr.Logger.Infoln("Init kubernetes cluster") + + return mgr.RunTaskOnMasterNodes(initKubernetesCluster, true) +} + +func initKubernetesCluster(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { + if mgr.Runner.Index == 0 { + kubeadmCfg, err := tmpl.GenerateKubeadmCfg(mgr) + if err != nil { + return err + } + kubeadmCfgBase64 := base64.StdEncoding.EncodeToString([]byte(kubeadmCfg)) + _, err1 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"mkdir -p /etc/kubernetes && echo %s | base64 -d > /etc/kubernetes/kubeadm-config.yaml\"", kubeadmCfgBase64)) + if err1 != nil { + return errors.Wrap(errors.WithStack(err1), "failed to generate kubeadm config") + } + } + + return nil } diff --git a/cluster/kubernetes/nodes.go b/cluster/kubernetes/nodes.go index 3bb76fb6..788a60ff 100644 --- a/cluster/kubernetes/nodes.go +++ b/cluster/kubernetes/nodes.go @@ -16,7 +16,7 @@ import ( func SyncKubeBinaries(mgr *manager.Manager) error { mgr.Logger.Infoln("Syncing kube binaries……") - return mgr.RunTaskOnAllNodes(syncKubeBinaries, true) + return mgr.RunTaskOnK8sNodes(syncKubeBinaries, true) } func syncKubeBinaries(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { @@ -51,7 +51,7 @@ func syncKubeBinaries(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.C } } cmd := strings.Join(cmdlist, " && ") - _, err3 := mgr.Runner.RunRaw(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", cmd)) + _, err3 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", cmd)) if err3 != nil { return errors.Wrap(errors.WithStack(err3), fmt.Sprintf("failed to create kubelet link")) } @@ -70,7 +70,7 @@ func setKubelet(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connect return err1 } kubeletServiceBase64 := base64.StdEncoding.EncodeToString([]byte(kubeletService)) - _, err2 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/kubelet.service", kubeletServiceBase64, "/tmp/kubekey")) + _, err2 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/systemd/system/kubelet.service && ln -snf /usr/local/bin/kubelet /usr/bin/kubelet\"", kubeletServiceBase64)) if err2 != nil { return errors.Wrap(errors.WithStack(err2), "failed to generate kubelet service") } @@ -80,15 +80,10 @@ func setKubelet(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connect return err3 } kubeletEnvBase64 := base64.StdEncoding.EncodeToString([]byte(kubeletEnv)) - _, err4 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/10-kubeadm.conf", kubeletEnvBase64, "/tmp/kubekey")) + _, err4 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"mkdir -p /etc/systemd/system/kubelet.service.d && echo %s | base64 -d > /etc/systemd/system/kubelet.service.d/10-kubeadm.conf\"", kubeletEnvBase64)) if err4 != nil { return errors.Wrap(errors.WithStack(err2), "failed to generate kubelet env") } - _, err5 := mgr.Runner.RunRaw("sudo -E /bin/sh -c \"cp -f /tmp/kubekey/kubelet.service /etc/systemd/system && mkdir -p /etc/systemd/system/kubelet.service.d && cp -f /tmp/kubekey/10-kubeadm.conf /etc/systemd/system/kubelet.service.d\"") - if err5 != nil { - return errors.Wrap(errors.WithStack(err2), "failed to configure kubelet service") - } - return nil } diff --git a/cluster/kubernetes/tmpl/kubeadm.go b/cluster/kubernetes/tmpl/kubeadm.go index e693b3d1..96cd6e6f 100644 --- a/cluster/kubernetes/tmpl/kubeadm.go +++ b/cluster/kubernetes/tmpl/kubeadm.go @@ -1 +1,114 @@ package tmpl + +import ( + "fmt" + "github.com/lithammer/dedent" + kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + "github.com/pixiake/kubekey/util" + "github.com/pixiake/kubekey/util/manager" + "text/template" +) + +var KubeadmCfgTempl = template.Must(template.New("kubeadmCfg").Parse( + dedent.Dedent(`--- +apiVersion: kubeadm.k8s.io/v1beta2 +kind: ClusterConfiguration +etcd: + external: + endpoints: + {{- range .ExternalEtcd.Endpoints }} + - {{ . }} + {{- end }} + caFile: {{ .ExternalEtcd.CaFile }} + certFile: {{ .ExternalEtcd.CertFile }} + keyFile: {{ .ExternalEtcd.KeyFile }} +dns: + type: CoreDNS + imageRepository: coredns + imageTag: 1.6.0 +imageRepository: {{ .ImageRepo }} +kubernetesVersion: {{ .Version }} +certificatesDir: /etc/kubernetes/pki +clusterName: {{ .ClusterName }} +controlPlaneEndpoint: {{ .ControlPlaneEndpoint }} +networking: + dnsDomain: {{ .ClusterName }} + podSubnet: {{ .PodSubnet }} + serviceSubnet: {{ .ServiceSubnet }} +apiServer: + extraArgs: + authorization-mode: Node,RBAC + timeoutForControlPlane: 4m0s + certSANs: + {{- range .CertSANs }} + - {{ . }} + {{- end }} + +--- +apiVersion: kubeproxy.config.k8s.io/v1alpha1 +kind: KubeProxyConfiguration +bindAddress: 0.0.0.0 +clientConnection: + acceptContentTypes: + burst: 10 + contentType: application/vnd.kubernetes.protobuf + kubeconfig: + qps: 5 +clusterCIDR: {{ .PodSubnet }} +configSyncPeriod: 15m0s +conntrack: + maxPerCore: 32768 + min: 131072 + tcpCloseWaitTimeout: 1h0m0s + tcpEstablishedTimeout: 24h0m0s +enableProfiling: False +healthzBindAddress: 0.0.0.0:10256 +iptables: + masqueradeAll: False + masqueradeBit: 14 + minSyncPeriod: 0s + syncPeriod: 30s +ipvs: + excludeCIDRs: [] + minSyncPeriod: 0s + scheduler: rr + syncPeriod: 30s + strictARP: False +metricsBindAddress: 127.0.0.1:10249 +mode: ipvs +nodePortAddresses: [] +oomScoreAdj: -999 +portRange: +udpIdleTimeout: 250ms + `))) + +func GenerateKubeadmCfg(mgr *manager.Manager) (string, error) { + var externalEtcd kubekeyapi.ExternalEtcd + var endpointsList []string + var caFile, certFile, keyFile string + + for _, host := range mgr.EtcdNodes.Hosts { + endpoint := fmt.Sprintf("https://%s:%s", host.InternalAddress, kubekeyapi.DefaultEtcdPort) + endpointsList = append(endpointsList, endpoint) + } + externalEtcd.Endpoints = endpointsList + + caFile = "/etc/ssl/etcd/ssl/ca.pem" + certFile = fmt.Sprintf("/etc/ssl/etcd/ssl/admin-%s.pem", mgr.EtcdNodes.Hosts[0].HostName) + keyFile = fmt.Sprintf("/etc/ssl/etcd/ssl/admin-%s-key.pem", mgr.EtcdNodes.Hosts[0].HostName) + + externalEtcd.CaFile = caFile + externalEtcd.CertFile = certFile + externalEtcd.KeyFile = keyFile + + return util.Render(KubeadmCfgTempl, util.Data{ + "ImageRepo": mgr.Cluster.KubeImageRepo, + "Version": mgr.Cluster.KubeVersion, + "ClusterName": mgr.Cluster.KubeClusterName, + "ControlPlaneEndpoint": fmt.Sprintf("%s:%s", mgr.Cluster.LBKubeApiserver.Address, mgr.Cluster.LBKubeApiserver.Port), + "PodSubnet": mgr.Cluster.Network.KubePodsCIDR, + "ServiceSubnet": mgr.Cluster.Network.KubeServiceCIDR, + "CertSANs": mgr.Cluster.GenerateCertSANs(), + "ExternalEtcd": externalEtcd, + }) +} diff --git a/cluster/preinstall/initOS.go b/cluster/preinstall/initOS.go index aba7162b..d585664d 100644 --- a/cluster/preinstall/initOS.go +++ b/cluster/preinstall/initOS.go @@ -18,7 +18,7 @@ func InitOS(mgr *manager.Manager) error { func initOsOnNode(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error { tmpDir := "/tmp/kubekey" - _, err := mgr.Runner.RunRaw(fmt.Sprintf("mkdir -p %s", tmpDir)) + _, err := mgr.Runner.RunCmd(fmt.Sprintf("mkdir -p %s", tmpDir)) if err != nil { return errors.Wrap(errors.WithStack(err), "failed to init operating system") } @@ -29,12 +29,12 @@ func initOsOnNode(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Conne } str := base64.StdEncoding.EncodeToString([]byte(initOsScript)) - _, err2 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/initOS.sh && chmod +x %s/initOS.sh", str, tmpDir, tmpDir)) + _, err2 := mgr.Runner.RunCmd(fmt.Sprintf("echo %s | base64 -d > %s/initOS.sh && chmod +x %s/initOS.sh", str, tmpDir, tmpDir)) if err2 != nil { return errors.Wrap(errors.WithStack(err2), "failed to init operating system") } - _, err3 := mgr.Runner.RunRaw(fmt.Sprintf("sudo %s/initOS.sh", tmpDir)) + _, err3 := mgr.Runner.RunCmd(fmt.Sprintf("sudo %s/initOS.sh", tmpDir)) if err3 != nil { return errors.Wrap(errors.WithStack(err3), "failed to init operating system") } diff --git a/cluster/preinstall/tmpl/InitOS.go b/cluster/preinstall/tmpl/InitOS.go index 4496368c..bf99188b 100644 --- a/cluster/preinstall/tmpl/InitOS.go +++ b/cluster/preinstall/tmpl/InitOS.go @@ -37,7 +37,7 @@ modinfo br_netfilter > /dev/null 2>&1 if [ $? -eq 0 ]; then modprobe br_netfilter mkdir -p /etc/modules-load.d - echo 'br_netfilter' > /etc/modules-load.d/kubeocean-br_netfilter.conf + echo 'br_netfilter' > /etc/modules-load.d/kubekey-br_netfilter.conf fi modprobe ip_vs diff --git a/install/executor.go b/install/executor.go index f424a6f8..188a234b 100644 --- a/install/executor.go +++ b/install/executor.go @@ -1,32 +1,17 @@ package install import ( - "github.com/pixiake/kubekey/util/manager" - ssh2 "github.com/pixiake/kubekey/util/ssh" - "github.com/sirupsen/logrus" - kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + "github.com/pixiake/kubekey/util/manager" + ssh "github.com/pixiake/kubekey/util/ssh" + "github.com/sirupsen/logrus" ) -// Options groups the various possible options for running -// the Kubernetes installation. -type Options struct { - Verbose bool - Manifest string - CredentialsFile string - BackupFile string - DestroyWorkers bool - RemoveBinaries bool -} - -// Installer is entrypoint for installation process type Executor struct { cluster *kubekeyapi.ClusterCfg logger *logrus.Logger } -// NewInstaller returns a new installer, responsible for dispatching -// between the different supported Kubernetes versions and running the func NewExecutor(cluster *kubekeyapi.ClusterCfg, logger *logrus.Logger) *Executor { return &Executor{ cluster: cluster, @@ -34,7 +19,6 @@ func NewExecutor(cluster *kubekeyapi.ClusterCfg, logger *logrus.Logger) *Executo } } -// Install run the installation process func (executor *Executor) Execute() error { mgr, err := executor.createManager() if err != nil { @@ -43,30 +27,18 @@ func (executor *Executor) Execute() error { return ExecTasks(mgr) } -// Reset resets cluster: -// * destroys all the worker machines -// * kubeadm reset masters -//func (i *Installer) Reset(options *Options) error { -// s, err := i.createState(options) -// if err != nil { -// return err -// } -// return installation.Reset(s) -//} - func (executor *Executor) createManager() (*manager.Manager, error) { mgr := &manager.Manager{} - + allNodes, etcdNodes, masterNodes, workerNodes, k8sNodes := executor.cluster.GroupHosts() + mgr.AllNodes = allNodes + mgr.EtcdNodes = etcdNodes + mgr.MasterNodes = masterNodes + mgr.WorkerNodes = workerNodes + mgr.K8sNodes = k8sNodes mgr.Cluster = executor.cluster - mgr.Connector = ssh2.NewConnector() - //s.Configuration = configupload.NewConfiguration() - //mgr.WorkDir = "kubekey" + mgr.Connector = ssh.NewConnector() mgr.Logger = executor.logger mgr.Verbose = true - //s.ManifestFilePath = options.Manifest - //s.CredentialsFilePath = options.CredentialsFile - //s.BackupFile = options.BackupFile - //s.DestroyWorkers = options.DestroyWorkers - //s.RemoveBinaries = options.RemoveBinaries + return mgr, nil } diff --git a/install/install.go b/install/install.go index 6cbe934a..7bbb7629 100644 --- a/install/install.go +++ b/install/install.go @@ -2,6 +2,7 @@ package install import ( "github.com/pixiake/kubekey/cluster/container-engine/docker" + "github.com/pixiake/kubekey/cluster/etcd" "github.com/pixiake/kubekey/cluster/kubernetes" "github.com/pixiake/kubekey/cluster/preinstall" "github.com/pixiake/kubekey/util/manager" @@ -14,7 +15,11 @@ func ExecTasks(mgr *manager.Manager) error { {Fn: preinstall.InitOS, ErrMsg: "failed to download kube binaries"}, {Fn: docker.InstallerDocker, ErrMsg: "failed to install docker"}, {Fn: kubernetes.SyncKubeBinaries, ErrMsg: "failed to sync kube binaries"}, + {Fn: etcd.GenerateEtcdCerts, ErrMsg: "failed to generate etcd certs"}, + {Fn: etcd.SyncEtcdCertsToMaster, ErrMsg: "failed to sync etcd certs"}, + {Fn: etcd.GenerateEtcdService, ErrMsg: "failed to start etcd cluster"}, {Fn: kubernetes.ConfigureKubeletService, ErrMsg: "failed to sync kube binaries"}, + {Fn: kubernetes.InitKubernetesCluster, ErrMsg: "failed to init kubernetes cluster"}, } for _, step := range createTasks { diff --git a/util/manager/manager.go b/util/manager/manager.go index 54e32de5..1bc6913f 100644 --- a/util/manager/manager.go +++ b/util/manager/manager.go @@ -12,35 +12,20 @@ import ( //dynclient "sigs.k8s.io/manager-runtime/pkg/client" ) -//func New() (*State, error) { -// joinToken, err := bootstraputil.GenerateBootstrapToken() -// return &State{ -// JoinToken: joinToken, -// }, err -//} - -// State holds together currently test flags and parsed info, along with -// utilities like logger type Manager struct { - Cluster *kubekeyapi.ClusterCfg - Logger logrus.FieldLogger - Connector *ssh2.Dialer - //Configuration *configupload.Configuration + Cluster *kubekeyapi.ClusterCfg + Logger logrus.FieldLogger + Connector *ssh2.Dialer Runner *runner.Runner + AllNodes *kubekeyapi.Hosts + EtcdNodes *kubekeyapi.Hosts + MasterNodes *kubekeyapi.Hosts + WorkerNodes *kubekeyapi.Hosts + K8sNodes *kubekeyapi.Hosts WorkDir string JoinCommand string JoinToken string - //RESTConfig *rest.Config - //DynamicClient dynclient.Client - Verbose bool - //BackupFile string - //DestroyWorkers bool - //RemoveBinaries bool - //ForceUpgrade bool - //UpgradeMachineDeployments bool - //PatchCNI bool - //CredentialsFilePath string - //ManifestFilePath string + Verbose bool } func (mgr *Manager) KubeadmVerboseFlag() string { @@ -50,8 +35,7 @@ func (mgr *Manager) KubeadmVerboseFlag() string { return "" } -// Clone returns a shallow copy of the State. func (mgr *Manager) Clone() *Manager { - newState := *mgr - return &newState + newManager := *mgr + return &newManager } diff --git a/util/manager/tasks.go b/util/manager/tasks.go index 35e79240..9a8d22f0 100644 --- a/util/manager/tasks.go +++ b/util/manager/tasks.go @@ -20,7 +20,7 @@ const ( // NodeTask is a task that is specifically tailored to run on a single node. type NodeTask func(mgr *Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error -func (mgr *Manager) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bool) error { +func (mgr *Manager) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bool, index int) error { var ( err error conn ssh.Connection @@ -40,9 +40,9 @@ func (mgr *Manager) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bo mgr.Runner = &runner.Runner{ Conn: conn, Verbose: mgr.Verbose, - //OS: node.OS, - Prefix: prefix, - Host: node, + Prefix: prefix, + Host: node, + Index: index, } return task(mgr, node, conn) @@ -81,16 +81,16 @@ func (mgr *Manager) RunTaskOnNodes(nodes []kubekeyapi.HostCfg, task NodeTask, pa if parallel { ccons <- struct{}{} wg.Add(1) - go func(mgr *Manager, node *kubekeyapi.HostCfg, result chan string) { - err = mgr.runTask(node, task, parallel) + go func(mgr *Manager, node *kubekeyapi.HostCfg, result chan string, index int) { + err = mgr.runTask(node, task, parallel, index) if err != nil { mgr.Logger.Error(err) hasErrors = true } result <- "done" - }(mgrTask, &nodes[i], result) + }(mgrTask, &nodes[i], result, i) } else { - err = mgrTask.runTask(&nodes[i], task, parallel) + err = mgrTask.runTask(&nodes[i], task, parallel, i) if err != nil { break } @@ -106,38 +106,37 @@ func (mgr *Manager) RunTaskOnNodes(nodes []kubekeyapi.HostCfg, task NodeTask, pa return err } -// RunTaskOnAllNodes runs the given task on all hosts. func (mgr *Manager) RunTaskOnAllNodes(task NodeTask, parallel bool) error { - // It's not possible to concatenate host lists in this function. - // Some of the task(determineOS, determineHostname) write to the state and sending a copy would break that. - if err := mgr.RunTaskOnNodes(mgr.Cluster.Hosts, task, parallel); err != nil { + if err := mgr.RunTaskOnNodes(mgr.AllNodes.Hosts, task, parallel); err != nil { return err } - //if s.Cluster.StaticWorkers != nil { - // return s.RunTaskOnNodes(s.Cluster.StaticWorkers, task, parallel) - //} return nil } -// RunTaskOnLeader runs the given task on the leader host. -//func (s *State) RunTaskOnLeader(task NodeTask) error { -// leader, err := s.Cluster.Leader() -// if err != nil { -// return err -// } -// -// hosts := []kubekeyapi.HostConfig{ -// leader, -// } -// -// return s.RunTaskOnNodes(hosts, task, false) -//} +func (mgr *Manager) RunTaskOnEtcdNodes(task NodeTask, parallel bool) error { + if err := mgr.RunTaskOnNodes(mgr.EtcdNodes.Hosts, task, parallel); err != nil { + return err + } + return nil +} -// RunTaskOnFollowers runs the given task on the follower hosts. -//func (s *State) RunTaskOnFollowers(task NodeTask, parallel bool) error { -// return s.RunTaskOnNodes(s.Cluster.Followers(), task, parallel) -//} -// -//func (s *State) RunTaskOnStaticWorkers(task NodeTask, parallel bool) error { -// return s.RunTaskOnNodes(s.Cluster.StaticWorkers, task, parallel) -//} +func (mgr *Manager) RunTaskOnMasterNodes(task NodeTask, parallel bool) error { + if err := mgr.RunTaskOnNodes(mgr.MasterNodes.Hosts, task, parallel); err != nil { + return err + } + return nil +} + +func (mgr *Manager) RunTaskOnWorkerNodes(task NodeTask, parallel bool) error { + if err := mgr.RunTaskOnNodes(mgr.WorkerNodes.Hosts, task, parallel); err != nil { + return err + } + return nil +} + +func (mgr *Manager) RunTaskOnK8sNodes(task NodeTask, parallel bool) error { + if err := mgr.RunTaskOnNodes(mgr.K8sNodes.Hosts, task, parallel); err != nil { + return err + } + return nil +} diff --git a/util/runner/runner.go b/util/runner/runner.go index 00b509f7..fd0676dd 100644 --- a/util/runner/runner.go +++ b/util/runner/runner.go @@ -3,29 +3,24 @@ package runner import ( "fmt" kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" - "github.com/pixiake/kubekey/util" ssh2 "github.com/pixiake/kubekey/util/ssh" "github.com/pkg/errors" "strings" - "text/template" "time" ) -// Runner bundles a connection to a host with the verbosity and -// other options for running commands via SSH. type Runner struct { Conn ssh2.Connection Prefix string OS string Verbose bool Host *kubekeyapi.HostCfg - Result chan string + Index int } -// TemplateVariables is a render context for templates type TemplateVariables map[string]interface{} -func (r *Runner) RunRaw(cmd string) (string, error) { +func (r *Runner) RunCmd(cmd string) (string, error) { if r.Conn == nil { return "", errors.New("runner is not tied to an opened SSH connection") } @@ -37,12 +32,19 @@ func (r *Runner) RunRaw(cmd string) (string, error) { return output, nil } - if output != "" { - fmt.Printf("[%s %s] MSG:\n", r.Host.HostName, r.Host.SSHAddress) - fmt.Println(output) + if err != nil { + return "", err } - return "", err + if output != "" { + if strings.Contains(cmd, "base64") && strings.Contains(cmd, "--wrap=0") { + } else { + fmt.Printf("[%s %s] MSG:\n", r.Host.HostName, r.Host.SSHAddress) + fmt.Println(output) + } + } + + return output, nil } func (r *Runner) ScpFile(src, dst string) error { @@ -64,18 +66,6 @@ func (r *Runner) ScpFile(src, dst string) error { return nil } -// Run executes a given command/script, optionally printing its output to -// stdout/stderr. -func (r *Runner) Run(cmd string, variables TemplateVariables) (string, error) { - tmpl, _ := template.New("base").Parse(cmd) - cmd, err := util.Render(tmpl, variables) - if err != nil { - return "", err - } - - return r.RunRaw(cmd) -} - // WaitForPod waits for the availability of the given Kubernetes element. func (r *Runner) WaitForPod(namespace string, name string, timeout time.Duration) error { cmd := fmt.Sprintf(`sudo kubectl --kubeconfig=/etc/kubernetes/admin.conf -n "%s" get pod "%s" -o jsonpath='{.status.phase}' --ignore-not-found`, namespace, name) @@ -98,7 +88,7 @@ func (r *Runner) WaitForCondition(cmd string, timeout time.Duration, validator v cutoff := time.Now().Add(timeout) for time.Now().Before(cutoff) { - stdout, _ := r.Run(cmd, nil) + stdout, _ := r.RunCmd(cmd) if validator(stdout) { return true } diff --git a/util/runner/tee.go b/util/runner/tee.go deleted file mode 100644 index 9eb36c14..00000000 --- a/util/runner/tee.go +++ /dev/null @@ -1,119 +0,0 @@ -package runner - -import ( - "bytes" - "io" - "strings" - "sync" -) - -// NewTee constructor -func NewTee(wc io.WriteCloser) *Tee { - return &Tee{upstream: wc} -} - -// Tee mimics the unix `tee` command by piping its -// input through to the upstream writer and also -// capturing it in a buffer. -type Tee struct { - buffer bytes.Buffer - upstream io.WriteCloser -} - -func (t *Tee) Write(p []byte) (int, error) { - t.buffer.Write(p) - - return t.upstream.Write(p) -} - -func (t *Tee) String() string { - return strings.TrimSpace(t.buffer.String()) -} - -// Close underlying io.Closer -func (t *Tee) Close() error { - return t.upstream.Close() -} - -// Writer implements io.Writer with prefix each lines. -type Writer struct { - w io.Writer - p []byte - l sync.Mutex - b *bytes.Buffer -} - -// New creates a new prefix Writer. -func New(w io.Writer, prefix string) *Writer { - return &Writer{ - w: w, - p: []byte(prefix), - } -} - -// Write writes data to base Writer with prefix. -func (w *Writer) Write(p []byte) (int, error) { - w.l.Lock() - defer w.l.Unlock() - if w.w == nil { - return 0, io.EOF - } - - size := len(p) - if w.b != nil { - w.b.Write(p) - p = w.b.Bytes() - w.b = nil - } - - b := new(bytes.Buffer) - for len(p) > 0 { - n := bytes.IndexByte(p, '\n') - if n < 0 { - w.b = new(bytes.Buffer) - w.b.Write(p) - break - } - b.Write(w.p) - b.Write(p[:n+1]) - p = p[n+1:] - } - - if b.Len() > 0 { - _, err := b.WriteTo(w.w) - if err != nil { - return 0, err - } - } - return size, nil -} - -func (w *Writer) flush() error { - if w.w == nil { - return io.EOF - } - if w.b == nil { - return nil - } - b := new(bytes.Buffer) - b.Write(w.p) - w.b.WriteTo(b) - w.b = nil - b.WriteByte('\n') - _, err := b.WriteTo(w.w) - return err -} - -// Close flush buffered data and close Writer. -func (w *Writer) Close() error { - w.l.Lock() - defer w.l.Unlock() - if w.w == nil { - return nil - } - err := w.flush() - w.w = nil - return err -} - -var _ io.WriteCloser = &Writer{} diff --git a/util/ssh/ssh.go b/util/ssh/ssh.go index b7b84d1c..03baae63 100644 --- a/util/ssh/ssh.go +++ b/util/ssh/ssh.go @@ -264,7 +264,7 @@ func (c *connection) Exec(cmd string, host *v1alpha1.HostCfg) (string, int, erro ssh.TTY_OP_OSPEED: 14400, // output speed = 14.4kbaud } - err = sess.RequestPty("xterm", 80, 40, modes) + err = sess.RequestPty("xterm", 100, 50, modes) if err != nil { return "", 0, err } @@ -273,37 +273,41 @@ func (c *connection) Exec(cmd string, host *v1alpha1.HostCfg) (string, int, erro out, _ := sess.StdoutPipe() var output []byte - go func(in io.WriteCloser, out io.Reader, output *[]byte) { - var ( - line string - r = bufio.NewReader(out) - ) - for { - b, err := r.ReadByte() + err = sess.Start(strings.TrimSpace(cmd)) + if err != nil { + return "", 0, err + } + + var ( + line = "" + r = bufio.NewReader(out) + ) + + for { + b, err := r.ReadByte() + if err != nil { + break + } + + output = append(output, b) + + if b == byte('\n') { + line = "" + continue + } + + line += string(b) + + if (strings.HasPrefix(line, "[sudo] password for ") || strings.HasPrefix(line, "Password")) && strings.HasSuffix(line, ": ") { + _, err = stdin.Write([]byte(host.Password + "\n")) if err != nil { break } - - *output = append(*output, b) - - if b == byte('\n') { - line = "" - continue - } - - line += string(b) - - if (strings.HasPrefix(line, "[sudo] password for ") || strings.HasPrefix(line, "Password")) && strings.HasSuffix(line, ": ") { - _, err = in.Write([]byte(host.Password + "\n")) - if err != nil { - break - } - } } - }(stdin, out, &output) + } exitCode := 0 - _, err = sess.CombinedOutput(strings.TrimSpace(cmd)) + err = sess.Wait() if err != nil { exitCode = 1 } diff --git a/util/util.go b/util/util.go index dffcc0db..22494f1c 100644 --- a/util/util.go +++ b/util/util.go @@ -1,9 +1,12 @@ package util import ( + "encoding/binary" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "net" "os" + "strconv" "strings" "text/template" ) @@ -66,3 +69,130 @@ func Render(tmpl *template.Template, variables map[string]interface{}) (string, } return buf.String(), nil } + +func ParseIp(ip string) []string { + var availableIPs []string + // if ip is "1.1.1.1/",trim / + ip = strings.TrimRight(ip, "/") + if strings.Contains(ip, "/") == true { + if strings.Contains(ip, "/32") == true { + aip := strings.Replace(ip, "/32", "", -1) + availableIPs = append(availableIPs, aip) + } else { + availableIPs = GetAvailableIP(ip) + } + } else if strings.Contains(ip, "-") == true { + ipRange := strings.SplitN(ip, "-", 2) + availableIPs = GetAvailableIPRange(ipRange[0], ipRange[1]) + } else { + availableIPs = append(availableIPs, ip) + } + return availableIPs +} + +func GetAvailableIPRange(ipStart, ipEnd string) []string { + var availableIPs []string + + firstIP := net.ParseIP(ipStart) + endIP := net.ParseIP(ipEnd) + if firstIP.To4() == nil || endIP.To4() == nil { + return availableIPs + } + firstIPNum := ipToInt(firstIP.To4()) + EndIPNum := ipToInt(endIP.To4()) + pos := int32(1) + + newNum := firstIPNum + + for newNum <= EndIPNum { + availableIPs = append(availableIPs, intToIP(newNum).String()) + newNum = newNum + pos + } + return availableIPs +} + +func GetAvailableIP(ipAndMask string) []string { + var availableIPs []string + + ipAndMask = strings.TrimSpace(ipAndMask) + ipAndMask = IPAddressToCIDR(ipAndMask) + _, ipnet, _ := net.ParseCIDR(ipAndMask) + + firstIP, _ := networkRange(ipnet) + ipNum := ipToInt(firstIP) + size := networkSize(ipnet.Mask) + pos := int32(1) + max := size - 2 // -1 for the broadcast address, -1 for the gateway address + + var newNum int32 + for attempt := int32(0); attempt < max; attempt++ { + newNum = ipNum + pos + pos = pos%max + 1 + availableIPs = append(availableIPs, intToIP(newNum).String()) + } + return availableIPs +} + +func IPAddressToCIDR(ipAdress string) string { + if strings.Contains(ipAdress, "/") == true { + ipAndMask := strings.Split(ipAdress, "/") + ip := ipAndMask[0] + mask := ipAndMask[1] + if strings.Contains(mask, ".") == true { + mask = IPMaskStringToCIDR(mask) + } + return ip + "/" + mask + } else { + return ipAdress + } +} + +func IPMaskStringToCIDR(netmask string) string { + netmaskList := strings.Split(netmask, ".") + var mint []int + for _, v := range netmaskList { + strv, _ := strconv.Atoi(v) + mint = append(mint, strv) + } + myIPMask := net.IPv4Mask(byte(mint[0]), byte(mint[1]), byte(mint[2]), byte(mint[3])) + ones, _ := myIPMask.Size() + return strconv.Itoa(ones) +} + +func IPMaskCIDRToString(one string) string { + oneInt, _ := strconv.Atoi(one) + mIPmask := net.CIDRMask(oneInt, 32) + var maskstring []string + for _, v := range mIPmask { + maskstring = append(maskstring, strconv.Itoa(int(v))) + } + return strings.Join(maskstring, ".") +} + +func networkRange(network *net.IPNet) (net.IP, net.IP) { + netIP := network.IP.To4() + firstIP := netIP.Mask(network.Mask) + lastIP := net.IPv4(0, 0, 0, 0).To4() + for i := 0; i < len(lastIP); i++ { + lastIP[i] = netIP[i] | ^network.Mask[i] + } + return firstIP, lastIP +} + +func networkSize(mask net.IPMask) int32 { + m := net.IPv4Mask(0, 0, 0, 0) + for i := 0; i < net.IPv4len; i++ { + m[i] = ^mask[i] + } + return int32(binary.BigEndian.Uint32(m)) + 1 +} + +func ipToInt(ip net.IP) int32 { + return int32(binary.BigEndian.Uint32(ip.To4())) +} + +func intToIP(n int32) net.IP { + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, uint32(n)) + return net.IP(b) +}