From 609725777e87c998fef8da6a54375ae9c5beda34 Mon Sep 17 00:00:00 2001 From: pixiake Date: Sun, 12 Apr 2020 16:43:26 +0800 Subject: [PATCH] add runner --- util/runner/render.go | 30 +++++++++++ util/runner/runner.go | 92 ++++++++++++++++++++++++++++++++ util/runner/tee.go | 119 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 241 insertions(+) create mode 100644 util/runner/render.go create mode 100644 util/runner/runner.go create mode 100644 util/runner/tee.go diff --git a/util/runner/render.go b/util/runner/render.go new file mode 100644 index 00000000..31a5adc8 --- /dev/null +++ b/util/runner/render.go @@ -0,0 +1,30 @@ +package runner + +import ( + "strings" + "text/template" + + "github.com/pkg/errors" +) + +type Data map[string]interface{} + +// Render text template with given `variables` Render-context +func Render(cmd string, variables map[string]interface{}) (string, error) { + tpl, err := template.New("base").Parse(cmd) + if err != nil { + return "", errors.Wrap(err, "failed to parse script template") + } + + var buf strings.Builder + buf.WriteString(`set -xeu pipefail`) + buf.WriteString("\n\n") + buf.WriteString(`export "PATH=$PATH:/sbin:/usr/local/bin:/opt/bin"`) + buf.WriteString("\n\n") + + if err := tpl.Execute(&buf, variables); err != nil { + return "", errors.Wrap(err, "failed to render script template") + } + + return buf.String(), nil +} diff --git a/util/runner/runner.go b/util/runner/runner.go new file mode 100644 index 00000000..2de9f51b --- /dev/null +++ b/util/runner/runner.go @@ -0,0 +1,92 @@ +package runner + +import ( + "fmt" + "github.com/pixiake/kubekey/util/dialer/ssh" + "github.com/pkg/errors" + "os" + "strings" + "time" +) + +// Runner bundles a connection to a host with the verbosity and +// other options for running commands via SSH. +type Runner struct { + Conn ssh.Connection + Prefix string + OS string + Verbose bool +} + +// TemplateVariables is a render context for templates +type TemplateVariables map[string]interface{} + +func (r *Runner) RunRaw(cmd string) (string, string, error) { + if r.Conn == nil { + return "", "", errors.New("runner is not tied to an opened SSH connection") + } + + if !r.Verbose { + stdout, stderr, _, err := r.Conn.Exec(cmd) + if err != nil { + err = errors.Wrap(err, stderr) + } + + return stdout, stderr, err + } + + stdout := NewTee(New(os.Stdout, r.Prefix)) + defer stdout.Close() + + stderr := NewTee(New(os.Stderr, r.Prefix)) + defer stderr.Close() + + // run the command + _, err := r.Conn.Stream(cmd, stdout, stderr) + + return stdout.String(), stderr.String(), err +} + +// Run executes a given command/script, optionally printing its output to +// stdout/stderr. +func (r *Runner) Run(cmd string, variables TemplateVariables) (string, string, error) { + cmd, err := Render(cmd, variables) + if err != nil { + return "", "", err + } + + return r.RunRaw(cmd) +} + +// WaitForPod waits for the availability of the given Kubernetes element. +func (r *Runner) WaitForPod(namespace string, name string, timeout time.Duration) error { + cmd := fmt.Sprintf(`sudo kubectl --kubeconfig=/etc/kubernetes/admin.conf -n "%s" get pod "%s" -o jsonpath='{.status.phase}' --ignore-not-found`, namespace, name) + if !r.WaitForCondition(cmd, timeout, IsRunning) { + return errors.Errorf("timed out while waiting for %s/%s to come up for %v", namespace, name, timeout) + } + + return nil +} + +type validatorFunc func(stdout string) bool + +// IsRunning checks if the given output represents the "Running" status of a Kubernetes pod. +func IsRunning(stdout string) bool { + return strings.ToLower(stdout) == "running" +} + +// WaitForCondition waits for something to be true. +func (r *Runner) WaitForCondition(cmd string, timeout time.Duration, validator validatorFunc) bool { + cutoff := time.Now().Add(timeout) + + for time.Now().Before(cutoff) { + stdout, _, _ := r.Run(cmd, nil) + if validator(stdout) { + return true + } + + time.Sleep(1 * time.Second) + } + + return false +} diff --git a/util/runner/tee.go b/util/runner/tee.go new file mode 100644 index 00000000..9eb36c14 --- /dev/null +++ b/util/runner/tee.go @@ -0,0 +1,119 @@ +package runner + +import ( + "bytes" + "io" + "strings" + "sync" +) + +// NewTee constructor +func NewTee(wc io.WriteCloser) *Tee { + return &Tee{upstream: wc} +} + +// Tee mimics the unix `tee` command by piping its +// input through to the upstream writer and also +// capturing it in a buffer. +type Tee struct { + buffer bytes.Buffer + upstream io.WriteCloser +} + +func (t *Tee) Write(p []byte) (int, error) { + t.buffer.Write(p) + + return t.upstream.Write(p) +} + +func (t *Tee) String() string { + return strings.TrimSpace(t.buffer.String()) +} + +// Close underlying io.Closer +func (t *Tee) Close() error { + return t.upstream.Close() +} + +// Writer implements io.Writer with prefix each lines. +type Writer struct { + w io.Writer + p []byte + l sync.Mutex + b *bytes.Buffer +} + +// New creates a new prefix Writer. +func New(w io.Writer, prefix string) *Writer { + return &Writer{ + w: w, + p: []byte(prefix), + } +} + +// Write writes data to base Writer with prefix. +func (w *Writer) Write(p []byte) (int, error) { + w.l.Lock() + defer w.l.Unlock() + if w.w == nil { + return 0, io.EOF + } + + size := len(p) + if w.b != nil { + w.b.Write(p) + p = w.b.Bytes() + w.b = nil + } + + b := new(bytes.Buffer) + for len(p) > 0 { + n := bytes.IndexByte(p, '\n') + if n < 0 { + w.b = new(bytes.Buffer) + w.b.Write(p) + break + } + b.Write(w.p) + b.Write(p[:n+1]) + p = p[n+1:] + } + + if b.Len() > 0 { + _, err := b.WriteTo(w.w) + if err != nil { + return 0, err + } + } + return size, nil +} + +func (w *Writer) flush() error { + if w.w == nil { + return io.EOF + } + if w.b == nil { + return nil + } + b := new(bytes.Buffer) + b.Write(w.p) + w.b.WriteTo(b) + w.b = nil + b.WriteByte('\n') + _, err := b.WriteTo(w.w) + return err +} + +// Close flush buffered data and close Writer. +func (w *Writer) Close() error { + w.l.Lock() + defer w.l.Unlock() + if w.w == nil { + return nil + } + err := w.flush() + w.w = nil + return err +} + +var _ io.WriteCloser = &Writer{}