mirror of
https://github.com/kubesphere/kubekey.git
synced 2025-12-26 01:22:51 +00:00
Merge pull request #2090 from littleBlackHouse/feature
feat: Change the klog print format. Remove custom-defined cache insta…
This commit is contained in:
commit
025570a91e
|
|
@ -10,14 +10,6 @@ app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
|
|||
app.kubernetes.io/managed-by: {{ .Release.Service }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Selector labels
|
||||
*/}}
|
||||
{{- define "common.selectorLabels" -}}
|
||||
app.kubernetes.io/name: {{ .Chart.Name }}
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create chart name and version as used by the chart label.
|
||||
*/}}
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -3,7 +3,6 @@ module github.com/kubesphere/kubekey/v4
|
|||
go 1.20
|
||||
|
||||
require (
|
||||
github.com/evanphx/json-patch v5.7.0+incompatible
|
||||
github.com/flosch/pongo2/v6 v6.0.0
|
||||
github.com/go-git/go-git/v5 v5.11.0
|
||||
github.com/google/gops v0.3.28
|
||||
|
|
@ -37,6 +36,7 @@ require (
|
|||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.11.1 // indirect
|
||||
github.com/emirpasic/gods v1.18.1 // indirect
|
||||
github.com/evanphx/json-patch v5.7.0+incompatible // indirect
|
||||
github.com/evanphx/json-patch/v5 v5.7.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
|
||||
|
|
|
|||
19
go.sum
19
go.sum
|
|
@ -24,14 +24,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
|
|||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
|
||||
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
|
||||
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
|
||||
github.com/emicklei/go-restful/v3 v3.11.1 h1:S+9bSbua1z3FgCnV0KKOSSZ3mDthb5NyEPL5gEpCvyk=
|
||||
github.com/emicklei/go-restful/v3 v3.11.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
|
||||
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
|
||||
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
|
||||
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
|
||||
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||
github.com/evanphx/json-patch v5.7.0+incompatible h1:vgGkfT/9f8zE6tvSCe74nfpAVDQ2tG6yudJd8LBksgI=
|
||||
github.com/evanphx/json-patch v5.7.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||
github.com/evanphx/json-patch/v5 v5.7.0 h1:nJqP7uwL84RJInrohHfW0Fx3awjbm8qZeFv0nW9SYGc=
|
||||
|
|
@ -48,7 +44,6 @@ github.com/go-git/go-billy/v5 v5.5.0/go.mod h1:hmexnoNsr2SJU1Ju67OaNz5ASJY3+sHgF
|
|||
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMje31YglSBqCdIqdhKBW8lokaMrL3uTkpGYlE2OOT4=
|
||||
github.com/go-git/go-git/v5 v5.11.0 h1:XIZc1p+8YzypNr34itUfSvYJcv+eYdTnTvOZ2vD3cA4=
|
||||
github.com/go-git/go-git/v5 v5.11.0/go.mod h1:6GFcX2P3NM7FPBfpePbpLd21XxsgdAt+lKqXmCUiUCY=
|
||||
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
||||
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
|
||||
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
|
|
@ -88,13 +83,10 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
|
|||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
||||
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
|
||||
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
|
||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
|
||||
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
|
||||
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
|
|
@ -106,10 +98,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
|||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
|
||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
|
||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
|
|
@ -119,7 +109,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
|
|||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
|
||||
github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
|
||||
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
|
||||
|
|
@ -151,7 +140,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
|||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
|
|
@ -177,8 +165,6 @@ golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2Uz
|
|||
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
|
||||
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
|
||||
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
|
||||
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4=
|
||||
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
|
||||
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc h1:ao2WRsKSzW6KuUY9IWPwWahcHCgR0s52IfwutMfEbdM=
|
||||
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
|
|
@ -269,7 +255,6 @@ google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh
|
|||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
|
||||
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
|
||||
|
|
@ -293,12 +278,8 @@ k8s.io/component-base v0.29.0 h1:T7rjd5wvLnPBV1vC4zWd/iWRbV8Mdxs+nGaoaFzGw3s=
|
|||
k8s.io/component-base v0.29.0/go.mod h1:sADonFTQ9Zc9yFLghpDpmNXEdHyQmFIGbiuZbqAXQ1M=
|
||||
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
|
||||
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
|
||||
k8s.io/kube-openapi v0.0.0-20231214164306-ab13479f8bf8 h1:yHNkNuLjht7iq95pO9QmbjOWCguvn8mDe3lT78nqPkw=
|
||||
k8s.io/kube-openapi v0.0.0-20231214164306-ab13479f8bf8/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
|
||||
k8s.io/kube-openapi v0.0.0-20240103195357-a9f8850cb432 h1:+XYBQU3ZKUu60H6fEnkitTTabGoKfIG8zczhZBENu9o=
|
||||
k8s.io/kube-openapi v0.0.0-20240103195357-a9f8850cb432/go.mod h1:Pa1PvrP7ACSkuX6I7KYomY6cmMA0Tx86waBhDUgoKPw=
|
||||
k8s.io/utils v0.0.0-20231127182322-b307cd553661 h1:FepOBzJ0GXm8t0su67ln2wAZjbQ6RxQGZDnzuLcrUTI=
|
||||
k8s.io/utils v0.0.0-20231127182322-b307cd553661/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
|
||||
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
|
||||
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
|
||||
sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4=
|
||||
|
|
|
|||
|
|
@ -1,89 +0,0 @@
|
|||
/*
|
||||
Copyright 2023 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Cache is the interface for cache.
|
||||
type Cache interface {
|
||||
// Name of pool
|
||||
Name() string
|
||||
// Put the cached value for the given key.
|
||||
Put(key string, value any)
|
||||
// Get the cached value for the given key.
|
||||
Get(key string) (any, bool)
|
||||
// Release the cached value for the given id.
|
||||
Release(id string)
|
||||
// Clean all cached value
|
||||
Clean()
|
||||
}
|
||||
|
||||
type local struct {
|
||||
name string
|
||||
cache map[string]any
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func (p *local) Name() string {
|
||||
return p.name
|
||||
}
|
||||
|
||||
func (p *local) Put(key string, value any) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
p.cache[key] = value
|
||||
}
|
||||
|
||||
func (p *local) Get(key string) (any, bool) {
|
||||
v, ok := p.cache[key]
|
||||
if ok {
|
||||
return v, ok
|
||||
}
|
||||
return v, false
|
||||
}
|
||||
|
||||
func (p *local) Release(id string) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
delete(p.cache, id)
|
||||
}
|
||||
|
||||
func (p *local) Clean() {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
for id := range p.cache {
|
||||
delete(p.cache, id)
|
||||
}
|
||||
}
|
||||
|
||||
// NewLocalCache return a local cache
|
||||
func NewLocalCache(name string) Cache {
|
||||
return &local{
|
||||
name: name,
|
||||
cache: make(map[string]any),
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
// LocalVariable is a local cache for variable.Variable
|
||||
LocalVariable = NewLocalCache("variable")
|
||||
)
|
||||
|
|
@ -1,31 +0,0 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
testCache := NewLocalCache("test")
|
||||
assert.Equal(t, "test", testCache.Name())
|
||||
|
||||
// should not be able to get the key
|
||||
_, ok := testCache.Get("foo")
|
||||
assert.False(t, ok)
|
||||
|
||||
// put a key
|
||||
testCache.Put("foo", "bar")
|
||||
|
||||
// should be able to get the key
|
||||
v, ok := testCache.Get("foo")
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, "bar", v)
|
||||
|
||||
// release the key
|
||||
testCache.Release("foo")
|
||||
|
||||
// should not be able to get the key
|
||||
_, ok = testCache.Get("foo")
|
||||
assert.False(t, ok)
|
||||
}
|
||||
|
|
@ -41,20 +41,19 @@ func (c *localConnector) Close(ctx context.Context) error {
|
|||
|
||||
func (c *localConnector) CopyFile(ctx context.Context, local []byte, remoteFile string, mode fs.FileMode) error {
|
||||
// create remote file
|
||||
if _, err := os.Stat(filepath.Dir(remoteFile)); err != nil {
|
||||
klog.Warningf("Failed to stat dir %s: %v create it", filepath.Dir(remoteFile), err)
|
||||
if _, err := os.Stat(filepath.Dir(remoteFile)); err != nil && os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(filepath.Dir(remoteFile), mode); err != nil {
|
||||
klog.Errorf("Failed to create dir %s: %v", filepath.Dir(remoteFile), err)
|
||||
klog.ErrorS(err, "Failed to create remote dir", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
}
|
||||
rf, err := os.Create(remoteFile)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create file %s: %v", remoteFile, err)
|
||||
klog.ErrorS(err, "Failed to create remote file", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
if _, err := rf.Write(local); err != nil {
|
||||
klog.Errorf("Failed to write file %s: %v", remoteFile, err)
|
||||
klog.ErrorS(err, "Failed to write content to remote file", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
return rf.Chmod(mode)
|
||||
|
|
@ -64,11 +63,11 @@ func (c *localConnector) FetchFile(ctx context.Context, remoteFile string, local
|
|||
var err error
|
||||
file, err := os.Open(remoteFile)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to read file %s: %v", remoteFile, err)
|
||||
klog.ErrorS(err, "Failed to read remote file failed", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
if _, err := io.Copy(local, file); err != nil {
|
||||
klog.Errorf("Failed to copy file %s: %v", remoteFile, err)
|
||||
klog.ErrorS(err, "Failed to copy remote file to local", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -77,24 +76,3 @@ func (c *localConnector) FetchFile(ctx context.Context, remoteFile string, local
|
|||
func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) {
|
||||
return c.Cmd.CommandContext(ctx, cmd).CombinedOutput()
|
||||
}
|
||||
|
||||
func (c *localConnector) copyFile(sourcePath, destinationPath string) error {
|
||||
sourceFile, err := os.Open(sourcePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer sourceFile.Close()
|
||||
|
||||
destinationFile, err := os.Create(destinationPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer destinationFile.Close()
|
||||
|
||||
_, err = io.Copy(destinationFile, sourceFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
|
|
@ -57,6 +58,7 @@ func (c *sshConnector) Init(ctx context.Context) error {
|
|||
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
||||
})
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Dial ssh server failed", "host", c.Host, "port", *c.Port)
|
||||
return err
|
||||
}
|
||||
c.client = sshClient
|
||||
|
|
@ -72,27 +74,26 @@ func (c *sshConnector) CopyFile(ctx context.Context, src []byte, remoteFile stri
|
|||
// create sftp client
|
||||
sftpClient, err := sftp.NewClient(c.client)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create sftp client: %v", err)
|
||||
klog.ErrorS(err, "Failed to create sftp client")
|
||||
return err
|
||||
}
|
||||
defer sftpClient.Close()
|
||||
// create remote file
|
||||
if _, err := sftpClient.Stat(filepath.Dir(remoteFile)); err != nil {
|
||||
klog.Warningf("Failed to stat dir %s: %v create it", filepath.Dir(remoteFile), err)
|
||||
if _, err := sftpClient.Stat(filepath.Dir(remoteFile)); err != nil && os.IsNotExist(err) {
|
||||
if err := sftpClient.MkdirAll(filepath.Dir(remoteFile)); err != nil {
|
||||
klog.Errorf("Failed to create dir %s: %v", filepath.Dir(remoteFile), err)
|
||||
klog.ErrorS(err, "Failed to create remote dir", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
}
|
||||
rf, err := sftpClient.Create(remoteFile)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create file %s: %v", remoteFile, err)
|
||||
klog.ErrorS(err, "Failed to create remote file", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
defer rf.Close()
|
||||
|
||||
if _, err = rf.Write(src); err != nil {
|
||||
klog.Errorf("Failed to write file %s: %v", remoteFile, err)
|
||||
klog.ErrorS(err, "Failed to write content to remote file", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
return rf.Chmod(mode)
|
||||
|
|
@ -102,18 +103,18 @@ func (c *sshConnector) FetchFile(ctx context.Context, remoteFile string, local i
|
|||
// create sftp client
|
||||
sftpClient, err := sftp.NewClient(c.client)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create sftp client: %v", err)
|
||||
klog.ErrorS(err, "Failed to create sftp client", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
defer sftpClient.Close()
|
||||
rf, err := sftpClient.Open(remoteFile)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to open file %s: %v", remoteFile, err)
|
||||
klog.ErrorS(err, "Failed to open file", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
defer rf.Close()
|
||||
if _, err := io.Copy(local, rf); err != nil {
|
||||
klog.Errorf("Failed to copy file %s: %v", remoteFile, err)
|
||||
klog.ErrorS(err, "Failed to copy file", "remote_file", remoteFile)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -123,6 +124,7 @@ func (c *sshConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte,
|
|||
// create ssh session
|
||||
session, err := c.client.NewSession()
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to create ssh session")
|
||||
return nil, err
|
||||
}
|
||||
defer session.Close()
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ func RuntimeDirFromObject(obj runtime.Object) string {
|
|||
mo, ok := obj.(metav1.Object)
|
||||
|
||||
if !ok {
|
||||
klog.Errorf("failed convert to metav1.Object: %s", obj.GetObjectKind().GroupVersionKind().String())
|
||||
klog.Errorf("Failed convert to metav1.Object: %s", obj.GetObjectKind().GroupVersionKind().String())
|
||||
return ""
|
||||
}
|
||||
return filepath.Join(workDir, RuntimeDir, mo.GetNamespace(), resource, mo.GetName())
|
||||
|
|
|
|||
|
|
@ -41,23 +41,21 @@ type PipelineReconciler struct {
|
|||
}
|
||||
|
||||
func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
klog.Infof("[Pipeline %s] begin reconcile", req.NamespacedName.String())
|
||||
defer func() {
|
||||
klog.Infof("[Pipeline %s] end reconcile", req.NamespacedName.String())
|
||||
}()
|
||||
|
||||
klog.V(5).InfoS("start pipeline reconcile", "pipeline", req.String())
|
||||
defer klog.V(5).InfoS("finish pipeline reconcile", "pipeline", req.String())
|
||||
// get pipeline
|
||||
pipeline := &kubekeyv1.Pipeline{}
|
||||
err := r.Client.Get(ctx, req.NamespacedName, pipeline)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
klog.V(5).Infof("[Pipeline %s] pipeline not found", req.NamespacedName.String())
|
||||
klog.V(5).InfoS("pipeline not found", "pipeline", req.String())
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
if pipeline.DeletionTimestamp != nil {
|
||||
klog.V(5).Infof("[Pipeline %s] pipeline is deleting", req.NamespacedName.String())
|
||||
klog.V(5).InfoS("pipeline is deleting", "pipeline", req.String())
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
|
|
@ -66,14 +64,14 @@ func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
|
|||
excepted := pipeline.DeepCopy()
|
||||
pipeline.Status.Phase = kubekeyv1.PipelinePhasePending
|
||||
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(excepted)); err != nil {
|
||||
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err)
|
||||
klog.ErrorS(err, "update pipeline error", "pipeline", req.String())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
case kubekeyv1.PipelinePhasePending:
|
||||
excepted := pipeline.DeepCopy()
|
||||
pipeline.Status.Phase = kubekeyv1.PipelinePhaseRunning
|
||||
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(excepted)); err != nil {
|
||||
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err)
|
||||
klog.ErrorS(err, "update pipeline error", "pipeline", req.String())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
case kubekeyv1.PipelinePhaseRunning:
|
||||
|
|
@ -89,7 +87,7 @@ func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
|
|||
func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *kubekeyv1.Pipeline) (ctrl.Result, error) {
|
||||
if _, ok := pipeline.Annotations[kubekeyv1.PauseAnnotation]; ok {
|
||||
// if pipeline is paused, do nothing
|
||||
klog.V(5).Infof("[Pipeline %s] pipeline is paused", ctrlclient.ObjectKeyFromObject(pipeline))
|
||||
klog.V(5).InfoS("pipeline is paused", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
|
|
@ -97,14 +95,14 @@ func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *
|
|||
defer func() {
|
||||
// update pipeline status
|
||||
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(cp)); err != nil {
|
||||
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err)
|
||||
klog.ErrorS(err, "update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := r.TaskController.AddTasks(ctx, task.AddTaskOptions{
|
||||
Pipeline: pipeline,
|
||||
}); err != nil {
|
||||
klog.Errorf("[Pipeline %s] add task error: %v", ctrlclient.ObjectKeyFromObject(pipeline), err)
|
||||
klog.ErrorS(err, "add task error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
|
||||
pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
|
||||
pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err)
|
||||
return ctrl.Result{}, err
|
||||
|
|
@ -116,10 +114,10 @@ func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *
|
|||
// clean runtime directory
|
||||
func (r *PipelineReconciler) clean(ctx context.Context, pipeline *kubekeyv1.Pipeline) {
|
||||
if !pipeline.Spec.Debug && pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed {
|
||||
klog.Infof("[Pipeline %s] clean runtimeDir", ctrlclient.ObjectKeyFromObject(pipeline))
|
||||
klog.V(5).InfoS("clean runtimeDir", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
|
||||
// clean runtime directory
|
||||
if err := os.RemoveAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)); err != nil {
|
||||
klog.Errorf("clean runtime directory %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err)
|
||||
klog.ErrorS(err, "clean runtime directory error", "runtime dir", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -127,7 +125,7 @@ func (r *PipelineReconciler) clean(ctx context.Context, pipeline *kubekeyv1.Pipe
|
|||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *PipelineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options Options) error {
|
||||
if !options.IsControllerEnabled("pipeline") {
|
||||
klog.V(5).Infof("pipeline controller is disabled")
|
||||
klog.V(5).InfoS("controller is disabled", "controller", "pipeline")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
cgcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/strings/slices"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
|
|
@ -31,7 +32,6 @@ import (
|
|||
|
||||
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
|
||||
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/cache"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/converter"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/converter/tmpl"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/modules"
|
||||
|
|
@ -42,7 +42,7 @@ type TaskReconciler struct {
|
|||
// Client to resources
|
||||
ctrlclient.Client
|
||||
// VariableCache to store variable
|
||||
VariableCache cache.Cache
|
||||
VariableCache cgcache.Store
|
||||
}
|
||||
|
||||
type taskReconcileOptions struct {
|
||||
|
|
@ -52,18 +52,18 @@ type taskReconcileOptions struct {
|
|||
}
|
||||
|
||||
func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
|
||||
klog.V(5).Infof("[Task %s] start reconcile", request.String())
|
||||
defer klog.V(5).Infof("[Task %s] finish reconcile", request.String())
|
||||
klog.V(5).InfoS("start task reconcile", "task", request.String())
|
||||
defer klog.V(5).InfoS("finish task reconcile", "task", request.String())
|
||||
// get task
|
||||
var task = &kubekeyv1alpha1.Task{}
|
||||
if err := r.Client.Get(ctx, request.NamespacedName, task); err != nil {
|
||||
klog.Errorf("get task %s error %v", request, err)
|
||||
klog.ErrorS(err, "get task error", "task", request.String())
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// if task is deleted, skip
|
||||
if task.DeletionTimestamp != nil {
|
||||
klog.V(5).Infof("[Task %s] task is deleted, skip", request.String())
|
||||
klog.V(5).InfoS("task is deleted, skip", "task", request.String())
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
|
|
@ -72,11 +72,11 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
|
|||
for _, ref := range task.OwnerReferences {
|
||||
if ref.Kind == "Pipeline" {
|
||||
if err := r.Client.Get(ctx, types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}, pipeline); err != nil {
|
||||
klog.Errorf("[Task %s] get pipeline %s error %v", request.String(), types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}.String(), err)
|
||||
if errors.IsNotFound(err) {
|
||||
klog.V(4).Infof("[Task %s] pipeline is deleted, skip", request.String())
|
||||
klog.V(5).InfoS("pipeline is deleted, skip", "task", request.String())
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
klog.ErrorS(err, "get pipeline error", "task", request.String(), "pipeline", types.NamespacedName{Namespace: task.Namespace, Name: ref.Name}.String())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
break
|
||||
|
|
@ -84,33 +84,41 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
|
|||
}
|
||||
|
||||
if _, ok := pipeline.Annotations[kubekeyv1.PauseAnnotation]; ok {
|
||||
klog.V(5).Infof("[Task %s] pipeline is paused, skip", request.String())
|
||||
klog.V(5).InfoS("pipeline is paused, skip", "task", request.String())
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// get variable
|
||||
var v variable.Variable
|
||||
if vc, ok := r.VariableCache.Get(string(pipeline.UID)); !ok {
|
||||
// create new variable
|
||||
vars, ok, err := r.VariableCache.GetByKey(string(pipeline.UID))
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "get variable error", "task", request.String())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
if ok {
|
||||
v = vars.(variable.Variable)
|
||||
} else {
|
||||
nv, err := variable.New(variable.Options{
|
||||
Ctx: ctx,
|
||||
Client: r.Client,
|
||||
Pipeline: *pipeline,
|
||||
})
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "create variable error", "task", request.String())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
if err := r.VariableCache.Add(nv); err != nil {
|
||||
klog.ErrorS(err, "add variable to store error", "task", request.String())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
r.VariableCache.Put(string(pipeline.UID), nv)
|
||||
v = nv
|
||||
} else {
|
||||
v = vc.(variable.Variable)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
var nsTasks = &kubekeyv1alpha1.TaskList{}
|
||||
klog.V(5).Infof("[Task %s] update pipeline %s status", ctrlclient.ObjectKeyFromObject(task).String(), ctrlclient.ObjectKeyFromObject(pipeline).String())
|
||||
klog.V(5).InfoS("update pipeline status", "task", request.String(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline).String())
|
||||
if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(task.Namespace)); err != nil {
|
||||
klog.Errorf("[Task %s] list task error %v", ctrlclient.ObjectKeyFromObject(task).String(), err)
|
||||
klog.ErrorS(err, "list task error", "task", request.String())
|
||||
return
|
||||
}
|
||||
// filter by ownerReference
|
||||
|
|
@ -129,7 +137,7 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
|
|||
cp := pipeline.DeepCopy()
|
||||
converter.CalculatePipelineStatus(nsTasks, pipeline)
|
||||
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(cp)); err != nil {
|
||||
klog.Errorf("[Task %s] update pipeline %s status error %v", ctrlclient.ObjectKeyFromObject(task).String(), pipeline.Name, err)
|
||||
klog.ErrorS(err, "update pipeline status error", "task", request.String(), "pipeline", ctrlclient.ObjectKeyFromObject(pipeline).String())
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -139,7 +147,7 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c
|
|||
task.Status.Phase = kubekeyv1alpha1.TaskPhasePending
|
||||
task.Status.RestartCount++
|
||||
if err := r.Client.Update(ctx, task); err != nil {
|
||||
klog.Errorf("update task %s error %v", task.Name, err)
|
||||
klog.ErrorS(err, "update task error", "task", request.String())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
|
|
@ -169,18 +177,18 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc
|
|||
LocationUID: string(options.Task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("[Task %s] find dependency error %v", ctrlclient.ObjectKeyFromObject(options.Task).String(), err)
|
||||
klog.ErrorS(err, "find dependency error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
dt, ok := dl.(variable.DependencyTask)
|
||||
if !ok {
|
||||
klog.Errorf("[Task %s] failed to convert dependency", ctrlclient.ObjectKeyFromObject(options.Task).String())
|
||||
klog.ErrorS(err, "failed to convert dependency", "task", ctrlclient.ObjectKeyFromObject(options.Task).String())
|
||||
return ctrl.Result{}, fmt.Errorf("[Task %s] failed to convert dependency", ctrlclient.ObjectKeyFromObject(options.Task).String())
|
||||
}
|
||||
|
||||
var nsTasks = &kubekeyv1alpha1.TaskList{}
|
||||
if err := r.Client.List(ctx, nsTasks, ctrlclient.InNamespace(options.Task.Namespace)); err != nil {
|
||||
klog.Errorf("[Task %s] list task error %v", ctrlclient.ObjectKeyFromObject(options.Task).String(), err)
|
||||
klog.ErrorS(err, "list task error", "task", ctrlclient.ObjectKeyFromObject(options.Task).String(), err)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
// filter by ownerReference
|
||||
|
|
@ -210,13 +218,13 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc
|
|||
// update task phase to running
|
||||
options.Task.Status.Phase = kubekeyv1alpha1.TaskPhaseRunning
|
||||
if err := r.Client.Update(ctx, options.Task); err != nil {
|
||||
klog.Errorf("[Task %s] update task to Running error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
|
||||
klog.ErrorS(err, "update task to Running error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
}
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
case kubekeyv1alpha1.TaskPhaseSkipped:
|
||||
options.Task.Status.Phase = kubekeyv1alpha1.TaskPhaseSkipped
|
||||
if err := r.Client.Update(ctx, options.Task); err != nil {
|
||||
klog.Errorf("[Task %s] update task to Skipped error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
|
||||
klog.ErrorS(err, "update task to Skipped error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
default:
|
||||
|
|
@ -226,13 +234,11 @@ func (r *TaskReconciler) dealPendingTask(ctx context.Context, options taskReconc
|
|||
|
||||
func (r *TaskReconciler) dealRunningTask(ctx context.Context, options taskReconcileOptions) (ctrl.Result, error) {
|
||||
// find task in location
|
||||
klog.Infof("[Task %s] dealRunningTask begin", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
defer func() {
|
||||
klog.Infof("[Task %s] dealRunningTask end, task phase: %s", ctrlclient.ObjectKeyFromObject(options.Task), options.Task.Status.Phase)
|
||||
}()
|
||||
klog.InfoS("dealRunningTask begin", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
defer klog.Info("dealRunningTask end, task phase", "task", ctrlclient.ObjectKeyFromObject(options.Task), "phase", options.Task.Status.Phase)
|
||||
|
||||
if err := r.executeTask(ctx, options); err != nil {
|
||||
klog.Errorf("[Task %s] execute task error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
|
||||
klog.ErrorS(err, "execute task error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
|
|
@ -246,7 +252,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
|
|||
cd.EndTimestamp = metav1.Now()
|
||||
options.Task.Status.Conditions = append(options.Task.Status.Conditions, cd)
|
||||
if err := r.Client.Update(ctx, options.Task); err != nil {
|
||||
klog.Errorf("[Task %s] update task status error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
|
||||
klog.ErrorS(err, "update task status error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -270,7 +276,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
|
|||
if options.Task.Spec.Register != "" {
|
||||
puid, err := options.Variable.Get(variable.ParentLocation{LocationUID: string(options.Task.UID)})
|
||||
if err != nil {
|
||||
klog.Errorf("[Task %s] get location error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
|
||||
klog.ErrorS(err, "get location error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
return
|
||||
}
|
||||
// set variable to parent location
|
||||
|
|
@ -284,7 +290,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
|
|||
},
|
||||
},
|
||||
}); err != nil {
|
||||
klog.Errorf("[Task %s] register error %v", ctrlclient.ObjectKeyFromObject(options.Task), err)
|
||||
klog.ErrorS(err, "register task result to variable error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
@ -295,6 +301,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
|
|||
LocationUID: string(options.Task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "get location variable error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
stderr = err.Error()
|
||||
return
|
||||
}
|
||||
|
|
@ -302,6 +309,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
|
|||
if len(options.Task.Spec.When) > 0 {
|
||||
ok, err := tmpl.ParseBool(lg.(variable.VariableData), options.Task.Spec.When)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "parse when condition error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
stderr = err.Error()
|
||||
return
|
||||
}
|
||||
|
|
@ -326,6 +334,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
|
|||
case string:
|
||||
item, err = tmpl.ParseString(lg.(variable.VariableData), item.(string))
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "parse loop vars error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
stderr = err.Error()
|
||||
return
|
||||
}
|
||||
|
|
@ -333,6 +342,7 @@ func (r *TaskReconciler) executeTask(ctx context.Context, options taskReconcileO
|
|||
for k, v := range item.(variable.VariableData) {
|
||||
sv, err := tmpl.ParseString(lg.(variable.VariableData), v.(string))
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "parse loop vars error", "task", ctrlclient.ObjectKeyFromObject(options.Task))
|
||||
stderr = err.Error()
|
||||
return
|
||||
}
|
||||
|
|
@ -392,7 +402,7 @@ func (r *TaskReconciler) executeModule(ctx context.Context, task *kubekeyv1alpha
|
|||
LocationUID: string(task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("[Task %s] get location variable error %v", ctrlclient.ObjectKeyFromObject(task), err)
|
||||
klog.ErrorS(err, "get location variable error", "task", ctrlclient.ObjectKeyFromObject(task))
|
||||
return "", err.Error()
|
||||
}
|
||||
|
||||
|
|
@ -400,7 +410,7 @@ func (r *TaskReconciler) executeModule(ctx context.Context, task *kubekeyv1alpha
|
|||
if len(task.Spec.FailedWhen) > 0 {
|
||||
ok, err := tmpl.ParseBool(lg.(variable.VariableData), task.Spec.FailedWhen)
|
||||
if err != nil {
|
||||
klog.Errorf("[Task %s] validate FailedWhen condition error %v", ctrlclient.ObjectKeyFromObject(task), err)
|
||||
klog.ErrorS(err, "validate FailedWhen condition error", "task", ctrlclient.ObjectKeyFromObject(task))
|
||||
return "", err.Error()
|
||||
}
|
||||
if ok {
|
||||
|
|
|
|||
|
|
@ -48,23 +48,23 @@ func MarshalPlaybook(baseFS fs.FS, pbPath string) (*kkcorev1.Playbook, error) {
|
|||
// convert playbook to kkcorev1.Playbook
|
||||
pb := &kkcorev1.Playbook{}
|
||||
if err := loadPlaybook(baseFS, pbPath, pb); err != nil {
|
||||
klog.Errorf(" load playbook with include %s failed: %v", pbPath, err)
|
||||
klog.ErrorS(err, "Load playbook failed", "playbook", pbPath)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// convertRoles
|
||||
if err := convertRoles(baseFS, pbPath, pb); err != nil {
|
||||
klog.Errorf("convertRoles error %v", err)
|
||||
klog.ErrorS(err, "ConvertRoles error", "playbook", pbPath)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := convertIncludeTasks(baseFS, pbPath, pb); err != nil {
|
||||
klog.Errorf("convertIncludeTasks error %v", err)
|
||||
klog.ErrorS(err, "ConvertIncludeTasks error", "playbook", pbPath)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := pb.Validate(); err != nil {
|
||||
klog.Errorf("validate playbook %s failed: %v", pbPath, err)
|
||||
klog.ErrorS(err, "Validate playbook failed", "playbook", pbPath)
|
||||
return nil, err
|
||||
}
|
||||
return pb, nil
|
||||
|
|
@ -75,12 +75,12 @@ func loadPlaybook(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
|
|||
// baseDir is the local ansible project dir which playbook belong to
|
||||
pbData, err := fs.ReadFile(baseFS, pbPath)
|
||||
if err != nil {
|
||||
klog.Errorf("read playbook %s failed: %v", pbPath, err)
|
||||
klog.ErrorS(err, "Read playbook failed", "playbook", pbPath)
|
||||
return err
|
||||
}
|
||||
var plays []kkcorev1.Play
|
||||
if err := yaml.Unmarshal(pbData, &plays); err != nil {
|
||||
klog.Errorf("unmarshal playbook %s failed: %v", pbPath, err)
|
||||
klog.ErrorS(err, "Unmarshal playbook failed", "playbook", pbPath)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -108,12 +108,12 @@ func loadPlaybook(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
|
|||
|
||||
rdata, err := fs.ReadFile(baseFS, mainTask)
|
||||
if err != nil {
|
||||
klog.Errorf("read role %s failed: %v", mainTask, err)
|
||||
klog.ErrorS(err, "Read role failed", "playbook", pbPath, "role", r.Role)
|
||||
return err
|
||||
}
|
||||
var blocks []kkcorev1.Block
|
||||
if err := yaml.Unmarshal(rdata, &blocks); err != nil {
|
||||
klog.Errorf("unmarshal role %s failed: %v", r.Role, err)
|
||||
klog.ErrorS(err, "Unmarshal role failed", "playbook", pbPath, "role", r.Role)
|
||||
return err
|
||||
}
|
||||
p.Roles[i].Block = blocks
|
||||
|
|
@ -141,12 +141,12 @@ func convertRoles(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
|
|||
|
||||
rdata, err := fs.ReadFile(baseFS, mainTask)
|
||||
if err != nil {
|
||||
klog.Errorf("read role %s failed: %v", mainTask, err)
|
||||
klog.ErrorS(err, "Read role failed", "playbook", pbPath, "role", r.Role)
|
||||
return err
|
||||
}
|
||||
var blocks []kkcorev1.Block
|
||||
if err := yaml.Unmarshal(rdata, &blocks); err != nil {
|
||||
klog.Errorf("unmarshal role %s failed: %v", r.Role, err)
|
||||
klog.ErrorS(err, "Unmarshal role failed", "playbook", pbPath, "role", r.Role)
|
||||
return err
|
||||
}
|
||||
p.Roles[i].Block = blocks
|
||||
|
|
@ -156,12 +156,12 @@ func convertRoles(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) error {
|
|||
if mainDefault != "" {
|
||||
mainData, err := fs.ReadFile(baseFS, mainDefault)
|
||||
if err != nil {
|
||||
klog.Errorf("read defaults variable for role %s error: %v", r.Role, err)
|
||||
klog.ErrorS(err, "Read defaults variable for role error", "playbook", pbPath, "role", r.Role)
|
||||
return err
|
||||
}
|
||||
var vars variable.VariableData
|
||||
if err := yaml.Unmarshal(mainData, &vars); err != nil {
|
||||
klog.Errorf("unmarshal defaults variable for role %s error: %v", r.Role, err)
|
||||
klog.ErrorS(err, "Unmarshal defaults variable for role error", "playbook", pbPath, "role", r.Role)
|
||||
return err
|
||||
}
|
||||
p.Roles[i].Vars = vars
|
||||
|
|
@ -177,18 +177,22 @@ func convertIncludeTasks(baseFS fs.FS, pbPath string, pb *kkcorev1.Playbook) err
|
|||
var pbBase = filepath.Dir(filepath.Dir(pbPath))
|
||||
for _, play := range pb.Play {
|
||||
if err := fileToBlock(baseFS, pbBase, play.PreTasks); err != nil {
|
||||
klog.ErrorS(err, "Convert pre_tasks error", "playbook", pbPath)
|
||||
return err
|
||||
}
|
||||
if err := fileToBlock(baseFS, pbBase, play.Tasks); err != nil {
|
||||
klog.ErrorS(err, "Convert tasks error", "playbook", pbPath)
|
||||
return err
|
||||
}
|
||||
if err := fileToBlock(baseFS, pbBase, play.PostTasks); err != nil {
|
||||
klog.ErrorS(err, "Convert post_tasks error", "playbook", pbPath)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, r := range play.Roles {
|
||||
roleBase := project.GetRoleBaseFromPlaybook(baseFS, pbPath, r.Role)
|
||||
if err := fileToBlock(baseFS, filepath.Join(roleBase, _const.ProjectRolesTasksDir), r.Block); err != nil {
|
||||
klog.ErrorS(err, "Convert role error", "playbook", pbPath, "role", r.Role)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -201,24 +205,27 @@ func fileToBlock(baseFS fs.FS, baseDir string, blocks []kkcorev1.Block) error {
|
|||
if b.IncludeTasks != "" {
|
||||
data, err := fs.ReadFile(baseFS, filepath.Join(baseDir, b.IncludeTasks))
|
||||
if err != nil {
|
||||
klog.Errorf("readFile %s error %v", filepath.Join(baseDir, b.IncludeTasks), err)
|
||||
klog.ErrorS(err, "Read includeTask file error", "name", b.Name, "file_path", filepath.Join(baseDir, b.IncludeTasks))
|
||||
return err
|
||||
}
|
||||
var bs []kkcorev1.Block
|
||||
if err := yaml.Unmarshal(data, &bs); err != nil {
|
||||
klog.Errorf("unmarshal data %s to []Block error %v", filepath.Join(baseDir, b.IncludeTasks), err)
|
||||
klog.ErrorS(err, "Unmarshal includeTask data error", "name", b.Name, "file_path", filepath.Join(baseDir, b.IncludeTasks))
|
||||
return err
|
||||
}
|
||||
b.Block = bs
|
||||
blocks[i] = b
|
||||
}
|
||||
if err := fileToBlock(baseFS, baseDir, b.Block); err != nil {
|
||||
klog.ErrorS(err, "Convert block error", "name", b.Name)
|
||||
return err
|
||||
}
|
||||
if err := fileToBlock(baseFS, baseDir, b.Rescue); err != nil {
|
||||
klog.ErrorS(err, "Convert rescue error", "name", b.Name)
|
||||
return err
|
||||
}
|
||||
if err := fileToBlock(baseFS, baseDir, b.Always); err != nil {
|
||||
klog.ErrorS(err, "Convert always error", "name", b.Name)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -247,7 +254,7 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob
|
|||
task := &kubekeyv1alpha1.Task{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "Task",
|
||||
APIVersion: "kubekey.kubesphere.io/v1alpha1",
|
||||
APIVersion: kubekeyv1alpha1.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-%s", owner.GetName(), rand.String(12)),
|
||||
|
|
@ -273,10 +280,9 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob
|
|||
Hosts: hosts,
|
||||
IgnoreError: block.IgnoreErrors,
|
||||
Retries: block.Retries,
|
||||
//Loop: block.Loop,
|
||||
When: when,
|
||||
FailedWhen: block.FailedWhen.Data,
|
||||
Register: block.Register,
|
||||
When: when,
|
||||
FailedWhen: block.FailedWhen.Data,
|
||||
Register: block.Register,
|
||||
},
|
||||
Status: kubekeyv1alpha1.TaskStatus{
|
||||
Phase: kubekeyv1alpha1.TaskPhasePending,
|
||||
|
|
@ -285,7 +291,7 @@ func MarshalBlock(ctx context.Context, block kkcorev1.Block, owner ctrlclient.Ob
|
|||
if len(block.Loop) != 0 {
|
||||
data, err := json.Marshal(block.Loop)
|
||||
if err != nil {
|
||||
klog.Errorf("marshal loop %v error: %v", block.Loop, err)
|
||||
klog.ErrorS(err, "Marshal loop failed", "task", task.Name, "block", block.Name)
|
||||
}
|
||||
task.Spec.Loop = runtime.RawExtension{Raw: data}
|
||||
}
|
||||
|
|
@ -313,7 +319,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
|
|||
if strings.HasSuffix(a.(string), "%") {
|
||||
b, err := strconv.Atoi(strings.TrimSuffix(a.(string), "%"))
|
||||
if err != nil {
|
||||
klog.Errorf("convert serial %s to int failed: %v", a.(string), err)
|
||||
klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string))
|
||||
return nil, err
|
||||
}
|
||||
if sp+int(math.Ceil(float64(len(hosts)*b)/100.0)) > len(hosts)-1 {
|
||||
|
|
@ -325,7 +331,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
|
|||
} else {
|
||||
b, err := strconv.Atoi(a.(string))
|
||||
if err != nil {
|
||||
klog.Errorf("convert serial %s to int failed: %v", a.(string), err)
|
||||
klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string))
|
||||
return nil, err
|
||||
}
|
||||
if sp+b > len(hosts)-1 {
|
||||
|
|
@ -355,7 +361,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
|
|||
if strings.HasSuffix(a.(string), "%") {
|
||||
b, err := strconv.Atoi(strings.TrimSuffix(a.(string), "%"))
|
||||
if err != nil {
|
||||
klog.Errorf("convert serial %s to int failed: %v", a.(string), err)
|
||||
klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string))
|
||||
return nil, err
|
||||
}
|
||||
if sp+int(math.Ceil(float64(len(hosts)*b)/100.0)) > len(hosts)-1 {
|
||||
|
|
@ -367,7 +373,7 @@ func GroupHostBySerial(hosts []string, serial []any) ([][]string, error) {
|
|||
} else {
|
||||
b, err := strconv.Atoi(a.(string))
|
||||
if err != nil {
|
||||
klog.Errorf("convert serial %s to int failed: %v", a.(string), err)
|
||||
klog.ErrorS(err, "Convert serial to int failed", "serial", a.(string))
|
||||
return nil, err
|
||||
}
|
||||
if sp+b > len(hosts)-1 {
|
||||
|
|
|
|||
|
|
@ -32,12 +32,12 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) {
|
|||
// first convert.
|
||||
intql, err := pongo2.FromString(input)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get string: %v", err)
|
||||
klog.ErrorS(err, "Failed to get string")
|
||||
return false, err
|
||||
}
|
||||
inres, err := intql.Execute(pongo2.Context(v))
|
||||
if err != nil {
|
||||
klog.Errorf("failed to execute string: %v", err)
|
||||
klog.ErrorS(err, "Failed to execute string")
|
||||
return false, err
|
||||
}
|
||||
|
||||
|
|
@ -48,15 +48,15 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) {
|
|||
// second convert.
|
||||
tql, err := pongo2.FromString(inres)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get string: %v", err)
|
||||
klog.ErrorS(err, "failed to get string")
|
||||
return false, err
|
||||
}
|
||||
result, err := tql.Execute(pongo2.Context(v))
|
||||
if err != nil {
|
||||
klog.Errorf("failed to execute string: %v", err)
|
||||
klog.ErrorS(err, "failed to execute string")
|
||||
return false, err
|
||||
}
|
||||
klog.V(4).Infof("the template parse result: %s", result)
|
||||
klog.V(4).InfoS(" parse template succeed", "result", result)
|
||||
if result != "True" {
|
||||
return false, nil
|
||||
}
|
||||
|
|
@ -68,29 +68,29 @@ func ParseBool(v variable.VariableData, inputs []string) (bool, error) {
|
|||
func ParseString(v variable.VariableData, input string) (string, error) {
|
||||
tql, err := pongo2.FromString(input)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get string: %v", err)
|
||||
klog.ErrorS(err, "Failed to get string")
|
||||
return input, err
|
||||
}
|
||||
result, err := tql.Execute(pongo2.Context(v))
|
||||
if err != nil {
|
||||
klog.Errorf("failed to execute string: %v", err)
|
||||
klog.ErrorS(err, "Failed to execute string")
|
||||
return input, err
|
||||
}
|
||||
klog.V(4).Infof("the template parse result: %s", result)
|
||||
klog.V(4).InfoS(" parse template succeed", "result", result)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func ParseFile(v variable.VariableData, file []byte) (string, error) {
|
||||
tql, err := pongo2.FromBytes(file)
|
||||
if err != nil {
|
||||
klog.Errorf("transfer file to pongo2 template error %v", err)
|
||||
klog.ErrorS(err, "Transfer file to template error")
|
||||
return "", err
|
||||
}
|
||||
result, err := tql.Execute(pongo2.Context(v))
|
||||
if err != nil {
|
||||
klog.Errorf("exec pongo2 template error %v", err)
|
||||
klog.ErrorS(err, "exec template error")
|
||||
return "", err
|
||||
}
|
||||
klog.V(4).Infof("the template file: %s", result)
|
||||
klog.V(4).InfoS(" parse template succeed", "result", result)
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,10 +29,10 @@ import (
|
|||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/cache"
|
||||
_const "github.com/kubesphere/kubekey/v4/pkg/const"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/controllers"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/task"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/variable"
|
||||
)
|
||||
|
||||
type commandManager struct {
|
||||
|
|
@ -46,45 +46,47 @@ type commandManager struct {
|
|||
func (m *commandManager) Run(ctx context.Context) error {
|
||||
// create config, inventory and pipeline
|
||||
klog.Infof("[Pipeline %s] start", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
defer klog.Infof("[Pipeline %s] finish", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
|
||||
if err := m.Client.Create(ctx, m.Config); err != nil {
|
||||
klog.Errorf("[Pipeline %s] create config error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Create config error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
return err
|
||||
}
|
||||
if err := m.Client.Create(ctx, m.Inventory); err != nil {
|
||||
klog.Errorf("[Pipeline %s] create inventory error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Create inventory error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
return err
|
||||
}
|
||||
if err := m.Client.Create(ctx, m.Pipeline); err != nil {
|
||||
klog.Errorf("[Pipeline %s] create pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Create pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// update pipeline status
|
||||
if err := m.Client.Update(ctx, m.Pipeline); err != nil {
|
||||
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
}
|
||||
|
||||
klog.Infof("[Pipeline %s] finish", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
if !m.Pipeline.Spec.Debug && m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed {
|
||||
klog.Infof("[Pipeline %s] clean runtime directory", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
// clean runtime directory
|
||||
if err := os.RemoveAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir)); err != nil {
|
||||
klog.Errorf("clean runtime directory %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err)
|
||||
klog.ErrorS(err, "Clean runtime directory error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline), "runtime_dir", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
klog.Infof("[Pipeline %s] start task controller", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
kd, err := task.NewController(task.ControllerOptions{
|
||||
Client: m.Client,
|
||||
VariableCache: variable.Cache,
|
||||
Client: m.Client,
|
||||
TaskReconciler: &controllers.TaskReconciler{
|
||||
Client: m.Client,
|
||||
VariableCache: cache.LocalVariable,
|
||||
VariableCache: variable.Cache,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] create task controller error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Create task controller error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
|
||||
m.Pipeline.Status.Reason = fmt.Sprintf("create task controller failed: %v", err)
|
||||
return err
|
||||
|
|
@ -94,20 +96,23 @@ func (m *commandManager) Run(ctx context.Context) error {
|
|||
if err := kd.AddTasks(ctx, task.AddTaskOptions{
|
||||
Pipeline: m.Pipeline,
|
||||
}); err != nil {
|
||||
klog.Errorf("[Pipeline %s] add task error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Add task error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
m.Pipeline.Status.Phase = kubekeyv1.PipelinePhaseFailed
|
||||
m.Pipeline.Status.Reason = fmt.Sprintf("add task to controller failed: %v", err)
|
||||
return err
|
||||
}
|
||||
// update pipeline status
|
||||
if err := m.Client.Update(ctx, m.Pipeline); err != nil {
|
||||
klog.Errorf("[Pipeline %s] update pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
return err
|
||||
}
|
||||
klog.Infof("[Pipeline %s] start deal task total %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), m.Pipeline.Status.TaskResult.Total)
|
||||
go kd.Start(ctx)
|
||||
|
||||
_ = wait.PollUntilContextCancel(ctx, time.Millisecond*100, false, func(ctx context.Context) (done bool, err error) {
|
||||
if err := m.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(m.Pipeline), m.Pipeline); err != nil {
|
||||
klog.Errorf("[Pipeline %s] get pipeline error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Get pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
return false, nil
|
||||
}
|
||||
if m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseFailed || m.Pipeline.Status.Phase == kubekeyv1.PipelinePhaseSucceed {
|
||||
return true, nil
|
||||
|
|
@ -116,10 +121,9 @@ func (m *commandManager) Run(ctx context.Context) error {
|
|||
})
|
||||
// kill by signal
|
||||
if err := syscall.Kill(os.Getpid(), syscall.SIGTERM); err != nil {
|
||||
klog.Errorf("[Pipeline %s] manager terminated error: %v", ctrlclient.ObjectKeyFromObject(m.Pipeline), err)
|
||||
klog.ErrorS(err, "Kill process error", "pipeline", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
return err
|
||||
}
|
||||
klog.Infof("[Pipeline %s] task finish", ctrlclient.ObjectKeyFromObject(m.Pipeline))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,9 +28,10 @@ import (
|
|||
ctrlmanager "sigs.k8s.io/controller-runtime/pkg/manager"
|
||||
|
||||
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/cache"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/controllers"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/proxy"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/task"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/variable"
|
||||
)
|
||||
|
||||
type controllerManager struct {
|
||||
|
|
@ -44,43 +45,48 @@ func (c controllerManager) Run(ctx context.Context) error {
|
|||
scheme := runtime.NewScheme()
|
||||
// add default scheme
|
||||
if err := clientgoscheme.AddToScheme(scheme); err != nil {
|
||||
klog.Errorf("add default scheme error: %v", err)
|
||||
klog.ErrorS(err, "Add default scheme error")
|
||||
return err
|
||||
}
|
||||
// add kubekey scheme
|
||||
// add kubekey scheme,
|
||||
// exclude task resource,Because will manager in local
|
||||
if err := kubekeyv1.AddToScheme(scheme); err != nil {
|
||||
klog.Errorf("add kubekey scheme error: %v", err)
|
||||
klog.ErrorS(err, "Add kk scheme error")
|
||||
return err
|
||||
}
|
||||
|
||||
mgr, err := ctrl.NewManager(config.GetConfigOrDie(), ctrlmanager.Options{
|
||||
Scheme: scheme,
|
||||
LeaderElection: c.LeaderElection,
|
||||
LeaderElectionID: "controller-leader-election-kk",
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("create manager error: %v", err)
|
||||
klog.ErrorS(err, "Create manager error")
|
||||
return err
|
||||
}
|
||||
|
||||
taskController, err := task.NewController(task.ControllerOptions{
|
||||
VariableCache: variable.Cache,
|
||||
MaxConcurrent: c.MaxConcurrentReconciles,
|
||||
Client: cache.NewDelegatingClient(mgr.GetClient()),
|
||||
Client: proxy.NewDelegatingClient(mgr.GetClient()),
|
||||
TaskReconciler: &controllers.TaskReconciler{
|
||||
Client: cache.NewDelegatingClient(mgr.GetClient()),
|
||||
VariableCache: cache.LocalVariable,
|
||||
Client: proxy.NewDelegatingClient(mgr.GetClient()),
|
||||
VariableCache: variable.Cache,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("create task controller error: %v", err)
|
||||
klog.ErrorS(err, "Create task controller error")
|
||||
return err
|
||||
}
|
||||
|
||||
// add task controller to manager
|
||||
if err := mgr.Add(taskController); err != nil {
|
||||
klog.Errorf("add task controller error: %v", err)
|
||||
klog.ErrorS(err, "Add task controller error")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := (&controllers.PipelineReconciler{
|
||||
Client: cache.NewDelegatingClient(mgr.GetClient()),
|
||||
Client: proxy.NewDelegatingClient(mgr.GetClient()),
|
||||
EventRecorder: mgr.GetEventRecorderFor("pipeline"),
|
||||
TaskController: taskController,
|
||||
}).SetupWithManager(ctx, mgr, controllers.Options{
|
||||
|
|
@ -89,7 +95,7 @@ func (c controllerManager) Run(ctx context.Context) error {
|
|||
MaxConcurrentReconciles: c.MaxConcurrentReconciles,
|
||||
},
|
||||
}); err != nil {
|
||||
klog.Errorf("create pipeline controller error: %v", err)
|
||||
klog.ErrorS(err, "create pipeline controller error")
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import (
|
|||
"context"
|
||||
|
||||
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/cache"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/proxy"
|
||||
)
|
||||
|
||||
// Manager shared dependencies such as Addr and , and provides them to Runnable.
|
||||
|
|
@ -40,7 +40,7 @@ func NewCommandManager(o CommandManagerOptions) Manager {
|
|||
Pipeline: o.Pipeline,
|
||||
Config: o.Config,
|
||||
Inventory: o.Inventory,
|
||||
Client: cache.NewDelegatingClient(nil),
|
||||
Client: proxy.NewDelegatingClient(nil),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -62,13 +62,13 @@ func ModuleAssert(ctx context.Context, options ExecOptions) (string, string) {
|
|||
return "False", r
|
||||
}
|
||||
}
|
||||
//if v := variable.StringVar(args, "msg"); v != nil {
|
||||
// if r, err := tmpl.ParseString(lg.(variable.VariableData), *v); err != nil {
|
||||
// return "", err.Error()
|
||||
// } else {
|
||||
// return "False", r
|
||||
// }
|
||||
//}
|
||||
if v := variable.StringVar(args, "msg"); v != nil {
|
||||
if r, err := tmpl.ParseString(lg.(variable.VariableData), *v); err != nil {
|
||||
return "", err.Error()
|
||||
} else {
|
||||
return "False", r
|
||||
}
|
||||
}
|
||||
return "False", "False"
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ func ModuleCommand(ctx context.Context, options ExecOptions) (string, string) {
|
|||
conn = connector.NewConnector(options.Host, ha.(variable.VariableData))
|
||||
}
|
||||
if err := conn.Init(ctx); err != nil {
|
||||
klog.Errorf("failed to init connector %v", err)
|
||||
klog.ErrorS(err, "failed to init connector")
|
||||
return "", err.Error()
|
||||
}
|
||||
defer conn.Close(ctx)
|
||||
|
|
|
|||
|
|
@ -49,12 +49,12 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
|
|||
LocationUID: string(options.Task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get location vars %v", err)
|
||||
klog.ErrorS(err, "failed to get location vars")
|
||||
return "", err.Error()
|
||||
}
|
||||
destStr, err := tmpl.ParseString(lv.(variable.VariableData), *dest)
|
||||
if err != nil {
|
||||
klog.Errorf("template parse src %s error: %v", *dest, err)
|
||||
klog.ErrorS(err, "template parse dest error")
|
||||
return "", err.Error()
|
||||
}
|
||||
|
||||
|
|
@ -65,13 +65,13 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
|
|||
// get connector
|
||||
ha, err := options.Variable.Get(variable.HostVars{HostName: options.Host})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get host vars %v", err)
|
||||
klog.ErrorS(err, "failed to get host vars")
|
||||
return "", err.Error()
|
||||
}
|
||||
conn = connector.NewConnector(options.Host, ha.(variable.VariableData))
|
||||
}
|
||||
if err := conn.Init(ctx); err != nil {
|
||||
klog.Errorf("failed to init connector %v", err)
|
||||
klog.ErrorS(err, "failed to init connector")
|
||||
return "", err.Error()
|
||||
}
|
||||
defer conn.Close(ctx)
|
||||
|
|
@ -80,7 +80,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
|
|||
// convert src
|
||||
srcStr, err := tmpl.ParseString(lv.(variable.VariableData), *src)
|
||||
if err != nil {
|
||||
klog.Errorf("template parse src %s error: %v", *src, err)
|
||||
klog.ErrorS(err, "template parse src error")
|
||||
return "", err.Error()
|
||||
}
|
||||
var baseFS fs.FS
|
||||
|
|
@ -89,7 +89,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
|
|||
} else {
|
||||
projectFs, err := project.New(project.Options{Pipeline: &options.Pipeline}).FS(ctx, false)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get project fs %v", err)
|
||||
klog.ErrorS(err, "failed to get project fs")
|
||||
return "", err.Error()
|
||||
}
|
||||
baseFS = projectFs
|
||||
|
|
@ -98,17 +98,19 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
|
|||
flPath := project.GetFilesFromPlayBook(baseFS, options.Pipeline.Spec.Playbook, roleName, srcStr)
|
||||
fileInfo, err := fs.Stat(baseFS, flPath)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get src file in local %v", err)
|
||||
klog.ErrorS(err, "failed to get src file in local")
|
||||
return "", err.Error()
|
||||
}
|
||||
if fileInfo.IsDir() {
|
||||
// src is dir
|
||||
if err := fs.WalkDir(baseFS, flPath, func(path string, info fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to walk dir")
|
||||
return err
|
||||
}
|
||||
rel, err := filepath.Rel(srcStr, path)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to get relative path")
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
|
|
@ -116,6 +118,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
|
|||
}
|
||||
fi, err := info.Info()
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to get file info")
|
||||
return err
|
||||
}
|
||||
mode := fi.Mode()
|
||||
|
|
@ -124,27 +127,30 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
|
|||
}
|
||||
data, err := fs.ReadFile(baseFS, rel)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to read file")
|
||||
return err
|
||||
}
|
||||
if err := conn.CopyFile(ctx, data, filepath.Join(destStr, rel), mode); err != nil {
|
||||
klog.ErrorS(err, "failed to copy file", "src", srcStr, "dest", destStr)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
klog.ErrorS(err, "failed to walk dir")
|
||||
return "", err.Error()
|
||||
}
|
||||
} else {
|
||||
// src is file
|
||||
data, err := fs.ReadFile(baseFS, flPath)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to read file %v", err)
|
||||
klog.ErrorS(err, "failed to read file")
|
||||
return "", err.Error()
|
||||
}
|
||||
if strings.HasSuffix(destStr, "/") {
|
||||
destStr = destStr + filepath.Base(srcStr)
|
||||
}
|
||||
if err := conn.CopyFile(ctx, data, destStr, fileInfo.Mode()); err != nil {
|
||||
klog.Errorf("failed to copy file %v", err)
|
||||
klog.ErrorS(err, "failed to copy file", "src", srcStr, "dest", destStr)
|
||||
return "", err.Error()
|
||||
}
|
||||
return "success", ""
|
||||
|
|
|
|||
|
|
@ -34,11 +34,12 @@ func ModuleDebug(ctx context.Context, options ExecOptions) (string, string) {
|
|||
LocationUID: string(options.Task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to get location vars")
|
||||
return "", err.Error()
|
||||
}
|
||||
result, err := tmpl.ParseString(lg.(variable.VariableData), fmt.Sprintf("{{ %s }}", *v))
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get var %v", err)
|
||||
klog.ErrorS(err, "Failed to get var")
|
||||
return "", err.Error()
|
||||
}
|
||||
return result, ""
|
||||
|
|
@ -50,11 +51,12 @@ func ModuleDebug(ctx context.Context, options ExecOptions) (string, string) {
|
|||
LocationUID: string(options.Task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to get location vars")
|
||||
return "", err.Error()
|
||||
}
|
||||
result, err := tmpl.ParseString(lg.(variable.VariableData), *v)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get var %v", err)
|
||||
klog.ErrorS(err, "Failed to get var")
|
||||
return "", err.Error()
|
||||
}
|
||||
return result, ""
|
||||
|
|
|
|||
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
Copyright 2023 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package modules
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
|
||||
_const "github.com/kubesphere/kubekey/v4/pkg/const"
|
||||
)
|
||||
|
||||
// findPath from roles/roleSub dir, playbook dir or current dir
|
||||
func findPath(fs fs.FS, roleSub string, src *string, options ExecOptions) *string {
|
||||
if src == nil || filepath.IsAbs(*src) {
|
||||
return src
|
||||
}
|
||||
|
||||
findFile := []string{}
|
||||
absPlaybook := options.Pipeline.Spec.Playbook
|
||||
if !filepath.IsAbs(absPlaybook) {
|
||||
absPlaybook = filepath.Join(_const.GetWorkDir(), _const.ProjectDir, options.Pipeline.Spec.Project.Name, absPlaybook)
|
||||
}
|
||||
baseDir := filepath.Dir(filepath.Dir(absPlaybook))
|
||||
// findFile from roles/files
|
||||
if role := options.Task.Annotations[kubekeyv1alpha1.TaskAnnotationRole]; role != "" {
|
||||
findFile = append(findFile, filepath.Join(baseDir, _const.ProjectRolesDir, role, roleSub, *src))
|
||||
}
|
||||
// find from playbook dir
|
||||
findFile = append(findFile, filepath.Join(filepath.Dir(absPlaybook), *src))
|
||||
// find from current dir
|
||||
if dir, err := os.Getwd(); err == nil {
|
||||
findFile = append(findFile, filepath.Join(dir, *src))
|
||||
}
|
||||
|
||||
for _, s := range findFile {
|
||||
if _, err := os.Stat(s); err == nil {
|
||||
return &s
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -21,7 +21,6 @@ import (
|
|||
"fmt"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
|
||||
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
|
||||
|
|
@ -47,7 +46,6 @@ var module = make(map[string]ModuleExecFunc)
|
|||
|
||||
func RegisterModule(moduleName string, exec ModuleExecFunc) error {
|
||||
if _, ok := module[moduleName]; ok {
|
||||
klog.Errorf("module %s is exist", moduleName)
|
||||
return fmt.Errorf("module %s is exist", moduleName)
|
||||
}
|
||||
module[moduleName] = exec
|
||||
|
|
|
|||
|
|
@ -29,6 +29,10 @@ type testVariable struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (v testVariable) Key() string {
|
||||
return "testModule"
|
||||
}
|
||||
|
||||
func (v testVariable) Get(option variable.GetOption) (any, error) {
|
||||
return v.value, v.err
|
||||
}
|
||||
|
|
@ -32,7 +32,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) {
|
|||
LocationUID: string(options.Task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get location vars %v", err)
|
||||
klog.ErrorS(err, "failed to get location vars")
|
||||
return "", err.Error()
|
||||
}
|
||||
|
||||
|
|
@ -42,7 +42,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) {
|
|||
case string:
|
||||
factVars[k], err = tmpl.ParseString(lv.(variable.VariableData), v.(string))
|
||||
if err != nil {
|
||||
klog.Errorf("template parse %s error: %v", v.(string), err)
|
||||
klog.ErrorS(err, "template parse error", "input", v)
|
||||
return "", err.Error()
|
||||
}
|
||||
default:
|
||||
|
|
@ -55,7 +55,7 @@ func ModuleSetFact(ctx context.Context, options ExecOptions) (string, string) {
|
|||
LocationUID: "",
|
||||
Data: factVars,
|
||||
}); err != nil {
|
||||
klog.Errorf("merge fact error: %v", err)
|
||||
klog.ErrorS(err, "merge fact error")
|
||||
return "", err.Error()
|
||||
}
|
||||
return "success", ""
|
||||
|
|
|
|||
|
|
@ -48,17 +48,17 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
|
|||
LocationUID: string(options.Task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get location vars %v", err)
|
||||
klog.ErrorS(err, "failed to get location vars")
|
||||
return "", err.Error()
|
||||
}
|
||||
srcStr, err := tmpl.ParseString(lv.(variable.VariableData), *src)
|
||||
if err != nil {
|
||||
klog.Errorf("template parse src %s error: %v", *src, err)
|
||||
klog.ErrorS(err, "template parse src error", "input", *src)
|
||||
return "", err.Error()
|
||||
}
|
||||
destStr, err := tmpl.ParseString(lv.(variable.VariableData), *dest)
|
||||
if err != nil {
|
||||
klog.Errorf("template parse src %s error: %v", *dest, err)
|
||||
klog.ErrorS(err, "template parse dest error", "input", *dest)
|
||||
return "", err.Error()
|
||||
}
|
||||
|
||||
|
|
@ -68,7 +68,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
|
|||
} else {
|
||||
projectFs, err := project.New(project.Options{Pipeline: &options.Pipeline}).FS(ctx, false)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get project fs %v", err)
|
||||
klog.ErrorS(err, "failed to get project fs")
|
||||
return "", err.Error()
|
||||
}
|
||||
baseFS = projectFs
|
||||
|
|
@ -76,7 +76,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
|
|||
roleName := options.Task.Annotations[kubekeyv1alpha1.TaskAnnotationRole]
|
||||
flPath := project.GetTemplatesFromPlayBook(baseFS, options.Pipeline.Spec.Playbook, roleName, srcStr)
|
||||
if _, err := fs.Stat(baseFS, flPath); err != nil {
|
||||
klog.Errorf("find src error %v", err)
|
||||
klog.ErrorS(err, "find src error")
|
||||
return "", err.Error()
|
||||
}
|
||||
|
||||
|
|
@ -87,13 +87,13 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
|
|||
// get connector
|
||||
ha, err := options.Variable.Get(variable.HostVars{HostName: options.Host})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get host %v", err)
|
||||
klog.ErrorS(err, "failed to get host vars")
|
||||
return "", err.Error()
|
||||
}
|
||||
conn = connector.NewConnector(options.Host, ha.(variable.VariableData))
|
||||
}
|
||||
if err := conn.Init(ctx); err != nil {
|
||||
klog.Errorf("failed to init connector %v", err)
|
||||
klog.ErrorS(err, "failed to init connector")
|
||||
return "", err.Error()
|
||||
}
|
||||
defer conn.Close(ctx)
|
||||
|
|
@ -104,15 +104,18 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
|
|||
LocationUID: string(options.Task.UID),
|
||||
})
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to get location vars")
|
||||
return "", err.Error()
|
||||
}
|
||||
|
||||
data, err := fs.ReadFile(baseFS, flPath)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to read src file", "file_path", flPath)
|
||||
return "", err.Error()
|
||||
}
|
||||
result, err := tmpl.ParseFile(lg.(variable.VariableData), data)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to parse file", "file_path", flPath)
|
||||
return "", err.Error()
|
||||
}
|
||||
|
||||
|
|
@ -122,6 +125,7 @@ func ModuleTemplate(ctx context.Context, options ExecOptions) (string, string) {
|
|||
mode = fs.FileMode(*v)
|
||||
}
|
||||
if err := conn.CopyFile(ctx, []byte(result), destStr, mode); err != nil {
|
||||
klog.ErrorS(err, "failed to copy file", "src", flPath, "dest", destStr)
|
||||
return "", err.Error()
|
||||
}
|
||||
return "success", ""
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ func (r gitProject) FS(ctx context.Context, update bool) (fs.FS, error) {
|
|||
return os.DirFS(r.localDir), nil
|
||||
}
|
||||
if err := r.init(ctx); err != nil {
|
||||
klog.ErrorS(err, "Init git project error", "project_addr", r.Pipeline.Spec.Project.Addr)
|
||||
return nil, err
|
||||
}
|
||||
return os.DirFS(r.localDir), nil
|
||||
|
|
@ -74,12 +75,12 @@ func (r gitProject) gitClone(ctx context.Context) error {
|
|||
func (r gitProject) gitPull(ctx context.Context) error {
|
||||
open, err := git.PlainOpen(r.localDir)
|
||||
if err != nil {
|
||||
klog.Errorf("git open local %s error: %v", r.localDir, err)
|
||||
klog.ErrorS(err, "git open error", "local_dir", r.localDir)
|
||||
return err
|
||||
}
|
||||
wt, err := open.Worktree()
|
||||
if err != nil {
|
||||
klog.Errorf("git open worktree error: %v", err)
|
||||
klog.ErrorS(err, "git open worktree error", "local_dir", r.localDir)
|
||||
return err
|
||||
}
|
||||
if err := wt.PullContext(ctx, &git.PullOptions{
|
||||
|
|
@ -89,7 +90,7 @@ func (r gitProject) gitPull(ctx context.Context) error {
|
|||
Auth: &http.TokenAuth{r.Pipeline.Spec.Project.Token},
|
||||
InsecureSkipTLS: false,
|
||||
}); err != nil && err != git.NoErrAlreadyUpToDate {
|
||||
klog.Errorf("pull project %s failed: %v", r.Pipeline.Spec.Project.Addr, err)
|
||||
klog.ErrorS(err, "git pull error", "local_dir", r.localDir)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -22,15 +22,11 @@ import (
|
|||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
"k8s.io/klog/v2"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
|
|
@ -49,7 +45,7 @@ type delegatingClient struct {
|
|||
func NewDelegatingClient(client ctrlclient.Client) ctrlclient.Client {
|
||||
scheme := runtime.NewScheme()
|
||||
if err := kubekeyv1.AddToScheme(scheme); err != nil {
|
||||
klog.Errorf("failed to add scheme: %v", err)
|
||||
klog.ErrorS(err, "failed to add scheme", "gv", kubekeyv1.SchemeGroupVersion)
|
||||
}
|
||||
kubekeyv1.SchemeBuilder.Register(&kubekeyv1alpha1.Task{}, &kubekeyv1alpha1.TaskList{})
|
||||
return &delegatingClient{
|
||||
|
|
@ -70,11 +66,11 @@ func (d delegatingClient) Get(ctx context.Context, key ctrlclient.ObjectKey, obj
|
|||
path := filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, key.Namespace, resource, key.Name, key.Name+".yaml")
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to read yaml file: %v", err)
|
||||
klog.ErrorS(err, "failed to read yaml file", "path", path)
|
||||
return err
|
||||
}
|
||||
if err := yaml.Unmarshal(data, obj); err != nil {
|
||||
klog.Errorf("unmarshal file %s error %v", path, err)
|
||||
klog.ErrorS(err, "failed to unmarshal yaml file", "path", path)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -92,7 +88,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
|
|||
var objects []runtime.Object
|
||||
runtimeDirEntries, err := os.ReadDir(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir))
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
klog.Errorf("readDir %s error %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir), err)
|
||||
klog.ErrorS(err, "failed to read dir", "path", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir))
|
||||
return err
|
||||
}
|
||||
for _, re := range runtimeDirEntries {
|
||||
|
|
@ -103,7 +99,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
|
|||
if os.IsNotExist(err) {
|
||||
continue
|
||||
}
|
||||
klog.Errorf("readDir %s error %v", resourceDir, err)
|
||||
klog.ErrorS(err, "failed to read dir", "path", resourceDir)
|
||||
return err
|
||||
}
|
||||
for _, e := range entries {
|
||||
|
|
@ -116,7 +112,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
|
|||
if os.IsNotExist(err) {
|
||||
continue
|
||||
}
|
||||
klog.Errorf("read file %s error: %v", resourceFile, err)
|
||||
klog.ErrorS(err, "failed to read file", "path", resourceFile)
|
||||
return err
|
||||
}
|
||||
var obj runtime.Object
|
||||
|
|
@ -131,7 +127,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
|
|||
obj = &kubekeyv1alpha1.Task{}
|
||||
}
|
||||
if err := yaml.Unmarshal(data, &obj); err != nil {
|
||||
klog.Errorf("unmarshal file %s error: %v", resourceFile, err)
|
||||
klog.ErrorS(err, "failed to unmarshal yaml file", "path", resourceFile)
|
||||
return err
|
||||
}
|
||||
objects = append(objects, obj)
|
||||
|
|
@ -152,6 +148,7 @@ func (d delegatingClient) List(ctx context.Context, list ctrlclient.ObjectList,
|
|||
}
|
||||
|
||||
if err := apimeta.SetList(list, objects); err != nil {
|
||||
klog.ErrorS(err, "failed to set list")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -168,11 +165,11 @@ func (d delegatingClient) Create(ctx context.Context, obj ctrlclient.Object, opt
|
|||
|
||||
data, err := yaml.Marshal(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to marshal object: %v", err)
|
||||
klog.ErrorS(err, "failed to marshal object")
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName()), fs.ModePerm); err != nil {
|
||||
klog.Errorf("create dir %s error: %v", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName()), err)
|
||||
klog.ErrorS(err, "failed to create dir", "path", filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName()))
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
|
||||
|
|
@ -218,16 +215,16 @@ func (d delegatingClient) Patch(ctx context.Context, obj ctrlclient.Object, patc
|
|||
|
||||
patchData, err := patch.Data(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get patch data: %v", err)
|
||||
klog.ErrorS(err, "failed to get patch data")
|
||||
return err
|
||||
}
|
||||
if len(patchData) == 0 {
|
||||
klog.V(4).Infof("nothing to patch, skip")
|
||||
klog.V(4).Info("nothing to patch, skip")
|
||||
return nil
|
||||
}
|
||||
data, err := yaml.Marshal(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to marshal object: %v", err)
|
||||
klog.ErrorS(err, "failed to marshal object")
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
|
||||
|
|
@ -301,7 +298,7 @@ func (d delegatingSubResourceWriter) Create(ctx context.Context, obj ctrlclient.
|
|||
|
||||
data, err := yaml.Marshal(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to marshal object: %v", err)
|
||||
klog.ErrorS(err, "failed to marshal object")
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
|
||||
|
|
@ -319,7 +316,7 @@ func (d delegatingSubResourceWriter) Update(ctx context.Context, obj ctrlclient.
|
|||
|
||||
data, err := yaml.Marshal(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to marshal object: %v", err)
|
||||
klog.ErrorS(err, "failed to marshal object")
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
|
||||
|
|
@ -336,51 +333,17 @@ func (d delegatingSubResourceWriter) Patch(ctx context.Context, obj ctrlclient.O
|
|||
|
||||
patchData, err := patch.Data(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get patch data: %v", err)
|
||||
klog.ErrorS(err, "failed to get patch data")
|
||||
return err
|
||||
}
|
||||
if len(patchData) == 0 {
|
||||
klog.V(4).Infof("nothing to patch, skip")
|
||||
klog.V(4).Info("nothing to patch, skip")
|
||||
return nil
|
||||
}
|
||||
data, err := yaml.Marshal(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to marshal object: %v", err)
|
||||
klog.ErrorS(err, "failed to marshal object")
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(filepath.Join(_const.GetWorkDir(), _const.RuntimeDir, obj.GetNamespace(), resource, obj.GetName(), obj.GetName()+".yaml"), data, fs.ModePerm)
|
||||
}
|
||||
|
||||
func getPatchedJSON(patchType types.PatchType, originalJS, patchJS []byte, gvk schema.GroupVersionKind, creater runtime.ObjectCreater) ([]byte, error) {
|
||||
switch patchType {
|
||||
case types.JSONPatchType:
|
||||
patchObj, err := jsonpatch.DecodePatch(patchJS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bytes, err := patchObj.Apply(originalJS)
|
||||
// TODO: This is pretty hacky, we need a better structured error from the json-patch
|
||||
if err != nil && strings.Contains(err.Error(), "doc is missing key") {
|
||||
msg := err.Error()
|
||||
ix := strings.Index(msg, "key:")
|
||||
key := msg[ix+5:]
|
||||
return bytes, fmt.Errorf("Object to be patched is missing field (%s)", key)
|
||||
}
|
||||
return bytes, err
|
||||
|
||||
case types.MergePatchType:
|
||||
return jsonpatch.MergePatch(originalJS, patchJS)
|
||||
|
||||
case types.StrategicMergePatchType:
|
||||
// get a typed object for this GVK if we need to apply a strategic merge patch
|
||||
obj, err := creater.New(gvk)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot apply strategic merge patch for %s locally, try --type merge", gvk.String())
|
||||
}
|
||||
return strategicpatch.StrategicMergePatch(originalJS, patchJS, obj)
|
||||
|
||||
default:
|
||||
// only here as a safety net - go-restful filters content-type
|
||||
return nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType)
|
||||
}
|
||||
}
|
||||
|
|
@ -20,12 +20,13 @@ import (
|
|||
"context"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
cgcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
|
||||
kubekeyv1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/cache"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/proxy"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/variable"
|
||||
)
|
||||
|
||||
|
|
@ -47,6 +48,7 @@ type ControllerOptions struct {
|
|||
MaxConcurrent int
|
||||
ctrlclient.Client
|
||||
TaskReconciler reconcile.Reconciler
|
||||
VariableCache cgcache.Store
|
||||
}
|
||||
|
||||
func NewController(o ControllerOptions) (Controller, error) {
|
||||
|
|
@ -54,7 +56,7 @@ func NewController(o ControllerOptions) (Controller, error) {
|
|||
o.MaxConcurrent = 1
|
||||
}
|
||||
if o.Client == nil {
|
||||
o.Client = cache.NewDelegatingClient(nil)
|
||||
o.Client = proxy.NewDelegatingClient(nil)
|
||||
}
|
||||
|
||||
return &taskController{
|
||||
|
|
@ -62,5 +64,6 @@ func NewController(o ControllerOptions) (Controller, error) {
|
|||
wq: workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}),
|
||||
client: o.Client,
|
||||
taskReconciler: o.TaskReconciler,
|
||||
variableCache: o.VariableCache,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,12 +32,12 @@ import (
|
|||
func getGatherFact(ctx context.Context, hostname string, vars variable.Variable) (variable.VariableData, error) {
|
||||
v, err := vars.Get(variable.HostVars{HostName: hostname})
|
||||
if err != nil {
|
||||
klog.Errorf("get host %s all variable error %v", hostname, err)
|
||||
klog.ErrorS(err, "Get host variable error", "hostname", hostname)
|
||||
return nil, err
|
||||
}
|
||||
conn := connector.NewConnector(hostname, v.(variable.VariableData))
|
||||
if err := conn.Init(ctx); err != nil {
|
||||
klog.Errorf("init connection error %v", err)
|
||||
klog.ErrorS(err, "Init connection error", "hostname", hostname)
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Close(ctx)
|
||||
|
|
@ -46,25 +46,25 @@ func getGatherFact(ctx context.Context, hostname string, vars variable.Variable)
|
|||
osVars := make(variable.VariableData)
|
||||
var osRelease bytes.Buffer
|
||||
if err := conn.FetchFile(ctx, "/etc/os-release", &osRelease); err != nil {
|
||||
klog.Errorf("fetch os-release error %v", err)
|
||||
klog.ErrorS(err, "Fetch os-release error", "hostname", hostname)
|
||||
return nil, err
|
||||
}
|
||||
osVars["release"] = convertBytesToMap(osRelease.Bytes(), "=")
|
||||
kernel, err := conn.ExecuteCommand(ctx, "uname -r")
|
||||
if err != nil {
|
||||
klog.Errorf("get kernel version error %v", err)
|
||||
klog.ErrorS(err, "Get kernel version error", "hostname", hostname)
|
||||
return nil, err
|
||||
}
|
||||
osVars["kernelVersion"] = string(bytes.TrimSuffix(kernel, []byte("\n")))
|
||||
hn, err := conn.ExecuteCommand(ctx, "hostname")
|
||||
if err != nil {
|
||||
klog.Errorf("get hostname error %v", err)
|
||||
klog.ErrorS(err, "Get hostname error", "hostname", hostname)
|
||||
return nil, err
|
||||
}
|
||||
osVars["hostname"] = string(bytes.TrimSuffix(hn, []byte("\n")))
|
||||
arch, err := conn.ExecuteCommand(ctx, "arch")
|
||||
if err != nil {
|
||||
klog.Errorf("get arch error %v", err)
|
||||
klog.ErrorS(err, "Get arch error", "hostname", hostname)
|
||||
return nil, err
|
||||
}
|
||||
osVars["architecture"] = string(bytes.TrimSuffix(arch, []byte("\n")))
|
||||
|
|
@ -73,13 +73,13 @@ func getGatherFact(ctx context.Context, hostname string, vars variable.Variable)
|
|||
procVars := make(variable.VariableData)
|
||||
var cpu bytes.Buffer
|
||||
if err := conn.FetchFile(ctx, "/proc/cpuinfo", &cpu); err != nil {
|
||||
klog.Errorf("fetch cpu error %v", err)
|
||||
klog.ErrorS(err, "Fetch cpuinfo error", "hostname", hostname)
|
||||
return nil, err
|
||||
}
|
||||
procVars["cpuInfo"] = convertBytesToSlice(cpu.Bytes(), ":")
|
||||
var mem bytes.Buffer
|
||||
if err := conn.FetchFile(ctx, "/proc/meminfo", &mem); err != nil {
|
||||
klog.Errorf("fetch os-release error %v", err)
|
||||
klog.ErrorS(err, "Fetch meminfo error", "hostname", hostname)
|
||||
return nil, err
|
||||
}
|
||||
procVars["memInfo"] = convertBytesToMap(mem.Bytes(), ":")
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
cgcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
|
|
@ -32,7 +33,6 @@ import (
|
|||
|
||||
kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1"
|
||||
kubekeyv1alpha1 "github.com/kubesphere/kubekey/v4/pkg/apis/kubekey/v1alpha1"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/cache"
|
||||
_const "github.com/kubesphere/kubekey/v4/pkg/const"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/converter"
|
||||
"github.com/kubesphere/kubekey/v4/pkg/modules"
|
||||
|
|
@ -44,6 +44,8 @@ type taskController struct {
|
|||
client ctrlclient.Client
|
||||
taskReconciler reconcile.Reconciler
|
||||
|
||||
variableCache cgcache.Store
|
||||
|
||||
wq workqueue.RateLimitingInterface
|
||||
MaxConcurrent int
|
||||
}
|
||||
|
|
@ -52,7 +54,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
var nsTasks = &kubekeyv1alpha1.TaskList{}
|
||||
|
||||
if err := c.client.List(ctx, nsTasks, ctrlclient.InNamespace(o.Pipeline.Namespace)); err != nil {
|
||||
klog.Errorf("[Pipeline %s] list tasks error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
|
||||
klog.ErrorS(err, "List tasks error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
|
|
@ -78,28 +80,36 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
}
|
||||
|
||||
if len(nsTasks.Items) == 0 {
|
||||
vars, ok, err := c.variableCache.GetByKey(string(o.Pipeline.UID))
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Get variable from store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
return err
|
||||
}
|
||||
// if tasks has not generated. generate tasks from pipeline
|
||||
vars, ok := cache.LocalVariable.Get(string(o.Pipeline.UID))
|
||||
//vars, ok := cache.LocalVariable.Get(string(o.Pipeline.UID))
|
||||
if ok {
|
||||
o.variable = vars.(variable.Variable)
|
||||
} else {
|
||||
newVars, err := variable.New(variable.Options{
|
||||
nv, err := variable.New(variable.Options{
|
||||
Ctx: ctx,
|
||||
Client: c.client,
|
||||
Pipeline: *o.Pipeline,
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] create variable failed: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
|
||||
klog.ErrorS(err, "Create variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
return err
|
||||
}
|
||||
cache.LocalVariable.Put(string(o.Pipeline.UID), newVars)
|
||||
o.variable = newVars
|
||||
if err := c.variableCache.Add(nv); err != nil {
|
||||
klog.ErrorS(err, "Add variable to store error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
return err
|
||||
}
|
||||
o.variable = nv
|
||||
}
|
||||
|
||||
klog.V(4).Infof("[Pipeline %s] deal project", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
klog.V(4).InfoS("deal project", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
projectFs, err := project.New(project.Options{Pipeline: o.Pipeline}).FS(ctx, true)
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] deal project error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
|
||||
klog.ErrorS(err, "Deal project error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -116,7 +126,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
// convert Hosts (group or host) to all hosts
|
||||
ahn, err := o.variable.Get(variable.Hostnames{Name: play.PlayHost.Hosts})
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] get all host name error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
|
||||
klog.ErrorS(err, "Get all host name error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -125,7 +135,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
for _, h := range ahn.([]string) {
|
||||
gfv, err := getGatherFact(ctx, h, o.variable)
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] get gather fact from host %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), h, err)
|
||||
klog.ErrorS(err, "Get gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h)
|
||||
return err
|
||||
}
|
||||
if err := o.variable.Merge(variable.HostMerge{
|
||||
|
|
@ -133,7 +143,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
LocationUID: "",
|
||||
Data: gfv,
|
||||
}); err != nil {
|
||||
klog.Errorf("[Pipeline %s] merge gather fact from host %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), h, err)
|
||||
klog.ErrorS(err, "Merge gather fact error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "host", h)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -147,7 +157,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
// group hosts by serial. run the playbook by serial
|
||||
hs, err = converter.GroupHostBySerial(ahn.([]string), play.Serial.Data)
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] convert host by serial error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), err)
|
||||
klog.ErrorS(err, "Group host by serial error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -167,7 +177,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
// generate task from pre tasks
|
||||
preTasks, err := c.block2Task(hctx, o, play.PreTasks, nil, puid, variable.BlockLocation)
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), play.Name, err)
|
||||
klog.ErrorS(err, "Get pre task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
|
||||
return err
|
||||
}
|
||||
nsTasks.Items = append(nsTasks.Items, preTasks...)
|
||||
|
|
@ -185,7 +195,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
}
|
||||
roleTasks, err := c.block2Task(context.WithValue(hctx, _const.CtxBlockRole, role.Role), o, role.Block, role.When.Data, ruid, variable.BlockLocation)
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] get role from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err)
|
||||
klog.ErrorS(err, "Get role task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name, "role", role.Role)
|
||||
return err
|
||||
}
|
||||
nsTasks.Items = append(nsTasks.Items, roleTasks...)
|
||||
|
|
@ -193,14 +203,14 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
// generate task from tasks
|
||||
tasks, err := c.block2Task(hctx, o, play.Tasks, nil, puid, variable.BlockLocation)
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err)
|
||||
klog.ErrorS(err, "Get task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
|
||||
return err
|
||||
}
|
||||
nsTasks.Items = append(nsTasks.Items, tasks...)
|
||||
// generate task from post tasks
|
||||
postTasks, err := c.block2Task(hctx, o, play.Tasks, nil, puid, variable.BlockLocation)
|
||||
if err != nil {
|
||||
klog.Errorf("[Pipeline %s] get pre task from play %s error %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), puid, err)
|
||||
klog.ErrorS(err, "Get post task from play error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "play", play.Name)
|
||||
return err
|
||||
}
|
||||
nsTasks.Items = append(nsTasks.Items, postTasks...)
|
||||
|
|
@ -209,7 +219,7 @@ func (c *taskController) AddTasks(ctx context.Context, o AddTaskOptions) error {
|
|||
|
||||
for _, task := range nsTasks.Items {
|
||||
if err := c.client.Create(ctx, &task); err != nil {
|
||||
klog.Errorf("[Pipeline %s] create task %s error: %v", ctrlclient.ObjectKeyFromObject(o.Pipeline), task.Name, err)
|
||||
klog.ErrorS(err, "Create task error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "task", task.Name)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -234,31 +244,35 @@ func (k *taskController) block2Task(ctx context.Context, o AddTaskOptions, ats [
|
|||
Name: at.Name,
|
||||
Vars: at.Vars,
|
||||
}); err != nil {
|
||||
klog.ErrorS(err, "Merge block to variable error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
|
||||
return nil, err
|
||||
}
|
||||
atWhen := append(when, at.When.Data...)
|
||||
|
||||
if len(at.Block) != 0 {
|
||||
// add block
|
||||
bt, err := k.block2Task(ctx, o, at.Block, atWhen, buid, variable.BlockLocation)
|
||||
block, err := k.block2Task(ctx, o, at.Block, atWhen, buid, variable.BlockLocation)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Get block task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
|
||||
return nil, err
|
||||
}
|
||||
tasks = append(tasks, bt...)
|
||||
tasks = append(tasks, block...)
|
||||
|
||||
if len(at.Always) != 0 {
|
||||
at, err := k.block2Task(ctx, o, at.Always, atWhen, buid, variable.AlwaysLocation)
|
||||
always, err := k.block2Task(ctx, o, at.Always, atWhen, buid, variable.AlwaysLocation)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Get always task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
|
||||
return nil, err
|
||||
}
|
||||
tasks = append(tasks, at...)
|
||||
tasks = append(tasks, always...)
|
||||
}
|
||||
if len(at.Rescue) != 0 {
|
||||
rt, err := k.block2Task(ctx, o, at.Rescue, atWhen, buid, variable.RescueLocation)
|
||||
rescue, err := k.block2Task(ctx, o, at.Rescue, atWhen, buid, variable.RescueLocation)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Get rescue task from block error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name)
|
||||
return nil, err
|
||||
}
|
||||
tasks = append(tasks, rt...)
|
||||
tasks = append(tasks, rescue...)
|
||||
}
|
||||
} else {
|
||||
task := converter.MarshalBlock(context.WithValue(context.WithValue(ctx, _const.CtxBlockWhen, atWhen), _const.CtxBlockTaskUID, buid),
|
||||
|
|
@ -267,6 +281,7 @@ func (k *taskController) block2Task(ctx context.Context, o AddTaskOptions, ats [
|
|||
for n, a := range at.UnknownFiled {
|
||||
data, err := json.Marshal(a)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Marshal unknown field error", "pipeline", ctrlclient.ObjectKeyFromObject(o.Pipeline), "block", at.Name, "field", n)
|
||||
return nil, err
|
||||
}
|
||||
if m := modules.FindModule(n); m != nil {
|
||||
|
|
@ -328,7 +343,7 @@ func (k *taskController) processNextWorkItem(ctx context.Context) bool {
|
|||
switch {
|
||||
case err != nil:
|
||||
k.wq.AddRateLimited(req)
|
||||
klog.Errorf("Reconciler error: %v", err)
|
||||
klog.ErrorS(err, "Reconciler error", "request", req)
|
||||
case result.RequeueAfter > 0:
|
||||
// The result.RequeueAfter request will be lost, if it is returned
|
||||
// along with a non-nil error. But this is intended as
|
||||
|
|
|
|||
|
|
@ -80,12 +80,12 @@ func hostsInGroup(inv kubekeyv1.Inventory, groupName string) []string {
|
|||
func StringVar(vars VariableData, key string) *string {
|
||||
value, ok := vars[key]
|
||||
if !ok {
|
||||
klog.V(4).Infof("cannot find variable %s", key)
|
||||
klog.V(4).InfoS("cannot find variable", "key", key)
|
||||
return nil
|
||||
}
|
||||
sv, ok := value.(string)
|
||||
if !ok {
|
||||
klog.V(4).Infof("variable %s is not string", key)
|
||||
klog.V(4).InfoS("variable is not string", "key", key)
|
||||
return nil
|
||||
}
|
||||
return &sv
|
||||
|
|
@ -95,13 +95,13 @@ func StringVar(vars VariableData, key string) *string {
|
|||
func IntVar(vars VariableData, key string) *int {
|
||||
value, ok := vars[key]
|
||||
if !ok {
|
||||
klog.V(4).Infof("cannot find variable %s", key)
|
||||
klog.V(4).InfoS("cannot find variable", "key", key)
|
||||
return nil
|
||||
}
|
||||
// default convert to float64
|
||||
number, ok := value.(float64)
|
||||
if !ok {
|
||||
klog.V(4).Infof("variable %s is not string", key)
|
||||
klog.V(4).InfoS("variable is not number", "key", key)
|
||||
return nil
|
||||
}
|
||||
vi := int(number)
|
||||
|
|
@ -112,19 +112,19 @@ func IntVar(vars VariableData, key string) *int {
|
|||
func StringSliceVar(vars VariableData, key string) []string {
|
||||
value, ok := vars[key]
|
||||
if !ok {
|
||||
klog.V(4).Infof("cannot find variable %s", key)
|
||||
klog.V(4).InfoS("cannot find variable", "key", key)
|
||||
return nil
|
||||
}
|
||||
sv, ok := value.([]any)
|
||||
if !ok {
|
||||
klog.V(4).Infof("variable %s is not slice", key)
|
||||
klog.V(4).InfoS("variable is not string slice", "key", key)
|
||||
return nil
|
||||
}
|
||||
var ss []string
|
||||
for _, a := range sv {
|
||||
av, ok := a.(string)
|
||||
if !ok {
|
||||
klog.V(4).Infof("value in variable %s is not string", key)
|
||||
klog.V(4).InfoS("variable is not string", "key", key)
|
||||
return nil
|
||||
}
|
||||
ss = append(ss, av)
|
||||
|
|
@ -139,7 +139,7 @@ func Extension2Variables(ext runtime.RawExtension) VariableData {
|
|||
|
||||
var data VariableData
|
||||
if err := yaml.Unmarshal(ext.Raw, &data); err != nil {
|
||||
klog.Errorf("failed to unmarshal extension to variables: %v", err)
|
||||
klog.ErrorS(err, "failed to unmarshal extension to variables")
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
|
@ -151,7 +151,7 @@ func Extension2Slice(ext runtime.RawExtension) []any {
|
|||
|
||||
var data []any
|
||||
if err := yaml.Unmarshal(ext.Raw, &data); err != nil {
|
||||
klog.Errorf("failed to unmarshal extension to any: %v", err)
|
||||
klog.ErrorS(err, "failed to unmarshal extension to slice")
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,10 +31,13 @@ import (
|
|||
)
|
||||
|
||||
type variable struct {
|
||||
// key is the unique Identifier of the variable. usually the UID of the pipeline.
|
||||
key string
|
||||
// source is where the variable is stored
|
||||
source source.Source
|
||||
|
||||
// value is the data of the variable, which store in memory
|
||||
value *value
|
||||
|
||||
// lock is the lock for value
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
|
|
@ -111,7 +114,7 @@ type VariableData map[string]any
|
|||
func (v VariableData) String() string {
|
||||
data, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
klog.Errorf("marshal in error: %v", err)
|
||||
klog.ErrorS(err, "marshal in error", "data", v)
|
||||
return ""
|
||||
}
|
||||
return string(data)
|
||||
|
|
@ -122,6 +125,10 @@ type host struct {
|
|||
RuntimeVars map[string]VariableData `json:"runtime"`
|
||||
}
|
||||
|
||||
func (v *variable) Key() string {
|
||||
return v.key
|
||||
}
|
||||
|
||||
func (v *variable) Get(option GetOption) (any, error) {
|
||||
return option.filter(*v.value)
|
||||
}
|
||||
|
|
@ -139,14 +146,14 @@ func (v *variable) Merge(mo ...MergeOption) error {
|
|||
|
||||
if !reflect.DeepEqual(old.Location, v.value.Location) {
|
||||
if err := v.syncLocation(); err != nil {
|
||||
klog.Errorf("sync location error %v", err)
|
||||
klog.ErrorS(err, "sync location error")
|
||||
}
|
||||
}
|
||||
|
||||
for hn, hv := range v.value.Hosts {
|
||||
if !reflect.DeepEqual(old.Hosts[hn], hv) {
|
||||
if err := v.syncHosts(hn); err != nil {
|
||||
klog.Errorf("sync group error %v", err)
|
||||
klog.ErrorS(err, "sync host error", "hostname", hn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -157,11 +164,11 @@ func (v *variable) Merge(mo ...MergeOption) error {
|
|||
func (v *variable) syncLocation() error {
|
||||
data, err := json.MarshalIndent(v.value.Location, "", " ")
|
||||
if err != nil {
|
||||
klog.Errorf("marshal location data failed: %v", err)
|
||||
klog.ErrorS(err, "marshal location data error")
|
||||
return err
|
||||
}
|
||||
if err := v.source.Write(data, _const.RuntimePipelineVariableLocationFile); err != nil {
|
||||
klog.Errorf("write location data to local file %s error %v", _const.RuntimePipelineVariableLocationFile, err)
|
||||
klog.ErrorS(err, "write location data to local file error", "filename", _const.RuntimePipelineVariableLocationFile)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -173,11 +180,11 @@ func (v *variable) syncHosts(hostname ...string) error {
|
|||
if hv, ok := v.value.Hosts[hn]; ok {
|
||||
data, err := json.MarshalIndent(hv, "", " ")
|
||||
if err != nil {
|
||||
klog.Errorf("marshal host %s data failed: %v", hn, err)
|
||||
klog.ErrorS(err, "marshal host data error", "hostname", hn)
|
||||
return err
|
||||
}
|
||||
if err := v.source.Write(data, fmt.Sprintf("%s.json", hn)); err != nil {
|
||||
klog.Errorf("write host data to local file %s.json error %v", hn, err)
|
||||
klog.ErrorS(err, "write host data to local file error", "hostname", hn, "filename", fmt.Sprintf("%s.json", hn))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ type fileSource struct {
|
|||
func (f *fileSource) Read() (map[string][]byte, error) {
|
||||
de, err := os.ReadDir(f.path)
|
||||
if err != nil {
|
||||
klog.Errorf("read dir %s error %v", f.path, err)
|
||||
klog.ErrorS(err, "read dir error", "path", f.path)
|
||||
return nil, err
|
||||
}
|
||||
var result map[string][]byte
|
||||
|
|
@ -46,6 +46,7 @@ func (f *fileSource) Read() (map[string][]byte, error) {
|
|||
if strings.HasSuffix(entry.Name(), ".json") {
|
||||
data, err := os.ReadFile(filepath.Join(f.path, entry.Name()))
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "read file error", "filename", entry.Name())
|
||||
return nil, err
|
||||
}
|
||||
result[entry.Name()] = data
|
||||
|
|
@ -58,10 +59,12 @@ func (f *fileSource) Read() (map[string][]byte, error) {
|
|||
func (f *fileSource) Write(data []byte, filename string) error {
|
||||
file, err := os.Create(filepath.Join(f.path, filename))
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "create file error", "filename", filename)
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
if _, err := file.Write(data); err != nil {
|
||||
klog.ErrorS(err, "write file error", "filename", filename)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ type Watcher interface {
|
|||
func New(path string) (Source, error) {
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
if err := os.MkdirAll(path, fs.ModePerm); err != nil {
|
||||
klog.Errorf("create source path %s error: %v", path, err)
|
||||
klog.ErrorS(err, "create source path error", "path", path)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
cgcache "k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/klog/v2"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
|
@ -36,6 +38,7 @@ import (
|
|||
)
|
||||
|
||||
type Variable interface {
|
||||
Key() string
|
||||
Get(option GetOption) (any, error)
|
||||
Merge(option ...MergeOption) error
|
||||
}
|
||||
|
|
@ -51,22 +54,23 @@ func New(o Options) (Variable, error) {
|
|||
// new source
|
||||
s, err := source.New(filepath.Join(_const.RuntimeDirFromObject(&o.Pipeline), _const.RuntimePipelineVariableDir))
|
||||
if err != nil {
|
||||
klog.Errorf("create file source failed: %v", err)
|
||||
klog.ErrorS(err, "create file source failed", "path", filepath.Join(_const.RuntimeDirFromObject(&o.Pipeline), _const.RuntimePipelineVariableDir), "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
|
||||
return nil, err
|
||||
}
|
||||
// get config
|
||||
var config = &kubekeyv1.Config{}
|
||||
if err := o.Client.Get(o.Ctx, types.NamespacedName{o.Pipeline.Spec.ConfigRef.Namespace, o.Pipeline.Spec.ConfigRef.Name}, config); err != nil {
|
||||
klog.Errorf("get config from pipeline error %v", err)
|
||||
klog.ErrorS(err, "get config from pipeline error", "config", o.Pipeline.Spec.ConfigRef, "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
|
||||
return nil, err
|
||||
}
|
||||
// get inventory
|
||||
var inventory = &kubekeyv1.Inventory{}
|
||||
if err := o.Client.Get(o.Ctx, types.NamespacedName{o.Pipeline.Spec.InventoryRef.Namespace, o.Pipeline.Spec.InventoryRef.Name}, inventory); err != nil {
|
||||
klog.Errorf("get inventory from pipeline error %v", err)
|
||||
klog.ErrorS(err, "get inventory from pipeline error", "inventory", o.Pipeline.Spec.InventoryRef, "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
|
||||
return nil, err
|
||||
}
|
||||
v := &variable{
|
||||
key: string(o.Pipeline.UID),
|
||||
source: s,
|
||||
value: &value{
|
||||
Config: *config,
|
||||
|
|
@ -77,21 +81,21 @@ func New(o Options) (Variable, error) {
|
|||
// read data from source
|
||||
data, err := v.source.Read()
|
||||
if err != nil {
|
||||
klog.Errorf("read data from source error %v", err)
|
||||
klog.ErrorS(err, "read data from source error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
|
||||
return nil, err
|
||||
}
|
||||
for k, d := range data {
|
||||
if k == _const.RuntimePipelineVariableLocationFile {
|
||||
// set location
|
||||
if err := json.Unmarshal(d, &v.value.Location); err != nil {
|
||||
klog.Errorf("unmarshal location error %v", err)
|
||||
klog.ErrorS(err, "unmarshal location error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// set hosts
|
||||
h := host{}
|
||||
if err := json.Unmarshal(d, &h); err != nil {
|
||||
klog.Errorf("unmarshal host error %v", err)
|
||||
klog.ErrorS(err, "unmarshal host error", "pipeline", ctrlclient.ObjectKeyFromObject(&o.Pipeline))
|
||||
return nil, err
|
||||
}
|
||||
v.value.Hosts[strings.TrimSuffix(k, ".json")] = h
|
||||
|
|
@ -252,7 +256,7 @@ func (g Hostnames) filter(data value) (any, error) {
|
|||
if match := regex.FindStringSubmatch(n); match != nil {
|
||||
index, err := strconv.Atoi(match[2])
|
||||
if err != nil {
|
||||
klog.Errorf("convert index %s to int failed: %v", match[2], err)
|
||||
klog.ErrorS(err, "convert index to int error", "index", match[2])
|
||||
return nil, err
|
||||
}
|
||||
for gn, gv := range data.Inventory.Spec.Groups {
|
||||
|
|
@ -549,3 +553,12 @@ func (t LocationMerge) mergeTo(v *value) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cache is a cache for variable
|
||||
var Cache = cgcache.NewStore(func(obj interface{}) (string, error) {
|
||||
v, ok := obj.(Variable)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("cannot convert %v to variable", obj)
|
||||
}
|
||||
return v.Key(), nil
|
||||
})
|
||||
|
|
|
|||
Loading…
Reference in New Issue