feat: add more web interface (#2648)

Signed-off-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
liujian 2025-07-03 14:48:49 +08:00 committed by GitHub
parent bbb8a4a031
commit e3dec872f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1631 additions and 841 deletions

View File

@ -73,7 +73,7 @@
post_tasks:
- name: Add custom label to cluster
command: |
{{- range $k, $v := .kubernetes.custom_label }}
{{- range $k, $v := .kubernetes.custom_labels }}
/usr/local/bin/kubectl label --overwrite node {{ $.hostname }} {{ $k }}={{ $v }}
{{- end }}
when: .kubernetes.custom_label | empty | not

View File

@ -200,7 +200,7 @@ kubernetes:
{{ .dockerio_registry }}
repository: kubesphere/etcd
tag: 3.5.0
custom_label: {}
# custom_labels: {}
# if auto renew kubernetes certs
renew_certs:
enabled: true

2
go.mod
View File

@ -21,6 +21,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0
golang.org/x/crypto v0.31.0
golang.org/x/net v0.33.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.31.3
k8s.io/apimachinery v0.31.3
@ -132,7 +133,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect

213
pkg/const/ip.go Normal file
View File

@ -0,0 +1,213 @@
package _const
import (
"encoding/binary"
"math"
"math/big"
"net"
"strconv"
"strings"
)
// ParseIP parses a CIDR, an IP range string (e.g., "xxx-xxx"), or a single IP into a slice of actual IPs.
// Supports both IPv4 and IPv6.
func ParseIP(ip string) []string {
var availableIPs []string
ip = strings.TrimRight(ip, "/")
if strings.Contains(ip, "/") {
// Handle single IP case in CIDR format
if strings.HasSuffix(ip, "/32") || strings.HasSuffix(ip, "/128") {
availableIPs = append(availableIPs, strings.Split(ip, "/")[0])
} else {
availableIPs = getAvailableIP(ip)
}
} else if strings.Contains(ip, "-") {
ipRange := strings.SplitN(ip, "-", 2)
if len(ipRange) == 2 {
availableIPs = getAvailableIPRange(ipRange[0], ipRange[1])
}
} else {
availableIPs = append(availableIPs, ip)
}
return availableIPs
}
// getAvailableIPRange generates all IPs between the start and end IP addresses (inclusive).
// Supports both IPv4 and IPv6.
func getAvailableIPRange(ipStart, ipEnd string) []string {
var availableIPs []string
startIP := net.ParseIP(ipStart)
endIP := net.ParseIP(ipEnd)
if startIP == nil || endIP == nil {
return availableIPs
}
// Determine if IPv4 or IPv6
if startIP.To4() != nil && endIP.To4() != nil {
startIP = startIP.To4()
endIP = endIP.To4()
startIPNum := ip4ToInt(startIP)
endIPNum := ip4ToInt(endIP)
for ipNum := new(big.Int).Set(startIPNum); ipNum.Cmp(endIPNum) <= 0; ipNum.Add(ipNum, big.NewInt(1)) {
availableIPs = append(availableIPs, intToIP4(ipNum).String())
}
} else if startIP.To16() != nil && endIP.To16() != nil {
startIP = startIP.To16()
endIP = endIP.To16()
startIPNum := ip6ToInt(startIP)
endIPNum := ip6ToInt(endIP)
for ipNum := new(big.Int).Set(startIPNum); ipNum.Cmp(endIPNum) <= 0; ipNum.Add(ipNum, big.NewInt(1)) {
availableIPs = append(availableIPs, intToIP6(ipNum).String())
}
}
return availableIPs
}
// getAvailableIP calculates all available IPs in a given CIDR.
// Supports both IPv4 and IPv6.
func getAvailableIP(ipAndMask string) []string {
var availableIPs []string
ipAndMask = strings.TrimSpace(ipAndMask)
ipAndMask = iPAddressToCIDR(ipAndMask)
_, ipnet, err := net.ParseCIDR(ipAndMask)
if err != nil || ipnet == nil {
return availableIPs
}
firstIP, lastIP := networkRange(ipnet)
if firstIP == nil || lastIP == nil {
return availableIPs
}
// IPv4
if firstIP.To4() != nil {
startIPNum := ip4ToInt(firstIP)
endIPNum := ip4ToInt(lastIP)
// Exclude network and broadcast addresses if possible
for ipNum := new(big.Int).Add(startIPNum, big.NewInt(1)); ipNum.Cmp(endIPNum) < 0; ipNum.Add(ipNum, big.NewInt(1)) {
availableIPs = append(availableIPs, intToIP4(ipNum).String())
}
} else if firstIP.To16() != nil {
// IPv6: no broadcast, so include all except network address
startIPNum := ip6ToInt(firstIP)
endIPNum := ip6ToInt(lastIP)
for ipNum := new(big.Int).Set(startIPNum); ipNum.Cmp(endIPNum) <= 0; ipNum.Add(ipNum, big.NewInt(1)) {
availableIPs = append(availableIPs, intToIP6(ipNum).String())
}
}
return availableIPs
}
// ip4ToInt converts an IPv4 address to a big.Int.
func ip4ToInt(ip net.IP) *big.Int {
ip = ip.To4()
if ip == nil {
return big.NewInt(0)
}
return big.NewInt(0).SetUint64(uint64(binary.BigEndian.Uint32(ip)))
}
// intToIP4 converts a big.Int to an IPv4 address.
func intToIP4(n *big.Int) net.IP {
maxIPv4 := big.NewInt(int64(math.MaxUint32))
if n.Cmp(maxIPv4) > 0 {
n = maxIPv4
}
// Get 4-byte representation
b := n.Bytes()
if len(b) < 4 {
// Pad with leading zeros if necessary
padded := make([]byte, 4)
copy(padded[4-len(b):], b)
b = padded
} else if len(b) > 4 {
// Trim to last 4 bytes if it's longer (shouldn't happen due to clamp)
b = b[len(b)-4:]
}
return net.IP(b)
}
// ip6ToInt converts an IPv6 address to a big.Int.
func ip6ToInt(ip net.IP) *big.Int {
ip = ip.To16()
if ip == nil {
return big.NewInt(0)
}
return big.NewInt(0).SetBytes(ip)
}
// intToIP6 converts a big.Int to an IPv6 address.
func intToIP6(n *big.Int) net.IP {
b := n.Bytes()
if len(b) < 16 {
pad := make([]byte, 0, 16-len(b))
b = append(pad, b...)
}
return net.IP(b)
}
// iPAddressToCIDR converts an IP address with a subnet mask to CIDR format.
// Only supports IPv4 mask notation (e.g., "192.168.1.1/255.255.255.0").
func iPAddressToCIDR(ipAddress string) string {
if strings.Contains(ipAddress, "/") {
parts := strings.Split(ipAddress, "/")
ip := parts[0]
mask := parts[1]
if strings.Contains(mask, ".") {
mask = iPMaskStringToCIDR(mask)
}
return ip + "/" + mask
}
return ipAddress
}
// iPMaskStringToCIDR converts a subnet mask string (e.g., "255.255.255.0") to a CIDR prefix length.
func iPMaskStringToCIDR(netmask string) string {
parts := strings.Split(netmask, ".")
if len(parts) != 4 {
return "0"
}
maskBytes := make([]byte, 4)
for i, part := range parts {
val, _ := strconv.Atoi(part)
maskBytes[i] = byte(val)
}
mask := net.IPv4Mask(maskBytes[0], maskBytes[1], maskBytes[2], maskBytes[3])
ones, _ := mask.Size()
return strconv.Itoa(ones)
}
// networkRange calculates the first and last IP in a given network.
// Supports both IPv4 and IPv6.
func networkRange(network *net.IPNet) (net.IP, net.IP) {
netIP := network.IP
mask := network.Mask
if netIP == nil || mask == nil {
return nil, nil
}
ipLen := len(netIP)
if ipLen == net.IPv4len {
netIP = netIP.To4()
} else if ipLen == net.IPv6len {
netIP = netIP.To16()
}
if netIP == nil {
return nil, nil
}
startIP := make(net.IP, len(netIP))
copy(startIP, netIP.Mask(mask))
endIP := make(net.IP, len(startIP))
for i := range startIP {
endIP[i] = startIP[i] | ^mask[i]
}
return startIP, endIP
}

131
pkg/const/ip_test.go Normal file
View File

@ -0,0 +1,131 @@
/*
Copyright 2023 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package _const
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestParseIP(t *testing.T) {
testcases := []struct {
name string
ipRange string
excepted func() []string
}{
{
name: "parse cidr",
ipRange: "192.168.1.0/30",
excepted: func() []string {
// 192.168.1.1 - 192.168.1.2
return []string{"192.168.1.1", "192.168.1.2"}
},
},
{
name: "parse single host cidr",
ipRange: "10.0.0.1/32",
excepted: func() []string {
return []string{"10.0.0.1"}
},
},
{
name: "parse range",
ipRange: "192.168.1.1-192.168.1.3",
excepted: func() []string {
return []string{"192.168.1.1", "192.168.1.2", "192.168.1.3"}
},
},
{
name: "parse single ip",
ipRange: "8.8.8.8",
excepted: func() []string {
return []string{"8.8.8.8"}
},
},
{
name: "parse ipv6 cidr",
ipRange: "2001:db8::/126",
excepted: func() []string {
return []string{"2001:db8::", "2001:db8::1", "2001:db8::2", "2001:db8::3"}
},
},
{
name: "parse ipv6 range",
ipRange: "2001:db8::1-2001:db8::3",
excepted: func() []string {
return []string{"2001:db8::1", "2001:db8::2", "2001:db8::3"}
},
},
{
name: "parse ipv6 single",
ipRange: "2001:db8::1",
excepted: func() []string {
return []string{"2001:db8::1"}
},
},
{
name: "parse ip with mask",
ipRange: "192.168.1.0/255.255.255.252",
excepted: func() []string {
return []string{"192.168.1.1", "192.168.1.2"}
},
},
{
name: "invalid input",
ipRange: "invalid",
excepted: func() []string {
return []string{"invalid"}
},
},
{
name: "parse large cidr (truncated check)",
ipRange: "192.168.0.0/18",
excepted: func() []string {
// 192.168.0.1 - 192.168.63.254
var ips []string
for i := range 64 {
for j := range 256 {
ips = append(ips, fmt.Sprintf("192.168.%d.%d", i, j))
}
}
return ips[1 : len(ips)-1]
},
},
{
name: "parse large range (truncated check)",
ipRange: "192.168.0.1-192.168.63.254",
excepted: func() []string {
// 192.168.0.1 - 192.168.63.254
var ips []string
for i := range 64 {
for j := range 256 {
ips = append(ips, fmt.Sprintf("192.168.%d.%d", i, j))
}
}
return ips[1 : len(ips)-1]
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.excepted(), ParseIP(tc.ipRange))
})
}
}

View File

@ -1,145 +0,0 @@
package internal
import (
"encoding/binary"
"net"
"strconv"
"strings"
)
// parseIP parses a CIDR or an IP range string (e.g., "xxx-xxx") into a slice of actual IPs.
func parseIP(ip string) []string {
var availableIPs []string
// Trim trailing slash from IP if present
ip = strings.TrimRight(ip, "/")
if strings.Contains(ip, "/") {
if strings.HasSuffix(ip, "/32") {
// Handle single IP case in CIDR format
availableIPs = append(availableIPs, strings.TrimSuffix(ip, "/32"))
} else {
// Parse CIDR into available IPs
availableIPs = getAvailableIP(ip)
}
} else if strings.Contains(ip, "-") {
// Handle IP range format (e.g., "192.168.1.1-192.168.1.10")
ipRange := strings.SplitN(ip, "-", 2)
if len(ipRange) == 2 {
availableIPs = getAvailableIPRange(ipRange[0], ipRange[1])
}
} else {
// Single IP case
availableIPs = append(availableIPs, ip)
}
return availableIPs
}
// getAvailableIPRange generates all IPs between the start and end IP addresses.
func getAvailableIPRange(ipStart, ipEnd string) []string {
var availableIPs []string
startIP := net.ParseIP(ipStart).To4()
endIP := net.ParseIP(ipEnd).To4()
if startIP == nil || endIP == nil {
return availableIPs
}
startIPNum := ipToInt(startIP)
endIPNum := ipToInt(endIP)
for ipNum := startIPNum; ipNum <= endIPNum; ipNum++ {
availableIPs = append(availableIPs, intToIP(ipNum).String())
}
return availableIPs
}
// getAvailableIP calculates all available IPs in a given CIDR.
func getAvailableIP(ipAndMask string) []string {
var availableIPs []string
// Ensure the input is in CIDR format
ipAndMask = strings.TrimSpace(ipAndMask)
ipAndMask = iPAddressToCIDR(ipAndMask)
_, ipnet, err := net.ParseCIDR(ipAndMask)
if err != nil || ipnet == nil {
return availableIPs
}
firstIP, lastIP := networkRange(ipnet)
startIPNum := ipToInt(firstIP)
endIPNum := ipToInt(lastIP)
// Exclude the network and broadcast addresses
for ipNum := startIPNum + 1; ipNum < endIPNum; ipNum++ {
availableIPs = append(availableIPs, intToIP(ipNum).String())
}
return availableIPs
}
// ipToInt converts an IP address to a uint32.
func ipToInt(ip net.IP) uint32 {
return binary.BigEndian.Uint32(ip)
}
// intToIP converts a uint32 to an IP address.
func intToIP(n uint32) net.IP {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, n)
return net.IP(b)
}
// iPAddressToCIDR converts an IP address with a subnet mask to CIDR format.
func iPAddressToCIDR(ipAddress string) string {
if strings.Contains(ipAddress, "/") {
parts := strings.Split(ipAddress, "/")
ip := parts[0]
mask := parts[1]
if strings.Contains(mask, ".") {
mask = iPMaskStringToCIDR(mask)
}
return ip + "/" + mask
}
return ipAddress
}
// iPMaskStringToCIDR converts a subnet mask string (e.g., "255.255.255.0") to a CIDR prefix length.
func iPMaskStringToCIDR(netmask string) string {
parts := strings.Split(netmask, ".")
if len(parts) != 4 {
return "0"
}
maskBytes := make([]byte, 4)
for i, part := range parts {
val, _ := strconv.Atoi(part)
maskBytes[i] = byte(val)
}
mask := net.IPv4Mask(maskBytes[0], maskBytes[1], maskBytes[2], maskBytes[3])
ones, _ := mask.Size()
return strconv.Itoa(ones)
}
// networkRange calculates the first and last IP in a given network.
func networkRange(network *net.IPNet) (net.IP, net.IP) {
netIP := network.IP.To4()
if netIP == nil {
return nil, nil
}
startIP := netIP.Mask(network.Mask)
endIP := make(net.IP, len(startIP))
for i := range startIP {
endIP[i] = startIP[i] | ^network.Mask[i]
}
return startIP, endIP
}

View File

@ -1,53 +0,0 @@
package internal
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestParseIP(t *testing.T) {
testcases := []struct {
name string
ipRange string
excepted func() []string
}{
{
name: "parse cidr",
ipRange: "192.168.0.0/18",
excepted: func() []string {
// 192.168.0.1 - 192.168.63.254
var ips []string
for i := range 64 {
for j := range 256 {
ips = append(ips, fmt.Sprintf("192.168.%d.%d", i, j))
}
}
return ips[1 : len(ips)-1]
},
},
{
name: "parse range",
ipRange: "192.168.0.1-192.168.63.254",
excepted: func() []string {
// 192.168.0.1 - 192.168.63.254
var ips []string
for i := range 64 {
for j := range 256 {
ips = append(ips, fmt.Sprintf("192.168.%d.%d", i, j))
}
}
return ips[1 : len(ips)-1]
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.excepted(), parseIP(tc.ipRange))
})
}
}

View File

@ -1,4 +1,4 @@
package internal
package tmpl
import (
"math"
@ -9,13 +9,12 @@ import (
"github.com/Masterminds/sprig/v3"
"github.com/cockroachdb/errors"
"gopkg.in/yaml.v3"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
)
// default function docs: http://masterminds.github.io/sprig
// Template parse file or vars which defined in project.
var Template = template.New("kubekey").Funcs(funcMap())
func funcMap() template.FuncMap {
var f = sprig.TxtFuncMap()
delete(f, "env")
@ -59,7 +58,7 @@ func fromYAML(v string) (any, error) {
func ipInCIDR(cidr string) ([]string, error) {
var ips = make([]string, 0)
for _, s := range strings.Split(cidr, ",") {
ips = append(ips, parseIP(s)...)
ips = append(ips, _const.ParseIP(s)...)
}
return ips, nil

View File

@ -18,12 +18,11 @@ package tmpl
import (
"bytes"
"text/template"
"github.com/cockroachdb/errors"
kkprojectv1 "github.com/kubesphere/kubekey/api/project/v1"
"k8s.io/klog/v2"
"github.com/kubesphere/kubekey/v4/pkg/converter/internal"
)
// ParseFunc parses a template string using the provided context and parse function.
@ -39,7 +38,7 @@ func ParseFunc[C ~map[string]any, Output any](ctx C, input string, f func([]byte
return f([]byte(input)), nil
}
// Parse the template string
tl, err := internal.Template.Parse(input)
tl, err := template.New("kubekey").Funcs(funcMap()).Parse(input)
if err != nil {
return f(nil), errors.Wrapf(err, "failed to parse template '%s'", input)
}

View File

@ -35,8 +35,8 @@ func (m webManager) Run(ctx context.Context) error {
container.RecoverHandler(func(panicReason any, httpWriter http.ResponseWriter) {
logStackOnRecover(panicReason, httpWriter)
})
container.Add(web.NewWebService(ctx, m.workdir, m.Client, m.Config)).
Add(web.NewSchemaService(m.schemaPath)).
container.Add(web.NewSchemaService(m.schemaPath, m.workdir, m.Client)).
Add(web.NewCoreService(m.workdir, m.Client, m.Config)).
// openapi
Add(web.NewSwaggerUIService()).
Add(web.NewAPIService(container.RegisteredWebServices()))

View File

@ -16,22 +16,26 @@ limitations under the License.
package api
// SUCCESS represents a successful operation result with a default success message
// SUCCESS is a global variable representing a successful operation result with a default success message.
// It can be used as a standard response for successful API calls.
var SUCCESS = Result{Message: "success"}
// Result represents a basic API response with a message field
// Result represents a basic API response structure containing a message field.
// The Message field is typically used to convey error or success information.
type Result struct {
Message string `description:"error message" json:"message"`
Message string `description:"error message" json:"message"` // Message provides details about the result or error.
}
// ListResult represents a paginated list response containing items and total count
// ListResult is a generic struct representing a paginated list response.
// T is a type parameter for the type of items in the list.
// Items contains the list of results, and TotalItems indicates the total number of items available.
type ListResult[T any] struct {
Items []T `json:"items"`
TotalItems int `json:"totalItems"`
Items []T `json:"items"` // List of items of type T
TotalItems int `json:"totalItems"` // Total number of items available
}
// InventoryHostTable represents a host entry in an inventory with its configuration details
// It includes network information, SSH credentials, and group membership
// InventoryHostTable represents a host entry in an inventory with its configuration details.
// It includes network information, SSH credentials, group membership, and architecture.
type InventoryHostTable struct {
Name string `json:"name"` // Hostname of the inventory host
Status string `json:"status"` // Current status of the host
@ -46,3 +50,35 @@ type InventoryHostTable struct {
Groups []string `json:"groups"` // Groups the host belongs to
Arch string `json:"arch"` // Architecture of the host
}
// SchemaTable represents schema metadata for a resource.
// It includes fields such as name, type, title, description, version, namespace, logo, priority, and associated playbooks.
// The Playbook field is a slice of SchemaTablePlaybook, each representing a playbook reference.
type SchemaTable struct {
Name string `json:"name"` // Name of schema, defined by filename
SchemaType string `json:"schemaType"` // Type of the schema (e.g., CRD, built-in)
Title string `json:"title"` // Title of the schema
Description string `json:"description"` // Description of the schema
Version string `json:"version"` // Version of the schema
Namespace string `json:"namespace"` // Namespace of the schema
Logo string `json:"logo"` // Logo URL or identifier
Priority int `json:"priority"` // Priority for display or ordering
Playbook []SchemaTablePlaybook `json:"playbook"` // List of reference playbooks
}
// SchemaTablePlaybook represents a reference to a playbook associated with a schema.
// It includes the playbook's name, namespace, and phase.
type SchemaTablePlaybook struct {
Name string `json:"name"` // Name of the playbook
Namespace string `json:"namespace"` // Namespace of the playbook
Phase string `json:"phase"` // Phase of the playbook
}
// IPTable represents an IP address entry and its SSH status information.
// It indicates whether the IP is a localhost, if SSH is reachable, and if SSH authorization is present.
type IPTable struct {
IP string `json:"ip"` // IP address
Localhost bool `json:"localhost"` // Whether the IP is a localhost IP
SSHReachable bool `json:"sshReachable"` // Whether SSH port is reachable on this IP
SSHAuthorized bool `json:"sshAuthorized"` // Whether SSH is authorized for this IP
}

686
pkg/web/corev1.go Normal file
View File

@ -0,0 +1,686 @@
package web
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"sync"
"time"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/executor"
"github.com/kubesphere/kubekey/v4/pkg/variable"
"github.com/kubesphere/kubekey/v4/pkg/web/api"
"github.com/kubesphere/kubekey/v4/pkg/web/query"
)
// NewCoreService creates and configures a new RESTful web service for managing inventories and playbooks.
// It sets up routes for CRUD operations on inventories and playbooks, including pagination, sorting, and filtering.
func NewCoreService(workdir string, client ctrlclient.Client, config *rest.Config) *restful.WebService {
ws := new(restful.WebService)
// the GroupVersion might be empty, we need to remove the final /
ws.Path(strings.TrimRight(_const.APIPath+kkcorev1.SchemeGroupVersion.String(), "/")).
Produces(restful.MIME_JSON).Consumes(
string(types.JSONPatchType),
string(types.MergePatchType),
string(types.StrategicMergePatchType),
string(types.ApplyPatchType),
restful.MIME_JSON)
h := newCoreHandler(workdir, client, config)
// Inventory management routes
ws.Route(ws.POST("/inventories").To(h.createInventory).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("create a inventory.").
Reads(kkcorev1.Inventory{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.PATCH("/namespaces/{namespace}/inventories/{inventory}").To(h.patchInventory).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("patch a inventory.").
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Reads(kkcorev1.Inventory{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.GET("/inventories").To(h.listInventories).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all inventories.").
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories").To(h.listInventories).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all inventories in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}").To(h.inventoryInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get a inventory in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}/hosts").To(h.listInventoryHosts).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all hosts in a inventory.").
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.InventoryHostTable]{}))
// Playbook management routes
ws.Route(ws.POST("/playbooks").To(h.createPlaybook).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("create a playbook.").
Reads(kkcorev1.Playbook{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{}))
ws.Route(ws.GET("/playbooks").To(h.listPlaybooks).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all playbooks.").
Reads(kkcorev1.Playbook{}).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks").To(h.listPlaybooks).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all playbooks in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}").To(h.playbookInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get or watch a playbook in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Param(ws.QueryParameter("watch", "set to true to watch this playbook")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}/log").To(h.logPlaybook).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get a playbook execute log.").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Returns(http.StatusOK, _const.StatusOK, "text/plain"))
ws.Route(ws.DELETE("/namespaces/{namespace}/playbooks/{playbook}").To(h.deletePlaybook).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("delete a playbook.").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Returns(http.StatusOK, _const.StatusOK, api.Result{}))
return ws
}
// newInventoryHandler creates a new handler instance with the given workdir, client and config
// workdir: Base directory for storing work files
// client: Kubernetes client for API operations
// config: Kubernetes REST client configuration
func newCoreHandler(workdir string, client ctrlclient.Client, config *rest.Config) *coreHandler {
// Create a new coreHandler with initialized playbookManager
return &coreHandler{workdir: workdir, client: client, restconfig: config, playbookManager: playbookManager{manager: make(map[string]context.CancelFunc)}}
}
// playbookManager is responsible for managing playbook execution contexts and their cancellation.
// It uses a mutex to ensure thread-safe access to the manager map.
type playbookManager struct {
sync.Mutex
manager map[string]context.CancelFunc // Map of playbook key to its cancel function
}
// addPlaybook adds a playbook and its cancel function to the manager map.
func (m *playbookManager) addPlaybook(playbook *kkcorev1.Playbook, cancel context.CancelFunc) {
m.Lock()
defer m.Unlock()
m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()] = cancel
}
// deletePlaybook removes a playbook from the manager map.
func (m *playbookManager) deletePlaybook(playbook *kkcorev1.Playbook) {
m.Lock()
defer m.Unlock()
delete(m.manager, ctrlclient.ObjectKeyFromObject(playbook).String())
}
// stopPlaybook cancels the context for a running playbook, if it exists.
func (m *playbookManager) stopPlaybook(playbook *kkcorev1.Playbook) {
// Attempt to cancel the playbook's context if it exists in the manager
if cancel, ok := m.manager[ctrlclient.ObjectKeyFromObject(playbook).String()]; ok {
cancel()
}
}
// coreHandler implements HTTP handlers for managing inventories and playbooks
// It provides methods for CRUD operations on inventories and playbooks
type coreHandler struct {
workdir string // Base directory for storing work files
restconfig *rest.Config // Kubernetes REST client configuration
client ctrlclient.Client // Kubernetes client for API operations
// playbookManager control to cancel playbook
playbookManager playbookManager
}
// createInventory creates a new inventory resource
// It reads the inventory from the request body and creates it in the Kubernetes cluster
func (h *coreHandler) createInventory(request *restful.Request, response *restful.Response) {
inventory := &kkcorev1.Inventory{}
if err := request.ReadEntity(inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Create(request.Request.Context(), inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// patchInventory patches an existing inventory resource
// It reads the patch data from the request body and applies it to the specified inventory
func (h *coreHandler) patchInventory(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
data, err := io.ReadAll(request.Request.Body)
if err != nil {
api.HandleBadRequest(response, request, err)
return
}
patchType := request.HeaderParameter("Content-Type")
// get old inventory
inventory := &kkcorev1.Inventory{}
if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Patch(request.Request.Context(), inventory, ctrlclient.RawPatch(types.PatchType(patchType), data)); err != nil {
api.HandleBadRequest(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// listInventories lists all inventory resources with optional filtering and sorting
// It supports field selectors and label selectors for filtering the results
func (h *coreHandler) listInventories(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
var fieldselector fields.Selector
if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok {
fs, err := fields.ParseSelector(string(v))
if err != nil {
api.HandleError(response, request, err)
return
}
fieldselector = fs
}
namespace := request.PathParameter("namespace")
inventoryList := &kkcorev1.InventoryList{}
err := h.client.List(request.Request.Context(), inventoryList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector})
if err != nil {
api.HandleError(response, request, err)
return
}
// Sort and filter the inventory list using DefaultList
results := query.DefaultList(inventoryList.Items, queryParam, func(left, right kkcorev1.Inventory, sortBy query.Field) bool {
leftMeta, err := meta.Accessor(left)
if err != nil {
return false
}
rightMeta, err := meta.Accessor(right)
if err != nil {
return false
}
return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy)
}, func(o kkcorev1.Inventory, filter query.Filter) bool {
// skip fieldselector
if filter.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, filter)
})
_ = response.WriteEntity(results)
}
// inventoryInfo retrieves a specific inventory resource
// It returns the inventory with the specified name in the given namespace
func (h *coreHandler) inventoryInfo(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
inventory := &kkcorev1.Inventory{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory)
if err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// listInventoryHosts lists all hosts in an inventory with their details
// It includes information about SSH configuration, IP addresses, and group membership
func (h *coreHandler) listInventoryHosts(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
inventory := &kkcorev1.Inventory{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory)
if err != nil {
api.HandleError(response, request, err)
return
}
// Retrieve the list of tasks associated with the inventory
taskList := &kkcorev1alpha1.TaskList{}
_ = h.client.List(request.Request.Context(), taskList, ctrlclient.InNamespace(namespace), ctrlclient.MatchingFields{
"playbook.name": inventory.Annotations[kkcorev1.HostCheckPlaybookAnnotation],
})
// buildHostItem constructs an InventoryHostTable from the hostname and raw extension
buildHostItem := func(hostname string, raw runtime.RawExtension) api.InventoryHostTable {
vars := variable.Extension2Variables(raw)
internalIPV4, _ := variable.StringVar(nil, vars, _const.VariableIPv4)
internalIPV6, _ := variable.StringVar(nil, vars, _const.VariableIPv6)
sshHost, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorHost)
sshPort, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPort)
sshUser, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUser)
sshPassword, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword)
sshPrivateKey, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPrivateKey)
// Remove sensitive or redundant variables from the vars map
delete(vars, _const.VariableIPv4)
delete(vars, _const.VariableIPv6)
delete(vars, _const.VariableConnector)
return api.InventoryHostTable{
Name: hostname,
InternalIPV4: internalIPV4,
InternalIPV6: internalIPV6,
SSHHost: sshHost,
SSHPort: sshPort,
SSHUser: sshUser,
SSHPassword: sshPassword,
SSHPrivateKey: sshPrivateKey,
Vars: vars,
Groups: []string{},
}
}
// Convert inventory groups for host membership lookup
groups := variable.ConvertGroup(*inventory)
// fillGroups adds group names to the InventoryHostTable item if the host is a member
fillGroups := func(item *api.InventoryHostTable) {
for groupName, hosts := range groups {
if groupName == _const.VariableGroupsAll || groupName == _const.VariableUnGrouped {
continue
}
if slices.Contains(hosts, item.Name) {
item.Groups = append(item.Groups, groupName)
}
}
}
// fillTaskInfo populates status and architecture info for the host from task results
fillTaskInfo := func(item *api.InventoryHostTable) {
for _, task := range taskList.Items {
switch task.Name {
case _const.Getenv(_const.TaskNameGatherFacts):
for _, result := range task.Status.HostResults {
if result.Host == item.Name {
item.Status = result.Stdout
break
}
}
case _const.Getenv(_const.TaskNameGetArch):
for _, result := range task.Status.HostResults {
if result.Host == item.Name {
item.Arch = result.Stdout
break
}
}
}
}
}
// less is a comparison function for sorting InventoryHostTable items by a given field
less := func(left, right api.InventoryHostTable, sortBy query.Field) bool {
leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy)
rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy)
switch leftVal.Kind() {
case reflect.String:
return leftVal.String() > rightVal.String()
case reflect.Int, reflect.Int64:
return leftVal.Int() > rightVal.Int()
default:
return left.Name > right.Name
}
}
// filter is a function to filter InventoryHostTable items based on query filters
filter := func(o api.InventoryHostTable, f query.Filter) bool {
val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field)
switch val.Kind() {
case reflect.String:
return val.String() == string(f.Value)
default:
return true
}
}
// Build the host table for the inventory
hostTable := make([]api.InventoryHostTable, 0)
for hostname, raw := range inventory.Spec.Hosts {
item := buildHostItem(hostname, raw)
fillGroups(&item)
fillTaskInfo(&item)
hostTable = append(hostTable, item)
}
// Sort and filter the host table, then write the result
results := query.DefaultList(hostTable, queryParam, less, filter)
_ = response.WriteEntity(results)
}
// createPlaybook handles the creation of a new playbook resource.
// It reads the playbook from the request, sets the workdir, creates the resource, and starts execution in a goroutine.
func (h *coreHandler) createPlaybook(request *restful.Request, response *restful.Response) {
playbook := &kkcorev1.Playbook{}
if err := request.ReadEntity(playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// Set workdir to playbook spec config.
if err := unstructured.SetNestedField(playbook.Spec.Config.Value(), h.workdir, _const.Workdir); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Create(request.Request.Context(), playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
go func() {
// Create playbook log file and execute the playbook, writing output to the log.
filename := filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log")
// Check if the directory for the log file exists, and create it if it does not.
if _, err := os.Stat(filepath.Dir(filename)); err != nil {
if !os.IsNotExist(err) {
api.HandleBadRequest(response, request, err)
return
}
// If directory does not exist, create it.
if err := os.MkdirAll(filepath.Dir(filename), os.ModePerm); err != nil {
api.HandleBadRequest(response, request, err)
return
}
}
// Open the log file for writing.
file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
klog.ErrorS(err, "failed to open file", "file", filename)
return
}
defer file.Close()
// Create a cancellable context for the playbook execution.
ctx, cancel := context.WithCancel(context.Background())
// Add the playbook and its cancel function to the playbookManager.
h.playbookManager.addPlaybook(playbook, cancel)
// Execute the playbook and write output to the log file.
if err := executor.NewPlaybookExecutor(ctx, h.client, playbook, file).Exec(ctx); err != nil {
klog.ErrorS(err, "failed to exec playbook", "playbook", playbook.Name)
}
// Remove the playbook from the playbookManager after execution.
h.playbookManager.deletePlaybook(playbook)
}()
// For web UI: it does not run in Kubernetes, so execute playbook immediately.
_ = response.WriteEntity(playbook)
}
// listPlaybooks handles listing playbook resources with filtering and pagination.
// It supports field selectors and label selectors for filtering the results.
func (h *coreHandler) listPlaybooks(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
var fieldselector fields.Selector
// Parse field selector from query parameters if present.
if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok {
fs, err := fields.ParseSelector(string(v))
if err != nil {
api.HandleError(response, request, err)
return
}
fieldselector = fs
}
namespace := request.PathParameter("namespace")
playbookList := &kkcorev1.PlaybookList{}
// List playbooks from the Kubernetes API with the specified options.
err := h.client.List(request.Request.Context(), playbookList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector})
if err != nil {
api.HandleError(response, request, err)
return
}
// Sort and filter the playbook list using DefaultList.
results := query.DefaultList(playbookList.Items, queryParam, func(left, right kkcorev1.Playbook, sortBy query.Field) bool {
leftMeta, err := meta.Accessor(left)
if err != nil {
return false
}
rightMeta, err := meta.Accessor(right)
if err != nil {
return false
}
return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy)
}, func(o kkcorev1.Playbook, filter query.Filter) bool {
// Skip fieldselector filter.
if filter.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, filter)
})
_ = response.WriteEntity(results)
}
// playbookInfo handles retrieving a single playbook or watching for changes.
// If the "watch" query parameter is set to "true", it streams updates to the client.
func (h *coreHandler) playbookInfo(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
watch := request.QueryParameter("watch")
playbook := &kkcorev1.Playbook{}
if watch == "true" {
// Watch for changes to the playbook resource and stream events as JSON.
h.restconfig.GroupVersion = &kkcorev1.SchemeGroupVersion
client, err := rest.RESTClientFor(h.restconfig)
if err != nil {
api.HandleError(response, request, err)
return
}
watchInterface, err := client.Get().Namespace(namespace).Resource("playbooks").Name(name).Param("watch", "true").Watch(request.Request.Context())
if err != nil {
api.HandleError(response, request, err)
return
}
defer watchInterface.Stop()
response.AddHeader("Content-Type", "application/json")
flusher, ok := response.ResponseWriter.(http.Flusher)
if !ok {
http.Error(response.ResponseWriter, "Streaming unsupported", http.StatusInternalServerError)
return
}
encoder := json.NewEncoder(response.ResponseWriter)
// Stream each event object to the client as JSON.
for event := range watchInterface.ResultChan() {
if err := encoder.Encode(event.Object); err != nil {
break
}
flusher.Flush()
}
return
}
// Retrieve the playbook resource by namespace and name.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(playbook)
}
// logPlaybook handles streaming the log file for a playbook.
// It opens the log file and streams its contents to the client, supporting live updates.
func (h *coreHandler) logPlaybook(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
playbook := &kkcorev1.Playbook{}
// Retrieve the playbook resource to get its config for log file path.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
// Build the log file path for the playbook.
filename := filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log")
file, err := os.Open(filename)
if err != nil {
api.HandleError(response, request, err)
return
}
defer file.Close()
response.AddHeader("Content-Type", "text/plain; charset=utf-8")
writer := response.ResponseWriter
flusher, ok := writer.(http.Flusher)
if !ok {
http.Error(writer, "Streaming unsupported", http.StatusInternalServerError)
return
}
// Stream the log file line by line, waiting for new lines if at EOF.
reader := bufio.NewReader(file)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
// If EOF, wait for new log lines to be written.
time.Sleep(500 * time.Millisecond)
continue
}
break
}
fmt.Fprint(writer, line)
flusher.Flush()
}
}
// deletePlaybook handles deletion of a playbook resource and its associated tasks.
// It stops the playbook execution if running, deletes the playbook, and deletes all related tasks.
func (h *coreHandler) deletePlaybook(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
playbook := &kkcorev1.Playbook{}
// Retrieve the playbook resource to delete.
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
// Stop the playbook execution if it is running.
h.playbookManager.stopPlaybook(playbook)
// Delete the playbook resource.
if err := h.client.Delete(request.Request.Context(), playbook); err != nil {
api.HandleError(response, request, err)
return
}
// delete relative filepath: variable and log
_ = os.Remove(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log"))
_ = os.RemoveAll(filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name))
// Delete all tasks owned by this playbook.
if err := h.client.DeleteAllOf(request.Request.Context(), &kkcorev1alpha1.Task{}, ctrlclient.MatchingFields{
"playbook.name": playbook.Name, "playbook.namespace": playbook.Namespace,
}); err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(api.SUCCESS)
}

View File

@ -1,461 +0,0 @@
package web
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"reflect"
"slices"
"time"
"github.com/emicklei/go-restful/v3"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
kkcorev1alpha1 "github.com/kubesphere/kubekey/api/core/v1alpha1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/executor"
"github.com/kubesphere/kubekey/v4/pkg/variable"
"github.com/kubesphere/kubekey/v4/pkg/web/api"
"github.com/kubesphere/kubekey/v4/pkg/web/query"
)
// newHandler creates a new handler instance with the given workdir, client and config
// workdir: Base directory for storing work files
// client: Kubernetes client for API operations
// config: Kubernetes REST client configuration
func newHandler(workdir string, client ctrlclient.Client, config *rest.Config) *handler {
return &handler{workdir: workdir, client: client, restconfig: config}
}
// handler implements HTTP handlers for managing inventories and playbooks
// It provides methods for CRUD operations on inventories and playbooks
type handler struct {
workdir string // Base directory for storing work files
restconfig *rest.Config // Kubernetes REST client configuration
client ctrlclient.Client // Kubernetes client for API operations
}
// createInventory creates a new inventory resource
// It reads the inventory from the request body and creates it in the Kubernetes cluster
func (h handler) createInventory(request *restful.Request, response *restful.Response) {
inventory := &kkcorev1.Inventory{}
if err := request.ReadEntity(inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Create(request.Request.Context(), inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// patchInventory patches an existing inventory resource
// It reads the patch data from the request body and applies it to the specified inventory
func (h handler) patchInventory(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
data, err := io.ReadAll(request.Request.Body)
if err != nil {
api.HandleBadRequest(response, request, err)
return
}
patchType := request.HeaderParameter("Content-Type")
// get old inventory
inventory := &kkcorev1.Inventory{}
if err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Patch(request.Request.Context(), inventory, ctrlclient.RawPatch(types.PatchType(patchType), data)); err != nil {
api.HandleBadRequest(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// listInventories lists all inventory resources with optional filtering and sorting
// It supports field selectors and label selectors for filtering the results
func (h handler) listInventories(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
var fieldselector fields.Selector
if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok {
fs, err := fields.ParseSelector(string(v))
if err != nil {
api.HandleError(response, request, err)
return
}
fieldselector = fs
}
namespace := request.PathParameter("namespace")
inventoryList := &kkcorev1.InventoryList{}
err := h.client.List(request.Request.Context(), inventoryList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector})
if err != nil {
api.HandleError(response, request, err)
return
}
results := query.DefaultList(inventoryList.Items, queryParam, func(left, right kkcorev1.Inventory, sortBy query.Field) bool {
leftMeta, err := meta.Accessor(left)
if err != nil {
return false
}
rightMeta, err := meta.Accessor(right)
if err != nil {
return false
}
return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy)
}, func(o kkcorev1.Inventory, filter query.Filter) bool {
// skip fieldselector
if filter.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, filter)
})
_ = response.WriteEntity(results)
}
// getInventory retrieves a specific inventory resource
// It returns the inventory with the specified name in the given namespace
func (h handler) inventoryInfo(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
inventory := &kkcorev1.Inventory{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory)
if err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(inventory)
}
// listInventoryHosts lists all hosts in an inventory with their details
// It includes information about SSH configuration, IP addresses, and group membership
func (h handler) listInventoryHosts(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
namespace := request.PathParameter("namespace")
name := request.PathParameter("inventory")
inventory := &kkcorev1.Inventory{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, inventory)
if err != nil {
api.HandleError(response, request, err)
return
}
taskList := &kkcorev1alpha1.TaskList{}
_ = h.client.List(request.Request.Context(), taskList, ctrlclient.InNamespace(namespace), ctrlclient.MatchingFields{
"playbook.name": inventory.Annotations[kkcorev1.HostCheckPlaybookAnnotation],
})
buildHostItem := func(hostname string, raw runtime.RawExtension) api.InventoryHostTable {
vars := variable.Extension2Variables(raw)
internalIPV4, _ := variable.StringVar(nil, vars, _const.VariableIPv4)
internalIPV6, _ := variable.StringVar(nil, vars, _const.VariableIPv6)
sshHost, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorHost)
sshPort, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPort)
sshUser, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorUser)
sshPassword, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPassword)
sshPrivateKey, _ := variable.StringVar(nil, vars, _const.VariableConnector, _const.VariableConnectorPrivateKey)
delete(vars, _const.VariableIPv4)
delete(vars, _const.VariableIPv6)
delete(vars, _const.VariableConnector)
return api.InventoryHostTable{
Name: hostname,
InternalIPV4: internalIPV4,
InternalIPV6: internalIPV6,
SSHHost: sshHost,
SSHPort: sshPort,
SSHUser: sshUser,
SSHPassword: sshPassword,
SSHPrivateKey: sshPrivateKey,
Vars: vars,
Groups: []string{},
}
}
groups := variable.ConvertGroup(*inventory)
fillGroups := func(item *api.InventoryHostTable) {
for groupName, hosts := range groups {
if groupName == _const.VariableGroupsAll || groupName == _const.VariableUnGrouped {
continue
}
if slices.Contains(hosts, item.Name) {
item.Groups = append(item.Groups, groupName)
}
}
}
fillTaskInfo := func(item *api.InventoryHostTable) {
for _, task := range taskList.Items {
switch task.Name {
case _const.Getenv(_const.TaskNameGatherFacts):
for _, result := range task.Status.HostResults {
if result.Host == item.Name {
item.Status = result.Stdout
break
}
}
case _const.Getenv(_const.TaskNameGetArch):
for _, result := range task.Status.HostResults {
if result.Host == item.Name {
item.Arch = result.Stdout
break
}
}
}
}
}
less := func(left, right api.InventoryHostTable, sortBy query.Field) bool {
leftVal := left.Name
if val := reflect.ValueOf(left).FieldByName(string(sortBy)); val.Kind() == reflect.String {
leftVal = val.String()
}
rightVal := right.Name
if val := reflect.ValueOf(right).FieldByName(string(sortBy)); val.Kind() == reflect.String {
rightVal = val.String()
}
return leftVal > rightVal
}
filter := func(o api.InventoryHostTable, f query.Filter) bool {
if f.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, f)
}
hostTable := make([]api.InventoryHostTable, 0)
for hostname, raw := range inventory.Spec.Hosts {
item := buildHostItem(hostname, raw)
fillGroups(&item)
fillTaskInfo(&item)
hostTable = append(hostTable, item)
}
results := query.DefaultList(hostTable, queryParam, less, filter)
_ = response.WriteEntity(results)
}
// createPlaybook handles the creation of a new playbook resource
func (h handler) createPlaybook(request *restful.Request, response *restful.Response) {
playbook := &kkcorev1.Playbook{}
if err := request.ReadEntity(playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// set workdir to playbook
if err := unstructured.SetNestedField(playbook.Spec.Config.Value(), h.workdir, _const.Workdir); err != nil {
api.HandleBadRequest(response, request, err)
return
}
if err := h.client.Create(request.Request.Context(), playbook); err != nil {
api.HandleBadRequest(response, request, err)
return
}
go func() {
// create playbook log
filename := filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log")
if _, err := os.Stat(filepath.Dir(filename)); err != nil {
if !os.IsNotExist(err) {
api.HandleBadRequest(response, request, err)
return
}
// if dir is not exist, create it.
if err := os.MkdirAll(filepath.Dir(filename), os.ModePerm); err != nil {
api.HandleBadRequest(response, request, err)
return
}
}
file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
klog.ErrorS(err, "failed to open file", "file", filename)
return
}
defer file.Close()
ctx := context.TODO()
if err := executor.NewPlaybookExecutor(ctx, h.client, playbook, file).Exec(ctx); err != nil {
klog.ErrorS(err, "failed to exec playbook", "playbook", playbook.Name)
}
}()
// for web ui. it not run in kubernetes. executor playbook right now
_ = response.WriteEntity(playbook)
}
// listPlaybooks handles listing playbook resources with filtering and pagination
func (h handler) listPlaybooks(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
var fieldselector fields.Selector
if v, ok := queryParam.Filters[query.ParameterFieldSelector]; ok {
fs, err := fields.ParseSelector(string(v))
if err != nil {
api.HandleError(response, request, err)
return
}
fieldselector = fs
}
namespace := request.PathParameter("namespace")
playbookList := &kkcorev1.PlaybookList{}
err := h.client.List(request.Request.Context(), playbookList, &ctrlclient.ListOptions{Namespace: namespace, LabelSelector: queryParam.Selector(), FieldSelector: fieldselector})
if err != nil {
api.HandleError(response, request, err)
return
}
results := query.DefaultList(playbookList.Items, queryParam, func(left, right kkcorev1.Playbook, sortBy query.Field) bool {
leftMeta, err := meta.Accessor(left)
if err != nil {
return false
}
rightMeta, err := meta.Accessor(right)
if err != nil {
return false
}
return query.DefaultObjectMetaCompare(leftMeta, rightMeta, sortBy)
}, func(o kkcorev1.Playbook, filter query.Filter) bool {
// skip fieldselector
if filter.Field == query.ParameterFieldSelector {
return true
}
objectMeta, err := meta.Accessor(o)
if err != nil {
return false
}
return query.DefaultObjectMetaFilter(objectMeta, filter)
})
_ = response.WriteEntity(results)
}
// playbookInfo handles retrieving a single playbook or watching for changes
func (h handler) playbookInfo(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
watch := request.QueryParameter("watch")
playbook := &kkcorev1.Playbook{}
if watch == "true" {
h.restconfig.GroupVersion = &kkcorev1.SchemeGroupVersion
client, err := rest.RESTClientFor(h.restconfig)
if err != nil {
api.HandleError(response, request, err)
return
}
watchInterface, err := client.Get().Namespace(namespace).Resource("playbooks").Name(name).Param("watch", "true").Watch(request.Request.Context())
if err != nil {
api.HandleError(response, request, err)
return
}
defer watchInterface.Stop()
response.AddHeader("Content-Type", "application/json")
flusher, ok := response.ResponseWriter.(http.Flusher)
if !ok {
http.Error(response.ResponseWriter, "Streaming unsupported", http.StatusInternalServerError)
return
}
encoder := json.NewEncoder(response.ResponseWriter)
for event := range watchInterface.ResultChan() {
if err := encoder.Encode(event.Object); err != nil {
break
}
flusher.Flush()
}
return
}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
_ = response.WriteEntity(playbook)
}
// logPlaybook handles streaming the log file for a playbook
func (h handler) logPlaybook(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
name := request.PathParameter("playbook")
playbook := &kkcorev1.Playbook{}
err := h.client.Get(request.Request.Context(), ctrlclient.ObjectKey{Namespace: namespace, Name: name}, playbook)
if err != nil {
api.HandleError(response, request, err)
return
}
filename := filepath.Join(_const.GetWorkdirFromConfig(playbook.Spec.Config), _const.RuntimeDir, kkcorev1.SchemeGroupVersion.Group, kkcorev1.SchemeGroupVersion.Version, "playbooks", playbook.Namespace, playbook.Name, playbook.Name+".log")
file, err := os.Open(filename)
if err != nil {
api.HandleError(response, request, err)
return
}
defer file.Close()
response.AddHeader("Content-Type", "text/plain; charset=utf-8")
writer := response.ResponseWriter
flusher, ok := writer.(http.Flusher)
if !ok {
http.Error(writer, "Streaming unsupported", http.StatusInternalServerError)
return
}
reader := bufio.NewReader(file)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
continue
}
break
}
fmt.Fprint(writer, line)
flusher.Flush()
}
}

View File

@ -16,6 +16,8 @@ limitations under the License.
package query
import "reflect"
// Field represents a query field name used for filtering and sorting
type Field string
@ -50,3 +52,39 @@ const (
// FieldOwnerKind represents the owner kind field of a resource
FieldOwnerKind = "ownerKind"
)
// GetFieldByJSONTag returns the value of the struct field whose JSON tag matches the given field name (filed).
// If not found by JSON tag, it tries to find the field by its struct field name.
// The function expects obj to be a struct or a pointer to a struct.
func GetFieldByJSONTag(obj reflect.Value, filed Field) reflect.Value {
// If obj is a pointer, get the element it points to
if obj.Kind() == reflect.Ptr {
obj = obj.Elem()
}
// If obj is not a struct, return zero Value
if obj.Kind() != reflect.Struct {
return reflect.Value{}
}
typ := obj.Type()
// Iterate over all struct fields
for i := range obj.NumField() {
structField := typ.Field(i)
jsonTag := structField.Tag.Get("json")
// The tag may have options, e.g. "name,omitempty"
// Check for exact match or prefix match before comma
if jsonTag == string(filed) ||
(jsonTag != "" && jsonTag == string(filed)+",omitempty") ||
(jsonTag != "" && len(jsonTag) >= len(string(filed)) &&
jsonTag[:len(string(filed))] == string(filed) &&
(len(jsonTag) == len(string(filed)) || jsonTag[len(string(filed))] == ',')) {
// Return the field value if the JSON tag matches
return obj.Field(i)
}
}
// If not found by json tag, try by field name (case-sensitive)
if f := obj.FieldByName(string(filed)); f.IsValid() {
return f
}
// Return zero Value if not found
return reflect.Value{}
}

508
pkg/web/resources.go Normal file
View File

@ -0,0 +1,508 @@
package web
import (
"encoding/json"
"errors"
"net"
"net/http"
"net/netip"
"os"
"os/user"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
"golang.org/x/crypto/ssh"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/web/api"
"github.com/kubesphere/kubekey/v4/pkg/web/query"
)
// NewSchemaService creates a new WebService that serves schema files from the specified root path.
// It sets up a route that handles GET requests to /resources/schema/{subpath} and serves files from the rootPath directory.
// The {subpath:*} parameter allows for matching any path under /resources/schema/.
func NewSchemaService(rootPath string, workdir string, client ctrlclient.Client) *restful.WebService {
ws := new(restful.WebService)
ws.Path("/resources").
Produces(restful.MIME_JSON, "text/plain")
h := newSchemaHandler(rootPath, workdir, client)
ws.Route(ws.GET("/ip").To(h.listIP).
Doc("list available ip from ip cidr").
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}).
Param(ws.QueryParameter("cidr", "the cidr for ip").Required(true)).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=ip").Required(false).DefaultValue("ip")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.IPTable]{}))
ws.Route(ws.GET("/schema/{subpath:*}").To(h.schemaInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}))
ws.Route(ws.GET("/schema").To(h.allSchema).
Doc("list all schema as table").
Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}).
Param(ws.QueryParameter("schemaType", "the type of schema json").Required(false)).
Param(ws.QueryParameter("playbookLabel", "the reference playbook of schema. eg: install.kubekey.kubesphere.io/schema,check.kubekey.kubesphere.io/schema"+
"if empty will not return any reference playbook").Required(false)).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=priority")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.SchemaTable]{}))
return ws
}
// newSchemaHandler creates a new schemaHandler instance with the given rootPath, workdir, and client.
func newSchemaHandler(rootPath string, workdir string, client ctrlclient.Client) *schemaHandler {
return &schemaHandler{rootPath: rootPath, workdir: workdir, client: client}
}
// schemaHandler handles schema-related HTTP requests.
type schemaHandler struct {
rootPath string
workdir string
client ctrlclient.Client
}
func (h schemaHandler) listIP(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
cidr, ok := queryParam.Filters["cidr"]
if !ok || len(cidr) == 0 {
api.HandleBadRequest(response, request, errors.New("cidr parameter is required"))
return
}
ips := _const.ParseIP(string(cidr))
ipTable := make([]api.IPTable, 0, len(ips))
maxConcurrency := 20
mu := sync.Mutex{}
jobChannel := make(chan string, 20)
wg := sync.WaitGroup{}
for range maxConcurrency {
wg.Add(1)
go func() {
defer wg.Done()
for ip := range jobChannel {
if ifLocalhostIP(ip) {
mu.Lock()
ipTable = append(ipTable, api.IPTable{
IP: ip,
Localhost: true,
SSHReachable: true,
SSHAuthorized: true,
})
mu.Unlock()
continue
}
// Check if the host is online using the ICMP protocol (ping).
// Requires root privileges or CAP_NET_RAW capability.
if !ifIPOnline(ip) {
continue
}
reachable, authorized := ifIPSSHAuthorized(ip)
mu.Lock()
ipTable = append(ipTable, api.IPTable{
IP: ip,
SSHReachable: reachable,
SSHAuthorized: authorized,
})
mu.Unlock()
}
}()
}
for _, ip := range ips {
jobChannel <- ip
}
close(jobChannel)
wg.Wait()
// less is a comparison function for sorting SchemaTable items by a given field.
less := func(left, right api.IPTable, sortBy query.Field) bool {
leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy)
rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy)
switch leftVal.Kind() {
case reflect.String:
if sortBy == "ip" {
leftIP, err := netip.ParseAddr(leftVal.String())
if err != nil {
return true
}
rightIP, err := netip.ParseAddr(rightVal.String())
if err != nil {
return true
}
return leftIP.Compare(rightIP) > 0
}
return leftVal.String() > rightVal.String()
case reflect.Int, reflect.Int64:
return leftVal.Int() > rightVal.Int()
default:
return true
}
}
// filter is a function for filtering SchemaTable items by a given field and value.
filter := func(o api.IPTable, f query.Filter) bool {
val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field)
switch val.Kind() {
case reflect.String:
return val.String() == string(f.Value)
case reflect.Int:
v, err := strconv.Atoi(string(f.Value))
if err != nil {
return false
}
return v == int(val.Int())
default:
return true
}
}
// Use the DefaultList function to apply filtering, sorting, and pagination.
results := query.DefaultList(ipTable, queryParam, less, filter)
_ = response.WriteEntity(results)
}
// schemaInfo serves static schema files from the rootPath directory.
// It strips the /resources/schema/ prefix and serves files using http.FileServer.
func (h schemaHandler) schemaInfo(request *restful.Request, response *restful.Response) {
http.StripPrefix("/resources/schema/", http.FileServer(http.Dir(h.rootPath))).ServeHTTP(response.ResponseWriter, request.Request)
}
// allSchema lists all schema JSON files in the rootPath directory as a table.
// It supports filtering, sorting, and pagination via query parameters.
func (h schemaHandler) allSchema(request *restful.Request, response *restful.Response) {
queryParam := query.ParseQueryParameter(request)
playbookLabel := string(queryParam.Filters["playbookLabel"])
// Get all entries in the rootPath directory.
entries, err := os.ReadDir(h.rootPath)
if err != nil {
api.HandleBadRequest(response, request, err)
return
}
schemaTable := make([]api.SchemaTable, 0)
for _, entry := range entries {
if entry.IsDir() || // Skip directories.
!strings.HasSuffix(entry.Name(), ".json") || // Only process files with .json suffix.
entry.Name() == "product.json" { // "product.json" is agreed file name
continue
}
// Read the JSON file.
data, err := os.ReadFile(filepath.Join(h.rootPath, entry.Name()))
if err != nil {
api.HandleBadRequest(response, request, err)
return
}
schema := api.SchemaTable{Name: entry.Name()}
// Unmarshal the JSON data into a SchemaTable struct.
if err := json.Unmarshal(data, &schema); err != nil {
api.HandleBadRequest(response, request, err)
return
}
// get reference playbook
if playbookLabel != "" {
playbookList := &kkcorev1.PlaybookList{}
if err := h.client.List(request.Request.Context(), playbookList, ctrlclient.MatchingLabels{
playbookLabel: entry.Name(),
}); err != nil {
api.HandleBadRequest(response, request, err)
return
}
schema.Playbook = make([]api.SchemaTablePlaybook, len(playbookList.Items))
for i, playbook := range playbookList.Items {
schema.Playbook[i] = api.SchemaTablePlaybook{
Name: playbook.Name,
Namespace: playbook.Namespace,
Phase: string(playbook.Status.Phase),
}
}
}
schemaTable = append(schemaTable, schema)
}
// less is a comparison function for sorting SchemaTable items by a given field.
less := func(left, right api.SchemaTable, sortBy query.Field) bool {
leftVal := query.GetFieldByJSONTag(reflect.ValueOf(left), sortBy)
rightVal := query.GetFieldByJSONTag(reflect.ValueOf(right), sortBy)
switch leftVal.Kind() {
case reflect.String:
return leftVal.String() > rightVal.String()
case reflect.Int, reflect.Int64:
return leftVal.Int() > rightVal.Int()
default:
return left.Priority > right.Priority
}
}
// filter is a function for filtering SchemaTable items by a given field and value.
filter := func(o api.SchemaTable, f query.Filter) bool {
val := query.GetFieldByJSONTag(reflect.ValueOf(o), f.Field)
switch val.Kind() {
case reflect.String:
return val.String() == string(f.Value)
case reflect.Int:
v, err := strconv.Atoi(string(f.Value))
if err != nil {
return false
}
return v == int(val.Int())
default:
return true
}
}
// Use the DefaultList function to apply filtering, sorting, and pagination.
results := query.DefaultList(schemaTable, queryParam, less, filter)
_ = response.WriteEntity(results)
}
// ifLocalhostIP checks if the given IP address string (ipStr) is bound to any local network interface.
// It returns true if the IP is found on any interface, false otherwise.
func ifLocalhostIP(ipStr string) bool {
targetIP := net.ParseIP(ipStr)
if targetIP == nil {
return false
}
ifaces, err := net.Interfaces()
if err != nil {
return false
}
for _, iface := range ifaces {
addrs, err := iface.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
if v.IP.Equal(targetIP) {
return true
}
case *net.IPAddr:
if v.IP.Equal(targetIP) {
return true
}
}
}
}
return false
}
// ifIPOnline checks if the given IP address is online by sending an ICMP Echo Request (ping).
// It returns true if a reply is received, false otherwise.
// The timeout for the ICMP connection and reply is set to 1 second.
func ifIPOnline(ipStr string) bool {
ip := net.ParseIP(ipStr)
if ip == nil {
return false
}
var (
network string
icmpType icmp.Type
protocol int
listenAddr string
replyFilter func(icmp.Type) bool
deadline = time.Now().Add(1 * time.Second)
)
// Determine if the IP is IPv4 or IPv6 and set appropriate values.
if ip.To4() != nil {
network = "ip4:icmp"
icmpType = ipv4.ICMPTypeEcho
protocol = 1 // ICMP for IPv4
listenAddr = "0.0.0.0"
replyFilter = func(t icmp.Type) bool {
return t == ipv4.ICMPTypeEchoReply || t == ipv4.ICMPTypeDestinationUnreachable
}
} else if ip.To16() != nil {
network = "ip6:ipv6-icmp"
icmpType = ipv6.ICMPTypeEchoRequest
protocol = 58 // ICMPv6
listenAddr = "::"
replyFilter = func(t icmp.Type) bool {
return t == ipv6.ICMPTypeEchoReply || t == ipv6.ICMPTypeDestinationUnreachable
}
} else {
// Not a valid IP address.
return false
}
// Listen for ICMP packets on the specified network.
conn, err := icmp.ListenPacket(network, listenAddr)
if err != nil {
klog.V(6).Infof("connect to %q use icmp failed, error: %v", ip, err)
return false
}
defer conn.Close()
// Set a deadline for the entire operation (write + read).
err = conn.SetDeadline(deadline)
if err != nil {
klog.V(6).Infof("set deadline for %q use icmp failed, error: %v", ip, err)
return false
}
pid := os.Getpid() & 0xffff
seq := int(time.Now().UnixNano() & 0xffff)
// Construct the ICMP Echo Request message.
msg := icmp.Message{
Type: icmpType,
Code: 0,
Body: &icmp.Echo{
ID: pid,
Seq: seq,
Data: []byte("PING"),
},
}
msgBytes, err := msg.Marshal(nil)
if err != nil {
klog.V(6).Infof("marshal msg to %q use icmp failed, error: %v", ip, err)
return false
}
// Send the ICMP Echo Request to the target IP address.
_, err = conn.WriteTo(msgBytes, &net.IPAddr{IP: ip})
if err != nil {
klog.V(6).Infof("write msg to %q use icmp failed, error: %v", ip, err)
return false
}
reply := make([]byte, 1500)
for time.Now().Before(deadline) {
if time.Until(deadline) <= 0 {
break
}
if err := conn.SetDeadline(time.Now().Add(100 * time.Millisecond)); err != nil {
klog.V(6).Infof("set reply deadline for %q use icmp failed, error: %v", ip, err)
continue
}
n, src, err := conn.ReadFrom(reply)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
continue
}
klog.V(6).Infof("read msg from %q use icmp timeout, error: %v", ip, err)
return false
}
if isValidICMPReply(n, reply, src, ip, protocol, pid, seq, replyFilter) {
return true
}
}
return false
}
func isValidICMPReply(n int, reply []byte, src net.Addr, expectedIP net.IP, protocol int, pid, seq int, replyFilter func(icmp.Type) bool) bool {
srcIP, ok := src.(*net.IPAddr)
if !ok || !srcIP.IP.Equal(expectedIP) {
klog.V(6).Infof("Ignore response from non-target IP: %s (expected: %s)", srcIP, expectedIP)
return false
}
recvMsg, err := icmp.ParseMessage(protocol, reply[:n])
if err != nil {
klog.V(6).Infof("parse msg from %q failed, error: %v", expectedIP, err)
return false
}
if !replyFilter(recvMsg.Type) {
klog.V(6).Infof("Ignore unrelated ICMP type: %v", recvMsg.Type)
return false
}
switch recvMsg.Type {
case ipv4.ICMPTypeEchoReply, ipv6.ICMPTypeEchoReply:
if echo, ok := recvMsg.Body.(*icmp.Echo); ok && echo.ID == pid && echo.Seq == seq {
return true
}
case ipv4.ICMPTypeDestinationUnreachable, ipv6.ICMPTypeDestinationUnreachable:
return false
}
return false
}
// ifIPSSHAuthorized checks if SSH authorization to the given IP is possible using the local private key.
// It returns two booleans: the first indicates if the SSH port (22) is reachable, and the second indicates if SSH authorization using the local private key is successful.
// The function attempts to find the user's private key, read and parse it, and then connect via SSH.
func ifIPSSHAuthorized(ipStr string) (bool, bool) {
// First check if port 22 is reachable on the target IP address.
conn, err := net.DialTimeout("tcp", ipStr+":22", time.Second)
if err != nil {
klog.V(6).Infof("port 22 not reachable on ip %q, error %v", ipStr, err)
return false, false
}
defer conn.Close()
// Set default SSH user and private key path.
sshUser := "root"
sshPrivateKey := "/root/.ssh/id_rsa"
// Try to get the current user and set the SSH user and private key path accordingly.
if currentUser, err := user.Current(); err == nil {
sshUser = currentUser.Username
sshPrivateKey = filepath.Join(currentUser.HomeDir, ".ssh/id_rsa")
}
// Check if the private key file exists.
if _, err := os.Stat(sshPrivateKey); err != nil {
// Port 22 is reachable, but private key is not found.
klog.V(6).Infof("cannot found private key %q local in ip %q, error %v", sshPrivateKey, ipStr, err)
return true, false
}
// Read the private key file.
key, err := os.ReadFile(sshPrivateKey)
if err != nil {
// Port 22 is reachable, but private key cannot be read.
klog.V(6).Infof("cannot read private key %q local in ip %q, error %v", sshPrivateKey, ipStr, err)
return true, false
}
// Parse the private key.
privateKey, err := ssh.ParsePrivateKey(key)
if err != nil {
// Port 22 is reachable, but private key cannot be parsed.
klog.V(6).Infof("cannot parse private key %q local in ip %q, error %v", sshPrivateKey, ipStr, err)
return true, false
}
// Prepare SSH authentication method.
auth := []ssh.AuthMethod{ssh.PublicKeys(privateKey)}
// Attempt to establish an SSH connection to the target IP.
sshClient, err := ssh.Dial("tcp", ipStr+":22", &ssh.ClientConfig{
User: sshUser,
Auth: auth,
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: time.Second,
})
if err != nil {
// Port 22 is reachable, but SSH authentication failed.
klog.V(6).Infof("failed to connect ip %q by ssh, error %v", ipStr, err)
return true, false
}
defer sshClient.Close()
// Port 22 is reachable and SSH authentication succeeded.
return true, true
}

View File

@ -1,24 +0,0 @@
package web
import (
"net/http"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
)
// NewSchemaService creates a new WebService that serves schema files from the specified root path.
// It sets up a route that handles GET requests to /resources/schema/{subpath} and serves files from the rootPath directory.
// The {subpath:*} parameter allows for matching any path under /resources/schema/.
func NewSchemaService(rootPath string) *restful.WebService {
ws := new(restful.WebService)
ws.Path("/resources/schema")
ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
http.StripPrefix("/resources/schema/", http.FileServer(http.Dir(rootPath))).ServeHTTP(resp.ResponseWriter, req.Request)
}).Metadata(restfulspec.KeyOpenAPITags, []string{_const.ResourceTag}))
return ws
}

View File

@ -1,137 +0,0 @@
package web
import (
"context"
"net/http"
"strings"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
kkcorev1 "github.com/kubesphere/kubekey/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
_const "github.com/kubesphere/kubekey/v4/pkg/const"
"github.com/kubesphere/kubekey/v4/pkg/web/api"
"github.com/kubesphere/kubekey/v4/pkg/web/query"
)
// NewWebService creates and configures a new RESTful web service for managing inventories and playbooks.
// It sets up routes for CRUD operations on inventories and playbooks, including pagination, sorting, and filtering.
// Parameters:
// - ctx: Context for the web service
// - workdir: Working directory for file operations
// - client: Kubernetes controller client
// - config: REST configuration
//
// Returns a configured WebService instance
func NewWebService(ctx context.Context, workdir string, client ctrlclient.Client, config *rest.Config) *restful.WebService {
ws := new(restful.WebService)
// the GroupVersion might be empty, we need to remove the final /
ws.Path(strings.TrimRight(_const.APIPath+kkcorev1.SchemeGroupVersion.String(), "/")).
Produces(restful.MIME_JSON).Consumes(
string(types.JSONPatchType),
string(types.MergePatchType),
string(types.StrategicMergePatchType),
string(types.ApplyPatchType),
restful.MIME_JSON)
h := newHandler(workdir, client, config)
// Inventory management routes
ws.Route(ws.POST("/inventories").To(h.createInventory).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("create a inventory.").
Reads(kkcorev1.Inventory{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.PATCH("/namespaces/{namespace}/inventories/{inventory}").To(h.patchInventory).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("patch a inventory.").
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Reads(kkcorev1.Inventory{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.GET("/inventories").To(h.listInventories).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all inventories.").
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d").DefaultValue("page=1")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("ascending=false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories").To(h.listInventories).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all inventories in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d").DefaultValue("page=1")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("ascending=false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Inventory]{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}").To(h.inventoryInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get a inventory in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Inventory{}))
ws.Route(ws.GET("/namespaces/{namespace}/inventories/{inventory}/hosts").To(h.listInventoryHosts).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all hosts in a inventory.").
Param(ws.PathParameter("namespace", "the namespace of the inventory")).
Param(ws.PathParameter("inventory", "the name of the inventory")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d").DefaultValue("page=1")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("ascending=false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[api.InventoryHostTable]{}))
// Playbook management routes
ws.Route(ws.POST("/playbooks").To(h.createPlaybook).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("create a playbook.").
Reads(kkcorev1.Playbook{}).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{}))
ws.Route(ws.GET("/playbooks").To(h.listPlaybooks).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all playbooks.").
Reads(kkcorev1.Playbook{}).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d").DefaultValue("page=1")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("ascending=false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks").To(h.listInventories).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("list all playbooks in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d").DefaultValue("page=1")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. reverse=true").Required(false).DefaultValue("ascending=false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, _const.StatusOK, api.ListResult[kkcorev1.Playbook]{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}").To(h.playbookInfo).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("get or watch a playbook in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Param(ws.QueryParameter("watch", "set to true to watch this playbook")).
Returns(http.StatusOK, _const.StatusOK, kkcorev1.Playbook{}))
ws.Route(ws.GET("/namespaces/{namespace}/playbooks/{playbook}/log").To(h.logPlaybook).
Metadata(restfulspec.KeyOpenAPITags, []string{_const.KubeKeyTag}).
Doc("watch a playbook in a namespace.").
Param(ws.PathParameter("namespace", "the namespace of the playbook")).
Param(ws.PathParameter("playbook", "the name of the playbook")).
Returns(http.StatusOK, _const.StatusOK, "text/plain"))
return ws
}