feat: enhance connector interface and implementations (#2675)

- Updated the Connector interface to return both stdout and stderr for command execution.
- Modified implementations in local, kubernetes, and ssh connectors to support the new return values.
- Improved documentation for the Connector interface methods for clarity.
- Added error handling for stderr in command execution across connectors.
- Introduced new utility functions for IP parsing and checking localhost IPs.

Signed-off-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
liujian 2025-07-30 16:02:34 +08:00 committed by GitHub
parent 7b84f4a6fc
commit 620b7f56a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1360 additions and 1133 deletions

View File

@ -38,18 +38,21 @@ const (
connectedPrometheus = "prometheus" connectedPrometheus = "prometheus"
) )
// Connector is the interface for connecting to a remote host // Connector is the interface for connecting to a remote host.
// It abstracts the operations required to interact with different types of hosts (e.g., SSH, local, Kubernetes, Prometheus).
// Implementations of this interface should provide mechanisms for initialization, cleanup, file transfer, and command execution.
type Connector interface { type Connector interface {
// Init initializes the connection // Init initializes the connection.
Init(ctx context.Context) error Init(ctx context.Context) error
// Close closes the connection // Close closes the connection and releases any resources.
Close(ctx context.Context) error Close(ctx context.Context) error
// PutFile copies a file from src to dst with mode. // PutFile copies a file from src (as bytes) to dst (remote path) with the specified file mode.
PutFile(ctx context.Context, src []byte, dst string, mode fs.FileMode) error PutFile(ctx context.Context, src []byte, dst string, mode fs.FileMode) error
// FetchFile copies a file from src to dst writer. // FetchFile copies a file from src (remote path) to dst (local writer).
FetchFile(ctx context.Context, src string, dst io.Writer) error FetchFile(ctx context.Context, src string, dst io.Writer) error
// ExecuteCommand executes a command on the remote host // ExecuteCommand executes a command on the remote host.
ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) // Returns stdout, stderr, and error (if any).
ExecuteCommand(ctx context.Context, cmd string) ([]byte, []byte, error)
} }
// NewConnector creates a new connector // NewConnector creates a new connector

View File

@ -17,6 +17,7 @@ limitations under the License.
package connector package connector
import ( import (
"bytes"
"context" "context"
"io" "io"
"io/fs" "io/fs"
@ -137,12 +138,16 @@ func (c *kubernetesConnector) FetchFile(ctx context.Context, src string, dst io.
} }
// ExecuteCommand in a kubernetes cluster // ExecuteCommand in a kubernetes cluster
func (c *kubernetesConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) { func (c *kubernetesConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, []byte, error) {
// add "--kubeconfig" to src command // add "--kubeconfig" to src command
klog.V(5).InfoS("exec local command", "cmd", cmd) klog.V(5).InfoS("exec local command", "cmd", cmd)
command := c.cmd.CommandContext(ctx, c.shell, "-c", cmd) command := c.cmd.CommandContext(ctx, c.shell, "-c", cmd)
command.SetDir(c.homedir) command.SetDir(c.homedir)
command.SetEnv([]string{"KUBECONFIG=" + filepath.Join(c.homedir, kubeconfigRelPath)}) command.SetEnv([]string{"KUBECONFIG=" + filepath.Join(c.homedir, kubeconfigRelPath)})
return command.CombinedOutput() var stdoutBuf, stderrBuf bytes.Buffer
command.SetStdout(&stdoutBuf)
command.SetStderr(&stderrBuf)
err := command.Run()
return stdoutBuf.Bytes(), stderrBuf.Bytes(), err
} }

View File

@ -105,7 +105,7 @@ func (c *localConnector) FetchFile(_ context.Context, src string, dst io.Writer)
} }
// ExecuteCommand executes a command on the local host. // ExecuteCommand executes a command on the local host.
func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) { func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, []byte, error) {
klog.V(5).InfoS("exec local command", "cmd", cmd) klog.V(5).InfoS("exec local command", "cmd", cmd)
// in // in
command := c.Cmd.CommandContext(ctx, "sudo", "-SE", c.shell, "-c", cmd) command := c.Cmd.CommandContext(ctx, "sudo", "-SE", c.shell, "-c", cmd)
@ -113,13 +113,19 @@ func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte
command.SetStdin(bytes.NewBufferString(c.Password + "\n")) command.SetStdin(bytes.NewBufferString(c.Password + "\n"))
} }
// out // out
output, err := command.CombinedOutput() var stdoutBuf, stderrBuf bytes.Buffer
command.SetStdout(&stdoutBuf)
command.SetStderr(&stderrBuf)
err := command.Run()
stdout := stdoutBuf.Bytes()
stderr := stderrBuf.Bytes()
if c.Password != "" { if c.Password != "" {
// Filter out the "Password:" prompt from the output // Filter out the "Password:" prompt from the output
output = bytes.Replace(output, []byte("Password:"), []byte(""), -1) stdout = bytes.Replace(stdout, []byte("Password:"), []byte(""), -1)
stderr = bytes.Replace(stderr, []byte("Password:"), []byte(""), -1)
} }
return output, errors.Wrapf(err, "failed to execute command") return stdout, stderr, err
} }
// HostInfo from gatherFacts cache // HostInfo from gatherFacts cache
@ -138,19 +144,30 @@ func (c *localConnector) getHostInfo(ctx context.Context) (map[string]any, error
return nil, err return nil, err
} }
osVars[_const.VariableOSRelease] = convertBytesToMap(osRelease.Bytes(), "=") osVars[_const.VariableOSRelease] = convertBytesToMap(osRelease.Bytes(), "=")
kernel, err := c.ExecuteCommand(ctx, "uname -r") kernel, stderr, err := c.ExecuteCommand(ctx, "uname -r")
if err != nil { if err != nil {
return nil, err return nil, errors.Wrapf(err, "failed to get kernel: %v, stderr: %q", err, string(stderr))
}
if len(stderr) > 0 {
return nil, errors.Errorf("failed to get kernel, stderr: %q", string(stderr))
} }
osVars[_const.VariableOSKernelVersion] = string(bytes.TrimSpace(kernel)) osVars[_const.VariableOSKernelVersion] = string(bytes.TrimSpace(kernel))
hn, err := c.ExecuteCommand(ctx, "hostname")
hn, hnStderr, err := c.ExecuteCommand(ctx, "hostname")
if err != nil { if err != nil {
return nil, err return nil, errors.Wrapf(err, "failed to get hostname: %v, stderr: %q", err, string(hnStderr))
}
if len(hnStderr) > 0 {
return nil, errors.Errorf("failed to get hostname, stderr: %q", string(hnStderr))
} }
osVars[_const.VariableOSHostName] = string(bytes.TrimSpace(hn)) osVars[_const.VariableOSHostName] = string(bytes.TrimSpace(hn))
arch, err := c.ExecuteCommand(ctx, "arch")
arch, archStderr, err := c.ExecuteCommand(ctx, "arch")
if err != nil { if err != nil {
return nil, err return nil, errors.Wrapf(err, "failed to get arch: %v, stderr: %q", err, string(archStderr))
}
if len(archStderr) > 0 {
return nil, errors.Errorf("failed to get arch, stderr: %q", string(archStderr))
} }
osVars[_const.VariableOSArchitecture] = string(bytes.TrimSpace(arch)) osVars[_const.VariableOSArchitecture] = string(bytes.TrimSpace(arch))

View File

