From 8e47fc4d2c4314e79bb1e44116e660d1bcc2b600 Mon Sep 17 00:00:00 2001 From: pixiake Date: Sun, 12 Apr 2020 19:41:25 +0800 Subject: [PATCH] add task --- install/install.go | 83 ++++++++++++++++++++++++++- kubekey.go | 27 +++++++++ util/dialer/ssh/dialer.go | 4 +- util/state/context.go | 57 +++++++++++++++++++ util/state/task.go | 117 ++++++++++++++++++++++++++++++++++++++ util/task/task.go | 53 +++++++++++++++++ 6 files changed, 337 insertions(+), 4 deletions(-) create mode 100644 kubekey.go create mode 100644 util/state/context.go create mode 100644 util/state/task.go create mode 100644 util/task/task.go diff --git a/install/install.go b/install/install.go index 0db63e06..73821244 100644 --- a/install/install.go +++ b/install/install.go @@ -2,8 +2,87 @@ package install import ( kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "kubeone/pkg/configupload" + "kubeone/pkg/credentials" + "kubeone/pkg/installer" + "kubeone/pkg/installer/installation" + "kubeone/pkg/state" + //"kubeone/pkg/credentials" + //"kubeone/pkg/installer" + //"kubeone/pkg/installer/installation" ) -func CreateCluster(clusterCfgFile string, addons string, pkg string) { - kubekeyapi.GetClusterCfg(clusterCfgFile) +type Installer struct { + cluster *kubekeyapi.ClusterCfg + logger *log.Logger +} + +type Options struct { + Verbose bool + Manifest string + CredentialsFile string + BackupFile string + DestroyWorkers bool + RemoveBinaries bool +} + +func NewInstaller(cluster *kubekeyapi.ClusterCfg, logger *log.Logger) *Installer { + return &Installer{ + cluster: cluster, + logger: logger, + } +} + +func (i *Installer) Install(options *Options) error { + s, err := i.createState(options) + if err != nil { + return err + } + return installation.Install(s) +} + +func (i *Installer) createState(options *Options) (*state.State, error) { + s, err := state.New() + if err != nil { + return nil, err + } + + s.Cluster = i.cluster + s.Connector = ssh.NewConnector() + s.Configuration = configupload.NewConfiguration() + s.WorkDir = "kubeone" + s.Logger = i.logger + s.Verbose = options.Verbose + s.ManifestFilePath = options.Manifest + s.CredentialsFilePath = options.CredentialsFile + s.BackupFile = options.BackupFile + s.DestroyWorkers = options.DestroyWorkers + s.RemoveBinaries = options.RemoveBinaries + return s, nil +} + +func CreateCluster(clusterCfgFile string, addons string, pkg string) { + cluster := kubekeyapi.GetClusterCfg(clusterCfgFile) +} + +func runInstall(logger *log.Logger) error { + cluster, err := loadClusterConfig(installOptions.Manifest, installOptions.TerraformState, installOptions.CredentialsFilePath, logger) + if err != nil { + return errors.Wrap(err, "failed to load cluster") + } + + options, err := createInstallerOptions(installOptions.Manifest, cluster, installOptions) + if err != nil { + return errors.Wrap(err, "failed to create installer options") + } + + // Validate credentials + _, err = credentials.ProviderCredentials(cluster.CloudProvider.Name, installOptions.CredentialsFilePath) + if err != nil { + return errors.Wrap(err, "failed to validate credentials") + } + + return installer.NewInstaller(cluster, logger).Install(options) } diff --git a/kubekey.go b/kubekey.go new file mode 100644 index 00000000..58f86bfc --- /dev/null +++ b/kubekey.go @@ -0,0 +1,27 @@ +package main + +import ( + "flag" + "github.com/pixiake/kubekey/cmd" + "github.com/spf13/pflag" +) + +func main() { + //klog.InitFlags(nil) + //pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc) + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) + + pflag.Set("logtostderr", "true") + // We do not want these flags to show up in --help + // These MarkHidden calls must be after the lines above + pflag.CommandLine.MarkHidden("version") + pflag.CommandLine.MarkHidden("log-flush-frequency") + pflag.CommandLine.MarkHidden("alsologtostderr") + pflag.CommandLine.MarkHidden("log-backtrace-at") + pflag.CommandLine.MarkHidden("log-dir") + pflag.CommandLine.MarkHidden("logtostderr") + pflag.CommandLine.MarkHidden("stderrthreshold") + pflag.CommandLine.MarkHidden("vmodule") + + cmd.NewKubeoceanCommand().Execute() +} diff --git a/util/dialer/ssh/dialer.go b/util/dialer/ssh/dialer.go index a9af9cbd..a64b6bb8 100644 --- a/util/dialer/ssh/dialer.go +++ b/util/dialer/ssh/dialer.go @@ -23,7 +23,7 @@ func NewConnector() *Dialer { } // Tunnel returns established SSH tunnel -func (dialer *Dialer) Tunnel(host kubekeyapi.HostConfig) (Tunneler, error) { +func (dialer *Dialer) Tunnel(host kubekeyapi.HostCfg) (Tunneler, error) { conn, err := dialer.Connect(host) if err != nil { return nil, err @@ -38,7 +38,7 @@ func (dialer *Dialer) Tunnel(host kubekeyapi.HostConfig) (Tunneler, error) { } // Connect to the node -func (dialer *Dialer) Connect(host kubekeyapi.HostConfig) (Connection, error) { +func (dialer *Dialer) Connect(host kubekeyapi.HostCfg) (Connection, error) { var err error dialer.lock.Lock() diff --git a/util/state/context.go b/util/state/context.go new file mode 100644 index 00000000..cb33dbd5 --- /dev/null +++ b/util/state/context.go @@ -0,0 +1,57 @@ +package state + +import ( + "github.com/sirupsen/logrus" + + kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + //"github.com/kubermatic/kubeone/pkg/configupload" + "github.com/pixiake/kubekey/util/dialer/ssh" + "github.com/pixiake/kubekey/util/runner" + //"k8s.io/client-go/rest" + //bootstraputil "k8s.io/cluster-bootstrap/token/util" + //dynclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +//func New() (*State, error) { +// joinToken, err := bootstraputil.GenerateBootstrapToken() +// return &State{ +// JoinToken: joinToken, +// }, err +//} + +// State holds together currently test flags and parsed info, along with +// utilities like logger +type State struct { + Cluster *kubekeyapi.ClusterCfg + Logger logrus.FieldLogger + Connector *ssh.Dialer + //Configuration *configupload.Configuration + Runner *runner.Runner + WorkDir string + JoinCommand string + JoinToken string + //RESTConfig *rest.Config + //DynamicClient dynclient.Client + Verbose bool + BackupFile string + DestroyWorkers bool + RemoveBinaries bool + ForceUpgrade bool + UpgradeMachineDeployments bool + PatchCNI bool + CredentialsFilePath string + ManifestFilePath string +} + +func (s *State) KubeadmVerboseFlag() string { + if s.Verbose { + return "--v=6" + } + return "" +} + +// Clone returns a shallow copy of the State. +func (s *State) Clone() *State { + newState := *s + return &newState +} diff --git a/util/state/task.go b/util/state/task.go new file mode 100644 index 00000000..b26f100a --- /dev/null +++ b/util/state/task.go @@ -0,0 +1,117 @@ +package state + +import ( + "fmt" + "sync" + + "github.com/pkg/errors" + + kubekeyapi "github.com/pixiake/kubekey/apis/v1alpha1" + "github.com/pixiake/kubekey/util/dialer/ssh" + "github.com/pixiake/kubekey/util/runner" +) + +// NodeTask is a task that is specifically tailored to run on a single node. +type NodeTask func(ctx *State, node *kubekeyapi.HostCfg, conn ssh.Connection) error + +func (s *State) runTask(node *kubekeyapi.HostCfg, task NodeTask, prefixed bool) error { + var ( + err error + conn ssh.Connection + ) + + // connect to the host (and do not close connection + // because we want to re-use it for future task) + conn, err = s.Connector.Connect(*node) + if err != nil { + return errors.Wrapf(err, "failed to connect to %s", node.Address) + } + + prefix := "" + if prefixed { + prefix = fmt.Sprintf("[%s] ", node.Address) + } + + s.Runner = &runner.Runner{ + Conn: conn, + Verbose: s.Verbose, + //OS: node.OS, + Prefix: prefix, + } + + return task(s, node, conn) +} + +// RunTaskOnNodes runs the given task on the given selection of hosts. +func (s *State) RunTaskOnNodes(nodes []kubekeyapi.HostCfg, task NodeTask, parallel bool) error { + var err error + + wg := sync.WaitGroup{} + hasErrors := false + + for i := range nodes { + ctx := s.Clone() + ctx.Logger = ctx.Logger.WithField("node", nodes[i].Address) + + if parallel { + wg.Add(1) + go func(ctx *State, node *kubekeyapi.HostCfg) { + err = ctx.runTask(node, task, parallel) + if err != nil { + ctx.Logger.Error(err) + hasErrors = true + } + wg.Done() + }(ctx, &nodes[i]) + } else { + err = ctx.runTask(&nodes[i], task, parallel) + if err != nil { + break + } + } + } + + wg.Wait() + + if hasErrors { + err = errors.New("at least one of the task has encountered an error") + } + + return err +} + +// RunTaskOnAllNodes runs the given task on all hosts. +func (s *State) RunTaskOnAllNodes(task NodeTask, parallel bool) error { + // It's not possible to concatenate host lists in this function. + // Some of the task(determineOS, determineHostname) write to the state and sending a copy would break that. + if err := s.RunTaskOnNodes(s.Cluster.Hosts, task, parallel); err != nil { + return err + } + //if s.Cluster.StaticWorkers != nil { + // return s.RunTaskOnNodes(s.Cluster.StaticWorkers, task, parallel) + //} + return nil +} + +// RunTaskOnLeader runs the given task on the leader host. +//func (s *State) RunTaskOnLeader(task NodeTask) error { +// leader, err := s.Cluster.Leader() +// if err != nil { +// return err +// } +// +// hosts := []kubekeyapi.HostConfig{ +// leader, +// } +// +// return s.RunTaskOnNodes(hosts, task, false) +//} + +// RunTaskOnFollowers runs the given task on the follower hosts. +//func (s *State) RunTaskOnFollowers(task NodeTask, parallel bool) error { +// return s.RunTaskOnNodes(s.Cluster.Followers(), task, parallel) +//} +// +//func (s *State) RunTaskOnStaticWorkers(task NodeTask, parallel bool) error { +// return s.RunTaskOnNodes(s.Cluster.StaticWorkers, task, parallel) +//} diff --git a/util/task/task.go b/util/task/task.go new file mode 100644 index 00000000..d7845fe8 --- /dev/null +++ b/util/task/task.go @@ -0,0 +1,53 @@ +package task + +import ( + "time" + + "github.com/pixiake/kubekey/util/state" + + "k8s.io/apimachinery/pkg/util/wait" +) + +// defaultRetryBackoff is backoff with with duration of 5 seconds and factor of 2.0 +func defaultRetryBackoff(retries int) wait.Backoff { + return wait.Backoff{ + Steps: retries, + Duration: 5 * time.Second, + Factor: 2.0, + } +} + +// Task is a runnable task +type Task struct { + Fn func(*state.State) error + ErrMsg string + Retries int +} + +// Run runs a task +func (t *Task) Run(ctx *state.State) error { + if t.Retries == 0 { + t.Retries = 1 + } + backoff := defaultRetryBackoff(t.Retries) + + var lastError error + err := wait.ExponentialBackoff(backoff, func() (bool, error) { + if lastError != nil { + ctx.Logger.Warn("Retrying task…") + } + lastError = t.Fn(ctx) + if lastError != nil { + ctx.Logger.Warn("Task failed…") + if ctx.Verbose { + ctx.Logger.Warnf("error was: %s", lastError) + } + return false, nil + } + return true, nil + }) + if err == wait.ErrWaitTimeout { + err = lastError + } + return err +}