add runner

This commit is contained in:
pixiake 2020-04-12 16:43:26 +08:00
parent f95f07d92f
commit 609725777e
3 changed files with 241 additions and 0 deletions

30
util/runner/render.go Normal file
View File

@ -0,0 +1,30 @@
package runner
import (
"strings"
"text/template"
"github.com/pkg/errors"
)
type Data map[string]interface{}
// Render text template with given `variables` Render-context
func Render(cmd string, variables map[string]interface{}) (string, error) {
tpl, err := template.New("base").Parse(cmd)
if err != nil {
return "", errors.Wrap(err, "failed to parse script template")
}
var buf strings.Builder
buf.WriteString(`set -xeu pipefail`)
buf.WriteString("\n\n")
buf.WriteString(`export "PATH=$PATH:/sbin:/usr/local/bin:/opt/bin"`)
buf.WriteString("\n\n")
if err := tpl.Execute(&buf, variables); err != nil {
return "", errors.Wrap(err, "failed to render script template")
}
return buf.String(), nil
}

92
util/runner/runner.go Normal file
View File

@ -0,0 +1,92 @@
package runner
import (
"fmt"
"github.com/pixiake/kubekey/util/dialer/ssh"
"github.com/pkg/errors"
"os"
"strings"
"time"
)
// Runner bundles a connection to a host with the verbosity and
// other options for running commands via SSH.
type Runner struct {
Conn ssh.Connection
Prefix string
OS string
Verbose bool
}
// TemplateVariables is a render context for templates
type TemplateVariables map[string]interface{}
func (r *Runner) RunRaw(cmd string) (string, string, error) {
if r.Conn == nil {
return "", "", errors.New("runner is not tied to an opened SSH connection")
}
if !r.Verbose {
stdout, stderr, _, err := r.Conn.Exec(cmd)
if err != nil {
err = errors.Wrap(err, stderr)
}
return stdout, stderr, err
}
stdout := NewTee(New(os.Stdout, r.Prefix))
defer stdout.Close()
stderr := NewTee(New(os.Stderr, r.Prefix))
defer stderr.Close()
// run the command
_, err := r.Conn.Stream(cmd, stdout, stderr)
return stdout.String(), stderr.String(), err
}
// Run executes a given command/script, optionally printing its output to
// stdout/stderr.
func (r *Runner) Run(cmd string, variables TemplateVariables) (string, string, error) {
cmd, err := Render(cmd, variables)
if err != nil {
return "", "", err
}
return r.RunRaw(cmd)
}
// WaitForPod waits for the availability of the given Kubernetes element.
func (r *Runner) WaitForPod(namespace string, name string, timeout time.Duration) error {
cmd := fmt.Sprintf(`sudo kubectl --kubeconfig=/etc/kubernetes/admin.conf -n "%s" get pod "%s" -o jsonpath='{.status.phase}' --ignore-not-found`, namespace, name)
if !r.WaitForCondition(cmd, timeout, IsRunning) {
return errors.Errorf("timed out while waiting for %s/%s to come up for %v", namespace, name, timeout)
}
return nil
}
type validatorFunc func(stdout string) bool
// IsRunning checks if the given output represents the "Running" status of a Kubernetes pod.
func IsRunning(stdout string) bool {
return strings.ToLower(stdout) == "running"
}
// WaitForCondition waits for something to be true.
func (r *Runner) WaitForCondition(cmd string, timeout time.Duration, validator validatorFunc) bool {
cutoff := time.Now().Add(timeout)
for time.Now().Before(cutoff) {
stdout, _, _ := r.Run(cmd, nil)
if validator(stdout) {
return true
}
time.Sleep(1 * time.Second)
}
return false
}

119
util/runner/tee.go Normal file
View File

@ -0,0 +1,119 @@
package runner
import (
"bytes"
"io"
"strings"
"sync"
)
// NewTee constructor
func NewTee(wc io.WriteCloser) *Tee {
return &Tee{upstream: wc}
}
// Tee mimics the unix `tee` command by piping its
// input through to the upstream writer and also
// capturing it in a buffer.
type Tee struct {
buffer bytes.Buffer
upstream io.WriteCloser
}
func (t *Tee) Write(p []byte) (int, error) {
t.buffer.Write(p)
return t.upstream.Write(p)
}
func (t *Tee) String() string {
return strings.TrimSpace(t.buffer.String())
}
// Close underlying io.Closer
func (t *Tee) Close() error {
return t.upstream.Close()
}
// Writer implements io.Writer with prefix each lines.
type Writer struct {
w io.Writer
p []byte
l sync.Mutex
b *bytes.Buffer
}
// New creates a new prefix Writer.
func New(w io.Writer, prefix string) *Writer {
return &Writer{
w: w,
p: []byte(prefix),
}
}
// Write writes data to base Writer with prefix.
func (w *Writer) Write(p []byte) (int, error) {
w.l.Lock()
defer w.l.Unlock()
if w.w == nil {
return 0, io.EOF
}
size := len(p)
if w.b != nil {
w.b.Write(p)
p = w.b.Bytes()
w.b = nil
}
b := new(bytes.Buffer)
for len(p) > 0 {
n := bytes.IndexByte(p, '\n')
if n < 0 {
w.b = new(bytes.Buffer)
w.b.Write(p)
break
}
b.Write(w.p)
b.Write(p[:n+1])
p = p[n+1:]
}
if b.Len() > 0 {
_, err := b.WriteTo(w.w)
if err != nil {
return 0, err
}
}
return size, nil
}
func (w *Writer) flush() error {
if w.w == nil {
return io.EOF
}
if w.b == nil {
return nil
}
b := new(bytes.Buffer)
b.Write(w.p)
w.b.WriteTo(b)
w.b = nil
b.WriteByte('\n')
_, err := b.WriteTo(w.w)
return err
}
// Close flush buffered data and close Writer.
func (w *Writer) Close() error {
w.l.Lock()
defer w.l.Unlock()
if w.w == nil {
return nil
}
err := w.flush()
w.w = nil
return err
}
var _ io.WriteCloser = &Writer{}