@ -36,13 +36,13 @@ import (
) )
const ( const (
// Prometheus API default timeout // Default timeout for Prometheus API
defaultPrometheusTimeout = 10 * time.Second defaultPrometheusTimeout = 10 * time.Second
) )
var _ Connector = &PrometheusConnector{} var _ Connector = &PrometheusConnector{}
// PrometheusConnector implements Connector interface for Prometheus connections // PrometheusConnector implements the Connector interface for Prometheus connections
type PrometheusConnector struct { type PrometheusConnector struct {
url string url string
username string username string
@ -54,48 +54,47 @@ type PrometheusConnector struct {
connected bool connected bool
} }
// newPrometheusConnector creates a new PrometheusConnector // newPrometheusConnector creates a new PrometheusConnector instance
func newPrometheusConnector(vars map[string]any) *PrometheusConnector { func newPrometheusConnector(vars map[string]any) *PrometheusConnector {
pc := &PrometheusConnector{ pc := &PrometheusConnector{
headers: make(map[string]string), headers: make(map[string]string),
timeout: defaultPrometheusTimeout, timeout: defaultPrometheusTimeout,
} }
// 修正变量名以避免导入遮蔽 // Retrieve Prometheus URL
promURL, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorURL) promURL, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorURL)
if err != nil { if err != nil {
klog.V(4).InfoS("get connector host failed use current hostname", "error", err) klog.V(4).InfoS("Failed to get connector host, using current hostname", "error", err)
} }
pc.url = promURL pc.url = promURL
// Retrieve username
username, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUserName) username, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUserName)
if err != nil { if err != nil {
klog.V(4).InfoS("get connector username failed use current username", "error", err) klog.V(4).InfoS("Failed to get connector username, using current username", "error", err)
} }
pc.username = username pc.username = username
// Retrieve password
password, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword) password, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword)
if err != nil { if err != nil {
klog.V(4).InfoS("get connector password failed use current password", "error", err) klog.V(4).InfoS("Failed to get connector password, using current password", "error", err)
} }
pc.password = password pc.password = password
// Retrieve token
token, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorToken) token, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorToken)
if err != nil { if err != nil {
klog.V(4).InfoS("get connector token failed use current token", "error", err) klog.V(4).InfoS("Failed to get connector token, using current token", "error", err)
} }
pc.token = token pc.token = token
// Retrieve custom headers and timeout from connector variables
prometheusVars, ok := vars["connector"].(map[string]any) prometheusVars, ok := vars["connector"].(map[string]any)
if !ok { if !ok {
klog.V(4).InfoS("connector configuration is not a map") klog.V(4).InfoS("Connector configuration is not a map")
return nil return nil
} }
// Get custom headers from connector variables
if headers, ok := prometheusVars["headers"].(map[string]any); ok { if headers, ok := prometheusVars["headers"].(map[string]any); ok {
for k, v := range headers { for k, v := range headers {
if strVal, ok := v.(string); ok { if strVal, ok := v.(string); ok {
@ -103,8 +102,6 @@ func newPrometheusConnector(vars map[string]any) *PrometheusConnector {
} }
} }
} }
// Get timeout from connector variables
if timeoutStr, ok := prometheusVars["timeout"].(string); ok { if timeoutStr, ok := prometheusVars["timeout"].(string); ok {
if timeout, err := time.ParseDuration(timeoutStr); err == nil { if timeout, err := time.ParseDuration(timeoutStr); err == nil {
pc.timeout = timeout pc.timeout = timeout
@ -116,7 +113,7 @@ func newPrometheusConnector(vars map[string]any) *PrometheusConnector {
// Init initializes the Prometheus connection // Init initializes the Prometheus connection
func (pc *PrometheusConnector) Init(ctx context.Context) error { func (pc *PrometheusConnector) Init(ctx context.Context) error {
// Ensure URL is properly formatted // Ensure URL is provided
if pc.url == "" { if pc.url == "" {
return errors.New("prometheus URL is required") return errors.New("prometheus URL is required")
} }
@ -127,7 +124,7 @@ func (pc *PrometheusConnector) Init(ctx context.Context) error {
return errors.Wrapf(err, "invalid prometheus URL: %s", pc.url) return errors.Wrapf(err, "invalid prometheus URL: %s", pc.url)
} }
// If scheme is missing, default to http // Default to http if scheme is missing
if parsedURL.Scheme == "" { if parsedURL.Scheme == "" {
klog.V(4).InfoS("No scheme specified in Prometheus URL, defaulting to HTTP", "url", pc.url) klog.V(4).InfoS("No scheme specified in Prometheus URL, defaulting to HTTP", "url", pc.url)
parsedURL.Scheme = "http" parsedURL.Scheme = "http"
@ -160,7 +157,7 @@ func (pc *PrometheusConnector) Init(ctx context.Context) error {
return errors.Wrap(err, "failed to create request") return errors.Wrap(err, "failed to create request")
} }
// Add auth headers if provided // Add authentication headers if provided
pc.addAuthHeaders(req) pc.addAuthHeaders(req)
klog.V(4).InfoS("Testing connection to Prometheus server") klog.V(4).InfoS("Testing connection to Prometheus server")
@ -186,7 +183,7 @@ func (pc *PrometheusConnector) Init(ctx context.Context) error {
// Close closes the Prometheus connection // Close closes the Prometheus connection
func (pc *PrometheusConnector) Close(ctx context.Context) error { func (pc *PrometheusConnector) Close(ctx context.Context) error {
// HTTP client doesn't need explicit closing // HTTP client does not require explicit closing
pc.connected = false pc.connected = false
return nil return nil
} }
@ -201,26 +198,29 @@ func (pc *PrometheusConnector) FetchFile(ctx context.Context, src string, dst io
return errors.New("fetchFile operation is not supported for Prometheus connector") return errors.New("fetchFile operation is not supported for Prometheus connector")
} }
// ExecuteCommand executes a PromQL query // ExecuteCommand executes a PromQL query and returns both stdout and stderr
// For Prometheus connector, the command is interpreted as a PromQL query // For Prometheus connector, the command is interpreted as a PromQL query
func (pc *PrometheusConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) { // The returned []byte is the stdout, []byte is stderr (always nil), and error is the error if any
func (pc *PrometheusConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, []byte, error) {
if !pc.connected { if !pc.connected {
return nil, errors.New("prometheus connector is not initialized, call Init() first") // If not initialized, return error and nil stderr
return nil, nil, errors.New("prometheus connector is not initialized, call Init() first")
} }
// Parse the command // Parse the command into query parameters
queryParams := parseCommand(cmd) queryParams := parseCommand(cmd)
queryString := queryParams["query"] queryString := queryParams["query"]
if queryString == "" { if queryString == "" {
return nil, errors.New("query parameter is required for Prometheus queries") // If query is missing, return error and nil stderr
return nil, nil, errors.New("query parameter is required for Prometheus queries")
} }
klog.V(4).InfoS("Executing Prometheus query", "query", queryString) klog.V(4).InfoS("Executing Prometheus query", "query", queryString)
// Build query URL // Build the Prometheus query URL
apiURL, err := url.Parse(pc.url + "api/v1/query") apiURL, err := url.Parse(pc.url + "api/v1/query")
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to parse URL with base: %s", pc.url) return nil, nil, errors.Wrapf(err, "failed to parse URL with base: %s", pc.url)
} }
// Add query parameters // Add query parameters
@ -235,56 +235,58 @@ func (pc *PrometheusConnector) ExecuteCommand(ctx context.Context, cmd string) (
apiURL.RawQuery = params.Encode() apiURL.RawQuery = params.Encode()
// Create request // Create HTTP request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL.String(), http.NoBody) req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL.String(), http.NoBody)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to create HTTP request") return nil, nil, errors.Wrap(err, "failed to create HTTP request")
} }
// Add auth headers // Add authentication and custom headers
pc.addAuthHeaders(req) pc.addAuthHeaders(req)
// Execute request // Execute HTTP request
klog.V(4).InfoS("Sending request to Prometheus", "url", req.URL.String()) klog.V(4).InfoS("Sending request to Prometheus", "url", req.URL.String())
resp, err := pc.client.Do(req) resp, err := pc.client.Do(req)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to execute Prometheus query", "query", queryString) klog.ErrorS(err, "Failed to execute Prometheus query", "query", queryString)
return nil, errors.Wrap(err, "failed to execute prometheus query") return nil, nil, errors.Wrap(err, "failed to execute prometheus query")
} }
defer resp.Body.Close() defer resp.Body.Close()
// Read response // Read response body
bodyBytes, err := io.ReadAll(resp.Body) bodyBytes, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to read response body") klog.ErrorS(err, "Failed to read response body")
return nil, errors.Wrap(err, "failed to read response body") return nil, nil, errors.Wrap(err, "failed to read response body")
} }
// Check if response is successful // If HTTP status is not OK, return error and nil stderr
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
klog.ErrorS(err, "Prometheus query failed", klog.ErrorS(err, "Prometheus query failed",
"statusCode", resp.StatusCode, "statusCode", resp.StatusCode,
"response", string(bodyBytes), "response", string(bodyBytes),
"query", queryString) "query", queryString)
return nil, errors.Errorf("prometheus query failed with status %d: %s", resp.StatusCode, string(bodyBytes)) return nil, nil, errors.Errorf("prometheus query failed with status %d: %s", resp.StatusCode, string(bodyBytes))
} }
// Format the response based on the format parameter // Format the response based on the format parameter
format := queryParams["format"] format := queryParams["format"]
if format != "" { if format != "" {
klog.V(4).InfoS("Formatting response", "format", format) klog.V(4).InfoS("Formatting response", "format", format)
return pc.formatResponse(bodyBytes, format) stdout, ferr := pc.formatResponse(bodyBytes, format)
// Always return nil for stderr as per requirement
return stdout, nil, ferr
} }
// Default to prettified JSON // Default to prettified JSON
var prettyJSON bytes.Buffer var prettyJSON bytes.Buffer
if err := json.Indent(&prettyJSON, bodyBytes, "", " "); err != nil { if err := json.Indent(&prettyJSON, bodyBytes, "", " "); err != nil {
klog.V(4).InfoS("Failed to prettify JSON response, returning raw response") klog.V(4).InfoS("Failed to prettify JSON response, returning raw response")
// If prettifying fails, return the original response // If prettifying fails, return the original response and nil stderr
return bodyBytes, nil return bodyBytes, nil, nil
} }
klog.V(4).InfoS("Prometheus query executed successfully") klog.V(4).InfoS("Prometheus query executed successfully")
return prettyJSON.Bytes(), nil return prettyJSON.Bytes(), nil, nil
} }
// addAuthHeaders adds authentication headers to the request // addAuthHeaders adds authentication headers to the request
@ -376,7 +378,7 @@ func (pc *PrometheusConnector) formatResponse(bodyBytes []byte, format string) (
// extractSimpleValue attempts to extract a simple value from the Prometheus response // extractSimpleValue attempts to extract a simple value from the Prometheus response
func (pc *PrometheusConnector) extractSimpleValue(response map[string]any) ([]byte, error) { func (pc *PrometheusConnector) extractSimpleValue(response map[string]any) ([]byte, error) {
// 验证响应格式 // Validate response format
if err := validatePrometheusResponse(response); err != nil { if err := validatePrometheusResponse(response); err != nil {
return nil, err return nil, err
} }
@ -396,7 +398,7 @@ func (pc *PrometheusConnector) extractSimpleValue(response map[string]any) ([]by
return nil, errors.New("invalid response format: result field missing") return nil, errors.New("invalid response format: result field missing")
} }
// 根据不同的结果类型处理 // Handle different result types
switch resultType { switch resultType {
case "vector": case "vector":
return extractVectorValue(result) return extractVectorValue(result)
@ -411,7 +413,7 @@ func (pc *PrometheusConnector) extractSimpleValue(response map[string]any) ([]by
} }
} }
// validatePrometheusResponse 验证Prometheus响应的基本结构 // validatePrometheusResponse validates the basic structure of a Prometheus response
func validatePrometheusResponse(response map[string]any) error { func validatePrometheusResponse(response map[string]any) error {
if status, ok := response["status"].(string); !ok || status != "success" { if status, ok := response["status"].(string); !ok || status != "success" {
return errors.New("prometheus query failed") return errors.New("prometheus query failed")
@ -419,7 +421,7 @@ func validatePrometheusResponse(response map[string]any) error {
return nil return nil
} }
// extractVectorValue 从向量结果中提取值 // extractVectorValue extracts value from a vector result
func extractVectorValue(result any) ([]byte, error) { func extractVectorValue(result any) ([]byte, error) {
samples, ok := result.([]any) samples, ok := result.([]any)
if !ok || len(samples) == 0 { if !ok || len(samples) == 0 {
@ -439,7 +441,7 @@ func extractVectorValue(result any) ([]byte, error) {
return []byte(fmt.Sprintf("%v", value[1])), nil return []byte(fmt.Sprintf("%v", value[1])), nil
} }
// extractScalarValue 从标量结果中提取值 // extractScalarValue extracts value from a scalar result
func extractScalarValue(result any) ([]byte, error) { func extractScalarValue(result any) ([]byte, error) {
value, ok := result.([]any) value, ok := result.([]any)
if !ok || len(value) < 2 { if !ok || len(value) < 2 {
@ -449,7 +451,7 @@ func extractScalarValue(result any) ([]byte, error) {
return []byte(fmt.Sprintf("%v", value[1])), nil return []byte(fmt.Sprintf("%v", value[1])), nil
} }
// extractStringValue 从字符串结果中提取值 // extractStringValue extracts value from a string result
func extractStringValue(result any) ([]byte, error) { func extractStringValue(result any) ([]byte, error) {
value, ok := result.([]any) value, ok := result.([]any)
if !ok || len(value) < 2 { if !ok || len(value) < 2 {
@ -459,7 +461,7 @@ func extractStringValue(result any) ([]byte, error) {
return []byte(fmt.Sprintf("%v", value[1])), nil return []byte(fmt.Sprintf("%v", value[1])), nil
} }
// extractMatrixValue 从矩阵结果中提取值 // extractMatrixValue extracts value from a matrix result
func extractMatrixValue(result any) ([]byte, error) { func extractMatrixValue(result any) ([]byte, error) {
matrixData, err := json.MarshalIndent(result, "", " ") matrixData, err := json.MarshalIndent(result, "", " ")
if err != nil { if err != nil {
@ -468,9 +470,9 @@ func extractMatrixValue(result any) ([]byte, error) {
return matrixData, nil return matrixData, nil
} }
// formatAsTable 重构以降低认知复杂度 // formatAsTable formats the response as a table to reduce cognitive complexity
func (pc *PrometheusConnector) formatAsTable(response map[string]any) ([]byte, error) { func (pc *PrometheusConnector) formatAsTable(response map[string]any) ([]byte, error) {
// 验证响应格式并获取结果集 // Validate response and get result set
result, err := getValidVectorResult(response) result, err := getValidVectorResult(response)
if err != nil { if err != nil {
return nil, err return nil, err
@ -480,11 +482,11 @@ func (pc *PrometheusConnector) formatAsTable(response map[string]any) ([]byte, e
return []byte("No data"), nil return []byte("No data"), nil
} }
// 构建表格 // Build table from result set
return buildTableFromResult(result) return buildTableFromResult(result)
} }
// getValidVectorResult 验证响应并获取vector类型的结果集 // getValidVectorResult validates the response and gets the vector result set
func getValidVectorResult(response map[string]any) ([]any, error) { func getValidVectorResult(response map[string]any) ([]any, error) {
if status, ok := response["status"].(string); !ok || status != "success" { if status, ok := response["status"].(string); !ok || status != "success" {
return nil, errors.New("prometheus query failed") return nil, errors.New("prometheus query failed")
@ -512,26 +514,26 @@ func getValidVectorResult(response map[string]any) ([]any, error) {
return result, nil return result, nil
} }
// buildTableFromResult 从结果集构建表格 // buildTableFromResult builds a table from the result set
func buildTableFromResult(result []any) ([]byte, error) { func buildTableFromResult(result []any) ([]byte, error) {
var builder strings.Builder var builder strings.Builder
// 表格标题 // Table header
if _, err := builder.WriteString("METRIC\tVALUE\tTIMESTAMP\n"); err != nil { if _, err := builder.WriteString("METRIC\tVALUE\tTIMESTAMP\n"); err != nil {
return nil, err return nil, err
} }
// 表格行 // Table rows
for _, item := range result { for _, item := range result {
sample, ok := item.(map[string]any) sample, ok := item.(map[string]any)
if !ok { if !ok {
continue continue
} }
// 获取指标名称 // Get metric name
metric := getMetricName(sample) metric := getMetricName(sample)
// 添加值和时间戳 // Add value and timestamp
if err := addValueAndTimestamp(&builder, sample, metric); err != nil { if err := addValueAndTimestamp(&builder, sample, metric); err != nil {
return nil, err return nil, err
} }
@ -540,7 +542,7 @@ func buildTableFromResult(result []any) ([]byte, error) {
return []byte(builder.String()), nil return []byte(builder.String()), nil
} }
// getMetricName 提取指标名称 // getMetricName extracts the metric name
func getMetricName(sample map[string]any) string { func getMetricName(sample map[string]any) string {
metric := "undefined" metric := "undefined"
m, ok := sample["metric"].(map[string]any) m, ok := sample["metric"].(map[string]any)
@ -548,7 +550,7 @@ func getMetricName(sample map[string]any) string {
return metric return metric
} }
// 提取指标名称 // Extract metric name and labels
parts := []string{} parts := []string{}
for k, v := range m { for k, v := range m {
if k == "__name__" { if k == "__name__" {
@ -558,7 +560,7 @@ func getMetricName(sample map[string]any) string {
} }
} }
// 如果有标签,在指标名称中包含它们 // If there are labels, include them in the metric name
if len(parts) > 0 { if len(parts) > 0 {
metric = fmt.Sprintf("%s{%s}", metric, strings.Join(parts, ", ")) metric = fmt.Sprintf("%s{%s}", metric, strings.Join(parts, ", "))
} }
@ -566,11 +568,11 @@ func getMetricName(sample map[string]any) string {
return metric return metric
} }
// addValueAndTimestamp 添加值和时间戳到表格行 // addValueAndTimestamp adds value and timestamp to a table row
func addValueAndTimestamp(builder *strings.Builder, sample map[string]any, metric string) error { func addValueAndTimestamp(builder *strings.Builder, sample map[string]any, metric string) error {
value, ok := sample["value"].([]any) value, ok := sample["value"].([]any)
if !ok || len(value) < 2 { if !ok || len(value) < 2 {
return nil // 跳过无效数据 return nil // Skip invalid data
} }
timestamp := "" timestamp := ""
@ -606,7 +608,7 @@ func (pc *PrometheusConnector) GetServerInfo(ctx context.Context) (map[string]an
return nil, errors.Wrap(err, "failed to create request for server info") return nil, errors.Wrap(err, "failed to create request for server info")
} }
// Add auth headers // Add authentication headers
pc.addAuthHeaders(req) pc.addAuthHeaders(req)
// Execute request // Execute request

View File

@ -244,40 +244,40 @@ func (c *sshConnector) FetchFile(_ context.Context, src string, dst io.Writer) e
} }
// ExecuteCommand in remote host // ExecuteCommand in remote host
func (c *sshConnector) ExecuteCommand(_ context.Context, cmd string) ([]byte, error) { func (c *sshConnector) ExecuteCommand(_ context.Context, cmd string) ([]byte, []byte, error) {
cmd = fmt.Sprintf("sudo -SE %s << 'KUBEKEY_EOF'\n%s\nKUBEKEY_EOF\n", c.shell, cmd) cmd = fmt.Sprintf("sudo -SE %s << 'KUBEKEY_EOF'\n%s\nKUBEKEY_EOF\n", c.shell, cmd)
klog.V(5).InfoS("exec ssh command", "cmd", cmd, "host", c.Host) klog.V(5).InfoS("exec ssh command", "cmd", cmd, "host", c.Host)
// create ssh session // create ssh session
session, err := c.client.NewSession() session, err := c.client.NewSession()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to create ssh session") return nil, nil, errors.Wrap(err, "failed to create ssh session")
} }
defer session.Close() defer session.Close()
// get pipe from session // get pipe from session
stdin, err := session.StdinPipe() stdin, err := session.StdinPipe()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to get stdin pipe") return nil, nil, errors.Wrap(err, "failed to get stdin pipe")
} }
stdout, err := session.StdoutPipe() stdout, err := session.StdoutPipe()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to get stdout pipe") return nil, nil, errors.Wrap(err, "failed to get stdout pipe")
} }
stderr, err := session.StderrPipe() stderr, err := session.StderrPipe()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to get stderr pipe") return nil, nil, errors.Wrap(err, "failed to get stderr pipe")
} }
// Start the remote command // Start the remote command
if err := session.Start(cmd); err != nil { if err := session.Start(cmd); err != nil {
return nil, errors.Wrap(err, "failed to start session") return nil, nil, errors.Wrap(err, "failed to start session")
} }
if c.Password != "" { if c.Password != "" {
if _, err := stdin.Write([]byte(c.Password + "\n")); err != nil { if _, err := stdin.Write([]byte(c.Password + "\n")); err != nil {
return nil, errors.Wrap(err, "failed to write password") return nil, nil, errors.Wrap(err, "failed to write password")
} }
} }
if err := stdin.Close(); err != nil { if err := stdin.Close(); err != nil {
return nil, errors.Wrap(err, "failed to close stdin pipe") return nil, nil, errors.Wrap(err, "failed to close stdin pipe")
} }
// Create buffers to store stdout and stderr output // Create buffers to store stdout and stderr output
@ -308,9 +308,7 @@ func (c *sshConnector) ExecuteCommand(_ context.Context, cmd string) ([]byte, er
<-stdoutDone <-stdoutDone
<-stderrDone <-stderrDone
output := append(stdoutBuf.Bytes(), stderrBuf.Bytes()...) return stdoutBuf.Bytes(), stderrBuf.Bytes(), errors.Wrap(err, "failed to execute ssh command")
return output, errors.Wrap(err, "failed to execute ssh command")
} }
// HostInfo from gatherFacts cache // HostInfo from gatherFacts cache
@ -327,19 +325,30 @@ func (c *sshConnector) getHostInfo(ctx context.Context) (map[string]any, error)
return nil, err return nil, err
} }
osVars[_const.VariableOSRelease] = convertBytesToMap(osRelease.Bytes(), "=") osVars[_const.VariableOSRelease] = convertBytesToMap(osRelease.Bytes(), "=")
kernel, err := c.ExecuteCommand(ctx, "uname -r") kernel, kernelStderr, err := c.ExecuteCommand(ctx, "uname -r")
if err != nil { if err != nil {
return nil, err return nil, errors.Wrapf(err, "failed to get kernel: %v, stderr: %q", err, string(kernelStderr))
}
if len(kernelStderr) > 0 {
return nil, errors.Errorf("failed to get kernel, stderr: %q", string(kernelStderr))
} }
osVars[_const.VariableOSKernelVersion] = string(bytes.TrimSpace(kernel)) osVars[_const.VariableOSKernelVersion] = string(bytes.TrimSpace(kernel))
hn, err := c.ExecuteCommand(ctx, "hostname")
hn, hnStderr, err := c.ExecuteCommand(ctx, "hostname")
if err != nil { if err != nil {
return nil, err return nil, errors.Wrapf(err, "failed to get hostname: %v, stderr: %q", err, string(hnStderr))
}
if len(hnStderr) > 0 {
return nil, errors.Errorf("failed to get hostname, stderr: %q", string(hnStderr))
} }
osVars[_const.VariableOSHostName] = string(bytes.TrimSpace(hn)) osVars[_const.VariableOSHostName] = string(bytes.TrimSpace(hn))
arch, err := c.ExecuteCommand(ctx, "arch")
arch, archStderr, err := c.ExecuteCommand(ctx, "arch")
if err != nil { if err != nil {
return nil, err return nil, errors.Wrapf(err, "failed to get arch: %v, stderr: %q", err, string(archStderr))
}
if len(archStderr) > 0 {
return nil, errors.Errorf("failed to get arch, stderr: %q", string(archStderr))
} }
osVars[_const.VariableOSArchitecture] = string(bytes.TrimSpace(arch)) osVars[_const.VariableOSArchitecture] = string(bytes.TrimSpace(arch))

View File

@ -9,6 +9,10 @@ import (
"strings" "strings"
) )
// ===========================================================================
// ============================= ParseIP =================================
// ===========================================================================
// ParseIP parses a CIDR, an IP range string (e.g., "xxx-xxx"), or a single IP into a slice of actual IPs. // ParseIP parses a CIDR, an IP range string (e.g., "xxx-xxx"), or a single IP into a slice of actual IPs.
// Supports both IPv4 and IPv6. // Supports both IPv4 and IPv6.
func ParseIP(ip string) []string { func ParseIP(ip string) []string {
@ -211,3 +215,47 @@ func networkRange(network *net.IPNet) (net.IP, net.IP) {
} }
return startIP, endIP return startIP, endIP
} }
// ===========================================================================
// ============================= IsLocalhostIP ===========================
// ===========================================================================
// IsLocalhostIP checks if the given IP address string (ipStr) is bound to any local network interface.
// It returns true if the IP is found on any interface, false otherwise.
// This function parses the input string as an IP address, iterates over all network interfaces on the host,
// and checks if any of the interface addresses match the target IP.
func IsLocalhostIP(ipStr string) bool {
targetIP := net.ParseIP(ipStr)
if targetIP == nil {
// The input string is not a valid IP address.
return false
}
ifaces, err := net.Interfaces()
if err != nil {
// Failed to retrieve network interfaces.
return false
}
for _, iface := range ifaces {
addrs, err := iface.Addrs()
if err != nil {
// Skip this interface if its addresses cannot be retrieved.
continue
}
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
// Check if the IP address of this interface matches the target IP.
if v.IP.Equal(targetIP) {
return true
}
case *net.IPAddr:
// Check if the IP address of this interface matches the target IP.
if v.IP.Equal(targetIP) {
return true
}
}
}
}
// The target IP was not found on any local network interface.
return false
}

View File

@ -58,7 +58,13 @@ func (e *taskExecutor) Exec(ctx context.Context) error {
} }
// exit when task run failed // exit when task run failed
if e.task.IsFailed() { if e.task.IsFailed() {
return errors.Errorf("task %q run failed", e.task.Spec.Name) failedMsg := "\n"
for _, result := range e.task.Status.HostResults {
if result.StdErr != "" {
failedMsg += fmt.Sprintf("[%s]: %s\n", result.Host, result.StdErr)
}
}
return errors.Errorf("task [%s](%s) run failed: %s", e.task.Spec.Name, ctrlclient.ObjectKeyFromObject(e.task), failedMsg)
} }
return nil return nil

View File

@ -19,7 +19,6 @@ package modules
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -81,16 +80,11 @@ func ModuleCommand(ctx context.Context, options ExecOptions) (string, string) {
return "", err.Error() return "", err.Error()
} }
// execute command // execute command
var stdout, stderr string stdout, stderr, err := conn.ExecuteCommand(ctx, string(command))
data, err := conn.ExecuteCommand(ctx, string(command))
if err != nil { if err != nil {
stderr = err.Error() return "", err.Error()
} }
if data != nil { return string(stdout), string(stderr)
stdout = strings.TrimSpace(string(data))
}
return stdout, stderr
} }
func init() { func init() {

View File

@ -30,7 +30,7 @@ import (
"github.com/kubesphere/kubekey/v4/pkg/variable/source" "github.com/kubesphere/kubekey/v4/pkg/variable/source"
) )
var successConnector = &testConnector{output: []byte("success")} var successConnector = &testConnector{stdout: []byte("success")}
var failedConnector = &testConnector{ var failedConnector = &testConnector{
copyErr: errors.New("failed"), copyErr: errors.New("failed"),
fetchErr: errors.New("failed"), fetchErr: errors.New("failed"),
@ -49,7 +49,8 @@ type testConnector struct {
// return for fetch // return for fetch
fetchErr error fetchErr error
// return for command // return for command
output []byte stdout []byte
stderr []byte
commandErr error commandErr error
} }
@ -69,8 +70,8 @@ func (t testConnector) FetchFile(context.Context, string, io.Writer) error {
return t.fetchErr return t.fetchErr
} }
func (t testConnector) ExecuteCommand(context.Context, string) ([]byte, error) { func (t testConnector) ExecuteCommand(context.Context, string) ([]byte, []byte, error) {
return t.output, t.commandErr return t.stdout, t.stderr, t.commandErr
} }
// newTestVariable creates a new variable.Variable for testing purposes. // newTestVariable creates a new variable.Variable for testing purposes.

View File

@ -172,7 +172,7 @@ func ModulePrometheus(ctx context.Context, options ExecOptions) (string, string)
} }
// Execute query // Execute query
result, err := conn.ExecuteCommand(ctx, string(cmdBytes)) result, _, err := conn.ExecuteCommand(ctx, string(cmdBytes))
if err != nil { if err != nil {
return "", fmt.Sprintf("failed to execute prometheus query: %v", err) return "", fmt.Sprintf("failed to execute prometheus query: %v", err)
} }

View File

@ -41,6 +41,7 @@ var SUCCESS = Result{Message: ResultSucceed}
// The Message field is typically used to convey error or success information. // The Message field is typically used to convey error or success information.
type Result struct { type Result struct {
Message string `description:"error message" json:"message"` // Message provides details about the result or error. Message string `description:"error message" json:"message"` // Message provides details about the result or error.
Result any `json:"result"`
} }
// ListResult is a generic struct representing a paginated list response. // ListResult is a generic struct representing a paginated list response.
@ -54,18 +55,18 @@ type ListResult[T any] struct {
// InventoryHostTable represents a host entry in an inventory with its configuration details. // InventoryHostTable represents a host entry in an inventory with its configuration details.
// It includes network information, SSH credentials, group membership, and architecture. // It includes network information, SSH credentials, group membership, and architecture.
type InventoryHostTable struct { type InventoryHostTable struct {
Name string `json:"name"` // Hostname of the inventory host Name string `json:"name"` // Hostname of the inventory host
Status string `json:"status"` // Current status of the host Status string `json:"status"` // Current status of the host
InternalIPV4 string `json:"internalIPV4"` // IPv4 address of the host InternalIPV4 string `json:"internalIPV4"` // IPv4 address of the host
InternalIPV6 string `json:"internalIPV6"` // IPv6 address of the host InternalIPV6 string `json:"internalIPV6"` // IPv6 address of the host
SSHHost string `json:"sshHost"` // SSH hostname for connection SSHHost string `json:"sshHost"` // SSH hostname for connection
SSHPort string `json:"sshPort"` // SSH port for connection SSHPort string `json:"sshPort"` // SSH port for connection
SSHUser string `json:"sshUser"` // SSH username for authentication SSHUser string `json:"sshUser"` // SSH username for authentication
SSHPassword string `json:"sshPassword"` // SSH password for authentication SSHPassword string `json:"sshPassword"` // SSH password for authentication
SSHPrivateKey string `json:"sshPrivateKey"` // SSH private key for authentication SSHPrivateKeyContent string `json:"sshPrivateKeyContent"` // SSH private key content for authentication
Vars map[string]any `json:"vars"` // Additional host variables Vars map[string]any `json:"vars"` // Additional host variables
Groups []InventoryHostGroups `json:"groups"` // Groups the host belongs to Groups []InventoryHostGroups `json:"groups"` // Groups the host belongs to
Arch string `json:"arch"` // Architecture of the host Arch string `json:"arch"` // Architecture of the host
} }
// InventoryHostGroups represents the group information for a host in the inventory. // InventoryHostGroups represents the group information for a host in the inventory.

View File

@ -1,763 +0,0 @@
package web
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"sync"
"time"
"github.com/cockroachdb/errors"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/executor"
"github.com/kubesphere/kubekey/v4/pkg/variable"
"github.com/kubesphere/kubekey/v4/pkg/web/api"
"github.com/kubesphere/kubekey/v4/pkg/web/query"
)
// NewCoreService creates and configures a new RESTful web service for managing inventories and playbooks.
// It sets up routes for CRUD operations on inventories and playbooks, including pagination, sorting, and filtering.
func NewCoreService(workdir string, client ctrlclient.Client, config *rest.Config) *restful.WebService {
ws := new(restful.WebService).
// the GroupVersion might be empty, we need to remove the final /
Path(strings.TrimRight(_const.CoreAPIPath+kkcorev1.SchemeGroupVersion.String(), "/"))
h := newCoreHandler(workdir, client, config)
// Inventory management routes
ws.Route(ws.POST("/inventories").To(h.createInventory).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("create a inventory.").
Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON).
Reads(kkcorev1.Inventory{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.PATCH("/namespaces/{namespace}/inventories/{inventory}").To(h.patchInventory).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("patch a inventory.").
Consumes(string(types.JSONPatchType), string(types.MergePatchType), string(types.ApplyPatchType)).Produces(restful.MIME_JSON).
Reads(kkcorev1.Inventory{}).
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.GET("/inventories").To(h.listInventories).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all inventories.").
Produces(restful.MIME_JSON).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories").To(h.listInventories).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all inventories in a namespace.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}").To(h.inventoryInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get a inventory in a namespace.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}/hosts").To(h.listInventoryHosts).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all hosts in a inventory.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.InventoryHostTable]{}))
// Playbook management routes
ws.Route(ws.POST("/playbooks").To(h.createPlaybook).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("create a playbook.").
Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON).
Reads(kkcorev1.Playbook{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{}))
ws.Route(ws.GET("/playbooks").To(h.listPlaybooks).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all playbooks.").
Produces(restful.MIME_JSON).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks").To(h.listPlaybooks).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all playbooks in a namespace.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}").To(h.playbookInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get or watch a playbook in a namespace.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Param(ws.QueryParameter("watch", "set to true to watch this playbook")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}/log").To(h.logPlaybook).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get a playbook execute log.").
Produces("text/plain").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Returns(http.StatusOK, _const.StatusOK, ""))
ws.Route(ws.DELETE("/namespaces/{namespace}/playbooks/{playbook}").To(h.deletePlaybook).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("delete a playbook.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Returns(http.StatusOK, _const.StatusOK, api.Result{}))
return ws
}
// newInventoryHandler creates a new handler instance with the given workdir, client and config
// workdir: Base directory for storing work files
// client: Kubernetes client for API operations
// config: Kubernetes REST client configuration
func newCoreHandler(workdir string, client ctrlclient.Client, config *rest.Config) *coreHandler {
// Create a new coreHandler with initialized playbookManager
return &coreHandler{workdir: workdir, client: client, restconfig: config, playbookManager: playbookManager{manager: make(map[string]context.CancelFunc)}}
}
// playbookManager is responsible for managing playbook execution contexts and their cancellation.
// It uses a mutex to ensure thread-safe access to the manager map.
type playbookManager struct {
sync.Mutex
manager map[string]context.CancelFunc // Map of playbook key to its cancel function
}
// addPlaybook adds a playbook and its cancel function to the manager map.
func (m *playbookManager) addPlaybook(playbook *kkcorev1.Playbook, cancel context.CancelFunc) {
m.Lock()
defer m.Unlock()
m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()] = cancel
}
// deletePlaybook removes a playbook from the manager map.
func (m *playbookManager) deletePlaybook(playbook *kkcorev1.Playbook) {
m.Lock()
defer m.Unlock()
delete(m.manager, ctrlclient.ObjectKeyFromObject(playbook).String())
}
// stopPlaybook cancels the context for a running playbook, if it exists.
func (m *playbookManager) stopPlaybook(playbook *kkcorev1.Playbook) {
// Attempt to cancel the playbook's context if it exists in the manager
if cancel, ok := m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()]; ok {
cancel()
}
}
// coreHandler implements HTTP handlers for managing inventories and playbooks
// It provides methods for CRUD operations on inventories and playbooks
type coreHandler struct {
workdir string // Base directory for storing work files
restconfig *rest.Config // Kubernetes REST client configuration
client ctrlclient.Client // Kubernetes client for API operations
// playbookManager control to cancel playbook
playbookManager playbookManager
}
// createInventory creates a new inventory resource
// It reads the inventory from the request body and creates it in the Kubernetes cluster
func (h *coreHandler) createInventory(request *restful.Request, response *restful.Response) {
inventory := &kkcorev1.Inventory{}
if err := request.ReadEntity(inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Create(request.Request.Context(), inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// patchInventory patches an existing inventory resource
// It reads the patch data from the request body and applies it to the specified inventory
func (h *coreHandler) patchInventory(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
data, err := io.ReadAll(request.Request.Body)
if err != nil {
api.HandleBadRequest(response, request, err)
return
}
patchType := request.HeaderParameter("Content-Type")
// get old inventory
inventory := &kkcorev1.Inventory{}
if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Patch(request.Request.Context(), inventory, ctrlclient.RawPatch(types.PatchType(patchType), data)); err != nil {
api.HandleBadRequest(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// listInventories lists all inventory resources with optional filtering and sorting
// It supports field selectors and label selectors for filtering the results
func (h *coreHandler) listInventories(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
var fieldselector fields.Selector
if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok {
fs, err := fields.ParseSelector(string(v))
if err != nil {
api.HandleError(response, request, err)
return
}
fieldselector = fs
}
namespace := request.PathParameter("namespace")
inventoryList := &kkcorev1.InventoryList{}
err := h.client.List(request.Request.Context(), inventoryList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector})
if err != nil {
api.HandleError(response, request, err)
return
}
// Sort and filter the inventory list using DefaultList
results := query.DefaultList(inventoryList.Items, queryParam, func(left, right kkcorev1.Inventory, sortBy query.Field) bool {
leftMeta, err := meta.Accessor(left)
if err != nil {
return false
}
rightMeta, err := meta.Accessor(right)
if err != nil {
return false
}
return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy)
}, func(o kkcorev1.Inventory, filter query.Filter) bool {
// skip fieldselector
if filter.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, filter)
})
_ = response.WriteEntity(results)
}
// inventoryInfo retrieves a specific inventory resource
// It returns the inventory with the specified name in the given namespace
func (h *coreHandler) inventoryInfo(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
inventory := &kkcorev1.Inventory{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory)
if err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// listInventoryHosts lists all hosts in an inventory with their details
// It includes information about SSH configuration, IP addresses, and group membership
func (h *coreHandler) listInventoryHosts(request *restful.Request, response *restful.Response) {
// Parse query parameters from the request
queryParam := query.ParseQueryParameter(request)
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
// Retrieve the inventory object from the cluster
inventory := &kkcorev1.Inventory{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory)
if err != nil {
api.HandleError(response, request, err)
return
}
// get host-check playbook if annotation exists
playbook := &kkcorev1.Playbook{}
if playbookName, ok := inventory.Annotations[kkcorev1.HostCheckPlaybookAnnotation]; ok {
if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Name: playbookName, Namespace: inventory.Namespace}, playbook); err != nil {
klog.Warningf("cannot found host-check playbook for inventory %q", ctrlclient.ObjectKeyFromObject(inventory))
}
}
// buildHostItem constructs an InventoryHostTable from the hostname and raw extension
buildHostItem := func(hostname string, raw runtime.RawExtension) api.InventoryHostTable {
// Convert the raw extension to a map of variables
vars := variable.Extension2Variables(raw)
// Extract relevant fields from the variables
internalIPV4, _ := variable.StringVar(nil, vars, _const.VariableIPv4)
internalIPV6, _ := variable.StringVar(nil, vars, _const.VariableIPv6)
sshHost, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorHost)
sshPort, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPort)
sshUser, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUser)
sshPassword, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword)
sshPrivateKey, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPrivateKey)
// Remove sensitive or redundant variables from the vars map
delete(vars, _const.VariableIPv4)
delete(vars, _const.VariableIPv6)
delete(vars, _const.VariableConnector)
// Return the constructed InventoryHostTable
return api.InventoryHostTable{
Name: hostname,
InternalIPV4: internalIPV4,
InternalIPV6: internalIPV6,
SSHHost: sshHost,
SSHPort: sshPort,
SSHUser: sshUser,
SSHPassword: sshPassword,
SSHPrivateKey: sshPrivateKey,
Vars: vars,
Groups: []api.InventoryHostGroups{},
}
}
// Convert inventory groups for host membership lookup
groups := variable.ConvertGroup(*inventory)
// Helper to check if a host is in a group and get its index
getGroupIndex := func(groupName, hostName string) int {
for i, h := range inventory.Spec.Groups[groupName].Hosts {
if h == hostName {
return i
}
}
return -1
}
// fillGroups adds group names to the InventoryHostTable item if the host is a member
fillGroups := func(item *api.InventoryHostTable) {
for groupName, hosts := range groups {
// Skip special groups
if groupName == _const.VariableGroupsAll || groupName == _const.VariableUnGrouped || groupName == "k8s_cluster" {
continue
}
// If the host is in the group, add the group info to the item
if slices.Contains(hosts, item.Name) {
g := api.InventoryHostGroups{
Role: groupName,
Index: getGroupIndex(groupName, item.Name),
}
item.Groups = append(item.Groups, g)
}
}
}
// fillByPlaybook populates status and architecture info for the host from task results
fillByPlaybook := func(playbook kkcorev1.Playbook, item *api.InventoryHostTable) {
// Set status and architecture based on playbook phase and result
switch playbook.Status.Phase {
case kkcorev1.PlaybookPhaseFailed:
item.Status = api.ResultFailed
case kkcorev1.PlaybookPhaseSucceeded:
item.Status = api.ResultFailed
// Extract architecture info from playbook result
results := variable.Extension2Variables(playbook.Status.Result)
if arch, ok := results[item.Name].(string); ok && arch != "" {
item.Arch = arch
item.Status = api.ResultSucceed
}
}
}
// less is a comparison function for sorting InventoryHostTable items by a given field
less := func(left, right api.InventoryHostTable, sortBy query.Field) bool {
// Compare fields for sorting
leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy)
rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy)
switch leftVal.Kind() {
case reflect.String:
return leftVal.String() > rightVal.String()
case reflect.Int, reflect.Int64:
return leftVal.Int() > rightVal.Int()
default:
return left.Name > right.Name
}
}
// filter is a function to filter InventoryHostTable items based on query filters
filter := func(o api.InventoryHostTable, f query.Filter) bool {
// Filter by string fields, otherwise always true
val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field)
switch val.Kind() {
case reflect.String:
return strings.Contains(val.String(), string(f.Value))
default:
return true
}
}
// Build the host table for the inventory
hostTable := make([]api.InventoryHostTable, 0, len(inventory.Spec.Hosts))
for hostname, raw := range inventory.Spec.Hosts {
// Build the host item from raw data
item := buildHostItem(hostname, raw)
// Fill in group membership
fillGroups(&item)
// Fill in playbook status and architecture
fillByPlaybook(*playbook, &item)
// Add the item to the host table
hostTable = append(hostTable, item)
}
// Sort and filter the host table, then write the result
results := query.DefaultList(hostTable, queryParam, less, filter)
_ = response.WriteEntity(results)
}
// createPlaybook handles the creation of a new playbook resource.
// It reads the playbook from the request, sets the workdir, creates the resource, and starts execution in a goroutine.
func (h *coreHandler) createPlaybook(request *restful.Request, response *restful.Response) {
playbook := &kkcorev1.Playbook{}
// Read the playbook entity from the request body
if err := request.ReadEntity(playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// Check for schema label: only one allowed, must not be empty, and must be unique among playbooks
hasSchemaLabel := false
for labelKey, labelValue := range playbook.Labels {
// Only consider labels with the schema label suffix
if !strings.HasSuffix(labelKey, api.SchemaLabelSubfix) {
continue
}
// If a schema label was already found, this is a conflict
if hasSchemaLabel {
api.HandleConflict(response, request, errors.New("a playbook can only have one schema label. Please ensure only one schema label is set"))
return
}
// The schema label value must not be empty
if labelValue == "" {
api.HandleConflict(response, request, errors.New("the schema label value must not be empty. Please provide a valid schema label value"))
return
}
hasSchemaLabel = true
// Check if there is already a playbook with the same schema label
playbookList := &kkcorev1.PlaybookList{}
if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.MatchingLabels{
labelKey: labelValue,
}); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// If any playbook with the same schema label exists, this is a conflict
if len(playbookList.Items) > 0 {
api.HandleConflict(response, request, errors.New("a playbook with the same schema label already exists. Please use a different schema label or remove the existing playbook"))
return
}
}
// Set the workdir in the playbook's spec config
if err := unstructured.SetNestedField(playbook.Spec.Config.Value(), h.workdir, _const.Workdir); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// Create the playbook resource in the cluster
if err := h.client.Create(request.Request.Context(), playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// Start playbook execution in a separate goroutine
go func() {
// Build the log file path for the playbook execution
filename := filepath.Join(
_const.GetWorkdirFromConfig(playbook.Spec.Config),
_const.RuntimeDir,
kkcorev1.SchemeGroupVersion.Group,
kkcorev1.SchemeGroupVersion.Version,
"playbooks",
playbook.Namespace,
playbook.Name,
playbook.Name+".log",
)
// Ensure the directory for the log file exists
if _, err := os.Stat(filepath.Dir(filename)); err != nil {
if !os.IsNotExist(err) {
api.HandleBadRequest(response, request, err)
return
}
// If directory does not exist, create it
if err := os.MkdirAll(filepath.Dir(filename), os.ModePerm); err != nil {
api.HandleBadRequest(response, request, err)
return
}
}
// Open the log file for writing
file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
klog.ErrorS(err, "failed to open file", "file", filename)
return
}
defer file.Close()
// Create a cancellable context for playbook execution
ctx, cancel := context.WithCancel(context.Background())
// Register the playbook and its cancel function in the playbookManager
h.playbookManager.addPlaybook(playbook, cancel)
// Execute the playbook and write output to the log file
if err := executor.NewPlaybookExecutor(ctx, h.client, playbook, file).Exec(ctx); err != nil {
klog.ErrorS(err, "failed to exec playbook", "playbook", playbook.Name)
}
// Remove the playbook from the playbookManager after execution
h.playbookManager.deletePlaybook(playbook)
}()
// For web UI: it does not run in Kubernetes, so execute playbook immediately.
_ = response.WriteEntity(playbook)
}
// listPlaybooks handles listing playbook resources with filtering and pagination.
// It supports field selectors and label selectors for filtering the results.
func (h *coreHandler) listPlaybooks(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
var fieldselector fields.Selector
// Parse field selector from query parameters if present.
if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok {
fs, err := fields.ParseSelector(string(v))
if err != nil {
api.HandleError(response, request, err)
return
}
fieldselector = fs
}
namespace := request.PathParameter("namespace")
playbookList := &kkcorev1.PlaybookList{}
// List playbooks from the Kubernetes API with the specified options.
err := h.client.List(request.Request.Context(), playbookList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector})
if err != nil {
api.HandleError(response, request, err)
return
}
// Sort and filter the playbook list using DefaultList.
results := query.DefaultList(playbookList.Items, queryParam, func(left, right kkcorev1.Playbook, sortBy query.Field) bool {
leftMeta, err := meta.Accessor(left)
if err != nil {
return false
}
rightMeta, err := meta.Accessor(right)
if err != nil {
return false
}
return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy)
}, func(o kkcorev1.Playbook, filter query.Filter) bool {
// Skip fieldselector filter.
if filter.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, filter)
})
_ = response.WriteEntity(results)
}
// playbookInfo handles retrieving a single playbook or watching for changes.
// If the "watch" query parameter is set to "true", it streams updates to the client.
func (h *coreHandler) playbookInfo(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
watch := request.QueryParameter("watch")
playbook := &kkcorev1.Playbook{}
if watch == "true" {
// Watch for changes to the playbook resource and stream events as JSON.
h.restconfig.GroupVersion = &kkcorev1.SchemeGroupVersion
client, err := rest.RESTClientFor(h.restconfig)
if err != nil {
api.HandleError(response, request, err)
return
}
watchInterface, err := client.Get().Namespace(namespace).Resource("playbooks").Name(name).Param("watch", "true").Watch(request.Request.Context())
if err != nil {
api.HandleError(response, request, err)
return
}
defer watchInterface.Stop()
response.AddHeader("Content-Type", "application/json")
flusher, ok := response.ResponseWriter.(http.Flusher)
if !ok {
http.Error(response.ResponseWriter, "Streaming unsupported", http.StatusInternalServerError)
return
}
encoder := json.NewEncoder(response.ResponseWriter)
// Stream each event object to the client as JSON.
for event := range watchInterface.ResultChan() {
if err := encoder.Encode(event.Object); err != nil {
break
}
flusher.Flush()
}
return
}
// Retrieve the playbook resource by namespace and name.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(playbook)
}
// logPlaybook handles streaming the log file for a playbook.
// It opens the log file and streams its contents to the client, supporting live updates.
func (h *coreHandler) logPlaybook(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
playbook := &kkcorev1.Playbook{}
// Retrieve the playbook resource to get its config for log file path.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
// Build the log file path for the playbook.
filename := filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log")
file, err := os.Open(filename)
if err != nil {
api.HandleError(response, request, err)
return
}
defer file.Close()
response.AddHeader("Content-Type", "text/plain; charset=utf-8")
writer := response.ResponseWriter
flusher, ok := writer.(http.Flusher)
if !ok {
http.Error(writer, "Streaming unsupported", http.StatusInternalServerError)
return
}
// Stream the log file line by line, waiting for new lines if at EOF.
reader := bufio.NewReader(file)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
// If EOF, wait for new log lines to be written.
time.Sleep(500 * time.Millisecond)
continue
}
break
}
fmt.Fprint(writer, line)
flusher.Flush()
}
}
// deletePlaybook handles deletion of a playbook resource and its associated tasks.
// It stops the playbook execution if running, deletes the playbook, and deletes all related tasks.
func (h *coreHandler) deletePlaybook(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
playbook := &kkcorev1.Playbook{}
// Retrieve the playbook resource to delete.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
// Stop the playbook execution if it is running.
h.playbookManager.stopPlaybook(playbook)
// Delete the playbook resource.
if err := h.client.Delete(request.Request.Context(), playbook); err != nil {
api.HandleError(response, request, err)
return
}
// delete relative filepath: variable and log
_ = os.Remove(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log"))
_ = os.RemoveAll(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name))
// Delete all tasks owned by this playbook.
if err := h.client.DeleteAllOf(request.Request.Context(), &kkcorev1alpha1.Task{}, ctrlclient.InNamespace(playbook.Namespace), ctrlclient.MatchingFields{
"playbook.name": playbook.Name,
"playbook.uid": string(playbook.UID),
}); err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(api.SUCCESS)
}

