feat: add delegate_to (#2662)

Signed-off-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
liujian 2025-07-18 15:22:58 +08:00 committed by GitHub
parent 873e4dda44
commit 98b688be10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 66 additions and 41 deletions

View File

@ -46,6 +46,7 @@ const (
type TaskSpec struct {
Name string `json:"name,omitempty"`
Hosts []string `json:"hosts,omitempty"`
DelegateTo string `yaml:"delegate_to,omitempty"`
IgnoreError *bool `json:"ignoreError,omitempty"`
Retries int `json:"retries,omitempty"`

View File

@ -49,6 +49,7 @@ func MarshalBlock(hosts []string, when []string, block kkprojectv1.Block) *kkcor
Spec: kkcorev1alpha1.TaskSpec{
Name: block.Name,
Hosts: hosts,
DelegateTo: block.DelegateTo,
IgnoreError: block.IgnoreErrors,
Retries: block.Retries,
When: when,

View File

@ -70,9 +70,9 @@ func ModuleCommand(ctx context.Context, options ExecOptions) (string, string) {
return "", err.Error()
}
// get connector
conn, err := getConnector(ctx, options.Host, options.Variable)
conn, err := options.getConnector(ctx)
if err != nil {
return "", fmt.Sprintf("failed to connector of %q error: %v", options.Host, err)
return "", fmt.Sprintf("failed to connector for %q error: %v", options.Host, err)
}
defer conn.Close(ctx)
// command string

View File

@ -39,7 +39,7 @@ func TestCommand(t *testing.T) {
Variable: &testVariable{},
},
ctxFunc: context.Background,
exceptStderr: "failed to connector of \"\" error: workdir in variable should be string",
exceptStderr: "failed to connector for \"\" error: workdir in variable should be string",
},
{
name: "exec command success",

View File

@ -131,7 +131,7 @@ func ModuleCopy(ctx context.Context, options ExecOptions) (string, string) {
}
// get connector
conn, err := getConnector(ctx, options.Host, options.Variable)
conn, err := options.getConnector(ctx)
if err != nil {
return "", fmt.Sprintf("get connector error: %v", err)
}

View File

@ -81,7 +81,7 @@ func ModuleFetch(ctx context.Context, options ExecOptions) (string, string) {
}
// get connector
conn, err := getConnector(ctx, options.Host, options.Variable)
conn, err := options.getConnector(ctx)
if err != nil {
return "", fmt.Sprintf("get connector error: %v", err)
}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"github.com/kubesphere/kubekey/v4/pkg/connector"
"github.com/kubesphere/kubekey/v4/pkg/converter/tmpl"
"github.com/kubesphere/kubekey/v4/pkg/variable"
)
@ -42,25 +43,28 @@ const (
StdoutFalse = "False"
)
// ModuleExecFunc exec module
// ModuleExecFunc defines the function signature for executing a module.
// It takes a context and ExecOptions, and returns stdout and stderr strings.
type ModuleExecFunc func(ctx context.Context, options ExecOptions) (stdout string, stderr string)
// ExecOptions represents options for module execution
// ExecOptions represents options for module execution.
type ExecOptions struct {
// the defined Args for module
// Args contains the defined arguments for the module.
Args runtime.RawExtension
// which Host to execute
// Host specifies which host to execute the module on.
Host string
// the variable module need
// Variable provides the variables needed by the module.
variable.Variable
// the task to be executed
// Task is the task to be executed.
Task kkcorev1alpha1.Task
// the playbook to be executed
// Playbook is the playbook to be executed.
Playbook kkcorev1.Playbook
// the logout for module
// LogOutput is the output writer for module logs.
LogOutput io.Writer
}
// getAllVariables retrieves all variables for the specified host in ExecOptions.
// Returns a map of variables or an error if retrieval fails.
func (o ExecOptions) getAllVariables() (map[string]any, error) {
ha, err := o.Variable.Get(variable.GetAllVariable(o.Host))
if err != nil {
@ -75,39 +79,31 @@ func (o ExecOptions) getAllVariables() (map[string]any, error) {
return vd, nil
}
var module = make(map[string]ModuleExecFunc)
// RegisterModule register module
func RegisterModule(moduleName string, exec ModuleExecFunc) error {
if _, ok := module[moduleName]; ok {
return errors.Errorf("module %s is exist", moduleName)
}
module[moduleName] = exec
return nil
}
// FindModule by module name which has register.
func FindModule(moduleName string) ModuleExecFunc {
return module[moduleName]
}
type key struct{}
// ConnKey for connector which store in context
var ConnKey = &key{}
func getConnector(ctx context.Context, host string, v variable.Variable) (connector.Connector, error) {
// getConnector returns a connector for the specified host in ExecOptions.
// If the task has a DelegateTo field, it parses and uses that host instead.
// It first checks if a connector is already present in the context, otherwise creates a new one.
func (o ExecOptions) getConnector(ctx context.Context) (connector.Connector, error) {
var conn connector.Connector
var err error
ha, err := o.getAllVariables()
if err != nil {
return nil, err
}
host := o.Host
if o.Task.Spec.DelegateTo != "" {
host, err = tmpl.ParseFunc(ha, o.Task.Spec.DelegateTo, func(b []byte) string { return string(b) })
if err != nil {
return nil, errors.Errorf("failed to delegate %q to %q", o.Host, o.Task.Spec.DelegateTo)
}
}
if val := ctx.Value(ConnKey); val != nil {
if vd, ok := val.(connector.Connector); ok {
conn = vd
}
} else {
conn, err = connector.NewConnector(host, v)
conn, err = connector.NewConnector(host, o.Variable)
if err != nil {
return conn, err
}
@ -119,3 +115,30 @@ func getConnector(ctx context.Context, host string, v variable.Variable) (connec
return conn, nil
}
// module is a registry mapping module names to their execution functions.
var module = make(map[string]ModuleExecFunc)
// RegisterModule registers a module execution function under the given module name.
// Returns an error if the module name is already registered.
func RegisterModule(moduleName string, exec ModuleExecFunc) error {
if _, ok := module[moduleName]; ok {
return errors.Errorf("module %s is exist", moduleName)
}
module[moduleName] = exec
return nil
}
// FindModule retrieves a registered module execution function by its name.
// Returns nil if the module is not found.
func FindModule(moduleName string) ModuleExecFunc {
return module[moduleName]
}
// key is an unexported type used for context keys in this package.
type key struct{}
// ConnKey is the context key for storing/retrieving a connector in context.Context.
var ConnKey = &key{}

View File

@ -95,7 +95,7 @@ func ModulePrometheus(ctx context.Context, options ExecOptions) (string, string)
}
// Get or create Prometheus connector
conn, err := getConnector(ctx, host, options.Variable)
conn, err := options.getConnector(ctx)
if err != nil {
return "", fmt.Sprintf("failed to get prometheus connector: %v", err)
}

View File

@ -30,7 +30,7 @@ Usage:
// It returns StdoutSuccess if successful, or an error message if any step fails.
func ModuleSetup(ctx context.Context, options ExecOptions) (string, string) {
// get connector
conn, err := getConnector(ctx, options.Host, options.Variable)
conn, err := options.getConnector(ctx)
if err != nil {
return StdoutFailed, fmt.Sprintf("failed to connector of %q error: %v", options.Host, err)
}

View File

@ -137,7 +137,7 @@ func prepareTemplate(ctx context.Context, options ExecOptions) (map[string]any,
return nil, nil, nil, err
}
conn, err := getConnector(ctx, options.Host, options.Variable)
conn, err := options.getConnector(ctx)
if err != nil {
return nil, nil, nil, err
}