From 620b7f56a3490c87b96e8584680874e4f10acb3f Mon Sep 17 00:00:00 2001 From: liujian Date: Wed, 30 Jul 2025 16:02:34 +0800 Subject: [PATCH] feat: enhance connector interface and implementations (#2675) - Updated the Connector interface to return both stdout and stderr for command execution. - Modified implementations in local, kubernetes, and ssh connectors to support the new return values. - Improved documentation for the Connector interface methods for clarity. - Added error handling for stderr in command execution across connectors. - Introduced new utility functions for IP parsing and checking localhost IPs. Signed-off-by: joyceliu --- pkg/connector/connector.go | 17 +- pkg/connector/kubernetes_connector.go | 9 +- pkg/connector/local_connector.go | 37 +- pkg/connector/prometheus_connector.go | 126 ++--- pkg/connector/ssh_connector.go | 43 +- pkg/const/ip.go | 48 ++ pkg/executor/task_executor.go | 8 +- pkg/modules/command.go | 12 +- pkg/modules/module_test.go | 9 +- pkg/modules/prometheus.go | 2 +- pkg/web/api/result.go | 25 +- pkg/web/corev1.go | 763 -------------------------- pkg/web/handler/executor.go | 93 ++++ pkg/web/handler/inventory.go | 298 ++++++++++ pkg/web/handler/playbook.go | 293 ++++++++++ pkg/web/{ => handler}/resources.go | 266 +++++---- pkg/web/openapi.go | 135 ----- pkg/web/service.go | 309 +++++++++++ 18 files changed, 1360 insertions(+), 1133 deletions(-) delete mode 100644 pkg/web/corev1.go create mode 100644 pkg/web/handler/executor.go create mode 100644 pkg/web/handler/inventory.go create mode 100644 pkg/web/handler/playbook.go rename pkg/web/{ => handler}/resources.go (68%) delete mode 100644 pkg/web/openapi.go create mode 100644 pkg/web/service.go diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index 22d0ea16..751b6dfd 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -38,18 +38,21 @@ const ( connectedPrometheus = "prometheus" ) -// Connector is the interface for connecting to a remote host +// Connector is the interface for connecting to a remote host. +// It abstracts the operations required to interact with different types of hosts (e.g., SSH, local, Kubernetes, Prometheus). +// Implementations of this interface should provide mechanisms for initialization, cleanup, file transfer, and command execution. type Connector interface { - // Init initializes the connection + // Init initializes the connection. Init(ctx context.Context) error - // Close closes the connection + // Close closes the connection and releases any resources. Close(ctx context.Context) error - // PutFile copies a file from src to dst with mode. + // PutFile copies a file from src (as bytes) to dst (remote path) with the specified file mode. PutFile(ctx context.Context, src []byte, dst string, mode fs.FileMode) error - // FetchFile copies a file from src to dst writer. + // FetchFile copies a file from src (remote path) to dst (local writer). FetchFile(ctx context.Context, src string, dst io.Writer) error - // ExecuteCommand executes a command on the remote host - ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) + // ExecuteCommand executes a command on the remote host. + // Returns stdout, stderr, and error (if any). + ExecuteCommand(ctx context.Context, cmd string) ([]byte, []byte, error) } // NewConnector creates a new connector diff --git a/pkg/connector/kubernetes_connector.go b/pkg/connector/kubernetes_connector.go index bc23b300..108dd3ae 100644 --- a/pkg/connector/kubernetes_connector.go +++ b/pkg/connector/kubernetes_connector.go @@ -17,6 +17,7 @@ limitations under the License. package connector import ( + "bytes" "context" "io" "io/fs" @@ -137,12 +138,16 @@ func (c *kubernetesConnector) FetchFile(ctx context.Context, src string, dst io. } // ExecuteCommand in a kubernetes cluster -func (c *kubernetesConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) { +func (c *kubernetesConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, []byte, error) { // add "--kubeconfig" to src command klog.V(5).InfoS("exec local command", "cmd", cmd) command := c.cmd.CommandContext(ctx, c.shell, "-c", cmd) command.SetDir(c.homedir) command.SetEnv([]string{"KUBECONFIG=" + filepath.Join(c.homedir, kubeconfigRelPath)}) - return command.CombinedOutput() + var stdoutBuf, stderrBuf bytes.Buffer + command.SetStdout(&stdoutBuf) + command.SetStderr(&stderrBuf) + err := command.Run() + return stdoutBuf.Bytes(), stderrBuf.Bytes(), err } diff --git a/pkg/connector/local_connector.go b/pkg/connector/local_connector.go index 8ae68e62..7a933219 100644 --- a/pkg/connector/local_connector.go +++ b/pkg/connector/local_connector.go @@ -105,7 +105,7 @@ func (c *localConnector) FetchFile(_ context.Context, src string, dst io.Writer) } // ExecuteCommand executes a command on the local host. -func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) { +func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, []byte, error) { klog.V(5).InfoS("exec local command", "cmd", cmd) // in command := c.Cmd.CommandContext(ctx, "sudo", "-SE", c.shell, "-c", cmd) @@ -113,13 +113,19 @@ func (c *localConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte command.SetStdin(bytes.NewBufferString(c.Password + "\n")) } // out - output, err := command.CombinedOutput() + var stdoutBuf, stderrBuf bytes.Buffer + command.SetStdout(&stdoutBuf) + command.SetStderr(&stderrBuf) + err := command.Run() + stdout := stdoutBuf.Bytes() + stderr := stderrBuf.Bytes() if c.Password != "" { // Filter out the "Password:" prompt from the output - output = bytes.Replace(output, []byte("Password:"), []byte(""), -1) + stdout = bytes.Replace(stdout, []byte("Password:"), []byte(""), -1) + stderr = bytes.Replace(stderr, []byte("Password:"), []byte(""), -1) } - return output, errors.Wrapf(err, "failed to execute command") + return stdout, stderr, err } // HostInfo from gatherFacts cache @@ -138,19 +144,30 @@ func (c *localConnector) getHostInfo(ctx context.Context) (map[string]any, error return nil, err } osVars[_const.VariableOSRelease] = convertBytesToMap(osRelease.Bytes(), "=") - kernel, err := c.ExecuteCommand(ctx, "uname -r") + kernel, stderr, err := c.ExecuteCommand(ctx, "uname -r") if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get kernel: %v, stderr: %q", err, string(stderr)) + } + if len(stderr) > 0 { + return nil, errors.Errorf("failed to get kernel, stderr: %q", string(stderr)) } osVars[_const.VariableOSKernelVersion] = string(bytes.TrimSpace(kernel)) - hn, err := c.ExecuteCommand(ctx, "hostname") + + hn, hnStderr, err := c.ExecuteCommand(ctx, "hostname") if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get hostname: %v, stderr: %q", err, string(hnStderr)) + } + if len(hnStderr) > 0 { + return nil, errors.Errorf("failed to get hostname, stderr: %q", string(hnStderr)) } osVars[_const.VariableOSHostName] = string(bytes.TrimSpace(hn)) - arch, err := c.ExecuteCommand(ctx, "arch") + + arch, archStderr, err := c.ExecuteCommand(ctx, "arch") if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get arch: %v, stderr: %q", err, string(archStderr)) + } + if len(archStderr) > 0 { + return nil, errors.Errorf("failed to get arch, stderr: %q", string(archStderr)) } osVars[_const.VariableOSArchitecture] = string(bytes.TrimSpace(arch)) diff --git a/pkg/connector/prometheus_connector.go b/pkg/connector/prometheus_connector.go index 2cfabfb4..310dbcf7 100644 --- a/pkg/connector/prometheus_connector.go +++ b/pkg/connector/prometheus_connector.go @@ -36,13 +36,13 @@ import ( ) const ( - // Prometheus API default timeout + // Default timeout for Prometheus API defaultPrometheusTimeout = 10 * time.Second ) var _ Connector = &PrometheusConnector{} -// PrometheusConnector implements Connector interface for Prometheus connections +// PrometheusConnector implements the Connector interface for Prometheus connections type PrometheusConnector struct { url string username string @@ -54,48 +54,47 @@ type PrometheusConnector struct { connected bool } -// newPrometheusConnector creates a new PrometheusConnector +// newPrometheusConnector creates a new PrometheusConnector instance func newPrometheusConnector(vars map[string]any) *PrometheusConnector { pc := &PrometheusConnector{ headers: make(map[string]string), timeout: defaultPrometheusTimeout, } - // 修正变量名以避免导入遮蔽 + // Retrieve Prometheus URL promURL, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorURL) if err != nil { - klog.V(4).InfoS("get connector host failed use current hostname", "error", err) + klog.V(4).InfoS("Failed to get connector host, using current hostname", "error", err) } - pc.url = promURL + // Retrieve username username, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUserName) if err != nil { - klog.V(4).InfoS("get connector username failed use current username", "error", err) + klog.V(4).InfoS("Failed to get connector username, using current username", "error", err) } - pc.username = username + // Retrieve password password, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword) if err != nil { - klog.V(4).InfoS("get connector password failed use current password", "error", err) + klog.V(4).InfoS("Failed to get connector password, using current password", "error", err) } - pc.password = password + // Retrieve token token, err := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorToken) if err != nil { - klog.V(4).InfoS("get connector token failed use current token", "error", err) + klog.V(4).InfoS("Failed to get connector token, using current token", "error", err) } - pc.token = token + // Retrieve custom headers and timeout from connector variables prometheusVars, ok := vars["connector"].(map[string]any) if !ok { - klog.V(4).InfoS("connector configuration is not a map") + klog.V(4).InfoS("Connector configuration is not a map") return nil } - // Get custom headers from connector variables if headers, ok := prometheusVars["headers"].(map[string]any); ok { for k, v := range headers { if strVal, ok := v.(string); ok { @@ -103,8 +102,6 @@ func newPrometheusConnector(vars map[string]any) *PrometheusConnector { } } } - - // Get timeout from connector variables if timeoutStr, ok := prometheusVars["timeout"].(string); ok { if timeout, err := time.ParseDuration(timeoutStr); err == nil { pc.timeout = timeout @@ -116,7 +113,7 @@ func newPrometheusConnector(vars map[string]any) *PrometheusConnector { // Init initializes the Prometheus connection func (pc *PrometheusConnector) Init(ctx context.Context) error { - // Ensure URL is properly formatted + // Ensure URL is provided if pc.url == "" { return errors.New("prometheus URL is required") } @@ -127,7 +124,7 @@ func (pc *PrometheusConnector) Init(ctx context.Context) error { return errors.Wrapf(err, "invalid prometheus URL: %s", pc.url) } - // If scheme is missing, default to http + // Default to http if scheme is missing if parsedURL.Scheme == "" { klog.V(4).InfoS("No scheme specified in Prometheus URL, defaulting to HTTP", "url", pc.url) parsedURL.Scheme = "http" @@ -160,7 +157,7 @@ func (pc *PrometheusConnector) Init(ctx context.Context) error { return errors.Wrap(err, "failed to create request") } - // Add auth headers if provided + // Add authentication headers if provided pc.addAuthHeaders(req) klog.V(4).InfoS("Testing connection to Prometheus server") @@ -186,7 +183,7 @@ func (pc *PrometheusConnector) Init(ctx context.Context) error { // Close closes the Prometheus connection func (pc *PrometheusConnector) Close(ctx context.Context) error { - // HTTP client doesn't need explicit closing + // HTTP client does not require explicit closing pc.connected = false return nil } @@ -201,26 +198,29 @@ func (pc *PrometheusConnector) FetchFile(ctx context.Context, src string, dst io return errors.New("fetchFile operation is not supported for Prometheus connector") } -// ExecuteCommand executes a PromQL query +// ExecuteCommand executes a PromQL query and returns both stdout and stderr // For Prometheus connector, the command is interpreted as a PromQL query -func (pc *PrometheusConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, error) { +// The returned []byte is the stdout, []byte is stderr (always nil), and error is the error if any +func (pc *PrometheusConnector) ExecuteCommand(ctx context.Context, cmd string) ([]byte, []byte, error) { if !pc.connected { - return nil, errors.New("prometheus connector is not initialized, call Init() first") + // If not initialized, return error and nil stderr + return nil, nil, errors.New("prometheus connector is not initialized, call Init() first") } - // Parse the command + // Parse the command into query parameters queryParams := parseCommand(cmd) queryString := queryParams["query"] if queryString == "" { - return nil, errors.New("query parameter is required for Prometheus queries") + // If query is missing, return error and nil stderr + return nil, nil, errors.New("query parameter is required for Prometheus queries") } klog.V(4).InfoS("Executing Prometheus query", "query", queryString) - // Build query URL + // Build the Prometheus query URL apiURL, err := url.Parse(pc.url + "api/v1/query") if err != nil { - return nil, errors.Wrapf(err, "failed to parse URL with base: %s", pc.url) + return nil, nil, errors.Wrapf(err, "failed to parse URL with base: %s", pc.url) } // Add query parameters @@ -235,56 +235,58 @@ func (pc *PrometheusConnector) ExecuteCommand(ctx context.Context, cmd string) ( apiURL.RawQuery = params.Encode() - // Create request + // Create HTTP request req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL.String(), http.NoBody) if err != nil { - return nil, errors.Wrap(err, "failed to create HTTP request") + return nil, nil, errors.Wrap(err, "failed to create HTTP request") } - // Add auth headers + // Add authentication and custom headers pc.addAuthHeaders(req) - // Execute request + // Execute HTTP request klog.V(4).InfoS("Sending request to Prometheus", "url", req.URL.String()) resp, err := pc.client.Do(req) if err != nil { klog.ErrorS(err, "Failed to execute Prometheus query", "query", queryString) - return nil, errors.Wrap(err, "failed to execute prometheus query") + return nil, nil, errors.Wrap(err, "failed to execute prometheus query") } defer resp.Body.Close() - // Read response + // Read response body bodyBytes, err := io.ReadAll(resp.Body) if err != nil { klog.ErrorS(err, "Failed to read response body") - return nil, errors.Wrap(err, "failed to read response body") + return nil, nil, errors.Wrap(err, "failed to read response body") } - // Check if response is successful + // If HTTP status is not OK, return error and nil stderr if resp.StatusCode != http.StatusOK { klog.ErrorS(err, "Prometheus query failed", "statusCode", resp.StatusCode, "response", string(bodyBytes), "query", queryString) - return nil, errors.Errorf("prometheus query failed with status %d: %s", resp.StatusCode, string(bodyBytes)) + return nil, nil, errors.Errorf("prometheus query failed with status %d: %s", resp.StatusCode, string(bodyBytes)) } // Format the response based on the format parameter format := queryParams["format"] if format != "" { klog.V(4).InfoS("Formatting response", "format", format) - return pc.formatResponse(bodyBytes, format) + stdout, ferr := pc.formatResponse(bodyBytes, format) + // Always return nil for stderr as per requirement + return stdout, nil, ferr } // Default to prettified JSON var prettyJSON bytes.Buffer if err := json.Indent(&prettyJSON, bodyBytes, "", " "); err != nil { klog.V(4).InfoS("Failed to prettify JSON response, returning raw response") - // If prettifying fails, return the original response - return bodyBytes, nil + // If prettifying fails, return the original response and nil stderr + return bodyBytes, nil, nil } klog.V(4).InfoS("Prometheus query executed successfully") - return prettyJSON.Bytes(), nil + return prettyJSON.Bytes(), nil, nil } // addAuthHeaders adds authentication headers to the request @@ -376,7 +378,7 @@ func (pc *PrometheusConnector) formatResponse(bodyBytes []byte, format string) ( // extractSimpleValue attempts to extract a simple value from the Prometheus response func (pc *PrometheusConnector) extractSimpleValue(response map[string]any) ([]byte, error) { - // 验证响应格式 + // Validate response format if err := validatePrometheusResponse(response); err != nil { return nil, err } @@ -396,7 +398,7 @@ func (pc *PrometheusConnector) extractSimpleValue(response map[string]any) ([]by return nil, errors.New("invalid response format: result field missing") } - // 根据不同的结果类型处理 + // Handle different result types switch resultType { case "vector": return extractVectorValue(result) @@ -411,7 +413,7 @@ func (pc *PrometheusConnector) extractSimpleValue(response map[string]any) ([]by } } -// validatePrometheusResponse 验证Prometheus响应的基本结构 +// validatePrometheusResponse validates the basic structure of a Prometheus response func validatePrometheusResponse(response map[string]any) error { if status, ok := response["status"].(string); !ok || status != "success" { return errors.New("prometheus query failed") @@ -419,7 +421,7 @@ func validatePrometheusResponse(response map[string]any) error { return nil } -// extractVectorValue 从向量结果中提取值 +// extractVectorValue extracts value from a vector result func extractVectorValue(result any) ([]byte, error) { samples, ok := result.([]any) if !ok || len(samples) == 0 { @@ -439,7 +441,7 @@ func extractVectorValue(result any) ([]byte, error) { return []byte(fmt.Sprintf("%v", value[1])), nil } -// extractScalarValue 从标量结果中提取值 +// extractScalarValue extracts value from a scalar result func extractScalarValue(result any) ([]byte, error) { value, ok := result.([]any) if !ok || len(value) < 2 { @@ -449,7 +451,7 @@ func extractScalarValue(result any) ([]byte, error) { return []byte(fmt.Sprintf("%v", value[1])), nil } -// extractStringValue 从字符串结果中提取值 +// extractStringValue extracts value from a string result func extractStringValue(result any) ([]byte, error) { value, ok := result.([]any) if !ok || len(value) < 2 { @@ -459,7 +461,7 @@ func extractStringValue(result any) ([]byte, error) { return []byte(fmt.Sprintf("%v", value[1])), nil } -// extractMatrixValue 从矩阵结果中提取值 +// extractMatrixValue extracts value from a matrix result func extractMatrixValue(result any) ([]byte, error) { matrixData, err := json.MarshalIndent(result, "", " ") if err != nil { @@ -468,9 +470,9 @@ func extractMatrixValue(result any) ([]byte, error) { return matrixData, nil } -// formatAsTable 重构以降低认知复杂度 +// formatAsTable formats the response as a table to reduce cognitive complexity func (pc *PrometheusConnector) formatAsTable(response map[string]any) ([]byte, error) { - // 验证响应格式并获取结果集 + // Validate response and get result set result, err := getValidVectorResult(response) if err != nil { return nil, err @@ -480,11 +482,11 @@ func (pc *PrometheusConnector) formatAsTable(response map[string]any) ([]byte, e return []byte("No data"), nil } - // 构建表格 + // Build table from result set return buildTableFromResult(result) } -// getValidVectorResult 验证响应并获取vector类型的结果集 +// getValidVectorResult validates the response and gets the vector result set func getValidVectorResult(response map[string]any) ([]any, error) { if status, ok := response["status"].(string); !ok || status != "success" { return nil, errors.New("prometheus query failed") @@ -512,26 +514,26 @@ func getValidVectorResult(response map[string]any) ([]any, error) { return result, nil } -// buildTableFromResult 从结果集构建表格 +// buildTableFromResult builds a table from the result set func buildTableFromResult(result []any) ([]byte, error) { var builder strings.Builder - // 表格标题 + // Table header if _, err := builder.WriteString("METRIC\tVALUE\tTIMESTAMP\n"); err != nil { return nil, err } - // 表格行 + // Table rows for _, item := range result { sample, ok := item.(map[string]any) if !ok { continue } - // 获取指标名称 + // Get metric name metric := getMetricName(sample) - // 添加值和时间戳 + // Add value and timestamp if err := addValueAndTimestamp(&builder, sample, metric); err != nil { return nil, err } @@ -540,7 +542,7 @@ func buildTableFromResult(result []any) ([]byte, error) { return []byte(builder.String()), nil } -// getMetricName 提取指标名称 +// getMetricName extracts the metric name func getMetricName(sample map[string]any) string { metric := "undefined" m, ok := sample["metric"].(map[string]any) @@ -548,7 +550,7 @@ func getMetricName(sample map[string]any) string { return metric } - // 提取指标名称 + // Extract metric name and labels parts := []string{} for k, v := range m { if k == "__name__" { @@ -558,7 +560,7 @@ func getMetricName(sample map[string]any) string { } } - // 如果有标签,在指标名称中包含它们 + // If there are labels, include them in the metric name if len(parts) > 0 { metric = fmt.Sprintf("%s{%s}", metric, strings.Join(parts, ", ")) } @@ -566,11 +568,11 @@ func getMetricName(sample map[string]any) string { return metric } -// addValueAndTimestamp 添加值和时间戳到表格行 +// addValueAndTimestamp adds value and timestamp to a table row func addValueAndTimestamp(builder *strings.Builder, sample map[string]any, metric string) error { value, ok := sample["value"].([]any) if !ok || len(value) < 2 { - return nil // 跳过无效数据 + return nil // Skip invalid data } timestamp := "" @@ -606,7 +608,7 @@ func (pc *PrometheusConnector) GetServerInfo(ctx context.Context) (map[string]an return nil, errors.Wrap(err, "failed to create request for server info") } - // Add auth headers + // Add authentication headers pc.addAuthHeaders(req) // Execute request diff --git a/pkg/connector/ssh_connector.go b/pkg/connector/ssh_connector.go index 15cd2cea..b28ffe61 100644 --- a/pkg/connector/ssh_connector.go +++ b/pkg/connector/ssh_connector.go @@ -244,40 +244,40 @@ func (c *sshConnector) FetchFile(_ context.Context, src string, dst io.Writer) e } // ExecuteCommand in remote host -func (c *sshConnector) ExecuteCommand(_ context.Context, cmd string) ([]byte, error) { +func (c *sshConnector) ExecuteCommand(_ context.Context, cmd string) ([]byte, []byte, error) { cmd = fmt.Sprintf("sudo -SE %s << 'KUBEKEY_EOF'\n%s\nKUBEKEY_EOF\n", c.shell, cmd) klog.V(5).InfoS("exec ssh command", "cmd", cmd, "host", c.Host) // create ssh session session, err := c.client.NewSession() if err != nil { - return nil, errors.Wrap(err, "failed to create ssh session") + return nil, nil, errors.Wrap(err, "failed to create ssh session") } defer session.Close() // get pipe from session stdin, err := session.StdinPipe() if err != nil { - return nil, errors.Wrap(err, "failed to get stdin pipe") + return nil, nil, errors.Wrap(err, "failed to get stdin pipe") } stdout, err := session.StdoutPipe() if err != nil { - return nil, errors.Wrap(err, "failed to get stdout pipe") + return nil, nil, errors.Wrap(err, "failed to get stdout pipe") } stderr, err := session.StderrPipe() if err != nil { - return nil, errors.Wrap(err, "failed to get stderr pipe") + return nil, nil, errors.Wrap(err, "failed to get stderr pipe") } // Start the remote command if err := session.Start(cmd); err != nil { - return nil, errors.Wrap(err, "failed to start session") + return nil, nil, errors.Wrap(err, "failed to start session") } if c.Password != "" { if _, err := stdin.Write([]byte(c.Password + "\n")); err != nil { - return nil, errors.Wrap(err, "failed to write password") + return nil, nil, errors.Wrap(err, "failed to write password") } } if err := stdin.Close(); err != nil { - return nil, errors.Wrap(err, "failed to close stdin pipe") + return nil, nil, errors.Wrap(err, "failed to close stdin pipe") } // Create buffers to store stdout and stderr output @@ -308,9 +308,7 @@ func (c *sshConnector) ExecuteCommand(_ context.Context, cmd string) ([]byte, er <-stdoutDone <-stderrDone - output := append(stdoutBuf.Bytes(), stderrBuf.Bytes()...) - - return output, errors.Wrap(err, "failed to execute ssh command") + return stdoutBuf.Bytes(), stderrBuf.Bytes(), errors.Wrap(err, "failed to execute ssh command") } // HostInfo from gatherFacts cache @@ -327,19 +325,30 @@ func (c *sshConnector) getHostInfo(ctx context.Context) (map[string]any, error) return nil, err } osVars[_const.VariableOSRelease] = convertBytesToMap(osRelease.Bytes(), "=") - kernel, err := c.ExecuteCommand(ctx, "uname -r") + kernel, kernelStderr, err := c.ExecuteCommand(ctx, "uname -r") if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get kernel: %v, stderr: %q", err, string(kernelStderr)) + } + if len(kernelStderr) > 0 { + return nil, errors.Errorf("failed to get kernel, stderr: %q", string(kernelStderr)) } osVars[_const.VariableOSKernelVersion] = string(bytes.TrimSpace(kernel)) - hn, err := c.ExecuteCommand(ctx, "hostname") + + hn, hnStderr, err := c.ExecuteCommand(ctx, "hostname") if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get hostname: %v, stderr: %q", err, string(hnStderr)) + } + if len(hnStderr) > 0 { + return nil, errors.Errorf("failed to get hostname, stderr: %q", string(hnStderr)) } osVars[_const.VariableOSHostName] = string(bytes.TrimSpace(hn)) - arch, err := c.ExecuteCommand(ctx, "arch") + + arch, archStderr, err := c.ExecuteCommand(ctx, "arch") if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get arch: %v, stderr: %q", err, string(archStderr)) + } + if len(archStderr) > 0 { + return nil, errors.Errorf("failed to get arch, stderr: %q", string(archStderr)) } osVars[_const.VariableOSArchitecture] = string(bytes.TrimSpace(arch)) diff --git a/pkg/const/ip.go b/pkg/const/ip.go index e1413652..9e9b90e6 100644 --- a/pkg/const/ip.go +++ b/pkg/const/ip.go @@ -9,6 +9,10 @@ import ( "strings" ) +// =========================================================================== +// ============================= ParseIP ================================= +// =========================================================================== + // ParseIP parses a CIDR, an IP range string (e.g., "xxx-xxx"), or a single IP into a slice of actual IPs. // Supports both IPv4 and IPv6. func ParseIP(ip string) []string { @@ -211,3 +215,47 @@ func networkRange(network *net.IPNet) (net.IP, net.IP) { } return startIP, endIP } + +// =========================================================================== +// ============================= IsLocalhostIP =========================== +// =========================================================================== + +// IsLocalhostIP checks if the given IP address string (ipStr) is bound to any local network interface. +// It returns true if the IP is found on any interface, false otherwise. +// This function parses the input string as an IP address, iterates over all network interfaces on the host, +// and checks if any of the interface addresses match the target IP. +func IsLocalhostIP(ipStr string) bool { + targetIP := net.ParseIP(ipStr) + if targetIP == nil { + // The input string is not a valid IP address. + return false + } + ifaces, err := net.Interfaces() + if err != nil { + // Failed to retrieve network interfaces. + return false + } + for _, iface := range ifaces { + addrs, err := iface.Addrs() + if err != nil { + // Skip this interface if its addresses cannot be retrieved. + continue + } + for _, addr := range addrs { + switch v := addr.(type) { + case *net.IPNet: + // Check if the IP address of this interface matches the target IP. + if v.IP.Equal(targetIP) { + return true + } + case *net.IPAddr: + // Check if the IP address of this interface matches the target IP. + if v.IP.Equal(targetIP) { + return true + } + } + } + } + // The target IP was not found on any local network interface. + return false +} diff --git a/pkg/executor/task_executor.go b/pkg/executor/task_executor.go index 606a0be7..0084b1a0 100644 --- a/pkg/executor/task_executor.go +++ b/pkg/executor/task_executor.go @@ -58,7 +58,13 @@ func (e *taskExecutor) Exec(ctx context.Context) error { } // exit when task run failed if e.task.IsFailed() { - return errors.Errorf("task %q run failed", e.task.Spec.Name) + failedMsg := "\n" + for _, result := range e.task.Status.HostResults { + if result.StdErr != "" { + failedMsg += fmt.Sprintf("[%s]: %s\n", result.Host, result.StdErr) + } + } + return errors.Errorf("task [%s](%s) run failed: %s", e.task.Spec.Name, ctrlclient.ObjectKeyFromObject(e.task), failedMsg) } return nil diff --git a/pkg/modules/command.go b/pkg/modules/command.go index 4951c7b3..2cdd4ad4 100644 --- a/pkg/modules/command.go +++ b/pkg/modules/command.go @@ -19,7 +19,6 @@ package modules import ( "context" "fmt" - "strings" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -81,16 +80,11 @@ func ModuleCommand(ctx context.Context, options ExecOptions) (string, string) { return "", err.Error() } // execute command - var stdout, stderr string - data, err := conn.ExecuteCommand(ctx, string(command)) + stdout, stderr, err := conn.ExecuteCommand(ctx, string(command)) if err != nil { - stderr = err.Error() + return "", err.Error() } - if data != nil { - stdout = strings.TrimSpace(string(data)) - } - - return stdout, stderr + return string(stdout), string(stderr) } func init() { diff --git a/pkg/modules/module_test.go b/pkg/modules/module_test.go index 04dd7ab0..74aeaf22 100644 --- a/pkg/modules/module_test.go +++ b/pkg/modules/module_test.go @@ -30,7 +30,7 @@ import ( "github.com/kubesphere/kubekey/v4/pkg/variable/source" ) -var successConnector = &testConnector{output: []byte("success")} +var successConnector = &testConnector{stdout: []byte("success")} var failedConnector = &testConnector{ copyErr: errors.New("failed"), fetchErr: errors.New("failed"), @@ -49,7 +49,8 @@ type testConnector struct { // return for fetch fetchErr error // return for command - output []byte + stdout []byte + stderr []byte commandErr error } @@ -69,8 +70,8 @@ func (t testConnector) FetchFile(context.Context, string, io.Writer) error { return t.fetchErr } -func (t testConnector) ExecuteCommand(context.Context, string) ([]byte, error) { - return t.output, t.commandErr +func (t testConnector) ExecuteCommand(context.Context, string) ([]byte, []byte, error) { + return t.stdout, t.stderr, t.commandErr } // newTestVariable creates a new variable.Variable for testing purposes. diff --git a/pkg/modules/prometheus.go b/pkg/modules/prometheus.go index 33bdba07..8fb7e193 100644 --- a/pkg/modules/prometheus.go +++ b/pkg/modules/prometheus.go @@ -172,7 +172,7 @@ func ModulePrometheus(ctx context.Context, options ExecOptions) (string, string) } // Execute query - result, err := conn.ExecuteCommand(ctx, string(cmdBytes)) + result, _, err := conn.ExecuteCommand(ctx, string(cmdBytes)) if err != nil { return "", fmt.Sprintf("failed to execute prometheus query: %v", err) } diff --git a/pkg/web/api/result.go b/pkg/web/api/result.go index 7092c60e..c2f15e9e 100644 --- a/pkg/web/api/result.go +++ b/pkg/web/api/result.go @@ -41,6 +41,7 @@ var SUCCESS = Result{Message: ResultSucceed} // The Message field is typically used to convey error or success information. type Result struct { Message string `description:"error message" json:"message"` // Message provides details about the result or error. + Result any `json:"result"` } // ListResult is a generic struct representing a paginated list response. @@ -54,18 +55,18 @@ type ListResult[T any] struct { // InventoryHostTable represents a host entry in an inventory with its configuration details. // It includes network information, SSH credentials, group membership, and architecture. type InventoryHostTable struct { - Name string `json:"name"` // Hostname of the inventory host - Status string `json:"status"` // Current status of the host - InternalIPV4 string `json:"internalIPV4"` // IPv4 address of the host - InternalIPV6 string `json:"internalIPV6"` // IPv6 address of the host - SSHHost string `json:"sshHost"` // SSH hostname for connection - SSHPort string `json:"sshPort"` // SSH port for connection - SSHUser string `json:"sshUser"` // SSH username for authentication - SSHPassword string `json:"sshPassword"` // SSH password for authentication - SSHPrivateKey string `json:"sshPrivateKey"` // SSH private key for authentication - Vars map[string]any `json:"vars"` // Additional host variables - Groups []InventoryHostGroups `json:"groups"` // Groups the host belongs to - Arch string `json:"arch"` // Architecture of the host + Name string `json:"name"` // Hostname of the inventory host + Status string `json:"status"` // Current status of the host + InternalIPV4 string `json:"internalIPV4"` // IPv4 address of the host + InternalIPV6 string `json:"internalIPV6"` // IPv6 address of the host + SSHHost string `json:"sshHost"` // SSH hostname for connection + SSHPort string `json:"sshPort"` // SSH port for connection + SSHUser string `json:"sshUser"` // SSH username for authentication + SSHPassword string `json:"sshPassword"` // SSH password for authentication + SSHPrivateKeyContent string `json:"sshPrivateKeyContent"` // SSH private key content for authentication + Vars map[string]any `json:"vars"` // Additional host variables + Groups []InventoryHostGroups `json:"groups"` // Groups the host belongs to + Arch string `json:"arch"` // Architecture of the host } // InventoryHostGroups represents the group information for a host in the inventory. diff --git a/pkg/web/corev1.go b/pkg/web/corev1.go deleted file mode 100644 index 76f29201..00000000 --- a/pkg/web/corev1.go +++ /dev/null @@ -1,763 +0,0 @@ -package web - -import ( - "bufio" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "os" - "path/filepath" - "reflect" - "slices" - "strings" - "sync" - "time" - - "github.com/cockroachdb/errors" - restfulspec "github.com/emicklei/go-restful-openapi/v2" - "github.com/emicklei/go-restful/v3" - kkcorev1 "github.com/kubesphere/kubekey/api/core/v1" - kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" - "k8s.io/klog/v2" - ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" - - _const "github.com/kubesphere/kubekey/v4/pkg/const" - "github.com/kubesphere/kubekey/v4/pkg/executor" - "github.com/kubesphere/kubekey/v4/pkg/variable" - "github.com/kubesphere/kubekey/v4/pkg/web/api" - "github.com/kubesphere/kubekey/v4/pkg/web/query" -) - -// NewCoreService creates and configures a new RESTful web service for managing inventories and playbooks. -// It sets up routes for CRUD operations on inventories and playbooks, including pagination, sorting, and filtering. -func NewCoreService(workdir string, client ctrlclient.Client, config *rest.Config) *restful.WebService { - ws := new(restful.WebService). - // the GroupVersion might be empty, we need to remove the final / - Path(strings.TrimRight(_const.CoreAPIPath+kkcorev1.SchemeGroupVersion.String(), "/")) - - h := newCoreHandler(workdir, client, config) - - // Inventory management routes - ws.Route(ws.POST("/inventories").To(h.createInventory). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("create a inventory."). - Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON). - Reads(kkcorev1.Inventory{}). - Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{})) - - ws.Route(ws.PATCH("/namespaces/{namespace}/inventories/{inventory}").To(h.patchInventory). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("patch a inventory."). - Consumes(string(types.JSONPatchType), string(types.MergePatchType), string(types.ApplyPatchType)).Produces(restful.MIME_JSON). - Reads(kkcorev1.Inventory{}). - Param(ws.PathParameter("namespace", "the namespace of the inventory")). - Param(ws.PathParameter("inventory", "the name of the inventory")). - Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{})) - - ws.Route(ws.GET("/inventories").To(h.listInventories). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("list all inventories."). - Produces(restful.MIME_JSON). - Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). - Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). - Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). - Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). - Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{})) - - ws.Route(ws.GET("/namespaces/{namespace}/inventories").To(h.listInventories). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("list all inventories in a namespace."). - Produces(restful.MIME_JSON). - Param(ws.PathParameter("namespace", "the namespace of the inventory")). - Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). - Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). - Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). - Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). - Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{})) - - ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}").To(h.inventoryInfo). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("get a inventory in a namespace."). - Produces(restful.MIME_JSON). - Param(ws.PathParameter("namespace", "the namespace of the inventory")). - Param(ws.PathParameter("inventory", "the name of the inventory")). - Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{})) - - ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}/hosts").To(h.listInventoryHosts). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("list all hosts in a inventory."). - Produces(restful.MIME_JSON). - Param(ws.PathParameter("namespace", "the namespace of the inventory")). - Param(ws.PathParameter("inventory", "the name of the inventory")). - Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). - Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). - Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). - Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). - Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.InventoryHostTable]{})) - - // Playbook management routes - ws.Route(ws.POST("/playbooks").To(h.createPlaybook). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("create a playbook."). - Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON). - Reads(kkcorev1.Playbook{}). - Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{})) - - ws.Route(ws.GET("/playbooks").To(h.listPlaybooks). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("list all playbooks."). - Produces(restful.MIME_JSON). - Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). - Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). - Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). - Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). - Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{})) - - ws.Route(ws.GET("/namespaces/{namespace}/playbooks").To(h.listPlaybooks). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("list all playbooks in a namespace."). - Produces(restful.MIME_JSON). - Param(ws.PathParameter("namespace", "the namespace of the playbook")). - Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). - Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). - Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). - Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). - Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{})) - - ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}").To(h.playbookInfo). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("get or watch a playbook in a namespace."). - Produces(restful.MIME_JSON). - Param(ws.PathParameter("namespace", "the namespace of the playbook")). - Param(ws.PathParameter("playbook", "the name of the playbook")). - Param(ws.QueryParameter("watch", "set to true to watch this playbook")). - Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{})) - - ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}/log").To(h.logPlaybook). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("get a playbook execute log."). - Produces("text/plain"). - Param(ws.PathParameter("namespace", "the namespace of the playbook")). - Param(ws.PathParameter("playbook", "the name of the playbook")). - Returns(http.StatusOK, _const.StatusOK, "")) - - ws.Route(ws.DELETE("/namespaces/{namespace}/playbooks/{playbook}").To(h.deletePlaybook). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). - Doc("delete a playbook."). - Produces(restful.MIME_JSON). - Param(ws.PathParameter("namespace", "the namespace of the playbook")). - Param(ws.PathParameter("playbook", "the name of the playbook")). - Returns(http.StatusOK, _const.StatusOK, api.Result{})) - - return ws -} - -// newInventoryHandler creates a new handler instance with the given workdir, client and config -// workdir: Base directory for storing work files -// client: Kubernetes client for API operations -// config: Kubernetes REST client configuration -func newCoreHandler(workdir string, client ctrlclient.Client, config *rest.Config) *coreHandler { - // Create a new coreHandler with initialized playbookManager - return &coreHandler{workdir: workdir, client: client, restconfig: config, playbookManager: playbookManager{manager: make(map[string]context.CancelFunc)}} -} - -// playbookManager is responsible for managing playbook execution contexts and their cancellation. -// It uses a mutex to ensure thread-safe access to the manager map. -type playbookManager struct { - sync.Mutex - manager map[string]context.CancelFunc // Map of playbook key to its cancel function -} - -// addPlaybook adds a playbook and its cancel function to the manager map. -func (m *playbookManager) addPlaybook(playbook *kkcorev1.Playbook, cancel context.CancelFunc) { - m.Lock() - defer m.Unlock() - - m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()] = cancel -} - -// deletePlaybook removes a playbook from the manager map. -func (m *playbookManager) deletePlaybook(playbook *kkcorev1.Playbook) { - m.Lock() - defer m.Unlock() - - delete(m.manager, ctrlclient.ObjectKeyFromObject(playbook).String()) -} - -// stopPlaybook cancels the context for a running playbook, if it exists. -func (m *playbookManager) stopPlaybook(playbook *kkcorev1.Playbook) { - // Attempt to cancel the playbook's context if it exists in the manager - if cancel, ok := m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()]; ok { - cancel() - } -} - -// coreHandler implements HTTP handlers for managing inventories and playbooks -// It provides methods for CRUD operations on inventories and playbooks -type coreHandler struct { - workdir string // Base directory for storing work files - restconfig *rest.Config // Kubernetes REST client configuration - client ctrlclient.Client // Kubernetes client for API operations - - // playbookManager control to cancel playbook - playbookManager playbookManager -} - -// createInventory creates a new inventory resource -// It reads the inventory from the request body and creates it in the Kubernetes cluster -func (h *coreHandler) createInventory(request *restful.Request, response *restful.Response) { - inventory := &kkcorev1.Inventory{} - if err := request.ReadEntity(inventory); err != nil { - api.HandleBadRequest(response, request, err) - return - } - if err := h.client.Create(request.Request.Context(), inventory); err != nil { - api.HandleBadRequest(response, request, err) - return - } - - _ = response.WriteEntity(inventory) -} - -// patchInventory patches an existing inventory resource -// It reads the patch data from the request body and applies it to the specified inventory -func (h *coreHandler) patchInventory(request *restful.Request, response *restful.Response) { - namespace := request.PathParameter("namespace") - name := request.PathParameter("inventory") - data, err := io.ReadAll(request.Request.Body) - if err != nil { - api.HandleBadRequest(response, request, err) - return - } - patchType := request.HeaderParameter("Content-Type") - - // get old inventory - inventory := &kkcorev1.Inventory{} - if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory); err != nil { - api.HandleBadRequest(response, request, err) - return - } - - if err := h.client.Patch(request.Request.Context(), inventory, ctrlclient.RawPatch(types.PatchType(patchType), data)); err != nil { - api.HandleBadRequest(response, request, err) - return - } - - _ = response.WriteEntity(inventory) -} - -// listInventories lists all inventory resources with optional filtering and sorting -// It supports field selectors and label selectors for filtering the results -func (h *coreHandler) listInventories(request *restful.Request, response *restful.Response) { - queryParam := query.ParseQueryParameter(request) - var fieldselector fields.Selector - if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok { - fs, err := fields.ParseSelector(string(v)) - if err != nil { - api.HandleError(response, request, err) - return - } - fieldselector = fs - } - namespace := request.PathParameter("namespace") - - inventoryList := &kkcorev1.InventoryList{} - err := h.client.List(request.Request.Context(), inventoryList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector}) - if err != nil { - api.HandleError(response, request, err) - return - } - - // Sort and filter the inventory list using DefaultList - results := query.DefaultList(inventoryList.Items, queryParam, func(left, right kkcorev1.Inventory, sortBy query.Field) bool { - leftMeta, err := meta.Accessor(left) - if err != nil { - return false - } - rightMeta, err := meta.Accessor(right) - if err != nil { - return false - } - - return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy) - }, func(o kkcorev1.Inventory, filter query.Filter) bool { - // skip fieldselector - if filter.Field == query.ParameterFieldSelector { - return true - } - objectMeta, err := meta.Accessor(o) - if err != nil { - return false - } - - return query.DefaultObjectMetaFilter(objectMeta, filter) - }) - - _ = response.WriteEntity(results) -} - -// inventoryInfo retrieves a specific inventory resource -// It returns the inventory with the specified name in the given namespace -func (h *coreHandler) inventoryInfo(request *restful.Request, response *restful.Response) { - namespace := request.PathParameter("namespace") - name := request.PathParameter("inventory") - - inventory := &kkcorev1.Inventory{} - - err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory) - if err != nil { - api.HandleError(response, request, err) - return - } - - _ = response.WriteEntity(inventory) -} - -// listInventoryHosts lists all hosts in an inventory with their details -// It includes information about SSH configuration, IP addresses, and group membership -func (h *coreHandler) listInventoryHosts(request *restful.Request, response *restful.Response) { - // Parse query parameters from the request - queryParam := query.ParseQueryParameter(request) - namespace := request.PathParameter("namespace") - name := request.PathParameter("inventory") - - // Retrieve the inventory object from the cluster - inventory := &kkcorev1.Inventory{} - err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory) - if err != nil { - api.HandleError(response, request, err) - return - } - - // get host-check playbook if annotation exists - playbook := &kkcorev1.Playbook{} - if playbookName, ok := inventory.Annotations[kkcorev1.HostCheckPlaybookAnnotation]; ok { - if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Name: playbookName, Namespace: inventory.Namespace}, playbook); err != nil { - klog.Warningf("cannot found host-check playbook for inventory %q", ctrlclient.ObjectKeyFromObject(inventory)) - } - } - - // buildHostItem constructs an InventoryHostTable from the hostname and raw extension - buildHostItem := func(hostname string, raw runtime.RawExtension) api.InventoryHostTable { - // Convert the raw extension to a map of variables - vars := variable.Extension2Variables(raw) - // Extract relevant fields from the variables - internalIPV4, _ := variable.StringVar(nil, vars, _const.VariableIPv4) - internalIPV6, _ := variable.StringVar(nil, vars, _const.VariableIPv6) - sshHost, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorHost) - sshPort, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPort) - sshUser, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUser) - sshPassword, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword) - sshPrivateKey, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPrivateKey) - - // Remove sensitive or redundant variables from the vars map - delete(vars, _const.VariableIPv4) - delete(vars, _const.VariableIPv6) - delete(vars, _const.VariableConnector) - - // Return the constructed InventoryHostTable - return api.InventoryHostTable{ - Name: hostname, - InternalIPV4: internalIPV4, - InternalIPV6: internalIPV6, - SSHHost: sshHost, - SSHPort: sshPort, - SSHUser: sshUser, - SSHPassword: sshPassword, - SSHPrivateKey: sshPrivateKey, - Vars: vars, - Groups: []api.InventoryHostGroups{}, - } - } - - // Convert inventory groups for host membership lookup - groups := variable.ConvertGroup(*inventory) - - // Helper to check if a host is in a group and get its index - getGroupIndex := func(groupName, hostName string) int { - for i, h := range inventory.Spec.Groups[groupName].Hosts { - if h == hostName { - return i - } - } - return -1 - } - - // fillGroups adds group names to the InventoryHostTable item if the host is a member - fillGroups := func(item *api.InventoryHostTable) { - for groupName, hosts := range groups { - // Skip special groups - if groupName == _const.VariableGroupsAll || groupName == _const.VariableUnGrouped || groupName == "k8s_cluster" { - continue - } - // If the host is in the group, add the group info to the item - if slices.Contains(hosts, item.Name) { - g := api.InventoryHostGroups{ - Role: groupName, - Index: getGroupIndex(groupName, item.Name), - } - item.Groups = append(item.Groups, g) - } - } - } - - // fillByPlaybook populates status and architecture info for the host from task results - fillByPlaybook := func(playbook kkcorev1.Playbook, item *api.InventoryHostTable) { - // Set status and architecture based on playbook phase and result - switch playbook.Status.Phase { - case kkcorev1.PlaybookPhaseFailed: - item.Status = api.ResultFailed - case kkcorev1.PlaybookPhaseSucceeded: - item.Status = api.ResultFailed - // Extract architecture info from playbook result - results := variable.Extension2Variables(playbook.Status.Result) - if arch, ok := results[item.Name].(string); ok && arch != "" { - item.Arch = arch - item.Status = api.ResultSucceed - } - } - } - - // less is a comparison function for sorting InventoryHostTable items by a given field - less := func(left, right api.InventoryHostTable, sortBy query.Field) bool { - // Compare fields for sorting - leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy) - rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy) - switch leftVal.Kind() { - case reflect.String: - return leftVal.String() > rightVal.String() - case reflect.Int, reflect.Int64: - return leftVal.Int() > rightVal.Int() - default: - return left.Name > right.Name - } - } - - // filter is a function to filter InventoryHostTable items based on query filters - filter := func(o api.InventoryHostTable, f query.Filter) bool { - // Filter by string fields, otherwise always true - val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field) - switch val.Kind() { - case reflect.String: - return strings.Contains(val.String(), string(f.Value)) - default: - return true - } - } - - // Build the host table for the inventory - hostTable := make([]api.InventoryHostTable, 0, len(inventory.Spec.Hosts)) - for hostname, raw := range inventory.Spec.Hosts { - // Build the host item from raw data - item := buildHostItem(hostname, raw) - // Fill in group membership - fillGroups(&item) - // Fill in playbook status and architecture - fillByPlaybook(*playbook, &item) - // Add the item to the host table - hostTable = append(hostTable, item) - } - - // Sort and filter the host table, then write the result - results := query.DefaultList(hostTable, queryParam, less, filter) - _ = response.WriteEntity(results) -} - -// createPlaybook handles the creation of a new playbook resource. -// It reads the playbook from the request, sets the workdir, creates the resource, and starts execution in a goroutine. -func (h *coreHandler) createPlaybook(request *restful.Request, response *restful.Response) { - playbook := &kkcorev1.Playbook{} - // Read the playbook entity from the request body - if err := request.ReadEntity(playbook); err != nil { - api.HandleBadRequest(response, request, err) - return - } - - // Check for schema label: only one allowed, must not be empty, and must be unique among playbooks - hasSchemaLabel := false - for labelKey, labelValue := range playbook.Labels { - // Only consider labels with the schema label suffix - if !strings.HasSuffix(labelKey, api.SchemaLabelSubfix) { - continue - } - // If a schema label was already found, this is a conflict - if hasSchemaLabel { - api.HandleConflict(response, request, errors.New("a playbook can only have one schema label. Please ensure only one schema label is set")) - return - } - // The schema label value must not be empty - if labelValue == "" { - api.HandleConflict(response, request, errors.New("the schema label value must not be empty. Please provide a valid schema label value")) - return - } - hasSchemaLabel = true - // Check if there is already a playbook with the same schema label - playbookList := &kkcorev1.PlaybookList{} - if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.MatchingLabels{ - labelKey: labelValue, - }); err != nil { - api.HandleBadRequest(response, request, err) - return - } - // If any playbook with the same schema label exists, this is a conflict - if len(playbookList.Items) > 0 { - api.HandleConflict(response, request, errors.New("a playbook with the same schema label already exists. Please use a different schema label or remove the existing playbook")) - return - } - } - - // Set the workdir in the playbook's spec config - if err := unstructured.SetNestedField(playbook.Spec.Config.Value(), h.workdir, _const.Workdir); err != nil { - api.HandleBadRequest(response, request, err) - return - } - - // Create the playbook resource in the cluster - if err := h.client.Create(request.Request.Context(), playbook); err != nil { - api.HandleBadRequest(response, request, err) - return - } - - // Start playbook execution in a separate goroutine - go func() { - // Build the log file path for the playbook execution - filename := filepath.Join( - _const.GetWorkdirFromConfig(playbook.Spec.Config), - _const.RuntimeDir, - kkcorev1.SchemeGroupVersion.Group, - kkcorev1.SchemeGroupVersion.Version, - "playbooks", - playbook.Namespace, - playbook.Name, - playbook.Name+".log", - ) - // Ensure the directory for the log file exists - if _, err := os.Stat(filepath.Dir(filename)); err != nil { - if !os.IsNotExist(err) { - api.HandleBadRequest(response, request, err) - return - } - // If directory does not exist, create it - if err := os.MkdirAll(filepath.Dir(filename), os.ModePerm); err != nil { - api.HandleBadRequest(response, request, err) - return - } - } - // Open the log file for writing - file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - klog.ErrorS(err, "failed to open file", "file", filename) - return - } - defer file.Close() - - // Create a cancellable context for playbook execution - ctx, cancel := context.WithCancel(context.Background()) - // Register the playbook and its cancel function in the playbookManager - h.playbookManager.addPlaybook(playbook, cancel) - // Execute the playbook and write output to the log file - if err := executor.NewPlaybookExecutor(ctx, h.client, playbook, file).Exec(ctx); err != nil { - klog.ErrorS(err, "failed to exec playbook", "playbook", playbook.Name) - } - // Remove the playbook from the playbookManager after execution - h.playbookManager.deletePlaybook(playbook) - }() - - // For web UI: it does not run in Kubernetes, so execute playbook immediately. - _ = response.WriteEntity(playbook) -} - -// listPlaybooks handles listing playbook resources with filtering and pagination. -// It supports field selectors and label selectors for filtering the results. -func (h *coreHandler) listPlaybooks(request *restful.Request, response *restful.Response) { - queryParam := query.ParseQueryParameter(request) - var fieldselector fields.Selector - // Parse field selector from query parameters if present. - if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok { - fs, err := fields.ParseSelector(string(v)) - if err != nil { - api.HandleError(response, request, err) - return - } - fieldselector = fs - } - namespace := request.PathParameter("namespace") - - playbookList := &kkcorev1.PlaybookList{} - // List playbooks from the Kubernetes API with the specified options. - err := h.client.List(request.Request.Context(), playbookList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector}) - if err != nil { - api.HandleError(response, request, err) - return - } - - // Sort and filter the playbook list using DefaultList. - results := query.DefaultList(playbookList.Items, queryParam, func(left, right kkcorev1.Playbook, sortBy query.Field) bool { - leftMeta, err := meta.Accessor(left) - if err != nil { - return false - } - rightMeta, err := meta.Accessor(right) - if err != nil { - return false - } - - return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy) - }, func(o kkcorev1.Playbook, filter query.Filter) bool { - // Skip fieldselector filter. - if filter.Field == query.ParameterFieldSelector { - return true - } - objectMeta, err := meta.Accessor(o) - if err != nil { - return false - } - - return query.DefaultObjectMetaFilter(objectMeta, filter) - }) - - _ = response.WriteEntity(results) -} - -// playbookInfo handles retrieving a single playbook or watching for changes. -// If the "watch" query parameter is set to "true", it streams updates to the client. -func (h *coreHandler) playbookInfo(request *restful.Request, response *restful.Response) { - namespace := request.PathParameter("namespace") - name := request.PathParameter("playbook") - watch := request.QueryParameter("watch") - - playbook := &kkcorev1.Playbook{} - - if watch == "true" { - // Watch for changes to the playbook resource and stream events as JSON. - h.restconfig.GroupVersion = &kkcorev1.SchemeGroupVersion - client, err := rest.RESTClientFor(h.restconfig) - if err != nil { - api.HandleError(response, request, err) - return - } - watchInterface, err := client.Get().Namespace(namespace).Resource("playbooks").Name(name).Param("watch", "true").Watch(request.Request.Context()) - if err != nil { - api.HandleError(response, request, err) - return - } - defer watchInterface.Stop() - - response.AddHeader("Content-Type", "application/json") - flusher, ok := response.ResponseWriter.(http.Flusher) - if !ok { - http.Error(response.ResponseWriter, "Streaming unsupported", http.StatusInternalServerError) - return - } - - encoder := json.NewEncoder(response.ResponseWriter) - // Stream each event object to the client as JSON. - for event := range watchInterface.ResultChan() { - if err := encoder.Encode(event.Object); err != nil { - break - } - flusher.Flush() - } - return - } - - // Retrieve the playbook resource by namespace and name. - err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook) - if err != nil { - api.HandleError(response, request, err) - return - } - - _ = response.WriteEntity(playbook) -} - -// logPlaybook handles streaming the log file for a playbook. -// It opens the log file and streams its contents to the client, supporting live updates. -func (h *coreHandler) logPlaybook(request *restful.Request, response *restful.Response) { - namespace := request.PathParameter("namespace") - name := request.PathParameter("playbook") - - playbook := &kkcorev1.Playbook{} - // Retrieve the playbook resource to get its config for log file path. - err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook) - if err != nil { - api.HandleError(response, request, err) - return - } - - // Build the log file path for the playbook. - filename := filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log") - file, err := os.Open(filename) - if err != nil { - api.HandleError(response, request, err) - return - } - defer file.Close() - - response.AddHeader("Content-Type", "text/plain; charset=utf-8") - writer := response.ResponseWriter - flusher, ok := writer.(http.Flusher) - if !ok { - http.Error(writer, "Streaming unsupported", http.StatusInternalServerError) - return - } - - // Stream the log file line by line, waiting for new lines if at EOF. - reader := bufio.NewReader(file) - for { - line, err := reader.ReadString('\n') - if err != nil { - if err == io.EOF { - // If EOF, wait for new log lines to be written. - time.Sleep(500 * time.Millisecond) - continue - } - break - } - fmt.Fprint(writer, line) - flusher.Flush() - } -} - -// deletePlaybook handles deletion of a playbook resource and its associated tasks. -// It stops the playbook execution if running, deletes the playbook, and deletes all related tasks. -func (h *coreHandler) deletePlaybook(request *restful.Request, response *restful.Response) { - namespace := request.PathParameter("namespace") - name := request.PathParameter("playbook") - - playbook := &kkcorev1.Playbook{} - // Retrieve the playbook resource to delete. - err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook) - if err != nil { - api.HandleError(response, request, err) - return - } - // Stop the playbook execution if it is running. - h.playbookManager.stopPlaybook(playbook) - // Delete the playbook resource. - if err := h.client.Delete(request.Request.Context(), playbook); err != nil { - api.HandleError(response, request, err) - return - } - // delete relative filepath: variable and log - _ = os.Remove(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log")) - _ = os.RemoveAll(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name)) - // Delete all tasks owned by this playbook. - if err := h.client.DeleteAllOf(request.Request.Context(), &kkcorev1alpha1.Task{}, ctrlclient.InNamespace(playbook.Namespace), ctrlclient.MatchingFields{ - "playbook.name": playbook.Name, - "playbook.uid": string(playbook.UID), - }); err != nil { - api.HandleError(response, request, err) - return - } - - _ = response.WriteEntity(api.SUCCESS) -} diff --git a/pkg/web/handler/executor.go b/pkg/web/handler/executor.go new file mode 100644 index 00000000..ccedb515 --- /dev/null +++ b/pkg/web/handler/executor.go @@ -0,0 +1,93 @@ +package handler + +import ( + "context" + "os" + "path/filepath" + "sync" + + "github.com/cockroachdb/errors" + kkcorev1 "github.com/kubesphere/kubekey/api/core/v1" + "k8s.io/klog/v2" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + + _const "github.com/kubesphere/kubekey/v4/pkg/const" + "github.com/kubesphere/kubekey/v4/pkg/executor" +) + +var playbookManager = &manager{ + manager: make(map[string]context.CancelFunc), +} + +// playbookManager is responsible for managing playbook execution contexts and their cancellation. +// It uses a mutex to ensure thread-safe access to the manager map. +type manager struct { + sync.Mutex + manager map[string]context.CancelFunc // Map of playbook key to its cancel function +} + +func (m *manager) executor(playbook *kkcorev1.Playbook, client ctrlclient.Client) error { + // Build the log file path for the playbook execution + filename := filepath.Join( + _const.GetWorkdirFromConfig(playbook.Spec.Config), + _const.RuntimeDir, + kkcorev1.SchemeGroupVersion.Group, + kkcorev1.SchemeGroupVersion.Version, + "playbooks", + playbook.Namespace, + playbook.Name, + playbook.Name+".log", + ) + // Ensure the directory for the log file exists + if _, err := os.Stat(filepath.Dir(filename)); err != nil { + if !os.IsNotExist(err) { + return errors.Wrapf(err, "failed to stat playbook dir %q", filepath.Dir(filename)) + } + // If directory does not exist, create it + if err := os.MkdirAll(filepath.Dir(filename), os.ModePerm); err != nil { + return errors.Wrapf(err, "failed to create playbook dir %q", filepath.Dir(filename)) + } + } + // Open the log file for writing + file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return errors.Wrapf(err, "failed to open log file", "file", filename) + } + defer file.Close() + + // Create a cancellable context for playbook execution + ctx, cancel := context.WithCancel(context.Background()) + // Register the playbook and its cancel function in the playbookManager + m.addPlaybook(playbook, cancel) + // Execute the playbook and write output to the log file + if err := executor.NewPlaybookExecutor(ctx, client, playbook, file).Exec(ctx); err != nil { + klog.ErrorS(err, "failed to exec playbook", "playbook", playbook.Name) + } + // Remove the playbook from the playbookManager after execution + m.deletePlaybook(playbook) + return nil +} + +// addPlaybook adds a playbook and its cancel function to the manager map. +func (m *manager) addPlaybook(playbook *kkcorev1.Playbook, cancel context.CancelFunc) { + m.Lock() + defer m.Unlock() + + m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()] = cancel +} + +// deletePlaybook removes a playbook from the manager map. +func (m *manager) deletePlaybook(playbook *kkcorev1.Playbook) { + m.Lock() + defer m.Unlock() + + delete(m.manager, ctrlclient.ObjectKeyFromObject(playbook).String()) +} + +// stopPlaybook cancels the context for a running playbook, if it exists. +func (m *manager) stopPlaybook(playbook *kkcorev1.Playbook) { + // Attempt to cancel the playbook's context if it exists in the manager + if cancel, ok := m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()]; ok { + cancel() + } +} diff --git a/pkg/web/handler/inventory.go b/pkg/web/handler/inventory.go new file mode 100644 index 00000000..5b5a5e99 --- /dev/null +++ b/pkg/web/handler/inventory.go @@ -0,0 +1,298 @@ +package handler + +import ( + "io" + "reflect" + "slices" + "strings" + + "github.com/emicklei/go-restful/v3" + kkcorev1 "github.com/kubesphere/kubekey/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + + _const "github.com/kubesphere/kubekey/v4/pkg/const" + "github.com/kubesphere/kubekey/v4/pkg/variable" + "github.com/kubesphere/kubekey/v4/pkg/web/api" + "github.com/kubesphere/kubekey/v4/pkg/web/query" +) + +// InventoryHandler handles HTTP requests for inventory resources. +type InventoryHandler struct { + workdir string // Base directory for storing work files + restconfig *rest.Config // Kubernetes REST client configuration + client ctrlclient.Client // Kubernetes client for API operations +} + +// NewInventoryHandler creates a new InventoryHandler instance. +func NewInventoryHandler(workdir string, restconfig *rest.Config, client ctrlclient.Client) *InventoryHandler { + return &InventoryHandler{workdir: workdir, restconfig: restconfig, client: client} +} + +// Post creates a new inventory resource. +// It reads the inventory from the request body and creates it in the Kubernetes cluster. +func (h *InventoryHandler) Post(request *restful.Request, response *restful.Response) { + inventory := &kkcorev1.Inventory{} + if err := request.ReadEntity(inventory); err != nil { + api.HandleBadRequest(response, request, err) + return + } + if err := h.client.Create(request.Request.Context(), inventory); err != nil { + api.HandleBadRequest(response, request, err) + return + } + + _ = response.WriteEntity(inventory) +} + +// Patch updates an existing inventory resource. +// It reads the patch data from the request body and applies it to the specified inventory. +func (h *InventoryHandler) Patch(request *restful.Request, response *restful.Response) { + namespace := request.PathParameter("namespace") + name := request.PathParameter("inventory") + data, err := io.ReadAll(request.Request.Body) + if err != nil { + api.HandleBadRequest(response, request, err) + return + } + patchType := request.HeaderParameter("Content-Type") + + // Get the existing inventory object. + inventory := &kkcorev1.Inventory{} + if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory); err != nil { + api.HandleBadRequest(response, request, err) + return + } + + // Apply the patch. + if err := h.client.Patch(request.Request.Context(), inventory, ctrlclient.RawPatch(types.PatchType(patchType), data)); err != nil { + api.HandleBadRequest(response, request, err) + return + } + + _ = response.WriteEntity(inventory) +} + +// List returns all inventory resources with optional filtering and sorting. +// It supports field selectors and label selectors for filtering the results. +func (h *InventoryHandler) List(request *restful.Request, response *restful.Response) { + queryParam := query.ParseQueryParameter(request) + var fieldselector fields.Selector + // Parse field selector from query parameters if present. + if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok { + fs, err := fields.ParseSelector(string(v)) + if err != nil { + api.HandleError(response, request, err) + return + } + fieldselector = fs + } + namespace := request.PathParameter("namespace") + + inventoryList := &kkcorev1.InventoryList{} + // List inventory resources from the Kubernetes API. + err := h.client.List(request.Request.Context(), inventoryList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector}) + if err != nil { + api.HandleError(response, request, err) + return + } + + // Sort and filter the inventory list using DefaultList. + results := query.DefaultList(inventoryList.Items, queryParam, func(left, right kkcorev1.Inventory, sortBy query.Field) bool { + leftMeta, err := meta.Accessor(left) + if err != nil { + return false + } + rightMeta, err := meta.Accessor(right) + if err != nil { + return false + } + + return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy) + }, func(o kkcorev1.Inventory, filter query.Filter) bool { + // Skip fieldselector filter. + if filter.Field == query.ParameterFieldSelector { + return true + } + objectMeta, err := meta.Accessor(o) + if err != nil { + return false + } + + return query.DefaultObjectMetaFilter(objectMeta, filter) + }) + + _ = response.WriteEntity(results) +} + +// Info retrieves a specific inventory resource. +// It returns the inventory with the specified name in the given namespace. +func (h *InventoryHandler) Info(request *restful.Request, response *restful.Response) { + namespace := request.PathParameter("namespace") + name := request.PathParameter("inventory") + + inventory := &kkcorev1.Inventory{} + + err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory) + if err != nil { + api.HandleError(response, request, err) + return + } + + _ = response.WriteEntity(inventory) +} + +// ListHosts lists all hosts in an inventory with their details. +// It includes information about SSH configuration, IP addresses, and group membership. +func (h *InventoryHandler) ListHosts(request *restful.Request, response *restful.Response) { + // Parse query parameters from the request. + queryParam := query.ParseQueryParameter(request) + namespace := request.PathParameter("namespace") + name := request.PathParameter("inventory") + + // Retrieve the inventory object from the cluster. + inventory := &kkcorev1.Inventory{} + err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory) + if err != nil { + api.HandleError(response, request, err) + return + } + + // Get host-check playbook if annotation exists. + playbook := &kkcorev1.Playbook{} + if playbookName, ok := inventory.Annotations[kkcorev1.HostCheckPlaybookAnnotation]; ok { + if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Name: playbookName, Namespace: inventory.Namespace}, playbook); err != nil { + klog.Warningf("cannot found host-check playbook for inventory %q", ctrlclient.ObjectKeyFromObject(inventory)) + } + } + + // buildHostItem constructs an InventoryHostTable from the hostname and raw extension. + buildHostItem := func(hostname string, raw runtime.RawExtension) api.InventoryHostTable { + // Convert the raw extension to a map of variables. + vars := variable.Extension2Variables(raw) + // Extract relevant fields from the variables. + internalIPV4, _ := variable.StringVar(nil, vars, _const.VariableIPv4) + internalIPV6, _ := variable.StringVar(nil, vars, _const.VariableIPv6) + sshHost, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorHost) + sshPort, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPort) + sshUser, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUser) + sshPassword, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword) + sshPrivateKeyContent, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPrivateKeyContent) + + // Remove sensitive or redundant variables from the vars map. + delete(vars, _const.VariableIPv4) + delete(vars, _const.VariableIPv6) + delete(vars, _const.VariableConnector) + + // Return the constructed InventoryHostTable. + return api.InventoryHostTable{ + Name: hostname, + InternalIPV4: internalIPV4, + InternalIPV6: internalIPV6, + SSHHost: sshHost, + SSHPort: sshPort, + SSHUser: sshUser, + SSHPassword: sshPassword, + SSHPrivateKeyContent: sshPrivateKeyContent, + Vars: vars, + Groups: []api.InventoryHostGroups{}, + } + } + + // Convert inventory groups for host membership lookup. + groups := variable.ConvertGroup(*inventory) + + // getGroupIndex checks if a host is in a group and returns its index. + getGroupIndex := func(groupName, hostName string) int { + for i, h := range inventory.Spec.Groups[groupName].Hosts { + if h == hostName { + return i + } + } + return -1 + } + + // fillGroups adds group names to the InventoryHostTable item if the host is a member. + fillGroups := func(item *api.InventoryHostTable) { + for groupName, hosts := range groups { + // Skip special groups. + if groupName == _const.VariableGroupsAll || groupName == _const.VariableUnGrouped || groupName == "k8s_cluster" { + continue + } + // If the host is in the group, add the group info to the item. + if slices.Contains(hosts, item.Name) { + g := api.InventoryHostGroups{ + Role: groupName, + Index: getGroupIndex(groupName, item.Name), + } + item.Groups = append(item.Groups, g) + } + } + } + + // fillByPlaybook populates status and architecture info for the host from task results. + fillByPlaybook := func(playbook kkcorev1.Playbook, item *api.InventoryHostTable) { + // Set status and architecture based on playbook phase and result. + switch playbook.Status.Phase { + case kkcorev1.PlaybookPhaseFailed: + item.Status = api.ResultFailed + case kkcorev1.PlaybookPhaseSucceeded: + item.Status = api.ResultFailed + // Extract architecture info from playbook result. + results := variable.Extension2Variables(playbook.Status.Result) + if arch, ok := results[item.Name].(string); ok && arch != "" { + item.Arch = arch + item.Status = api.ResultSucceed + } + } + } + + // less is a comparison function for sorting InventoryHostTable items by a given field. + less := func(left, right api.InventoryHostTable, sortBy query.Field) bool { + // Compare fields for sorting. + leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy) + rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy) + switch leftVal.Kind() { + case reflect.String: + return leftVal.String() > rightVal.String() + case reflect.Int, reflect.Int64: + return leftVal.Int() > rightVal.Int() + default: + return left.Name > right.Name + } + } + + // filter is a function to filter InventoryHostTable items based on query filters. + filter := func(o api.InventoryHostTable, f query.Filter) bool { + // Filter by string fields, otherwise always true. + val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field) + switch val.Kind() { + case reflect.String: + return strings.Contains(val.String(), string(f.Value)) + default: + return true + } + } + + // Build the host table for the inventory. + hostTable := make([]api.InventoryHostTable, 0, len(inventory.Spec.Hosts)) + for hostname, raw := range inventory.Spec.Hosts { + // Build the host item from raw data. + item := buildHostItem(hostname, raw) + // Fill in group membership. + fillGroups(&item) + // Fill in playbook status and architecture. + fillByPlaybook(*playbook, &item) + // Add the item to the host table. + hostTable = append(hostTable, item) + } + + // Sort and filter the host table, then write the result. + results := query.DefaultList(hostTable, queryParam, less, filter) + _ = response.WriteEntity(results) +} diff --git a/pkg/web/handler/playbook.go b/pkg/web/handler/playbook.go new file mode 100644 index 00000000..034ac9a5 --- /dev/null +++ b/pkg/web/handler/playbook.go @@ -0,0 +1,293 @@ +package handler + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + "github.com/emicklei/go-restful/v3" + kkcorev1 "github.com/kubesphere/kubekey/api/core/v1" + kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + + _const "github.com/kubesphere/kubekey/v4/pkg/const" + "github.com/kubesphere/kubekey/v4/pkg/web/api" + "github.com/kubesphere/kubekey/v4/pkg/web/query" +) + +// PlaybookHandler handles HTTP requests for playbook resources. +type PlaybookHandler struct { + workdir string // Base directory for storing work files + restconfig *rest.Config // Kubernetes REST client configuration + client ctrlclient.Client // Kubernetes client for API operations +} + +// NewPlaybookHandler creates a new PlaybookHandler with the given workdir, restconfig, and client. +func NewPlaybookHandler(workdir string, restconfig *rest.Config, client ctrlclient.Client) *PlaybookHandler { + return &PlaybookHandler{workdir: workdir, restconfig: restconfig, client: client} +} + +// Post handles the creation of a new playbook resource. +// It reads the playbook from the request, checks schema label constraints, sets the workdir, creates the resource, and starts execution in a goroutine. +func (h *PlaybookHandler) Post(request *restful.Request, response *restful.Response) { + playbook := &kkcorev1.Playbook{} + // Read the playbook entity from the request body + if err := request.ReadEntity(playbook); err != nil { + api.HandleBadRequest(response, request, err) + return + } + + // Check for schema label: only one allowed, must not be empty, and must be unique among playbooks + hasSchemaLabel := false + for labelKey, labelValue := range playbook.Labels { + // Only consider labels with the schema label suffix + if !strings.HasSuffix(labelKey, api.SchemaLabelSubfix) { + continue + } + // If a schema label was already found, this is a conflict + if hasSchemaLabel { + api.HandleConflict(response, request, errors.New("a playbook can only have one schema label. Please ensure only one schema label is set")) + return + } + // The schema label value must not be empty + if labelValue == "" { + api.HandleConflict(response, request, errors.New("the schema label value must not be empty. Please provide a valid schema label value")) + return + } + hasSchemaLabel = true + // Check if there is already a playbook with the same schema label + playbookList := &kkcorev1.PlaybookList{} + if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.MatchingLabels{ + labelKey: labelValue, + }); err != nil { + api.HandleBadRequest(response, request, err) + return + } + // If any playbook with the same schema label exists, this is a conflict + if len(playbookList.Items) > 0 { + api.HandleConflict(response, request, errors.New("a playbook with the same schema label already exists. Please use a different schema label or remove the existing playbook")) + return + } + } + + // Set the workdir in the playbook's spec config + if err := unstructured.SetNestedField(playbook.Spec.Config.Value(), h.workdir, _const.Workdir); err != nil { + api.HandleBadRequest(response, request, err) + return + } + playbook.Status.Phase = kkcorev1.PlaybookPhasePending + // Create the playbook resource in Kubernetes + if err := h.client.Create(context.TODO(), playbook); err != nil { + api.HandleBadRequest(response, request, err) + return + } + // Start playbook execution in a separate goroutine + go func() { + if err := playbookManager.executor(playbook, h.client); err != nil { + klog.ErrorS(err, "failed to executor playbook", "playbook", ctrlclient.ObjectKeyFromObject(playbook)) + } + }() + + // For web UI: it does not run in Kubernetes, so execute playbook immediately. + _ = response.WriteEntity(playbook) +} + +// List handles listing playbook resources with filtering and pagination. +// It supports field selectors and label selectors for filtering the results. +func (h *PlaybookHandler) List(request *restful.Request, response *restful.Response) { + queryParam := query.ParseQueryParameter(request) + var fieldselector fields.Selector + // Parse field selector from query parameters if present. + if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok { + fs, err := fields.ParseSelector(string(v)) + if err != nil { + api.HandleError(response, request, err) + return + } + fieldselector = fs + } + namespace := request.PathParameter("namespace") + + playbookList := &kkcorev1.PlaybookList{} + // List playbooks from the Kubernetes API with the specified options. + err := h.client.List(request.Request.Context(), playbookList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector}) + if err != nil { + api.HandleError(response, request, err) + return + } + + // Sort and filter the playbook list using DefaultList. + results := query.DefaultList(playbookList.Items, queryParam, func(left, right kkcorev1.Playbook, sortBy query.Field) bool { + leftMeta, err := meta.Accessor(left) + if err != nil { + return false + } + rightMeta, err := meta.Accessor(right) + if err != nil { + return false + } + + return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy) + }, func(o kkcorev1.Playbook, filter query.Filter) bool { + // Skip fieldselector filter. + if filter.Field == query.ParameterFieldSelector { + return true + } + objectMeta, err := meta.Accessor(o) + if err != nil { + return false + } + + return query.DefaultObjectMetaFilter(objectMeta, filter) + }) + + _ = response.WriteEntity(results) +} + +// Info handles retrieving a single playbook or watching for changes. +// If the "watch" query parameter is set to "true", it streams updates to the client. +func (h *PlaybookHandler) Info(request *restful.Request, response *restful.Response) { + namespace := request.PathParameter("namespace") + name := request.PathParameter("playbook") + watch := request.QueryParameter("watch") + + playbook := &kkcorev1.Playbook{} + + if watch == "true" { + // Watch for changes to the playbook resource and stream events as JSON. + h.restconfig.GroupVersion = &kkcorev1.SchemeGroupVersion + client, err := rest.RESTClientFor(h.restconfig) + if err != nil { + api.HandleError(response, request, err) + return + } + watchInterface, err := client.Get().Namespace(namespace).Resource("playbooks").Name(name).Param("watch", "true").Watch(request.Request.Context()) + if err != nil { + api.HandleError(response, request, err) + return + } + defer watchInterface.Stop() + + response.AddHeader("Content-Type", "application/json") + flusher, ok := response.ResponseWriter.(http.Flusher) + if !ok { + http.Error(response.ResponseWriter, "Streaming unsupported", http.StatusInternalServerError) + return + } + + encoder := json.NewEncoder(response.ResponseWriter) + // Stream each event object to the client as JSON. + for event := range watchInterface.ResultChan() { + if err := encoder.Encode(event.Object); err != nil { + break + } + flusher.Flush() + } + return + } + + // Retrieve the playbook resource by namespace and name. + err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook) + if err != nil { + api.HandleError(response, request, err) + return + } + + _ = response.WriteEntity(playbook) +} + +// Log handles streaming the log file for a playbook. +// It opens the log file and streams its contents to the client, supporting live updates. +func (h *PlaybookHandler) Log(request *restful.Request, response *restful.Response) { + namespace := request.PathParameter("namespace") + name := request.PathParameter("playbook") + + playbook := &kkcorev1.Playbook{} + // Retrieve the playbook resource to get its config for log file path. + err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook) + if err != nil { + api.HandleError(response, request, err) + return + } + + // Build the log file path for the playbook. + filename := filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log") + file, err := os.Open(filename) + if err != nil { + api.HandleError(response, request, err) + return + } + defer file.Close() + + response.AddHeader("Content-Type", "text/plain; charset=utf-8") + writer := response.ResponseWriter + flusher, ok := writer.(http.Flusher) + if !ok { + http.Error(writer, "Streaming unsupported", http.StatusInternalServerError) + return + } + + // Stream the log file line by line, waiting for new lines if at EOF. + reader := bufio.NewReader(file) + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + // If EOF, wait for new log lines to be written. + time.Sleep(500 * time.Millisecond) + continue + } + break + } + fmt.Fprint(writer, line) + flusher.Flush() + } +} + +// Delete handles deletion of a playbook resource and its associated tasks. +// It stops the playbook execution if running, deletes the playbook, removes related files, and deletes all related tasks. +func (h *PlaybookHandler) Delete(request *restful.Request, response *restful.Response) { + namespace := request.PathParameter("namespace") + name := request.PathParameter("playbook") + + playbook := &kkcorev1.Playbook{} + // Retrieve the playbook resource to delete. + err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook) + if err != nil { + api.HandleError(response, request, err) + return + } + // Stop the playbook execution if it is running. + playbookManager.stopPlaybook(playbook) + // Delete the playbook resource. + if err := h.client.Delete(request.Request.Context(), playbook); err != nil { + api.HandleError(response, request, err) + return + } + // Delete related log file and directory. + _ = os.Remove(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log")) + _ = os.RemoveAll(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name)) + // Delete all tasks owned by this playbook. + if err := h.client.DeleteAllOf(request.Request.Context(), &kkcorev1alpha1.Task{}, ctrlclient.InNamespace(playbook.Namespace), ctrlclient.MatchingFields{ + "playbook.name": playbook.Name, + "playbook.uid": string(playbook.UID), + }); err != nil { + api.HandleError(response, request, err) + return + } + + _ = response.WriteEntity(api.SUCCESS) +} diff --git a/pkg/web/resources.go b/pkg/web/handler/resources.go similarity index 68% rename from pkg/web/resources.go rename to pkg/web/handler/resources.go index aeea48c7..831b5341 100644 --- a/pkg/web/resources.go +++ b/pkg/web/handler/resources.go @@ -1,6 +1,7 @@ -package web +package handler import ( + "context" "encoding/json" "fmt" "io" @@ -17,13 +18,16 @@ import ( "time" "github.com/cockroachdb/errors" - restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" kkcorev1 "github.com/kubesphere/kubekey/api/core/v1" "golang.org/x/crypto/ssh" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -32,65 +36,20 @@ import ( "github.com/kubesphere/kubekey/v4/pkg/web/query" ) -// NewSchemaService creates a new WebService that serves schema files from the specified root path. -// It sets up a route that handles GET requests to /resources/schema/{subpath} and serves files from the rootPath directory. -// The {subpath:*} parameter allows for matching any path under /resources/schema/. -func NewSchemaService(rootPath string, workdir string, client ctrlclient.Client) *restful.WebService { - ws := new(restful.WebService) - ws.Path(strings.TrimRight(_const.ResourcesAPIPath, "/")). - Produces(restful.MIME_JSON, "text/plain") - - h := newSchemaHandler(rootPath, workdir, client) - - ws.Route(ws.GET("/ip").To(h.listIP). - Doc("list available ip from ip cidr"). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}). - Param(ws.QueryParameter("cidr", "the cidr for ip").Required(true)). - Param(ws.QueryParameter("sshPort", "the ssh port for ip").Required(false)). - Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). - Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). - Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). - Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=ip").Required(false).DefaultValue("ip")). - Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.IPTable]{})) - - ws.Route(ws.GET("/schema/{subpath:*}").To(h.schemaInfo). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag})) - - ws.Route(ws.GET("/schema").To(h.allSchema). - Doc("list all schema as table"). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}). - Param(ws.PathParameter("namespace", "the namespace of the playbook").Required(false).DefaultValue("default")). - Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). - Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). - Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). - Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=priority")). - Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.SchemaTable]{})) - - ws.Route(ws.POST("/schema/config").To(h.storeConfig). - Doc("storing user-defined configuration information"). - Reads(struct{}{}). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag})) - - ws.Route(ws.GET("/schema/config").To(h.configInfo). - Doc("get user-defined configuration information"). - Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag})) - - return ws -} - -// newSchemaHandler creates a new schemaHandler instance with the given rootPath, workdir, and client. -func newSchemaHandler(rootPath string, workdir string, client ctrlclient.Client) *schemaHandler { - return &schemaHandler{rootPath: rootPath, workdir: workdir, client: client} -} - -// schemaHandler handles schema-related HTTP requests. -type schemaHandler struct { +// ResourceHandler handles resource-related HTTP requests. +type ResourceHandler struct { rootPath string workdir string client ctrlclient.Client } -func (h schemaHandler) configInfo(request *restful.Request, response *restful.Response) { +// NewResourceHandler creates a new ResourceHandler instance. +func NewResourceHandler(rootPath string, workdir string, client ctrlclient.Client) *ResourceHandler { + return &ResourceHandler{rootPath: rootPath, workdir: workdir, client: client} +} + +// ConfigInfo serves the config file content as the HTTP response. +func (h ResourceHandler) ConfigInfo(request *restful.Request, response *restful.Response) { file, err := os.Open(filepath.Join(h.rootPath, api.SchemaConfigFile)) if err != nil { if os.IsNotExist(err) { @@ -105,22 +64,122 @@ func (h schemaHandler) configInfo(request *restful.Request, response *restful.Re _, _ = io.Copy(response.ResponseWriter, file) } -func (h schemaHandler) storeConfig(request *restful.Request, response *restful.Response) { - file, err := os.OpenFile(filepath.Join(h.rootPath, api.SchemaConfigFile), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) +// PostConfig updates the config file and triggers precheck playbooks if needed. +func (h ResourceHandler) PostConfig(request *restful.Request, response *restful.Response) { + var ( + oldConfig map[string]any + newConfig map[string]any + ) + // Read new config from request body. + if err := request.ReadEntity(&newConfig); err != nil { + _ = response.WriteError(http.StatusInternalServerError, err) + return + } + + configPath := filepath.Join(h.rootPath, api.SchemaConfigFile) + // Open config file for reading and writing. + configFile, err := os.OpenFile(configPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { _ = response.WriteError(http.StatusInternalServerError, err) return } - defer file.Close() - _, err = io.Copy(file, request.Request.Body) - if err != nil { + defer configFile.Close() + + // Decode old config if present. + if err := json.NewDecoder(configFile).Decode(&oldConfig); err != nil && err != io.EOF { _ = response.WriteError(http.StatusInternalServerError, err) return } - _ = response.WriteEntity(api.SUCCESS) + + queryParam := query.ParseQueryParameter(request) + namespace := queryParam.Filters["cluster"] + inventory := queryParam.Filters["inventory"] + + playbooks := make(map[string]*kkcorev1.Playbook) + wg := wait.Group{} + + // Iterate over new config and trigger precheck playbooks if config changed. + for fileName, newVal := range newConfig { + if reflect.DeepEqual(newVal, oldConfig[fileName]) { + continue + } + schemaInfo, err := os.ReadFile(filepath.Join(h.rootPath, fileName)) + if err != nil { + _ = response.WriteError(http.StatusInternalServerError, err) + return + } + var schemaFile api.SchemaFile + if err := json.Unmarshal(schemaInfo, &schemaFile); err != nil { + _ = response.WriteError(http.StatusInternalServerError, err) + return + } + // If a precheck playbook is defined, create and execute it. + if pbpath := schemaFile.PlaybookPath["precheck."+api.SchemaLabelSubfix]; pbpath != "" { + configRaw, err := json.Marshal(newVal) + if err != nil { + _ = response.WriteError(http.StatusInternalServerError, err) + return + } + playbook := &kkcorev1.Playbook{ + Spec: kkcorev1.PlaybookSpec{ + Config: kkcorev1.Config{ + Spec: runtime.RawExtension{Raw: configRaw}, + }, + InventoryRef: &corev1.ObjectReference{ + Kind: "Inventory", + Namespace: string(namespace), + Name: string(inventory), + }, + Playbook: pbpath, + }, + Status: kkcorev1.PlaybookStatus{ + Phase: kkcorev1.PlaybookPhasePending, + }, + } + // Set the workdir in the playbook's spec config + if err := unstructured.SetNestedField(playbook.Spec.Config.Value(), h.workdir, _const.Workdir); err != nil { + api.HandleBadRequest(response, request, err) + return + } + if err := h.client.Create(context.TODO(), playbook); err != nil { + api.HandleBadRequest(response, request, err) + return + } + playbooks[fileName] = playbook + wg.Start(func() { + // Execute the playbook asynchronously. + if err := playbookManager.executor(playbook, h.client); err != nil { + klog.ErrorS(err, "failed to executor precheck playbook", "schema", fileName) + } + }) + } + } + wg.Wait() + + // Collect precheck results. + preCheckResult := make(map[string]string) + for fileName, playbook := range playbooks { + if playbook.Status.FailureMessage != "" { + preCheckResult[fileName] = playbook.Status.FailureMessage + } + } + + // Write new config to file. + if _, err := io.Copy(configFile, request.Request.Body); err != nil { + _ = response.WriteError(http.StatusInternalServerError, err) + return + } + + // Respond with precheck results if any failures, otherwise success. + if len(preCheckResult) > 0 { + _ = response.WriteEntity(api.Result{Message: api.ResultFailed, Result: preCheckResult}) + } else { + _ = response.WriteEntity(api.SUCCESS) + } } -func (h schemaHandler) listIP(request *restful.Request, response *restful.Response) { +// ListIP lists all IPs in the given CIDR, checks their online and SSH status, and returns the result. +func (h ResourceHandler) ListIP(request *restful.Request, response *restful.Response) { queryParam := query.ParseQueryParameter(request) cidr, ok := queryParam.Filters["cidr"] if !ok || string(cidr) == "" { @@ -137,12 +196,13 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon mu := sync.Mutex{} jobChannel := make(chan string, 20) wg := sync.WaitGroup{} + // Start worker goroutines for concurrent IP checking. for range maxConcurrency { wg.Add(1) go func() { defer wg.Done() for ip := range jobChannel { - if ifLocalhostIP(ip) { + if _const.IsLocalhostIP(ip) { mu.Lock() ipTable = append(ipTable, api.IPTable{ IP: ip, @@ -157,10 +217,10 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon // Check if the host is online using the ICMP protocol (ping). // Requires root privileges or CAP_NET_RAW capability. - if !ifIPOnline(ip) { + if !isIPOnline(ip) { continue } - reachable, authorized := ifIPSSHAuthorized(ip, string(sshPort)) + reachable, authorized := isSSHAuthorized(ip, string(sshPort)) mu.Lock() ipTable = append(ipTable, api.IPTable{ @@ -174,6 +234,7 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon }() } + // Send IPs to job channel for processing. for _, ip := range ips { jobChannel <- ip } @@ -181,7 +242,7 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon close(jobChannel) wg.Wait() - // less is a comparison function for sorting SchemaTable items by a given field. + // less is a comparison function for sorting IPTable items by a given field. less := func(left, right api.IPTable, sortBy query.Field) bool { leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy) rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy) @@ -205,7 +266,7 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon return true } } - // filter is a function for filtering SchemaTable items by a given field and value. + // filter is a function for filtering IPTable items by a given field and value. filter := func(o api.IPTable, f query.Filter) bool { val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field) switch val.Kind() { @@ -221,15 +282,15 @@ func (h schemaHandler) listIP(request *restful.Request, response *restful.Respon _ = response.WriteEntity(results) } -// schemaInfo serves static schema files from the rootPath directory. +// SchemaInfo serves static schema files from the rootPath directory. // It strips the /resources/schema/ prefix and serves files using http.FileServer. -func (h schemaHandler) schemaInfo(request *restful.Request, response *restful.Response) { +func (h ResourceHandler) SchemaInfo(request *restful.Request, response *restful.Response) { http.StripPrefix("/resources/schema/", http.FileServer(http.Dir(h.rootPath))).ServeHTTP(response.ResponseWriter, request.Request) } -// allSchema lists all schema JSON files in the rootPath directory as a table. +// ListSchema lists all schema JSON files in the rootPath directory as a table. // It supports filtering, sorting, and pagination via query parameters. -func (h schemaHandler) allSchema(request *restful.Request, response *restful.Response) { +func (h ResourceHandler) ListSchema(request *restful.Request, response *restful.Response) { queryParam := query.ParseQueryParameter(request) // Read all entries in the rootPath directory. entries, err := os.ReadDir(h.rootPath) @@ -248,18 +309,18 @@ func (h schemaHandler) allSchema(request *restful.Request, response *restful.Res // Read the JSON file. data, err := os.ReadFile(filepath.Join(h.rootPath, entry.Name())) if err != nil { - api.HandleBadRequest(response, request, err) + api.HandleBadRequest(response, request, errors.Wrapf(err, "failed to read file for schema %q", entry.Name())) return } var schemaFile api.SchemaFile // Unmarshal the JSON data into a SchemaTable struct. if err := json.Unmarshal(data, &schemaFile); err != nil { - api.HandleBadRequest(response, request, err) + api.HandleBadRequest(response, request, errors.Wrapf(err, "failed to unmarshal file for schema %q", entry.Name())) return } // Get all playbooks in the given namespace. playbookList := &kkcorev1.PlaybookList{} - if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.InNamespace(request.PathParameter("namespace"))); err != nil { + if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.InNamespace(request.PathParameter("cluster"))); err != nil { api.HandleBadRequest(response, request, err) return } @@ -333,42 +394,14 @@ func (h schemaHandler) allSchema(request *restful.Request, response *restful.Res _ = response.WriteEntity(results) } -// ifLocalhostIP checks if the given IP address string (ipStr) is bound to any local network interface. -// It returns true if the IP is found on any interface, false otherwise. -func ifLocalhostIP(ipStr string) bool { - targetIP := net.ParseIP(ipStr) - if targetIP == nil { - return false - } - ifaces, err := net.Interfaces() - if err != nil { - return false - } - for _, iface := range ifaces { - addrs, err := iface.Addrs() - if err != nil { - continue - } - for _, addr := range addrs { - switch v := addr.(type) { - case *net.IPNet: - if v.IP.Equal(targetIP) { - return true - } - case *net.IPAddr: - if v.IP.Equal(targetIP) { - return true - } - } - } - } - return false -} +// =========================================================================== +// ============================= isIPOnline ============================== +// =========================================================================== -// ifIPOnline checks if the given IP address is online by sending an ICMP Echo Request (ping). +// isIPOnline checks if the given IP address is online by sending an ICMP Echo Request (ping). // It returns true if a reply is received, false otherwise. // The timeout for the ICMP connection and reply is set to 1 second. -func ifIPOnline(ipStr string) bool { +func isIPOnline(ipStr string) bool { ip := net.ParseIP(ipStr) if ip == nil { return false @@ -473,6 +506,15 @@ func ifIPOnline(ipStr string) bool { return false } +// isValidICMPReply checks if the received ICMP reply is valid and matches the expected parameters. +// n: number of bytes read +// reply: the reply buffer +// src: source address of the reply +// expectedIP: the IP we expect the reply from +// protocol: ICMP protocol number +// pid: process ID used in the Echo request +// seq: sequence number used in the Echo request +// replyFilter: function to filter valid ICMP types func isValidICMPReply(n int, reply []byte, src net.Addr, expectedIP net.IP, protocol int, pid, seq int, replyFilter func(icmp.Type) bool) bool { srcIP, ok := src.(*net.IPAddr) if !ok || !srcIP.IP.Equal(expectedIP) { @@ -503,10 +545,14 @@ func isValidICMPReply(n int, reply []byte, src net.Addr, expectedIP net.IP, prot return false } -// ifIPSSHAuthorized checks if SSH authorization to the given IP is possible using the local private key. +// =========================================================================== +// ============================= isSSHAuthorized ========================= +// =========================================================================== + +// isSSHAuthorized checks if SSH authorization to the given IP is possible using the local private key. // It returns two booleans: the first indicates if the SSH port (22) is reachable, and the second indicates if SSH authorization using the local private key is successful. // The function attempts to find the user's private key, read and parse it, and then connect via SSH. -func ifIPSSHAuthorized(ipStr, sshPort string) (bool, bool) { +func isSSHAuthorized(ipStr, sshPort string) (bool, bool) { // First check if port 22 is reachable on the target IP address. conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", ipStr, sshPort), time.Second) if err != nil { diff --git a/pkg/web/openapi.go b/pkg/web/openapi.go deleted file mode 100644 index d778c8ad..00000000 --- a/pkg/web/openapi.go +++ /dev/null @@ -1,135 +0,0 @@ -package web - -import ( - "io/fs" - "net/http" - "os" - "strings" - - "github.com/cockroachdb/errors" - restfulspec "github.com/emicklei/go-restful-openapi/v2" - "github.com/emicklei/go-restful/v3" - "github.com/go-openapi/spec" - "k8s.io/klog/v2" - - "github.com/kubesphere/kubekey/v4/config" - _const "github.com/kubesphere/kubekey/v4/pkg/const" - "github.com/kubesphere/kubekey/v4/version" -) - -// NewUIService creates a new WebService that serves the static web UI and handles SPA routing. -// - Serves "/" with index.html -// - Serves static assets (e.g., .js, .css, .png) from the embedded web directory -// - Forwards all other non-API paths to index.html for SPA client-side routing -func NewUIService(path string) *restful.WebService { - ws := new(restful.WebService) - ws.Path("/") - - // Create a sub-filesystem for the embedded web UI assets - fileServer := http.FileServer(http.FS(os.DirFS(path))) - - // Serve the root path "/" with index.html - ws.Route(ws.GET("").To(func(req *restful.Request, resp *restful.Response) { - data, err := fs.ReadFile(os.DirFS(path), "index.html") - if err != nil { - _ = resp.WriteError(http.StatusNotFound, err) - return - } - resp.AddHeader("Content-Type", "text/html") - _, _ = resp.Write(data) - })) - - // Serve all subpaths: - // - If the path matches an API prefix, return 404 to let other WebServices handle it - // - If the path looks like a static asset (contains a dot), serve the file - // - Otherwise, serve index.html for SPA routing - ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) { - requestedPath := req.PathParameter("subpath") - - // If the path matches any API route, return 404 so other WebServices can handle it - if strings.HasPrefix(requestedPath, strings.TrimLeft(_const.CoreAPIPath, "/")) || - strings.HasPrefix(requestedPath, strings.TrimLeft(_const.SwaggerAPIPath, "/")) || - strings.HasPrefix(requestedPath, strings.TrimLeft(_const.ResourcesAPIPath, "/")) { - _ = resp.WriteError(http.StatusNotFound, errors.New("not found")) - return - } - - // If the path looks like a static asset (e.g., .js, .css, .ico, .png, etc.), serve it - if strings.Contains(requestedPath, ".") { - fileServer.ServeHTTP(resp.ResponseWriter, req.Request) - return - } - - // For all other paths, serve index.html (SPA client-side routing) - data, err := fs.ReadFile(os.DirFS(path), "index.html") - if err != nil { - _ = resp.WriteError(http.StatusInternalServerError, err) - return - } - resp.AddHeader("Content-Type", "text/html") - _, _ = resp.Write(data) - })) - - return ws -} - -// NewSwaggerUIService creates a new WebService that serves the Swagger UI interface -// It mounts the embedded swagger-ui files and handles requests to display the API documentation -func NewSwaggerUIService() *restful.WebService { - ws := new(restful.WebService) - ws.Path(strings.TrimRight(_const.SwaggerAPIPath, "/")) - - subFS, err := fs.Sub(config.Swagger, "swagger-ui") - if err != nil { - panic(err) - } - - fileServer := http.StripPrefix("/swagger-ui/", http.FileServer(http.FS(subFS))) - - ws.Route(ws.GET("").To(func(req *restful.Request, resp *restful.Response) { - data, err := fs.ReadFile(subFS, "index.html") - if err != nil { - _ = resp.WriteError(http.StatusNotFound, err) - return - } - resp.AddHeader("Content-Type", "text/html") - _, _ = resp.Write(data) - }).Metadata(restfulspec.KeyOpenAPITags, []string{_const.OpenAPITag})) - ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) { - fileServer.ServeHTTP(resp.ResponseWriter, req.Request) - }).Metadata(restfulspec.KeyOpenAPITags, []string{_const.OpenAPITag})) - - return ws -} - -// NewAPIService creates a new WebService that serves the OpenAPI/Swagger specification -// It takes a list of WebServices and generates the API documentation -func NewAPIService(webservice []*restful.WebService) *restful.WebService { - restconfig := restfulspec.Config{ - WebServices: webservice, // you control what services are visible - APIPath: "/apidocs.json", - PostBuildSwaggerObjectHandler: enrichSwaggerObject} - for _, ws := range webservice { - klog.V(2).Infof("%s", ws.RootPath()) - } - - return restfulspec.NewOpenAPIService(restconfig) -} - -// enrichSwaggerObject customizes the Swagger documentation with KubeKey-specific information -// It sets the API title, description, version and contact information -func enrichSwaggerObject(swo *spec.Swagger) { - swo.Info = &spec.Info{ - InfoProps: spec.InfoProps{ - Title: "KubeKey Web API", - Description: "KubeKey Web OpenAPI", - Version: version.Get().String(), - Contact: &spec.ContactInfo{ - ContactInfoProps: spec.ContactInfoProps{ - Name: "KubeKey", - URL: "https://github.com/kubesphere/kubekey", - }, - }, - }, - } -} diff --git a/pkg/web/service.go b/pkg/web/service.go new file mode 100644 index 00000000..2ce2ce1c --- /dev/null +++ b/pkg/web/service.go @@ -0,0 +1,309 @@ +package web + +import ( + "io/fs" + "net/http" + "os" + "strings" + + "github.com/cockroachdb/errors" + restfulspec "github.com/emicklei/go-restful-openapi/v2" + "github.com/emicklei/go-restful/v3" + "github.com/go-openapi/spec" + kkcorev1 "github.com/kubesphere/kubekey/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kubesphere/kubekey/v4/config" + _const "github.com/kubesphere/kubekey/v4/pkg/const" + "github.com/kubesphere/kubekey/v4/pkg/web/api" + "github.com/kubesphere/kubekey/v4/pkg/web/handler" + "github.com/kubesphere/kubekey/v4/pkg/web/query" + "github.com/kubesphere/kubekey/v4/version" +) + +// NewCoreService creates and configures a new RESTful web service for managing inventories and playbooks. +// It sets up routes for CRUD operations on inventories and playbooks, including pagination, sorting, and filtering. +func NewCoreService(workdir string, client ctrlclient.Client, restconfig *rest.Config) *restful.WebService { + ws := new(restful.WebService). + // the GroupVersion might be empty, we need to remove the final / + Path(strings.TrimRight(_const.CoreAPIPath+kkcorev1.SchemeGroupVersion.String(), "/")) + + inventoryHandler := handler.NewInventoryHandler(workdir, restconfig, client) + // Inventory management routes + ws.Route(ws.POST("/inventories").To(inventoryHandler.Post). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("create a inventory."). + Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON). + Reads(kkcorev1.Inventory{}). + Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{})) + + ws.Route(ws.PATCH("/namespaces/{namespace}/inventories/{inventory}").To(inventoryHandler.Patch). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("patch a inventory."). + Consumes(string(types.JSONPatchType), string(types.MergePatchType), string(types.ApplyPatchType)).Produces(restful.MIME_JSON). + Reads(kkcorev1.Inventory{}). + Param(ws.PathParameter("namespace", "the namespace of the inventory")). + Param(ws.PathParameter("inventory", "the name of the inventory")). + Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{})) + + ws.Route(ws.GET("/inventories").To(inventoryHandler.List). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("list all inventories."). + Produces(restful.MIME_JSON). + Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). + Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). + Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). + Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). + Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{})) + + ws.Route(ws.GET("/namespaces/{namespace}/inventories").To(inventoryHandler.List). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("list all inventories in a namespace."). + Produces(restful.MIME_JSON). + Param(ws.PathParameter("namespace", "the namespace of the inventory")). + Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). + Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). + Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). + Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). + Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{})) + + ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}").To(inventoryHandler.Info). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("get a inventory in a namespace."). + Produces(restful.MIME_JSON). + Param(ws.PathParameter("namespace", "the namespace of the inventory")). + Param(ws.PathParameter("inventory", "the name of the inventory")). + Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{})) + + ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}/hosts").To(inventoryHandler.ListHosts). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("list all hosts in a inventory."). + Produces(restful.MIME_JSON). + Param(ws.PathParameter("namespace", "the namespace of the inventory")). + Param(ws.PathParameter("inventory", "the name of the inventory")). + Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). + Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). + Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). + Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). + Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.InventoryHostTable]{})) + + playbookHandler := handler.NewPlaybookHandler(workdir, restconfig, client) + // Playbook management routes + ws.Route(ws.POST("/playbooks").To(playbookHandler.Post). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("create a playbook."). + Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON). + Reads(kkcorev1.Playbook{}). + Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{})) + + ws.Route(ws.GET("/playbooks").To(playbookHandler.List). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("list all playbooks."). + Produces(restful.MIME_JSON). + Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). + Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). + Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). + Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). + Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{})) + + ws.Route(ws.GET("/namespaces/{namespace}/playbooks").To(playbookHandler.List). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("list all playbooks in a namespace."). + Produces(restful.MIME_JSON). + Param(ws.PathParameter("namespace", "the namespace of the playbook")). + Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). + Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). + Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). + Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")). + Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{})) + + ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}").To(playbookHandler.Info). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("get or watch a playbook in a namespace."). + Produces(restful.MIME_JSON). + Param(ws.PathParameter("namespace", "the namespace of the playbook")). + Param(ws.PathParameter("playbook", "the name of the playbook")). + Param(ws.QueryParameter("watch", "set to true to watch this playbook")). + Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{})) + + ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}/log").To(playbookHandler.Log). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("get a playbook execute log."). + Produces("text/plain"). + Param(ws.PathParameter("namespace", "the namespace of the playbook")). + Param(ws.PathParameter("playbook", "the name of the playbook")). + Returns(http.StatusOK, _const.StatusOK, "")) + + ws.Route(ws.DELETE("/namespaces/{namespace}/playbooks/{playbook}").To(playbookHandler.Delete). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}). + Doc("delete a playbook."). + Produces(restful.MIME_JSON). + Param(ws.PathParameter("namespace", "the namespace of the playbook")). + Param(ws.PathParameter("playbook", "the name of the playbook")). + Returns(http.StatusOK, _const.StatusOK, api.Result{})) + + return ws +} + +// NewSchemaService creates a new WebService that serves schema files from the specified root path. +// It sets up a route that handles GET requests to /resources/schema/{subpath} and serves files from the rootPath directory. +// The {subpath:*} parameter allows for matching any path under /resources/schema/. +func NewSchemaService(rootPath string, workdir string, client ctrlclient.Client) *restful.WebService { + ws := new(restful.WebService) + ws.Path(strings.TrimRight(_const.ResourcesAPIPath, "/")). + Produces(restful.MIME_JSON, "text/plain") + + resourceHandler := handler.NewResourceHandler(rootPath, workdir, client) + ws.Route(ws.GET("/ip").To(resourceHandler.ListIP). + Doc("list available ip from ip cidr"). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}). + Param(ws.QueryParameter("cidr", "the cidr for ip").Required(true)). + Param(ws.QueryParameter("sshPort", "the ssh port for ip").Required(false)). + Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). + Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). + Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). + Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=ip").Required(false).DefaultValue("ip")). + Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.IPTable]{})) + + ws.Route(ws.GET("/schema/{subpath:*}").To(resourceHandler.SchemaInfo). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag})) + + ws.Route(ws.GET("/schema").To(resourceHandler.ListSchema). + Doc("list all schema as table"). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}). + Param(ws.QueryParameter("cluster", "The namespace where the cluster resides").Required(false).DefaultValue("default")). + Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")). + Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)). + Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")). + Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=priority")). + Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.SchemaTable]{})) + + ws.Route(ws.POST("/schema/config").To(resourceHandler.PostConfig). + Doc("storing user-defined configuration information"). + Reads(struct{}{}). + Param(ws.QueryParameter("cluster", "The namespace where the cluster resides").Required(false).DefaultValue("default")). + Param(ws.QueryParameter("inventory", "the inventory of the playbook").Required(false).DefaultValue("default")). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag})) + + ws.Route(ws.GET("/schema/config").To(resourceHandler.ConfigInfo). + Doc("get user-defined configuration information"). + Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag})) + + return ws +} + +// NewUIService creates a new WebService that serves the static web UI and handles SPA routing. +// - Serves "/" with index.html +// - Serves static assets (e.g., .js, .css, .png) from the embedded web directory +// - Forwards all other non-API paths to index.html for SPA client-side routing +func NewUIService(path string) *restful.WebService { + ws := new(restful.WebService) + ws.Path("/") + + // Create a sub-filesystem for the embedded web UI assets + fileServer := http.FileServer(http.FS(os.DirFS(path))) + + // Serve the root path "/" with index.html + ws.Route(ws.GET("").To(func(req *restful.Request, resp *restful.Response) { + data, err := fs.ReadFile(os.DirFS(path), "index.html") + if err != nil { + _ = resp.WriteError(http.StatusNotFound, err) + return + } + resp.AddHeader("Content-Type", "text/html") + _, _ = resp.Write(data) + })) + + // Serve all subpaths: + // - If the path matches an API prefix, return 404 to let other WebServices handle it + // - If the path looks like a static asset (contains a dot), serve the file + // - Otherwise, serve index.html for SPA routing + ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) { + requestedPath := req.PathParameter("subpath") + + // If the path matches any API route, return 404 so other WebServices can handle it + if strings.HasPrefix(requestedPath, strings.TrimLeft(_const.CoreAPIPath, "/")) || + strings.HasPrefix(requestedPath, strings.TrimLeft(_const.SwaggerAPIPath, "/")) || + strings.HasPrefix(requestedPath, strings.TrimLeft(_const.ResourcesAPIPath, "/")) { + _ = resp.WriteError(http.StatusNotFound, errors.New("not found")) + return + } + + // If the path looks like a static asset (e.g., .js, .css, .ico, .png, etc.), serve it + if strings.Contains(requestedPath, ".") { + fileServer.ServeHTTP(resp.ResponseWriter, req.Request) + return + } + + // For all other paths, serve index.html (SPA client-side routing) + data, err := fs.ReadFile(os.DirFS(path), "index.html") + if err != nil { + _ = resp.WriteError(http.StatusInternalServerError, err) + return + } + resp.AddHeader("Content-Type", "text/html") + _, _ = resp.Write(data) + })) + + return ws +} + +// NewSwaggerUIService creates a new WebService that serves the Swagger UI interface +// It mounts the embedded swagger-ui files and handles requests to display the API documentation +func NewSwaggerUIService() *restful.WebService { + ws := new(restful.WebService) + ws.Path(strings.TrimRight(_const.SwaggerAPIPath, "/")) + + subFS, err := fs.Sub(config.Swagger, "swagger-ui") + if err != nil { + panic(err) + } + + fileServer := http.StripPrefix("/swagger-ui/", http.FileServer(http.FS(subFS))) + + ws.Route(ws.GET("").To(func(req *restful.Request, resp *restful.Response) { + data, err := fs.ReadFile(subFS, "index.html") + if err != nil { + _ = resp.WriteError(http.StatusNotFound, err) + return + } + resp.AddHeader("Content-Type", "text/html") + _, _ = resp.Write(data) + }).Metadata(restfulspec.KeyOpenAPITags, []string{_const.OpenAPITag})) + ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) { + fileServer.ServeHTTP(resp.ResponseWriter, req.Request) + }).Metadata(restfulspec.KeyOpenAPITags, []string{_const.OpenAPITag})) + + return ws +} + +// NewAPIService creates a new WebService that serves the OpenAPI/Swagger specification +// It takes a list of WebServices and generates the API documentation +func NewAPIService(webservice []*restful.WebService) *restful.WebService { + restconfig := restfulspec.Config{ + WebServices: webservice, // you control what services are visible + APIPath: "/apidocs.json", + PostBuildSwaggerObjectHandler: func(swo *spec.Swagger) { + swo.Info = &spec.Info{ + InfoProps: spec.InfoProps{ + Title: "KubeKey Web API", + Description: "KubeKey Web OpenAPI", + Version: version.Get().String(), + Contact: &spec.ContactInfo{ + ContactInfoProps: spec.ContactInfoProps{ + Name: "KubeKey", + URL: "https://github.com/kubesphere/kubekey", + }, + }, + }, + } + }} + for _, ws := range webservice { + klog.V(2).Infof("%s", ws.RootPath()) + } + + return restfulspec.NewOpenAPIService(restconfig) +}