View File

@ -0,0 +1,93 @@
package handler
import (
"context"
"os"
"path/filepath"
"sync"
"github.com/cockroachdb/errors"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/executor"
)
var playbookManager = &manager{
manager: make(map[string]context.CancelFunc),
}
// playbookManager is responsible for managing playbook execution contexts and their cancellation.
// It uses a mutex to ensure thread-safe access to the manager map.
type manager struct {
sync.Mutex
manager map[string]context.CancelFunc // Map of playbook key to its cancel function
}
func (m *manager) executor(playbook *kkcorev1.Playbook, client ctrlclient.Client) error {
// Build the log file path for the playbook execution
filename := filepath.Join(
_const.GetWorkdirFromConfig(playbook.Spec.Config),
_const.RuntimeDir,
kkcorev1.SchemeGroupVersion.Group,
kkcorev1.SchemeGroupVersion.Version,
"playbooks",
playbook.Namespace,
playbook.Name,
playbook.Name+".log",
)
// Ensure the directory for the log file exists
if _, err := os.Stat(filepath.Dir(filename)); err != nil {
if !os.IsNotExist(err) {
return errors.Wrapf(err, "failed to stat playbook dir %q", filepath.Dir(filename))
}
// If directory does not exist, create it
if err := os.MkdirAll(filepath.Dir(filename), os.ModePerm); err != nil {
return errors.Wrapf(err, "failed to create playbook dir %q", filepath.Dir(filename))
}
}
// Open the log file for writing
file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return errors.Wrapf(err, "failed to open log file", "file", filename)
}
defer file.Close()
// Create a cancellable context for playbook execution
ctx, cancel := context.WithCancel(context.Background())
// Register the playbook and its cancel function in the playbookManager
m.addPlaybook(playbook, cancel)
// Execute the playbook and write output to the log file
if err := executor.NewPlaybookExecutor(ctx, client, playbook, file).Exec(ctx); err != nil {
klog.ErrorS(err, "failed to exec playbook", "playbook", playbook.Name)
}
// Remove the playbook from the playbookManager after execution
m.deletePlaybook(playbook)
return nil
}
// addPlaybook adds a playbook and its cancel function to the manager map.
func (m *manager) addPlaybook(playbook *kkcorev1.Playbook, cancel context.CancelFunc) {
m.Lock()
defer m.Unlock()
m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()] = cancel
}
// deletePlaybook removes a playbook from the manager map.
func (m *manager) deletePlaybook(playbook *kkcorev1.Playbook) {
m.Lock()
defer m.Unlock()
delete(m.manager, ctrlclient.ObjectKeyFromObject(playbook).String())
}
// stopPlaybook cancels the context for a running playbook, if it exists.
func (m *manager) stopPlaybook(playbook *kkcorev1.Playbook) {
// Attempt to cancel the playbook's context if it exists in the manager
if cancel, ok := m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()]; ok {
cancel()
}
}

