Refactor batch execution

pixiake 2020-04-16 20:54:43 +08:00
parent 5e9c9bab4c
commit 3cb237961a
27 changed files with 966 additions and 315 deletions

View File

@@ -1,31 +1,50 @@
package v1alpha1
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const (
DefaultPreDir = "kubekey"
DefaultSSHPort = "22"
DefaultDockerSockPath = "/var/run/docker.sock"
DefaultLBPort = "6443"
DefaultLBDomain = "lb.kubesphere.local"
DefaultNetworkPlugin = "calico"
DefaultPodsCIDR = "10.233.64.0/18"
DefaultServiceCIDR = "10.233.0.0/18"
DefaultKubeImageRepo = "kubekey"
DefaultClusterName = "cluster.local"
DefaultArch = "amd64"
DefaultHostName = "allinone"
DefaultEtcdRepo = "kubekey/etcd"
DefaultEtcdVersion = "v3.3.12"
DefaultEtcdPort = "2379"
DefaultKubeVersion = "v1.17.4"
DefaultCniVersion = "v0.8.2"
DefaultHelmVersion = "v3.1.2"
ETCDRole = "etcd"
MasterRole = "master"
WorkerRole = "worker"
)
type ClusterCfg struct {
Hosts []HostCfg `yaml:"hosts" json:"hosts,omitempty"`
LBKubeApiserver LBKubeApiserverCfg `yaml:"lb_kubeapiserver" json:"lb_kubeapiserver,omitempty"`
KubeVersion string `yaml:"kube_version" json:"kube_version,omitempty"`
KubeImageRepo string `yaml:"kube_image_repo" json:"kube_image_repo,omitempty"`
KubeClusterName string `yaml:"kube_cluster_name" json:"kube_cluster_name,omitempty"`
LBKubeApiserver LBKubeApiserverCfg `yaml:"lbKubeapiserver" json:"lbKubeapiserver,omitempty"`
KubeVersion string `yaml:"kubeVersion" json:"kubeVersion,omitempty"`
KubeImageRepo string `yaml:"kubeImageRepo" json:"kubeImageRepo,omitempty"`
KubeClusterName string `yaml:"kubeClusterName" json:"kubeClusterName,omitempty"`
Network NetworkConfig `yaml:"network" json:"network,omitempty"`
}
type HostCfg struct {
HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty"`
Address string `yaml:"address" json:"address,omitempty"`
Port string `yaml:"port" json:"port,omitempty"`
InternalAddress string `yaml:"internal_address" json:"internalAddress,omitempty"`
Role []string `yaml:"role" json:"role,omitempty" norman:"type=array[enum],options=etcd|master|worker"`
//HostnameOverride string `yaml:"hostname_override" json:"hostnameOverride,omitempty"`
User string `yaml:"user" json:"user,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
//SSHAgentAuth bool `yaml:"ssh_agent_auth,omitempty" json:"sshAgentAuth,omitempty"`
//SSHKey string `yaml:"ssh_key" json:"sshKey,omitempty" norman:"type=password"`
SSHKeyPath string `yaml:"ssh_key_path" json:"sshKeyPath,omitempty"`
//SSHCert string `yaml:"ssh_cert" json:"sshCert,omitempty"`
//SSHCertPath string `yaml:"ssh_cert_path" json:"sshCertPath,omitempty"`
//Labels map[string]string `yaml:"labels" json:"labels,omitempty"`
//Taints []Taint `yaml:"taints" json:"taints,omitempty"`
ID int `json:"-"`
func (c ClusterCfg) GetObjectKind() schema.ObjectKind {
panic("implement me")
}
func (c ClusterCfg) DeepCopyObject() runtime.Object {
panic("implement me")
}
type Taint struct {
@@ -57,3 +76,33 @@ type LBKubeApiserverCfg struct {
Address string `yaml:"address" json:"address,omitempty"`
Port string `yaml:"port" json:"port,omitempty"`
}
func addDefaultingFuncs(scheme *runtime.Scheme) error {
return RegisterDefaults(scheme)
}
func RegisterDefaults(scheme *runtime.Scheme) error {
scheme.AddTypeDefaultingFunc(&ClusterCfg{}, func(obj interface{}) { SetDefaultClusterCfg(obj.(*ClusterCfg)) })
return nil
}
func (cfg *ClusterCfg) GenerateHosts() []string {
var lbHost string
hostsList := []string{}
_, _, masters, _, _ := cfg.GroupHosts()
if cfg.LBKubeApiserver.Address != "" {
lbHost = fmt.Sprintf("%s %s", cfg.LBKubeApiserver.Address, cfg.LBKubeApiserver.Domain)
} else {
lbHost = fmt.Sprintf("%s %s", masters.Hosts[0].InternalAddress, DefaultLBDomain)
}
for _, host := range cfg.Hosts {
if host.HostName != "" {
hostsList = append(hostsList, fmt.Sprintf("%s %s.%s %s", host.InternalAddress, host.HostName, cfg.KubeClusterName, host.HostName))
}
}
hostsList = append(hostsList, lbHost)
return hostsList
}
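For orientation, the list this produces for a hypothetical two-node cluster with no external load balancer (all names and addresses invented) looks like the following — the last line falls back to the first master's internal address, per the branch above:

10.0.0.1 master1.cluster.local master1
10.0.0.2 worker1.cluster.local worker1
10.0.0.1 lb.kubesphere.local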

View File

@@ -1,66 +1,155 @@
package v1alpha1
import (
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
"io/ioutil"
"os"
"path/filepath"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"strconv"
)
func GetClusterCfg(clusterCfgFile string) *ClusterCfg {
if clusterCfgFile != "" {
clusterInfo, err := ResolveClusterInfoFile(clusterCfgFile)
if err != nil {
log.Fatal("Failed to parse the configuration file: ", err)
}
return clusterInfo
} else {
clusterInfo := &ClusterCfg{
Hosts: []HostCfg{{
Role: []string{"etcd", "master", "worker"},
User: "root",
}},
Network: NetworkConfig{
Plugin: DefaultNetworkPlugin,
KubePodsCIDR: DefaultPodsCIDR,
KubeServiceCIDR: DefaultServiceCIDR,
},
}
return clusterInfo
var Scheme = runtime.NewScheme()
var Codecs = serializer.NewCodecFactory(Scheme, serializer.EnableStrict)
func LoadClusterCfg(clusterCfgPath string, logger *log.Logger) (*ClusterCfg, error) {
if len(clusterCfgPath) == 0 {
return nil, errors.New("cluster configuration path not provided")
}
cluster, err := ioutil.ReadFile(clusterCfgPath)
if err != nil {
return nil, errors.Wrap(err, "unable to read the given cluster configuration file")
}
return ParseClusterCfg(cluster)
}
func ResolveClusterInfoFile(configFile string) (*ClusterCfg, error) {
fp, err := filepath.Abs(configFile)
if err != nil {
return nil, fmt.Errorf("failed to lookup current directory name: %v", err)
}
file, err := os.Open(fp)
if err != nil {
return nil, fmt.Errorf("can not find cluster info file: %v", err)
}
defer file.Close()
clusterInfo, err := GetYamlFile(configFile)
if err != nil {
return nil, fmt.Errorf("failed to read file: %v", err)
}
return clusterInfo, nil
}
func GetYamlFile(filePath string) (*ClusterCfg, error) {
result := ClusterCfg{}
b, err := ioutil.ReadFile(filePath)
if err != nil {
func ParseClusterCfg(cluster []byte) (*ClusterCfg, error) {
initCfg := &ClusterCfg{}
if err := runtime.DecodeInto(Codecs.UniversalDecoder(), cluster, initCfg); err != nil {
return nil, err
}
//var m HostJson
err = yaml.Unmarshal(b, &result)
if err != nil {
return nil, err
}
return &result, nil
return GetDefaultClusterCfg(initCfg)
}
func GetDefaultClusterCfg(cfg *ClusterCfg) (*ClusterCfg, error) {
internalCfg := &ClusterCfg{}
// Default and convert to the internal API type
Scheme.Default(cfg)
if err := Scheme.Convert(cfg, internalCfg, nil); err != nil {
return nil, errors.Wrap(err, "unable to convert versioned to internal cluster object")
}
return internalCfg, nil
}
func SetDefaultClusterCfg(cfg *ClusterCfg) *ClusterCfg {
cfg.Hosts = SetDefaultHostsCfg(cfg)
cfg.LBKubeApiserver = SetDefaultLBCfg(cfg)
cfg.Network = SetDefaultNetworkCfg(cfg)
if cfg.KubeImageRepo == "" {
cfg.KubeImageRepo = DefaultKubeImageRepo
}
if cfg.KubeClusterName == "" {
cfg.KubeClusterName = DefaultClusterName
}
if cfg.KubeVersion == "" {
cfg.KubeVersion = DefaultKubeVersion
}
return cfg
}
func SetDefaultHostsCfg(cfg *ClusterCfg) []HostCfg {
var hostscfg []HostCfg
if len(cfg.Hosts) == 0 {
return nil
}
for index, host := range cfg.Hosts {
host.ID = index
if len(host.SSHAddress) == 0 && len(host.InternalAddress) > 0 {
host.SSHAddress = host.InternalAddress
}
if len(host.InternalAddress) == 0 && len(host.SSHAddress) > 0 {
host.InternalAddress = host.SSHAddress
}
if host.User == "" {
host.User = "root"
}
if host.Port == "" {
host.Port = strconv.Itoa(22)
}
for _, role := range host.Role {
if role == "etcd" {
host.IsEtcd = true
}
if role == "master" {
host.IsMaster = true
}
if role == "worker" {
host.IsWorker = true
}
}
hostscfg = append(hostscfg, host)
}
return hostscfg
}
func SetDefaultLBCfg(cfg *ClusterCfg) LBKubeApiserverCfg {
masterHosts := []HostCfg{}
hosts := SetDefaultHostsCfg(cfg)
for _, host := range hosts {
for _, role := range host.Role {
if role == "etcd" {
host.IsEtcd = true
}
if role == "master" {
host.IsMaster = true
}
if role == "worker" {
host.IsWorker = true
}
}
if host.IsMaster {
masterHosts = append(masterHosts, host)
}
}
if cfg.LBKubeApiserver.Address == "" {
cfg.LBKubeApiserver.Address = masterHosts[0].InternalAddress
}
if cfg.LBKubeApiserver.Domain == "" {
cfg.LBKubeApiserver.Domain = DefaultLBDomain
}
if cfg.LBKubeApiserver.Port == "" {
cfg.LBKubeApiserver.Port = DefaultLBPort
}
defaultLbCfg := cfg.LBKubeApiserver
return defaultLbCfg
}
func SetDefaultNetworkCfg(cfg *ClusterCfg) NetworkConfig {
if cfg.Network.Plugin == "" {
cfg.Network.Plugin = DefaultNetworkPlugin
}
if cfg.Network.KubePodsCIDR == "" {
cfg.Network.KubePodsCIDR = DefaultPodsCIDR
}
if cfg.Network.KubeServiceCIDR == "" {
cfg.Network.KubeServiceCIDR = DefaultServiceCIDR
}
defaultNetworkCfg := cfg.Network
return defaultNetworkCfg
}
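A hypothetical cluster.yaml that LoadClusterCfg would accept — field names follow the yaml tags defined in the structs above, every value is a placeholder, and omitted fields are filled in by the defaulting functions:

hosts:
- hostName: node1
  sshAddress: 172.16.0.2
  internalAddress: 172.16.0.2
  user: root
  password: "********"
  role:
  - etcd
  - master
  - worker
lbKubeapiserver:
  address: 172.16.0.100
  port: "6443"
kubeVersion: v1.17.4
network:
  plugin: calico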

View File

@@ -1,44 +1,44 @@
package v1alpha1
const (
DefaultPreDir = "/tmp/kubekey"
DefaultSSHPort = "22"
DefaultDockerSockPath = "/var/run/docker.sock"
DefaultLBPort = "6443"
DefaultLBDomain = "lb.kubesphere.local"
DefaultNetworkPlugin = "calico"
DefaultPodsCIDR = "10.233.64.0/18"
DefaultServiceCIDR = "10.233.0.0/18"
DefaultKubeImageRepo = "kubekey"
DefaultClusterName = "cluster.local"
DefaultArch = "amd64"
DefaultHostName = "allinone"
DefaultEtcdRepo = "kubekey/etcd"
DefaultEtcdVersion = "v3.3.12"
DefaultEtcdPort = "2379"
DefaultKubeVersion = "v1.17.4"
DefaultCniVersion = "v0.8.2"
DefaultHelmVersion = "v3.1.2"
ETCDRole = "etcd"
MasterRole = "master"
WorkerRole = "worker"
)
type HostConfig struct {
ID int `json:"-"`
PublicAddress string `json:"publicAddress"`
PrivateAddress string `json:"privateAddress"`
SSHPort int `json:"sshPort"`
SSHUsername string `json:"sshUsername"`
SSHPrivateKeyFile string `json:"sshPrivateKeyFile"`
SSHAgentSocket string `json:"sshAgentSocket"`
Bastion string `json:"bastion"`
BastionPort int `json:"bastionPort"`
BastionUser string `json:"bastionUser"`
Hostname string `json:"hostname"`
IsLeader bool `json:"isLeader"`
Untaint bool `json:"untaint"`
// Information populated at the runtime
OperatingSystem string `json:"-"`
}
//const (
// DefaultPreDir = "/tmp/kubekey"
// DefaultSSHPort = "22"
// DefaultDockerSockPath = "/var/run/docker.sock"
// DefaultLBPort = "6443"
// DefaultLBDomain = "lb.kubesphere.local"
// DefaultNetworkPlugin = "calico"
// DefaultPodsCIDR = "10.233.64.0/18"
// DefaultServiceCIDR = "10.233.0.0/18"
// DefaultKubeImageRepo = "kubekey"
// DefaultClusterName = "cluster.local"
// DefaultArch = "amd64"
// DefaultHostName = "allinone"
// DefaultEtcdRepo = "kubekey/etcd"
// DefaultEtcdVersion = "v3.3.12"
// DefaultEtcdPort = "2379"
// DefaultKubeVersion = "v1.17.4"
// DefaultCniVersion = "v0.8.2"
// DefaultHelmVersion = "v3.1.2"
// ETCDRole = "etcd"
// MasterRole = "master"
// WorkerRole = "worker"
//)
//
//type HostConfig struct {
// ID int `json:"-"`
// PublicAddress string `json:"publicAddress"`
// PrivateAddress string `json:"privateAddress"`
// SSHPort int `json:"sshPort"`
// SSHUsername string `json:"sshUsername"`
// SSHPrivateKeyFile string `json:"sshPrivateKeyFile"`
// SSHAgentSocket string `json:"sshAgentSocket"`
// Bastion string `json:"bastion"`
// BastionPort int `json:"bastionPort"`
// BastionUser string `json:"bastionUser"`
// Hostname string `json:"hostname"`
// IsLeader bool `json:"isLeader"`
// Untaint bool `json:"untaint"`
//
// // Information populated at the runtime
// OperatingSystem string `json:"-"`
//}

apis/v1alpha1/host.go Normal file
View File

@@ -0,0 +1,58 @@
package v1alpha1
type HostCfg struct {
HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty"`
SSHAddress string `yaml:"sshAddress" json:"sshAddress,omitempty"`
InternalAddress string `yaml:"internalAddress" json:"internalAddress,omitempty"`
Port string `yaml:"port" json:"port,omitempty"`
User string `yaml:"user" json:"user,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
SSHKeyPath string `yaml:"sshKeyPath" json:"sshKeyPath,omitempty"`
Role []string `yaml:"role" json:"role,omitempty" norman:"type=array[enum],options=etcd|master|worker"`
ID int `json:"-"`
IsEtcd bool
IsMaster bool
IsWorker bool
OSFamily string
}
type Hosts struct {
Hosts []HostCfg
}
func (cfg *ClusterCfg) GroupHosts() (*Hosts, *Hosts, *Hosts, *Hosts, *Hosts) {
allHosts := Hosts{}
etcdHosts := Hosts{}
masterHosts := Hosts{}
workerHosts := Hosts{}
k8sHosts := Hosts{}
for _, host := range cfg.Hosts {
//clusterNode := HostCfg{}
for _, role := range host.Role {
if role == "etcd" {
host.IsEtcd = true
}
if role == "master" {
host.IsMaster = true
}
if role == "worker" {
host.IsWorker = true
}
}
if host.IsEtcd {
etcdHosts.Hosts = append(etcdHosts.Hosts, host)
}
if host.IsMaster {
masterHosts.Hosts = append(masterHosts.Hosts, host)
}
if host.IsWorker {
workerHosts.Hosts = append(workerHosts.Hosts, host)
}
if host.IsMaster || host.IsWorker {
k8sHosts.Hosts = append(k8sHosts.Hosts, host)
}
allHosts.Hosts = append(allHosts.Hosts, host)
}
return &allHosts, &etcdHosts, &masterHosts, &workerHosts, &k8sHosts
}
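A minimal caller sketch (not part of this commit, and assuming the fmt and kubekeyapi imports) showing how the five positional groups are consumed; GenerateHosts above does the same, discarding everything but the masters:

func printHostGroups(cfg *kubekeyapi.ClusterCfg) {
	all, etcds, masters, workers, k8s := cfg.GroupHosts()
	fmt.Printf("%d total, %d etcd, %d master, %d worker, %d k8s\n",
		len(all.Hosts), len(etcds.Hosts), len(masters.Hosts), len(workers.Hosts), len(k8s.Hosts))
}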

View File

@@ -2,29 +2,22 @@ package docker
import (
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util/dialer/ssh"
"github.com/pixiake/kubekey/util/state"
"github.com/pixiake/kubekey/util/manager"
"github.com/pixiake/kubekey/util/ssh"
"github.com/pkg/errors"
)
func InstallerDocker(s *state.State) error {
s.Logger.Infoln("Installing docker……")
func InstallerDocker(mgr *manager.Manager) error {
mgr.Logger.Infoln("Installing docker……")
return s.RunTaskOnAllNodes(installDockerOnNode, true)
return mgr.RunTaskOnAllNodes(installDockerOnNode, true)
}
func installDockerOnNode(s *state.State, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
err := installDocker(s)
func installDockerOnNode(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
cmd := "sudo sh -c \"[ -z $(which docker) ] && curl https://raw.githubusercontent.com/pixiake/kubeocean/master/scripts/docker-install.sh | sh ; systemctl enable docker\""
_, err := mgr.Runner.RunRaw(cmd)
if err != nil {
return errors.Wrap(err, "failed to install docker")
return errors.Wrap(errors.WithStack(err), "failed to install docker")
}
return nil
}
func installDocker(s *state.State) error {
cmd := "sudo sh -c \"[ -z $(which docker) ] && curl https://raw.githubusercontent.com/pixiake/kubeocean/master/scripts/docker-install.sh | sh ; systemctl enable docker\""
//cmd := "[ -z $(which docker) ] && curl https://raw.githubusercontent.com/pixiake/kubeocean/master/scripts/docker-install.sh | sh ; systemctl enable docker"
_, _, err := s.Runner.RunRaw(cmd)
return errors.WithStack(err)
}

View File

@@ -0,0 +1,5 @@
package kubernetes
func SyncKubeFiles() {
}

View File

@@ -0,0 +1,94 @@
package kubernetes
import (
"encoding/base64"
"fmt"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/cluster/kubernetes/tmpl"
"github.com/pixiake/kubekey/util/manager"
"github.com/pixiake/kubekey/util/ssh"
"github.com/pkg/errors"
"os"
"path/filepath"
"strings"
)
func SyncKubeBinaries(mgr *manager.Manager) error {
mgr.Logger.Infoln("Syncing kube binaries……")
return mgr.RunTaskOnAllNodes(syncKubeBinaries, true)
}
func syncKubeBinaries(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
currentDir, err1 := filepath.Abs(filepath.Dir(os.Args[0]))
if err1 != nil {
return errors.Wrap(err1, "failed to get current dir")
}
filepath := fmt.Sprintf("%s/%s", currentDir, kubekeyapi.DefaultPreDir)
kubeadm := fmt.Sprintf("kubeadm-%s", mgr.Cluster.KubeVersion)
kubelet := fmt.Sprintf("kubelet-%s", mgr.Cluster.KubeVersion)
kubectl := fmt.Sprintf("kubectl-%s", mgr.Cluster.KubeVersion)
helm := fmt.Sprintf("helm-%s", kubekeyapi.DefaultHelmVersion)
kubecni := fmt.Sprintf("cni-plugins-linux-%s-%s.tgz", kubekeyapi.DefaultArch, kubekeyapi.DefaultCniVersion)
binaryList := []string{kubeadm, kubelet, kubectl, helm, kubecni}
for _, binary := range binaryList {
err2 := mgr.Runner.ScpFile(fmt.Sprintf("%s/%s", filepath, binary), fmt.Sprintf("%s/%s", "/tmp/kubekey", binary))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to sync binaries")
}
}
cmdlist := []string{}
for _, binary := range binaryList {
if strings.Contains(binary, "cni-plugins-linux") {
cmdlist = append(cmdlist, fmt.Sprintf("mkdir -p /opt/cni/bin && tar -zxf %s/%s -C /opt/cni/bin", "/tmp/kubekey", binary))
} else {
cmdlist = append(cmdlist, fmt.Sprintf("cp /tmp/kubekey/%s /usr/local/bin/%s && chmod +x /usr/local/bin/%s", binary, strings.Split(binary, "-")[0], strings.Split(binary, "-")[0]))
}
}
cmd := strings.Join(cmdlist, " && ")
_, err3 := mgr.Runner.RunRaw(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", cmd))
if err3 != nil {
return errors.Wrap(errors.WithStack(err3), "failed to install kube binaries")
}
return nil
}
func ConfigureKubeletService(mgr *manager.Manager) error {
mgr.Logger.Infoln("Configure kubelet service……")
return mgr.RunTaskOnAllNodes(setKubelet, true)
}
func setKubelet(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
kubeletService, err1 := tmpl.GenerateKubeletService(mgr.Cluster)
if err1 != nil {
return err1
}
kubeletServiceBase64 := base64.StdEncoding.EncodeToString([]byte(kubeletService))
_, err2 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/kubelet.service", kubeletServiceBase64, "/tmp/kubekey"))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to generate kubelet service")
}
kubeletEnv, err3 := tmpl.GenerateKubeletEnv(mgr.Cluster)
if err3 != nil {
return err3
}
kubeletEnvBase64 := base64.StdEncoding.EncodeToString([]byte(kubeletEnv))
_, err4 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/10-kubeadm.conf", kubeletEnvBase64, "/tmp/kubekey"))
if err4 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to generate kubelet env")
}
_, err5 := mgr.Runner.RunRaw("sudo -E /bin/sh -c \"cp -f /tmp/kubekey/kubelet.service /etc/systemd/system && mkdir -p /etc/systemd/system/kubelet.service.d && cp -f /tmp/kubekey/10-kubeadm.conf /etc/systemd/system/kubelet.service.d\"")
if err5 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to configure kubelet service")
}
return nil
}
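setKubelet and initOsOnNode (two files below) both write remote files by piping base64 through the existing shell session, which sidesteps quoting and escaping problems. A minimal sketch of that pattern as a standalone helper — the name writeRemoteFile is hypothetical, not part of this commit, and it assumes the manager/Runner types introduced here:

// Hypothetical helper illustrating the file-push pattern used above.
func writeRemoteFile(mgr *manager.Manager, content, dst string) error {
	encoded := base64.StdEncoding.EncodeToString([]byte(content))
	// decode on the remote side so the payload survives the shell unharmed
	_, err := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s", encoded, dst))
	return err
}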

View File

@@ -0,0 +1 @@
package tmpl

View File

@@ -0,0 +1,47 @@
package tmpl
import (
"github.com/lithammer/dedent"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util"
"text/template"
)
var (
KubeletServiceTempl = template.Must(template.New("kubeletService").Parse(
dedent.Dedent(`[Unit]
Description=kubelet: The Kubernetes Node Agent
Documentation=http://kubernetes.io/docs/
[Service]
ExecStart=/usr/local/bin/kubelet
Restart=always
StartLimitInterval=0
RestartSec=10
[Install]
WantedBy=multi-user.target
`)))
KubeletEnvTempl = template.Must(template.New("kubeletEnv").Parse(
dedent.Dedent(`# Note: This dropin only works with kubeadm and kubelet v1.11+
[Service]
Environment="KUBELET_KUBECONFIG_ARGS=--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --kubeconfig=/etc/kubernetes/kubelet.conf"
Environment="KUBELET_CONFIG_ARGS=--config=/var/lib/kubelet/config.yaml"
# This is a file that "kubeadm init" and "kubeadm join" generate at runtime, populating the KUBELET_KUBEADM_ARGS variable dynamically
EnvironmentFile=-/var/lib/kubelet/kubeadm-flags.env
# This is a file that the user can use for overrides of the kubelet args as a last resort. Preferably, the user should use
# the .NodeRegistration.KubeletExtraArgs object in the configuration files instead. KUBELET_EXTRA_ARGS should be sourced from this file.
EnvironmentFile=-/etc/default/kubelet
ExecStart=
ExecStart=/usr/local/bin/kubelet $KUBELET_KUBECONFIG_ARGS $KUBELET_CONFIG_ARGS $KUBELET_KUBEADM_ARGS $KUBELET_EXTRA_ARGS
`)))
)
func GenerateKubeletService(cfg *kubekeyapi.ClusterCfg) (string, error) {
return util.Render(KubeletServiceTempl, util.Data{})
}
func GenerateKubeletEnv(cfg *kubekeyapi.ClusterCfg) (string, error) {
return util.Render(KubeletEnvTempl, util.Data{})
}

View File

@@ -0,0 +1,42 @@
package preinstall
import (
"encoding/base64"
"fmt"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/cluster/preinstall/tmpl"
"github.com/pixiake/kubekey/util/manager"
"github.com/pixiake/kubekey/util/ssh"
"github.com/pkg/errors"
)
func InitOS(mgr *manager.Manager) error {
mgr.Logger.Infoln("Initialize operating system")
return mgr.RunTaskOnAllNodes(initOsOnNode, false)
}
func initOsOnNode(mgr *manager.Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error {
tmpDir := "/tmp/kubekey"
_, err := mgr.Runner.RunRaw(fmt.Sprintf("mkdir -p %s", tmpDir))
if err != nil {
return errors.Wrap(errors.WithStack(err), "failed to init operating system")
}
initOsScript, err1 := tmpl.InitOsScript(mgr.Cluster)
if err1 != nil {
return err1
}
str := base64.StdEncoding.EncodeToString([]byte(initOsScript))
_, err2 := mgr.Runner.RunRaw(fmt.Sprintf("echo %s | base64 -d > %s/initOS.sh && chmod +x %s/initOS.sh", str, tmpDir, tmpDir))
if err2 != nil {
return errors.Wrap(errors.WithStack(err2), "failed to init operating system")
}
_, err3 := mgr.Runner.RunRaw(fmt.Sprintf("sudo %s/initOS.sh", tmpDir))
if err3 != nil {
return errors.Wrap(errors.WithStack(err3), "failed to init operating system")
}
return nil
}

View File

@@ -0,0 +1,95 @@
package preinstall
import (
"fmt"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"os"
"os/exec"
"path/filepath"
)
func FilesDownloadHttp(cfg *kubekeyapi.ClusterCfg, filepath string, logger *log.Logger) error {
kubeVersion := cfg.KubeVersion
kubeadmUrl := fmt.Sprintf("https://kubernetes-release.pek3b.qingstor.com/release/%s/bin/linux/%s/kubeadm", kubeVersion, kubekeyapi.DefaultArch)
kubeletUrl := fmt.Sprintf("https://kubernetes-release.pek3b.qingstor.com/release/%s/bin/linux/%s/kubelet", kubeVersion, kubekeyapi.DefaultArch)
kubectlUrl := fmt.Sprintf("https://kubernetes-release.pek3b.qingstor.com/release/%s/bin/linux/%s/kubectl", kubeVersion, kubekeyapi.DefaultArch)
kubeCniUrl := fmt.Sprintf("https://containernetworking.pek3b.qingstor.com/plugins/releases/download/%s/cni-plugins-linux-%s-%s.tgz", kubekeyapi.DefaultCniVersion, kubekeyapi.DefaultArch, kubekeyapi.DefaultCniVersion)
HelmUrl := fmt.Sprintf("https://kubernetes-helm.pek3b.qingstor.com/linux-amd64/%s/helm", kubekeyapi.DefaultHelmVersion)
kubeadm := fmt.Sprintf("%s/kubeadm-%s", filepath, kubeVersion)
kubelet := fmt.Sprintf("%s/kubelet-%s", filepath, kubeVersion)
kubectl := fmt.Sprintf("%s/kubectl-%s", filepath, kubeVersion)
kubeCni := fmt.Sprintf("%s/cni-plugins-linux-%s-%s.tgz", filepath, kubekeyapi.DefaultArch, kubekeyapi.DefaultCniVersion)
helm := fmt.Sprintf("%s/helm-%s", filepath, kubekeyapi.DefaultHelmVersion)
getKubeadmCmd := fmt.Sprintf("curl -o %s %s", kubeadm, kubeadmUrl)
getKubeletCmd := fmt.Sprintf("curl -o %s %s", kubelet, kubeletUrl)
getKubectlCmd := fmt.Sprintf("curl -o %s %s", kubectl, kubectlUrl)
getKubeCniCmd := fmt.Sprintf("curl -o %s %s", kubeCni, kubeCniUrl)
getHelmCmd := fmt.Sprintf("curl -o %s %s", helm, HelmUrl)
logger.Info("Kubeadm being download ...")
if util.IsExist(kubeadm) == false {
if out, err := exec.Command("/bin/sh", "-c", getKubeadmCmd).CombinedOutput(); err != nil {
fmt.Println(string(out))
return errors.Wrap(err, "faild download kubeadm binary")
}
}
logger.Info("Kubelet being download ...")
if util.IsExist(kubelet) == false {
if out, err := exec.Command("/bin/sh", "-c", getKubeletCmd).CombinedOutput(); err != nil {
fmt.Println(string(out))
return errors.Wrap(err, "faild download kubelet binary")
}
}
logger.Info("Kubectl being download ...")
if util.IsExist(kubectl) == false {
if out, err := exec.Command("/bin/sh", "-c", getKubectlCmd).CombinedOutput(); err != nil {
fmt.Println(string(out))
return errors.Wrap(err, "faild download kubectl binary")
}
}
logger.Info("KubeCni being download ...")
if util.IsExist(kubeCni) == false {
if out, err := exec.Command("/bin/sh", "-c", getKubeCniCmd).CombinedOutput(); err != nil {
fmt.Println(string(out))
return errors.Wrap(err, "faild download kubecni")
}
}
logger.Info("Helm being download ...")
if util.IsExist(helm) == false {
if out, err := exec.Command("/bin/sh", "-c", getHelmCmd).CombinedOutput(); err != nil {
fmt.Println(string(out))
return errors.Wrap(err, "faild download helm binary")
}
}
return nil
}
func Prepare(cfg *kubekeyapi.ClusterCfg, logger *log.Logger) error {
logger.Info("Install Files Download")
currentDir, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
return errors.Wrap(err, "faild get current dir")
}
filepath := fmt.Sprintf("%s/%s", currentDir, kubekeyapi.DefaultPreDir)
if err := util.CreateDir(filepath); err != nil {
return errors.Wrap(err, "faild create download target dir")
}
if err := FilesDownloadHttp(cfg, filepath, logger); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,80 @@
package tmpl
import (
"github.com/lithammer/dedent"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util"
"text/template"
)
var initOsScriptTmpl = template.Must(template.New("initOS").Parse(
dedent.Dedent(`#!/bin/sh
swapoff -a
sed -i /^[^#]*swap*/s/^/\#/g /etc/fstab
echo 'net.ipv4.ip_forward = 1' >> /etc/sysctl.conf
echo 'net.bridge.bridge-nf-call-arptables = 1' >> /etc/sysctl.conf
echo 'net.bridge.bridge-nf-call-ip6tables = 1' >> /etc/sysctl.conf
echo 'net.bridge.bridge-nf-call-iptables = 1' >> /etc/sysctl.conf
echo 'net.ipv4.ip_local_reserved_ports = 30000-32767' >> /etc/sysctl.conf
sed -r -i "s@#{0,}?net.ipv4.ip_forward ?= ?(0|1)@net.ipv4.ip_forward = 1@g" /etc/sysctl.conf
sed -r -i "s@#{0,}?net.bridge.bridge-nf-call-arptables ?= ?(0|1)@net.bridge.bridge-nf-call-arptables = 1@g" /etc/sysctl.conf
sed -r -i "s@#{0,}?net.bridge.bridge-nf-call-ip6tables ?= ?(0|1)@net.bridge.bridge-nf-call-ip6tables = 1@g" /etc/sysctl.conf
sed -r -i "s@#{0,}?net.bridge.bridge-nf-call-iptables ?= ?(0|1)@net.bridge.bridge-nf-call-iptables = 1@g" /etc/sysctl.conf
sed -r -i "s@#{0,}?net.ipv4.ip_local_reserved_ports ?= ?(0|1)@net.ipv4.ip_local_reserved_ports = 30000-32767@g" /etc/sysctl.conf
awk ' !x[$0]++{print > "/etc/sysctl.conf"}' /etc/sysctl.conf
sysctl -p
systemctl stop firewalld 1>/dev/null 2>/dev/null
systemctl disable firewalld 1>/dev/null 2>/dev/null
systemctl stop ufw 1>/dev/null 2>/dev/null
systemctl disable ufw 1>/dev/null 2>/dev/null
modinfo br_netfilter > /dev/null 2>&1
if [ $? -eq 0 ]; then
modprobe br_netfilter
mkdir -p /etc/modules-load.d
echo 'br_netfilter' > /etc/modules-load.d/kubeocean-br_netfilter.conf
fi
modprobe ip_vs
modprobe ip_vs_rr
modprobe ip_vs_wrr
modprobe ip_vs_sh
cat > /etc/modules-load.d/kube_proxy-ipvs.conf << EOF
ip_vs
ip_vs_rr
ip_vs_wrr
ip_vs_sh
EOF
modprobe nf_conntrack_ipv4
if [ $? -eq 0 ]; then
echo 'nf_conntrack_ipv4' >> /etc/modules-load.d/kube_proxy-ipvs.conf
else
modprobe nf_conntrack
echo 'nf_conntrack' >> /etc/modules-load.d/kube_proxy-ipvs.conf
fi
sed -i ':a;$!{N;ba};s@# kubekey hosts BEGIN.*# kubekey hosts END@@' /etc/hosts
sed -i '/^$/N;/\n$/N;//D' /etc/hosts
cat >>/etc/hosts<<EOF
# kubekey hosts BEGIN
{{- range .Hosts }}
{{ . }}
{{- end }}
# kubekey hosts END
EOF
`)))
func InitOsScript(cfg *kubekeyapi.ClusterCfg) (string, error) {
hostlist := cfg.GenerateHosts()
return util.Render(initOsScriptTmpl, util.Data{
"Hosts": hostlist,
})
}
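The two sed lines keep the script idempotent: they delete any earlier "# kubekey hosts BEGIN … END" block (and the blank lines it leaves behind) before the heredoc appends a fresh one, so repeated runs of InitOS never duplicate entries. The rendered tail for one invented host would read:

# kubekey hosts BEGIN
10.0.0.1 master1.cluster.local master1
10.0.0.1 lb.kubesphere.local
# kubekey hosts END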

View File

@@ -1 +0,0 @@
package install

View File

@@ -1,12 +1,11 @@
package install
import (
"github.com/pixiake/kubekey/util/manager"
ssh2 "github.com/pixiake/kubekey/util/ssh"
"github.com/sirupsen/logrus"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
//"github.com/kubermatic/kubeone/pkg/configupload"
"github.com/pixiake/kubekey/util/dialer/ssh"
"github.com/pixiake/kubekey/util/state"
)
// Options groups the various possible options for running
@@ -21,27 +20,27 @@
}
// Executor is the entrypoint for the installation process
type Installer struct {
type Executor struct {
cluster *kubekeyapi.ClusterCfg
logger *logrus.Logger
}
// NewExecutor returns a new executor, responsible for dispatching
// between the different supported Kubernetes versions and running the process
func NewInstaller(cluster *kubekeyapi.ClusterCfg, logger *logrus.Logger) *Installer {
return &Installer{
func NewExecutor(cluster *kubekeyapi.ClusterCfg, logger *logrus.Logger) *Executor {
return &Executor{
cluster: cluster,
logger: logger,
}
}
// Execute runs the installation process
func (i *Installer) Install() error {
s, err := i.createState()
func (executor *Executor) Execute() error {
mgr, err := executor.createManager()
if err != nil {
return err
}
return ExecTasks(s)
return ExecTasks(mgr)
}
// Reset resets cluster:
@@ -55,23 +54,19 @@ func (i *Installer) Install() error {
// return installation.Reset(s)
//}
// createManager creates a basic, non-host bound manager with
// all relevant information, but *no* Runner yet. The various
// task helper functions will take care of setting up Runner
// structs for each task individually.
func (i *Installer) createState() (*state.State, error) {
s := &state.State{}
func (executor *Executor) createManager() (*manager.Manager, error) {
mgr := &manager.Manager{}
s.Cluster = i.cluster
s.Connector = ssh.NewConnector()
mgr.Cluster = executor.cluster
mgr.Connector = ssh2.NewConnector()
//s.Configuration = configupload.NewConfiguration()
s.WorkDir = "kubekey"
s.Logger = i.logger
s.Verbose = true
//mgr.WorkDir = "kubekey"
mgr.Logger = executor.logger
mgr.Verbose = true
//s.ManifestFilePath = options.Manifest
//s.CredentialsFilePath = options.CredentialsFile
//s.BackupFile = options.BackupFile
//s.DestroyWorkers = options.DestroyWorkers
//s.RemoveBinaries = options.RemoveBinaries
return s, nil
return mgr, nil
}
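End to end, the new flow is load → default → execute. A minimal sketch of a caller (the wrapper name runExample is hypothetical; assumes the logrus and kubekeyapi imports):

func runExample(clusterCfgPath string) error {
	logger := logrus.New()
	cfg, err := kubekeyapi.LoadClusterCfg(clusterCfgPath, logger)
	if err != nil {
		return err
	}
	return NewExecutor(cfg, logger).Execute()
}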

View File

@@ -1 +0,0 @@
package install

View File

@@ -1,27 +1,24 @@
package install
import (
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/cluster/container-engine/docker"
"github.com/pixiake/kubekey/util/state"
"github.com/pixiake/kubekey/cluster/kubernetes"
"github.com/pixiake/kubekey/cluster/preinstall"
"github.com/pixiake/kubekey/util/manager"
"github.com/pixiake/kubekey/util/task"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
func CreateCluster(logger *log.Logger, clusterCfgFile string, addons string, pkg string) error {
cfg := kubekeyapi.GetClusterCfg(clusterCfgFile)
//installer.NewInstaller(cluster, logger)
return NewInstaller(cfg, logger).Install()
}
func ExecTasks(s *state.State) error {
func ExecTasks(mgr *manager.Manager) error {
createTasks := []task.Task{
{Fn: docker.InstallerDocker, ErrMsg: "failed to download kube binaries"},
{Fn: preinstall.InitOS, ErrMsg: "failed to init operating system"},
{Fn: docker.InstallerDocker, ErrMsg: "failed to install docker"},
{Fn: kubernetes.SyncKubeBinaries, ErrMsg: "failed to sync kube binaries"},
{Fn: kubernetes.ConfigureKubeletService, ErrMsg: "failed to configure kubelet service"},
}
for _, step := range createTasks {
if err := step.Run(s); err != nil {
if err := step.Run(mgr); err != nil {
return errors.Wrap(err, step.ErrMsg)
}
}

View File

@@ -1 +0,0 @@
package install

View File

@@ -1 +0,0 @@
package install

View File

@@ -1,15 +1,15 @@
package state
package manager
import (
ssh2 "github.com/pixiake/kubekey/util/ssh"
"github.com/sirupsen/logrus"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
//"github.com/kubermatic/kubeone/pkg/configupload"
"github.com/pixiake/kubekey/util/dialer/ssh"
"github.com/pixiake/kubekey/util/runner"
//"k8s.io/client-go/rest"
//bootstraputil "k8s.io/cluster-bootstrap/token/util"
//dynclient "sigs.k8s.io/controller-runtime/pkg/client"
//dynclient "sigs.k8s.io/manager-runtime/pkg/client"
)
//func New() (*State, error) {
@@ -21,10 +21,10 @@ import (
// Manager holds together the current run's flags and parsed info, along with
// utilities like the logger
type State struct {
type Manager struct {
Cluster *kubekeyapi.ClusterCfg
Logger logrus.FieldLogger
Connector *ssh.Dialer
Connector *ssh2.Dialer
//Configuration *configupload.Configuration
Runner *runner.Runner
WorkDir string
@@ -43,15 +43,15 @@ type State struct {
//ManifestFilePath string
}
func (s *State) KubeadmVerboseFlag() string {
if s.Verbose {
func (mgr *Manager) KubeadmVerboseFlag() string {
if mgr.Verbose {
return "--v=6"
}
return ""
}
// Clone returns a shallow copy of the Manager.
func (s *State) Clone() *State {
newState := *s
func (mgr *Manager) Clone() *Manager {
newState := *mgr
return &newState
}

View File

@ -1,70 +1,96 @@
package state
package manager
import (
"fmt"
"github.com/pixiake/kubekey/util/ssh"
"sync"
"time"
"github.com/pkg/errors"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util/dialer/ssh"
"github.com/pixiake/kubekey/util/runner"
)
// NodeTask is a task that is specifically tailored to run on a single node.
type NodeTask func(ctx *State, node *kubekeyapi.HostCfg, conn ssh.Connection) error
const (
DefaultCon = 10
Timeout = 600
)
func (s *State) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bool) error {
// NodeTask is a task that is specifically tailored to run on a single node.
type NodeTask func(mgr *Manager, node *kubekeyapi.HostCfg, conn ssh.Connection) error
func (mgr *Manager) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bool) error {
var (
err error
conn ssh.Connection
)
// connect to the host (and do not close the connection,
// because we want to re-use it for future tasks)
conn, err = s.Connector.Connect(*node)
conn, err = mgr.Connector.Connect(*node)
if err != nil {
return errors.Wrapf(err, "failed to connect to %s", node.Address)
return errors.Wrapf(err, "failed to connect to %s", node.SSHAddress)
}
prefix := ""
if prefixed {
prefix = fmt.Sprintf("[%s] ", node.Address)
prefix = fmt.Sprintf("[%s] ", node.HostName)
}
s.Runner = &runner.Runner{
mgr.Runner = &runner.Runner{
Conn: conn,
Verbose: s.Verbose,
Verbose: mgr.Verbose,
//OS: node.OS,
Prefix: prefix,
Host: node,
}
return task(s, node, conn)
return task(mgr, node, conn)
}
// RunTaskOnNodes runs the given task on the given selection of hosts.
func (s *State) RunTaskOnNodes(nodes []kubekeyapi.HostCfg, task NodeTask, parallel bool) error {
func (mgr *Manager) RunTaskOnNodes(nodes []kubekeyapi.HostCfg, task NodeTask, parallel bool) error {
var err error
wg := sync.WaitGroup{}
hasErrors := false
for i := range nodes {
ctx := s.Clone()
ctx.Logger = ctx.Logger.WithField("node", nodes[i].Address)
wg := &sync.WaitGroup{}
result := make(chan string)
ccons := make(chan struct{}, DefaultCon)
defer close(result)
defer close(ccons)
hostNum := len(nodes)
if parallel {
wg.Add(1)
go func(ctx *State, node *kubekeyapi.HostCfg) {
err = ctx.runTask(node, task, parallel)
if err != nil {
ctx.Logger.Error(err)
hasErrors = true
if parallel {
go func(result chan string, ccons chan struct{}) {
for i := 0; i < hostNum; i++ {
select {
case <-result:
case <-time.After(time.Second * Timeout):
fmt.Sprintf("getSSHClient error,SSH-Read-TimeOut,Timeout=%ds", Timeout)
}
wg.Done()
}(ctx, &nodes[i])
<-ccons
}
}(result, ccons)
}
for i := range nodes {
mgrTask := mgr.Clone()
mgrTask.Logger = mgrTask.Logger.WithField("node", nodes[i].SSHAddress)
if parallel {
ccons <- struct{}{}
wg.Add(1)
go func(mgr *Manager, node *kubekeyapi.HostCfg, result chan string) {
err = mgr.runTask(node, task, parallel)
if err != nil {
mgr.Logger.Error(err)
hasErrors = true
}
result <- "done"
}(mgrTask, &nodes[i], result)
} else {
err = ctx.runTask(&nodes[i], task, parallel)
err = mgrTask.runTask(&nodes[i], task, parallel)
if err != nil {
break
}
@@ -81,10 +107,10 @@ func (s *State) RunTaskOnNodes(nodes []kubekeyapi.HostCfg, task NodeTask, parall
}
// RunTaskOnAllNodes runs the given task on all hosts.
func (s *State) RunTaskOnAllNodes(task NodeTask, parallel bool) error {
func (mgr *Manager) RunTaskOnAllNodes(task NodeTask, parallel bool) error {
// It's not possible to concatenate host lists in this function.
// Some of the tasks (determineOS, determineHostname) write to the manager, and sending a copy would break that.
if err := s.RunTaskOnNodes(s.Cluster.Hosts, task, parallel); err != nil {
if err := mgr.RunTaskOnNodes(mgr.Cluster.Hosts, task, parallel); err != nil {
return err
}
//if s.Cluster.StaticWorkers != nil {
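The batching above is a buffered-channel semaphore: ccons caps in-flight nodes at DefaultCon, each finished task reports on result, and the collector goroutine frees one slot per completion (or after the Timeout). A self-contained sketch of the same pattern, with invented job names and durations:

package main

import (
	"fmt"
	"time"
)

func main() {
	const limit = 2 // stands in for DefaultCon
	jobs := []string{"a", "b", "c", "d"}
	ccons := make(chan struct{}, limit) // semaphore: at most `limit` jobs in flight
	result := make(chan string)
	done := make(chan struct{})

	// collector: one receive per job (or a timeout), then release a slot
	go func() {
		for range jobs {
			select {
			case r := <-result:
				fmt.Println(r)
			case <-time.After(3 * time.Second):
				fmt.Println("timed out waiting for a task")
			}
			<-ccons
		}
		close(done)
	}()

	for _, j := range jobs {
		ccons <- struct{}{} // blocks while `limit` jobs are already running
		go func(j string) {
			time.Sleep(100 * time.Millisecond) // stand-in for the SSH task
			result <- "done: " + j
		}(j)
	}
	<-done
}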

View File

@@ -1,30 +0,0 @@
package runner
import (
"strings"
"text/template"
"github.com/pkg/errors"
)
type Data map[string]interface{}
// Render text template with given `variables` Render-context
func Render(cmd string, variables map[string]interface{}) (string, error) {
tpl, err := template.New("base").Parse(cmd)
if err != nil {
return "", errors.Wrap(err, "failed to parse script template")
}
var buf strings.Builder
buf.WriteString(`set -xeu pipefail`)
buf.WriteString("\n\n")
buf.WriteString(`export "PATH=$PATH:/sbin:/usr/local/bin:/opt/bin"`)
buf.WriteString("\n\n")
if err := tpl.Execute(&buf, variables); err != nil {
return "", errors.Wrap(err, "failed to render script template")
}
return buf.String(), nil
}

View File

@@ -2,58 +2,75 @@ package runner
import (
"fmt"
"github.com/pixiake/kubekey/util/dialer/ssh"
kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1"
"github.com/pixiake/kubekey/util"
ssh2 "github.com/pixiake/kubekey/util/ssh"
"github.com/pkg/errors"
"os"
"strings"
"text/template"
"time"
)
// Runner bundles a connection to a host with the verbosity and
// other options for running commands via SSH.
type Runner struct {
Conn ssh.Connection
Conn ssh2.Connection
Prefix string
OS string
Verbose bool
Host *kubekeyapi.HostCfg
Result chan string
}
// TemplateVariables is a render context for templates
type TemplateVariables map[string]interface{}
func (r *Runner) RunRaw(cmd string) (string, string, error) {
func (r *Runner) RunRaw(cmd string) (string, error) {
if r.Conn == nil {
return "", "", errors.New("runner is not tied to an opened SSH connection")
return "", errors.New("runner is not tied to an opened SSH connection")
}
output, _, err := r.Conn.Exec(cmd, r.Host)
if !r.Verbose {
stdout, stderr, _, err := r.Conn.Exec(cmd)
if err != nil {
err = errors.Wrap(err, stderr)
return "", err
}
return stdout, stderr, err
return output, nil
}
fmt.Println(r.Prefix)
stdout := NewTee(New(os.Stdout, r.Prefix))
defer stdout.Close()
stderr := NewTee(New(os.Stderr, r.Prefix))
defer stderr.Close()
if output != "" {
fmt.Printf("[%s %s] MSG:\n", r.Host.HostName, r.Host.SSHAddress)
fmt.Println(output)
}
// run the command
_, err := r.Conn.Stream(cmd, stdout, stderr)
fmt.Println(stdout.String())
fmt.Println(stderr.String())
return stdout.String(), stderr.String(), err
return "", err
}
func (r *Runner) ScpFile(src, dst string) error {
if r.Conn == nil {
return errors.New("runner is not tied to an opened SSH connection")
}
err := r.Conn.Scp(src, dst)
if err != nil {
if r.Verbose {
fmt.Printf("push %s to %s:%s Failed\n", src, r.Host.SSHAddress, dst)
}
return err
}
if r.Verbose {
fmt.Printf("push %s to %s:%s Done\n", src, r.Host.SSHAddress, dst)
}
return nil
}
// Run executes a given command/script, optionally printing its output to
// stdout/stderr.
func (r *Runner) Run(cmd string, variables TemplateVariables) (string, string, error) {
cmd, err := Render(cmd, variables)
func (r *Runner) Run(cmd string, variables TemplateVariables) (string, error) {
tmpl, err := template.New("base").Parse(cmd)
if err != nil {
return "", errors.Wrap(err, "failed to parse command template")
}
cmd, err = util.Render(tmpl, variables)
if err != nil {
return "", "", err
return "", err
}
return r.RunRaw(cmd)
@@ -81,7 +98,7 @@ func (r *Runner) WaitForCondition(cmd string, timeout time.Duration, validator v
cutoff := time.Now().Add(timeout)
for time.Now().Before(cutoff) {
stdout, _, _ := r.Run(cmd, nil)
stdout, _ := r.Run(cmd, nil)
if validator(stdout) {
return true
}
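Run now parses the command as a text/template and renders it through util.Render before handing it to RunRaw. An illustrative caller (the helper name ensureDir is hypothetical):

func ensureDir(r *runner.Runner, dir string) (string, error) {
	// the value of Dir is substituted into the command before it runs over SSH
	return r.Run("mkdir -p {{ .Dir }}", runner.TemplateVariables{"Dir": dir})
}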

View File

@@ -52,7 +52,7 @@ func (dialer *Dialer) Connect(host kubekeyapi.HostCfg) (Connection, error) {
opts := SSHCfg{
Username: host.User,
Port: port,
Hostname: host.HostName,
Address: host.SSHAddress,
Password: host.Password,
KeyFile: host.SSHKeyPath,
//AgentSocket: host.SSHAgentSocket,

util/ssh/sftp.go Normal file
View File

@@ -0,0 +1,37 @@
package ssh
import (
"github.com/pkg/errors"
"github.com/pkg/sftp"
"github.com/tmc/scp"
)
func (c *connection) sftp() (*sftp.Client, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.sshclient == nil {
return nil, errors.New("connection closed")
}
if c.sftpclient == nil {
s, err := sftp.NewClient(c.sshclient)
if err != nil {
return nil, errors.Wrap(err, "failed to get sftp.Client")
}
c.sftpclient = s
}
return c.sftpclient, nil
}
func (c *connection) Scp(src, dst string) error {
session, err := c.session()
if err != nil {
return errors.Wrap(err, "failed to get SSH session")
}
defer session.Close()
err = scp.CopyPath(src, dst, session)
if err != nil {
return err
}
return nil
}

View File

@@ -1,8 +1,10 @@
package ssh
import (
"bytes"
"bufio"
"context"
"fmt"
"github.com/pixiake/kubekey/apis/v1alpha1"
"io"
"io/ioutil"
"net"
@@ -27,9 +29,9 @@ var (
// Connection represents an established connection to an SSH server.
type Connection interface {
Exec(cmd string) (stdout string, stderr string, exitCode int, err error)
Exec(cmd string, host *v1alpha1.HostCfg) (stdout string, exitCode int, err error)
File(filename string, flags int) (io.ReadWriteCloser, error)
Stream(cmd string, stdout io.Writer, stderr io.Writer) (exitCode int, err error)
Scp(src, dst string) error
io.Closer
}
@@ -45,7 +47,7 @@ type Tunneler interface {
type SSHCfg struct {
Username string
Password string
Hostname string
Address string
Port int
PrivateKey string
KeyFile string
@@ -61,8 +63,8 @@ func validateOptions(cfg SSHCfg) (SSHCfg, error) {
return cfg, errors.New("no username specified for SSH connection")
}
if len(cfg.Hostname) == 0 {
return cfg, errors.New("no hostname specified for SSH connection")
if len(cfg.Address) == 0 {
return cfg, errors.New("no address specified for SSH connection")
}
if len(cfg.Password) == 0 && len(cfg.PrivateKey) == 0 && len(cfg.KeyFile) == 0 && len(cfg.AgentSocket) == 0 {
@@ -163,7 +165,7 @@ func NewConnection(cfg SSHCfg) (Connection, error) {
HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint:gosec
}
targetHost := cfg.Hostname
targetHost := cfg.Address
targetPort := strconv.Itoa(cfg.Port)
if cfg.Bastion != "" {
@@ -193,7 +195,7 @@ func NewConnection(cfg SSHCfg) (Connection, error) {
}
// continue to setup if we are running over bastion
endpointBehindBastion := net.JoinHostPort(cfg.Hostname, strconv.Itoa(cfg.Port))
endpointBehindBastion := net.JoinHostPort(cfg.Address, strconv.Itoa(cfg.Port))
// Dial a connection to the service host, from the bastion
conn, err := client.Dial("tcp", endpointBehindBastion)
@@ -250,30 +252,63 @@ func (c *connection) Close() error {
return c.sshclient.Close()
}
func (c *connection) Stream(cmd string, stdout io.Writer, stderr io.Writer) (int, error) {
func (c *connection) Exec(cmd string, host *v1alpha1.HostCfg) (string, int, error) {
sess, err := c.session()
if err != nil {
return 0, errors.Wrap(err, "failed to get SSH session")
return "", 0, errors.Wrap(err, "failed to get SSH session")
}
defer sess.Close()
sess.Stdout = stdout
sess.Stderr = stderr
sess.Stdin = os.Stdin
modes := ssh.TerminalModes{
ssh.ECHO: 0, // disable echoing
ssh.TTY_OP_ISPEED: 14400, // input speed = 14.4kbaud
ssh.TTY_OP_OSPEED: 14400, // output speed = 14.4kbaud
}
err = sess.RequestPty("xterm", 80, 40, modes)
if err != nil {
return "", 0, err
}
stdin, _ := sess.StdinPipe()
out, _ := sess.StdoutPipe()
var output []byte
go func(in io.WriteCloser, out io.Reader, output *[]byte) {
var (
line string
r = bufio.NewReader(out)
)
for {
b, err := r.ReadByte()
if err != nil {
break
}
*output = append(*output, b)
if b == byte('\n') {
line = ""
continue
}
line += string(b)
if (strings.HasPrefix(line, "[sudo] password for ") || strings.HasPrefix(line, "Password")) && strings.HasSuffix(line, ": ") {
_, err = in.Write([]byte(host.Password + "\n"))
if err != nil {
break
}
}
}
}(stdin, out, &output)
exitCode := 0
err = sess.Run(strings.TrimSpace(cmd))
_, err = sess.CombinedOutput(strings.TrimSpace(cmd))
if err != nil {
exitCode = 1
}
return exitCode, errors.Wrapf(err, "failed to exec command: %s", cmd)
}
func (c *connection) Exec(cmd string) (string, string, int, error) {
var stdoutBuf, stderrBuf bytes.Buffer
exitCode, err := c.Stream(cmd, &stdoutBuf, &stderrBuf)
return strings.TrimSpace(stdoutBuf.String()), strings.TrimSpace(stderrBuf.String()), exitCode, err
outStr := strings.TrimPrefix(string(output), fmt.Sprintf("[sudo] password for %s:", host.User))
return strings.TrimSpace(outStr), exitCode, errors.Wrapf(err, "failed to exec command: %s", cmd)
}
func (c *connection) session() (*ssh.Session, error) {
@@ -286,22 +321,3 @@ func (c *connection) session() (*ssh.Session, error) {
return c.sshclient.NewSession()
}
func (c *connection) sftp() (*sftp.Client, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.sshclient == nil {
return nil, errors.New("connection closed")
}
if c.sftpclient == nil {
s, err := sftp.NewClient(c.sshclient)
if err != nil {
return nil, errors.Wrap(err, "failed to get sftp.Client")
}
c.sftpclient = s
}
return c.sftpclient, nil
}
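Exec's new signature takes the host so the PTY reader goroutine can answer "[sudo] password for …" prompts with host.Password. A hypothetical caller, assuming the types from this diff (the helper name sudoWhoami is invented):

func sudoWhoami(host *v1alpha1.HostCfg) (string, error) {
	conn, err := ssh.NewConnection(ssh.SSHCfg{
		Username: host.User,
		Password: host.Password,
		Address:  host.SSHAddress,
		Port:     22,
	})
	if err != nil {
		return "", err
	}
	defer conn.Close()
	// the sudo password prompt, if any, is answered automatically
	stdout, _, err := conn.Exec("sudo whoami", host)
	return stdout, err
}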

View File

@@ -1,8 +1,7 @@
package task
import (
//"kubekey/util/state"
"github.com/pixiake/kubekey/util/state"
"github.com/pixiake/kubekey/util/manager"
"k8s.io/apimachinery/pkg/util/wait"
"time"
)
@@ -18,13 +17,13 @@ func defaultRetryBackoff(retries int) wait.Backoff {
// Task is a runnable task
type Task struct {
Fn func(*state.State) error
Fn func(*manager.Manager) error
ErrMsg string
Retries int
}
// Run runs a task
func (t *Task) Run(ctx *state.State) error {
func (t *Task) Run(mgrTask *manager.Manager) error {
if t.Retries == 0 {
t.Retries = 1
}
@@ -33,13 +32,13 @@ func (t *Task) Run(ctx *state.State) error {
var lastError error
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
if lastError != nil {
ctx.Logger.Warn("Retrying task…")
mgrTask.Logger.Warn("Retrying task…")
}
lastError = t.Fn(ctx)
lastError = t.Fn(mgrTask)
if lastError != nil {
ctx.Logger.Warn("Task failed…")
if ctx.Verbose {
ctx.Logger.Warnf("error was: %s", lastError)
mgrTask.Logger.Warn("Task failed…")
if mgrTask.Verbose {
mgrTask.Logger.Warnf("error was: %s", lastError)
}
return false, nil
}
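A hypothetical standalone use of Task outside ExecTasks, retrying up to three times with the package's exponential backoff (assumes a *manager.Manager named mgr and the pkg/errors import):

t := task.Task{Fn: docker.InstallerDocker, ErrMsg: "failed to install docker", Retries: 3}
if err := t.Run(mgr); err != nil {
	mgr.Logger.Fatal(errors.Wrap(err, t.ErrMsg))
}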

View File

@@ -1,7 +1,11 @@
package util
import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"os"
"strings"
"text/template"
)
const (
@ -21,3 +25,44 @@ func InitLogger(verbose bool) *log.Logger {
return logger
}
func IsExist(path string) bool {
_, err := os.Stat(path)
return err == nil || os.IsExist(err)
}
func CreateDir(path string) error {
if !IsExist(path) {
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return err
}
}
return nil
}
type Data map[string]interface{}
// Render text template with given `variables` Render-context
func Render(tmpl *template.Template, variables map[string]interface{}) (string, error) {
var buf strings.Builder
//buf.WriteString(`set -xeu pipefail`)
//buf.WriteString("\n\n")
//buf.WriteString(`export "PATH=$PATH:/sbin:/usr/local/bin:/opt/bin"`)
//buf.WriteString("\n\n")
if err := tmpl.Execute(&buf, variables); err != nil {
return "", errors.Wrap(err, "failed to render cmd or script template")
}
return buf.String(), nil
}
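Render is now shared by the runner and the tmpl packages, and the shell preamble it used to prepend (set -xeu, the PATH export) is gone, so templates render verbatim. An illustrative call with invented values:

tmpl := template.Must(template.New("greet").Parse("hello {{ .Name }}"))
out, err := util.Render(tmpl, util.Data{"Name": "kubekey"})
// out == "hello kubekey", err == nil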