From 5205c4cbdd1981775e34cd737ab336f4405bcfec Mon Sep 17 00:00:00 2001 From: joyceliu Date: Mon, 15 Jan 2024 19:23:31 +0800 Subject: [PATCH] feat: Change the klog print format. Remove custom-defined cache instances and use Kubernetes cache instead. Signed-off-by: joyceliu --- config/helm/templates/_helpers.tpl | 8 -- go.mod | 2 +- go.sum | 19 ---- pkg/cache/cache.go | 89 ------------------- pkg/cache/cache_test.go | 31 ------- pkg/connector/local_connector.go | 34 ++----- pkg/connector/ssh_connector.go | 20 +++-- pkg/const/helper.go | 2 +- pkg/controllers/pipeline_controller.go | 28 +++--- pkg/controllers/task_controller.go | 76 +++++++++------- pkg/converter/converter.go | 54 ++++++----- pkg/converter/tmpl/template.go | 22 ++--- pkg/manager/command_manager.go | 34 +++---- pkg/manager/controller_manager.go | 30 ++++--- pkg/manager/manager.go | 4 +- pkg/modules/assert.go | 14 +-- pkg/modules/command.go | 2 +- pkg/modules/copy.go | 24 +++-- pkg/modules/debug.go | 6 +- pkg/modules/helper.go | 57 ------------ pkg/modules/module.go | 2 - .../{helper_test.go => module_test.go} | 4 + pkg/modules/set_fact.go | 6 +- pkg/modules/template.go | 18 ++-- pkg/project/project_git.go | 7 +- pkg/{cache => proxy}/runtime_client.go | 75 ++++------------ pkg/task/controller.go | 7 +- pkg/task/helper.go | 16 ++-- pkg/task/internal.go | 65 ++++++++------ pkg/variable/helper.go | 18 ++-- pkg/variable/internal.go | 25 ++++-- pkg/variable/source/file.go | 5 +- pkg/variable/source/source.go | 2 +- pkg/variable/variable.go | 27 ++++-- 34 files changed, 326 insertions(+), 507 deletions(-) delete mode 100644 pkg/cache/cache.go delete mode 100644 pkg/cache/cache_test.go delete mode 100644 pkg/modules/helper.go rename pkg/modules/{helper_test.go => module_test.go} (96%) rename pkg/{cache => proxy}/runtime_client.go (81%) diff --git a/config/helm/templates/_helpers.tpl b/config/helm/templates/_helpers.tpl index f51c77d6..e7719ad0 100644 --- a/config/helm/templates/_helpers.tpl +++ b/config/helm/templates/_helpers.tpl @@ -10,14 +10,6 @@ app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} app.kubernetes.io/managed-by: {{ .Release.Service }} {{- end }} -{{/* -Selector labels -*/}} -{{- define "common.selectorLabels" -}} -app.kubernetes.io/name: {{ .Chart.Name }} -app.kubernetes.io/instance: {{ .Release.Name }} -{{- end }} - {{/* Create chart name and version as used by the chart label. */}} diff --git a/go.mod b/go.mod index 6d80c0cd..ce6841db 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/kubesphere/kubekey/v4 go 1.20 require ( - github.com/evanphx/json-patch v5.7.0+incompatible github.com/flosch/pongo2/v6 v6.0.0 github.com/go-git/go-git/v5 v5.11.0 github.com/google/gops v0.3.28 @@ -37,6 +36,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.1 // indirect github.com/emirpasic/gods v1.18.1 // indirect + github.com/evanphx/json-patch v5.7.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.7.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect diff --git a/go.sum b/go.sum index 66c5eb1d..9bea54c4 100644 --- a/go.sum +++ b/go.sum @@ -24,14 +24,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU= -github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= -github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/emicklei/go-restful/v3 v3.11.1 h1:S+9bSbua1z3FgCnV0KKOSSZ3mDthb5NyEPL5gEpCvyk= github.com/emicklei/go-restful/v3 v3.11.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= -github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= -github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v5.7.0+incompatible h1:vgGkfT/9f8zE6tvSCe74nfpAVDQ2tG6yudJd8LBksgI= github.com/evanphx/json-patch v5.7.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.7.0 h1:nJqP7uwL84RJInrohHfW0Fx3awjbm8qZeFv0nW9SYGc= @@ -48,7 +44,6 @@ github.com/go-git/go-billy/v5 v5.5.0/go.mod h1:hmexnoNsr2SJU1Ju67OaNz5ASJY3+sHgF github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMje31YglSBqCdIqdhKBW8lokaMrL3uTkpGYlE2OOT4= github.com/go-git/go-git/v5 v5.11.0 h1:XIZc1p+8YzypNr34itUfSvYJcv+eYdTnTvOZ2vD3cA4= github.com/go-git/go-git/v5 v5.11.0/go.mod h1:6GFcX2P3NM7FPBfpePbpLd21XxsgdAt+lKqXmCUiUCY= -github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -88,13 +83,10 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= -github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= -github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -106,10 +98,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -119,7 +109,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= @@ -151,7 +140,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -177,8 +165,6 @@ golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2Uz golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4= -golang.org/x/exp v0.0.0-20231226003508-02704c960a9b/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc h1:ao2WRsKSzW6KuUY9IWPwWahcHCgR0s52IfwutMfEbdM= golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -269,7 +255,6 @@ google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= @@ -293,12 +278,8 @@ k8s.io/component-base v0.29.0 h1:T7rjd5wvLnPBV1vC4zWd/iWRbV8Mdxs+nGaoaFzGw3s= k8s.io/component-base v0.29.0/go.mod h1:sADonFTQ9Zc9yFLghpDpmNXEdHyQmFIGbiuZbqAXQ1M= k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= -k8s.io/kube-openapi v0.0.0-20231214164306-ab13479f8bf8 h1:yHNkNuLjht7iq95pO9QmbjOWCguvn8mDe3lT78nqPkw= -k8s.io/kube-openapi v0.0.0-20231214164306-ab13479f8bf8/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/kube-openapi v0.0.0-20240103195357-a9f8850cb432 h1:+XYBQU3ZKUu60H6fEnkitTTabGoKfIG8zczhZBENu9o= k8s.io/kube-openapi v0.0.0-20240103195357-a9f8850cb432/go.mod h1:Pa1PvrP7ACSkuX6I7KYomY6cmMA0Tx86waBhDUgoKPw= -k8s.io/utils v0.0.0-20231127182322-b307cd553661 h1:FepOBzJ0GXm8t0su67ln2wAZjbQ6RxQGZDnzuLcrUTI= -k8s.io/utils v0.0.0-20231127182322-b307cd553661/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go deleted file mode 100644 index 68e25daa..00000000 --- a/pkg/cache/cache.go +++ /dev/null @@ -1,89 +0,0 @@ -/* -Copyright 2023 The KubeSphere Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "sync" -) - -// Cache is the interface for cache. -type Cache interface { - // Name of pool - Name() string - // Put the cached value for the given key. - Put(key string, value any) - // Get the cached value for the given key. - Get(key string) (any, bool) - // Release the cached value for the given id. - Release(id string) - // Clean all cached value - Clean() -} - -type local struct { - name string - cache map[string]any - - sync.Mutex -} - -func (p *local) Name() string { - return p.name -} - -func (p *local) Put(key string, value any) { - p.Lock() - defer p.Unlock() - - p.cache[key] = value -} - -func (p *local) Get(key string) (any, bool) { - v, ok := p.cache[key] - if ok { - return v, ok - } - return v, false -} - -func (p *local) Release(id string) { - p.Lock() - defer p.Unlock() - - delete(p.cache, id) -} - -func (p *local) Clean() { - p.Lock() - defer p.Unlock() - for id := range p.cache { - delete(p.cache, id) - } -} - -// NewLocalCache return a local cache -func NewLocalCache(name string) Cache { - return &local{ - name: name, - cache: make(map[string]any), - } -} - -var ( - // LocalVariable is a local cache for variable.Variable - LocalVariable = NewLocalCache("variable") -) diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go deleted file mode 100644 index 031ef1d4..00000000 --- a/pkg/cache/cache_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package cache - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestCache(t *testing.T) { - testCache := NewLocalCache("test") - assert.Equal(t, "test", testCache.Name()) - - // should not be able to get the key - _, ok := testCache.Get("foo") - assert.False(t, ok) - - // put a key - testCache.Put("foo", "bar") - - // should be able to get the key - v, ok := testCache.Get("foo") - assert.True(t, ok) - assert.Equal(t, "bar", v) - - // release the key - testCache.Release("foo") - - // should not be able to get the key - _, ok = testCache.Get("foo") - assert.False(t, ok) -} diff --git a/pkg/connector/local_connector.go b/pkg/connector/local_connector.go index 63be5ddd..e0286708 100644 --- a/pkg/connector/local_connector.go +++ b/pkg/connector/local_connector.go @@ -41,20 +41,19 @@ func (c *localConnector) Close(ctx context.Context) error { func (c *localConnector) CopyFile(ctx context.Context, local []byte, remoteFile string, mode fs.FileMode) error { // create remote file - if _, err := os.Stat(filepath.Dir(remoteFile)); err != nil { - klog.Warningf("Failed to stat dir %s: %v create it", filepath.Dir(remoteFile), err) + if _, err := os.Stat(filepath.Dir(remoteFile)); err != nil && os.IsNotExist(err) { if err := os.MkdirAll(filepath.Dir(remoteFile), mode); err != nil { - klog.Errorf("Failed to create dir %s: %v", filepath.Dir(remoteFile), err) + klog.ErrorS(err, "Failed to create remote dir", "remote_file", remoteFile) return err } } rf, err := os.Create(remoteFile) if err != nil { - klog.Errorf("Failed to create file %s: %v", remoteFile, err) + klog.ErrorS(err, "Failed to create remote file", "remote_file", remoteFile) return err } if _, err := rf.Write(local); err != nil { - klog.Errorf("Failed to write file %s: %v", remoteFile, err) + klog.ErrorS(err, "Failed to write content to remote file", "remote_file", remoteFile) return err } return rf.Chmod(mode) @@ -64,11 +63,11 @@ func (c *localConnector) FetchFile(ctx context.Context, remoteFile string, local var err error file, err := os.Open(remoteFile) if err != nil { - klog.Errorf("Failed to read file %s: %v", remoteFile, err) + klog.ErrorS(err, "Failed to read remote file failed", "remote_file", remoteFile) return err } if _, err := io.Copy(local, file); err != nil { - klog.Errorf("Failed to copy file %s: %v", remoteFile, err) + klog.ErrorS(err, "Failed to copy remote file to local", "remote_file", remoteFile) return err } return nil @@ -77,24 +76,3 @@ func (c *localConnector) FetchFile(ctx context.Context, remoteFile string, local func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) { return c.Cmd.CommandContext(ctx, cmd).CombinedOutput() } - -func (c *localConnector) copyFile(sourcePath, destinationPath string) error { - sourceFile, err := os.Open(sourcePath) - if err != nil { - return err - } - defer sourceFile.Close() - - destinationFile, err := os.Create(destinationPath) - if err != nil { - return err - } - defer destinationFile.Close() - - _, err = io.Copy(destinationFile, sourceFile) - if err != nil { - return err - } - - return nil -} diff --git a/pkg/connector/ssh_connector.go b/pkg/connector/ssh_connector.go index 417ad2ff..949a394a 100644 --- a/pkg/connector/ssh_connector.go +++ b/pkg/connector/ssh_connector.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "io/fs" + "os" "path/filepath" "strconv" @@ -57,6 +58,7 @@ func (c *sshConnector) Init(ctx context.Context) error { HostKeyCallback: ssh.InsecureIgnoreHostKey(), }) if err != nil { + klog.ErrorS(err, "Dial ssh server failed", "host", c.Host, "port", *c.Port) return err } c.client = sshClient @@ -72,27 +74,26 @@ func (c *sshConnector) CopyFile(ctx context.Context, src []byte, remoteFile stri // create sftp client sftpClient, err := sftp.NewClient(c.client) if err != nil { - klog.Errorf("Failed to create sftp client: %v", err) + klog.ErrorS(err, "Failed to create sftp client") return err } defer sftpClient.Close() // create remote file - if _, err := sftpClient.Stat(filepath.Dir(remoteFile)); err != nil { - klog.Warningf("Failed to stat dir %s: %v create it", filepath.Dir(remoteFile), err) + if _, err := sftpClient.Stat(filepath.Dir(remoteFile)); err != nil && os.IsNotExist(err) { if err := sftpClient.MkdirAll(filepath.Dir(remoteFile)); err != nil { - klog.Errorf("Failed to create dir %s: %v", filepath.Dir(remoteFile), err) + klog.ErrorS(err, "Failed to create remote dir", "remote_file", remoteFile) return err } } rf, err := sftpClient.Create(remoteFile) if err != nil { - klog.Errorf("Failed to create file %s: %v", remoteFile, err) + klog.ErrorS(err, "Failed to create remote file", "remote_file", remoteFile) return err } defer rf.Close() if _, err = rf.Write(src); err != nil { - klog.Errorf("Failed to write file %s: %v", remoteFile, err) + klog.ErrorS(err, "Failed to write content to remote file", "remote_file", remoteFile) return err } return rf.Chmod(mode) @@ -102,18 +103,18 @@ func (c *sshConnector) FetchFile(ctx context.Context, remoteFile string, local i // create sftp client sftpClient, err := sftp.NewClient(c.client) if err != nil { - klog.Errorf("Failed to create sftp client: %v", err) + klog.ErrorS(err, "Failed to create sftp client", "remote_file", remoteFile) return err } defer sftpClient.Close() rf, err := sftpClient.Open(remoteFile) if err != nil { - klog.Errorf("Failed to open file %s: %v", remoteFile, err) + klog.ErrorS(err, "Failed to open file", "remote_file", remoteFile) return err } defer rf.Close() if _, err := io.Copy(local, rf); err != nil { - klog.Errorf("Failed to copy file %s: %v", remoteFile, err) + klog.ErrorS(err, "Failed to copy file", "remote_file", remoteFile) return err } return nil @@ -123,6 +124,7 @@ func (c *sshConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, // create ssh session session, err := c.client.NewSession() if err != nil { + klog.ErrorS(err, "Failed to create ssh session") return nil, err } defer session.Close() diff --git a/pkg/const/helper.go b/pkg/const/helper.go index 8496ff1a..d2a8524c 100644 --- a/pkg/const/helper.go +++ b/pkg/const/helper.go @@ -66,7 +66,7 @@ func RuntimeDirFromObject(obj runtime.Object) string { mo, ok := obj.(metav1.Object) if !ok { - klog.Errorf("failed convert to metav1.Object: %s", obj.GetObjectKind().GroupVersionKind().String()) + klog.Errorf("Failed convert to metav1.Object: %s", obj.GetObjectKind().GroupVersionKind().String()) return "" } return filepath.Join(workDir, RuntimeDir, mo.GetNamespace(), resource, mo.GetName()) diff --git a/pkg/controllers/pipeline_controller.go b/pkg/controllers/pipeline_controller.go index 0ac4d040..df0e3eaa 100644 --- a/pkg/controllers/pipeline_controller.go +++ b/pkg/controllers/pipeline_controller.go @@ -41,23 +41,21 @@ type PipelineReconciler struct { } func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - klog.Infof("[Pipeline %s] begin reconcile", req.NamespacedName.String()) - defer func() { - klog.Infof("[Pipeline %s] end reconcile", req.NamespacedName.String()) - }() - + klog.V(5).InfoS("start pipeline reconcile", "pipeline", req.String()) + defer klog.V(5).InfoS("finish pipeline reconcile", "pipeline", req.String()) + // get pipeline pipeline := &kubekeyv1.Pipeline{} err := r.Client.Get(ctx, req.NamespacedName, pipeline) if err != nil { if errors.IsNotFound(err) { - klog.V(5).Infof("[Pipeline %s] pipeline not found", req.NamespacedName.String()) + klog.V(5).InfoS("pipeline not found", "pipeline", req.String()) return ctrl.Result{}, nil } return ctrl.Result{}, err } if pipeline.DeletionTimestamp != nil { - klog.V(5).Infof("[Pipeline %s] pipeline is deleting", req.NamespacedName.String()) + klog.V(5).InfoS("pipeline is deleting", "pipeline", req.String()) return ctrl.Result{}, nil } @@ -66,14 +64,14 @@ func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct excepted := pipeline.DeepCopy() pipeline.Status.Phase = kubekeyv1.PipelinePhasePending if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(excepted)); err != nil { - klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err) + klog.ErrorS(err, "update pipeline error", "pipeline", req.String()) return ctrl.Result{}, err } case kubekeyv1.PipelinePhasePending: excepted := pipeline.DeepCopy() pipeline.Status.Phase = kubekeyv1.PipelinePhaseRunning if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(excepted)); err != nil { - klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err) + klog.ErrorS(err, "update pipeline error", "pipeline", req.String()) return ctrl.Result{}, err } case kubekeyv1.PipelinePhaseRunning: @@ -89,7 +87,7 @@ func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *kubekeyv1.Pipeline) (ctrl.Result, error) { if _, ok := pipeline.Annotations[kubekeyv1.PauseAnnotation]; ok { // if pipeline is paused, do nothing - klog.V(5).Infof("[Pipeline %s] pipeline is paused", ctrlclient.ObjectKeyFromObject(pipeline)) + klog.V(5).InfoS("pipeline is paused", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) return ctrl.Result{}, nil } @@ -97,14 +95,14 @@ func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline * defer func() { // update pipeline status if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(cp)); err != nil { - klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err) + klog.ErrorS(err, "update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) } }() if err := r.TaskController.AddTasks(ctx, task.AddTaskOptions{ Pipeline: pipeline, }); err != nil { - klog.Errorf("[Pipeline %s] add task error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err) + klog.ErrorS(err, "add task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err) return ctrl.Result{}, err @@ -116,10 +114,10 @@ func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline * // clean runtime directory func (r *PipelineReconciler) clean(ctx context.Context, pipeline *kubekeyv1.Pipeline) { if !pipeline.Spec.Debug && pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed { - klog.Infof("[Pipeline %s] clean runtimeDir", ctrlclient.ObjectKeyFromObject(pipeline)) + klog.V(5).InfoS("clean runtimeDir", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) // clean runtime directory if err := os.RemoveAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)); err != nil { - klog.Errorf("clean runtime directory %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err) + klog.ErrorS(err, "clean runtime directory error", "runtime dir", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline)) } } } @@ -127,7 +125,7 @@ func (r *PipelineReconciler) clean(ctx context.Context, pipeline *kubekeyv1.Pipe // SetupWithManager sets up the controller with the Manager. func (r *PipelineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options Options) error { if !options.IsControllerEnabled("pipeline") { - klog.V(5).Infof("pipeline controller is disabled") + klog.V(5).InfoS("controller is disabled", "controller", "pipeline") return nil } diff --git a/pkg/controllers/task_controller.go b/pkg/controllers/task_controller.go index 5d94e482..4dfdcd7b 100644 --- a/pkg/controllers/task_controller.go +++ b/pkg/controllers/task_controller.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + cgcache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/utils/strings/slices" ctrl "sigs.k8s.io/controller-runtime" @@ -31,7 +32,6 @@ import ( kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1" - "github.com/kubesphere/kubekey/v4/pkg/cache" "github.com/kubesphere/kubekey/v4/pkg/converter" "github.com/kubesphere/kubekey/v4/pkg/converter/tmpl" "github.com/kubesphere/kubekey/v4/pkg/modules" @@ -42,7 +42,7 @@ type TaskReconciler struct { // Client to resources ctrlclient.Client // VariableCache to store variable - VariableCache cache.Cache + VariableCache cgcache.Store } type taskReconcileOptions struct { @@ -52,18 +52,18 @@ type taskReconcileOptions struct { } func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { - klog.V(5).Infof("[Task %s] start reconcile", request.String()) - defer klog.V(5).Infof("[Task %s] finish reconcile", request.String()) + klog.V(5).InfoS("start task reconcile", "task", request.String()) + defer klog.V(5).InfoS("finish task reconcile", "task", request.String()) // get task var task = &kubekeyv1alpha1.Task{} if err := r.Client.Get(ctx, request.NamespacedName, task); err != nil { - klog.Errorf("get task %s error %v", request, err) + klog.ErrorS(err, "get task error", "task", request.String()) return ctrl.Result{}, nil } // if task is deleted, skip if task.DeletionTimestamp != nil { - klog.V(5).Infof("[Task %s] task is deleted, skip", request.String()) + klog.V(5).InfoS("task is deleted, skip", "task", request.String()) return ctrl.Result{}, nil } @@ -72,11 +72,11 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c for _, ref := range task.OwnerReferences { if ref.Kind == "Pipeline" { if err := r.Client.Get(ctx, types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}, pipeline); err != nil { - klog.Errorf("[Task %s] get pipeline %s error %v", request.String(), types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}.String(), err) if errors.IsNotFound(err) { - klog.V(4).Infof("[Task %s] pipeline is deleted, skip", request.String()) + klog.V(5).InfoS("pipeline is deleted, skip", "task", request.String()) return ctrl.Result{}, nil } + klog.ErrorS(err, "get pipeline error", "task", request.String(), "pipeline", types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}.String()) return ctrl.Result{}, err } break @@ -84,33 +84,41 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c } if _, ok := pipeline.Annotations[kubekeyv1.PauseAnnotation]; ok { - klog.V(5).Infof("[Task %s] pipeline is paused, skip", request.String()) + klog.V(5).InfoS("pipeline is paused, skip", "task", request.String()) return ctrl.Result{}, nil } // get variable var v variable.Variable - if vc, ok := r.VariableCache.Get(string(pipeline.UID)); !ok { - // create new variable + vars, ok, err := r.VariableCache.GetByKey(string(pipeline.UID)) + if err != nil { + klog.ErrorS(err, "get variable error", "task", request.String()) + return ctrl.Result{}, err + } + if ok { + v = vars.(variable.Variable) + } else { nv, err := variable.New(variable.Options{ Ctx: ctx, Client: r.Client, Pipeline: *pipeline, }) if err != nil { + klog.ErrorS(err, "create variable error", "task", request.String()) + return ctrl.Result{}, err + } + if err := r.VariableCache.Add(nv); err != nil { + klog.ErrorS(err, "add variable to store error", "task", request.String()) return ctrl.Result{}, err } - r.VariableCache.Put(string(pipeline.UID), nv) v = nv - } else { - v = vc.(variable.Variable) } defer func() { var nsTasks = &kubekeyv1alpha1.TaskList{} - klog.V(5).Infof("[Task %s] update pipeline %s status", ctrlclient.ObjectKeyFromObject(task).String(), ctrlclient.ObjectKeyFromObject(pipeline).String()) + klog.V(5).InfoS("update pipeline status", "task", request.String(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline).String()) if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(task.Namespace)); err != nil { - klog.Errorf("[Task %s] list task error %v", ctrlclient.ObjectKeyFromObject(task).String(), err) + klog.ErrorS(err, "list task error", "task", request.String()) return } // filter by ownerReference @@ -129,7 +137,7 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c cp := pipeline.DeepCopy() converter.CalculatePipelineStatus(nsTasks, pipeline) if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(cp)); err != nil { - klog.Errorf("[Task %s] update pipeline %s status error %v", ctrlclient.ObjectKeyFromObject(task).String(), pipeline.Name, err) + klog.ErrorS(err, "update pipeline status error", "task", request.String(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline).String()) } }() @@ -139,7 +147,7 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c task.Status.Phase = kubekeyv1alpha1.TaskPhasePending task.Status.RestartCount++ if err := r.Client.Update(ctx, task); err != nil { - klog.Errorf("update task %s error %v", task.Name, err) + klog.ErrorS(err, "update task error", "task", request.String()) return ctrl.Result{}, err } } @@ -169,18 +177,18 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc LocationUID: string(options.Task.UID), }) if err != nil { - klog.Errorf("[Task %s] find dependency error %v", ctrlclient.ObjectKeyFromObject(options.Task).String(), err) + klog.ErrorS(err, "find dependency error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String()) return ctrl.Result{}, err } dt, ok := dl.(variable.DependencyTask) if !ok { - klog.Errorf("[Task %s] failed to convert dependency", ctrlclient.ObjectKeyFromObject(options.Task).String()) + klog.ErrorS(err, "failed to convert dependency", "task", ctrlclient.ObjectKeyFromObject(options.Task).String()) return ctrl.Result{}, fmt.Errorf("[Task %s] failed to convert dependency", ctrlclient.ObjectKeyFromObject(options.Task).String()) } var nsTasks = &kubekeyv1alpha1.TaskList{} if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Task.Namespace)); err != nil { - klog.Errorf("[Task %s] list task error %v", ctrlclient.ObjectKeyFromObject(options.Task).String(), err) + klog.ErrorS(err, "list task error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String(), err) return ctrl.Result{}, err } // filter by ownerReference @@ -210,13 +218,13 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc // update task phase to running options.Task.Status.Phase = kubekeyv1alpha1.TaskPhaseRunning if err := r.Client.Update(ctx, options.Task); err != nil { - klog.Errorf("[Task %s] update task to Running error %v", ctrlclient.ObjectKeyFromObject(options.Task), err) + klog.ErrorS(err, "update task to Running error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) } return ctrl.Result{Requeue: true}, nil case kubekeyv1alpha1.TaskPhaseSkipped: options.Task.Status.Phase = kubekeyv1alpha1.TaskPhaseSkipped if err := r.Client.Update(ctx, options.Task); err != nil { - klog.Errorf("[Task %s] update task to Skipped error %v", ctrlclient.ObjectKeyFromObject(options.Task), err) + klog.ErrorS(err, "update task to Skipped error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) } return ctrl.Result{}, nil default: @@ -226,13 +234,11 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc func (r *TaskReconciler) dealRunningTask(ctx context.Context, options taskReconcileOptions) (ctrl.Result, error) { // find task in location - klog.Infof("[Task %s] dealRunningTask begin", ctrlclient.ObjectKeyFromObject(options.Task)) - defer func() { - klog.Infof("[Task %s] dealRunningTask end, task phase: %s", ctrlclient.ObjectKeyFromObject(options.Task), options.Task.Status.Phase) - }() + klog.InfoS("dealRunningTask begin", "task", ctrlclient.ObjectKeyFromObject(options.Task)) + defer klog.Info("dealRunningTask end, task phase", "task", ctrlclient.ObjectKeyFromObject(options.Task), "phase", options.Task.Status.Phase) if err := r.executeTask(ctx, options); err != nil { - klog.Errorf("[Task %s] execute task error %v", ctrlclient.ObjectKeyFromObject(options.Task), err) + klog.ErrorS(err, "execute task error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) return ctrl.Result{}, nil } return ctrl.Result{}, nil @@ -246,7 +252,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO cd.EndTimestamp = metav1.Now() options.Task.Status.Conditions = append(options.Task.Status.Conditions, cd) if err := r.Client.Update(ctx, options.Task); err != nil { - klog.Errorf("[Task %s] update task status error %v", ctrlclient.ObjectKeyFromObject(options.Task), err) + klog.ErrorS(err, "update task status error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) } }() @@ -270,7 +276,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO if options.Task.Spec.Register != "" { puid, err := options.Variable.Get(variable.ParentLocation{LocationUID: string(options.Task.UID)}) if err != nil { - klog.Errorf("[Task %s] get location error %v", ctrlclient.ObjectKeyFromObject(options.Task), err) + klog.ErrorS(err, "get location error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) return } // set variable to parent location @@ -284,7 +290,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO }, }, }); err != nil { - klog.Errorf("[Task %s] register error %v", ctrlclient.ObjectKeyFromObject(options.Task), err) + klog.ErrorS(err, "register task result to variable error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) return } } @@ -295,6 +301,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO LocationUID: string(options.Task.UID), }) if err != nil { + klog.ErrorS(err, "get location variable error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) stderr = err.Error() return } @@ -302,6 +309,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO if len(options.Task.Spec.When) > 0 { ok, err := tmpl.ParseBool(lg.(variable.VariableData), options.Task.Spec.When) if err != nil { + klog.ErrorS(err, "parse when condition error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) stderr = err.Error() return } @@ -326,6 +334,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO case string: item, err = tmpl.ParseString(lg.(variable.VariableData), item.(string)) if err != nil { + klog.ErrorS(err, "parse loop vars error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) stderr = err.Error() return } @@ -333,6 +342,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO for k, v := range item.(variable.VariableData) { sv, err := tmpl.ParseString(lg.(variable.VariableData), v.(string)) if err != nil { + klog.ErrorS(err, "parse loop vars error", "task", ctrlclient.ObjectKeyFromObject(options.Task)) stderr = err.Error() return } @@ -392,7 +402,7 @@ func (r *TaskReconciler) executeModule(ctx context.Context, task *kubekeyv1alpha LocationUID: string(task.UID), }) if err != nil { - klog.Errorf("[Task %s] get location variable error %v", ctrlclient.ObjectKeyFromObject(task), err) + klog.ErrorS(err, "get location variable error", "task", ctrlclient.ObjectKeyFromObject(task)) return "", err.Error() } @@ -400,7 +410,7 @@ func (r *TaskReconciler) executeModule(ctx context.Context, task *kubekeyv1alpha if len(task.Spec.FailedWhen) > 0 { ok, err := tmpl.ParseBool(lg.(variable.VariableData), task.Spec.FailedWhen) if err != nil { - klog.Errorf("[Task %s] validate FailedWhen condition error %v", ctrlclient.ObjectKeyFromObject(task), err) + klog.ErrorS(err, "validate FailedWhen condition error", "task", ctrlclient.ObjectKeyFromObject(task)) return "", err.Error() } if ok { diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index 27d517d7..a41d6ee8 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -48,23 +48,23 @@ func MarshalPlaybook(baseFS fs.FS, pbPath string) (*kkcorev1.Playbook, error) { // convert playbook to kkcorev1.Playbook pb := &kkcorev1.Playbook{} if err := loadPlaybook(baseFS, pbPath, pb); err != nil { - klog.Errorf(" load playbook with include %s failed: %v", pbPath, err) + klog.ErrorS(err, "Load playbook failed", "playbook", pbPath) return nil, err } // convertRoles if err := convertRoles(baseFS, pbPath, pb); err != nil { - klog.Errorf("convertRoles error %v", err) + klog.ErrorS(err, "ConvertRoles error", "playbook", pbPath) return nil, err } if err := convertIncludeTasks(baseFS, pbPath, pb); err != nil { - klog.Errorf("convertIncludeTasks error %v", err) + klog.ErrorS(err, "ConvertIncludeTasks error", "playbook", pbPath) return nil, err } if err := pb.Validate(); err != nil { - klog.Errorf("validate playbook %s failed: %v", pbPath, err) + klog.ErrorS(err, "Validate playbook failed", "playbook", pbPath) return nil, err } return pb, nil @@ -75,12 +75,12 @@ func loadPlaybook(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error { // baseDir is the local ansible project dir which playbook belong to pbData, err := fs.ReadFile(baseFS, pbPath) if err != nil { - klog.Errorf("read playbook %s failed: %v", pbPath, err) + klog.ErrorS(err, "Read playbook failed", "playbook", pbPath) return err } var plays []kkcorev1.Play if err := yaml.Unmarshal(pbData, &plays); err != nil { - klog.Errorf("unmarshal playbook %s failed: %v", pbPath, err) + klog.ErrorS(err, "Unmarshal playbook failed", "playbook", pbPath) return err } @@ -108,12 +108,12 @@ func loadPlaybook(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error { rdata, err := fs.ReadFile(baseFS, mainTask) if err != nil { - klog.Errorf("read role %s failed: %v", mainTask, err) + klog.ErrorS(err, "Read role failed", "playbook", pbPath, "role", r.Role) return err } var blocks []kkcorev1.Block if err := yaml.Unmarshal(rdata, &blocks); err != nil { - klog.Errorf("unmarshal role %s failed: %v", r.Role, err) + klog.ErrorS(err, "Unmarshal role failed", "playbook", pbPath, "role", r.Role) return err } p.Roles[i].Block = blocks @@ -141,12 +141,12 @@ func convertRoles(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error { rdata, err := fs.ReadFile(baseFS, mainTask) if err != nil { - klog.Errorf("read role %s failed: %v", mainTask, err) + klog.ErrorS(err, "Read role failed", "playbook", pbPath, "role", r.Role) return err } var blocks []kkcorev1.Block if err := yaml.Unmarshal(rdata, &blocks); err != nil { - klog.Errorf("unmarshal role %s failed: %v", r.Role, err) + klog.ErrorS(err, "Unmarshal role failed", "playbook", pbPath, "role", r.Role) return err } p.Roles[i].Block = blocks @@ -156,12 +156,12 @@ func convertRoles(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error { if mainDefault != "" { mainData, err := fs.ReadFile(baseFS, mainDefault) if err != nil { - klog.Errorf("read defaults variable for role %s error: %v", r.Role, err) + klog.ErrorS(err, "Read defaults variable for role error", "playbook", pbPath, "role", r.Role) return err } var vars variable.VariableData if err := yaml.Unmarshal(mainData, &vars); err != nil { - klog.Errorf("unmarshal defaults variable for role %s error: %v", r.Role, err) + klog.ErrorS(err, "Unmarshal defaults variable for role error", "playbook", pbPath, "role", r.Role) return err } p.Roles[i].Vars = vars @@ -177,18 +177,22 @@ func convertIncludeTasks(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) err var pbBase = filepath.Dir(filepath.Dir(pbPath)) for _, play := range pb.Play { if err := fileToBlock(baseFS, pbBase, play.PreTasks); err != nil { + klog.ErrorS(err, "Convert pre_tasks error", "playbook", pbPath) return err } if err := fileToBlock(baseFS, pbBase, play.Tasks); err != nil { + klog.ErrorS(err, "Convert tasks error", "playbook", pbPath) return err } if err := fileToBlock(baseFS, pbBase, play.PostTasks); err != nil { + klog.ErrorS(err, "Convert post_tasks error", "playbook", pbPath) return err } for _, r := range play.Roles { roleBase := project.GetRoleBaseFromPlaybook(baseFS, pbPath, r.Role) if err := fileToBlock(baseFS, filepath.Join(roleBase, _const.ProjectRolesTasksDir), r.Block); err != nil { + klog.ErrorS(err, "Convert role error", "playbook", pbPath, "role", r.Role) return err } } @@ -201,24 +205,27 @@ func fileToBlock(baseFS fs.FS, baseDir string, blocks []kkcorev1.Block) error { if b.IncludeTasks != "" { data, err := fs.ReadFile(baseFS, filepath.Join(baseDir, b.IncludeTasks)) if err != nil { - klog.Errorf("readFile %s error %v", filepath.Join(baseDir, b.IncludeTasks), err) + klog.ErrorS(err, "Read includeTask file error", "name", b.Name, "file_path", filepath.Join(baseDir, b.IncludeTasks)) return err } var bs []kkcorev1.Block if err := yaml.Unmarshal(data, &bs); err != nil { - klog.Errorf("unmarshal data %s to []Block error %v", filepath.Join(baseDir, b.IncludeTasks), err) + klog.ErrorS(err, "Unmarshal includeTask data error", "name", b.Name, "file_path", filepath.Join(baseDir, b.IncludeTasks)) return err } b.Block = bs blocks[i] = b } if err := fileToBlock(baseFS, baseDir, b.Block); err != nil { + klog.ErrorS(err, "Convert block error", "name", b.Name) return err } if err := fileToBlock(baseFS, baseDir, b.Rescue); err != nil { + klog.ErrorS(err, "Convert rescue error", "name", b.Name) return err } if err := fileToBlock(baseFS, baseDir, b.Always); err != nil { + klog.ErrorS(err, "Convert always error", "name", b.Name) return err } } @@ -247,7 +254,7 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob task := &kubekeyv1alpha1.Task{ TypeMeta: metav1.TypeMeta{ Kind: "Task", - APIVersion: "kubekey.kubesphere.io/v1alpha1", + APIVersion: kubekeyv1alpha1.SchemeGroupVersion.String(), }, ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%s", owner.GetName(), rand.String(12)), @@ -273,10 +280,9 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob Hosts: hosts, IgnoreError: block.IgnoreErrors, Retries: block.Retries, - //Loop: block.Loop, - When: when, - FailedWhen: block.FailedWhen.Data, - Register: block.Register, + When: when, + FailedWhen: block.FailedWhen.Data, + Register: block.Register, }, Status: kubekeyv1alpha1.TaskStatus{ Phase: kubekeyv1alpha1.TaskPhasePending, @@ -285,7 +291,7 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob if len(block.Loop) != 0 { data, err := json.Marshal(block.Loop) if err != nil { - klog.Errorf("marshal loop %v error: %v", block.Loop, err) + klog.ErrorS(err, "Marshal loop failed", "task", task.Name, "block", block.Name) } task.Spec.Loop = runtime.RawExtension{Raw: data} } @@ -313,7 +319,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) { if strings.HasSuffix(a.(string), "%") { b, err := strconv.Atoi(strings.TrimSuffix(a.(string), "%")) if err != nil { - klog.Errorf("convert serial %s to int failed: %v", a.(string), err) + klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string)) return nil, err } if sp+int(math.Ceil(float64(len(hosts)*b)/100.0)) > len(hosts)-1 { @@ -325,7 +331,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) { } else { b, err := strconv.Atoi(a.(string)) if err != nil { - klog.Errorf("convert serial %s to int failed: %v", a.(string), err) + klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string)) return nil, err } if sp+b > len(hosts)-1 { @@ -355,7 +361,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) { if strings.HasSuffix(a.(string), "%") { b, err := strconv.Atoi(strings.TrimSuffix(a.(string), "%")) if err != nil { - klog.Errorf("convert serial %s to int failed: %v", a.(string), err) + klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string)) return nil, err } if sp+int(math.Ceil(float64(len(hosts)*b)/100.0)) > len(hosts)-1 { @@ -367,7 +373,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) { } else { b, err := strconv.Atoi(a.(string)) if err != nil { - klog.Errorf("convert serial %s to int failed: %v", a.(string), err) + klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string)) return nil, err } if sp+b > len(hosts)-1 { diff --git a/pkg/converter/tmpl/template.go b/pkg/converter/tmpl/template.go index aff74fd0..636be844 100644 --- a/pkg/converter/tmpl/template.go +++ b/pkg/converter/tmpl/template.go @@ -32,12 +32,12 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) { // first convert. intql, err := pongo2.FromString(input) if err != nil { - klog.Errorf("failed to get string: %v", err) + klog.ErrorS(err, "Failed to get string") return false, err } inres, err := intql.Execute(pongo2.Context(v)) if err != nil { - klog.Errorf("failed to execute string: %v", err) + klog.ErrorS(err, "Failed to execute string") return false, err } @@ -48,15 +48,15 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) { // second convert. tql, err := pongo2.FromString(inres) if err != nil { - klog.Errorf("failed to get string: %v", err) + klog.ErrorS(err, "failed to get string") return false, err } result, err := tql.Execute(pongo2.Context(v)) if err != nil { - klog.Errorf("failed to execute string: %v", err) + klog.ErrorS(err, "failed to execute string") return false, err } - klog.V(4).Infof("the template parse result: %s", result) + klog.V(4).InfoS(" parse template succeed", "result", result) if result != "True" { return false, nil } @@ -68,29 +68,29 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) { func ParseString(v variable.VariableData, input string) (string, error) { tql, err := pongo2.FromString(input) if err != nil { - klog.Errorf("failed to get string: %v", err) + klog.ErrorS(err, "Failed to get string") return input, err } result, err := tql.Execute(pongo2.Context(v)) if err != nil { - klog.Errorf("failed to execute string: %v", err) + klog.ErrorS(err, "Failed to execute string") return input, err } - klog.V(4).Infof("the template parse result: %s", result) + klog.V(4).InfoS(" parse template succeed", "result", result) return result, nil } func ParseFile(v variable.VariableData, file []byte) (string, error) { tql, err := pongo2.FromBytes(file) if err != nil { - klog.Errorf("transfer file to pongo2 template error %v", err) + klog.ErrorS(err, "Transfer file to template error") return "", err } result, err := tql.Execute(pongo2.Context(v)) if err != nil { - klog.Errorf("exec pongo2 template error %v", err) + klog.ErrorS(err, "exec template error") return "", err } - klog.V(4).Infof("the template file: %s", result) + klog.V(4).InfoS(" parse template succeed", "result", result) return result, nil } diff --git a/pkg/manager/command_manager.go b/pkg/manager/command_manager.go index 461a0c99..8d02dccd 100644 --- a/pkg/manager/command_manager.go +++ b/pkg/manager/command_manager.go @@ -29,10 +29,10 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" - "github.com/kubesphere/kubekey/v4/pkg/cache" _const "github.com/kubesphere/kubekey/v4/pkg/const" "github.com/kubesphere/kubekey/v4/pkg/controllers" "github.com/kubesphere/kubekey/v4/pkg/task" + "github.com/kubesphere/kubekey/v4/pkg/variable" ) type commandManager struct { @@ -46,45 +46,47 @@ type commandManager struct { func (m *commandManager) Run(ctx context.Context) error { // create config, inventory and pipeline klog.Infof("[Pipeline %s] start", ctrlclient.ObjectKeyFromObject(m.Pipeline)) + defer klog.Infof("[Pipeline %s] finish", ctrlclient.ObjectKeyFromObject(m.Pipeline)) + if err := m.Client.Create(ctx, m.Config); err != nil { - klog.Errorf("[Pipeline %s] create config error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Create config error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) return err } if err := m.Client.Create(ctx, m.Inventory); err != nil { - klog.Errorf("[Pipeline %s] create inventory error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Create inventory error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) return err } if err := m.Client.Create(ctx, m.Pipeline); err != nil { - klog.Errorf("[Pipeline %s] create pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Create pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) return err } defer func() { // update pipeline status if err := m.Client.Update(ctx, m.Pipeline); err != nil { - klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) } - klog.Infof("[Pipeline %s] finish", ctrlclient.ObjectKeyFromObject(m.Pipeline)) if !m.Pipeline.Spec.Debug && m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed { klog.Infof("[Pipeline %s] clean runtime directory", ctrlclient.ObjectKeyFromObject(m.Pipeline)) // clean runtime directory if err := os.RemoveAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)); err != nil { - klog.Errorf("clean runtime directory %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err) + klog.ErrorS(err, "Clean runtime directory error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline), "runtime_dir", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)) } } }() klog.Infof("[Pipeline %s] start task controller", ctrlclient.ObjectKeyFromObject(m.Pipeline)) kd, err := task.NewController(task.ControllerOptions{ - Client: m.Client, + VariableCache: variable.Cache, + Client: m.Client, TaskReconciler: &controllers.TaskReconciler{ Client: m.Client, - VariableCache: cache.LocalVariable, + VariableCache: variable.Cache, }, }) if err != nil { - klog.Errorf("[Pipeline %s] create task controller error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Create task controller error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed m.Pipeline.Status.Reason = fmt.Sprintf("create task controller failed: %v", err) return err @@ -94,20 +96,23 @@ func (m *commandManager) Run(ctx context.Context) error { if err := kd.AddTasks(ctx, task.AddTaskOptions{ Pipeline: m.Pipeline, }); err != nil { - klog.Errorf("[Pipeline %s] add task error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Add task error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed m.Pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err) return err } // update pipeline status if err := m.Client.Update(ctx, m.Pipeline); err != nil { - klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) + return err } + klog.Infof("[Pipeline %s] start deal task total %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), m.Pipeline.Status.TaskResult.Total) go kd.Start(ctx) _ = wait.PollUntilContextCancel(ctx, time.Millisecond*100, false, func(ctx context.Context) (done bool, err error) { if err := m.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(m.Pipeline), m.Pipeline); err != nil { - klog.Errorf("[Pipeline %s] get pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Get pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) + return false, nil } if m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseFailed || m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed { return true, nil @@ -116,10 +121,9 @@ func (m *commandManager) Run(ctx context.Context) error { }) // kill by signal if err := syscall.Kill(os.Getpid(), syscall.SIGTERM); err != nil { - klog.Errorf("[Pipeline %s] manager terminated error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err) + klog.ErrorS(err, "Kill process error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline)) return err } - klog.Infof("[Pipeline %s] task finish", ctrlclient.ObjectKeyFromObject(m.Pipeline)) return nil } diff --git a/pkg/manager/controller_manager.go b/pkg/manager/controller_manager.go index 57553dfa..6ab7abef 100644 --- a/pkg/manager/controller_manager.go +++ b/pkg/manager/controller_manager.go @@ -28,9 +28,10 @@ import ( ctrlmanager "sigs.k8s.io/controller-runtime/pkg/manager" kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" - "github.com/kubesphere/kubekey/v4/pkg/cache" "github.com/kubesphere/kubekey/v4/pkg/controllers" + "github.com/kubesphere/kubekey/v4/pkg/proxy" "github.com/kubesphere/kubekey/v4/pkg/task" + "github.com/kubesphere/kubekey/v4/pkg/variable" ) type controllerManager struct { @@ -44,43 +45,48 @@ func (c controllerManager) Run(ctx context.Context) error { scheme := runtime.NewScheme() // add default scheme if err := clientgoscheme.AddToScheme(scheme); err != nil { - klog.Errorf("add default scheme error: %v", err) + klog.ErrorS(err, "Add default scheme error") return err } - // add kubekey scheme + // add kubekey scheme, + // exclude task resource,Because will manager in local if err := kubekeyv1.AddToScheme(scheme); err != nil { - klog.Errorf("add kubekey scheme error: %v", err) + klog.ErrorS(err, "Add kk scheme error") return err } + mgr, err := ctrl.NewManager(config.GetConfigOrDie(), ctrlmanager.Options{ Scheme: scheme, LeaderElection: c.LeaderElection, LeaderElectionID: "controller-leader-election-kk", }) if err != nil { - klog.Errorf("create manager error: %v", err) + klog.ErrorS(err, "Create manager error") return err } taskController, err := task.NewController(task.ControllerOptions{ + VariableCache: variable.Cache, MaxConcurrent: c.MaxConcurrentReconciles, - Client: cache.NewDelegatingClient(mgr.GetClient()), + Client: proxy.NewDelegatingClient(mgr.GetClient()), TaskReconciler: &controllers.TaskReconciler{ - Client: cache.NewDelegatingClient(mgr.GetClient()), - VariableCache: cache.LocalVariable, + Client: proxy.NewDelegatingClient(mgr.GetClient()), + VariableCache: variable.Cache, }, }) if err != nil { - klog.Errorf("create task controller error: %v", err) + klog.ErrorS(err, "Create task controller error") return err } + + // add task controller to manager if err := mgr.Add(taskController); err != nil { - klog.Errorf("add task controller error: %v", err) + klog.ErrorS(err, "Add task controller error") return err } if err := (&controllers.PipelineReconciler{ - Client: cache.NewDelegatingClient(mgr.GetClient()), + Client: proxy.NewDelegatingClient(mgr.GetClient()), EventRecorder: mgr.GetEventRecorderFor("pipeline"), TaskController: taskController, }).SetupWithManager(ctx, mgr, controllers.Options{ @@ -89,7 +95,7 @@ func (c controllerManager) Run(ctx context.Context) error { MaxConcurrentReconciles: c.MaxConcurrentReconciles, }, }); err != nil { - klog.Errorf("create pipeline controller error: %v", err) + klog.ErrorS(err, "create pipeline controller error") return err } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2b6cf09f..6caa0040 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -20,7 +20,7 @@ import ( "context" kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" - "github.com/kubesphere/kubekey/v4/pkg/cache" + "github.com/kubesphere/kubekey/v4/pkg/proxy" ) // Manager shared dependencies such as Addr and , and provides them to Runnable. @@ -40,7 +40,7 @@ func NewCommandManager(o CommandManagerOptions) Manager { Pipeline: o.Pipeline, Config: o.Config, Inventory: o.Inventory, - Client: cache.NewDelegatingClient(nil), + Client: proxy.NewDelegatingClient(nil), } } diff --git a/pkg/modules/assert.go b/pkg/modules/assert.go index c838e330..9b62eac9 100644 --- a/pkg/modules/assert.go +++ b/pkg/modules/assert.go @@ -62,13 +62,13 @@ func ModuleAssert(ctx context.Context, options ExecOptions) (string, string) { return "False", r } } - //if v := variable.StringVar(args, "msg"); v != nil { - // if r, err := tmpl.ParseString(lg.(variable.VariableData), *v); err != nil { - // return "", err.Error() - // } else { - // return "False", r - // } - //} + if v := variable.StringVar(args, "msg"); v != nil { + if r, err := tmpl.ParseString(lg.(variable.VariableData), *v); err != nil { + return "", err.Error() + } else { + return "False", r + } + } return "False", "False" } } diff --git a/pkg/modules/command.go b/pkg/modules/command.go index a3d67dea..a6c37dee 100644 --- a/pkg/modules/command.go +++ b/pkg/modules/command.go @@ -36,7 +36,7 @@ func ModuleCommand(ctx context.Context, options ExecOptions) (string, string) { conn = connector.NewConnector(options.Host, ha.(variable.VariableData)) } if err := conn.Init(ctx); err != nil { - klog.Errorf("failed to init connector %v", err) + klog.ErrorS(err, "failed to init connector") return "", err.Error() } defer conn.Close(ctx) diff --git a/pkg/modules/copy.go b/pkg/modules/copy.go index c21327a0..494e93ff 100644 --- a/pkg/modules/copy.go +++ b/pkg/modules/copy.go @@ -49,12 +49,12 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) { LocationUID: string(options.Task.UID), }) if err != nil { - klog.Errorf("failed to get location vars %v", err) + klog.ErrorS(err, "failed to get location vars") return "", err.Error() } destStr, err := tmpl.ParseString(lv.(variable.VariableData), *dest) if err != nil { - klog.Errorf("template parse src %s error: %v", *dest, err) + klog.ErrorS(err, "template parse dest error") return "", err.Error() } @@ -65,13 +65,13 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) { // get connector ha, err := options.Variable.Get(variable.HostVars{HostName: options.Host}) if err != nil { - klog.Errorf("failed to get host vars %v", err) + klog.ErrorS(err, "failed to get host vars") return "", err.Error() } conn = connector.NewConnector(options.Host, ha.(variable.VariableData)) } if err := conn.Init(ctx); err != nil { - klog.Errorf("failed to init connector %v", err) + klog.ErrorS(err, "failed to init connector") return "", err.Error() } defer conn.Close(ctx) @@ -80,7 +80,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) { // convert src srcStr, err := tmpl.ParseString(lv.(variable.VariableData), *src) if err != nil { - klog.Errorf("template parse src %s error: %v", *src, err) + klog.ErrorS(err, "template parse src error") return "", err.Error() } var baseFS fs.FS @@ -89,7 +89,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) { } else { projectFs, err := project.New(project.Options{Pipeline: &options.Pipeline}).FS(ctx, false) if err != nil { - klog.Errorf("failed to get project fs %v", err) + klog.ErrorS(err, "failed to get project fs") return "", err.Error() } baseFS = projectFs @@ -98,17 +98,19 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) { flPath := project.GetFilesFromPlayBook(baseFS, options.Pipeline.Spec.Playbook, roleName, srcStr) fileInfo, err := fs.Stat(baseFS, flPath) if err != nil { - klog.Errorf("failed to get src file in local %v", err) + klog.ErrorS(err, "failed to get src file in local") return "", err.Error() } if fileInfo.IsDir() { // src is dir if err := fs.WalkDir(baseFS, flPath, func(path string, info fs.DirEntry, err error) error { if err != nil { + klog.ErrorS(err, "failed to walk dir") return err } rel, err := filepath.Rel(srcStr, path) if err != nil { + klog.ErrorS(err, "failed to get relative path") return err } if info.IsDir() { @@ -116,6 +118,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) { } fi, err := info.Info() if err != nil { + klog.ErrorS(err, "failed to get file info") return err } mode := fi.Mode() @@ -124,27 +127,30 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) { } data, err := fs.ReadFile(baseFS, rel) if err != nil { + klog.ErrorS(err, "failed to read file") return err } if err := conn.CopyFile(ctx, data, filepath.Join(destStr, rel), mode); err != nil { + klog.ErrorS(err, "failed to copy file", "src", srcStr, "dest", destStr) return err } return nil }); err != nil { + klog.ErrorS(err, "failed to walk dir") return "", err.Error() } } else { // src is file data, err := fs.ReadFile(baseFS, flPath) if err != nil { - klog.Errorf("failed to read file %v", err) + klog.ErrorS(err, "failed to read file") return "", err.Error() } if strings.HasSuffix(destStr, "/") { destStr = destStr + filepath.Base(srcStr) } if err := conn.CopyFile(ctx, data, destStr, fileInfo.Mode()); err != nil { - klog.Errorf("failed to copy file %v", err) + klog.ErrorS(err, "failed to copy file", "src", srcStr, "dest", destStr) return "", err.Error() } return "success", "" diff --git a/pkg/modules/debug.go b/pkg/modules/debug.go index 94900c47..bc2a0ad0 100644 --- a/pkg/modules/debug.go +++ b/pkg/modules/debug.go @@ -34,11 +34,12 @@ func ModuleDebug(ctx context.Context, options ExecOptions) (string, string) { LocationUID: string(options.Task.UID), }) if err != nil { + klog.ErrorS(err, "Failed to get location vars") return "", err.Error() } result, err := tmpl.ParseString(lg.(variable.VariableData), fmt.Sprintf("{{ %s }}", *v)) if err != nil { - klog.Errorf("failed to get var %v", err) + klog.ErrorS(err, "Failed to get var") return "", err.Error() } return result, "" @@ -50,11 +51,12 @@ func ModuleDebug(ctx context.Context, options ExecOptions) (string, string) { LocationUID: string(options.Task.UID), }) if err != nil { + klog.ErrorS(err, "Failed to get location vars") return "", err.Error() } result, err := tmpl.ParseString(lg.(variable.VariableData), *v) if err != nil { - klog.Errorf("failed to get var %v", err) + klog.ErrorS(err, "Failed to get var") return "", err.Error() } return result, "" diff --git a/pkg/modules/helper.go b/pkg/modules/helper.go deleted file mode 100644 index 7f4aa1df..00000000 --- a/pkg/modules/helper.go +++ /dev/null @@ -1,57 +0,0 @@ -/* -Copyright 2023 The KubeSphere Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package modules - -import ( - "io/fs" - "os" - "path/filepath" - - kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1" - _const "github.com/kubesphere/kubekey/v4/pkg/const" -) - -// findPath from roles/roleSub dir, playbook dir or current dir -func findPath(fs fs.FS, roleSub string, src *string, options ExecOptions) *string { - if src == nil || filepath.IsAbs(*src) { - return src - } - - findFile := []string{} - absPlaybook := options.Pipeline.Spec.Playbook - if !filepath.IsAbs(absPlaybook) { - absPlaybook = filepath.Join(_const.GetWorkDir(), _const.ProjectDir, options.Pipeline.Spec.Project.Name, absPlaybook) - } - baseDir := filepath.Dir(filepath.Dir(absPlaybook)) - // findFile from roles/files - if role := options.Task.Annotations[kubekeyv1alpha1.TaskAnnotationRole]; role != "" { - findFile = append(findFile, filepath.Join(baseDir, _const.ProjectRolesDir, role, roleSub, *src)) - } - // find from playbook dir - findFile = append(findFile, filepath.Join(filepath.Dir(absPlaybook), *src)) - // find from current dir - if dir, err := os.Getwd(); err == nil { - findFile = append(findFile, filepath.Join(dir, *src)) - } - - for _, s := range findFile { - if _, err := os.Stat(s); err == nil { - return &s - } - } - return nil -} diff --git a/pkg/modules/module.go b/pkg/modules/module.go index c0006002..8cae1d73 100644 --- a/pkg/modules/module.go +++ b/pkg/modules/module.go @@ -21,7 +21,6 @@ import ( "fmt" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/klog/v2" kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1" @@ -47,7 +46,6 @@ var module = make(map[string]ModuleExecFunc) func RegisterModule(moduleName string, exec ModuleExecFunc) error { if _, ok := module[moduleName]; ok { - klog.Errorf("module %s is exist", moduleName) return fmt.Errorf("module %s is exist", moduleName) } module[moduleName] = exec diff --git a/pkg/modules/helper_test.go b/pkg/modules/module_test.go similarity index 96% rename from pkg/modules/helper_test.go rename to pkg/modules/module_test.go index 012639b0..d43a79c3 100644 --- a/pkg/modules/helper_test.go +++ b/pkg/modules/module_test.go @@ -29,6 +29,10 @@ type testVariable struct { err error } +func (v testVariable) Key() string { + return "testModule" +} + func (v testVariable) Get(option variable.GetOption) (any, error) { return v.value, v.err } diff --git a/pkg/modules/set_fact.go b/pkg/modules/set_fact.go index 679958ec..8087f3ce 100644 --- a/pkg/modules/set_fact.go +++ b/pkg/modules/set_fact.go @@ -32,7 +32,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) { LocationUID: string(options.Task.UID), }) if err != nil { - klog.Errorf("failed to get location vars %v", err) + klog.ErrorS(err, "failed to get location vars") return "", err.Error() } @@ -42,7 +42,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) { case string: factVars[k], err = tmpl.ParseString(lv.(variable.VariableData), v.(string)) if err != nil { - klog.Errorf("template parse %s error: %v", v.(string), err) + klog.ErrorS(err, "template parse error", "input", v) return "", err.Error() } default: @@ -55,7 +55,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) { LocationUID: "", Data: factVars, }); err != nil { - klog.Errorf("merge fact error: %v", err) + klog.ErrorS(err, "merge fact error") return "", err.Error() } return "success", "" diff --git a/pkg/modules/template.go b/pkg/modules/template.go index 6dc89d5d..3e6c0b11 100644 --- a/pkg/modules/template.go +++ b/pkg/modules/template.go @@ -48,17 +48,17 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) { LocationUID: string(options.Task.UID), }) if err != nil { - klog.Errorf("failed to get location vars %v", err) + klog.ErrorS(err, "failed to get location vars") return "", err.Error() } srcStr, err := tmpl.ParseString(lv.(variable.VariableData), *src) if err != nil { - klog.Errorf("template parse src %s error: %v", *src, err) + klog.ErrorS(err, "template parse src error", "input", *src) return "", err.Error() } destStr, err := tmpl.ParseString(lv.(variable.VariableData), *dest) if err != nil { - klog.Errorf("template parse src %s error: %v", *dest, err) + klog.ErrorS(err, "template parse dest error", "input", *dest) return "", err.Error() } @@ -68,7 +68,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) { } else { projectFs, err := project.New(project.Options{Pipeline: &options.Pipeline}).FS(ctx, false) if err != nil { - klog.Errorf("failed to get project fs %v", err) + klog.ErrorS(err, "failed to get project fs") return "", err.Error() } baseFS = projectFs @@ -76,7 +76,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) { roleName := options.Task.Annotations[kubekeyv1alpha1.TaskAnnotationRole] flPath := project.GetTemplatesFromPlayBook(baseFS, options.Pipeline.Spec.Playbook, roleName, srcStr) if _, err := fs.Stat(baseFS, flPath); err != nil { - klog.Errorf("find src error %v", err) + klog.ErrorS(err, "find src error") return "", err.Error() } @@ -87,13 +87,13 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) { // get connector ha, err := options.Variable.Get(variable.HostVars{HostName: options.Host}) if err != nil { - klog.Errorf("failed to get host %v", err) + klog.ErrorS(err, "failed to get host vars") return "", err.Error() } conn = connector.NewConnector(options.Host, ha.(variable.VariableData)) } if err := conn.Init(ctx); err != nil { - klog.Errorf("failed to init connector %v", err) + klog.ErrorS(err, "failed to init connector") return "", err.Error() } defer conn.Close(ctx) @@ -104,15 +104,18 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) { LocationUID: string(options.Task.UID), }) if err != nil { + klog.ErrorS(err, "failed to get location vars") return "", err.Error() } data, err := fs.ReadFile(baseFS, flPath) if err != nil { + klog.ErrorS(err, "failed to read src file", "file_path", flPath) return "", err.Error() } result, err := tmpl.ParseFile(lg.(variable.VariableData), data) if err != nil { + klog.ErrorS(err, "failed to parse file", "file_path", flPath) return "", err.Error() } @@ -122,6 +125,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) { mode = fs.FileMode(*v) } if err := conn.CopyFile(ctx, []byte(result), destStr, mode); err != nil { + klog.ErrorS(err, "failed to copy file", "src", flPath, "dest", destStr) return "", err.Error() } return "success", "" diff --git a/pkg/project/project_git.go b/pkg/project/project_git.go index 54fb713c..30f2493b 100644 --- a/pkg/project/project_git.go +++ b/pkg/project/project_git.go @@ -41,6 +41,7 @@ func (r gitProject) FS(ctx context.Context, update bool) (fs.FS, error) { return os.DirFS(r.localDir), nil } if err := r.init(ctx); err != nil { + klog.ErrorS(err, "Init git project error", "project_addr", r.Pipeline.Spec.Project.Addr) return nil, err } return os.DirFS(r.localDir), nil @@ -74,12 +75,12 @@ func (r gitProject) gitClone(ctx context.Context) error { func (r gitProject) gitPull(ctx context.Context) error { open, err := git.PlainOpen(r.localDir) if err != nil { - klog.Errorf("git open local %s error: %v", r.localDir, err) + klog.ErrorS(err, "git open error", "local_dir", r.localDir) return err } wt, err := open.Worktree() if err != nil { - klog.Errorf("git open worktree error: %v", err) + klog.ErrorS(err, "git open worktree error", "local_dir", r.localDir) return err } if err := wt.PullContext(ctx, &git.PullOptions{ @@ -89,7 +90,7 @@ func (r gitProject) gitPull(ctx context.Context) error { Auth: &http.TokenAuth{r.Pipeline.Spec.Project.Token}, InsecureSkipTLS: false, }); err != nil && err != git.NoErrAlreadyUpToDate { - klog.Errorf("pull project %s failed: %v", r.Pipeline.Spec.Project.Addr, err) + klog.ErrorS(err, "git pull error", "local_dir", r.localDir) return err } diff --git a/pkg/cache/runtime_client.go b/pkg/proxy/runtime_client.go similarity index 81% rename from pkg/cache/runtime_client.go rename to pkg/proxy/runtime_client.go index 64d1866f..fe0effc7 100644 --- a/pkg/cache/runtime_client.go +++ b/pkg/proxy/runtime_client.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cache +package proxy import ( "context" @@ -22,15 +22,11 @@ import ( "io/fs" "os" "path/filepath" - "strings" - jsonpatch "github.com/evanphx/json-patch" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/klog/v2" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -49,7 +45,7 @@ type delegatingClient struct { func NewDelegatingClient(client ctrlclient.Client) ctrlclient.Client { scheme := runtime.NewScheme() if err := kubekeyv1.AddToScheme(scheme); err != nil { - klog.Errorf("failed to add scheme: %v", err) + klog.ErrorS(err, "failed to add scheme", "gv", kubekeyv1.SchemeGroupVersion) } kubekeyv1.SchemeBuilder.Register(&kubekeyv1alpha1.Task{}, &kubekeyv1alpha1.TaskList{}) return &delegatingClient{ @@ -70,11 +66,11 @@ func (d delegatingClient) Get(ctx context.Context, key ctrlclient.ObjectKey, obj path := filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, key.Namespace, resource, key.Name, key.Name+".yaml") data, err := os.ReadFile(path) if err != nil { - klog.Errorf("failed to read yaml file: %v", err) + klog.ErrorS(err, "failed to read yaml file", "path", path) return err } if err := yaml.Unmarshal(data, obj); err != nil { - klog.Errorf("unmarshal file %s error %v", path, err) + klog.ErrorS(err, "failed to unmarshal yaml file", "path", path) return err } return nil @@ -92,7 +88,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList, var objects []runtime.Object runtimeDirEntries, err := os.ReadDir(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)) if err != nil && !os.IsNotExist(err) { - klog.Errorf("readDir %s error %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err) + klog.ErrorS(err, "failed to read dir", "path", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)) return err } for _, re := range runtimeDirEntries { @@ -103,7 +99,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList, if os.IsNotExist(err) { continue } - klog.Errorf("readDir %s error %v", resourceDir, err) + klog.ErrorS(err, "failed to read dir", "path", resourceDir) return err } for _, e := range entries { @@ -116,7 +112,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList, if os.IsNotExist(err) { continue } - klog.Errorf("read file %s error: %v", resourceFile, err) + klog.ErrorS(err, "failed to read file", "path", resourceFile) return err } var obj runtime.Object @@ -131,7 +127,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList, obj = &kubekeyv1alpha1.Task{} } if err := yaml.Unmarshal(data, &obj); err != nil { - klog.Errorf("unmarshal file %s error: %v", resourceFile, err) + klog.ErrorS(err, "failed to unmarshal yaml file", "path", resourceFile) return err } objects = append(objects, obj) @@ -152,6 +148,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList, } if err := apimeta.SetList(list, objects); err != nil { + klog.ErrorS(err, "failed to set list") return err } return nil @@ -168,11 +165,11 @@ func (d delegatingClient) Create(ctx context.Context, obj ctrlclient.Object, opt data, err := yaml.Marshal(obj) if err != nil { - klog.Errorf("failed to marshal object: %v", err) + klog.ErrorS(err, "failed to marshal object") return err } if err := os.MkdirAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName()), fs.ModePerm); err != nil { - klog.Errorf("create dir %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName()), err) + klog.ErrorS(err, "failed to create dir", "path", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName())) return err } return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm) @@ -218,16 +215,16 @@ func (d delegatingClient) Patch(ctx context.Context, obj ctrlclient.Object, patc patchData, err := patch.Data(obj) if err != nil { - klog.Errorf("failed to get patch data: %v", err) + klog.ErrorS(err, "failed to get patch data") return err } if len(patchData) == 0 { - klog.V(4).Infof("nothing to patch, skip") + klog.V(4).Info("nothing to patch, skip") return nil } data, err := yaml.Marshal(obj) if err != nil { - klog.Errorf("failed to marshal object: %v", err) + klog.ErrorS(err, "failed to marshal object") return err } return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm) @@ -301,7 +298,7 @@ func (d delegatingSubResourceWriter) Create(ctx context.Context, obj ctrlclient. data, err := yaml.Marshal(obj) if err != nil { - klog.Errorf("failed to marshal object: %v", err) + klog.ErrorS(err, "failed to marshal object") return err } return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm) @@ -319,7 +316,7 @@ func (d delegatingSubResourceWriter) Update(ctx context.Context, obj ctrlclient. data, err := yaml.Marshal(obj) if err != nil { - klog.Errorf("failed to marshal object: %v", err) + klog.ErrorS(err, "failed to marshal object") return err } return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm) @@ -336,51 +333,17 @@ func (d delegatingSubResourceWriter) Patch(ctx context.Context, obj ctrlclient.O patchData, err := patch.Data(obj) if err != nil { - klog.Errorf("failed to get patch data: %v", err) + klog.ErrorS(err, "failed to get patch data") return err } if len(patchData) == 0 { - klog.V(4).Infof("nothing to patch, skip") + klog.V(4).Info("nothing to patch, skip") return nil } data, err := yaml.Marshal(obj) if err != nil { - klog.Errorf("failed to marshal object: %v", err) + klog.ErrorS(err, "failed to marshal object") return err } return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm) } - -func getPatchedJSON(patchType types.PatchType, originalJS, patchJS []byte, gvk schema.GroupVersionKind, creater runtime.ObjectCreater) ([]byte, error) { - switch patchType { - case types.JSONPatchType: - patchObj, err := jsonpatch.DecodePatch(patchJS) - if err != nil { - return nil, err - } - bytes, err := patchObj.Apply(originalJS) - // TODO: This is pretty hacky, we need a better structured error from the json-patch - if err != nil && strings.Contains(err.Error(), "doc is missing key") { - msg := err.Error() - ix := strings.Index(msg, "key:") - key := msg[ix+5:] - return bytes, fmt.Errorf("Object to be patched is missing field (%s)", key) - } - return bytes, err - - case types.MergePatchType: - return jsonpatch.MergePatch(originalJS, patchJS) - - case types.StrategicMergePatchType: - // get a typed object for this GVK if we need to apply a strategic merge patch - obj, err := creater.New(gvk) - if err != nil { - return nil, fmt.Errorf("cannot apply strategic merge patch for %s locally, try --type merge", gvk.String()) - } - return strategicpatch.StrategicMergePatch(originalJS, patchJS, obj) - - default: - // only here as a safety net - go-restful filters content-type - return nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType) - } -} diff --git a/pkg/task/controller.go b/pkg/task/controller.go index 89974013..e2a192c8 100644 --- a/pkg/task/controller.go +++ b/pkg/task/controller.go @@ -20,12 +20,13 @@ import ( "context" "golang.org/x/time/rate" + cgcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1" - "github.com/kubesphere/kubekey/v4/pkg/cache" + "github.com/kubesphere/kubekey/v4/pkg/proxy" "github.com/kubesphere/kubekey/v4/pkg/variable" ) @@ -47,6 +48,7 @@ type ControllerOptions struct { MaxConcurrent int ctrlclient.Client TaskReconciler reconcile.Reconciler + VariableCache cgcache.Store } func NewController(o ControllerOptions) (Controller, error) { @@ -54,7 +56,7 @@ func NewController(o ControllerOptions) (Controller, error) { o.MaxConcurrent = 1 } if o.Client == nil { - o.Client = cache.NewDelegatingClient(nil) + o.Client = proxy.NewDelegatingClient(nil) } return &taskController{ @@ -62,5 +64,6 @@ func NewController(o ControllerOptions) (Controller, error) { wq: workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}), client: o.Client, taskReconciler: o.TaskReconciler, + variableCache: o.VariableCache, }, nil } diff --git a/pkg/task/helper.go b/pkg/task/helper.go index f72d5a07..b96dff43 100644 --- a/pkg/task/helper.go +++ b/pkg/task/helper.go @@ -32,12 +32,12 @@ import ( func getGatherFact(ctx context.Context, hostname string, vars variable.Variable) (variable.VariableData, error) { v, err := vars.Get(variable.HostVars{HostName: hostname}) if err != nil { - klog.Errorf("get host %s all variable error %v", hostname, err) + klog.ErrorS(err, "Get host variable error", "hostname", hostname) return nil, err } conn := connector.NewConnector(hostname, v.(variable.VariableData)) if err := conn.Init(ctx); err != nil { - klog.Errorf("init connection error %v", err) + klog.ErrorS(err, "Init connection error", "hostname", hostname) return nil, err } defer conn.Close(ctx) @@ -46,25 +46,25 @@ func getGatherFact(ctx context.Context, hostname string, vars variable.Variable) osVars := make(variable.VariableData) var osRelease bytes.Buffer if err := conn.FetchFile(ctx, "/etc/os-release", &osRelease); err != nil { - klog.Errorf("fetch os-release error %v", err) + klog.ErrorS(err, "Fetch os-release error", "hostname", hostname) return nil, err } osVars["release"] = convertBytesToMap(osRelease.Bytes(), "=") kernel, err := conn.ExecuteCommand(ctx, "uname -r") if err != nil { - klog.Errorf("get kernel version error %v", err) + klog.ErrorS(err, "Get kernel version error", "hostname", hostname) return nil, err } osVars["kernelVersion"] = string(bytes.TrimSuffix(kernel, []byte("\n"))) hn, err := conn.ExecuteCommand(ctx, "hostname") if err != nil { - klog.Errorf("get hostname error %v", err) + klog.ErrorS(err, "Get hostname error", "hostname", hostname) return nil, err } osVars["hostname"] = string(bytes.TrimSuffix(hn, []byte("\n"))) arch, err := conn.ExecuteCommand(ctx, "arch") if err != nil { - klog.Errorf("get arch error %v", err) + klog.ErrorS(err, "Get arch error", "hostname", hostname) return nil, err } osVars["architecture"] = string(bytes.TrimSuffix(arch, []byte("\n"))) @@ -73,13 +73,13 @@ func getGatherFact(ctx context.Context, hostname string, vars variable.Variable) procVars := make(variable.VariableData) var cpu bytes.Buffer if err := conn.FetchFile(ctx, "/proc/cpuinfo", &cpu); err != nil { - klog.Errorf("fetch cpu error %v", err) + klog.ErrorS(err, "Fetch cpuinfo error", "hostname", hostname) return nil, err } procVars["cpuInfo"] = convertBytesToSlice(cpu.Bytes(), ":") var mem bytes.Buffer if err := conn.FetchFile(ctx, "/proc/meminfo", &mem); err != nil { - klog.Errorf("fetch os-release error %v", err) + klog.ErrorS(err, "Fetch meminfo error", "hostname", hostname) return nil, err } procVars["memInfo"] = convertBytesToMap(mem.Bytes(), ":") diff --git a/pkg/task/internal.go b/pkg/task/internal.go index d9112353..ac48c59b 100644 --- a/pkg/task/internal.go +++ b/pkg/task/internal.go @@ -24,6 +24,7 @@ import ( "github.com/google/uuid" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/json" + cgcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -32,7 +33,6 @@ import ( kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1" kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1" - "github.com/kubesphere/kubekey/v4/pkg/cache" _const "github.com/kubesphere/kubekey/v4/pkg/const" "github.com/kubesphere/kubekey/v4/pkg/converter" "github.com/kubesphere/kubekey/v4/pkg/modules" @@ -44,6 +44,8 @@ type taskController struct { client ctrlclient.Client taskReconciler reconcile.Reconciler + variableCache cgcache.Store + wq workqueue.RateLimitingInterface MaxConcurrent int } @@ -52,7 +54,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { var nsTasks = &kubekeyv1alpha1.TaskList{} if err := c.client.List(ctx, nsTasks, ctrlclient.InNamespace(o.Pipeline.Namespace)); err != nil { - klog.Errorf("[Pipeline %s] list tasks error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err) + klog.ErrorS(err, "List tasks error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) return err } defer func() { @@ -78,28 +80,36 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { } if len(nsTasks.Items) == 0 { + vars, ok, err := c.variableCache.GetByKey(string(o.Pipeline.UID)) + if err != nil { + klog.ErrorS(err, "Get variable from store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) + return err + } // if tasks has not generated. generate tasks from pipeline - vars, ok := cache.LocalVariable.Get(string(o.Pipeline.UID)) + //vars, ok := cache.LocalVariable.Get(string(o.Pipeline.UID)) if ok { o.variable = vars.(variable.Variable) } else { - newVars, err := variable.New(variable.Options{ + nv, err := variable.New(variable.Options{ Ctx: ctx, Client: c.client, Pipeline: *o.Pipeline, }) if err != nil { - klog.Errorf("[Pipeline %s] create variable failed: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err) + klog.ErrorS(err, "Create variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) return err } - cache.LocalVariable.Put(string(o.Pipeline.UID), newVars) - o.variable = newVars + if err := c.variableCache.Add(nv); err != nil { + klog.ErrorS(err, "Add variable to store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) + return err + } + o.variable = nv } - klog.V(4).Infof("[Pipeline %s] deal project", ctrlclient.ObjectKeyFromObject(o.Pipeline)) + klog.V(4).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) projectFs, err := project.New(project.Options{Pipeline: o.Pipeline}).FS(ctx, true) if err != nil { - klog.Errorf("[Pipeline %s] deal project error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err) + klog.ErrorS(err, "Deal project error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) return err } @@ -116,7 +126,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { // convert Hosts (group or host) to all hosts ahn, err := o.variable.Get(variable.Hostnames{Name: play.PlayHost.Hosts}) if err != nil { - klog.Errorf("[Pipeline %s] get all host name error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err) + klog.ErrorS(err, "Get all host name error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) return err } @@ -125,7 +135,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { for _, h := range ahn.([]string) { gfv, err := getGatherFact(ctx, h, o.variable) if err != nil { - klog.Errorf("[Pipeline %s] get gather fact from host %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), h, err) + klog.ErrorS(err, "Get gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h) return err } if err := o.variable.Merge(variable.HostMerge{ @@ -133,7 +143,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { LocationUID: "", Data: gfv, }); err != nil { - klog.Errorf("[Pipeline %s] merge gather fact from host %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), h, err) + klog.ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h) return err } } @@ -147,7 +157,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { // group hosts by serial. run the playbook by serial hs, err = converter.GroupHostBySerial(ahn.([]string), play.Serial.Data) if err != nil { - klog.Errorf("[Pipeline %s] convert host by serial error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err) + klog.ErrorS(err, "Group host by serial error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline)) return err } } @@ -167,7 +177,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { // generate task from pre tasks preTasks, err := c.block2Task(hctx, o, play.PreTasks, nil, puid, variable.BlockLocation) if err != nil { - klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), play.Name, err) + klog.ErrorS(err, "Get pre task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name) return err } nsTasks.Items = append(nsTasks.Items, preTasks...) @@ -185,7 +195,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { } roleTasks, err := c.block2Task(context.WithValue(hctx, _const.CtxBlockRole, role.Role), o, role.Block, role.When.Data, ruid, variable.BlockLocation) if err != nil { - klog.Errorf("[Pipeline %s] get role from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err) + klog.ErrorS(err, "Get role task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name, "role", role.Role) return err } nsTasks.Items = append(nsTasks.Items, roleTasks...) @@ -193,14 +203,14 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { // generate task from tasks tasks, err := c.block2Task(hctx, o, play.Tasks, nil, puid, variable.BlockLocation) if err != nil { - klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err) + klog.ErrorS(err, "Get task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name) return err } nsTasks.Items = append(nsTasks.Items, tasks...) // generate task from post tasks postTasks, err := c.block2Task(hctx, o, play.Tasks, nil, puid, variable.BlockLocation) if err != nil { - klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err) + klog.ErrorS(err, "Get post task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name) return err } nsTasks.Items = append(nsTasks.Items, postTasks...) @@ -209,7 +219,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error { for _, task := range nsTasks.Items { if err := c.client.Create(ctx, &task); err != nil { - klog.Errorf("[Pipeline %s] create task %s error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), task.Name, err) + klog.ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "task", task.Name) return err } } @@ -234,31 +244,35 @@ func (k *taskController) block2Task(ctx context.Context, o AddTaskOptions, ats [ Name: at.Name, Vars: at.Vars, }); err != nil { + klog.ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) return nil, err } atWhen := append(when, at.When.Data...) if len(at.Block) != 0 { // add block - bt, err := k.block2Task(ctx, o, at.Block, atWhen, buid, variable.BlockLocation) + block, err := k.block2Task(ctx, o, at.Block, atWhen, buid, variable.BlockLocation) if err != nil { + klog.ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) return nil, err } - tasks = append(tasks, bt...) + tasks = append(tasks, block...) if len(at.Always) != 0 { - at, err := k.block2Task(ctx, o, at.Always, atWhen, buid, variable.AlwaysLocation) + always, err := k.block2Task(ctx, o, at.Always, atWhen, buid, variable.AlwaysLocation) if err != nil { + klog.ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) return nil, err } - tasks = append(tasks, at...) + tasks = append(tasks, always...) } if len(at.Rescue) != 0 { - rt, err := k.block2Task(ctx, o, at.Rescue, atWhen, buid, variable.RescueLocation) + rescue, err := k.block2Task(ctx, o, at.Rescue, atWhen, buid, variable.RescueLocation) if err != nil { + klog.ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name) return nil, err } - tasks = append(tasks, rt...) + tasks = append(tasks, rescue...) } } else { task := converter.MarshalBlock(context.WithValue(context.WithValue(ctx, _const.CtxBlockWhen, atWhen), _const.CtxBlockTaskUID, buid), @@ -267,6 +281,7 @@ func (k *taskController) block2Task(ctx context.Context, o AddTaskOptions, ats [ for n, a := range at.UnknownFiled { data, err := json.Marshal(a) if err != nil { + klog.ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name, "field", n) return nil, err } if m := modules.FindModule(n); m != nil { @@ -328,7 +343,7 @@ func (k *taskController) processNextWorkItem(ctx context.Context) bool { switch { case err != nil: k.wq.AddRateLimited(req) - klog.Errorf("Reconciler error: %v", err) + klog.ErrorS(err, "Reconciler error", "request", req) case result.RequeueAfter > 0: // The result.RequeueAfter request will be lost, if it is returned // along with a non-nil error. But this is intended as diff --git a/pkg/variable/helper.go b/pkg/variable/helper.go index 7faa0ab7..7bad4fa3 100644 --- a/pkg/variable/helper.go +++ b/pkg/variable/helper.go @@ -80,12 +80,12 @@ func hostsInGroup(inv kubekeyv1.Inventory, groupName string) []string { func StringVar(vars VariableData, key string) *string { value, ok := vars[key] if !ok { - klog.V(4).Infof("cannot find variable %s", key) + klog.V(4).InfoS("cannot find variable", "key", key) return nil } sv, ok := value.(string) if !ok { - klog.V(4).Infof("variable %s is not string", key) + klog.V(4).InfoS("variable is not string", "key", key) return nil } return &sv @@ -95,13 +95,13 @@ func StringVar(vars VariableData, key string) *string { func IntVar(vars VariableData, key string) *int { value, ok := vars[key] if !ok { - klog.V(4).Infof("cannot find variable %s", key) + klog.V(4).InfoS("cannot find variable", "key", key) return nil } // default convert to float64 number, ok := value.(float64) if !ok { - klog.V(4).Infof("variable %s is not string", key) + klog.V(4).InfoS("variable is not number", "key", key) return nil } vi := int(number) @@ -112,19 +112,19 @@ func IntVar(vars VariableData, key string) *int { func StringSliceVar(vars VariableData, key string) []string { value, ok := vars[key] if !ok { - klog.V(4).Infof("cannot find variable %s", key) + klog.V(4).InfoS("cannot find variable", "key", key) return nil } sv, ok := value.([]any) if !ok { - klog.V(4).Infof("variable %s is not slice", key) + klog.V(4).InfoS("variable is not string slice", "key", key) return nil } var ss []string for _, a := range sv { av, ok := a.(string) if !ok { - klog.V(4).Infof("value in variable %s is not string", key) + klog.V(4).InfoS("variable is not string", "key", key) return nil } ss = append(ss, av) @@ -139,7 +139,7 @@ func Extension2Variables(ext runtime.RawExtension) VariableData { var data VariableData if err := yaml.Unmarshal(ext.Raw, &data); err != nil { - klog.Errorf("failed to unmarshal extension to variables: %v", err) + klog.ErrorS(err, "failed to unmarshal extension to variables") } return data } @@ -151,7 +151,7 @@ func Extension2Slice(ext runtime.RawExtension) []any { var data []any if err := yaml.Unmarshal(ext.Raw, &data); err != nil { - klog.Errorf("failed to unmarshal extension to any: %v", err) + klog.ErrorS(err, "failed to unmarshal extension to slice") } return data } diff --git a/pkg/variable/internal.go b/pkg/variable/internal.go index c9010f70..c414ec76 100644 --- a/pkg/variable/internal.go +++ b/pkg/variable/internal.go @@ -31,10 +31,13 @@ import ( ) type variable struct { + // key is the unique Identifier of the variable. usually the UID of the pipeline. + key string + // source is where the variable is stored source source.Source - + // value is the data of the variable, which store in memory value *value - + // lock is the lock for value sync.Mutex } @@ -111,7 +114,7 @@ type VariableData map[string]any func (v VariableData) String() string { data, err := json.Marshal(v) if err != nil { - klog.Errorf("marshal in error: %v", err) + klog.ErrorS(err, "marshal in error", "data", v) return "" } return string(data) @@ -122,6 +125,10 @@ type host struct { RuntimeVars map[string]VariableData `json:"runtime"` } +func (v *variable) Key() string { + return v.key +} + func (v *variable) Get(option GetOption) (any, error) { return option.filter(*v.value) } @@ -139,14 +146,14 @@ func (v *variable) Merge(mo ...MergeOption) error { if !reflect.DeepEqual(old.Location, v.value.Location) { if err := v.syncLocation(); err != nil { - klog.Errorf("sync location error %v", err) + klog.ErrorS(err, "sync location error") } } for hn, hv := range v.value.Hosts { if !reflect.DeepEqual(old.Hosts[hn], hv) { if err := v.syncHosts(hn); err != nil { - klog.Errorf("sync group error %v", err) + klog.ErrorS(err, "sync host error", "hostname", hn) } } } @@ -157,11 +164,11 @@ func (v *variable) Merge(mo ...MergeOption) error { func (v *variable) syncLocation() error { data, err := json.MarshalIndent(v.value.Location, "", " ") if err != nil { - klog.Errorf("marshal location data failed: %v", err) + klog.ErrorS(err, "marshal location data error") return err } if err := v.source.Write(data, _const.RuntimePipelineVariableLocationFile); err != nil { - klog.Errorf("write location data to local file %s error %v", _const.RuntimePipelineVariableLocationFile, err) + klog.ErrorS(err, "write location data to local file error", "filename", _const.RuntimePipelineVariableLocationFile) return err } return nil @@ -173,11 +180,11 @@ func (v *variable) syncHosts(hostname ...string) error { if hv, ok := v.value.Hosts[hn]; ok { data, err := json.MarshalIndent(hv, "", " ") if err != nil { - klog.Errorf("marshal host %s data failed: %v", hn, err) + klog.ErrorS(err, "marshal host data error", "hostname", hn) return err } if err := v.source.Write(data, fmt.Sprintf("%s.json", hn)); err != nil { - klog.Errorf("write host data to local file %s.json error %v", hn, err) + klog.ErrorS(err, "write host data to local file error", "hostname", hn, "filename", fmt.Sprintf("%s.json", hn)) } } } diff --git a/pkg/variable/source/file.go b/pkg/variable/source/file.go index 39077fc9..af387056 100644 --- a/pkg/variable/source/file.go +++ b/pkg/variable/source/file.go @@ -31,7 +31,7 @@ type fileSource struct { func (f *fileSource) Read() (map[string][]byte, error) { de, err := os.ReadDir(f.path) if err != nil { - klog.Errorf("read dir %s error %v", f.path, err) + klog.ErrorS(err, "read dir error", "path", f.path) return nil, err } var result map[string][]byte @@ -46,6 +46,7 @@ func (f *fileSource) Read() (map[string][]byte, error) { if strings.HasSuffix(entry.Name(), ".json") { data, err := os.ReadFile(filepath.Join(f.path, entry.Name())) if err != nil { + klog.ErrorS(err, "read file error", "filename", entry.Name()) return nil, err } result[entry.Name()] = data @@ -58,10 +59,12 @@ func (f *fileSource) Read() (map[string][]byte, error) { func (f *fileSource) Write(data []byte, filename string) error { file, err := os.Create(filepath.Join(f.path, filename)) if err != nil { + klog.ErrorS(err, "create file error", "filename", filename) return err } defer file.Close() if _, err := file.Write(data); err != nil { + klog.ErrorS(err, "write file error", "filename", filename) return err } return nil diff --git a/pkg/variable/source/source.go b/pkg/variable/source/source.go index f8f49aba..e7328878 100644 --- a/pkg/variable/source/source.go +++ b/pkg/variable/source/source.go @@ -40,7 +40,7 @@ type Watcher interface { func New(path string) (Source, error) { if _, err := os.Stat(path); err != nil { if err := os.MkdirAll(path, fs.ModePerm); err != nil { - klog.Errorf("create source path %s error: %v", path, err) + klog.ErrorS(err, "create source path error", "path", path) return nil, err } } diff --git a/pkg/variable/variable.go b/pkg/variable/variable.go index cbd41e6a..848f316e 100644 --- a/pkg/variable/variable.go +++ b/pkg/variable/variable.go @@ -25,6 +25,8 @@ import ( "strconv" "strings" + cgcache "k8s.io/client-go/tools/cache" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -36,6 +38,7 @@ import ( ) type Variable interface { + Key() string Get(option GetOption) (any, error) Merge(option ...MergeOption) error } @@ -51,22 +54,23 @@ func New(o Options) (Variable, error) { // new source s, err := source.New(filepath.Join(_const.RuntimeDirFromObject(&o.Pipeline), _const.RuntimePipelineVariableDir)) if err != nil { - klog.Errorf("create file source failed: %v", err) + klog.ErrorS(err, "create file source failed", "path", filepath.Join(_const.RuntimeDirFromObject(&o.Pipeline), _const.RuntimePipelineVariableDir), "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) return nil, err } // get config var config = &kubekeyv1.Config{} if err := o.Client.Get(o.Ctx, types.NamespacedName{o.Pipeline.Spec.ConfigRef.Namespace, o.Pipeline.Spec.ConfigRef.Name}, config); err != nil { - klog.Errorf("get config from pipeline error %v", err) + klog.ErrorS(err, "get config from pipeline error", "config", o.Pipeline.Spec.ConfigRef, "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) return nil, err } // get inventory var inventory = &kubekeyv1.Inventory{} if err := o.Client.Get(o.Ctx, types.NamespacedName{o.Pipeline.Spec.InventoryRef.Namespace, o.Pipeline.Spec.InventoryRef.Name}, inventory); err != nil { - klog.Errorf("get inventory from pipeline error %v", err) + klog.ErrorS(err, "get inventory from pipeline error", "inventory", o.Pipeline.Spec.InventoryRef, "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) return nil, err } v := &variable{ + key: string(o.Pipeline.UID), source: s, value: &value{ Config: *config, @@ -77,21 +81,21 @@ func New(o Options) (Variable, error) { // read data from source data, err := v.source.Read() if err != nil { - klog.Errorf("read data from source error %v", err) + klog.ErrorS(err, "read data from source error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) return nil, err } for k, d := range data { if k == _const.RuntimePipelineVariableLocationFile { // set location if err := json.Unmarshal(d, &v.value.Location); err != nil { - klog.Errorf("unmarshal location error %v", err) + klog.ErrorS(err, "unmarshal location error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) return nil, err } } else { // set hosts h := host{} if err := json.Unmarshal(d, &h); err != nil { - klog.Errorf("unmarshal host error %v", err) + klog.ErrorS(err, "unmarshal host error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline)) return nil, err } v.value.Hosts[strings.TrimSuffix(k, ".json")] = h @@ -252,7 +256,7 @@ func (g Hostnames) filter(data value) (any, error) { if match := regex.FindStringSubmatch(n); match != nil { index, err := strconv.Atoi(match[2]) if err != nil { - klog.Errorf("convert index %s to int failed: %v", match[2], err) + klog.ErrorS(err, "convert index to int error", "index", match[2]) return nil, err } for gn, gv := range data.Inventory.Spec.Groups { @@ -549,3 +553,12 @@ func (t LocationMerge) mergeTo(v *value) error { return nil } + +// Cache is a cache for variable +var Cache = cgcache.NewStore(func(obj interface{}) (string, error) { + v, ok := obj.(Variable) + if !ok { + return "", fmt.Errorf("cannot convert %v to variable", obj) + } + return v.Key(), nil +})