View File

@ -0,0 +1,298 @@
package handler
import (
"io"
"reflect"
"slices"
"strings"
"github.com/emicklei/go-restful/v3"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/variable"
"github.com/kubesphere/kubekey/v4/pkg/web/api"
"github.com/kubesphere/kubekey/v4/pkg/web/query"
)
// InventoryHandler handles HTTP requests for inventory resources.
type InventoryHandler struct {
workdir string // Base directory for storing work files
restconfig *rest.Config // Kubernetes REST client configuration
client ctrlclient.Client // Kubernetes client for API operations
}
// NewInventoryHandler creates a new InventoryHandler instance.
func NewInventoryHandler(workdir string, restconfig *rest.Config, client ctrlclient.Client) *InventoryHandler {
return &InventoryHandler{workdir: workdir, restconfig: restconfig, client: client}
}
// Post creates a new inventory resource.
// It reads the inventory from the request body and creates it in the Kubernetes cluster.
func (h *InventoryHandler) Post(request *restful.Request, response *restful.Response) {
inventory := &kkcorev1.Inventory{}
if err := request.ReadEntity(inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Create(request.Request.Context(), inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// Patch updates an existing inventory resource.
// It reads the patch data from the request body and applies it to the specified inventory.
func (h *InventoryHandler) Patch(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
data, err := io.ReadAll(request.Request.Body)
if err != nil {
api.HandleBadRequest(response, request, err)
return
}
patchType := request.HeaderParameter("Content-Type")
// Get the existing inventory object.
inventory := &kkcorev1.Inventory{}
if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// Apply the patch.
if err := h.client.Patch(request.Request.Context(), inventory, ctrlclient.RawPatch(types.PatchType(patchType), data)); err != nil {
api.HandleBadRequest(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// List returns all inventory resources with optional filtering and sorting.
// It supports field selectors and label selectors for filtering the results.
func (h *InventoryHandler) List(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
var fieldselector fields.Selector
// Parse field selector from query parameters if present.
if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok {
fs, err := fields.ParseSelector(string(v))
if err != nil {
api.HandleError(response, request, err)
return
}
fieldselector = fs
}
namespace := request.PathParameter("namespace")
inventoryList := &kkcorev1.InventoryList{}
// List inventory resources from the Kubernetes API.
err := h.client.List(request.Request.Context(), inventoryList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector})
if err != nil {
api.HandleError(response, request, err)
return
}
// Sort and filter the inventory list using DefaultList.
results := query.DefaultList(inventoryList.Items, queryParam, func(left, right kkcorev1.Inventory, sortBy query.Field) bool {
leftMeta, err := meta.Accessor(left)
if err != nil {
return false
}
rightMeta, err := meta.Accessor(right)
if err != nil {
return false
}
return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy)
}, func(o kkcorev1.Inventory, filter query.Filter) bool {
// Skip fieldselector filter.
if filter.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, filter)
})
_ = response.WriteEntity(results)
}
// Info retrieves a specific inventory resource.
// It returns the inventory with the specified name in the given namespace.
func (h *InventoryHandler) Info(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
inventory := &kkcorev1.Inventory{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory)
if err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// ListHosts lists all hosts in an inventory with their details.
// It includes information about SSH configuration, IP addresses, and group membership.
func (h *InventoryHandler) ListHosts(request *restful.Request, response *restful.Response) {
// Parse query parameters from the request.
queryParam := query.ParseQueryParameter(request)
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
// Retrieve the inventory object from the cluster.
inventory := &kkcorev1.Inventory{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory)
if err != nil {
api.HandleError(response, request, err)
return
}
// Get host-check playbook if annotation exists.
playbook := &kkcorev1.Playbook{}
if playbookName, ok := inventory.Annotations[kkcorev1.HostCheckPlaybookAnnotation]; ok {
if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Name: playbookName, Namespace: inventory.Namespace}, playbook); err != nil {
klog.Warningf("cannot found host-check playbook for inventory %q", ctrlclient.ObjectKeyFromObject(inventory))
}
}
// buildHostItem constructs an InventoryHostTable from the hostname and raw extension.
buildHostItem := func(hostname string, raw runtime.RawExtension) api.InventoryHostTable {
// Convert the raw extension to a map of variables.
vars := variable.Extension2Variables(raw)
// Extract relevant fields from the variables.
internalIPV4, _ := variable.StringVar(nil, vars, _const.VariableIPv4)
internalIPV6, _ := variable.StringVar(nil, vars, _const.VariableIPv6)
sshHost, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorHost)
sshPort, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPort)
sshUser, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUser)
sshPassword, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword)
sshPrivateKeyContent, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPrivateKeyContent)
// Remove sensitive or redundant variables from the vars map.
delete(vars, _const.VariableIPv4)
delete(vars, _const.VariableIPv6)
delete(vars, _const.VariableConnector)
// Return the constructed InventoryHostTable.
return api.InventoryHostTable{
Name: hostname,
InternalIPV4: internalIPV4,
InternalIPV6: internalIPV6,
SSHHost: sshHost,
SSHPort: sshPort,
SSHUser: sshUser,
SSHPassword: sshPassword,
SSHPrivateKeyContent: sshPrivateKeyContent,
Vars: vars,
Groups: []api.InventoryHostGroups{},
}
}
// Convert inventory groups for host membership lookup.
groups := variable.ConvertGroup(*inventory)
// getGroupIndex checks if a host is in a group and returns its index.
getGroupIndex := func(groupName, hostName string) int {
for i, h := range inventory.Spec.Groups[groupName].Hosts {
if h == hostName {
return i
}
}
return -1
}
// fillGroups adds group names to the InventoryHostTable item if the host is a member.
fillGroups := func(item *api.InventoryHostTable) {
for groupName, hosts := range groups {
// Skip special groups.
if groupName == _const.VariableGroupsAll || groupName == _const.VariableUnGrouped || groupName == "k8s_cluster" {
continue
}
// If the host is in the group, add the group info to the item.
if slices.Contains(hosts, item.Name) {
g := api.InventoryHostGroups{
Role: groupName,
Index: getGroupIndex(groupName, item.Name),
}
item.Groups = append(item.Groups, g)
}
}
}
// fillByPlaybook populates status and architecture info for the host from task results.
fillByPlaybook := func(playbook kkcorev1.Playbook, item *api.InventoryHostTable) {
// Set status and architecture based on playbook phase and result.
switch playbook.Status.Phase {
case kkcorev1.PlaybookPhaseFailed:
item.Status = api.ResultFailed
case kkcorev1.PlaybookPhaseSucceeded:
item.Status = api.ResultFailed
// Extract architecture info from playbook result.
results := variable.Extension2Variables(playbook.Status.Result)
if arch, ok := results[item.Name].(string); ok && arch != "" {
item.Arch = arch
item.Status = api.ResultSucceed
}
}
}
// less is a comparison function for sorting InventoryHostTable items by a given field.
less := func(left, right api.InventoryHostTable, sortBy query.Field) bool {
// Compare fields for sorting.
leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy)
rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy)
switch leftVal.Kind() {
case reflect.String:
return leftVal.String() > rightVal.String()
case reflect.Int, reflect.Int64:
return leftVal.Int() > rightVal.Int()
default:
return left.Name > right.Name
}
}
// filter is a function to filter InventoryHostTable items based on query filters.
filter := func(o api.InventoryHostTable, f query.Filter) bool {
// Filter by string fields, otherwise always true.
val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field)
switch val.Kind() {
case reflect.String:
return strings.Contains(val.String(), string(f.Value))
default:
return true
}
}
// Build the host table for the inventory.
hostTable := make([]api.InventoryHostTable, 0, len(inventory.Spec.Hosts))
for hostname, raw := range inventory.Spec.Hosts {
// Build the host item from raw data.
item := buildHostItem(hostname, raw)
// Fill in group membership.
fillGroups(&item)
// Fill in playbook status and architecture.
fillByPlaybook(*playbook, &item)
// Add the item to the host table.
hostTable = append(hostTable, item)
}
// Sort and filter the host table, then write the result.
results := query.DefaultList(hostTable, queryParam, less, filter)
_ = response.WriteEntity(results)
}

