feat: Thread-safe reading and writing (#2531)

Signed-off-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
liujian 2025-04-16 09:50:28 +08:00 committed by GitHub
parent 7560b29946
commit def153b0bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 116 additions and 50 deletions

View File

@ -16,6 +16,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package builtin
import (
@ -24,6 +25,7 @@ import (
"github.com/kubesphere/kubekey/v4/cmd/kk/app/options/builtin"
)
// NewDeleteCommand creates a new delete command that allows deleting nodes or clusters
func NewDeleteCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "delete",
@ -34,6 +36,8 @@ func NewDeleteCommand() *cobra.Command {
return cmd
}
// newDeleteClusterCommand creates a new command for deleting a Kubernetes cluster
// It uses the delete_cluster.yaml playbook to uninstall Kubernetes components and clean up the cluster
func newDeleteClusterCommand() *cobra.Command {
o := builtin.NewDeleteClusterOptions()

View File

@ -36,6 +36,7 @@ import (
// delete cluster
// ======================================================================================
// NewDeleteClusterOptions creates a new DeleteClusterOptions with default values
func NewDeleteClusterOptions() *DeleteClusterOptions {
// set default value
return &DeleteClusterOptions{
@ -45,6 +46,7 @@ func NewDeleteClusterOptions() *DeleteClusterOptions {
}
}
// DeleteClusterOptions contains options for deleting a Kubernetes cluster
type DeleteClusterOptions struct {
options.CommonOptions
// kubernetes version which the cluster will install.
@ -53,6 +55,7 @@ type DeleteClusterOptions struct {
ContainerManager string
}
// Flags returns the flag sets for DeleteClusterOptions
func (o *DeleteClusterOptions) Flags() cliflag.NamedFlagSets {
fss := o.CommonOptions.Flags()
kfs := fss.FlagSet("config")
@ -62,7 +65,9 @@ func (o *DeleteClusterOptions) Flags() cliflag.NamedFlagSets {
return fss
}
// Complete validates and completes the DeleteClusterOptions configuration
func (o *DeleteClusterOptions) Complete(cmd *cobra.Command, args []string) (*kkcorev1.Playbook, error) {
// Initialize playbook metadata
playbook := &kkcorev1.Playbook{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "delete-cluster-",
@ -73,17 +78,19 @@ func (o *DeleteClusterOptions) Complete(cmd *cobra.Command, args []string) (*kkc
},
}
// complete playbook. now only support one playbook
// Validate playbook arguments
if len(args) != 1 {
return nil, errors.Errorf("%s\nSee '%s -h' for help and examples", cmd.Use, cmd.CommandPath())
}
o.Playbook = args[0]
// Set playbook specification
playbook.Spec = kkcorev1.PlaybookSpec{
Playbook: o.Playbook,
Debug: o.Debug,
}
// override kube_version in config
// Complete configuration with kubernetes version and inventory
if err := completeConfig(o.Kubernetes, o.CommonOptions.ConfigFile, o.CommonOptions.Config); err != nil {
return nil, err
}
@ -91,6 +98,7 @@ func (o *DeleteClusterOptions) Complete(cmd *cobra.Command, args []string) (*kkc
return nil, err
}
// Complete common options
if err := o.CommonOptions.Complete(playbook); err != nil {
return nil, err
}
@ -98,6 +106,7 @@ func (o *DeleteClusterOptions) Complete(cmd *cobra.Command, args []string) (*kkc
return playbook, o.completeConfig()
}
// completeConfig updates the configuration with container manager settings
func (o *DeleteClusterOptions) completeConfig() error {
if o.ContainerManager != "" {
// override container_manager in config
@ -106,5 +115,9 @@ func (o *DeleteClusterOptions) completeConfig() error {
}
}
if err := unstructured.SetNestedField(o.CommonOptions.Config.Value(), o.Kubernetes, "kube_version"); err != nil {
return errors.Wrapf(err, "failed to set %q to config", "kube_version")
}
return nil
}

View File

@ -178,6 +178,16 @@ func TestParseBool(t *testing.T) {
},
excepted: true,
},
// ======= hasPrefix =======
{
name: "hasPrefix true-1",
condition: []string{`{{ .foo | hasPrefix "version.BuildInfo" }}`},
variable: map[string]any{
"foo": `version.BuildInfo{Version:"v3.14.3", GitCommit:"f03cc04caaa8f6d7c3e67cf918929150cf6f3f12", GitTreeState:"clean", GoVersion:"go1.22.1"}`,
"bar": "v3.14.3",
},
excepted: true,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {

View File

@ -282,6 +282,17 @@ func StringSliceVar(d map[string]any, vars map[string]any, key string) ([]string
}
switch valv := val.(type) {
case []string:
var ss []string
for _, a := range valv {
as, err := tmpl.ParseFunc(d, a, func(b []byte) string { return string(b) })
if err != nil {
return nil, err
}
ss = append(ss, as)
}
return ss, nil
case []any:
var ss []string

View File

@ -18,7 +18,7 @@ package variable
import (
"context"
"encoding/json"
"maps"
"path/filepath"
"reflect"
"sync"
@ -136,7 +136,7 @@ type variable struct {
// value is the data of the variable, which store in memory
value *value
// lock is the lock for value
sync.Mutex
sync.RWMutex
}
// value is the specific data contained in the variable
@ -154,23 +154,35 @@ type host struct {
RuntimeVars map[string]any `json:"runtime"`
}
func (v value) deepCopy() value {
nv := value{}
data, err := json.Marshal(v)
if err != nil {
return value{}
// DeepCopy creates a deep copy of the variable struct, including all nested fields.
// It copies the Config and Inventory objects, and creates new maps for Hosts with cloned
// RemoteVars and RuntimeVars. The key and source fields are copied by reference since
// they don't need deep copying.
func (v *variable) DeepCopy() *variable {
copyVal := &value{
Config: *v.value.Config.DeepCopy(),
Inventory: *v.value.Inventory.DeepCopy(),
Hosts: make(map[string]host, len(v.value.Hosts)),
}
for k, h := range v.value.Hosts {
copyVal.Hosts[k] = host{
RemoteVars: maps.Clone(h.RemoteVars),
RuntimeVars: maps.Clone(h.RuntimeVars),
}
}
if err := json.Unmarshal(data, &nv); err != nil {
return value{}
return &variable{
key: v.key,
source: v.source,
value: copyVal,
}
return nv
}
// Get vars
func (v *variable) Get(f GetFunc) (any, error) {
v.RLock()
defer v.RUnlock()
return f(v)
}
@ -179,28 +191,30 @@ func (v *variable) Merge(f MergeFunc) error {
v.Lock()
defer v.Unlock()
old := v.value.deepCopy()
if err := f(v); err != nil {
// new variable to avoid get lock in mergeFunc
nv := v.DeepCopy()
if err := f(nv); err != nil {
return err
}
return v.syncSource(old)
return v.syncSource(*nv.value)
}
// syncSource sync hosts vars to source.
func (v *variable) syncSource(old value) error {
func (v *variable) syncSource(newVal value) error {
for hn, hv := range v.value.Hosts {
if reflect.DeepEqual(old.Hosts[hn], hv) {
if reflect.DeepEqual(newVal.Hosts[hn], hv) {
// nothing change skip.
continue
}
if err := v.source.Write(map[string]any{
"remote": hv.RemoteVars,
"runtime": hv.RuntimeVars,
"remote": newVal.Hosts[hn].RemoteVars,
"runtime": newVal.Hosts[hn].RuntimeVars,
}, hn); err != nil {
return errors.Wrapf(err, "failed to write host %s variable to source", hn)
}
// update new value to variable
v.value.Hosts[hn] = newVal.Hosts[hn]
}
return nil

View File

@ -33,7 +33,7 @@ var GetHostnames = func(name []string) GetFunc {
n = pn
}
// add host to hs
if _, ok := vv.value.Hosts[n]; ok {
if _, exists := vv.value.Hosts[n]; exists {
hs = append(hs, n)
}
// add group's host to gs

View File

@ -34,7 +34,6 @@ var MergeRemoteVariable = func(data map[string]any, hostname string) MergeFunc {
}
// MergeRuntimeVariable parse variable by specific host and merge to the host.
// TODO: support merge []byte to preserve yaml definition order
var MergeRuntimeVariable = func(data map[string]any, hosts ...string) MergeFunc {
if len(data) == 0 || len(hosts) == 0 {
// skip
@ -47,10 +46,30 @@ var MergeRuntimeVariable = func(data map[string]any, hosts ...string) MergeFunc
if !ok {
return errors.New("variable type error")
}
// parse variable
if err := parseVariable(data, runtimeVarParser(v, hostname, data)); err != nil {
// Avoid nested locking: prepare context for parsing outside locking region
curVars, err := v.Get(GetAllVariable(hostname))
if err != nil {
return err
}
cv, ok := curVars.(map[string]any)
if !ok {
return errors.Errorf("host %s variables type error, expect map[string]any", hostname)
}
parser := func(s string) (string, error) {
return tmpl.ParseFunc(
CombineVariables(data, cv),
s,
func(b []byte) string { return string(b) },
)
}
// parse variable
if err := parseVariable(data, parser); err != nil {
return err
}
hv := vv.value.Hosts[hostname]
hv.RuntimeVars = CombineVariables(hv.RuntimeVars, data)
vv.value.Hosts[hostname] = hv
@ -67,14 +86,28 @@ var MergeAllRuntimeVariable = func(data map[string]any, hostname string) MergeFu
if !ok {
return errors.New("variable type error")
}
// parse variable
if err := parseVariable(data, runtimeVarParser(v, hostname, data)); err != nil {
curVars, err := v.Get(GetAllVariable(hostname))
if err != nil {
return err
}
cv, ok := curVars.(map[string]any)
if !ok {
return errors.Errorf("host %s variables type error, expect map[string]any", hostname)
}
parser := func(s string) (string, error) {
return tmpl.ParseFunc(
CombineVariables(data, cv),
s,
func(b []byte) string { return string(b) },
)
}
if err := parseVariable(data, parser); err != nil {
return err
}
for h := range vv.value.Hosts {
if _, ok := v.(*variable); !ok {
return errors.New("variable type error")
}
hv := vv.value.Hosts[h]
hv.RuntimeVars = CombineVariables(hv.RuntimeVars, data)
vv.value.Hosts[h] = hv
@ -83,22 +116,3 @@ var MergeAllRuntimeVariable = func(data map[string]any, hostname string) MergeFu
return nil
}
}
func runtimeVarParser(v Variable, hostname string, data map[string]any) func(string) (string, error) {
return func(s string) (string, error) {
curVariable, err := v.Get(GetAllVariable(hostname))
if err != nil {
return "", err
}
cv, ok := curVariable.(map[string]any)
if !ok {
return "", errors.Errorf("host %s variables type error, expect map[string]any", hostname)
}
return tmpl.ParseFunc(
CombineVariables(data, cv),
s,
func(b []byte) string { return string(b) },
)
}
}