feat: Change the klog print format. Remove custom-defined cache instances and use Kubernetes cache instead.

Signed-off-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
joyceliu 2024-01-15 19:23:31 +08:00
parent 2a676185e2
commit 5205c4cbdd
34 changed files with 326 additions and 507 deletions

View File

@ -10,14 +10,6 @@ app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "common.selectorLabels" -}}
app.kubernetes.io/name: {{ .Chart.Name }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}

2
go.mod
View File

@ -3,7 +3,6 @@ module github.com/kubesphere/kubekey/v4
go 1.20
require (
github.com/evanphx/json-patch v5.7.0+incompatible
github.com/flosch/pongo2/v6 v6.0.0
github.com/go-git/go-git/v5 v5.11.0
github.com/google/gops v0.3.28
@ -37,6 +36,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.1 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/evanphx/json-patch v5.7.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.7.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect

19
go.sum
View File

@ -24,14 +24,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emicklei/go-restful/v3 v3.11.1 h1:S+9bSbua1z3FgCnV0KKOSSZ3mDthb5NyEPL5gEpCvyk=
github.com/emicklei/go-restful/v3 v3.11.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v5.7.0+incompatible h1:vgGkfT/9f8zE6tvSCe74nfpAVDQ2tG6yudJd8LBksgI=
github.com/evanphx/json-patch v5.7.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.7.0 h1:nJqP7uwL84RJInrohHfW0Fx3awjbm8qZeFv0nW9SYGc=
@ -48,7 +44,6 @@ github.com/go-git/go-billy/v5 v5.5.0/go.mod h1:hmexnoNsr2SJU1Ju67OaNz5ASJY3+sHgF
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMje31YglSBqCdIqdhKBW8lokaMrL3uTkpGYlE2OOT4=
github.com/go-git/go-git/v5 v5.11.0 h1:XIZc1p+8YzypNr34itUfSvYJcv+eYdTnTvOZ2vD3cA4=
github.com/go-git/go-git/v5 v5.11.0/go.mod h1:6GFcX2P3NM7FPBfpePbpLd21XxsgdAt+lKqXmCUiUCY=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@ -88,13 +83,10 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@ -106,10 +98,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -119,7 +109,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
@ -151,7 +140,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@ -177,8 +165,6 @@ golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2Uz
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4=
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc h1:ao2WRsKSzW6KuUY9IWPwWahcHCgR0s52IfwutMfEbdM=
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@ -269,7 +255,6 @@ google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
@ -293,12 +278,8 @@ k8s.io/component-base v0.29.0 h1:T7rjd5wvLnPBV1vC4zWd/iWRbV8Mdxs+nGaoaFzGw3s=
k8s.io/component-base v0.29.0/go.mod h1:sADonFTQ9Zc9yFLghpDpmNXEdHyQmFIGbiuZbqAXQ1M=
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
k8s.io/kube-openapi v0.0.0-20231214164306-ab13479f8bf8 h1:yHNkNuLjht7iq95pO9QmbjOWCguvn8mDe3lT78nqPkw=
k8s.io/kube-openapi v0.0.0-20231214164306-ab13479f8bf8/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
k8s.io/kube-openapi v0.0.0-20240103195357-a9f8850cb432 h1:+XYBQU3ZKUu60H6fEnkitTTabGoKfIG8zczhZBENu9o=
k8s.io/kube-openapi v0.0.0-20240103195357-a9f8850cb432/go.mod h1:Pa1PvrP7ACSkuX6I7KYomY6cmMA0Tx86waBhDUgoKPw=
k8s.io/utils v0.0.0-20231127182322-b307cd553661 h1:FepOBzJ0GXm8t0su67ln2wAZjbQ6RxQGZDnzuLcrUTI=
k8s.io/utils v0.0.0-20231127182322-b307cd553661/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4=

89
pkg/cache/cache.go vendored
View File

@ -1,89 +0,0 @@
/*
Copyright 2023 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"sync"
)
// Cache is the interface for cache.
type Cache interface {
// Name of pool
Name() string
// Put the cached value for the given key.
Put(key string, value any)
// Get the cached value for the given key.
Get(key string) (any, bool)
// Release the cached value for the given id.
Release(id string)
// Clean all cached value
Clean()
}
type local struct {
name string
cache map[string]any
sync.Mutex
}
func (p *local) Name() string {
return p.name
}
func (p *local) Put(key string, value any) {
p.Lock()
defer p.Unlock()
p.cache[key] = value
}
func (p *local) Get(key string) (any, bool) {
v, ok := p.cache[key]
if ok {
return v, ok
}
return v, false
}
func (p *local) Release(id string) {
p.Lock()
defer p.Unlock()
delete(p.cache, id)
}
func (p *local) Clean() {
p.Lock()
defer p.Unlock()
for id := range p.cache {
delete(p.cache, id)
}
}
// NewLocalCache return a local cache
func NewLocalCache(name string) Cache {
return &local{
name: name,
cache: make(map[string]any),
}
}
var (
// LocalVariable is a local cache for variable.Variable
LocalVariable = NewLocalCache("variable")
)

View File

@ -1,31 +0,0 @@
package cache
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCache(t *testing.T) {
testCache := NewLocalCache("test")
assert.Equal(t, "test", testCache.Name())
// should not be able to get the key
_, ok := testCache.Get("foo")
assert.False(t, ok)
// put a key
testCache.Put("foo", "bar")
// should be able to get the key
v, ok := testCache.Get("foo")
assert.True(t, ok)
assert.Equal(t, "bar", v)
// release the key
testCache.Release("foo")
// should not be able to get the key
_, ok = testCache.Get("foo")
assert.False(t, ok)
}

View File

@ -41,20 +41,19 @@ func (c *localConnector) Close(ctx context.Context) error {
func (c *localConnector) CopyFile(ctx context.Context, local []byte, remoteFile string, mode fs.FileMode) error {
// create remote file
if _, err := os.Stat(filepath.Dir(remoteFile)); err != nil {
klog.Warningf("Failed to stat dir %s: %v create it", filepath.Dir(remoteFile), err)
if _, err := os.Stat(filepath.Dir(remoteFile)); err != nil && os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(remoteFile), mode); err != nil {
klog.Errorf("Failed to create dir %s: %v", filepath.Dir(remoteFile), err)
klog.ErrorS(err, "Failed to create remote dir", "remote_file", remoteFile)
return err
}
}
rf, err := os.Create(remoteFile)
if err != nil {
klog.Errorf("Failed to create file %s: %v", remoteFile, err)
klog.ErrorS(err, "Failed to create remote file", "remote_file", remoteFile)
return err
}
if _, err := rf.Write(local); err != nil {
klog.Errorf("Failed to write file %s: %v", remoteFile, err)
klog.ErrorS(err, "Failed to write content to remote file", "remote_file", remoteFile)
return err
}
return rf.Chmod(mode)
@ -64,11 +63,11 @@ func (c *localConnector) FetchFile(ctx context.Context, remoteFile string, local
var err error
file, err := os.Open(remoteFile)
if err != nil {
klog.Errorf("Failed to read file %s: %v", remoteFile, err)
klog.ErrorS(err, "Failed to read remote file failed", "remote_file", remoteFile)
return err
}
if _, err := io.Copy(local, file); err != nil {
klog.Errorf("Failed to copy file %s: %v", remoteFile, err)
klog.ErrorS(err, "Failed to copy remote file to local", "remote_file", remoteFile)
return err
}
return nil
@ -77,24 +76,3 @@ func (c *localConnector) FetchFile(ctx context.Context, remoteFile string, local
func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) {
return c.Cmd.CommandContext(ctx, cmd).CombinedOutput()
}
func (c *localConnector) copyFile(sourcePath, destinationPath string) error {
sourceFile, err := os.Open(sourcePath)
if err != nil {
return err
}
defer sourceFile.Close()
destinationFile, err := os.Create(destinationPath)
if err != nil {
return err
}
defer destinationFile.Close()
_, err = io.Copy(destinationFile, sourceFile)
if err != nil {
return err
}
return nil
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
@ -57,6 +58,7 @@ func (c *sshConnector) Init(ctx context.Context) error {
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
})
if err != nil {
klog.ErrorS(err, "Dial ssh server failed", "host", c.Host, "port", *c.Port)
return err
}
c.client = sshClient
@ -72,27 +74,26 @@ func (c *sshConnector) CopyFile(ctx context.Context, src []byte, remoteFile stri
// create sftp client
sftpClient, err := sftp.NewClient(c.client)
if err != nil {
klog.Errorf("Failed to create sftp client: %v", err)
klog.ErrorS(err, "Failed to create sftp client")
return err
}
defer sftpClient.Close()
// create remote file
if _, err := sftpClient.Stat(filepath.Dir(remoteFile)); err != nil {
klog.Warningf("Failed to stat dir %s: %v create it", filepath.Dir(remoteFile), err)
if _, err := sftpClient.Stat(filepath.Dir(remoteFile)); err != nil && os.IsNotExist(err) {
if err := sftpClient.MkdirAll(filepath.Dir(remoteFile)); err != nil {
klog.Errorf("Failed to create dir %s: %v", filepath.Dir(remoteFile), err)
klog.ErrorS(err, "Failed to create remote dir", "remote_file", remoteFile)
return err
}
}
rf, err := sftpClient.Create(remoteFile)
if err != nil {
klog.Errorf("Failed to create file %s: %v", remoteFile, err)
klog.ErrorS(err, "Failed to create remote file", "remote_file", remoteFile)
return err
}
defer rf.Close()
if _, err = rf.Write(src); err != nil {
klog.Errorf("Failed to write file %s: %v", remoteFile, err)
klog.ErrorS(err, "Failed to write content to remote file", "remote_file", remoteFile)
return err
}
return rf.Chmod(mode)
@ -102,18 +103,18 @@ func (c *sshConnector) FetchFile(ctx context.Context, remoteFile string, local i
// create sftp client
sftpClient, err := sftp.NewClient(c.client)
if err != nil {
klog.Errorf("Failed to create sftp client: %v", err)
klog.ErrorS(err, "Failed to create sftp client", "remote_file", remoteFile)
return err
}
defer sftpClient.Close()
rf, err := sftpClient.Open(remoteFile)
if err != nil {
klog.Errorf("Failed to open file %s: %v", remoteFile, err)
klog.ErrorS(err, "Failed to open file", "remote_file", remoteFile)
return err
}
defer rf.Close()
if _, err := io.Copy(local, rf); err != nil {
klog.Errorf("Failed to copy file %s: %v", remoteFile, err)
klog.ErrorS(err, "Failed to copy file", "remote_file", remoteFile)
return err
}
return nil
@ -123,6 +124,7 @@ func (c *sshConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte,
// create ssh session
session, err := c.client.NewSession()
if err != nil {
klog.ErrorS(err, "Failed to create ssh session")
return nil, err
}
defer session.Close()

View File

@ -66,7 +66,7 @@ func RuntimeDirFromObject(obj runtime.Object) string {
mo, ok := obj.(metav1.Object)
if !ok {
klog.Errorf("failed convert to metav1.Object: %s", obj.GetObjectKind().GroupVersionKind().String())
klog.Errorf("Failed convert to metav1.Object: %s", obj.GetObjectKind().GroupVersionKind().String())
return ""
}
return filepath.Join(workDir, RuntimeDir, mo.GetNamespace(), resource, mo.GetName())

View File

@ -41,23 +41,21 @@ type PipelineReconciler struct {
}
func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.Infof("[Pipeline %s] begin reconcile", req.NamespacedName.String())
defer func() {
klog.Infof("[Pipeline %s] end reconcile", req.NamespacedName.String())
}()
klog.V(5).InfoS("start pipeline reconcile", "pipeline", req.String())
defer klog.V(5).InfoS("finish pipeline reconcile", "pipeline", req.String())
// get pipeline
pipeline := &kubekeyv1.Pipeline{}
err := r.Client.Get(ctx, req.NamespacedName, pipeline)
if err != nil {
if errors.IsNotFound(err) {
klog.V(5).Infof("[Pipeline %s] pipeline not found", req.NamespacedName.String())
klog.V(5).InfoS("pipeline not found", "pipeline", req.String())
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
if pipeline.DeletionTimestamp != nil {
klog.V(5).Infof("[Pipeline %s] pipeline is deleting", req.NamespacedName.String())
klog.V(5).InfoS("pipeline is deleting", "pipeline", req.String())
return ctrl.Result{}, nil
}
@ -66,14 +64,14 @@ func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
excepted := pipeline.DeepCopy()
pipeline.Status.Phase = kubekeyv1.PipelinePhasePending
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(excepted)); err != nil {
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err)
klog.ErrorS(err, "update pipeline error", "pipeline", req.String())
return ctrl.Result{}, err
}
case kubekeyv1.PipelinePhasePending:
excepted := pipeline.DeepCopy()
pipeline.Status.Phase = kubekeyv1.PipelinePhaseRunning
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(excepted)); err != nil {
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err)
klog.ErrorS(err, "update pipeline error", "pipeline", req.String())
return ctrl.Result{}, err
}
case kubekeyv1.PipelinePhaseRunning:
@ -89,7 +87,7 @@ func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *kubekeyv1.Pipeline) (ctrl.Result, error) {
if _, ok := pipeline.Annotations[kubekeyv1.PauseAnnotation]; ok {
// if pipeline is paused, do nothing
klog.V(5).Infof("[Pipeline %s] pipeline is paused", ctrlclient.ObjectKeyFromObject(pipeline))
klog.V(5).InfoS("pipeline is paused", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return ctrl.Result{}, nil
}
@ -97,14 +95,14 @@ func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *
defer func() {
// update pipeline status
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(cp)); err != nil {
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err)
klog.ErrorS(err, "update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
}
}()
if err := r.TaskController.AddTasks(ctx, task.AddTaskOptions{
Pipeline: pipeline,
}); err != nil {
klog.Errorf("[Pipeline %s] add task error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err)
klog.ErrorS(err, "add task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err)
return ctrl.Result{}, err
@ -116,10 +114,10 @@ func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *
// clean runtime directory
func (r *PipelineReconciler) clean(ctx context.Context, pipeline *kubekeyv1.Pipeline) {
if !pipeline.Spec.Debug && pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed {
klog.Infof("[Pipeline %s] clean runtimeDir", ctrlclient.ObjectKeyFromObject(pipeline))
klog.V(5).InfoS("clean runtimeDir", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
// clean runtime directory
if err := os.RemoveAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)); err != nil {
klog.Errorf("clean runtime directory %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err)
klog.ErrorS(err, "clean runtime directory error", "runtime dir", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
}
}
}
@ -127,7 +125,7 @@ func (r *PipelineReconciler) clean(ctx context.Context, pipeline *kubekeyv1.Pipe
// SetupWithManager sets up the controller with the Manager.
func (r *PipelineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options Options) error {
if !options.IsControllerEnabled("pipeline") {
klog.V(5).Infof("pipeline controller is disabled")
klog.V(5).InfoS("controller is disabled", "controller", "pipeline")
return nil
}

View File

@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
cgcache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"
ctrl "sigs.k8s.io/controller-runtime"
@ -31,7 +32,6 @@ import (
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
"github.com/kubesphere/kubekey/v4/pkg/cache"
"github.com/kubesphere/kubekey/v4/pkg/converter"
"github.com/kubesphere/kubekey/v4/pkg/converter/tmpl"
"github.com/kubesphere/kubekey/v4/pkg/modules"
@ -42,7 +42,7 @@ type TaskReconciler struct {
// Client to resources
ctrlclient.Client
// VariableCache to store variable
VariableCache cache.Cache
VariableCache cgcache.Store
}
type taskReconcileOptions struct {
@ -52,18 +52,18 @@ type taskReconcileOptions struct {
}
func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
klog.V(5).Infof("[Task %s] start reconcile", request.String())
defer klog.V(5).Infof("[Task %s] finish reconcile", request.String())
klog.V(5).InfoS("start task reconcile", "task", request.String())
defer klog.V(5).InfoS("finish task reconcile", "task", request.String())
// get task
var task = &kubekeyv1alpha1.Task{}
if err := r.Client.Get(ctx, request.NamespacedName, task); err != nil {
klog.Errorf("get task %s error %v", request, err)
klog.ErrorS(err, "get task error", "task", request.String())
return ctrl.Result{}, nil
}
// if task is deleted, skip
if task.DeletionTimestamp != nil {
klog.V(5).Infof("[Task %s] task is deleted, skip", request.String())
klog.V(5).InfoS("task is deleted, skip", "task", request.String())
return ctrl.Result{}, nil
}
@ -72,11 +72,11 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
for _, ref := range task.OwnerReferences {
if ref.Kind == "Pipeline" {
if err := r.Client.Get(ctx, types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}, pipeline); err != nil {
klog.Errorf("[Task %s] get pipeline %s error %v", request.String(), types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}.String(), err)
if errors.IsNotFound(err) {
klog.V(4).Infof("[Task %s] pipeline is deleted, skip", request.String())
klog.V(5).InfoS("pipeline is deleted, skip", "task", request.String())
return ctrl.Result{}, nil
}
klog.ErrorS(err, "get pipeline error", "task", request.String(), "pipeline", types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}.String())
return ctrl.Result{}, err
}
break
@ -84,33 +84,41 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
}
if _, ok := pipeline.Annotations[kubekeyv1.PauseAnnotation]; ok {
klog.V(5).Infof("[Task %s] pipeline is paused, skip", request.String())
klog.V(5).InfoS("pipeline is paused, skip", "task", request.String())
return ctrl.Result{}, nil
}
// get variable
var v variable.Variable
if vc, ok := r.VariableCache.Get(string(pipeline.UID)); !ok {
// create new variable
vars, ok, err := r.VariableCache.GetByKey(string(pipeline.UID))
if err != nil {
klog.ErrorS(err, "get variable error", "task", request.String())
return ctrl.Result{}, err
}
if ok {
v = vars.(variable.Variable)
} else {
nv, err := variable.New(variable.Options{
Ctx: ctx,
Client: r.Client,
Pipeline: *pipeline,
})
if err != nil {
klog.ErrorS(err, "create variable error", "task", request.String())
return ctrl.Result{}, err
}
if err := r.VariableCache.Add(nv); err != nil {
klog.ErrorS(err, "add variable to store error", "task", request.String())
return ctrl.Result{}, err
}
r.VariableCache.Put(string(pipeline.UID), nv)
v = nv
} else {
v = vc.(variable.Variable)
}
defer func() {
var nsTasks = &kubekeyv1alpha1.TaskList{}
klog.V(5).Infof("[Task %s] update pipeline %s status", ctrlclient.ObjectKeyFromObject(task).String(), ctrlclient.ObjectKeyFromObject(pipeline).String())
klog.V(5).InfoS("update pipeline status", "task", request.String(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline).String())
if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(task.Namespace)); err != nil {
klog.Errorf("[Task %s] list task error %v", ctrlclient.ObjectKeyFromObject(task).String(), err)
klog.ErrorS(err, "list task error", "task", request.String())
return
}
// filter by ownerReference
@ -129,7 +137,7 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
cp := pipeline.DeepCopy()
converter.CalculatePipelineStatus(nsTasks, pipeline)
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(cp)); err != nil {
klog.Errorf("[Task %s] update pipeline %s status error %v", ctrlclient.ObjectKeyFromObject(task).String(), pipeline.Name, err)
klog.ErrorS(err, "update pipeline status error", "task", request.String(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline).String())
}
}()
@ -139,7 +147,7 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
task.Status.Phase = kubekeyv1alpha1.TaskPhasePending
task.Status.RestartCount++
if err := r.Client.Update(ctx, task); err != nil {
klog.Errorf("update task %s error %v", task.Name, err)
klog.ErrorS(err, "update task error", "task", request.String())
return ctrl.Result{}, err
}
}
@ -169,18 +177,18 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc
LocationUID: string(options.Task.UID),
})
if err != nil {
klog.Errorf("[Task %s] find dependency error %v", ctrlclient.ObjectKeyFromObject(options.Task).String(), err)
klog.ErrorS(err, "find dependency error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String())
return ctrl.Result{}, err
}
dt, ok := dl.(variable.DependencyTask)
if !ok {
klog.Errorf("[Task %s] failed to convert dependency", ctrlclient.ObjectKeyFromObject(options.Task).String())
klog.ErrorS(err, "failed to convert dependency", "task", ctrlclient.ObjectKeyFromObject(options.Task).String())
return ctrl.Result{}, fmt.Errorf("[Task %s] failed to convert dependency", ctrlclient.ObjectKeyFromObject(options.Task).String())
}
var nsTasks = &kubekeyv1alpha1.TaskList{}
if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Task.Namespace)); err != nil {
klog.Errorf("[Task %s] list task error %v", ctrlclient.ObjectKeyFromObject(options.Task).String(), err)
klog.ErrorS(err, "list task error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String(), err)
return ctrl.Result{}, err
}
// filter by ownerReference
@ -210,13 +218,13 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc
// update task phase to running
options.Task.Status.Phase = kubekeyv1alpha1.TaskPhaseRunning
if err := r.Client.Update(ctx, options.Task); err != nil {
klog.Errorf("[Task %s] update task to Running error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
klog.ErrorS(err, "update task to Running error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
}
return ctrl.Result{Requeue: true}, nil
case kubekeyv1alpha1.TaskPhaseSkipped:
options.Task.Status.Phase = kubekeyv1alpha1.TaskPhaseSkipped
if err := r.Client.Update(ctx, options.Task); err != nil {
klog.Errorf("[Task %s] update task to Skipped error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
klog.ErrorS(err, "update task to Skipped error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
}
return ctrl.Result{}, nil
default:
@ -226,13 +234,11 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc
func (r *TaskReconciler) dealRunningTask(ctx context.Context, options taskReconcileOptions) (ctrl.Result, error) {
// find task in location
klog.Infof("[Task %s] dealRunningTask begin", ctrlclient.ObjectKeyFromObject(options.Task))
defer func() {
klog.Infof("[Task %s] dealRunningTask end, task phase: %s", ctrlclient.ObjectKeyFromObject(options.Task), options.Task.Status.Phase)
}()
klog.InfoS("dealRunningTask begin", "task", ctrlclient.ObjectKeyFromObject(options.Task))
defer klog.Info("dealRunningTask end, task phase", "task", ctrlclient.ObjectKeyFromObject(options.Task), "phase", options.Task.Status.Phase)
if err := r.executeTask(ctx, options); err != nil {
klog.Errorf("[Task %s] execute task error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
klog.ErrorS(err, "execute task error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
@ -246,7 +252,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
cd.EndTimestamp = metav1.Now()
options.Task.Status.Conditions = append(options.Task.Status.Conditions, cd)
if err := r.Client.Update(ctx, options.Task); err != nil {
klog.Errorf("[Task %s] update task status error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
klog.ErrorS(err, "update task status error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
}
}()
@ -270,7 +276,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
if options.Task.Spec.Register != "" {
puid, err := options.Variable.Get(variable.ParentLocation{LocationUID: string(options.Task.UID)})
if err != nil {
klog.Errorf("[Task %s] get location error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
klog.ErrorS(err, "get location error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
return
}
// set variable to parent location
@ -284,7 +290,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
},
},
}); err != nil {
klog.Errorf("[Task %s] register error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
klog.ErrorS(err, "register task result to variable error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
return
}
}
@ -295,6 +301,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
LocationUID: string(options.Task.UID),
})
if err != nil {
klog.ErrorS(err, "get location variable error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
stderr = err.Error()
return
}
@ -302,6 +309,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
if len(options.Task.Spec.When) > 0 {
ok, err := tmpl.ParseBool(lg.(variable.VariableData), options.Task.Spec.When)
if err != nil {
klog.ErrorS(err, "parse when condition error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
stderr = err.Error()
return
}
@ -326,6 +334,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
case string:
item, err = tmpl.ParseString(lg.(variable.VariableData), item.(string))
if err != nil {
klog.ErrorS(err, "parse loop vars error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
stderr = err.Error()
return
}
@ -333,6 +342,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
for k, v := range item.(variable.VariableData) {
sv, err := tmpl.ParseString(lg.(variable.VariableData), v.(string))
if err != nil {
klog.ErrorS(err, "parse loop vars error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
stderr = err.Error()
return
}
@ -392,7 +402,7 @@ func (r *TaskReconciler) executeModule(ctx context.Context, task *kubekeyv1alpha
LocationUID: string(task.UID),
})
if err != nil {
klog.Errorf("[Task %s] get location variable error %v", ctrlclient.ObjectKeyFromObject(task), err)
klog.ErrorS(err, "get location variable error", "task", ctrlclient.ObjectKeyFromObject(task))
return "", err.Error()
}
@ -400,7 +410,7 @@ func (r *TaskReconciler) executeModule(ctx context.Context, task *kubekeyv1alpha
if len(task.Spec.FailedWhen) > 0 {
ok, err := tmpl.ParseBool(lg.(variable.VariableData), task.Spec.FailedWhen)
if err != nil {
klog.Errorf("[Task %s] validate FailedWhen condition error %v", ctrlclient.ObjectKeyFromObject(task), err)
klog.ErrorS(err, "validate FailedWhen condition error", "task", ctrlclient.ObjectKeyFromObject(task))
return "", err.Error()
}
if ok {

View File

@ -48,23 +48,23 @@ func MarshalPlaybook(baseFS fs.FS, pbPath string) (*kkcorev1.Playbook, error) {
// convert playbook to kkcorev1.Playbook
pb := &kkcorev1.Playbook{}
if err := loadPlaybook(baseFS, pbPath, pb); err != nil {
klog.Errorf(" load playbook with include %s failed: %v", pbPath, err)
klog.ErrorS(err, "Load playbook failed", "playbook", pbPath)
return nil, err
}
// convertRoles
if err := convertRoles(baseFS, pbPath, pb); err != nil {
klog.Errorf("convertRoles error %v", err)
klog.ErrorS(err, "ConvertRoles error", "playbook", pbPath)
return nil, err
}
if err := convertIncludeTasks(baseFS, pbPath, pb); err != nil {
klog.Errorf("convertIncludeTasks error %v", err)
klog.ErrorS(err, "ConvertIncludeTasks error", "playbook", pbPath)
return nil, err
}
if err := pb.Validate(); err != nil {
klog.Errorf("validate playbook %s failed: %v", pbPath, err)
klog.ErrorS(err, "Validate playbook failed", "playbook", pbPath)
return nil, err
}
return pb, nil
@ -75,12 +75,12 @@ func loadPlaybook(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
// baseDir is the local ansible project dir which playbook belong to
pbData, err := fs.ReadFile(baseFS, pbPath)
if err != nil {
klog.Errorf("read playbook %s failed: %v", pbPath, err)
klog.ErrorS(err, "Read playbook failed", "playbook", pbPath)
return err
}
var plays []kkcorev1.Play
if err := yaml.Unmarshal(pbData, &plays); err != nil {
klog.Errorf("unmarshal playbook %s failed: %v", pbPath, err)
klog.ErrorS(err, "Unmarshal playbook failed", "playbook", pbPath)
return err
}
@ -108,12 +108,12 @@ func loadPlaybook(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
rdata, err := fs.ReadFile(baseFS, mainTask)
if err != nil {
klog.Errorf("read role %s failed: %v", mainTask, err)
klog.ErrorS(err, "Read role failed", "playbook", pbPath, "role", r.Role)
return err
}
var blocks []kkcorev1.Block
if err := yaml.Unmarshal(rdata, &blocks); err != nil {
klog.Errorf("unmarshal role %s failed: %v", r.Role, err)
klog.ErrorS(err, "Unmarshal role failed", "playbook", pbPath, "role", r.Role)
return err
}
p.Roles[i].Block = blocks
@ -141,12 +141,12 @@ func convertRoles(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
rdata, err := fs.ReadFile(baseFS, mainTask)
if err != nil {
klog.Errorf("read role %s failed: %v", mainTask, err)
klog.ErrorS(err, "Read role failed", "playbook", pbPath, "role", r.Role)
return err
}
var blocks []kkcorev1.Block
if err := yaml.Unmarshal(rdata, &blocks); err != nil {
klog.Errorf("unmarshal role %s failed: %v", r.Role, err)
klog.ErrorS(err, "Unmarshal role failed", "playbook", pbPath, "role", r.Role)
return err
}
p.Roles[i].Block = blocks
@ -156,12 +156,12 @@ func convertRoles(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
if mainDefault != "" {
mainData, err := fs.ReadFile(baseFS, mainDefault)
if err != nil {
klog.Errorf("read defaults variable for role %s error: %v", r.Role, err)
klog.ErrorS(err, "Read defaults variable for role error", "playbook", pbPath, "role", r.Role)
return err
}
var vars variable.VariableData
if err := yaml.Unmarshal(mainData, &vars); err != nil {
klog.Errorf("unmarshal defaults variable for role %s error: %v", r.Role, err)
klog.ErrorS(err, "Unmarshal defaults variable for role error", "playbook", pbPath, "role", r.Role)
return err
}
p.Roles[i].Vars = vars
@ -177,18 +177,22 @@ func convertIncludeTasks(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) err
var pbBase = filepath.Dir(filepath.Dir(pbPath))
for _, play := range pb.Play {
if err := fileToBlock(baseFS, pbBase, play.PreTasks); err != nil {
klog.ErrorS(err, "Convert pre_tasks error", "playbook", pbPath)
return err
}
if err := fileToBlock(baseFS, pbBase, play.Tasks); err != nil {
klog.ErrorS(err, "Convert tasks error", "playbook", pbPath)
return err
}
if err := fileToBlock(baseFS, pbBase, play.PostTasks); err != nil {
klog.ErrorS(err, "Convert post_tasks error", "playbook", pbPath)
return err
}
for _, r := range play.Roles {
roleBase := project.GetRoleBaseFromPlaybook(baseFS, pbPath, r.Role)
if err := fileToBlock(baseFS, filepath.Join(roleBase, _const.ProjectRolesTasksDir), r.Block); err != nil {
klog.ErrorS(err, "Convert role error", "playbook", pbPath, "role", r.Role)
return err
}
}
@ -201,24 +205,27 @@ func fileToBlock(baseFS fs.FS, baseDir string, blocks []kkcorev1.Block) error {
if b.IncludeTasks != "" {
data, err := fs.ReadFile(baseFS, filepath.Join(baseDir, b.IncludeTasks))
if err != nil {
klog.Errorf("readFile %s error %v", filepath.Join(baseDir, b.IncludeTasks), err)
klog.ErrorS(err, "Read includeTask file error", "name", b.Name, "file_path", filepath.Join(baseDir, b.IncludeTasks))
return err
}
var bs []kkcorev1.Block
if err := yaml.Unmarshal(data, &bs); err != nil {
klog.Errorf("unmarshal data %s to []Block error %v", filepath.Join(baseDir, b.IncludeTasks), err)
klog.ErrorS(err, "Unmarshal includeTask data error", "name", b.Name, "file_path", filepath.Join(baseDir, b.IncludeTasks))
return err
}
b.Block = bs
blocks[i] = b
}
if err := fileToBlock(baseFS, baseDir, b.Block); err != nil {
klog.ErrorS(err, "Convert block error", "name", b.Name)
return err
}
if err := fileToBlock(baseFS, baseDir, b.Rescue); err != nil {
klog.ErrorS(err, "Convert rescue error", "name", b.Name)
return err
}
if err := fileToBlock(baseFS, baseDir, b.Always); err != nil {
klog.ErrorS(err, "Convert always error", "name", b.Name)
return err
}
}
@ -247,7 +254,7 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob
task := &kubekeyv1alpha1.Task{
TypeMeta: metav1.TypeMeta{
Kind: "Task",
APIVersion: "kubekey.kubesphere.io/v1alpha1",
APIVersion: kubekeyv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", owner.GetName(), rand.String(12)),
@ -273,10 +280,9 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob
Hosts: hosts,
IgnoreError: block.IgnoreErrors,
Retries: block.Retries,
//Loop: block.Loop,
When: when,
FailedWhen: block.FailedWhen.Data,
Register: block.Register,
When: when,
FailedWhen: block.FailedWhen.Data,
Register: block.Register,
},
Status: kubekeyv1alpha1.TaskStatus{
Phase: kubekeyv1alpha1.TaskPhasePending,
@ -285,7 +291,7 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob
if len(block.Loop) != 0 {
data, err := json.Marshal(block.Loop)
if err != nil {
klog.Errorf("marshal loop %v error: %v", block.Loop, err)
klog.ErrorS(err, "Marshal loop failed", "task", task.Name, "block", block.Name)
}
task.Spec.Loop = runtime.RawExtension{Raw: data}
}
@ -313,7 +319,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
if strings.HasSuffix(a.(string), "%") {
b, err := strconv.Atoi(strings.TrimSuffix(a.(string), "%"))
if err != nil {
klog.Errorf("convert serial %s to int failed: %v", a.(string), err)
klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string))
return nil, err
}
if sp+int(math.Ceil(float64(len(hosts)*b)/100.0)) > len(hosts)-1 {
@ -325,7 +331,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
} else {
b, err := strconv.Atoi(a.(string))
if err != nil {
klog.Errorf("convert serial %s to int failed: %v", a.(string), err)
klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string))
return nil, err
}
if sp+b > len(hosts)-1 {
@ -355,7 +361,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
if strings.HasSuffix(a.(string), "%") {
b, err := strconv.Atoi(strings.TrimSuffix(a.(string), "%"))
if err != nil {
klog.Errorf("convert serial %s to int failed: %v", a.(string), err)
klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string))
return nil, err
}
if sp+int(math.Ceil(float64(len(hosts)*b)/100.0)) > len(hosts)-1 {
@ -367,7 +373,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
} else {
b, err := strconv.Atoi(a.(string))
if err != nil {
klog.Errorf("convert serial %s to int failed: %v", a.(string), err)
klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string))
return nil, err
}
if sp+b > len(hosts)-1 {

View File

@ -32,12 +32,12 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) {
// first convert.
intql, err := pongo2.FromString(input)
if err != nil {
klog.Errorf("failed to get string: %v", err)
klog.ErrorS(err, "Failed to get string")
return false, err
}
inres, err := intql.Execute(pongo2.Context(v))
if err != nil {
klog.Errorf("failed to execute string: %v", err)
klog.ErrorS(err, "Failed to execute string")
return false, err
}
@ -48,15 +48,15 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) {
// second convert.
tql, err := pongo2.FromString(inres)
if err != nil {
klog.Errorf("failed to get string: %v", err)
klog.ErrorS(err, "failed to get string")
return false, err
}
result, err := tql.Execute(pongo2.Context(v))
if err != nil {
klog.Errorf("failed to execute string: %v", err)
klog.ErrorS(err, "failed to execute string")
return false, err
}
klog.V(4).Infof("the template parse result: %s", result)
klog.V(4).InfoS(" parse template succeed", "result", result)
if result != "True" {
return false, nil
}
@ -68,29 +68,29 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) {
func ParseString(v variable.VariableData, input string) (string, error) {
tql, err := pongo2.FromString(input)
if err != nil {
klog.Errorf("failed to get string: %v", err)
klog.ErrorS(err, "Failed to get string")
return input, err
}
result, err := tql.Execute(pongo2.Context(v))
if err != nil {
klog.Errorf("failed to execute string: %v", err)
klog.ErrorS(err, "Failed to execute string")
return input, err
}
klog.V(4).Infof("the template parse result: %s", result)
klog.V(4).InfoS(" parse template succeed", "result", result)
return result, nil
}
func ParseFile(v variable.VariableData, file []byte) (string, error) {
tql, err := pongo2.FromBytes(file)
if err != nil {
klog.Errorf("transfer file to pongo2 template error %v", err)
klog.ErrorS(err, "Transfer file to template error")
return "", err
}
result, err := tql.Execute(pongo2.Context(v))
if err != nil {
klog.Errorf("exec pongo2 template error %v", err)
klog.ErrorS(err, "exec template error")
return "", err
}
klog.V(4).Infof("the template file: %s", result)
klog.V(4).InfoS(" parse template succeed", "result", result)
return result, nil
}

View File

@ -29,10 +29,10 @@ import (
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
"github.com/kubesphere/kubekey/v4/pkg/cache"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/controllers"
"github.com/kubesphere/kubekey/v4/pkg/task"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)
type commandManager struct {
@ -46,45 +46,47 @@ type commandManager struct {
func (m *commandManager) Run(ctx context.Context) error {
// create config, inventory and pipeline
klog.Infof("[Pipeline %s] start", ctrlclient.ObjectKeyFromObject(m.Pipeline))
defer klog.Infof("[Pipeline %s] finish", ctrlclient.ObjectKeyFromObject(m.Pipeline))
if err := m.Client.Create(ctx, m.Config); err != nil {
klog.Errorf("[Pipeline %s] create config error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Create config error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
return err
}
if err := m.Client.Create(ctx, m.Inventory); err != nil {
klog.Errorf("[Pipeline %s] create inventory error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Create inventory error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
return err
}
if err := m.Client.Create(ctx, m.Pipeline); err != nil {
klog.Errorf("[Pipeline %s] create pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Create pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
return err
}
defer func() {
// update pipeline status
if err := m.Client.Update(ctx, m.Pipeline); err != nil {
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
}
klog.Infof("[Pipeline %s] finish", ctrlclient.ObjectKeyFromObject(m.Pipeline))
if !m.Pipeline.Spec.Debug && m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed {
klog.Infof("[Pipeline %s] clean runtime directory", ctrlclient.ObjectKeyFromObject(m.Pipeline))
// clean runtime directory
if err := os.RemoveAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)); err != nil {
klog.Errorf("clean runtime directory %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err)
klog.ErrorS(err, "Clean runtime directory error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline), "runtime_dir", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir))
}
}
}()
klog.Infof("[Pipeline %s] start task controller", ctrlclient.ObjectKeyFromObject(m.Pipeline))
kd, err := task.NewController(task.ControllerOptions{
Client: m.Client,
VariableCache: variable.Cache,
Client: m.Client,
TaskReconciler: &controllers.TaskReconciler{
Client: m.Client,
VariableCache: cache.LocalVariable,
VariableCache: variable.Cache,
},
})
if err != nil {
klog.Errorf("[Pipeline %s] create task controller error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Create task controller error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
m.Pipeline.Status.Reason = fmt.Sprintf("create task controller failed: %v", err)
return err
@ -94,20 +96,23 @@ func (m *commandManager) Run(ctx context.Context) error {
if err := kd.AddTasks(ctx, task.AddTaskOptions{
Pipeline: m.Pipeline,
}); err != nil {
klog.Errorf("[Pipeline %s] add task error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Add task error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
m.Pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err)
return err
}
// update pipeline status
if err := m.Client.Update(ctx, m.Pipeline); err != nil {
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
return err
}
klog.Infof("[Pipeline %s] start deal task total %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), m.Pipeline.Status.TaskResult.Total)
go kd.Start(ctx)
_ = wait.PollUntilContextCancel(ctx, time.Millisecond*100, false, func(ctx context.Context) (done bool, err error) {
if err := m.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(m.Pipeline), m.Pipeline); err != nil {
klog.Errorf("[Pipeline %s] get pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Get pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
return false, nil
}
if m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseFailed || m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed {
return true, nil
@ -116,10 +121,9 @@ func (m *commandManager) Run(ctx context.Context) error {
})
// kill by signal
if err := syscall.Kill(os.Getpid(), syscall.SIGTERM); err != nil {
klog.Errorf("[Pipeline %s] manager terminated error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
klog.ErrorS(err, "Kill process error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
return err
}
klog.Infof("[Pipeline %s] task finish", ctrlclient.ObjectKeyFromObject(m.Pipeline))
return nil
}

View File

@ -28,9 +28,10 @@ import (
ctrlmanager "sigs.k8s.io/controller-runtime/pkg/manager"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
"github.com/kubesphere/kubekey/v4/pkg/cache"
"github.com/kubesphere/kubekey/v4/pkg/controllers"
"github.com/kubesphere/kubekey/v4/pkg/proxy"
"github.com/kubesphere/kubekey/v4/pkg/task"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)
type controllerManager struct {
@ -44,43 +45,48 @@ func (c controllerManager) Run(ctx context.Context) error {
scheme := runtime.NewScheme()
// add default scheme
if err := clientgoscheme.AddToScheme(scheme); err != nil {
klog.Errorf("add default scheme error: %v", err)
klog.ErrorS(err, "Add default scheme error")
return err
}
// add kubekey scheme
// add kubekey scheme,
// exclude task resource,Because will manager in local
if err := kubekeyv1.AddToScheme(scheme); err != nil {
klog.Errorf("add kubekey scheme error: %v", err)
klog.ErrorS(err, "Add kk scheme error")
return err
}
mgr, err := ctrl.NewManager(config.GetConfigOrDie(), ctrlmanager.Options{
Scheme: scheme,
LeaderElection: c.LeaderElection,
LeaderElectionID: "controller-leader-election-kk",
})
if err != nil {
klog.Errorf("create manager error: %v", err)
klog.ErrorS(err, "Create manager error")
return err
}
taskController, err := task.NewController(task.ControllerOptions{
VariableCache: variable.Cache,
MaxConcurrent: c.MaxConcurrentReconciles,
Client: cache.NewDelegatingClient(mgr.GetClient()),
Client: proxy.NewDelegatingClient(mgr.GetClient()),
TaskReconciler: &controllers.TaskReconciler{
Client: cache.NewDelegatingClient(mgr.GetClient()),
VariableCache: cache.LocalVariable,
Client: proxy.NewDelegatingClient(mgr.GetClient()),
VariableCache: variable.Cache,
},
})
if err != nil {
klog.Errorf("create task controller error: %v", err)
klog.ErrorS(err, "Create task controller error")
return err
}
// add task controller to manager
if err := mgr.Add(taskController); err != nil {
klog.Errorf("add task controller error: %v", err)
klog.ErrorS(err, "Add task controller error")
return err
}
if err := (&controllers.PipelineReconciler{
Client: cache.NewDelegatingClient(mgr.GetClient()),
Client: proxy.NewDelegatingClient(mgr.GetClient()),
EventRecorder: mgr.GetEventRecorderFor("pipeline"),
TaskController: taskController,
}).SetupWithManager(ctx, mgr, controllers.Options{
@ -89,7 +95,7 @@ func (c controllerManager) Run(ctx context.Context) error {
MaxConcurrentReconciles: c.MaxConcurrentReconciles,
},
}); err != nil {
klog.Errorf("create pipeline controller error: %v", err)
klog.ErrorS(err, "create pipeline controller error")
return err
}

View File

@ -20,7 +20,7 @@ import (
"context"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
"github.com/kubesphere/kubekey/v4/pkg/cache"
"github.com/kubesphere/kubekey/v4/pkg/proxy"
)
// Manager shared dependencies such as Addr and , and provides them to Runnable.
@ -40,7 +40,7 @@ func NewCommandManager(o CommandManagerOptions) Manager {
Pipeline: o.Pipeline,
Config: o.Config,
Inventory: o.Inventory,
Client: cache.NewDelegatingClient(nil),
Client: proxy.NewDelegatingClient(nil),
}
}

View File

@ -62,13 +62,13 @@ func ModuleAssert(ctx context.Context, options ExecOptions) (string, string) {
return "False", r
}
}
//if v := variable.StringVar(args, "msg"); v != nil {
// if r, err := tmpl.ParseString(lg.(variable.VariableData), *v); err != nil {
// return "", err.Error()
// } else {
// return "False", r
// }
//}
if v := variable.StringVar(args, "msg"); v != nil {
if r, err := tmpl.ParseString(lg.(variable.VariableData), *v); err != nil {
return "", err.Error()
} else {
return "False", r
}
}
return "False", "False"
}
}

View File

@ -36,7 +36,7 @@ func ModuleCommand(ctx context.Context, options ExecOptions) (string, string) {
conn = connector.NewConnector(options.Host, ha.(variable.VariableData))
}
if err := conn.Init(ctx); err != nil {
klog.Errorf("failed to init connector %v", err)
klog.ErrorS(err, "failed to init connector")
return "", err.Error()
}
defer conn.Close(ctx)

View File

@ -49,12 +49,12 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
LocationUID: string(options.Task.UID),
})
if err != nil {
klog.Errorf("failed to get location vars %v", err)
klog.ErrorS(err, "failed to get location vars")
return "", err.Error()
}
destStr, err := tmpl.ParseString(lv.(variable.VariableData), *dest)
if err != nil {
klog.Errorf("template parse src %s error: %v", *dest, err)
klog.ErrorS(err, "template parse dest error")
return "", err.Error()
}
@ -65,13 +65,13 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
// get connector
ha, err := options.Variable.Get(variable.HostVars{HostName: options.Host})
if err != nil {
klog.Errorf("failed to get host vars %v", err)
klog.ErrorS(err, "failed to get host vars")
return "", err.Error()
}
conn = connector.NewConnector(options.Host, ha.(variable.VariableData))
}
if err := conn.Init(ctx); err != nil {
klog.Errorf("failed to init connector %v", err)
klog.ErrorS(err, "failed to init connector")
return "", err.Error()
}
defer conn.Close(ctx)
@ -80,7 +80,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
// convert src
srcStr, err := tmpl.ParseString(lv.(variable.VariableData), *src)
if err != nil {
klog.Errorf("template parse src %s error: %v", *src, err)
klog.ErrorS(err, "template parse src error")
return "", err.Error()
}
var baseFS fs.FS
@ -89,7 +89,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
} else {
projectFs, err := project.New(project.Options{Pipeline: &options.Pipeline}).FS(ctx, false)
if err != nil {
klog.Errorf("failed to get project fs %v", err)
klog.ErrorS(err, "failed to get project fs")
return "", err.Error()
}
baseFS = projectFs
@ -98,17 +98,19 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
flPath := project.GetFilesFromPlayBook(baseFS, options.Pipeline.Spec.Playbook, roleName, srcStr)
fileInfo, err := fs.Stat(baseFS, flPath)
if err != nil {
klog.Errorf("failed to get src file in local %v", err)
klog.ErrorS(err, "failed to get src file in local")
return "", err.Error()
}
if fileInfo.IsDir() {
// src is dir
if err := fs.WalkDir(baseFS, flPath, func(path string, info fs.DirEntry, err error) error {
if err != nil {
klog.ErrorS(err, "failed to walk dir")
return err
}
rel, err := filepath.Rel(srcStr, path)
if err != nil {
klog.ErrorS(err, "failed to get relative path")
return err
}
if info.IsDir() {
@ -116,6 +118,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
}
fi, err := info.Info()
if err != nil {
klog.ErrorS(err, "failed to get file info")
return err
}
mode := fi.Mode()
@ -124,27 +127,30 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
}
data, err := fs.ReadFile(baseFS, rel)
if err != nil {
klog.ErrorS(err, "failed to read file")
return err
}
if err := conn.CopyFile(ctx, data, filepath.Join(destStr, rel), mode); err != nil {
klog.ErrorS(err, "failed to copy file", "src", srcStr, "dest", destStr)
return err
}
return nil
}); err != nil {
klog.ErrorS(err, "failed to walk dir")
return "", err.Error()
}
} else {
// src is file
data, err := fs.ReadFile(baseFS, flPath)
if err != nil {
klog.Errorf("failed to read file %v", err)
klog.ErrorS(err, "failed to read file")
return "", err.Error()
}
if strings.HasSuffix(destStr, "/") {
destStr = destStr + filepath.Base(srcStr)
}
if err := conn.CopyFile(ctx, data, destStr, fileInfo.Mode()); err != nil {
klog.Errorf("failed to copy file %v", err)
klog.ErrorS(err, "failed to copy file", "src", srcStr, "dest", destStr)
return "", err.Error()
}
return "success", ""

View File

@ -34,11 +34,12 @@ func ModuleDebug(ctx context.Context, options ExecOptions) (string, string) {
LocationUID: string(options.Task.UID),
})
if err != nil {
klog.ErrorS(err, "Failed to get location vars")
return "", err.Error()
}
result, err := tmpl.ParseString(lg.(variable.VariableData), fmt.Sprintf("{{ %s }}", *v))
if err != nil {
klog.Errorf("failed to get var %v", err)
klog.ErrorS(err, "Failed to get var")
return "", err.Error()
}
return result, ""
@ -50,11 +51,12 @@ func ModuleDebug(ctx context.Context, options ExecOptions) (string, string) {
LocationUID: string(options.Task.UID),
})
if err != nil {
klog.ErrorS(err, "Failed to get location vars")
return "", err.Error()
}
result, err := tmpl.ParseString(lg.(variable.VariableData), *v)
if err != nil {
klog.Errorf("failed to get var %v", err)
klog.ErrorS(err, "Failed to get var")
return "", err.Error()
}
return result, ""

View File

@ -1,57 +0,0 @@
/*
Copyright 2023 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package modules
import (
"io/fs"
"os"
"path/filepath"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
)
// findPath from roles/roleSub dir, playbook dir or current dir
func findPath(fs fs.FS, roleSub string, src *string, options ExecOptions) *string {
if src == nil || filepath.IsAbs(*src) {
return src
}
findFile := []string{}
absPlaybook := options.Pipeline.Spec.Playbook
if !filepath.IsAbs(absPlaybook) {
absPlaybook = filepath.Join(_const.GetWorkDir(), _const.ProjectDir, options.Pipeline.Spec.Project.Name, absPlaybook)
}
baseDir := filepath.Dir(filepath.Dir(absPlaybook))
// findFile from roles/files
if role := options.Task.Annotations[kubekeyv1alpha1.TaskAnnotationRole]; role != "" {
findFile = append(findFile, filepath.Join(baseDir, _const.ProjectRolesDir, role, roleSub, *src))
}
// find from playbook dir
findFile = append(findFile, filepath.Join(filepath.Dir(absPlaybook), *src))
// find from current dir
if dir, err := os.Getwd(); err == nil {
findFile = append(findFile, filepath.Join(dir, *src))
}
for _, s := range findFile {
if _, err := os.Stat(s); err == nil {
return &s
}
}
return nil
}

View File

@ -21,7 +21,6 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
@ -47,7 +46,6 @@ var module = make(map[string]ModuleExecFunc)
func RegisterModule(moduleName string, exec ModuleExecFunc) error {
if _, ok := module[moduleName]; ok {
klog.Errorf("module %s is exist", moduleName)
return fmt.Errorf("module %s is exist", moduleName)
}
module[moduleName] = exec

View File

@ -29,6 +29,10 @@ type testVariable struct {
err error
}
func (v testVariable) Key() string {
return "testModule"
}
func (v testVariable) Get(option variable.GetOption) (any, error) {
return v.value, v.err
}

View File

@ -32,7 +32,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) {
LocationUID: string(options.Task.UID),
})
if err != nil {
klog.Errorf("failed to get location vars %v", err)
klog.ErrorS(err, "failed to get location vars")
return "", err.Error()
}
@ -42,7 +42,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) {
case string:
factVars[k], err = tmpl.ParseString(lv.(variable.VariableData), v.(string))
if err != nil {
klog.Errorf("template parse %s error: %v", v.(string), err)
klog.ErrorS(err, "template parse error", "input", v)
return "", err.Error()
}
default:
@ -55,7 +55,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) {
LocationUID: "",
Data: factVars,
}); err != nil {
klog.Errorf("merge fact error: %v", err)
klog.ErrorS(err, "merge fact error")
return "", err.Error()
}
return "success", ""

View File

@ -48,17 +48,17 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
LocationUID: string(options.Task.UID),
})
if err != nil {
klog.Errorf("failed to get location vars %v", err)
klog.ErrorS(err, "failed to get location vars")
return "", err.Error()
}
srcStr, err := tmpl.ParseString(lv.(variable.VariableData), *src)
if err != nil {
klog.Errorf("template parse src %s error: %v", *src, err)
klog.ErrorS(err, "template parse src error", "input", *src)
return "", err.Error()
}
destStr, err := tmpl.ParseString(lv.(variable.VariableData), *dest)
if err != nil {
klog.Errorf("template parse src %s error: %v", *dest, err)
klog.ErrorS(err, "template parse dest error", "input", *dest)
return "", err.Error()
}
@ -68,7 +68,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
} else {
projectFs, err := project.New(project.Options{Pipeline: &options.Pipeline}).FS(ctx, false)
if err != nil {
klog.Errorf("failed to get project fs %v", err)
klog.ErrorS(err, "failed to get project fs")
return "", err.Error()
}
baseFS = projectFs
@ -76,7 +76,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
roleName := options.Task.Annotations[kubekeyv1alpha1.TaskAnnotationRole]
flPath := project.GetTemplatesFromPlayBook(baseFS, options.Pipeline.Spec.Playbook, roleName, srcStr)
if _, err := fs.Stat(baseFS, flPath); err != nil {
klog.Errorf("find src error %v", err)
klog.ErrorS(err, "find src error")
return "", err.Error()
}
@ -87,13 +87,13 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
// get connector
ha, err := options.Variable.Get(variable.HostVars{HostName: options.Host})
if err != nil {
klog.Errorf("failed to get host %v", err)
klog.ErrorS(err, "failed to get host vars")
return "", err.Error()
}
conn = connector.NewConnector(options.Host, ha.(variable.VariableData))
}
if err := conn.Init(ctx); err != nil {
klog.Errorf("failed to init connector %v", err)
klog.ErrorS(err, "failed to init connector")
return "", err.Error()
}
defer conn.Close(ctx)
@ -104,15 +104,18 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
LocationUID: string(options.Task.UID),
})
if err != nil {
klog.ErrorS(err, "failed to get location vars")
return "", err.Error()
}
data, err := fs.ReadFile(baseFS, flPath)
if err != nil {
klog.ErrorS(err, "failed to read src file", "file_path", flPath)
return "", err.Error()
}
result, err := tmpl.ParseFile(lg.(variable.VariableData), data)
if err != nil {
klog.ErrorS(err, "failed to parse file", "file_path", flPath)
return "", err.Error()
}
@ -122,6 +125,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
mode = fs.FileMode(*v)
}
if err := conn.CopyFile(ctx, []byte(result), destStr, mode); err != nil {
klog.ErrorS(err, "failed to copy file", "src", flPath, "dest", destStr)
return "", err.Error()
}
return "success", ""

View File

@ -41,6 +41,7 @@ func (r gitProject) FS(ctx context.Context, update bool) (fs.FS, error) {
return os.DirFS(r.localDir), nil
}
if err := r.init(ctx); err != nil {
klog.ErrorS(err, "Init git project error", "project_addr", r.Pipeline.Spec.Project.Addr)
return nil, err
}
return os.DirFS(r.localDir), nil
@ -74,12 +75,12 @@ func (r gitProject) gitClone(ctx context.Context) error {
func (r gitProject) gitPull(ctx context.Context) error {
open, err := git.PlainOpen(r.localDir)
if err != nil {
klog.Errorf("git open local %s error: %v", r.localDir, err)
klog.ErrorS(err, "git open error", "local_dir", r.localDir)
return err
}
wt, err := open.Worktree()
if err != nil {
klog.Errorf("git open worktree error: %v", err)
klog.ErrorS(err, "git open worktree error", "local_dir", r.localDir)
return err
}
if err := wt.PullContext(ctx, &git.PullOptions{
@ -89,7 +90,7 @@ func (r gitProject) gitPull(ctx context.Context) error {
Auth: &http.TokenAuth{r.Pipeline.Spec.Project.Token},
InsecureSkipTLS: false,
}); err != nil && err != git.NoErrAlreadyUpToDate {
klog.Errorf("pull project %s failed: %v", r.Pipeline.Spec.Project.Addr, err)
klog.ErrorS(err, "git pull error", "local_dir", r.localDir)
return err
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
package proxy
import (
"context"
@ -22,15 +22,11 @@ import (
"io/fs"
"os"
"path/filepath"
"strings"
jsonpatch "github.com/evanphx/json-patch"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@ -49,7 +45,7 @@ type delegatingClient struct {
func NewDelegatingClient(client ctrlclient.Client) ctrlclient.Client {
scheme := runtime.NewScheme()
if err := kubekeyv1.AddToScheme(scheme); err != nil {
klog.Errorf("failed to add scheme: %v", err)
klog.ErrorS(err, "failed to add scheme", "gv", kubekeyv1.SchemeGroupVersion)
}
kubekeyv1.SchemeBuilder.Register(&kubekeyv1alpha1.Task{}, &kubekeyv1alpha1.TaskList{})
return &delegatingClient{
@ -70,11 +66,11 @@ func (d delegatingClient) Get(ctx context.Context, key ctrlclient.ObjectKey, obj
path := filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, key.Namespace, resource, key.Name, key.Name+".yaml")
data, err := os.ReadFile(path)
if err != nil {
klog.Errorf("failed to read yaml file: %v", err)
klog.ErrorS(err, "failed to read yaml file", "path", path)
return err
}
if err := yaml.Unmarshal(data, obj); err != nil {
klog.Errorf("unmarshal file %s error %v", path, err)
klog.ErrorS(err, "failed to unmarshal yaml file", "path", path)
return err
}
return nil
@ -92,7 +88,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
var objects []runtime.Object
runtimeDirEntries, err := os.ReadDir(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir))
if err != nil && !os.IsNotExist(err) {
klog.Errorf("readDir %s error %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err)
klog.ErrorS(err, "failed to read dir", "path", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir))
return err
}
for _, re := range runtimeDirEntries {
@ -103,7 +99,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
if os.IsNotExist(err) {
continue
}
klog.Errorf("readDir %s error %v", resourceDir, err)
klog.ErrorS(err, "failed to read dir", "path", resourceDir)
return err
}
for _, e := range entries {
@ -116,7 +112,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
if os.IsNotExist(err) {
continue
}
klog.Errorf("read file %s error: %v", resourceFile, err)
klog.ErrorS(err, "failed to read file", "path", resourceFile)
return err
}
var obj runtime.Object
@ -131,7 +127,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
obj = &kubekeyv1alpha1.Task{}
}
if err := yaml.Unmarshal(data, &obj); err != nil {
klog.Errorf("unmarshal file %s error: %v", resourceFile, err)
klog.ErrorS(err, "failed to unmarshal yaml file", "path", resourceFile)
return err
}
objects = append(objects, obj)
@ -152,6 +148,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
}
if err := apimeta.SetList(list, objects); err != nil {
klog.ErrorS(err, "failed to set list")
return err
}
return nil
@ -168,11 +165,11 @@ func (d delegatingClient) Create(ctx context.Context, obj ctrlclient.Object, opt
data, err := yaml.Marshal(obj)
if err != nil {
klog.Errorf("failed to marshal object: %v", err)
klog.ErrorS(err, "failed to marshal object")
return err
}
if err := os.MkdirAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName()), fs.ModePerm); err != nil {
klog.Errorf("create dir %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName()), err)
klog.ErrorS(err, "failed to create dir", "path", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName()))
return err
}
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
@ -218,16 +215,16 @@ func (d delegatingClient) Patch(ctx context.Context, obj ctrlclient.Object, patc
patchData, err := patch.Data(obj)
if err != nil {
klog.Errorf("failed to get patch data: %v", err)
klog.ErrorS(err, "failed to get patch data")
return err
}
if len(patchData) == 0 {
klog.V(4).Infof("nothing to patch, skip")
klog.V(4).Info("nothing to patch, skip")
return nil
}
data, err := yaml.Marshal(obj)
if err != nil {
klog.Errorf("failed to marshal object: %v", err)
klog.ErrorS(err, "failed to marshal object")
return err
}
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
@ -301,7 +298,7 @@ func (d delegatingSubResourceWriter) Create(ctx context.Context, obj ctrlclient.
data, err := yaml.Marshal(obj)
if err != nil {
klog.Errorf("failed to marshal object: %v", err)
klog.ErrorS(err, "failed to marshal object")
return err
}
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
@ -319,7 +316,7 @@ func (d delegatingSubResourceWriter) Update(ctx context.Context, obj ctrlclient.
data, err := yaml.Marshal(obj)
if err != nil {
klog.Errorf("failed to marshal object: %v", err)
klog.ErrorS(err, "failed to marshal object")
return err
}
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
@ -336,51 +333,17 @@ func (d delegatingSubResourceWriter) Patch(ctx context.Context, obj ctrlclient.O
patchData, err := patch.Data(obj)
if err != nil {
klog.Errorf("failed to get patch data: %v", err)
klog.ErrorS(err, "failed to get patch data")
return err
}
if len(patchData) == 0 {
klog.V(4).Infof("nothing to patch, skip")
klog.V(4).Info("nothing to patch, skip")
return nil
}
data, err := yaml.Marshal(obj)
if err != nil {
klog.Errorf("failed to marshal object: %v", err)
klog.ErrorS(err, "failed to marshal object")
return err
}
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
}
func getPatchedJSON(patchType types.PatchType, originalJS, patchJS []byte, gvk schema.GroupVersionKind, creater runtime.ObjectCreater) ([]byte, error) {
switch patchType {
case types.JSONPatchType:
patchObj, err := jsonpatch.DecodePatch(patchJS)
if err != nil {
return nil, err
}
bytes, err := patchObj.Apply(originalJS)
// TODO: This is pretty hacky, we need a better structured error from the json-patch
if err != nil && strings.Contains(err.Error(), "doc is missing key") {
msg := err.Error()
ix := strings.Index(msg, "key:")
key := msg[ix+5:]
return bytes, fmt.Errorf("Object to be patched is missing field (%s)", key)
}
return bytes, err
case types.MergePatchType:
return jsonpatch.MergePatch(originalJS, patchJS)
case types.StrategicMergePatchType:
// get a typed object for this GVK if we need to apply a strategic merge patch
obj, err := creater.New(gvk)
if err != nil {
return nil, fmt.Errorf("cannot apply strategic merge patch for %s locally, try --type merge", gvk.String())
}
return strategicpatch.StrategicMergePatch(originalJS, patchJS, obj)
default:
// only here as a safety net - go-restful filters content-type
return nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType)
}
}

View File

@ -20,12 +20,13 @@ import (
"context"
"golang.org/x/time/rate"
cgcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
"github.com/kubesphere/kubekey/v4/pkg/cache"
"github.com/kubesphere/kubekey/v4/pkg/proxy"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)
@ -47,6 +48,7 @@ type ControllerOptions struct {
MaxConcurrent int
ctrlclient.Client
TaskReconciler reconcile.Reconciler
VariableCache cgcache.Store
}
func NewController(o ControllerOptions) (Controller, error) {
@ -54,7 +56,7 @@ func NewController(o ControllerOptions) (Controller, error) {
o.MaxConcurrent = 1
}
if o.Client == nil {
o.Client = cache.NewDelegatingClient(nil)
o.Client = proxy.NewDelegatingClient(nil)
}
return &taskController{
@ -62,5 +64,6 @@ func NewController(o ControllerOptions) (Controller, error) {
wq: workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}),
client: o.Client,
taskReconciler: o.TaskReconciler,
variableCache: o.VariableCache,
}, nil
}

View File

@ -32,12 +32,12 @@ import (
func getGatherFact(ctx context.Context, hostname string, vars variable.Variable) (variable.VariableData, error) {
v, err := vars.Get(variable.HostVars{HostName: hostname})
if err != nil {
klog.Errorf("get host %s all variable error %v", hostname, err)
klog.ErrorS(err, "Get host variable error", "hostname", hostname)
return nil, err
}
conn := connector.NewConnector(hostname, v.(variable.VariableData))
if err := conn.Init(ctx); err != nil {
klog.Errorf("init connection error %v", err)
klog.ErrorS(err, "Init connection error", "hostname", hostname)
return nil, err
}
defer conn.Close(ctx)
@ -46,25 +46,25 @@ func getGatherFact(ctx context.Context, hostname string, vars variable.Variable)
osVars := make(variable.VariableData)
var osRelease bytes.Buffer
if err := conn.FetchFile(ctx, "/etc/os-release", &osRelease); err != nil {
klog.Errorf("fetch os-release error %v", err)
klog.ErrorS(err, "Fetch os-release error", "hostname", hostname)
return nil, err
}
osVars["release"] = convertBytesToMap(osRelease.Bytes(), "=")
kernel, err := conn.ExecuteCommand(ctx, "uname -r")
if err != nil {
klog.Errorf("get kernel version error %v", err)
klog.ErrorS(err, "Get kernel version error", "hostname", hostname)
return nil, err
}
osVars["kernelVersion"] = string(bytes.TrimSuffix(kernel, []byte("\n")))
hn, err := conn.ExecuteCommand(ctx, "hostname")
if err != nil {
klog.Errorf("get hostname error %v", err)
klog.ErrorS(err, "Get hostname error", "hostname", hostname)
return nil, err
}
osVars["hostname"] = string(bytes.TrimSuffix(hn, []byte("\n")))
arch, err := conn.ExecuteCommand(ctx, "arch")
if err != nil {
klog.Errorf("get arch error %v", err)
klog.ErrorS(err, "Get arch error", "hostname", hostname)
return nil, err
}
osVars["architecture"] = string(bytes.TrimSuffix(arch, []byte("\n")))
@ -73,13 +73,13 @@ func getGatherFact(ctx context.Context, hostname string, vars variable.Variable)
procVars := make(variable.VariableData)
var cpu bytes.Buffer
if err := conn.FetchFile(ctx, "/proc/cpuinfo", &cpu); err != nil {
klog.Errorf("fetch cpu error %v", err)
klog.ErrorS(err, "Fetch cpuinfo error", "hostname", hostname)
return nil, err
}
procVars["cpuInfo"] = convertBytesToSlice(cpu.Bytes(), ":")
var mem bytes.Buffer
if err := conn.FetchFile(ctx, "/proc/meminfo", &mem); err != nil {
klog.Errorf("fetch os-release error %v", err)
klog.ErrorS(err, "Fetch meminfo error", "hostname", hostname)
return nil, err
}
procVars["memInfo"] = convertBytesToMap(mem.Bytes(), ":")

View File

@ -24,6 +24,7 @@ import (
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
cgcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@ -32,7 +33,6 @@ import (
kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
"github.com/kubesphere/kubekey/v4/pkg/cache"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/converter"
"github.com/kubesphere/kubekey/v4/pkg/modules"
@ -44,6 +44,8 @@ type taskController struct {
client ctrlclient.Client
taskReconciler reconcile.Reconciler
variableCache cgcache.Store
wq workqueue.RateLimitingInterface
MaxConcurrent int
}
@ -52,7 +54,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
var nsTasks = &kubekeyv1alpha1.TaskList{}
if err := c.client.List(ctx, nsTasks, ctrlclient.InNamespace(o.Pipeline.Namespace)); err != nil {
klog.Errorf("[Pipeline %s] list tasks error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
klog.ErrorS(err, "List tasks error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
defer func() {
@ -78,28 +80,36 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
}
if len(nsTasks.Items) == 0 {
vars, ok, err := c.variableCache.GetByKey(string(o.Pipeline.UID))
if err != nil {
klog.ErrorS(err, "Get variable from store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
// if tasks has not generated. generate tasks from pipeline
vars, ok := cache.LocalVariable.Get(string(o.Pipeline.UID))
//vars, ok := cache.LocalVariable.Get(string(o.Pipeline.UID))
if ok {
o.variable = vars.(variable.Variable)
} else {
newVars, err := variable.New(variable.Options{
nv, err := variable.New(variable.Options{
Ctx: ctx,
Client: c.client,
Pipeline: *o.Pipeline,
})
if err != nil {
klog.Errorf("[Pipeline %s] create variable failed: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
klog.ErrorS(err, "Create variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
cache.LocalVariable.Put(string(o.Pipeline.UID), newVars)
o.variable = newVars
if err := c.variableCache.Add(nv); err != nil {
klog.ErrorS(err, "Add variable to store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
o.variable = nv
}
klog.V(4).Infof("[Pipeline %s] deal project", ctrlclient.ObjectKeyFromObject(o.Pipeline))
klog.V(4).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
projectFs, err := project.New(project.Options{Pipeline: o.Pipeline}).FS(ctx, true)
if err != nil {
klog.Errorf("[Pipeline %s] deal project error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
klog.ErrorS(err, "Deal project error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
@ -116,7 +126,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
// convert Hosts (group or host) to all hosts
ahn, err := o.variable.Get(variable.Hostnames{Name: play.PlayHost.Hosts})
if err != nil {
klog.Errorf("[Pipeline %s] get all host name error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
klog.ErrorS(err, "Get all host name error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
@ -125,7 +135,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
for _, h := range ahn.([]string) {
gfv, err := getGatherFact(ctx, h, o.variable)
if err != nil {
klog.Errorf("[Pipeline %s] get gather fact from host %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), h, err)
klog.ErrorS(err, "Get gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h)
return err
}
if err := o.variable.Merge(variable.HostMerge{
@ -133,7 +143,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
LocationUID: "",
Data: gfv,
}); err != nil {
klog.Errorf("[Pipeline %s] merge gather fact from host %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), h, err)
klog.ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h)
return err
}
}
@ -147,7 +157,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
// group hosts by serial. run the playbook by serial
hs, err = converter.GroupHostBySerial(ahn.([]string), play.Serial.Data)
if err != nil {
klog.Errorf("[Pipeline %s] convert host by serial error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
klog.ErrorS(err, "Group host by serial error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
return err
}
}
@ -167,7 +177,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
// generate task from pre tasks
preTasks, err := c.block2Task(hctx, o, play.PreTasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), play.Name, err)
klog.ErrorS(err, "Get pre task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, preTasks...)
@ -185,7 +195,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
}
roleTasks, err := c.block2Task(context.WithValue(hctx, _const.CtxBlockRole, role.Role), o, role.Block, role.When.Data, ruid, variable.BlockLocation)
if err != nil {
klog.Errorf("[Pipeline %s] get role from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err)
klog.ErrorS(err, "Get role task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name, "role", role.Role)
return err
}
nsTasks.Items = append(nsTasks.Items, roleTasks...)
@ -193,14 +203,14 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
// generate task from tasks
tasks, err := c.block2Task(hctx, o, play.Tasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err)
klog.ErrorS(err, "Get task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, tasks...)
// generate task from post tasks
postTasks, err := c.block2Task(hctx, o, play.Tasks, nil, puid, variable.BlockLocation)
if err != nil {
klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err)
klog.ErrorS(err, "Get post task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
return err
}
nsTasks.Items = append(nsTasks.Items, postTasks...)
@ -209,7 +219,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
for _, task := range nsTasks.Items {
if err := c.client.Create(ctx, &task); err != nil {
klog.Errorf("[Pipeline %s] create task %s error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), task.Name, err)
klog.ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "task", task.Name)
return err
}
}
@ -234,31 +244,35 @@ func (k *taskController) block2Task(ctx context.Context, o AddTaskOptions, ats [
Name: at.Name,
Vars: at.Vars,
}); err != nil {
klog.ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
return nil, err
}
atWhen := append(when, at.When.Data...)
if len(at.Block) != 0 {
// add block
bt, err := k.block2Task(ctx, o, at.Block, atWhen, buid, variable.BlockLocation)
block, err := k.block2Task(ctx, o, at.Block, atWhen, buid, variable.BlockLocation)
if err != nil {
klog.ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
return nil, err
}
tasks = append(tasks, bt...)
tasks = append(tasks, block...)
if len(at.Always) != 0 {
at, err := k.block2Task(ctx, o, at.Always, atWhen, buid, variable.AlwaysLocation)
always, err := k.block2Task(ctx, o, at.Always, atWhen, buid, variable.AlwaysLocation)
if err != nil {
klog.ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
return nil, err
}
tasks = append(tasks, at...)
tasks = append(tasks, always...)
}
if len(at.Rescue) != 0 {
rt, err := k.block2Task(ctx, o, at.Rescue, atWhen, buid, variable.RescueLocation)
rescue, err := k.block2Task(ctx, o, at.Rescue, atWhen, buid, variable.RescueLocation)
if err != nil {
klog.ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
return nil, err
}
tasks = append(tasks, rt...)
tasks = append(tasks, rescue...)
}
} else {
task := converter.MarshalBlock(context.WithValue(context.WithValue(ctx, _const.CtxBlockWhen, atWhen), _const.CtxBlockTaskUID, buid),
@ -267,6 +281,7 @@ func (k *taskController) block2Task(ctx context.Context, o AddTaskOptions, ats [
for n, a := range at.UnknownFiled {
data, err := json.Marshal(a)
if err != nil {
klog.ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name, "field", n)
return nil, err
}
if m := modules.FindModule(n); m != nil {
@ -328,7 +343,7 @@ func (k *taskController) processNextWorkItem(ctx context.Context) bool {
switch {
case err != nil:
k.wq.AddRateLimited(req)
klog.Errorf("Reconciler error: %v", err)
klog.ErrorS(err, "Reconciler error", "request", req)
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as

View File

@ -80,12 +80,12 @@ func hostsInGroup(inv kubekeyv1.Inventory, groupName string) []string {
func StringVar(vars VariableData, key string) *string {
value, ok := vars[key]
if !ok {
klog.V(4).Infof("cannot find variable %s", key)
klog.V(4).InfoS("cannot find variable", "key", key)
return nil
}
sv, ok := value.(string)
if !ok {
klog.V(4).Infof("variable %s is not string", key)
klog.V(4).InfoS("variable is not string", "key", key)
return nil
}
return &sv
@ -95,13 +95,13 @@ func StringVar(vars VariableData, key string) *string {
func IntVar(vars VariableData, key string) *int {
value, ok := vars[key]
if !ok {
klog.V(4).Infof("cannot find variable %s", key)
klog.V(4).InfoS("cannot find variable", "key", key)
return nil
}
// default convert to float64
number, ok := value.(float64)
if !ok {
klog.V(4).Infof("variable %s is not string", key)
klog.V(4).InfoS("variable is not number", "key", key)
return nil
}
vi := int(number)
@ -112,19 +112,19 @@ func IntVar(vars VariableData, key string) *int {
func StringSliceVar(vars VariableData, key string) []string {
value, ok := vars[key]
if !ok {
klog.V(4).Infof("cannot find variable %s", key)
klog.V(4).InfoS("cannot find variable", "key", key)
return nil
}
sv, ok := value.([]any)
if !ok {
klog.V(4).Infof("variable %s is not slice", key)
klog.V(4).InfoS("variable is not string slice", "key", key)
return nil
}
var ss []string
for _, a := range sv {
av, ok := a.(string)
if !ok {
klog.V(4).Infof("value in variable %s is not string", key)
klog.V(4).InfoS("variable is not string", "key", key)
return nil
}
ss = append(ss, av)
@ -139,7 +139,7 @@ func Extension2Variables(ext runtime.RawExtension) VariableData {
var data VariableData
if err := yaml.Unmarshal(ext.Raw, &data); err != nil {
klog.Errorf("failed to unmarshal extension to variables: %v", err)
klog.ErrorS(err, "failed to unmarshal extension to variables")
}
return data
}
@ -151,7 +151,7 @@ func Extension2Slice(ext runtime.RawExtension) []any {
var data []any
if err := yaml.Unmarshal(ext.Raw, &data); err != nil {
klog.Errorf("failed to unmarshal extension to any: %v", err)
klog.ErrorS(err, "failed to unmarshal extension to slice")
}
return data
}

View File

@ -31,10 +31,13 @@ import (
)
type variable struct {
// key is the unique Identifier of the variable. usually the UID of the pipeline.
key string
// source is where the variable is stored
source source.Source
// value is the data of the variable, which store in memory
value *value
// lock is the lock for value
sync.Mutex
}
@ -111,7 +114,7 @@ type VariableData map[string]any
func (v VariableData) String() string {
data, err := json.Marshal(v)
if err != nil {
klog.Errorf("marshal in error: %v", err)
klog.ErrorS(err, "marshal in error", "data", v)
return ""
}
return string(data)
@ -122,6 +125,10 @@ type host struct {
RuntimeVars map[string]VariableData `json:"runtime"`
}
func (v *variable) Key() string {
return v.key
}
func (v *variable) Get(option GetOption) (any, error) {
return option.filter(*v.value)
}
@ -139,14 +146,14 @@ func (v *variable) Merge(mo ...MergeOption) error {
if !reflect.DeepEqual(old.Location, v.value.Location) {
if err := v.syncLocation(); err != nil {
klog.Errorf("sync location error %v", err)
klog.ErrorS(err, "sync location error")
}
}
for hn, hv := range v.value.Hosts {
if !reflect.DeepEqual(old.Hosts[hn], hv) {
if err := v.syncHosts(hn); err != nil {
klog.Errorf("sync group error %v", err)
klog.ErrorS(err, "sync host error", "hostname", hn)
}
}
}
@ -157,11 +164,11 @@ func (v *variable) Merge(mo ...MergeOption) error {
func (v *variable) syncLocation() error {
data, err := json.MarshalIndent(v.value.Location, "", " ")
if err != nil {
klog.Errorf("marshal location data failed: %v", err)
klog.ErrorS(err, "marshal location data error")
return err
}
if err := v.source.Write(data, _const.RuntimePipelineVariableLocationFile); err != nil {
klog.Errorf("write location data to local file %s error %v", _const.RuntimePipelineVariableLocationFile, err)
klog.ErrorS(err, "write location data to local file error", "filename", _const.RuntimePipelineVariableLocationFile)
return err
}
return nil
@ -173,11 +180,11 @@ func (v *variable) syncHosts(hostname ...string) error {
if hv, ok := v.value.Hosts[hn]; ok {
data, err := json.MarshalIndent(hv, "", " ")
if err != nil {
klog.Errorf("marshal host %s data failed: %v", hn, err)
klog.ErrorS(err, "marshal host data error", "hostname", hn)
return err
}
if err := v.source.Write(data, fmt.Sprintf("%s.json", hn)); err != nil {
klog.Errorf("write host data to local file %s.json error %v", hn, err)
klog.ErrorS(err, "write host data to local file error", "hostname", hn, "filename", fmt.Sprintf("%s.json", hn))
}
}
}

View File

@ -31,7 +31,7 @@ type fileSource struct {
func (f *fileSource) Read() (map[string][]byte, error) {
de, err := os.ReadDir(f.path)
if err != nil {
klog.Errorf("read dir %s error %v", f.path, err)
klog.ErrorS(err, "read dir error", "path", f.path)
return nil, err
}
var result map[string][]byte
@ -46,6 +46,7 @@ func (f *fileSource) Read() (map[string][]byte, error) {
if strings.HasSuffix(entry.Name(), ".json") {
data, err := os.ReadFile(filepath.Join(f.path, entry.Name()))
if err != nil {
klog.ErrorS(err, "read file error", "filename", entry.Name())
return nil, err
}
result[entry.Name()] = data
@ -58,10 +59,12 @@ func (f *fileSource) Read() (map[string][]byte, error) {
func (f *fileSource) Write(data []byte, filename string) error {
file, err := os.Create(filepath.Join(f.path, filename))
if err != nil {
klog.ErrorS(err, "create file error", "filename", filename)
return err
}
defer file.Close()
if _, err := file.Write(data); err != nil {
klog.ErrorS(err, "write file error", "filename", filename)
return err
}
return nil

View File

@ -40,7 +40,7 @@ type Watcher interface {
func New(path string) (Source, error) {
if _, err := os.Stat(path); err != nil {
if err := os.MkdirAll(path, fs.ModePerm); err != nil {
klog.Errorf("create source path %s error: %v", path, err)
klog.ErrorS(err, "create source path error", "path", path)
return nil, err
}
}

View File

@ -25,6 +25,8 @@ import (
"strconv"
"strings"
cgcache "k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
@ -36,6 +38,7 @@ import (
)
type Variable interface {
Key() string
Get(option GetOption) (any, error)
Merge(option ...MergeOption) error
}
@ -51,22 +54,23 @@ func New(o Options) (Variable, error) {
// new source
s, err := source.New(filepath.Join(_const.RuntimeDirFromObject(&o.Pipeline), _const.RuntimePipelineVariableDir))
if err != nil {
klog.Errorf("create file source failed: %v", err)
klog.ErrorS(err, "create file source failed", "path", filepath.Join(_const.RuntimeDirFromObject(&o.Pipeline), _const.RuntimePipelineVariableDir), "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
// get config
var config = &kubekeyv1.Config{}
if err := o.Client.Get(o.Ctx, types.NamespacedName{o.Pipeline.Spec.ConfigRef.Namespace, o.Pipeline.Spec.ConfigRef.Name}, config); err != nil {
klog.Errorf("get config from pipeline error %v", err)
klog.ErrorS(err, "get config from pipeline error", "config", o.Pipeline.Spec.ConfigRef, "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
// get inventory
var inventory = &kubekeyv1.Inventory{}
if err := o.Client.Get(o.Ctx, types.NamespacedName{o.Pipeline.Spec.InventoryRef.Namespace, o.Pipeline.Spec.InventoryRef.Name}, inventory); err != nil {
klog.Errorf("get inventory from pipeline error %v", err)
klog.ErrorS(err, "get inventory from pipeline error", "inventory", o.Pipeline.Spec.InventoryRef, "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
v := &variable{
key: string(o.Pipeline.UID),
source: s,
value: &value{
Config: *config,
@ -77,21 +81,21 @@ func New(o Options) (Variable, error) {
// read data from source
data, err := v.source.Read()
if err != nil {
klog.Errorf("read data from source error %v", err)
klog.ErrorS(err, "read data from source error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
for k, d := range data {
if k == _const.RuntimePipelineVariableLocationFile {
// set location
if err := json.Unmarshal(d, &v.value.Location); err != nil {
klog.Errorf("unmarshal location error %v", err)
klog.ErrorS(err, "unmarshal location error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
} else {
// set hosts
h := host{}
if err := json.Unmarshal(d, &h); err != nil {
klog.Errorf("unmarshal host error %v", err)
klog.ErrorS(err, "unmarshal host error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
return nil, err
}
v.value.Hosts[strings.TrimSuffix(k, ".json")] = h
@ -252,7 +256,7 @@ func (g Hostnames) filter(data value) (any, error) {
if match := regex.FindStringSubmatch(n); match != nil {
index, err := strconv.Atoi(match[2])
if err != nil {
klog.Errorf("convert index %s to int failed: %v", match[2], err)
klog.ErrorS(err, "convert index to int error", "index", match[2])
return nil, err
}
for gn, gv := range data.Inventory.Spec.Groups {
@ -549,3 +553,12 @@ func (t LocationMerge) mergeTo(v *value) error {
return nil
}
// Cache is a cache for variable
var Cache = cgcache.NewStore(func(obj interface{}) (string, error) {
v, ok := obj.(Variable)
if !ok {
return "", fmt.Errorf("cannot convert %v to variable", obj)
}
return v.Key(), nil
})