293
pkg/web/handler/playbook.go Normal file
View File

@ -0,0 +1,293 @@
package handler
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/emicklei/go-restful/v3"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/web/api"
"github.com/kubesphere/kubekey/v4/pkg/web/query"
)
// PlaybookHandler handles HTTP requests for playbook resources.
type PlaybookHandler struct {
workdir string // Base directory for storing work files
restconfig *rest.Config // Kubernetes REST client configuration
client ctrlclient.Client // Kubernetes client for API operations
}
// NewPlaybookHandler creates a new PlaybookHandler with the given workdir, restconfig, and client.
func NewPlaybookHandler(workdir string, restconfig *rest.Config, client ctrlclient.Client) *PlaybookHandler {
return &PlaybookHandler{workdir: workdir, restconfig: restconfig, client: client}
}
// Post handles the creation of a new playbook resource.
// It reads the playbook from the request, checks schema label constraints, sets the workdir, creates the resource, and starts execution in a goroutine.
func (h *PlaybookHandler) Post(request *restful.Request, response *restful.Response) {
playbook := &kkcorev1.Playbook{}
// Read the playbook entity from the request body
if err := request.ReadEntity(playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// Check for schema label: only one allowed, must not be empty, and must be unique among playbooks
hasSchemaLabel := false
for labelKey, labelValue := range playbook.Labels {
// Only consider labels with the schema label suffix
if !strings.HasSuffix(labelKey, api.SchemaLabelSubfix) {
continue
}
// If a schema label was already found, this is a conflict
if hasSchemaLabel {
api.HandleConflict(response, request, errors.New("a playbook can only have one schema label. Please ensure only one schema label is set"))
return
}
// The schema label value must not be empty
if labelValue == "" {
api.HandleConflict(response, request, errors.New("the schema label value must not be empty. Please provide a valid schema label value"))
return
}
hasSchemaLabel = true
// Check if there is already a playbook with the same schema label
playbookList := &kkcorev1.PlaybookList{}
if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.MatchingLabels{
labelKey: labelValue,
}); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// If any playbook with the same schema label exists, this is a conflict
if len(playbookList.Items) > 0 {
api.HandleConflict(response, request, errors.New("a playbook with the same schema label already exists. Please use a different schema label or remove the existing playbook"))
return
}
}
// Set the workdir in the playbook's spec config
if err := unstructured.SetNestedField(playbook.Spec.Config.Value(), h.workdir, _const.Workdir); err != nil {
api.HandleBadRequest(response, request, err)
return
}
playbook.Status.Phase = kkcorev1.PlaybookPhasePending
// Create the playbook resource in Kubernetes
if err := h.client.Create(context.TODO(), playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// Start playbook execution in a separate goroutine
go func() {
if err := playbookManager.executor(playbook, h.client); err != nil {
klog.ErrorS(err, "failed to executor playbook", "playbook", ctrlclient.ObjectKeyFromObject(playbook))
}
}()
// For web UI: it does not run in Kubernetes, so execute playbook immediately.
_ = response.WriteEntity(playbook)
}
// List handles listing playbook resources with filtering and pagination.
// It supports field selectors and label selectors for filtering the results.
func (h *PlaybookHandler) List(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
var fieldselector fields.Selector
// Parse field selector from query parameters if present.
if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok {
fs, err := fields.ParseSelector(string(v))
if err != nil {
api.HandleError(response, request, err)
return
}
fieldselector = fs
}
namespace := request.PathParameter("namespace")
playbookList := &kkcorev1.PlaybookList{}
// List playbooks from the Kubernetes API with the specified options.
err := h.client.List(request.Request.Context(), playbookList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector})
if err != nil {
api.HandleError(response, request, err)
return
}
// Sort and filter the playbook list using DefaultList.
results := query.DefaultList(playbookList.Items, queryParam, func(left, right kkcorev1.Playbook, sortBy query.Field) bool {
leftMeta, err := meta.Accessor(left)
if err != nil {
return false
}
rightMeta, err := meta.Accessor(right)
if err != nil {
return false
}
return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy)
}, func(o kkcorev1.Playbook, filter query.Filter) bool {
// Skip fieldselector filter.
if filter.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, filter)
})
_ = response.WriteEntity(results)
}
// Info handles retrieving a single playbook or watching for changes.
// If the "watch" query parameter is set to "true", it streams updates to the client.
func (h *PlaybookHandler) Info(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
watch := request.QueryParameter("watch")
playbook := &kkcorev1.Playbook{}
if watch == "true" {
// Watch for changes to the playbook resource and stream events as JSON.
h.restconfig.GroupVersion = &kkcorev1.SchemeGroupVersion
client, err := rest.RESTClientFor(h.restconfig)
if err != nil {
api.HandleError(response, request, err)
return
}
watchInterface, err := client.Get().Namespace(namespace).Resource("playbooks").Name(name).Param("watch", "true").Watch(request.Request.Context())
if err != nil {
api.HandleError(response, request, err)
return
}
defer watchInterface.Stop()
response.AddHeader("Content-Type", "application/json")
flusher, ok := response.ResponseWriter.(http.Flusher)
if !ok {
http.Error(response.ResponseWriter, "Streaming unsupported", http.StatusInternalServerError)
return
}
encoder := json.NewEncoder(response.ResponseWriter)
// Stream each event object to the client as JSON.
for event := range watchInterface.ResultChan() {
if err := encoder.Encode(event.Object); err != nil {
break
}
flusher.Flush()
}
return
}
// Retrieve the playbook resource by namespace and name.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(playbook)
}
// Log handles streaming the log file for a playbook.
// It opens the log file and streams its contents to the client, supporting live updates.
func (h *PlaybookHandler) Log(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
playbook := &kkcorev1.Playbook{}
// Retrieve the playbook resource to get its config for log file path.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
// Build the log file path for the playbook.
filename := filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log")
file, err := os.Open(filename)
if err != nil {
api.HandleError(response, request, err)
return
}
defer file.Close()
response.AddHeader("Content-Type", "text/plain; charset=utf-8")
writer := response.ResponseWriter
flusher, ok := writer.(http.Flusher)
if !ok {
http.Error(writer, "Streaming unsupported", http.StatusInternalServerError)
return
}
// Stream the log file line by line, waiting for new lines if at EOF.
reader := bufio.NewReader(file)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
// If EOF, wait for new log lines to be written.
time.Sleep(500 * time.Millisecond)
continue
}
break
}
fmt.Fprint(writer, line)
flusher.Flush()
}
}
// Delete handles deletion of a playbook resource and its associated tasks.
// It stops the playbook execution if running, deletes the playbook, removes related files, and deletes all related tasks.
func (h *PlaybookHandler) Delete(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
playbook := &kkcorev1.Playbook{}
// Retrieve the playbook resource to delete.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
// Stop the playbook execution if it is running.
playbookManager.stopPlaybook(playbook)
// Delete the playbook resource.
if err := h.client.Delete(request.Request.Context(), playbook); err != nil {
api.HandleError(response, request, err)
return
}
// Delete related log file and directory.
_ = os.Remove(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log"))
_ = os.RemoveAll(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name))
// Delete all tasks owned by this playbook.
if err := h.client.DeleteAllOf(request.Request.Context(), &kkcorev1alpha1.Task{}, ctrlclient.InNamespace(playbook.Namespace), ctrlclient.MatchingFields{
"playbook.name": playbook.Name,
"playbook.uid": string(playbook.UID),
}); err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(api.SUCCESS)
}

