Experiment: fix some goroutine bugs and implement a logrus formatter.

Signed-off-by: 24sama <jacksama@foxmail.com>
This commit is contained in:
24sama 2021-08-24 15:23:45 +08:00 committed by 24sama
parent fda9142592
commit da5ad1d4b1
30 changed files with 469 additions and 236 deletions

View File

@ -194,6 +194,12 @@ type ExternalEtcd struct {
KeyFile string
}
// Copy returns a pointer to a new HostCfg that is a shallow duplicate
// of h, so the caller can mutate it without touching the original.
func (h *HostCfg) Copy() *HostCfg {
	dup := *h
	return &dup
}
// GenerateCertSANs is used to generate cert sans for cluster.
func (cfg *ClusterSpec) GenerateCertSANs() []string {
clusterSvc := fmt.Sprintf("kubernetes.default.svc.%s", cfg.Kubernetes.ClusterName)

1
go.mod
View File

@ -3,7 +3,6 @@ module github.com/kubesphere/kubekey
go 1.14
require (
github.com/antonfisher/nested-logrus-formatter v1.3.1
github.com/dominodatalab/os-release v0.0.0-20190522011736-bcdb4a3e3c2f
github.com/go-logr/logr v0.1.0
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect

2
go.sum
View File

@ -66,8 +66,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ=
github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=

View File

@ -9,20 +9,18 @@ import (
)
type BaseAction struct {
Runtime *config.Runtime
Cache *cache.Cache
RootCache *cache.Cache
Result *ending.Result
}
func (b *BaseAction) Init(runtime *config.Runtime, cache *cache.Cache, rootCache *cache.Cache) {
b.Runtime = runtime
func (b *BaseAction) Init(cache *cache.Cache, rootCache *cache.Cache) {
b.Cache = cache
b.RootCache = rootCache
b.Result = ending.NewResult()
}
func (b *BaseAction) Execute(vars vars.Vars) error {
func (b *BaseAction) Execute(runtime *config.Runtime, vars vars.Vars) error {
return nil
}

View File

@ -8,7 +8,7 @@ import (
)
type Action interface {
Execute(vars vars.Vars) (err error)
Init(mgr *config.Runtime, cache *cache.Cache, rootCache *cache.Cache)
Execute(runtime *config.Runtime, vars vars.Vars) (err error)
Init(cache *cache.Cache, rootCache *cache.Cache)
WrapResult(err error) *ending.Result
}

View File

@ -2,6 +2,7 @@ package action
import (
"fmt"
"github.com/kubesphere/kubekey/pkg/core/config"
"github.com/kubesphere/kubekey/pkg/core/util"
"github.com/kubesphere/kubekey/pkg/core/vars"
"github.com/pkg/errors"
@ -16,18 +17,18 @@ type Template struct {
Data util.Data
}
func (t *Template) Execute(vars vars.Vars) error {
func (t *Template) Execute(runtime *config.Runtime, vars vars.Vars) error {
templateStr, err := util.Render(t.Template, t.Data)
if err != nil {
return errors.Wrap(errors.WithStack(err), fmt.Sprintf("render template %s failed", t.Template.Name()))
}
fileName := filepath.Join(t.Runtime.WorkDir, t.Template.Name())
fileName := filepath.Join(runtime.WorkDir, t.Template.Name())
if err := util.WriteFile(fileName, []byte(templateStr)); err != nil {
return errors.Wrap(errors.WithStack(err), fmt.Sprintf("write file %s failed", fileName))
}
if err := t.Runtime.Runner.Scp(fileName, t.Dst); err != nil {
if err := runtime.Runner.SudoScp(fileName, t.Dst); err != nil {
return errors.Wrap(errors.WithStack(err), fmt.Sprintf("scp file %s to remote %s failed", fileName, t.Dst))
}

View File

@ -30,6 +30,10 @@ func (c *Cache) Range(f func(key, value interface{}) bool) {
c.store.Range(f)
}
// Clean drops every cached entry by replacing the underlying sync.Map
// with a fresh, empty one.
// NOTE(review): the plain struct assignment is not synchronized; callers
// must guarantee no concurrent readers/writers while Clean runs — confirm
// (here it is only invoked after a module finishes, before pooling).
func (c *Cache) Clean() {
	c.store = sync.Map{}
}
func (c *Cache) GetMustInt(k string) (int, bool) {
v, ok := c.Get(k)
return v.(int), ok

View File

@ -1,6 +1,16 @@
package common
const (
Pipeline = "Pipeline"
Module = "Module"
Task = "Task"
Node = "Node"
FileMode0755 = 0755
FileMode0644 = 0644
TmpDir = "/tmp/kubekey/"
// command
CopyCmd = "cp -r %s %s"
)

View File

@ -21,11 +21,11 @@ type Runtime struct {
Connector connector.Connector
Runner *runner.Runner
DownloadCommand func(path, url string) string
AllNodes []kubekeyapiv1alpha1.HostCfg
EtcdNodes []kubekeyapiv1alpha1.HostCfg
MasterNodes []kubekeyapiv1alpha1.HostCfg
WorkerNodes []kubekeyapiv1alpha1.HostCfg
K8sNodes []kubekeyapiv1alpha1.HostCfg
AllNodes []*kubekeyapiv1alpha1.HostCfg
EtcdNodes []*kubekeyapiv1alpha1.HostCfg
MasterNodes []*kubekeyapiv1alpha1.HostCfg
WorkerNodes []*kubekeyapiv1alpha1.HostCfg
K8sNodes []*kubekeyapiv1alpha1.HostCfg
ClusterHosts []string
WorkDir string
Kubeconfig string
@ -74,11 +74,11 @@ func NewRuntime(flag string, arg Argument) (*Runtime, error) {
ObjName: cluster.Name,
Cluster: defaultCluster,
Connector: ssh.NewDialer(),
AllNodes: hostGroups.All,
EtcdNodes: hostGroups.Etcd,
MasterNodes: hostGroups.Master,
WorkerNodes: hostGroups.Worker,
K8sNodes: hostGroups.K8s,
AllNodes: hostGroupsPoint(hostGroups.All),
EtcdNodes: hostGroupsPoint(hostGroups.Etcd),
MasterNodes: hostGroupsPoint(hostGroups.Master),
WorkerNodes: hostGroupsPoint(hostGroups.Worker),
K8sNodes: hostGroupsPoint(hostGroups.K8s),
ClusterHosts: generateHosts(hostGroups, defaultCluster),
WorkDir: generateWorkDir(),
ClientSet: clientset,
@ -87,6 +87,14 @@ func NewRuntime(flag string, arg Argument) (*Runtime, error) {
return r, nil
}
// hostGroupsPoint converts a slice of HostCfg values into a slice of
// pointers to the original elements, so every consumer shares (rather
// than copies) the same host records.
func hostGroupsPoint(hosts []kubekeyapiv1alpha1.HostCfg) []*kubekeyapiv1alpha1.HostCfg {
	ptrs := make([]*kubekeyapiv1alpha1.HostCfg, len(hosts))
	for i := range hosts {
		ptrs[i] = &hosts[i]
	}
	return ptrs
}
// Copy is used to create a copy for Runtime.
func (r *Runtime) Copy() *Runtime {
runtime := *r

View File

@ -424,7 +424,7 @@ func (c *connection) RemoteFileExist(dst string) bool {
remoteFileName := path.Base(dst)
remoteFileDirName := path.Dir(dst)
remoteFileCommand := fmt.Sprintf("ls -l %s/%s 2>/dev/null |wc -l", remoteFileDirName, remoteFileName)
remoteFileCommand := fmt.Sprintf(SudoPrefix("ls -l %s/%s 2>/dev/null |wc -l"), remoteFileDirName, remoteFileName)
out, _, _, err := c.Exec(remoteFileCommand)
defer func() {

View File

@ -0,0 +1,188 @@
package logger
import (
"bytes"
"fmt"
"github.com/sirupsen/logrus"
"runtime"
"sort"
"strings"
"time"
)
// Formatter is a logrus formatter that renders an entry as
// "<time> [LEVEL] [field | field]message (caller)".
type Formatter struct {
	// TimestampFormat - default: time.StampMilli = "Jan _2 15:04:05.000"
	TimestampFormat string
	// NoColors - disable ANSI colors
	NoColors bool
	// ShowLevel - the [LEVEL] tag is printed only when the entry's level
	// value is <= this one, i.e. when the entry is at least as severe.
	// NOTE(review): the original comment claimed "default: TRACE", but the
	// zero value of logrus.Level is PanicLevel — confirm the intended default.
	ShowLevel logrus.Level
	// ShowFullLevel - show a full level [WARNING] instead of [WARN]
	ShowFullLevel bool
	// NoUppercaseLevel - no upper case for level value
	NoUppercaseLevel bool
	// HideKeys - show [fieldValue] instead of [fieldKey:fieldValue]
	HideKeys bool
	// FieldsDisplayWithOrder - fields listed here are printed first, in
	// this order; default (nil): all fields displayed, sorted alphabetically
	FieldsDisplayWithOrder []string
	// CallerFirst - print caller info before the level/fields instead of after
	CallerFirst bool
	// CustomCallerFormatter - set custom formatter for caller info
	CustomCallerFormatter func(*runtime.Frame) string
}
// Format implements logrus.Formatter. It renders the entry as
// "<time>[ (caller)][ [LEVEL]] [fields]message[ (caller)]\n",
// with the caller segment placed according to CallerFirst.
func (f *Formatter) Format(entry *logrus.Entry) ([]byte, error) {
	levelColor := getColorByLevel(entry.Level)
	timestampFormat := f.TimestampFormat
	if timestampFormat == "" {
		timestampFormat = time.StampMilli
	}
	// output buffer
	b := &bytes.Buffer{}
	// write time
	b.WriteString(entry.Time.Format(timestampFormat))
	if f.CallerFirst {
		f.writeCaller(b, entry)
	}
	// NOTE(review): the color escape opened here is never reset with
	// "\x1b[0m", so the color bleeds into following output — confirm
	// (NewLogger sets NoColors: true, which masks this).
	if !f.NoColors {
		fmt.Fprintf(b, "\x1b[%dm", levelColor)
	}
	level := entry.Level
	// Print the [LEVEL] tag only for entries at least as severe as
	// ShowLevel (logrus levels are numerically smaller the more severe).
	if f.ShowLevel >= level {
		var levelStr string
		if f.NoUppercaseLevel {
			levelStr = entry.Level.String()
		} else {
			levelStr = strings.ToUpper(entry.Level.String())
		}
		b.WriteString(" [")
		if f.ShowFullLevel {
			b.WriteString(levelStr)
		} else {
			// Abbreviate to the first four characters, e.g. WARNING -> WARN.
			b.WriteString(levelStr[:4])
		}
		b.WriteString("]")
	}
	b.WriteString(" ")
	// write fields, either alphabetically or in the configured order
	if f.FieldsDisplayWithOrder == nil {
		f.writeFields(b, entry)
	} else {
		f.writeOrderedFields(b, entry)
	}
	b.WriteString(entry.Message)
	if !f.CallerFirst {
		f.writeCaller(b, entry)
	}
	b.WriteByte('\n')
	return b.Bytes(), nil
}
// ANSI SGR foreground color codes used to colorize entries by level.
const (
	colorRed    = 31
	colorYellow = 33
	colorBlue   = 36
	colorGray   = 37
)
// getColorByLevel maps a logrus level to the ANSI color code used when
// colored output is enabled.
func getColorByLevel(level logrus.Level) int {
	// Debug and Trace are the numerically largest (least severe) levels.
	if level >= logrus.DebugLevel {
		return colorGray
	}
	if level == logrus.WarnLevel {
		return colorYellow
	}
	// Error, Fatal and Panic.
	if level <= logrus.ErrorLevel {
		return colorRed
	}
	// Info.
	return colorBlue
}
// writeFields renders every entry field, sorted alphabetically by key,
// inside a single pair of brackets. Entries with no fields produce no output.
func (f *Formatter) writeFields(b *bytes.Buffer, entry *logrus.Entry) {
	if len(entry.Data) == 0 {
		return
	}
	keys := make([]string, 0, len(entry.Data))
	for k := range entry.Data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	b.WriteString("[")
	for i, k := range keys {
		f.writeField(b, entry, k, i)
	}
	b.WriteString("]")
}
// writeOrderedFields renders the fields named in FieldsDisplayWithOrder
// first (in that order), then any remaining entry fields sorted
// alphabetically, all inside one pair of brackets.
func (f *Formatter) writeOrderedFields(b *bytes.Buffer, entry *logrus.Entry) {
	if len(entry.Data) != 0 {
		b.WriteString("[")
		// length counts the fields not yet written.
		length := len(entry.Data)
		foundFieldsMap := map[string]bool{}
		for i, field := range f.FieldsDisplayWithOrder {
			if _, ok := entry.Data[field]; ok {
				foundFieldsMap[field] = true
				length--
				// NOTE(review): i is the index within FieldsDisplayWithOrder,
				// not the field's sequential position among entry.Data, so
				// writeField's separator logic cannot reliably detect the
				// last field here — confirm separator placement.
				f.writeField(b, entry, field, i)
			}
		}
		if length > 0 {
			notFoundFields := make([]string, 0, length)
			for field := range entry.Data {
				if foundFieldsMap[field] == false {
					notFoundFields = append(notFoundFields, field)
				}
			}
			sort.Strings(notFoundFields)
			// The index restarts at 0 for the remaining fields; see the
			// NOTE above about separator placement.
			for i, field := range notFoundFields {
				f.writeField(b, entry, field, i)
			}
		}
		b.WriteString("]")
	}
}
// writeField renders one field as "value" (HideKeys) or "key:value",
// followed by a " | " separator for every field except the last.
// i is the field's sequential position among the entry's fields.
func (f *Formatter) writeField(b *bytes.Buffer, entry *logrus.Entry, field string, i int) {
	if f.HideKeys {
		fmt.Fprintf(b, "%v", entry.Data[field])
	} else {
		fmt.Fprintf(b, "%s:%v", field, entry.Data[field])
	}
	// Bug fix: the previous condition `i != len(entry.Data)` was always
	// true (i is at most len-1), so a trailing " | " was emitted before
	// the closing bracket whenever there was more than one field. Only
	// write a separator when another field follows.
	if i < len(entry.Data)-1 {
		b.WriteString(" | ")
	}
}
// writeCaller appends caller information ("(file:line function)") when
// the logger was configured to report the caller.
func (f *Formatter) writeCaller(b *bytes.Buffer, entry *logrus.Entry) {
	if !entry.HasCaller() {
		return
	}
	if f.CustomCallerFormatter != nil {
		// Bug fix: the custom formatter's output was previously passed to
		// fmt.Fprintf as the format string, so any '%' verbs in file or
		// function names would be misinterpreted (flagged by `go vet`).
		// Write the already-formatted string verbatim instead.
		b.WriteString(f.CustomCallerFormatter(entry.Caller))
		return
	}
	fmt.Fprintf(
		b,
		" (%s:%d %s)",
		entry.Caller.File,
		entry.Caller.Line,
		entry.Caller.Function,
	)
}

View File

@ -1,7 +1,7 @@
package logger
import (
nested "github.com/antonfisher/nested-logrus-formatter"
"github.com/kubesphere/kubekey/pkg/core/common"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/rifflock/lfshook"
"github.com/sirupsen/logrus"
@ -22,15 +22,16 @@ type KubeKeyLog struct {
func NewLogger() *KubeKeyLog {
logger := logrus.New()
formatter := &nested.Formatter{
HideKeys: true,
TimestampFormat: "15:04:05 MST",
NoColors: true,
FieldsOrder: []string{"Pipeline", "Module", "Task", "Node"},
formatter := &Formatter{
HideKeys: true,
TimestampFormat: "15:04:05 MST",
NoColors: true,
ShowLevel: logrus.FatalLevel,
FieldsDisplayWithOrder: []string{common.Pipeline, common.Module, common.Task, common.Node},
}
logger.SetFormatter(formatter)
logger.SetLevel(logrus.DebugLevel)
logger.SetLevel(logrus.InfoLevel)
path := "./kubekey.log"
writer, _ := rotatelogs.New(
@ -49,32 +50,3 @@ func NewLogger() *KubeKeyLog {
return &KubeKeyLog{logger, logger}
}
func (l *KubeKeyLog) Flush() {
l.FieldLogger = l.RootEntry
}
func (l *KubeKeyLog) SetPipeline(pipeline string) {
l.FieldLogger = l.WithFields(logrus.Fields{
"Pipeline": pipeline,
})
l.RootEntry = l.FieldLogger
}
func (l *KubeKeyLog) SetModule(module string) {
l.FieldLogger = l.WithFields(logrus.Fields{
"Module": module,
})
}
func (l *KubeKeyLog) SetTask(task string) {
l.FieldLogger = l.WithFields(logrus.Fields{
"Task": task,
})
}
func (l *KubeKeyLog) SetNode(node string) {
l.FieldLogger = l.WithFields(logrus.Fields{
"Node": node,
})
}

View File

@ -1,7 +1,6 @@
package logger
import (
"strconv"
"sync"
"testing"
)
@ -10,16 +9,19 @@ var log = NewLogger()
func TestKubeKey_Print(t *testing.T) {
wg := &sync.WaitGroup{}
for i := 0; i < 20; i++ {
log.SetModule("CREATE")
log.SetTask("task1")
for i := 0; i < 5; i++ {
Log.Info("begin")
log.Info("empty fields")
l1 := *log
wg.Add(1)
go func(x int, log1 KubeKeyLog) {
log.SetNode("node" + strconv.Itoa(x))
log.Info("Congratulations!", "ssssss")
wg.Done()
}(i, l1)
for j := 0; j < 10; j++ {
wg.Add(1)
go func(x int, log1 KubeKeyLog) {
log.Info("Congratulations!", "ssssss")
wg.Done()
}(j, l1)
}
wg.Wait()
}
wg.Wait()
}

View File

@ -3,7 +3,6 @@ package modules
import (
"github.com/kubesphere/kubekey/pkg/core/cache"
"github.com/kubesphere/kubekey/pkg/core/config"
"github.com/kubesphere/kubekey/pkg/core/logger"
)
type BaseModule struct {
@ -13,15 +12,14 @@ type BaseModule struct {
Runtime *config.Runtime
}
func (t *BaseModule) Default(runtime *config.Runtime, rootCache *cache.Cache) {
func (t *BaseModule) Default(runtime *config.Runtime, rootCache *cache.Cache, moduleCache *cache.Cache) {
if t.Name == "" {
t.Name = DefaultModuleName
}
logger.Log.SetModule(t.Name)
t.Runtime = runtime
t.RootCache = rootCache
t.Cache = cache.NewCache()
t.Cache = moduleCache
}
func (t *BaseModule) Init() {
@ -32,6 +30,5 @@ func (t *BaseModule) Is() string {
}
func (t *BaseModule) Run() error {
logger.Log.Info("Begin Run")
return nil
}

View File

@ -6,7 +6,7 @@ import (
)
type Module interface {
Default(runtime *config.Runtime, rootCache *cache.Cache)
Default(runtime *config.Runtime, rootCache *cache.Cache, moduleCache *cache.Cache)
Init()
Is() string
Run() error

View File

@ -17,7 +17,8 @@ import (
type Task struct {
Name string
Hosts []kubekeyapiv1alpha1.HostCfg
Desc string
Hosts []*kubekeyapiv1alpha1.HostCfg
Prepare prepare.Prepare
Action action.Action
Vars vars.Vars
@ -34,17 +35,17 @@ type Task struct {
TaskResult *ending.TaskResult
}
func (t *Task) Init(runtime *config.Runtime, cache *cache.Cache, rootCache *cache.Cache) {
logger.Log.SetTask(t.Name)
func (t *Task) Init(moduleName string, runtime *config.Runtime, cache *cache.Cache, rootCache *cache.Cache) {
t.Cache = cache
t.RootCache = rootCache
t.Runtime = runtime
t.Default()
logger.Log.Infof("[%s] %s", moduleName, t.Desc)
}
// todo: maybe should redesign the ending
func (t *Task) Execute() error {
logger.Log.Info("Begin Run")
if t.TaskResult.IsFailed() {
return t.TaskResult.CombineErr()
}
@ -56,7 +57,12 @@ func (t *Task) Execute() error {
for i := range t.Hosts {
selfRuntime := t.Runtime.Copy()
_ = t.ConfigureSelfRuntime(selfRuntime, &t.Hosts[i], i)
selfHost := t.Hosts[i].Copy()
_ = t.ConfigureSelfRuntime(selfRuntime, selfHost, i)
if t.TaskResult.IsFailed() {
logger.Log.Errorf("failed: [%s]", selfHost.Name)
return t.TaskResult.CombineErr()
}
if t.Parallel {
wg.Add(1)
@ -94,25 +100,28 @@ func (t *Task) ConfigureSelfRuntime(runtime *config.Runtime, host *kubekeyapiv1a
func (t *Task) Run(runtime *config.Runtime, wg *sync.WaitGroup, pool chan struct{}) {
// todo: check if it's ok when parallel.
logger.Log.SetNode(runtime.Runner.Host.Name)
pool <- struct{}{}
t.Prepare.Init(runtime, t.Cache, t.RootCache)
if ok := t.WhenWithRetry(); !ok {
t.Prepare.Init(t.Cache, t.RootCache)
if ok := t.WhenWithRetry(runtime); !ok {
return
}
t.Action.Init(runtime, t.Cache, t.RootCache)
t.Action.Init(t.Cache, t.RootCache)
t.ExecuteWithRetry(wg, pool, runtime)
if t.TaskResult.IsFailed() {
logger.Log.Errorf("failed: [%s]", runtime.Runner.Host.Name)
} else {
logger.Log.Infof("success: [%s]", runtime.Runner.Host.Name)
}
}
func (t *Task) When() (bool, error) {
func (t *Task) When(runtime *config.Runtime) (bool, error) {
if t.Prepare == nil {
return true, nil
}
if ok, err := t.Prepare.PreCheck(); err != nil {
logger.Log.Error(err)
t.TaskResult.AppendErr(errors.Wrapf(err, "task %s precheck failed", t.Name))
if ok, err := t.Prepare.PreCheck(runtime); err != nil {
t.TaskResult.AppendErr(err)
return false, err
} else if !ok {
return false, nil
@ -121,11 +130,11 @@ func (t *Task) When() (bool, error) {
}
}
func (t *Task) WhenWithRetry() bool {
func (t *Task) WhenWithRetry(runtime *config.Runtime) bool {
pass := false
timeout := true
for i := 0; i < t.Retry; i++ {
if res, err := t.When(); err != nil {
if res, err := t.When(runtime); err != nil {
time.Sleep(t.Delay)
continue
} else {
@ -141,23 +150,6 @@ func (t *Task) WhenWithRetry() bool {
return pass
}
func (t *Task) ExecuteWithTimer(wg *sync.WaitGroup, pool chan struct{}, resChan chan string) ending.Ending {
// generate a timer
go func(result chan string, pool chan struct{}) {
select {
case <-result:
case <-time.After(time.Minute * DefaultTimeout):
logger.Log.Fatalf("Execute task timeout, Timeout=%ds", DefaultTimeout)
}
wg.Done()
<-pool
}(resChan, pool)
res := t.Action.WrapResult(t.Action.Execute(t.Vars))
var e ending.Ending = res
return e
}
func (t *Task) ExecuteWithRetry(wg *sync.WaitGroup, pool chan struct{}, runtime *config.Runtime) {
resChan := make(chan string)
defer close(resChan)
@ -174,7 +166,7 @@ func (t *Task) ExecuteWithRetry(wg *sync.WaitGroup, pool chan struct{}, runtime
var end ending.Ending
for i := 0; i < t.Retry; i++ {
res := t.Action.WrapResult(t.Action.Execute(t.Vars))
res := t.Action.WrapResult(t.Action.Execute(runtime, t.Vars))
end = res
if end.GetErr() != nil {
logger.Log.Error(end.GetErr())
@ -188,7 +180,7 @@ func (t *Task) ExecuteWithRetry(wg *sync.WaitGroup, pool chan struct{}, runtime
if end != nil {
t.TaskResult.AppendEnding(end, runtime.Runner.Host.Name)
if end.GetErr() != nil {
t.TaskResult.AppendErr(errors.Wrapf(end.GetErr(), "task %s exec failed", t.Name))
t.TaskResult.AppendErr(end.GetErr())
}
} else {
t.TaskResult.ErrResult()
@ -203,7 +195,7 @@ func (t *Task) Default() {
}
if len(t.Hosts) < 1 {
t.Hosts = []kubekeyapiv1alpha1.HostCfg{}
t.Hosts = []*kubekeyapiv1alpha1.HostCfg{}
t.TaskResult.AppendErr(errors.New("the length of task hosts is 0"))
return
}

View File

@ -1,9 +1,8 @@
package modules
import (
cache2 "github.com/kubesphere/kubekey/pkg/core/cache"
config2 "github.com/kubesphere/kubekey/pkg/core/config"
logger2 "github.com/kubesphere/kubekey/pkg/core/logger"
"github.com/kubesphere/kubekey/pkg/core/cache"
"github.com/kubesphere/kubekey/pkg/core/config"
"github.com/pkg/errors"
)
@ -12,31 +11,29 @@ type BaseTaskModule struct {
Tasks []Task
}
func (t *BaseTaskModule) Default(runtime *config2.Runtime, rootCache *cache2.Cache) {
if t.Name == "" {
t.Name = DefaultTaskModuleName
func (b *BaseTaskModule) Default(runtime *config.Runtime, rootCache *cache.Cache, moduleCache *cache.Cache) {
if b.Name == "" {
b.Name = DefaultTaskModuleName
}
t.Runtime = runtime
t.RootCache = rootCache
t.Cache = cache2.NewCache()
b.Runtime = runtime
b.RootCache = rootCache
b.Cache = moduleCache
}
func (t *BaseTaskModule) Init() {
func (b *BaseTaskModule) Init() {
}
func (t *BaseTaskModule) Is() string {
func (b *BaseTaskModule) Is() string {
return TaskModuleType
}
func (t *BaseTaskModule) Run() error {
logger2.Log.SetModule(t.Name)
logger2.Log.Info("Begin Run")
for i := range t.Tasks {
task := t.Tasks[i]
task.Init(t.Runtime, t.Cache, t.RootCache)
func (b *BaseTaskModule) Run() error {
for i := range b.Tasks {
task := b.Tasks[i]
task.Init(b.Name, b.Runtime, b.Cache, b.RootCache)
if err := task.Execute(); err != nil {
return errors.Wrapf(err, "Module %s exec failed", t.Name)
return errors.Wrapf(err, "Module[%s] exec failed", b.Name)
}
}
return nil

View File

@ -3,16 +3,17 @@ package pipeline
import (
"github.com/kubesphere/kubekey/pkg/core/cache"
"github.com/kubesphere/kubekey/pkg/core/config"
"github.com/kubesphere/kubekey/pkg/core/logger"
"github.com/kubesphere/kubekey/pkg/core/modules"
"github.com/pkg/errors"
"sync"
)
type Pipeline struct {
Name string
Modules []modules.Module
Runtime *config.Runtime
PipelineCache *cache.Cache
Name string
Modules []modules.Module
Runtime *config.Runtime
PipelineCache *cache.Cache
ModuleCachePool sync.Pool
}
func (p *Pipeline) Init() {
@ -20,27 +21,45 @@ func (p *Pipeline) Init() {
}
func (p *Pipeline) Start() error {
logger.Log.SetPipeline(p.Name)
logger.Log.Info("Begin Run")
p.Init()
for i := range p.Modules {
m := p.Modules[i]
m.Default(p.Runtime, p.PipelineCache)
m.Init()
switch m.Is() {
case modules.TaskModuleType:
if err := m.Run(); err != nil {
return errors.Wrapf(err, "Pipeline %s exec failed", p.Name)
}
case modules.ServerModuleType:
go m.Run()
default:
if err := m.Run(); err != nil {
return errors.Wrapf(err, "Pipeline %s exec failed", p.Name)
}
if err := p.RunModule(m); err != nil {
return errors.Wrapf(err, "Pipeline[%s] exec failed", p.Name)
}
logger.Log.Info("Success")
logger.Log.Flush()
}
return nil
}
// RunModule executes one module with a pooled, module-scoped cache. The
// cache is cleaned and returned to the pool when the module finishes,
// regardless of outcome.
func (p *Pipeline) RunModule(m modules.Module) error {
	moduleCache := p.newModuleCache()
	defer p.releaseModuleCache(moduleCache)
	m.Default(p.Runtime, p.PipelineCache, moduleCache)
	m.Init()
	switch m.Is() {
	case modules.TaskModuleType:
		if err := m.Run(); err != nil {
			return err
		}
	case modules.ServerModuleType:
		// NOTE(review): the server module runs unsupervised in its own
		// goroutine; its error return is silently discarded and the
		// deferred cache release fires while it may still be running —
		// confirm this is intended.
		go m.Run()
	default:
		if err := m.Run(); err != nil {
			return err
		}
	}
	return nil
}
// newModuleCache hands out a module-scoped cache, reusing one from the
// pool when available and allocating a fresh cache otherwise.
func (p *Pipeline) newModuleCache() *cache.Cache {
	if pooled, ok := p.ModuleCachePool.Get().(*cache.Cache); ok {
		return pooled
	}
	return cache.NewCache()
}
// releaseModuleCache wipes a module cache and returns it to the pool so
// a later module can reuse its storage.
func (p *Pipeline) releaseModuleCache(c *cache.Cache) {
	c.Clean()
	p.ModuleCachePool.Put(c)
}

View File

@ -6,32 +6,30 @@ import (
)
type BasePrepare struct {
Runtime *config.Runtime
Cache *cache.Cache
RootCache *cache.Cache
}
func (b *BasePrepare) Init(runtime *config.Runtime, cache *cache.Cache, rootCache *cache.Cache) {
b.Runtime = runtime
func (b *BasePrepare) Init(cache *cache.Cache, rootCache *cache.Cache) {
b.Cache = cache
b.RootCache = rootCache
}
func (b *BasePrepare) PreCheck() (bool, error) {
func (b *BasePrepare) PreCheck(runtime *config.Runtime) (bool, error) {
return true, nil
}
type PrepareCollection []Prepare
func (p *PrepareCollection) Init(runtime *config.Runtime, cache *cache.Cache, rootCache *cache.Cache) {
func (p *PrepareCollection) Init(cache *cache.Cache, rootCache *cache.Cache) {
for _, v := range *p {
v.Init(runtime, cache, rootCache)
v.Init(cache, rootCache)
}
}
func (p *PrepareCollection) PreCheck() (bool, error) {
func (p *PrepareCollection) PreCheck(runtime *config.Runtime) (bool, error) {
for _, v := range *p {
res, err := v.PreCheck()
res, err := v.PreCheck(runtime)
if err != nil {
return false, err
}

View File

@ -1,10 +1,12 @@
package prepare
import "github.com/kubesphere/kubekey/pkg/core/config"
type FastPrepare struct {
BasePrepare
Inject func() (bool, error)
}
func (b *FastPrepare) PreCheck() (bool, error) {
func (b *FastPrepare) PreCheck(runtime *config.Runtime) (bool, error) {
return b.Inject()
}

View File

@ -6,6 +6,6 @@ import (
)
type Prepare interface {
PreCheck() (bool, error)
Init(mgr *config.Runtime, cache *cache.Cache, rootCache *cache.Cache)
PreCheck(runtime *config.Runtime) (bool, error)
Init(cache *cache.Cache, rootCache *cache.Cache)
}

View File

@ -1,5 +1,7 @@
package prepare
import "github.com/kubesphere/kubekey/pkg/core/config"
// Condition struct is a Default implementation.
type Condition struct {
BasePrepare
@ -17,8 +19,8 @@ type OnlyFirstMaster struct {
BasePrepare
}
func (o *OnlyFirstMaster) PreCheck() (bool, error) {
if o.Runtime.Runner.Host.IsMaster && o.Runtime.Runner.Host.Name == o.Runtime.MasterNodes[0].Name {
func (o *OnlyFirstMaster) PreCheck(runtime *config.Runtime) (bool, error) {
if runtime.Runner.Host.IsMaster && runtime.Runner.Host.Name == runtime.MasterNodes[0].Name {
return true, nil
}
return false, nil
@ -28,8 +30,8 @@ type OnlyWorker struct {
BasePrepare
}
func (o *OnlyWorker) PreCheck() (bool, error) {
if o.Runtime.Runner.Host.IsWorker && !o.Runtime.Runner.Host.IsMaster {
func (o *OnlyWorker) PreCheck(runtime *config.Runtime) (bool, error) {
if runtime.Runner.Host.IsWorker && !runtime.Runner.Host.IsMaster {
return true, nil
}
return false, nil
@ -39,8 +41,8 @@ type OnlyK3s struct {
BasePrepare
}
func (o *OnlyK3s) PreCheck() (bool, error) {
if o.Runtime.Cluster.Kubernetes.Type == "k3s" {
func (o *OnlyK3s) PreCheck(runtime *config.Runtime) (bool, error) {
if runtime.Cluster.Kubernetes.Type == "k3s" {
return true, nil
}
return false, nil
@ -50,8 +52,8 @@ type OnlyKubernetes struct {
BasePrepare
}
func (o *OnlyKubernetes) PreCheck() (bool, error) {
if o.Runtime.Cluster.Kubernetes.Type != "k3s" {
func (o *OnlyKubernetes) PreCheck(runtime *config.Runtime) (bool, error) {
if runtime.Cluster.Kubernetes.Type != "k3s" {
return true, nil
}
return false, nil

View File

@ -4,9 +4,12 @@ import (
"errors"
"fmt"
kubekeyapiv1alpha1 "github.com/kubesphere/kubekey/apis/kubekey/v1alpha1"
"github.com/kubesphere/kubekey/pkg/core/common"
"github.com/kubesphere/kubekey/pkg/core/connector"
"github.com/kubesphere/kubekey/pkg/core/connector/ssh"
"github.com/kubesphere/kubekey/pkg/core/logger"
"os"
"path/filepath"
)
type Runner struct {
@ -99,6 +102,25 @@ func (r *Runner) Scp(local, remote string) error {
return nil
}
// SudoScp copies a local file to a privileged remote destination: the
// file is first staged under the shared tmp dir (where plain scp can
// write), then promoted into place with a sudo copy.
func (r *Runner) SudoScp(local, remote string) error {
	if r.Conn == nil {
		return errors.New("no ssh connection available")
	}
	// Stage under the tmp dir first; scp alone cannot write root-owned paths.
	staged := filepath.Join(common.TmpDir, remote)
	if err := r.Scp(local, staged); err != nil {
		return err
	}
	// Move the staged copy to its final destination with elevated rights.
	_, err := r.SudoCmd(fmt.Sprintf(common.CopyCmd, staged, remote), false)
	// todo: whether need to remove the tmp file
	return err
}
func (r *Runner) FileExist(remote string) (bool, error) {
if r.Conn == nil {
return false, errors.New("no ssh connection available")
@ -160,7 +182,3 @@ func (r *Runner) FileMd5(path string) (string, error) {
}
return out, nil
}
// SudoPrefix wraps cmd so it runs through `sudo -E /bin/sh -c "..."`,
// preserving the caller's environment variables (-E).
func SudoPrefix(cmd string) string {
	return fmt.Sprintf(`sudo -E /bin/sh -c "%s"`, cmd)
}

View File

@ -0,0 +1,12 @@
package binaries
import "github.com/kubesphere/kubekey/pkg/core/modules"
type NodeBinariesModule struct {
modules.BaseTaskModule
}
// Init sets the module's display name used in logs and error wrapping.
func (n *NodeBinariesModule) Init() {
	n.Name = "NodeBinariesModule"
}

View File

@ -0,0 +1,7 @@
package binaries
import "github.com/kubesphere/kubekey/pkg/core/action"
type NodeDownloadBinaries struct {
action.BaseAction
}

View File

@ -9,14 +9,14 @@ import (
func NewCreateClusterPipeline(runtime *config.Runtime) error {
modules := []modules.Module{
&initialization.InitializationModule{},
m := []modules.Module{
&initialization.NodeInitializationModule{},
&initialization.ConfirmModule{},
}
p := pipeline.Pipeline{
Name: "CreateClusterPipeline",
Modules: modules,
Modules: m,
Runtime: runtime,
}
if err := p.Start(); err != nil {

View File

@ -3,6 +3,7 @@ package initialization
import (
"bufio"
"fmt"
"github.com/kubesphere/kubekey/pkg/core/common"
"github.com/kubesphere/kubekey/pkg/core/logger"
"github.com/kubesphere/kubekey/pkg/core/modules"
"github.com/kubesphere/kubekey/pkg/core/prepare"
@ -13,20 +14,21 @@ import (
"strings"
)
type InitializationModule struct {
type NodeInitializationModule struct {
modules.BaseTaskModule
}
func (i *InitializationModule) Init() {
func (i *NodeInitializationModule) Init() {
i.Name = "NodeInitializationModule"
preCheck := modules.Task{
Name: "NodePreCheck",
Desc: "a pre-check on nodes",
Hosts: i.Runtime.AllNodes,
Prepare: &prepare.FastPrepare{
Inject: func() (bool, error) {
if len(i.Runtime.EtcdNodes)%2 == 0 {
logger.Log.Warnln("The number of etcd is even. Please configure it to be odd.")
logger.Log.Error("The number of etcd is even. Please configure it to be odd.")
return false, errors.New("the number of etcd is even")
}
return true, nil
@ -63,11 +65,10 @@ type ConfirmModule struct {
func (c *ConfirmModule) Init() {
c.Name = "ConfirmModule"
logger.Log.SetModule(c.Name)
}
func (c *ConfirmModule) Run() error {
logger.Log.Info("Begin Run")
logger.Log.WithField(common.Module, c.Name).Infoln()
var (
results []PreCheckResults
stopFlag bool
@ -78,7 +79,7 @@ func (c *ConfirmModule) Run() error {
return errors.New("get node check result failed")
}
pre := nodePreChecks.(map[string]interface{})
pre := nodePreChecks.(map[string]map[string]string)
for node := range pre {
var result PreCheckResults
_ = mapstructure.Decode(pre[node], &result)

View File

@ -3,6 +3,7 @@ package initialization
import (
"fmt"
"github.com/kubesphere/kubekey/pkg/core/action"
"github.com/kubesphere/kubekey/pkg/core/config"
"github.com/kubesphere/kubekey/pkg/core/logger"
"github.com/kubesphere/kubekey/pkg/core/vars"
"strings"
@ -12,11 +13,11 @@ type NodePreCheck struct {
action.BaseAction
}
func (n *NodePreCheck) Execute(vars vars.Vars) error {
func (n *NodePreCheck) Execute(runtime *config.Runtime, vars vars.Vars) error {
var results = make(map[string]string)
results["name"] = n.Runtime.Runner.Host.Name
results["name"] = runtime.Runner.Host.Name
for _, software := range baseSoftware {
_, err := n.Runtime.Runner.SudoCmd(fmt.Sprintf("which %s", software), false)
_, err := runtime.Runner.SudoCmd(fmt.Sprintf("which %s", software), false)
switch software {
case showmount:
software = nfs
@ -27,11 +28,11 @@ func (n *NodePreCheck) Execute(vars vars.Vars) error {
}
if err != nil {
results[software] = ""
logger.Log.Warnf("exec cmd 'which %s' got err return: %v", software, err)
logger.Log.Debugf("exec cmd 'which %s' got err return: %v", software, err)
} else {
results[software] = "y"
if software == docker {
dockerVersion, err := n.Runtime.Runner.SudoCmd("docker version --format '{{.Server.Version}}'", false)
dockerVersion, err := runtime.Runner.SudoCmd("docker version --format '{{.Server.Version}}'", false)
if err != nil {
results[software] = UnknownVersion
} else {
@ -41,20 +42,20 @@ func (n *NodePreCheck) Execute(vars vars.Vars) error {
}
}
output, err := n.Runtime.Runner.Cmd("date +\"%Z %H:%M:%S\"", false)
output, err := runtime.Runner.Cmd("date +\"%Z %H:%M:%S\"", false)
if err != nil {
results["time"] = ""
} else {
results["time"] = strings.TrimSpace(output)
}
checkResults := make(map[string]interface{})
checkResults[n.Runtime.Runner.Host.Name] = results
if res, ok := n.RootCache.GetOrSet("nodePreCheck", checkResults); ok {
m := res.(map[string]interface{})
m[n.Runtime.Runner.Host.Name] = results
if res, ok := n.RootCache.Get("nodePreCheck"); ok {
m := res.(map[string]map[string]string)
m[runtime.Runner.Host.Name] = results
n.RootCache.Set("nodePreCheck", m)
} else {
checkResults := make(map[string]map[string]string)
checkResults[runtime.Runner.Host.Name] = results
n.RootCache.Set("nodePreCheck", checkResults)
}
return nil

View File

@ -124,7 +124,7 @@ func (h *HaproxyModule) Init() {
// updateKubeproxyConfig is used to update kube-proxy configmap and restart tge kube-proxy pod.
updateKubeproxyConfig := modules.Task{
Name: "UpdateKubeproxyConfig",
Hosts: []kubekeyapiv1alpha1.HostCfg{h.Runtime.MasterNodes[0]},
Hosts: []*kubekeyapiv1alpha1.HostCfg{h.Runtime.MasterNodes[0]},
Prepare: &prepare.PrepareCollection{
new(prepare.OnlyKubernetes),
new(prepare.OnlyFirstMaster),

View File

@ -2,10 +2,11 @@ package loadbalancer
import (
"fmt"
action2 "github.com/kubesphere/kubekey/pkg/core/action"
logger2 "github.com/kubesphere/kubekey/pkg/core/logger"
prepare2 "github.com/kubesphere/kubekey/pkg/core/prepare"
vars2 "github.com/kubesphere/kubekey/pkg/core/vars"
"github.com/kubesphere/kubekey/pkg/core/action"
"github.com/kubesphere/kubekey/pkg/core/config"
"github.com/kubesphere/kubekey/pkg/core/logger"
"github.com/kubesphere/kubekey/pkg/core/prepare"
"github.com/kubesphere/kubekey/pkg/core/vars"
"github.com/pkg/errors"
"os"
"strconv"
@ -13,25 +14,25 @@ import (
)
type haproxyPreparatoryWork struct {
action2.BaseAction
action.BaseAction
}
func (h *haproxyPreparatoryWork) Execute(vars vars2.Vars) error {
if err := h.Runtime.Runner.MkDir("/etc/kubekey/haproxy"); err != nil {
func (h *haproxyPreparatoryWork) Execute(runtime *config.Runtime, vars vars.Vars) error {
if err := runtime.Runner.MkDir("/etc/kubekey/haproxy"); err != nil {
return err
}
if err := h.Runtime.Runner.Chmod("/etc/kubekey/haproxy", os.FileMode(0777)); err != nil {
if err := runtime.Runner.Chmod("/etc/kubekey/haproxy", os.FileMode(0777)); err != nil {
return err
}
return nil
}
type getChecksum struct {
action2.BaseAction
action.BaseAction
}
func (g *getChecksum) Execute(vars vars2.Vars) error {
md5Str, err := g.Runtime.Runner.FileMd5("/etc/kubekey/haproxy/haproxy.cfg")
func (g *getChecksum) Execute(runtime *config.Runtime, vars vars.Vars) error {
md5Str, err := runtime.Runner.FileMd5("/etc/kubekey/haproxy/haproxy.cfg")
if err != nil {
return err
}
@ -40,21 +41,21 @@ func (g *getChecksum) Execute(vars vars2.Vars) error {
}
type updateK3sPrepare struct {
prepare2.BasePrepare
prepare.BasePrepare
}
func (u *updateK3sPrepare) PreCheck() (bool, error) {
exist, err := u.Runtime.Runner.FileExist("/etc/systemd/system/k3s.service")
func (u *updateK3sPrepare) PreCheck(runtime *config.Runtime) (bool, error) {
exist, err := runtime.Runner.FileExist("/etc/systemd/system/k3s.service")
if err != nil {
return false, err
}
if exist {
if out, err := u.Runtime.Runner.SudoCmd("sed -n '/--server=.*/p' /etc/systemd/system/k3s.service", false); err != nil {
if out, err := runtime.Runner.SudoCmd("sed -n '/--server=.*/p' /etc/systemd/system/k3s.service", false); err != nil {
return false, err
} else {
if strings.Contains(strings.TrimSpace(out), LocalServer) {
logger2.Log.Debugf("do not restart kubelet, /etc/systemd/system/k3s.service content is %s", out)
logger.Log.Debugf("do not restart kubelet, /etc/systemd/system/k3s.service content is %s", out)
return false, nil
}
}
@ -65,35 +66,35 @@ func (u *updateK3sPrepare) PreCheck() (bool, error) {
}
type updateK3s struct {
action2.BaseAction
action.BaseAction
}
func (u *updateK3s) Execute(vars vars2.Vars) error {
if _, err := u.Runtime.Runner.SudoCmd("sed -i 's#--server=.*\"#--server=https://127.0.0.1:%s\"#g' /etc/systemd/system/k3s.service", false); err != nil {
func (u *updateK3s) Execute(runtime *config.Runtime, vars vars.Vars) error {
if _, err := runtime.Runner.SudoCmd("sed -i 's#--server=.*\"#--server=https://127.0.0.1:%s\"#g' /etc/systemd/system/k3s.service", false); err != nil {
return err
}
if _, err := u.Runtime.Runner.SudoCmd("systemctl restart k3s", false); err != nil {
if _, err := runtime.Runner.SudoCmd("systemctl restart k3s", false); err != nil {
return err
}
return nil
}
type updateKubeletPrepare struct {
prepare2.BasePrepare
prepare.BasePrepare
}
func (u *updateKubeletPrepare) PreCheck() (bool, error) {
exist, err := u.Runtime.Runner.FileExist("/etc/kubernetes/kubelet.conf")
func (u *updateKubeletPrepare) PreCheck(runtime *config.Runtime) (bool, error) {
exist, err := runtime.Runner.FileExist("/etc/kubernetes/kubelet.conf")
if err != nil {
return false, err
}
if exist {
if out, err := u.Runtime.Runner.SudoCmd("sed -n '/server:.*/p' /etc/kubernetes/kubelet.conf", true); err != nil {
if out, err := runtime.Runner.SudoCmd("sed -n '/server:.*/p' /etc/kubernetes/kubelet.conf", true); err != nil {
return false, err
} else {
if strings.Contains(strings.TrimSpace(out), LocalServer) {
logger2.Log.Debugf("do not restart kubelet, /etc/kubernetes/kubelet.conf content is %s", out)
logger.Log.Debugf("do not restart kubelet, /etc/kubernetes/kubelet.conf content is %s", out)
return false, nil
}
}
@ -104,33 +105,33 @@ func (u *updateKubeletPrepare) PreCheck() (bool, error) {
}
type updateKubelet struct {
action2.BaseAction
action.BaseAction
}
func (u *updateKubelet) Execute(vars vars2.Vars) error {
if _, err := u.Runtime.Runner.SudoCmd(fmt.Sprintf(
func (u *updateKubelet) Execute(runtime *config.Runtime, vars vars.Vars) error {
if _, err := runtime.Runner.SudoCmd(fmt.Sprintf(
"sed -i 's#server:.*#server: https://127.0.0.1:%s#g' /etc/kubernetes/kubelet.conf",
strconv.Itoa(u.Runtime.Cluster.ControlPlaneEndpoint.Port)), false); err != nil {
strconv.Itoa(runtime.Cluster.ControlPlaneEndpoint.Port)), false); err != nil {
return err
}
if _, err := u.Runtime.Runner.SudoCmd("systemctl daemon-reload && systemctl restart kubelet", false); err != nil {
if _, err := runtime.Runner.SudoCmd("systemctl daemon-reload && systemctl restart kubelet", false); err != nil {
return err
}
return nil
}
type updateKubeproxyPrapre struct {
prepare2.BasePrepare
prepare.BasePrepare
}
func (u *updateKubeproxyPrapre) PreCheck() (bool, error) {
if out, err := u.Runtime.Runner.SudoCmd(
func (u *updateKubeproxyPrapre) PreCheck(runtime *config.Runtime) (bool, error) {
if out, err := runtime.Runner.SudoCmd(
"set -o pipefail && /usr/local/bin/kubectl --kubeconfig /etc/kubernetes/admin.conf get configmap kube-proxy -n kube-system -o yaml "+
"| sed -n '/server:.*/p'", false); err != nil {
return false, err
} else {
if strings.Contains(strings.TrimSpace(out), LocalServer) {
logger2.Log.Debugf("do not restart kube-proxy, configmap kube-proxy content is %s", out)
logger.Log.Debugf("do not restart kube-proxy, configmap kube-proxy content is %s", out)
return false, nil
}
}
@ -138,28 +139,28 @@ func (u *updateKubeproxyPrapre) PreCheck() (bool, error) {
}
type updateKubeproxy struct {
action2.BaseAction
action.BaseAction
}
func (u *updateKubeproxy) Execute(vars vars2.Vars) error {
if _, err := u.Runtime.Runner.SudoCmd("set -o pipefail "+
func (u *updateKubeproxy) Execute(runtime *config.Runtime, vars vars.Vars) error {
if _, err := runtime.Runner.SudoCmd("set -o pipefail "+
"&& /usr/local/bin/kubectl --kubeconfig /etc/kubernetes/admin.conf get configmap kube-proxy -n kube-system -o yaml "+
"| sed 's#server:.*#server: https://127.0.0.1:%s#g' "+
"| /usr/local/bin/kubectl --kubeconfig /etc/kubernetes/admin.conf replace -f -", false); err != nil {
return err
}
if _, err := u.Runtime.Runner.SudoCmd("/usr/local/bin/kubectl --kubeconfig /etc/kubernetes/admin.conf delete pod -n kube-system -l k8s-app=kube-proxy --force --grace-period=0", false); err != nil {
if _, err := runtime.Runner.SudoCmd("/usr/local/bin/kubectl --kubeconfig /etc/kubernetes/admin.conf delete pod -n kube-system -l k8s-app=kube-proxy --force --grace-period=0", false); err != nil {
return err
}
return nil
}
type updateHosts struct {
action2.BaseAction
action.BaseAction
}
func (u *updateHosts) Execute(vars vars2.Vars) error {
if _, err := u.Runtime.Runner.SudoCmd("sed -i 's#.* %s#127.0.0.1 %s#g' /etc/hosts", false); err != nil {
func (u *updateHosts) Execute(runtime *config.Runtime, vars vars.Vars) error {
if _, err := runtime.Runner.SudoCmd("sed -i 's#.* %s#127.0.0.1 %s#g' /etc/hosts", false); err != nil {
return err
}
return nil