add install cluster

pixiake 2020-04-18 00:00:46 +08:00
parent 3cb237961a
commit 9b2f417241
18 changed files with 968 additions and 286 deletions

View File

@ -2,6 +2,7 @@ package v1alpha1
import (
"fmt"
"github.com/pixiake/kubekey/util"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
@ -81,6 +82,13 @@ func addDefaultingFuncs(scheme *runtime.Scheme) error {
return RegisterDefaults(scheme)
}
type ExternalEtcd struct {
Endpoints []string
CaFile string
CertFile string
KeyFile string
}
func RegisterDefaults(scheme *runtime.Scheme) error {
scheme.AddTypeDefaultingFunc(&ClusterCfg{}, func(obj interface{}) { SetDefaultClusterCfg(obj.(*ClusterCfg)) })
return nil
@ -106,3 +114,28 @@ func (cfg *ClusterCfg) GenerateHosts() []string {
hostsList = append(hostsList, lbHost)
return hostsList
}
func (cfg *ClusterCfg) GenerateCertSANs() []string {
clusterSvc := fmt.Sprintf("kubernetes.default.svc.%s", cfg.KubeClusterName)
defaultCertSANs := []string{"kubernetes", "kubernetes.default", "kubernetes.default.svc", clusterSvc, "localhost", "127.0.0.1"}
extraCertSANs := []string{}
extraCertSANs = append(extraCertSANs, cfg.LBKubeApiserver.Domain)
extraCertSANs = append(extraCertSANs, cfg.LBKubeApiserver.Address)
for _, host := range cfg.Hosts {
extraCertSANs = append(extraCertSANs, host.HostName)
extraCertSANs = append(extraCertSANs, fmt.Sprintf("%s.%s", host.HostName, cfg.KubeClusterName))
if host.SSHAddress != cfg.LBKubeApiserver.Address {
extraCertSANs = append(extraCertSANs, host.SSHAddress)
}
if host.InternalAddress != host.SSHAddress && host.InternalAddress != cfg.LBKubeApiserver.Address {
extraCertSANs = append(extraCertSANs, host.InternalAddress)
}
}
extraCertSANs = append(extraCertSANs, util.ParseIp(cfg.Network.KubeServiceCIDR)[0])
defaultCertSANs = append(defaultCertSANs, extraCertSANs...)
return defaultCertSANs
}
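Note (illustration added for clarity, not part of the commit): besides the hostnames and addresses of every host, GenerateCertSANs appends the first usable address of the service CIDR, i.e. the ClusterIP that kubeadm assigns to the built-in kubernetes Service. A minimal standalone Go sketch of that last value, assuming an example service CIDR of 10.233.0.0/18:

package main

import (
	"fmt"
	"net"
)

func main() {
	// ParseIp(KubeServiceCIDR)[0] resolves to the network address + 1.
	_, ipnet, _ := net.ParseCIDR("10.233.0.0/18") // example service CIDR
	first := ipnet.IP.To4()
	first[3]++         // 10.233.0.0 -> 10.233.0.1
	fmt.Println(first) // printed SAN: 10.233.0.1
}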

View File

@ -15,7 +15,7 @@ func InstallerDocker(mgr *manager.Manager) error {
func installDockerOnNode(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
cmd := "sudo sh -c \"[ -z $(which docker) ] && curl https://raw.githubusercontent.com/pixiake/kubeocean/master/scripts/docker-install.sh | sh ; systemctl enable docker\""
_, err := mgr.Runner.RunRaw(cmd)
_, err := mgr.Runner.RunCmd(cmd)
if err != nil {
return errors.Wrap(errors.WithStack(err), "failed to install docker")
}

View File

@ -1 +1,235 @@
package etcd
import (
"encoding/base64"
"fmt"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/cluster/etcd/tmpl"
"github.com/pixiake/kubekey/util/manager"
"github.com/pixiake/kubekey/util/ssh"
"github.com/pkg/errors"
"strings"
"time"
)
var (
certsStr = make(chan map[string]string)
certsContent = map[string]string{}
etcdBackupPrefix = "/var/backups"
etcdDataDir = "/var/lib/etcd"
etcdConfigDir = "/etc/ssl/etcd"
etcdCertDir = "/etc/ssl/etcd/ssl"
etcdBinDir = "/usr/local/bin"
)
func GenerateEtcdCerts(mgr *manager.Manager) error {
mgr.Logger.Infoln("Generate etcd certs")
return mgr.RunTaskOnEtcdNodes(generateCerts, true)
}
func generateCerts(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
if mgr.Runner.Index == 0 {
certsScript, err := tmpl.GenerateEtcdSslScript(mgr.Cluster)
if err != nil {
return err
}
certsScriptBase64 := base64.StdEncoding.EncodeToString([]byte(certsScript))
_, err1 := mgr.Runner.RunCmd(fmt.Sprintf("echo %s | base64 -d > /tmp/kubekey/make-ssl-etcd.sh && chmod +x /tmp/kubekey/make-ssl-etcd.sh", certsScriptBase64))
if err1 != nil {
return errors.Wrap(errors.WithStack(err1), "failed to generate etcd certs script")
}
certsOpensslCfg, err := tmpl.GenerateEtcdSslCfg(mgr.Cluster)
if err != nil {
return err
}
certsOpensslCfgBase64 := base64.StdEncoding.EncodeToString([]byte(certsOpensslCfg))
_, err2 := mgr.Runner.RunCmd(fmt.Sprintf("echo %s | base64 -d > /tmp/kubekey/openssl.conf", certsOpensslCfgBase64))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to generate etcd certs script")
}
cmd := fmt.Sprintf("mkdir -p %s && /bin/bash -x %s/make-ssl-etcd.sh -f %s/openssl.conf -d %s", etcdCertDir, "/tmp/kubekey", "/tmp/kubekey", etcdCertDir)
_, err3 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", cmd))
if err3 != nil {
return errors.Wrap(errors.WithStack(err3), "failed to generate etcd certs")
}
for _, cert := range generateCertsFiles(mgr) {
certsBase64Cmd := fmt.Sprintf("cat %s/%s | base64 --wrap=0", etcdCertDir, cert)
certsBase64, err4 := mgr.Runner.RunCmd(certsBase64Cmd)
if err4 != nil {
return errors.Wrap(errors.WithStack(err4), "failed to get etcd certs content")
}
certsContent[cert] = certsBase64
}
for i := 1; i <= len(mgr.EtcdNodes.Hosts)-1; i++ {
certsStr <- certsContent
}
} else {
mgr.Runner.RunCmd(fmt.Sprintf("sudo mkdir -p %s", etcdCertDir))
for file, cert := range <-certsStr {
writeCertCmd := fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > %s/%s\"", cert, etcdCertDir, file)
_, err4 := mgr.Runner.RunCmd(writeCertCmd)
if err4 != nil {
return errors.Wrap(errors.WithStack(err4), "failed to write etcd certs content")
}
}
}
return nil
}
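Note (standalone sketch, not the commit's code): generateCerts relies on an unbuffered channel so that the node with Runner.Index == 0 generates the certificates once and every other etcd node blocks until it receives its copy. A minimal illustration of that fan-out, with hypothetical names:

package main

import "fmt"

func main() {
	certs := make(chan map[string]string) // unbuffered, like certsStr above
	nodes := 3
	// stands in for the node with Runner.Index == 0
	go func() {
		content := map[string]string{"ca.pem": "<base64>"}
		for i := 1; i <= nodes-1; i++ {
			certs <- content // one send per remaining node
		}
	}()
	done := make(chan struct{})
	for i := 1; i < nodes; i++ {
		go func(id int) {
			c := <-certs // follower blocks here until the certs exist
			fmt.Println("node", id, "received", len(c), "file(s)")
			done <- struct{}{}
		}(i)
	}
	for i := 1; i < nodes; i++ {
		<-done
	}
}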
func generateCertsFiles(mgr *manager.Manager) []string {
var certsList []string
certsList = append(certsList, "ca.pem")
certsList = append(certsList, "ca-key.pem")
for _, host := range mgr.EtcdNodes.Hosts {
certsList = append(certsList, fmt.Sprintf("admin-%s.pem", host.HostName))
certsList = append(certsList, fmt.Sprintf("admin-%s-key.pem", host.HostName))
certsList = append(certsList, fmt.Sprintf("member-%s.pem", host.HostName))
certsList = append(certsList, fmt.Sprintf("member-%s-key.pem", host.HostName))
}
for _, host := range mgr.MasterNodes.Hosts {
certsList = append(certsList, fmt.Sprintf("node-%s.pem", host.HostName))
certsList = append(certsList, fmt.Sprintf("node-%s-key.pem", host.HostName))
}
return certsList
}
func SyncEtcdCertsToMaster(mgr *manager.Manager) error {
mgr.Logger.Infoln("Sync etcd certs")
return mgr.RunTaskOnMasterNodes(syncEtcdCertsToMaster, true)
}
func syncEtcdCertsToMaster(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
if !node.IsEtcd {
mgr.Runner.RunCmd(fmt.Sprintf("sudo mkdir -p %s", etcdCertDir))
for file, cert := range certsContent {
writeCertCmd := fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > %s/%s\"", cert, etcdCertDir, file)
_, err := mgr.Runner.RunCmd(writeCertCmd)
if err != nil {
return errors.Wrap(errors.WithStack(err), "failed to sync etcd certs to master")
}
}
}
return nil
}
func GenerateEtcdService(mgr *manager.Manager) error {
mgr.Logger.Infoln("Start etcd cluster")
return mgr.RunTaskOnEtcdNodes(generateEtcdService, true)
}
func generateEtcdService(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
etcdService, err := tmpl.GenerateEtcdService(mgr, mgr.Runner.Index)
if err != nil {
return err
}
etcdServiceBase64 := base64.StdEncoding.EncodeToString([]byte(etcdService))
_, err1 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/systemd/system/etcd.service\"", etcdServiceBase64))
if err1 != nil {
return errors.Wrap(errors.WithStack(err1), "failed to generate etcd service")
}
etcdEnv, err := tmpl.GenerateEtcdEnv(mgr, node, mgr.Runner.Index)
if err != nil {
return err
}
etcdEnvBase64 := base64.StdEncoding.EncodeToString([]byte(etcdEnv))
_, err2 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/etcd.env\"", etcdEnvBase64))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to generate etcd env")
}
etcdBin, err := tmpl.GenerateEtcdBinary(mgr, mgr.Runner.Index)
if err != nil {
return err
}
etcdBinBase64 := base64.StdEncoding.EncodeToString([]byte(etcdBin))
_, err3 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /usr/local/bin/etcd && chmod +x /usr/local/bin/etcd\"", etcdBinBase64))
if err3 != nil {
return errors.Wrap(errors.WithStack(err3), "failed to generate etcd bin")
}
getEtcdCtlCmd := fmt.Sprintf("docker run --rm -v /usr/local/bin:/systembindir %s:%s /bin/cp /usr/local/bin/etcdctl /systembindir/etcdctl", kubekeyapi.DefaultEtcdRepo, kubekeyapi.DefaultEtcdVersion)
_, err4 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", getEtcdCtlCmd))
if err4 != nil {
return errors.Wrap(errors.WithStack(err4), "failed to get etcdctl")
}
_, err5 := mgr.Runner.RunCmd("sudo -E /bin/sh -c \"systemctl daemon-reload && systemctl restart etcd\"")
if err5 != nil {
return errors.Wrap(errors.WithStack(err5), "failed to start etcd")
}
addrList := []string{}
for _, host := range mgr.EtcdNodes.Hosts {
addrList = append(addrList, fmt.Sprintf("https://%s:2379", host.InternalAddress))
}
checkHealthCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s cluster-health | grep -q 'cluster is healthy'", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","))
if mgr.Runner.Index == 0 {
for i := 20; i > 0; i-- {
_, err := mgr.Runner.RunCmd(checkHealthCmd)
if err != nil {
fmt.Println("Waiting for etcd to start")
if i == 1 {
return errors.Wrap(errors.WithStack(err), "failed to start etcd")
}
} else {
break
}
time.Sleep(time.Second * 5)
}
} else {
checkMemberCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --no-sync --endpoints=%s member list | grep -q %s", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","), fmt.Sprintf("https://%s:2379", node.InternalAddress))
_, err := mgr.Runner.RunCmd(checkMemberCmd)
if err != nil {
joinMemberCmd := fmt.Sprintf("export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s member add %s %s", node.HostName, node.HostName, etcdBinDir, strings.Join(addrList, ","), fmt.Sprintf("etcd%d", mgr.Runner.Index+1), fmt.Sprintf("https://%s:2380", node.InternalAddress))
_, err := mgr.Runner.RunCmd(joinMemberCmd)
if err != nil {
fmt.Println("failed to add etcd member")
}
}
}
for i := 20; i > 0; i-- {
_, err := mgr.Runner.RunCmd(checkHealthCmd)
if err != nil {
fmt.Println("Waiting for etcd to start")
if i == 1 {
return errors.Wrap(errors.WithStack(err), "failed to start etcd")
}
} else {
break
}
time.Sleep(time.Second * 5)
}
reloadEtcdEnvCmd := "sed -i '/ETCD_INITIAL_CLUSTER_STATE/s/\\:.*/\\: existing/g' /etc/etcd.env && systemctl daemon-reload && systemctl restart etcd"
_, err6 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", reloadEtcdEnvCmd))
if err6 != nil {
return errors.Wrap(errors.WithStack(err6), "failed to reload etcd env")
}
for i := 20; i > 0; i-- {
_, err := mgr.Runner.RunCmd(checkHealthCmd)
if err != nil {
fmt.Println("Waiting for etcd to start")
if i == 1 {
return errors.Wrap(errors.WithStack(err), "failed to start etcd")
}
} else {
break
}
time.Sleep(time.Second * 5)
}
return nil
}
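For orientation (hostname and addresses are hypothetical, not from the commit): on a first etcd node named node1 with peers at 192.168.0.11-13, the checkHealthCmd built above expands to roughly:

export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-node1.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-node1-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';/usr/local/bin/etcdctl --endpoints=https://192.168.0.11:2379,https://192.168.0.12:2379,https://192.168.0.13:2379 cluster-health | grep -q 'cluster is healthy'

The surrounding loops retry this command up to 20 times, five seconds apart, before giving up.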

cluster/etcd/tmpl/certs.go (new file, 198 lines)
View File

@ -0,0 +1,198 @@
package tmpl
import (
"github.com/lithammer/dedent"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util"
"strings"
"text/template"
)
func Add(a int, b int) int {
return a + b
}
var (
funcMap = template.FuncMap{"Add": Add}
EtcdSslCfgTempl = template.Must(template.New("etcdSslCfg").Funcs(funcMap).Parse(
dedent.Dedent(`[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name
[req_distinguished_name]
[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
[ ssl_client ]
extendedKeyUsage = clientAuth, serverAuth
basicConstraints = CA:FALSE
subjectKeyIdentifier=hash
authorityKeyIdentifier=keyid,issuer
subjectAltName = @alt_names
[ v3_ca ]
basicConstraints = CA:TRUE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
authorityKeyIdentifier=keyid:always,issuer
[alt_names]
{{- range $i, $v := .Dns }}
DNS.{{ Add $i 1 }} = {{ $v }}
{{- end }}
{{- range $i, $v := .Ips }}
IP.{{ Add $i 1 }} = {{ $v }}
{{- end }}
`)))
EtcdSslTempl = template.Must(template.New("etcdSsl").Parse(
dedent.Dedent(`#!/bin/bash
# Author: Smana smainklh@gmail.com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
set -o pipefail
usage()
{
cat << EOF
Create self signed certificates
Usage : $(basename $0) -f <config> [-d <ssldir>]
-h | --help : Show this message
-f | --config : Openssl configuration file
-d | --ssldir : Directory where the certificates will be installed
ex :
$(basename $0) -f openssl.conf -d /srv/ssl
EOF
}
# Options parsing
while (($#)); do
case "$1" in
-h | --help) usage; exit 0;;
-f | --config) CONFIG=${2}; shift 2;;
-d | --ssldir) SSLDIR="${2}"; shift 2;;
*)
usage
echo "ERROR : Unknown option"
exit 3
;;
esac
done
if [ -z ${CONFIG} ]; then
echo "ERROR: the openssl configuration file is missing. option -f"
exit 1
fi
if [ -z ${SSLDIR} ]; then
SSLDIR="/etc/ssl/etcd"
fi
tmpdir=$(mktemp -d /tmp/etcd_cacert.XXXXXX)
trap 'rm -rf "${tmpdir}"' EXIT
cd "${tmpdir}"
mkdir -p "${SSLDIR}"
# Root CA
if [ -e "$SSLDIR/ca-key.pem" ]; then
# Reuse existing CA
cp $SSLDIR/{ca.pem,ca-key.pem} .
else
openssl genrsa -out ca-key.pem 2048 > /dev/null 2>&1
openssl req -x509 -new -nodes -key ca-key.pem -days 36500 -out ca.pem -subj "/CN=etcd-ca" > /dev/null 2>&1
fi
MASTERS='{{ .Masters }}'
HOSTS='{{ .Hosts }}'
# ETCD member
if [ -n "$MASTERS" ]; then
for host in $MASTERS; do
cn="${host%%.*}"
# Member key
openssl genrsa -out member-${host}-key.pem 2048 > /dev/null 2>&1
openssl req -new -key member-${host}-key.pem -out member-${host}.csr -subj "/CN=etcd-member-${cn}" -config ${CONFIG} > /dev/null 2>&1
openssl x509 -req -in member-${host}.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out member-${host}.pem -days 36500 -extensions ssl_client -extfile ${CONFIG} > /dev/null 2>&1
# Admin key
openssl genrsa -out admin-${host}-key.pem 2048 > /dev/null 2>&1
openssl req -new -key admin-${host}-key.pem -out admin-${host}.csr -subj "/CN=etcd-admin-${cn}" > /dev/null 2>&1
openssl x509 -req -in admin-${host}.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out admin-${host}.pem -days 36500 -extensions ssl_client -extfile ${CONFIG} > /dev/null 2>&1
done
fi
# Node keys
if [ -n "$HOSTS" ]; then
for host in $HOSTS; do
cn="${host%%.*}"
openssl genrsa -out node-${host}-key.pem 2048 > /dev/null 2>&1
openssl req -new -key node-${host}-key.pem -out node-${host}.csr -subj "/CN=etcd-node-${cn}" > /dev/null 2>&1
openssl x509 -req -in node-${host}.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out node-${host}.pem -days 36500 -extensions ssl_client -extfile ${CONFIG} > /dev/null 2>&1
done
fi
# Install certs
if [ -e "$SSLDIR/ca-key.pem" ]; then
# No pass existing CA
rm -f ca.pem ca-key.pem
fi
mv *.pem ${SSLDIR}/
`)))
)
func GenerateEtcdSslCfg(cfg *kubekeyapi.ClusterCfg) (string, error) {
dnsList := []string{"localhost", "etcd.kube-system.svc.cluster.local", "etcd.kube-system.svc", "etcd.kube-system", "etcd"}
ipList := []string{"127.0.0.1"}
if cfg.LBKubeApiserver.Domain == "" {
dnsList = append(dnsList, kubekeyapi.DefaultLBDomain)
} else {
dnsList = append(dnsList, cfg.LBKubeApiserver.Domain)
}
for _, host := range cfg.Hosts {
dnsList = append(dnsList, host.HostName)
ipList = append(ipList, host.InternalAddress)
}
return util.Render(EtcdSslCfgTempl, util.Data{
"Dns": dnsList,
"Ips": ipList,
})
}
func GenerateEtcdSslScript(cfg *kubekeyapi.ClusterCfg) (string, error) {
var masters []string
var hosts []string
_, etcdNodes, masterNodes, _, _ := cfg.GroupHosts()
for _, host := range etcdNodes.Hosts {
masters = append(masters, host.HostName)
}
for _, host := range masterNodes.Hosts {
hosts = append(hosts, host.HostName)
}
return util.Render(EtcdSslTempl, util.Data{
"Masters": strings.Join(masters, " "),
"Hosts": strings.Join(hosts, " "),
})
}
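Note (standalone example, not part of the commit): the Add helper exists because text/template's range index is zero-based while the openssl config needs DNS.1, IP.1 and so on. A minimal sketch of the same rendering:

package main

import (
	"os"
	"text/template"
)

func main() {
	funcMap := template.FuncMap{"Add": func(a, b int) int { return a + b }}
	const altNames = "{{- range $i, $v := .Dns }}\nDNS.{{ Add $i 1 }} = {{ $v }}{{- end }}\n"
	t := template.Must(template.New("altNames").Funcs(funcMap).Parse(altNames))
	_ = t.Execute(os.Stdout, map[string]interface{}{
		"Dns": []string{"localhost", "etcd", "node1"},
	})
	// DNS.1 = localhost
	// DNS.2 = etcd
	// DNS.3 = node1
}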

cluster/etcd/tmpl/etcd.go (new file, 116 lines)
View File

@ -0,0 +1,116 @@
package tmpl
import (
"fmt"
"github.com/lithammer/dedent"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util"
"github.com/pixiake/kubekey/util/manager"
"strings"
"text/template"
)
var (
EtcdServiceTempl = template.Must(template.New("EtcdService").Parse(
dedent.Dedent(`[Unit]
Description=etcd docker wrapper
Wants=docker.socket
After=docker.service
[Service]
User=root
PermissionsStartOnly=true
EnvironmentFile=-/etc/etcd.env
ExecStart=/usr/local/bin/etcd
ExecStartPre=-/usr/bin/docker rm -f {{ .Name }}
ExecStop=/usr/bin/docker stop {{ .Name }}
Restart=always
RestartSec=15s
TimeoutStartSec=30s
[Install]
WantedBy=multi-user.target
`)))
EtcdEnvTempl = template.Must(template.New("etcdEnv").Parse(
dedent.Dedent(`# Environment file for etcd {{ .Tag }}
ETCD_DATA_DIR=/var/lib/etcd
ETCD_ADVERTISE_CLIENT_URLS=https://{{ .Ip }}:2379
ETCD_INITIAL_ADVERTISE_PEER_URLS=https://{{ .Ip }}:2380
ETCD_INITIAL_CLUSTER_STATE=new
ETCD_METRICS=basic
ETCD_LISTEN_CLIENT_URLS=https://{{ .Ip }}:2379,https://127.0.0.1:2379
ETCD_ELECTION_TIMEOUT=5000
ETCD_HEARTBEAT_INTERVAL=250
ETCD_INITIAL_CLUSTER_TOKEN=k8s_etcd
ETCD_LISTEN_PEER_URLS=https://{{ .Ip }}:2380
ETCD_NAME={{ .Name }}
ETCD_PROXY=off
ETCD_INITIAL_CLUSTER={{ .Endpoints }}
ETCD_AUTO_COMPACTION_RETENTION=8
ETCD_SNAPSHOT_COUNT=10000
# TLS settings
ETCD_TRUSTED_CA_FILE=/etc/ssl/etcd/ssl/ca.pem
ETCD_CERT_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}.pem
ETCD_KEY_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}-key.pem
ETCD_CLIENT_CERT_AUTH=true
ETCD_PEER_TRUSTED_CA_FILE=/etc/ssl/etcd/ssl/ca.pem
ETCD_PEER_CERT_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}.pem
ETCD_PEER_KEY_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}-key.pem
ETCD_PEER_CLIENT_CERT_AUTH=True
# CLI settings
ETCDCTL_ENDPOINTS=https://127.0.0.1:2379
ETCDCTL_CA_FILE=/etc/ssl/etcd/ssl/ca.pem
ETCDCTL_KEY_FILE=/etc/ssl/etcd/ssl/admin-{{ .Hostname }}-key.pem
ETCDCTL_CERT_FILE=/etc/ssl/etcd/ssl/admin-{{ .Hostname }}.pem
`)))
EtcdTempl = template.Must(template.New("etcd").Parse(
dedent.Dedent(`#!/bin/bash
/usr/bin/docker run \
--restart=on-failure:5 \
--env-file=/etc/etcd.env \
--net=host \
-v /etc/ssl/certs:/etc/ssl/certs:ro \
-v /etc/ssl/etcd/ssl:/etc/ssl/etcd/ssl:ro \
-v /var/lib/etcd:/var/lib/etcd:rw \
--memory=512M \
--blkio-weight=1000 \
--name={{ .Name }} \
{{ .Repo }}:{{ .Tag }} \
/usr/local/bin/etcd \
"$@"
`)))
)
func GenerateEtcdBinary(mgr *manager.Manager, index int) (string, error) {
return util.Render(EtcdTempl, util.Data{
"Name": fmt.Sprintf("etcd%d", index+1),
"Repo": kubekeyapi.DefaultEtcdRepo,
"Tag": kubekeyapi.DefaultEtcdVersion,
})
}
func GenerateEtcdService(mgr *manager.Manager, index int) (string, error) {
return util.Render(EtcdServiceTempl, util.Data{
"Name": fmt.Sprintf("etcd%d", index+1),
})
}
func GenerateEtcdEnv(mgr *manager.Manager, node *kubekeyapi.HostCfg, index int) (string, error) {
endpoints := []string{}
for index, host := range mgr.EtcdNodes.Hosts {
endpoints = append(endpoints, fmt.Sprintf("etcd%d=https://%s:2380", index+1, host.InternalAddress))
}
return util.Render(EtcdEnvTempl, util.Data{
"Tag": kubekeyapi.DefaultEtcdVersion,
"Name": fmt.Sprintf("etcd%d", index+1),
"Ip": node.InternalAddress,
"Hostname": node.HostName,
"Endpoints": strings.Join(endpoints, ","),
})
}
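Note (hypothetical addresses, added for clarity): GenerateEtcdEnv joins one etcdN=https://&lt;ip&gt;:2380 entry per etcd node into the ETCD_INITIAL_CLUSTER value. For three example nodes the value the template receives would be built like this:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// example InternalAddress values
	addrs := []string{"192.168.0.11", "192.168.0.12", "192.168.0.13"}
	endpoints := []string{}
	for i, ip := range addrs {
		endpoints = append(endpoints, fmt.Sprintf("etcd%d=https://%s:2380", i+1, ip))
	}
	fmt.Println(strings.Join(endpoints, ","))
	// etcd1=https://192.168.0.11:2380,etcd2=https://192.168.0.12:2380,etcd3=https://192.168.0.13:2380
}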

View File

@ -1,5 +1,33 @@
package kubernetes
func SyncKubeFiles() {
import (
"encoding/base64"
"fmt"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/cluster/kubernetes/tmpl"
"github.com/pixiake/kubekey/util/manager"
"github.com/pixiake/kubekey/util/ssh"
"github.com/pkg/errors"
)
func InitKubernetesCluster(mgr *manager.Manager) error {
mgr.Logger.Infoln("Init kubernetes cluster")
return mgr.RunTaskOnMasterNodes(initKubernetesCluster, true)
}
func initKubernetesCluster(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
if mgr.Runner.Index == 0 {
kubeadmCfg, err := tmpl.GenerateKubeadmCfg(mgr)
if err != nil {
return err
}
kubeadmCfgBase64 := base64.StdEncoding.EncodeToString([]byte(kubeadmCfg))
_, err1 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"mkdir -p /etc/kubernetes && echo %s | base64 -d > /etc/kubernetes/kubeadm-config.yaml\"", kubeadmCfgBase64))
if err1 != nil {
return errors.Wrap(errors.WithStack(err1), "failed to generate kubeadm config")
}
}
return nil
}

View File

@ -16,7 +16,7 @@ import (
func SyncKubeBinaries(mgr *manager.Manager) error {
mgr.Logger.Infoln("Syncing kube binaries……")
return mgr.RunTaskOnAllNodes(syncKubeBinaries, true)
return mgr.RunTaskOnK8sNodes(syncKubeBinaries, true)
}
func syncKubeBinaries(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
@ -51,7 +51,7 @@ func syncKubeBinaries(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.C
}
}
cmd := strings.Join(cmdlist, " && ")
_, err3 := mgr.Runner.RunRaw(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", cmd))
_, err3 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", cmd))
if err3 != nil {
return errors.Wrap(errors.WithStack(err3), fmt.Sprintf("failed to create kubelet link"))
}
@ -70,7 +70,7 @@ func setKubelet(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connect
return err1
}
kubeletServiceBase64 := base64.StdEncoding.EncodeToString([]byte(kubeletService))
_, err2 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/kubelet.service", kubeletServiceBase64, "/tmp/kubekey"))
_, err2 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"echo %s | base64 -d > /etc/systemd/system/kubelet.service && ln -snf /usr/local/bin/kubelet /usr/bin/kubelet\"", kubeletServiceBase64))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to generate kubelet service")
}
@ -80,15 +80,10 @@ func setKubelet(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connect
return err3
}
kubeletEnvBase64 := base64.StdEncoding.EncodeToString([]byte(kubeletEnv))
_, err4 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/10-kubeadm.conf", kubeletEnvBase64, "/tmp/kubekey"))
_, err4 := mgr.Runner.RunCmd(fmt.Sprintf("sudo -E /bin/sh -c \"mkdir -p /etc/systemd/system/kubelet.service.d && echo %s | base64 -d > /etc/systemd/system/kubelet.service.d/10-kubeadm.conf\"", kubeletEnvBase64))
if err4 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to generate kubelet env")
}
_, err5 := mgr.Runner.RunRaw("sudo -E /bin/sh -c \"cp -f /tmp/kubekey/kubelet.service /etc/systemd/system && mkdir -p /etc/systemd/system/kubelet.service.d && cp -f /tmp/kubekey/10-kubeadm.conf /etc/systemd/system/kubelet.service.d\"")
if err5 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to configure kubelet service")
}
return nil
}

View File

@ -1 +1,114 @@
package tmpl
import (
"fmt"
"github.com/lithammer/dedent"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util"
"github.com/pixiake/kubekey/util/manager"
"text/template"
)
var KubeadmCfgTempl = template.Must(template.New("kubeadmCfg").Parse(
dedent.Dedent(`---
apiVersion: kubeadm.k8s.io/v1beta2
kind: ClusterConfiguration
etcd:
external:
endpoints:
{{- range .ExternalEtcd.Endpoints }}
- {{ . }}
{{- end }}
caFile: {{ .ExternalEtcd.CaFile }}
certFile: {{ .ExternalEtcd.CertFile }}
keyFile: {{ .ExternalEtcd.KeyFile }}
dns:
type: CoreDNS
imageRepository: coredns
imageTag: 1.6.0
imageRepository: {{ .ImageRepo }}
kubernetesVersion: {{ .Version }}
certificatesDir: /etc/kubernetes/pki
clusterName: {{ .ClusterName }}
controlPlaneEndpoint: {{ .ControlPlaneEndpoint }}
networking:
dnsDomain: {{ .ClusterName }}
podSubnet: {{ .PodSubnet }}
serviceSubnet: {{ .ServiceSubnet }}
apiServer:
extraArgs:
authorization-mode: Node,RBAC
timeoutForControlPlane: 4m0s
certSANs:
{{- range .CertSANs }}
- {{ . }}
{{- end }}
---
apiVersion: kubeproxy.config.k8s.io/v1alpha1
kind: KubeProxyConfiguration
bindAddress: 0.0.0.0
clientConnection:
acceptContentTypes:
burst: 10
contentType: application/vnd.kubernetes.protobuf
kubeconfig:
qps: 5
clusterCIDR: {{ .PodSubnet }}
configSyncPeriod: 15m0s
conntrack:
maxPerCore: 32768
min: 131072
tcpCloseWaitTimeout: 1h0m0s
tcpEstablishedTimeout: 24h0m0s
enableProfiling: False
healthzBindAddress: 0.0.0.0:10256
iptables:
masqueradeAll: False
masqueradeBit: 14
minSyncPeriod: 0s
syncPeriod: 30s
ipvs:
excludeCIDRs: []
minSyncPeriod: 0s
scheduler: rr
syncPeriod: 30s
strictARP: False
metricsBindAddress: 127.0.0.1:10249
mode: ipvs
nodePortAddresses: []
oomScoreAdj: -999
portRange:
udpIdleTimeout: 250ms
`)))
func GenerateKubeadmCfg(mgr *manager.Manager) (string, error) {
var externalEtcd kubekeyapi.ExternalEtcd
var endpointsList []string
var caFile, certFile, keyFile string
for _, host := range mgr.EtcdNodes.Hosts {
endpoint := fmt.Sprintf("https://%s:%s", host.InternalAddress, kubekeyapi.DefaultEtcdPort)
endpointsList = append(endpointsList, endpoint)
}
externalEtcd.Endpoints = endpointsList
caFile = "/etc/ssl/etcd/ssl/ca.pem"
certFile = fmt.Sprintf("/etc/ssl/etcd/ssl/admin-%s.pem", mgr.EtcdNodes.Hosts[0].HostName)
keyFile = fmt.Sprintf("/etc/ssl/etcd/ssl/admin-%s-key.pem", mgr.EtcdNodes.Hosts[0].HostName)
externalEtcd.CaFile = caFile
externalEtcd.CertFile = certFile
externalEtcd.KeyFile = keyFile
return util.Render(KubeadmCfgTempl, util.Data{
"ImageRepo": mgr.Cluster.KubeImageRepo,
"Version": mgr.Cluster.KubeVersion,
"ClusterName": mgr.Cluster.KubeClusterName,
"ControlPlaneEndpoint": fmt.Sprintf("%s:%s", mgr.Cluster.LBKubeApiserver.Address, mgr.Cluster.LBKubeApiserver.Port),
"PodSubnet": mgr.Cluster.Network.KubePodsCIDR,
"ServiceSubnet": mgr.Cluster.Network.KubeServiceCIDR,
"CertSANs": mgr.Cluster.GenerateCertSANs(),
"ExternalEtcd": externalEtcd,
})
}

View File

@ -18,7 +18,7 @@ func InitOS(mgr *manager.Manager) error {
func initOsOnNode(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
tmpDir := "/tmp/kubekey"
_, err := mgr.Runner.RunRaw(fmt.Sprintf("mkdir -p %s", tmpDir))
_, err := mgr.Runner.RunCmd(fmt.Sprintf("mkdir -p %s", tmpDir))
if err != nil {
return errors.Wrap(errors.WithStack(err), "failed to init operating system")
}
@ -29,12 +29,12 @@ func initOsOnNode(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Conne
}
str := base64.StdEncoding.EncodeToString([]byte(initOsScript))
_, err2 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/initOS.sh && chmod +x %s/initOS.sh", str, tmpDir, tmpDir))
_, err2 := mgr.Runner.RunCmd(fmt.Sprintf("echo %s | base64 -d > %s/initOS.sh && chmod +x %s/initOS.sh", str, tmpDir, tmpDir))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to init operating system")
}
_, err3 := mgr.Runner.RunRaw(fmt.Sprintf("sudo %s/initOS.sh", tmpDir))
_, err3 := mgr.Runner.RunCmd(fmt.Sprintf("sudo %s/initOS.sh", tmpDir))
if err3 != nil {
return errors.Wrap(errors.WithStack(err3), "failed to init operating system")
}

View File

@ -37,7 +37,7 @@ modinfo br_netfilter > /dev/null 2>&1
if [ $? -eq 0 ]; then
modprobe br_netfilter
mkdir -p /etc/modules-load.d
echo 'br_netfilter' > /etc/modules-load.d/kubeocean-br_netfilter.conf
echo 'br_netfilter' > /etc/modules-load.d/kubekey-br_netfilter.conf
fi
modprobe ip_vs

View File

@ -1,32 +1,17 @@
package install
import (
"github.com/pixiake/kubekey/util/manager"
ssh2 "github.com/pixiake/kubekey/util/ssh"
"github.com/sirupsen/logrus"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util/manager"
ssh "github.com/pixiake/kubekey/util/ssh"
"github.com/sirupsen/logrus"
)
// Options groups the various possible options for running
// the Kubernetes installation.
type Options struct {
Verbose bool
Manifest string
CredentialsFile string
BackupFile string
DestroyWorkers bool
RemoveBinaries bool
}
// Installer is entrypoint for installation process
type Executor struct {
cluster *kubekeyapi.ClusterCfg
logger *logrus.Logger
}
// NewInstaller returns a new installer, responsible for dispatching
// between the different supported Kubernetes versions and running the
func NewExecutor(cluster *kubekeyapi.ClusterCfg, logger *logrus.Logger) *Executor {
return &Executor{
cluster: cluster,
@ -34,7 +19,6 @@ func NewExecutor(cluster *kubekeyapi.ClusterCfg, logger *logrus.Logger) *Executo
}
}
// Install run the installation process
func (executor *Executor) Execute() error {
mgr, err := executor.createManager()
if err != nil {
@ -43,30 +27,18 @@ func (executor *Executor) Execute() error {
return ExecTasks(mgr)
}
// Reset resets cluster:
// * destroys all the worker machines
// * kubeadm reset masters
//func (i *Installer) Reset(options *Options) error {
// s, err := i.createState(options)
// if err != nil {
// return err
// }
// return installation.Reset(s)
//}
func (executor *Executor) createManager() (*manager.Manager, error) {
mgr := &manager.Manager{}
allNodes, etcdNodes, masterNodes, workerNodes, k8sNodes := executor.cluster.GroupHosts()
mgr.AllNodes = allNodes
mgr.EtcdNodes = etcdNodes
mgr.MasterNodes = masterNodes
mgr.WorkerNodes = workerNodes
mgr.K8sNodes = k8sNodes
mgr.Cluster = executor.cluster
mgr.Connector = ssh2.NewConnector()
//s.Configuration = configupload.NewConfiguration()
//mgr.WorkDir = "kubekey"
mgr.Connector = ssh.NewConnector()
mgr.Logger = executor.logger
mgr.Verbose = true
//s.ManifestFilePath = options.Manifest
//s.CredentialsFilePath = options.CredentialsFile
//s.BackupFile = options.BackupFile
//s.DestroyWorkers = options.DestroyWorkers
//s.RemoveBinaries = options.RemoveBinaries
return mgr, nil
}

View File

@ -2,6 +2,7 @@ package install
import (
"github.com/pixiake/kubekey/cluster/container-engine/docker"
"github.com/pixiake/kubekey/cluster/etcd"
"github.com/pixiake/kubekey/cluster/kubernetes"
"github.com/pixiake/kubekey/cluster/preinstall"
"github.com/pixiake/kubekey/util/manager"
@ -14,7 +15,11 @@ func ExecTasks(mgr *manager.Manager) error {
{Fn: preinstall.InitOS, ErrMsg: "failed to download kube binaries"},
{Fn: docker.InstallerDocker, ErrMsg: "failed to install docker"},
{Fn: kubernetes.SyncKubeBinaries, ErrMsg: "failed to sync kube binaries"},
{Fn: etcd.GenerateEtcdCerts, ErrMsg: "failed to generate etcd certs"},
{Fn: etcd.SyncEtcdCertsToMaster, ErrMsg: "failed to sync etcd certs"},
{Fn: etcd.GenerateEtcdService, ErrMsg: "failed to start etcd cluster"},
{Fn: kubernetes.ConfigureKubeletService, ErrMsg: "failed to sync kube binaries"},
{Fn: kubernetes.InitKubernetesCluster, ErrMsg: "failed to init kubernetes cluster"},
}
for _, step := range createTasks {

View File

@ -12,35 +12,20 @@ import (
//dynclient "sigs.k8s.io/manager-runtime/pkg/client"
)
//func New() (*State, error) {
// joinToken, err := bootstraputil.GenerateBootstrapToken()
// return &State{
// JoinToken: joinToken,
// }, err
//}
// State holds together currently test flags and parsed info, along with
// utilities like logger
type Manager struct {
Cluster *kubekeyapi.ClusterCfg
Logger logrus.FieldLogger
Connector *ssh2.Dialer
//Configuration *configupload.Configuration
Cluster *kubekeyapi.ClusterCfg
Logger logrus.FieldLogger
Connector *ssh2.Dialer
Runner *runner.Runner
AllNodes *kubekeyapi.Hosts
EtcdNodes *kubekeyapi.Hosts
MasterNodes *kubekeyapi.Hosts
WorkerNodes *kubekeyapi.Hosts
K8sNodes *kubekeyapi.Hosts
WorkDir string
JoinCommand string
JoinToken string
//RESTConfig *rest.Config
//DynamicClient dynclient.Client
Verbose bool
//BackupFile string
//DestroyWorkers bool
//RemoveBinaries bool
//ForceUpgrade bool
//UpgradeMachineDeployments bool
//PatchCNI bool
//CredentialsFilePath string
//ManifestFilePath string
Verbose bool
}
func (mgr *Manager) KubeadmVerboseFlag() string {
@ -50,8 +35,7 @@ func (mgr *Manager) KubeadmVerboseFlag() string {
return ""
}
// Clone returns a shallow copy of the State.
func (mgr *Manager) Clone() *Manager {
newState := *mgr
return &newState
newManager := *mgr
return &newManager
}

View File

@ -20,7 +20,7 @@ const (
// NodeTask is a task that is specifically tailored to run on a single node.
type NodeTask func(mgr *Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error
func (mgr *Manager) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bool) error {
func (mgr *Manager) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bool, index int) error {
var (
err error
conn ssh.Connection
@ -40,9 +40,9 @@ func (mgr *Manager) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bo
mgr.Runner = &runner.Runner{
Conn: conn,
Verbose: mgr.Verbose,
//OS: node.OS,
Prefix: prefix,
Host: node,
Prefix: prefix,
Host: node,
Index: index,
}
return task(mgr, node, conn)
@ -81,16 +81,16 @@ func (mgr *Manager) RunTaskOnNodes(nodes []kubekeyapi.HostCfg, task NodeTask, pa
if parallel {
ccons <- struct{}{}
wg.Add(1)
go func(mgr *Manager, node *kubekeyapi.HostCfg, result chan string) {
err = mgr.runTask(node, task, parallel)
go func(mgr *Manager, node *kubekeyapi.HostCfg, result chan string, index int) {
err = mgr.runTask(node, task, parallel, index)
if err != nil {
mgr.Logger.Error(err)
hasErrors = true
}
result <- "done"
}(mgrTask, &nodes[i], result)
}(mgrTask, &nodes[i], result, i)
} else {
err = mgrTask.runTask(&nodes[i], task, parallel)
err = mgrTask.runTask(&nodes[i], task, parallel, i)
if err != nil {
break
}
@ -106,38 +106,37 @@ func (mgr *Manager) RunTaskOnNodes(nodes []kubekeyapi.HostCfg, task NodeTask, pa
return err
}
// RunTaskOnAllNodes runs the given task on all hosts.
func (mgr *Manager) RunTaskOnAllNodes(task NodeTask, parallel bool) error {
// It's not possible to concatenate host lists in this function.
// Some of the task(determineOS, determineHostname) write to the state and sending a copy would break that.
if err := mgr.RunTaskOnNodes(mgr.Cluster.Hosts, task, parallel); err != nil {
if err := mgr.RunTaskOnNodes(mgr.AllNodes.Hosts, task, parallel); err != nil {
return err
}
//if s.Cluster.StaticWorkers != nil {
// return s.RunTaskOnNodes(s.Cluster.StaticWorkers, task, parallel)
//}
return nil
}
// RunTaskOnLeader runs the given task on the leader host.
//func (s *State) RunTaskOnLeader(task NodeTask) error {
// leader, err := s.Cluster.Leader()
// if err != nil {
// return err
// }
//
// hosts := []kubekeyapi.HostConfig{
// leader,
// }
//
// return s.RunTaskOnNodes(hosts, task, false)
//}
func (mgr *Manager) RunTaskOnEtcdNodes(task NodeTask, parallel bool) error {
if err := mgr.RunTaskOnNodes(mgr.EtcdNodes.Hosts, task, parallel); err != nil {
return err
}
return nil
}
// RunTaskOnFollowers runs the given task on the follower hosts.
//func (s *State) RunTaskOnFollowers(task NodeTask, parallel bool) error {
// return s.RunTaskOnNodes(s.Cluster.Followers(), task, parallel)
//}
//
//func (s *State) RunTaskOnStaticWorkers(task NodeTask, parallel bool) error {
// return s.RunTaskOnNodes(s.Cluster.StaticWorkers, task, parallel)
//}
func (mgr *Manager) RunTaskOnMasterNodes(task NodeTask, parallel bool) error {
if err := mgr.RunTaskOnNodes(mgr.MasterNodes.Hosts, task, parallel); err != nil {
return err
}
return nil
}
func (mgr *Manager) RunTaskOnWorkerNodes(task NodeTask, parallel bool) error {
if err := mgr.RunTaskOnNodes(mgr.WorkerNodes.Hosts, task, parallel); err != nil {
return err
}
return nil
}
func (mgr *Manager) RunTaskOnK8sNodes(task NodeTask, parallel bool) error {
if err := mgr.RunTaskOnNodes(mgr.K8sNodes.Hosts, task, parallel); err != nil {
return err
}
return nil
}
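Note (simplified sketch with hypothetical types and an invented concurrency cap, not the commit's code): the RunTaskOn* helpers all funnel into RunTaskOnNodes, which runs one goroutine per node while a channel limits how many tasks are in flight and a result channel collects completions. A self-contained illustration of that shape:

package main

import (
	"fmt"
	"sync"
)

func main() {
	nodes := []string{"node1", "node2", "node3", "node4"}
	ccons := make(chan struct{}, 2) // cap on concurrent tasks (value invented here)
	result := make(chan string, len(nodes))
	var wg sync.WaitGroup
	for i := range nodes {
		ccons <- struct{}{} // blocks once the cap is reached
		wg.Add(1)
		go func(name string, index int) {
			defer wg.Done()
			defer func() { <-ccons }() // release the slot
			// a real task would dial SSH and run commands against the node here
			result <- fmt.Sprintf("%s (index %d) done", name, index)
		}(nodes[i], i)
	}
	wg.Wait()
	close(result)
	for r := range result {
		fmt.Println(r)
	}
}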

View File

@ -3,29 +3,24 @@ package runner
import (
"fmt"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util"
ssh2 "github.com/pixiake/kubekey/util/ssh"
"github.com/pkg/errors"
"strings"
"text/template"
"time"
)
// Runner bundles a connection to a host with the verbosity and
// other options for running commands via SSH.
type Runner struct {
Conn ssh2.Connection
Prefix string
OS string
Verbose bool
Host *kubekeyapi.HostCfg
Result chan string
Index int
}
// TemplateVariables is a render context for templates
type TemplateVariables map[string]interface{}
func (r *Runner) RunRaw(cmd string) (string, error) {
func (r *Runner) RunCmd(cmd string) (string, error) {
if r.Conn == nil {
return "", errors.New("runner is not tied to an opened SSH connection")
}
@ -37,12 +32,19 @@ func (r *Runner) RunRaw(cmd string) (string, error) {
return output, nil
}
if output != "" {
fmt.Printf("[%s %s] MSG:\n", r.Host.HostName, r.Host.SSHAddress)
fmt.Println(output)
if err != nil {
return "", err
}
return "", err
if output != "" {
if strings.Contains(cmd, "base64") && strings.Contains(cmd, "--wrap=0") {
} else {
fmt.Printf("[%s %s] MSG:\n", r.Host.HostName, r.Host.SSHAddress)
fmt.Println(output)
}
}
return output, nil
}
func (r *Runner) ScpFile(src, dst string) error {
@ -64,18 +66,6 @@ func (r *Runner) ScpFile(src, dst string) error {
return nil
}
// Run executes a given command/script, optionally printing its output to
// stdout/stderr.
func (r *Runner) Run(cmd string, variables TemplateVariables) (string, error) {
tmpl, _ := template.New("base").Parse(cmd)
cmd, err := util.Render(tmpl, variables)
if err != nil {
return "", err
}
return r.RunRaw(cmd)
}
// WaitForPod waits for the availability of the given Kubernetes element.
func (r *Runner) WaitForPod(namespace string, name string, timeout time.Duration) error {
cmd := fmt.Sprintf(`sudo kubectl --kubeconfig=/etc/kubernetes/admin.conf -n "%s" get pod "%s" -o jsonpath='{.status.phase}' --ignore-not-found`, namespace, name)
@ -98,7 +88,7 @@ func (r *Runner) WaitForCondition(cmd string, timeout time.Duration, validator v
cutoff := time.Now().Add(timeout)
for time.Now().Before(cutoff) {
stdout, _ := r.Run(cmd, nil)
stdout, _ := r.RunCmd(cmd)
if validator(stdout) {
return true
}

View File

@ -1,119 +0,0 @@
package runner
import (
"bytes"
"io"
"strings"
"sync"
)
// NewTee constructor
func NewTee(wc io.WriteCloser) *Tee {
return &Tee{upstream: wc}
}
// Tee mimics the unix `tee` command by piping its
// input through to the upstream writer and also
// capturing it in a buffer.
type Tee struct {
buffer bytes.Buffer
upstream io.WriteCloser
}
func (t *Tee) Write(p []byte) (int, error) {
t.buffer.Write(p)
return t.upstream.Write(p)
}
func (t *Tee) String() string {
return strings.TrimSpace(t.buffer.String())
}
// Close underlying io.Closer
func (t *Tee) Close() error {
return t.upstream.Close()
}
// Writer implements io.Writer with prefix each lines.
type Writer struct {
w io.Writer
p []byte
l sync.Mutex
b *bytes.Buffer
}
// New creates a new prefix Writer.
func New(w io.Writer, prefix string) *Writer {
return &Writer{
w: w,
p: []byte(prefix),
}
}
// Write writes data to base Writer with prefix.
func (w *Writer) Write(p []byte) (int, error) {
w.l.Lock()
defer w.l.Unlock()
if w.w == nil {
return 0, io.EOF
}
size := len(p)
if w.b != nil {
w.b.Write(p)
p = w.b.Bytes()
w.b = nil
}
b := new(bytes.Buffer)
for len(p) > 0 {
n := bytes.IndexByte(p, '\n')
if n < 0 {
w.b = new(bytes.Buffer)
w.b.Write(p)
break
}
b.Write(w.p)
b.Write(p[:n+1])
p = p[n+1:]
}
if b.Len() > 0 {
_, err := b.WriteTo(w.w)
if err != nil {
return 0, err
}
}
return size, nil
}
func (w *Writer) flush() error {
if w.w == nil {
return io.EOF
}
if w.b == nil {
return nil
}
b := new(bytes.Buffer)
b.Write(w.p)
w.b.WriteTo(b)
w.b = nil
b.WriteByte('\n')
_, err := b.WriteTo(w.w)
return err
}
// Close flush buffered data and close Writer.
func (w *Writer) Close() error {
w.l.Lock()
defer w.l.Unlock()
if w.w == nil {
return nil
}
err := w.flush()
w.w = nil
return err
}
var _ io.WriteCloser = &Writer{}

View File

@ -264,7 +264,7 @@ func (c *connection) Exec(cmd string, host *v1alpha1.HostCfg) (string, int, erro
ssh.TTY_OP_OSPEED: 14400, // output speed = 14.4kbaud
}
err = sess.RequestPty("xterm", 80, 40, modes)
err = sess.RequestPty("xterm", 100, 50, modes)
if err != nil {
return "", 0, err
}
@ -273,37 +273,41 @@ func (c *connection) Exec(cmd string, host *v1alpha1.HostCfg) (string, int, erro
out, _ := sess.StdoutPipe()
var output []byte
go func(in io.WriteCloser, out io.Reader, output *[]byte) {
var (
line string
r = bufio.NewReader(out)
)
for {
b, err := r.ReadByte()
err = sess.Start(strings.TrimSpace(cmd))
if err != nil {
return "", 0, err
}
var (
line = ""
r = bufio.NewReader(out)
)
for {
b, err := r.ReadByte()
if err != nil {
break
}
output = append(output, b)
if b == byte('\n') {
line = ""
continue
}
line += string(b)
if (strings.HasPrefix(line, "[sudo] password for ") || strings.HasPrefix(line, "Password")) && strings.HasSuffix(line, ": ") {
_, err = stdin.Write([]byte(host.Password + "\n"))
if err != nil {
break
}
*output = append(*output, b)
if b == byte('\n') {
line = ""
continue
}
line += string(b)
if (strings.HasPrefix(line, "[sudo] password for ") || strings.HasPrefix(line, "Password")) && strings.HasSuffix(line, ": ") {
_, err = in.Write([]byte(host.Password + "\n"))
if err != nil {
break
}
}
}
}(stdin, out, &output)
}
exitCode := 0
_, err = sess.CombinedOutput(strings.TrimSpace(cmd))
err = sess.Wait()
if err != nil {
exitCode = 1
}

View File

@ -1,9 +1,12 @@
package util
import (
"encoding/binary"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"net"
"os"
"strconv"
"strings"
"text/template"
)
@ -66,3 +69,130 @@ func Render(tmpl *template.Template, variables map[string]interface{}) (string,
}
return buf.String(), nil
}
func ParseIp(ip string) []string {
var availableIPs []string
// if ip is "1.1.1.1/",trim /
ip = strings.TrimRight(ip, "/")
if strings.Contains(ip, "/") == true {
if strings.Contains(ip, "/32") == true {
aip := strings.Replace(ip, "/32", "", -1)
availableIPs = append(availableIPs, aip)
} else {
availableIPs = GetAvailableIP(ip)
}
} else if strings.Contains(ip, "-") == true {
ipRange := strings.SplitN(ip, "-", 2)
availableIPs = GetAvailableIPRange(ipRange[0], ipRange[1])
} else {
availableIPs = append(availableIPs, ip)
}
return availableIPs
}
func GetAvailableIPRange(ipStart, ipEnd string) []string {
var availableIPs []string
firstIP := net.ParseIP(ipStart)
endIP := net.ParseIP(ipEnd)
if firstIP.To4() == nil || endIP.To4() == nil {
return availableIPs
}
firstIPNum := ipToInt(firstIP.To4())
EndIPNum := ipToInt(endIP.To4())
pos := int32(1)
newNum := firstIPNum
for newNum <= EndIPNum {
availableIPs = append(availableIPs, intToIP(newNum).String())
newNum = newNum + pos
}
return availableIPs
}
func GetAvailableIP(ipAndMask string) []string {
var availableIPs []string
ipAndMask = strings.TrimSpace(ipAndMask)
ipAndMask = IPAddressToCIDR(ipAndMask)
_, ipnet, _ := net.ParseCIDR(ipAndMask)
firstIP, _ := networkRange(ipnet)
ipNum := ipToInt(firstIP)
size := networkSize(ipnet.Mask)
pos := int32(1)
max := size - 2 // -1 for the broadcast address, -1 for the gateway address
var newNum int32
for attempt := int32(0); attempt < max; attempt++ {
newNum = ipNum + pos
pos = pos%max + 1
availableIPs = append(availableIPs, intToIP(newNum).String())
}
return availableIPs
}
func IPAddressToCIDR(ipAdress string) string {
if strings.Contains(ipAdress, "/") == true {
ipAndMask := strings.Split(ipAdress, "/")
ip := ipAndMask[0]
mask := ipAndMask[1]
if strings.Contains(mask, ".") == true {
mask = IPMaskStringToCIDR(mask)
}
return ip + "/" + mask
} else {
return ipAdress
}
}
func IPMaskStringToCIDR(netmask string) string {
netmaskList := strings.Split(netmask, ".")
var mint []int
for _, v := range netmaskList {
strv, _ := strconv.Atoi(v)
mint = append(mint, strv)
}
myIPMask := net.IPv4Mask(byte(mint[0]), byte(mint[1]), byte(mint[2]), byte(mint[3]))
ones, _ := myIPMask.Size()
return strconv.Itoa(ones)
}
func IPMaskCIDRToString(one string) string {
oneInt, _ := strconv.Atoi(one)
mIPmask := net.CIDRMask(oneInt, 32)
var maskstring []string
for _, v := range mIPmask {
maskstring = append(maskstring, strconv.Itoa(int(v)))
}
return strings.Join(maskstring, ".")
}
func networkRange(network *net.IPNet) (net.IP, net.IP) {
netIP := network.IP.To4()
firstIP := netIP.Mask(network.Mask)
lastIP := net.IPv4(0, 0, 0, 0).To4()
for i := 0; i < len(lastIP); i++ {
lastIP[i] = netIP[i] | ^network.Mask[i]
}
return firstIP, lastIP
}
func networkSize(mask net.IPMask) int32 {
m := net.IPv4Mask(0, 0, 0, 0)
for i := 0; i < net.IPv4len; i++ {
m[i] = ^mask[i]
}
return int32(binary.BigEndian.Uint32(m)) + 1
}
func ipToInt(ip net.IP) int32 {
return int32(binary.BigEndian.Uint32(ip.To4()))
}
func intToIP(n int32) net.IP {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(n))
return net.IP(b)
}
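A hedged usage sketch for the helpers above (import path taken from this commit; expected results are read off the code, not verified against a build):

package main

import (
	"fmt"

	"github.com/pixiake/kubekey/util"
)

func main() {
	fmt.Println(util.ParseIp("172.16.0.2"))            // [172.16.0.2]
	fmt.Println(util.ParseIp("172.16.0.2/32"))         // [172.16.0.2]
	fmt.Println(util.ParseIp("172.16.0.1-172.16.0.3")) // [172.16.0.1 172.16.0.2 172.16.0.3]
	fmt.Println(util.ParseIp("10.233.0.0/30"))         // usable hosts only: [10.233.0.1 10.233.0.2]
	// For a service CIDR such as 10.233.0.0/18, ParseIp(...)[0] is 10.233.0.1,
	// which is the value GenerateCertSANs appends as an extra SAN.
}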