View File

@ -1,6 +1,7 @@
package web package handler
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -17,13 +18,16 @@ import (
"time" "time"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3" "github.com/emicklei/go-restful/v3"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1" kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
"golang.org/x/net/icmp" "golang.org/x/net/icmp"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
"golang.org/x/net/ipv6" "golang.org/x/net/ipv6"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2" "k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
@ -32,65 +36,20 @@ import (
"github.com/kubesphere/kubekey/v4/pkg/web/query" "github.com/kubesphere/kubekey/v4/pkg/web/query"
) )
// NewSchemaService creates a new WebService that serves schema files from the specified root path. // ResourceHandler handles resource-related HTTP requests.
// It sets up a route that handles GET requests to /resources/schema/{subpath} and serves files from the rootPath directory. type ResourceHandler struct {
// The {subpath:*} parameter allows for matching any path under /resources/schema/.
func NewSchemaService(rootPath string, workdir string, client ctrlclient.Client) *restful.WebService {
ws := new(restful.WebService)
ws.Path(strings.TrimRight(_const.ResourcesAPIPath, "/")).
Produces(restful.MIME_JSON, "text/plain")
h := newSchemaHandler(rootPath, workdir, client)
ws.Route(ws.GET("/ip").To(h.listIP).
Doc("list available ip from ip cidr").
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}).
Param(ws.QueryParameter("cidr", "the cidr for ip").Required(true)).
Param(ws.QueryParameter("sshPort", "the ssh port for ip").Required(false)).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=ip").Required(false).DefaultValue("ip")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.IPTable]{}))
ws.Route(ws.GET("/schema/{subpath:*}").To(h.schemaInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}))
ws.Route(ws.GET("/schema").To(h.allSchema).
Doc("list all schema as table").
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}).
Param(ws.PathParameter("namespace", "the namespace of the playbook").Required(false).DefaultValue("default")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=priority")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.SchemaTable]{}))
ws.Route(ws.POST("/schema/config").To(h.storeConfig).
Doc("storing user-defined configuration information").
Reads(struct{}{}).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}))
ws.Route(ws.GET("/schema/config").To(h.configInfo).
Doc("get user-defined configuration information").
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}))
return ws
}
// newSchemaHandler creates a new schemaHandler instance with the given rootPath, workdir, and client.
func newSchemaHandler(rootPath string, workdir string, client ctrlclient.Client) *schemaHandler {
return &schemaHandler{rootPath: rootPath, workdir: workdir, client: client}
}
// schemaHandler handles schema-related HTTP requests.
type schemaHandler struct {
rootPath string rootPath string
workdir string workdir string
client ctrlclient.Client client ctrlclient.Client
} }
func (h schemaHandler) configInfo(request *restful.Request, response *restful.Response) { // NewResourceHandler creates a new ResourceHandler instance.
func NewResourceHandler(rootPath string, workdir string, client ctrlclient.Client) *ResourceHandler {
return &ResourceHandler{rootPath: rootPath, workdir: workdir, client: client}
}
// ConfigInfo serves the config file content as the HTTP response.
func (h ResourceHandler) ConfigInfo(request *restful.Request, response *restful.Response) {
file, err := os.Open(filepath.Join(h.rootPath, api.SchemaConfigFile)) file, err := os.Open(filepath.Join(h.rootPath, api.SchemaConfigFile))
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -105,22 +64,122 @@ func (h schemaHandler) configInfo(request *restful.Request, response *restful.Re
_, _ = io.Copy(response.ResponseWriter, file) _, _ = io.Copy(response.ResponseWriter, file)
} }
func (h schemaHandler) storeConfig(request *restful.Request, response *restful.Response) { // PostConfig updates the config file and triggers precheck playbooks if needed.
file, err := os.OpenFile(filepath.Join(h.rootPath, api.SchemaConfigFile), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) func (h ResourceHandler) PostConfig(request *restful.Request, response *restful.Response) {
var (
oldConfig map[string]any
newConfig map[string]any
)
// Read new config from request body.
if err := request.ReadEntity(&newConfig); err != nil {
_ = response.WriteError(http.StatusInternalServerError, err)
return
}
configPath := filepath.Join(h.rootPath, api.SchemaConfigFile)
// Open config file for reading and writing.
configFile, err := os.OpenFile(configPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil { if err != nil {
_ = response.WriteError(http.StatusInternalServerError, err) _ = response.WriteError(http.StatusInternalServerError, err)
return return
} }
defer file.Close() defer configFile.Close()
_, err = io.Copy(file, request.Request.Body)
if err != nil { // Decode old config if present.
if err := json.NewDecoder(configFile).Decode(&oldConfig); err != nil && err != io.EOF {
_ = response.WriteError(http.StatusInternalServerError, err) _ = response.WriteError(http.StatusInternalServerError, err)
return return
} }
_ = response.WriteEntity(api.SUCCESS)
queryParam := query.ParseQueryParameter(request)
namespace := queryParam.Filters["cluster"]
inventory := queryParam.Filters["inventory"]
playbooks := make(map[string]*kkcorev1.Playbook)
wg := wait.Group{}
// Iterate over new config and trigger precheck playbooks if config changed.
for fileName, newVal := range newConfig {
if reflect.DeepEqual(newVal, oldConfig[fileName]) {
continue
}
schemaInfo, err := os.ReadFile(filepath.Join(h.rootPath, fileName))
if err != nil {
_ = response.WriteError(http.StatusInternalServerError, err)
return
}
var schemaFile api.SchemaFile
if err := json.Unmarshal(schemaInfo, &schemaFile); err != nil {
_ = response.WriteError(http.StatusInternalServerError, err)
return
}
// If a precheck playbook is defined, create and execute it.
if pbpath := schemaFile.PlaybookPath["precheck."+api.SchemaLabelSubfix]; pbpath != "" {
configRaw, err := json.Marshal(newVal)
if err != nil {
_ = response.WriteError(http.StatusInternalServerError, err)
return
}
playbook := &kkcorev1.Playbook{
Spec: kkcorev1.PlaybookSpec{
Config: kkcorev1.Config{
Spec: runtime.RawExtension{Raw: configRaw},
},
InventoryRef: &corev1.ObjectReference{
Kind: "Inventory",
Namespace: string(namespace),
Name: string(inventory),
},
Playbook: pbpath,
},
Status: kkcorev1.PlaybookStatus{
Phase: kkcorev1.PlaybookPhasePending,
},
}
// Set the workdir in the playbook's spec config
if err := unstructured.SetNestedField(playbook.Spec.Config.Value(), h.workdir, _const.Workdir); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Create(context.TODO(), playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
playbooks[fileName] = playbook
wg.Start(func() {
// Execute the playbook asynchronously.
if err := playbookManager.executor(playbook, h.client); err != nil {
klog.ErrorS(err, "failed to executor precheck playbook", "schema", fileName)
}
})
}
}
wg.Wait()
// Collect precheck results.
preCheckResult := make(map[string]string)
for fileName, playbook := range playbooks {
if playbook.Status.FailureMessage != "" {
preCheckResult[fileName] = playbook.Status.FailureMessage
}
}
// Write new config to file.
if _, err := io.Copy(configFile, request.Request.Body); err != nil {
_ = response.WriteError(http.StatusInternalServerError, err)
return
}
// Respond with precheck results if any failures, otherwise success.
if len(preCheckResult) > 0 {
_ = response.WriteEntity(api.Result{Message: api.ResultFailed, Result: preCheckResult})
} else {
_ = response.WriteEntity(api.SUCCESS)
}
} }
func (h schemaHandler) listIP(request *restful.Request, response *restful.Response) { // ListIP lists all IPs in the given CIDR, checks their online and SSH status, and returns the result.
func (h ResourceHandler) ListIP(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request) queryParam := query.ParseQueryParameter(request)
cidr, ok := queryParam.Filters["cidr"] cidr, ok := queryParam.Filters["cidr"]
if !ok || string(cidr) == "" { if !ok || string(cidr) == "" {
@ -137,12 +196,13 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon
mu := sync.Mutex{} mu := sync.Mutex{}
jobChannel := make(chan string, 20) jobChannel := make(chan string, 20)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
// Start worker goroutines for concurrent IP checking.
for range maxConcurrency { for range maxConcurrency {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
for ip := range jobChannel { for ip := range jobChannel {
if ifLocalhostIP(ip) { if _const.IsLocalhostIP(ip) {
mu.Lock() mu.Lock()
ipTable = append(ipTable, api.IPTable{ ipTable = append(ipTable, api.IPTable{
IP: ip, IP: ip,
@ -157,10 +217,10 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon
// Check if the host is online using the ICMP protocol (ping). // Check if the host is online using the ICMP protocol (ping).
// Requires root privileges or CAP_NET_RAW capability. // Requires root privileges or CAP_NET_RAW capability.
if !ifIPOnline(ip) { if !isIPOnline(ip) {
continue continue
} }
reachable, authorized := ifIPSSHAuthorized(ip, string(sshPort)) reachable, authorized := isSSHAuthorized(ip, string(sshPort))
mu.Lock() mu.Lock()
ipTable = append(ipTable, api.IPTable{ ipTable = append(ipTable, api.IPTable{
@ -174,6 +234,7 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon
}() }()
} }
// Send IPs to job channel for processing.
for _, ip := range ips { for _, ip := range ips {
jobChannel <- ip jobChannel <- ip
} }
@ -181,7 +242,7 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon
close(jobChannel) close(jobChannel)
wg.Wait() wg.Wait()
// less is a comparison function for sorting SchemaTable items by a given field. // less is a comparison function for sorting IPTable items by a given field.
less := func(left, right api.IPTable, sortBy query.Field) bool { less := func(left, right api.IPTable, sortBy query.Field) bool {
leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy) leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy)
rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy) rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy)
@ -205,7 +266,7 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon
return true return true
} }
} }
// filter is a function for filtering SchemaTable items by a given field and value. // filter is a function for filtering IPTable items by a given field and value.
filter := func(o api.IPTable, f query.Filter) bool { filter := func(o api.IPTable, f query.Filter) bool {
val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field) val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field)
switch val.Kind() { switch val.Kind() {
@ -221,15 +282,15 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon
_ = response.WriteEntity(results) _ = response.WriteEntity(results)
} }
// schemaInfo serves static schema files from the rootPath directory. // SchemaInfo serves static schema files from the rootPath directory.
// It strips the /resources/schema/ prefix and serves files using http.FileServer. // It strips the /resources/schema/ prefix and serves files using http.FileServer.
func (h schemaHandler) schemaInfo(request *restful.Request, response *restful.Response) { func (h ResourceHandler) SchemaInfo(request *restful.Request, response *restful.Response) {
http.StripPrefix("/resources/schema/", http.FileServer(http.Dir(h.rootPath))).ServeHTTP(response.ResponseWriter, request.Request) http.StripPrefix("/resources/schema/", http.FileServer(http.Dir(h.rootPath))).ServeHTTP(response.ResponseWriter, request.Request)
} }
// allSchema lists all schema JSON files in the rootPath directory as a table. // ListSchema lists all schema JSON files in the rootPath directory as a table.
// It supports filtering, sorting, and pagination via query parameters. // It supports filtering, sorting, and pagination via query parameters.
func (h schemaHandler) allSchema(request *restful.Request, response *restful.Response) { func (h ResourceHandler) ListSchema(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request) queryParam := query.ParseQueryParameter(request)
// Read all entries in the rootPath directory. // Read all entries in the rootPath directory.
entries, err := os.ReadDir(h.rootPath) entries, err := os.ReadDir(h.rootPath)
@ -248,18 +309,18 @@ func (h schemaHandler) allSchema(request *restful.Request, response *restful.Res
// Read the JSON file. // Read the JSON file.
data, err := os.ReadFile(filepath.Join(h.rootPath, entry.Name())) data, err := os.ReadFile(filepath.Join(h.rootPath, entry.Name()))
if err != nil { if err != nil {
api.HandleBadRequest(response, request, err) api.HandleBadRequest(response, request, errors.Wrapf(err, "failed to read file for schema %q", entry.Name()))
return return
} }
var schemaFile api.SchemaFile var schemaFile api.SchemaFile
// Unmarshal the JSON data into a SchemaTable struct. // Unmarshal the JSON data into a SchemaTable struct.
if err := json.Unmarshal(data, &schemaFile); err != nil { if err := json.Unmarshal(data, &schemaFile); err != nil {
api.HandleBadRequest(response, request, err) api.HandleBadRequest(response, request, errors.Wrapf(err, "failed to unmarshal file for schema %q", entry.Name()))
return return
} }
// Get all playbooks in the given namespace. // Get all playbooks in the given namespace.
playbookList := &kkcorev1.PlaybookList{} playbookList := &kkcorev1.PlaybookList{}
if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.InNamespace(request.PathParameter("namespace"))); err != nil { if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.InNamespace(request.PathParameter("cluster"))); err != nil {
api.HandleBadRequest(response, request, err) api.HandleBadRequest(response, request, err)
return return
} }
@ -333,42 +394,14 @@ func (h schemaHandler) allSchema(request *restful.Request, response *restful.Res
_ = response.WriteEntity(results) _ = response.WriteEntity(results)
} }
// ifLocalhostIP checks if the given IP address string (ipStr) is bound to any local network interface. // ===========================================================================
// It returns true if the IP is found on any interface, false otherwise. // ============================= isIPOnline ==============================
func ifLocalhostIP(ipStr string) bool { // ===========================================================================
targetIP := net.ParseIP(ipStr)
if targetIP == nil {
return false
}
ifaces, err := net.Interfaces()
if err != nil {
return false
}
for _, iface := range ifaces {
addrs, err := iface.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
if v.IP.Equal(targetIP) {
return true
}
case *net.IPAddr:
if v.IP.Equal(targetIP) {
return true
}
}
}
}
return false
}
// ifIPOnline checks if the given IP address is online by sending an ICMP Echo Request (ping). // isIPOnline checks if the given IP address is online by sending an ICMP Echo Request (ping).
// It returns true if a reply is received, false otherwise. // It returns true if a reply is received, false otherwise.
// The timeout for the ICMP connection and reply is set to 1 second. // The timeout for the ICMP connection and reply is set to 1 second.
func ifIPOnline(ipStr string) bool { func isIPOnline(ipStr string) bool {
ip := net.ParseIP(ipStr) ip := net.ParseIP(ipStr)
if ip == nil { if ip == nil {
return false return false
@ -473,6 +506,15 @@ func ifIPOnline(ipStr string) bool {
return false return false
} }
// isValidICMPReply checks if the received ICMP reply is valid and matches the expected parameters.
// n: number of bytes read
// reply: the reply buffer
// src: source address of the reply
// expectedIP: the IP we expect the reply from
// protocol: ICMP protocol number
// pid: process ID used in the Echo request
// seq: sequence number used in the Echo request
// replyFilter: function to filter valid ICMP types
func isValidICMPReply(n int, reply []byte, src net.Addr, expectedIP net.IP, protocol int, pid, seq int, replyFilter func(icmp.Type) bool) bool { func isValidICMPReply(n int, reply []byte, src net.Addr, expectedIP net.IP, protocol int, pid, seq int, replyFilter func(icmp.Type) bool) bool {
srcIP, ok := src.(*net.IPAddr) srcIP, ok := src.(*net.IPAddr)
if !ok || !srcIP.IP.Equal(expectedIP) { if !ok || !srcIP.IP.Equal(expectedIP) {
@ -503,10 +545,14 @@ func isValidICMPReply(n int, reply []byte, src net.Addr, expectedIP net.IP, prot
return false return false
} }
// ifIPSSHAuthorized checks if SSH authorization to the given IP is possible using the local private key. // ===========================================================================
// ============================= isSSHAuthorized =========================
// ===========================================================================
// isSSHAuthorized checks if SSH authorization to the given IP is possible using the local private key.
// It returns two booleans: the first indicates if the SSH port (22) is reachable, and the second indicates if SSH authorization using the local private key is successful. // It returns two booleans: the first indicates if the SSH port (22) is reachable, and the second indicates if SSH authorization using the local private key is successful.
// The function attempts to find the user's private key, read and parse it, and then connect via SSH. // The function attempts to find the user's private key, read and parse it, and then connect via SSH.
func ifIPSSHAuthorized(ipStr, sshPort string) (bool, bool) { func isSSHAuthorized(ipStr, sshPort string) (bool, bool) {
// First check if port 22 is reachable on the target IP address. // First check if port 22 is reachable on the target IP address.
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", ipStr, sshPort), time.Second) conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", ipStr, sshPort), time.Second)
if err != nil { if err != nil {

View File

@ -1,135 +0,0 @@
package web
import (
"io/fs"
"net/http"
"os"
"strings"
"github.com/cockroachdb/errors"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
"github.com/go-openapi/spec"
"k8s.io/klog/v2"
"github.com/kubesphere/kubekey/v4/config"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/version"
)
// NewUIService creates a new WebService that serves the static web UI and handles SPA routing.
// - Serves "/" with index.html
// - Serves static assets (e.g., .js, .css, .png) from the embedded web directory
// - Forwards all other non-API paths to index.html for SPA client-side routing
func NewUIService(path string) *restful.WebService {
ws := new(restful.WebService)
ws.Path("/")
// Create a sub-filesystem for the embedded web UI assets
fileServer := http.FileServer(http.FS(os.DirFS(path)))
// Serve the root path "/" with index.html
ws.Route(ws.GET("").To(func(req *restful.Request, resp *restful.Response) {
data, err := fs.ReadFile(os.DirFS(path), "index.html")
if err != nil {
_ = resp.WriteError(http.StatusNotFound, err)
return
}
resp.AddHeader("Content-Type", "text/html")
_, _ = resp.Write(data)
}))
// Serve all subpaths:
// - If the path matches an API prefix, return 404 to let other WebServices handle it
// - If the path looks like a static asset (contains a dot), serve the file
// - Otherwise, serve index.html for SPA routing
ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
requestedPath := req.PathParameter("subpath")
// If the path matches any API route, return 404 so other WebServices can handle it
if strings.HasPrefix(requestedPath, strings.TrimLeft(_const.CoreAPIPath, "/")) ||
strings.HasPrefix(requestedPath, strings.TrimLeft(_const.SwaggerAPIPath, "/")) ||
strings.HasPrefix(requestedPath, strings.TrimLeft(_const.ResourcesAPIPath, "/")) {
_ = resp.WriteError(http.StatusNotFound, errors.New("not found"))
return
}
// If the path looks like a static asset (e.g., .js, .css, .ico, .png, etc.), serve it
if strings.Contains(requestedPath, ".") {
fileServer.ServeHTTP(resp.ResponseWriter, req.Request)
return
}
// For all other paths, serve index.html (SPA client-side routing)
data, err := fs.ReadFile(os.DirFS(path), "index.html")
if err != nil {
_ = resp.WriteError(http.StatusInternalServerError, err)
return
}
resp.AddHeader("Content-Type", "text/html")
_, _ = resp.Write(data)
}))
return ws
}
// NewSwaggerUIService creates a new WebService that serves the Swagger UI interface
// It mounts the embedded swagger-ui files and handles requests to display the API documentation
func NewSwaggerUIService() *restful.WebService {
ws := new(restful.WebService)
ws.Path(strings.TrimRight(_const.SwaggerAPIPath, "/"))
subFS, err := fs.Sub(config.Swagger, "swagger-ui")
if err != nil {
panic(err)
}
fileServer := http.StripPrefix("/swagger-ui/", http.FileServer(http.FS(subFS)))
ws.Route(ws.GET("").To(func(req *restful.Request, resp *restful.Response) {
data, err := fs.ReadFile(subFS, "index.html")
if err != nil {
_ = resp.WriteError(http.StatusNotFound, err)
return
}
resp.AddHeader("Content-Type", "text/html")
_, _ = resp.Write(data)
}).Metadata(restfulspec.KeyOpenAPITags, []string{_const.OpenAPITag}))
ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
fileServer.ServeHTTP(resp.ResponseWriter, req.Request)
}).Metadata(restfulspec.KeyOpenAPITags, []string{_const.OpenAPITag}))
return ws
}
// NewAPIService creates a new WebService that serves the OpenAPI/Swagger specification
// It takes a list of WebServices and generates the API documentation
func NewAPIService(webservice []*restful.WebService) *restful.WebService {
restconfig := restfulspec.Config{
WebServices: webservice, // you control what services are visible
APIPath: "/apidocs.json",
PostBuildSwaggerObjectHandler: enrichSwaggerObject}
for _, ws := range webservice {
klog.V(2).Infof("%s", ws.RootPath())
}
return restfulspec.NewOpenAPIService(restconfig)
}
// enrichSwaggerObject customizes the Swagger documentation with KubeKey-specific information
// It sets the API title, description, version and contact information
func enrichSwaggerObject(swo *spec.Swagger) {
swo.Info = &spec.Info{
InfoProps: spec.InfoProps{
Title: "KubeKey Web API",
Description: "KubeKey Web OpenAPI",
Version: version.Get().String(),
Contact: &spec.ContactInfo{
ContactInfoProps: spec.ContactInfoProps{
Name: "KubeKey",
URL: "https://github.com/kubesphere/kubekey",
},
},
},
}
}

309
pkg/web/service.go Normal file
View File

@ -0,0 +1,309 @@
package web
import (
"io/fs"
"net/http"
"os"
"strings"
"github.com/cockroachdb/errors"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
"github.com/go-openapi/spec"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/kubesphere/kubekey/v4/config"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/web/api"
"github.com/kubesphere/kubekey/v4/pkg/web/handler"
"github.com/kubesphere/kubekey/v4/pkg/web/query"
"github.com/kubesphere/kubekey/v4/version"
)
// NewCoreService creates and configures a new RESTful web service for managing inventories and playbooks.
// It sets up routes for CRUD operations on inventories and playbooks, including pagination, sorting, and filtering.
func NewCoreService(workdir string, client ctrlclient.Client, restconfig *rest.Config) *restful.WebService {
ws := new(restful.WebService).
// the GroupVersion might be empty, we need to remove the final /
Path(strings.TrimRight(_const.CoreAPIPath+kkcorev1.SchemeGroupVersion.String(), "/"))
inventoryHandler := handler.NewInventoryHandler(workdir, restconfig, client)
// Inventory management routes
ws.Route(ws.POST("/inventories").To(inventoryHandler.Post).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("create a inventory.").
Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON).
Reads(kkcorev1.Inventory{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.PATCH("/namespaces/{namespace}/inventories/{inventory}").To(inventoryHandler.Patch).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("patch a inventory.").
Consumes(string(types.JSONPatchType), string(types.MergePatchType), string(types.ApplyPatchType)).Produces(restful.MIME_JSON).
Reads(kkcorev1.Inventory{}).
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.GET("/inventories").To(inventoryHandler.List).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all inventories.").
Produces(restful.MIME_JSON).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories").To(inventoryHandler.List).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all inventories in a namespace.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}").To(inventoryHandler.Info).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get a inventory in a namespace.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}/hosts").To(inventoryHandler.ListHosts).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all hosts in a inventory.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.InventoryHostTable]{}))
playbookHandler := handler.NewPlaybookHandler(workdir, restconfig, client)
// Playbook management routes
ws.Route(ws.POST("/playbooks").To(playbookHandler.Post).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("create a playbook.").
Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON).
Reads(kkcorev1.Playbook{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{}))
ws.Route(ws.GET("/playbooks").To(playbookHandler.List).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all playbooks.").
Produces(restful.MIME_JSON).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks").To(playbookHandler.List).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all playbooks in a namespace.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}").To(playbookHandler.Info).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get or watch a playbook in a namespace.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Param(ws.QueryParameter("watch", "set to true to watch this playbook")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}/log").To(playbookHandler.Log).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get a playbook execute log.").
Produces("text/plain").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Returns(http.StatusOK, _const.StatusOK, ""))
ws.Route(ws.DELETE("/namespaces/{namespace}/playbooks/{playbook}").To(playbookHandler.Delete).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("delete a playbook.").
Produces(restful.MIME_JSON).
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Returns(http.StatusOK, _const.StatusOK, api.Result{}))
return ws
}
// NewSchemaService creates a new WebService that serves schema files from the specified root path.
// It sets up a route that handles GET requests to /resources/schema/{subpath} and serves files from the rootPath directory.
// The {subpath:*} parameter allows for matching any path under /resources/schema/.
func NewSchemaService(rootPath string, workdir string, client ctrlclient.Client) *restful.WebService {
ws := new(restful.WebService)
ws.Path(strings.TrimRight(_const.ResourcesAPIPath, "/")).
Produces(restful.MIME_JSON, "text/plain")
resourceHandler := handler.NewResourceHandler(rootPath, workdir, client)
ws.Route(ws.GET("/ip").To(resourceHandler.ListIP).
Doc("list available ip from ip cidr").
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}).
Param(ws.QueryParameter("cidr", "the cidr for ip").Required(true)).
Param(ws.QueryParameter("sshPort", "the ssh port for ip").Required(false)).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=ip").Required(false).DefaultValue("ip")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.IPTable]{}))
ws.Route(ws.GET("/schema/{subpath:*}").To(resourceHandler.SchemaInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}))
ws.Route(ws.GET("/schema").To(resourceHandler.ListSchema).
Doc("list all schema as table").
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}).
Param(ws.QueryParameter("cluster", "The namespace where the cluster resides").Required(false).DefaultValue("default")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=priority")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.SchemaTable]{}))
ws.Route(ws.POST("/schema/config").To(resourceHandler.PostConfig).
Doc("storing user-defined configuration information").
Reads(struct{}{}).
Param(ws.QueryParameter("cluster", "The namespace where the cluster resides").Required(false).DefaultValue("default")).
Param(ws.QueryParameter("inventory", "the inventory of the playbook").Required(false).DefaultValue("default")).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}))
ws.Route(ws.GET("/schema/config").To(resourceHandler.ConfigInfo).
Doc("get user-defined configuration information").
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}))
return ws
}
// NewUIService creates a new WebService that serves the static web UI and handles SPA routing.
// - Serves "/" with index.html
// - Serves static assets (e.g., .js, .css, .png) from the embedded web directory
// - Forwards all other non-API paths to index.html for SPA client-side routing
func NewUIService(path string) *restful.WebService {
ws := new(restful.WebService)
ws.Path("/")
// Create a sub-filesystem for the embedded web UI assets
fileServer := http.FileServer(http.FS(os.DirFS(path)))
// Serve the root path "/" with index.html
ws.Route(ws.GET("").To(func(req *restful.Request, resp *restful.Response) {
data, err := fs.ReadFile(os.DirFS(path), "index.html")
if err != nil {
_ = resp.WriteError(http.StatusNotFound, err)
return
}
resp.AddHeader("Content-Type", "text/html")
_, _ = resp.Write(data)
}))
// Serve all subpaths:
// - If the path matches an API prefix, return 404 to let other WebServices handle it
// - If the path looks like a static asset (contains a dot), serve the file
// - Otherwise, serve index.html for SPA routing
ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
requestedPath := req.PathParameter("subpath")
// If the path matches any API route, return 404 so other WebServices can handle it
if strings.HasPrefix(requestedPath, strings.TrimLeft(_const.CoreAPIPath, "/")) ||
strings.HasPrefix(requestedPath, strings.TrimLeft(_const.SwaggerAPIPath, "/")) ||
strings.HasPrefix(requestedPath, strings.TrimLeft(_const.ResourcesAPIPath, "/")) {
_ = resp.WriteError(http.StatusNotFound, errors.New("not found"))
return
}
// If the path looks like a static asset (e.g., .js, .css, .ico, .png, etc.), serve it
if strings.Contains(requestedPath, ".") {
fileServer.ServeHTTP(resp.ResponseWriter, req.Request)
return
}
// For all other paths, serve index.html (SPA client-side routing)
data, err := fs.ReadFile(os.DirFS(path), "index.html")
if err != nil {
_ = resp.WriteError(http.StatusInternalServerError, err)
return
}
resp.AddHeader("Content-Type", "text/html")
_, _ = resp.Write(data)
}))
return ws
}
// NewSwaggerUIService creates a new WebService that serves the Swagger UI interface
// It mounts the embedded swagger-ui files and handles requests to display the API documentation
func NewSwaggerUIService() *restful.WebService {
ws := new(restful.WebService)
ws.Path(strings.TrimRight(_const.SwaggerAPIPath, "/"))
subFS, err := fs.Sub(config.Swagger, "swagger-ui")
if err != nil {
panic(err)
}
fileServer := http.StripPrefix("/swagger-ui/", http.FileServer(http.FS(subFS)))
ws.Route(ws.GET("").To(func(req *restful.Request, resp *restful.Response) {
data, err := fs.ReadFile(subFS, "index.html")
if err != nil {
_ = resp.WriteError(http.StatusNotFound, err)
return
}
resp.AddHeader("Content-Type", "text/html")
_, _ = resp.Write(data)
}).Metadata(restfulspec.KeyOpenAPITags, []string{_const.OpenAPITag}))
ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
fileServer.ServeHTTP(resp.ResponseWriter, req.Request)
}).Metadata(restfulspec.KeyOpenAPITags, []string{_const.OpenAPITag}))
return ws
}
// NewAPIService creates a new WebService that serves the OpenAPI/Swagger specification
// It takes a list of WebServices and generates the API documentation
func NewAPIService(webservice []*restful.WebService) *restful.WebService {
restconfig := restfulspec.Config{
WebServices: webservice, // you control what services are visible
APIPath: "/apidocs.json",
PostBuildSwaggerObjectHandler: func(swo *spec.Swagger) {
swo.Info = &spec.Info{
InfoProps: spec.InfoProps{
Title: "KubeKey Web API",
Description: "KubeKey Web OpenAPI",
Version: version.Get().String(),
Contact: &spec.ContactInfo{
ContactInfoProps: spec.ContactInfoProps{
Name: "KubeKey",
URL: "https://github.com/kubesphere/kubekey",
},
},
},
}
}}
for _, ws := range webservice {
klog.V(2).Infof("%s", ws.RootPath())
}
return restfulspec.NewOpenAPIService(restconfig)
}