Merge pull request #446 from pixiake/update-cluster-operator

update cluster operator
KubeSphere CI Bot 2021-02-04 12:04:19 +08:00 committed by GitHub
commit b03f72e782
16 changed files with 529 additions and 78 deletions

.gitignore (2 changes)

@@ -39,3 +39,5 @@ bin
node_modules
package.json
yarn.lock
vendor


@@ -21,10 +21,8 @@ FROM debian:stable
RUN useradd -m kubekey && apt-get update && apt-get install bash curl -y; apt-get autoclean; rm -rf /var/lib/apt/lists/*
#ADD kubekey /home/kubekey/kubekey
#RUN chown kubekey:kubekey -R /home/kubekey/kubekey
USER kubekey:kubekey
RUN mkdir -p /home/kubekey/kubekey
WORKDIR /home/kubekey
@@ -33,4 +31,3 @@ COPY --from=builder /workspace/helm-charts/src/test/ks-installer /home/kubekey/a
COPY --from=builder /workspace/manager /home/kubekey
COPY --from=builder /workspace/kk /home/kubekey

Dockerfile.binaries (new file, 16 lines)

@@ -0,0 +1,16 @@
FROM alpine:3
ENV KUBEVERSION=v1.17.9
ENV ETCDVERSION=v3.4.13
ENV CNIVERSION=v0.8.6
ENV HELMVERSION=v3.2.1
ENV ARCH=amd64
WORKDIR /kubekey/${KUBEVERSION}/${ARCH}
RUN wget https://github.com/coreos/etcd/releases/download/${ETCDVERSION}/etcd-${ETCDVERSION}-linux-${ARCH}.tar.gz && \
wget https://storage.googleapis.com/kubernetes-release/release/${KUBEVERSION}/bin/linux/${ARCH}/kubeadm && \
wget https://storage.googleapis.com/kubernetes-release/release/${KUBEVERSION}/bin/linux/${ARCH}/kubelet && \
wget https://storage.googleapis.com/kubernetes-release/release/${KUBEVERSION}/bin/linux/${ARCH}/kubectl && \
wget https://get.helm.sh/helm-${HELMVERSION}-linux-${ARCH}.tar.gz && tar -zxf helm-${HELMVERSION}-linux-${ARCH}.tar.gz && \
mv linux-${ARCH}/helm . && rm -rf *linux-${ARCH}* && \
wget https://github.com/containernetworking/plugins/releases/download/${CNIVERSION}/cni-plugins-linux-${ARCH}-${CNIVERSION}.tgz
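
Dockerfile.binaries bakes a fixed set of Kubernetes, etcd, Helm, and CNI artifacts into an image laid out under /kubekey/${KUBEVERSION}/${ARCH}; the cluster controller further down mounts that layout into the runner pod through an init container. A rough sketch of the URL scheme the image fetches (the helper below is illustrative, not part of the commit; the URLs and default versions come straight from the ENV lines above):

package main

import "fmt"

// binaryURLs mirrors the download layout in Dockerfile.binaries above.
func binaryURLs(kubeVersion, etcdVersion, cniVersion, helmVersion, arch string) []string {
	return []string{
		fmt.Sprintf("https://github.com/coreos/etcd/releases/download/%s/etcd-%s-linux-%s.tar.gz", etcdVersion, etcdVersion, arch),
		fmt.Sprintf("https://storage.googleapis.com/kubernetes-release/release/%s/bin/linux/%s/kubeadm", kubeVersion, arch),
		fmt.Sprintf("https://storage.googleapis.com/kubernetes-release/release/%s/bin/linux/%s/kubelet", kubeVersion, arch),
		fmt.Sprintf("https://storage.googleapis.com/kubernetes-release/release/%s/bin/linux/%s/kubectl", kubeVersion, arch),
		fmt.Sprintf("https://get.helm.sh/helm-%s-linux-%s.tar.gz", helmVersion, arch),
		fmt.Sprintf("https://github.com/containernetworking/plugins/releases/download/%s/cni-plugins-linux-%s-%s.tgz", cniVersion, arch, cniVersion),
	}
}

func main() {
	// Defaults baked into the image: /kubekey/v1.17.9/amd64
	for _, u := range binaryURLs("v1.17.9", "v3.4.13", "v0.8.6", "v3.2.1", "amd64") {
		fmt.Println(u)
	}
}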


@@ -63,25 +63,32 @@ type ClusterStatus struct {
Conditions []Condition `json:"Conditions,omitempty"`
}
// JobInfo defines the job information to be used to create a cluster or add a node.
type JobInfo struct {
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
Pods []PodInfo `json:"pods,omitempty"`
}
// PodInfo defines the pod information to be used to create a cluster or add a node.
type PodInfo struct {
Name string `json:"name,omitempty"`
Containers []ContainerInfo `json:"containers,omitempty"`
}
// ContainerInfo defines the container information to be used to create a cluster or add a node.
type ContainerInfo struct {
Name string `json:"name,omitempty"`
}
// NodeStatus defines the status information of the nodes in the cluster.
type NodeStatus struct {
InternalIP string `json:"internalIP,omitempty"`
Hostname string `json:"hostname,omitempty"`
Roles map[string]bool `json:"roles,omitempty"`
}
// Condition defines the process information.
type Condition struct {
Step string `json:"step,omitempty"`
StartTime metav1.Time `json:"startTime,omitempty"`
@@ -118,29 +125,34 @@ func init() {
SchemeBuilder.Register(&Cluster{}, &ClusterList{})
}
// HostCfg defines host information for the cluster.
type HostCfg struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Address string `yaml:"address,omitempty" json:"address,omitempty"`
InternalAddress string `yaml:"internalAddress,omitempty" json:"internalAddress,omitempty"`
Port int `yaml:"port,omitempty" json:"port,omitempty"`
User string `yaml:"user,omitempty" json:"user,omitempty"`
Password string `yaml:"password,omitempty" json:"password,omitempty"`
PrivateKey string `yaml:"privateKey,omitempty" json:"privateKey,omitempty"`
PrivateKeyPath string `yaml:"privateKeyPath,omitempty" json:"privateKeyPath,omitempty"`
Arch string `yaml:"arch,omitempty" json:"arch,omitempty"`
Labels map[string]string `yaml:"labels,omitempty" json:"labels,omitempty"`
ID int `json:"-"`
IsEtcd bool `json:"-"`
IsMaster bool `json:"-"`
IsWorker bool `json:"-"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Address string `yaml:"address,omitempty" json:"address,omitempty"`
InternalAddress string `yaml:"internalAddress,omitempty" json:"internalAddress,omitempty"`
Port int `yaml:"port,omitempty" json:"port,omitempty"`
User string `yaml:"user,omitempty" json:"user,omitempty"`
Password string `yaml:"password,omitempty" json:"password,omitempty"`
PrivateKey string `yaml:"privateKey,omitempty" json:"privateKey,omitempty"`
PrivateKeyPath string `yaml:"privateKeyPath,omitempty" json:"privateKeyPath,omitempty"`
Arch string `yaml:"arch,omitempty" json:"arch,omitempty"`
Labels map[string]string `yaml:"labels,omitempty" json:"labels,omitempty"`
ID string `yaml:"id,omitempty" json:"id,omitempty"`
Index int `json:"-"`
IsEtcd bool `json:"-"`
IsMaster bool `json:"-"`
IsWorker bool `json:"-"`
}
// RoleGroups defines the grouping of roles for hosts (etcd / master / worker).
type RoleGroups struct {
Etcd []string `yaml:"etcd" json:"etcd,omitempty"`
Master []string `yaml:"master" json:"master,omitempty"`
Worker []string `yaml:"worker" json:"worker,omitempty"`
}
// HostGroups defines the grouping of hosts for the cluster (all / etcd / master / worker / k8s).
type HostGroups struct {
All []HostCfg
Etcd []HostCfg
@@ -149,24 +161,28 @@ type HostGroups struct {
K8s []HostCfg
}
// ControlPlaneEndpoint defines the control plane endpoint information for the cluster.
type ControlPlaneEndpoint struct {
Domain string `yaml:"domain" json:"domain,omitempty"`
Address string `yaml:"address" json:"address,omitempty"`
Port int `yaml:"port" json:"port,omitempty"`
}
// RegistryConfig defines the configuration information of the image registry.
type RegistryConfig struct {
RegistryMirrors []string `yaml:"registryMirrors" json:"registryMirrors,omitempty"`
InsecureRegistries []string `yaml:"insecureRegistries" json:"insecureRegistries,omitempty"`
PrivateRegistry string `yaml:"privateRegistry" json:"privateRegistry,omitempty"`
}
// KubeSphere defines the configuration information of KubeSphere.
type KubeSphere struct {
Enabled bool `json:"enabled,omitempty"`
Version string `json:"version,omitempty"`
Configurations string `json:"configurations,omitempty"`
}
// ExternalEtcd defines configuration information of external etcd.
type ExternalEtcd struct {
Endpoints []string
CaFile string
@@ -174,6 +190,7 @@ type ExternalEtcd struct {
KeyFile string
}
// GenerateCertSANs is used to generate cert SANs for the cluster.
func (cfg *ClusterSpec) GenerateCertSANs() []string {
clusterSvc := fmt.Sprintf("kubernetes.default.svc.%s", cfg.Kubernetes.ClusterName)
defaultCertSANs := []string{"kubernetes", "kubernetes.default", "kubernetes.default.svc", clusterSvc, "localhost", "127.0.0.1"}
@@ -204,6 +221,7 @@ func (cfg *ClusterSpec) GenerateCertSANs() []string {
return defaultCertSANs
}
// GroupHosts is used to group hosts according to the configuration file.
func (cfg *ClusterSpec) GroupHosts(logger *log.Logger) (*HostGroups, error) {
clusterHostsGroups := HostGroups{}
@@ -217,7 +235,7 @@ func (cfg *ClusterSpec) GroupHosts(logger *log.Logger) (*HostGroups, error) {
return nil, err
}
for index, host := range cfg.Hosts {
host.ID = index
host.Index = index
if len(etcdGroup) > 0 {
for _, hostName := range etcdGroup {
if host.Name == hostName {
@@ -262,29 +280,31 @@ func (cfg *ClusterSpec) GroupHosts(logger *log.Logger) (*HostGroups, error) {
// Check whether the parameters under roleGroups are correct
if len(masterGroup) == 0 {
logger.Fatal(errors.New("The number of master cannot be 0."))
logger.Fatal(errors.New("The number of master cannot be 0"))
}
if len(etcdGroup) == 0 {
logger.Fatal(errors.New("The number of etcd cannot be 0."))
logger.Fatal(errors.New("The number of etcd cannot be 0"))
}
if len(masterGroup) != len(clusterHostsGroups.Master) {
return nil, errors.New("Incorrect nodeName under roleGroups/master in the configuration file, Please check before installing.")
return nil, errors.New("Incorrect nodeName under roleGroups/master in the configuration file")
}
if len(etcdGroup) != len(clusterHostsGroups.Etcd) {
return nil, errors.New("Incorrect nodeName under roleGroups/etcd in the configuration file, Please check before installing.")
return nil, errors.New("Incorrect nodeName under roleGroups/etcd in the configuration file")
}
if len(workerGroup) != len(clusterHostsGroups.Worker) {
return nil, errors.New("Incorrect nodeName under roleGroups/work in the configuration file, Please check before installing.")
return nil, errors.New("Incorrect nodeName under roleGroups/work in the configuration file")
}
return &clusterHostsGroups, nil
}
// ClusterIP is used to get the kube-apiserver service address inside the cluster.
func (cfg *ClusterSpec) ClusterIP() string {
return util.ParseIp(cfg.Network.KubeServiceCIDR)[2]
}
// ParseRolesList is used to parse the host grouping list.
func (cfg *ClusterSpec) ParseRolesList(hostList map[string]string, logger *log.Logger) ([]string, []string, []string, error) {
etcdGroupList := []string{}
masterGroupList := []string{}
@@ -343,7 +363,7 @@ func getHostsRange(rangeStr string, hostList map[string]string, group string, lo
func hostVerify(hostList map[string]string, hostName string, group string) error {
if _, ok := hostList[hostName]; !ok {
return errors.New(fmt.Sprintf("[%s] is in [%s] group, but not in hosts list.", hostName, group))
return fmt.Errorf("[%s] is in [%s] group, but not in hosts list", hostName, group)
}
return nil
}
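
The hunks above split the old HostCfg.ID into two fields: a user-facing string ID (serialized as id) and a positional Index that stays out of JSON. GroupHosts still validates that every name under roleGroups matches a hosts entry (hostVerify), and that the master and etcd groups are non-empty. A minimal sketch of how the two structs fit together (values are illustrative; the enclosing ClusterSpec is not shown in this diff):

package main

import (
	"fmt"

	kubekeyv1alpha1 "github.com/kubesphere/kubekey/apis/kubekey/v1alpha1"
)

func main() {
	// Each name referenced under RoleGroups must match a HostCfg.Name;
	// hostVerify rejects anything that is in a group but not in hosts.
	hosts := []kubekeyv1alpha1.HostCfg{
		{Name: "node1", Address: "172.16.0.2", InternalAddress: "172.16.0.2", User: "root", Port: 22},
		{Name: "node2", Address: "172.16.0.3", InternalAddress: "172.16.0.3", User: "root", Port: 22},
	}
	roleGroups := kubekeyv1alpha1.RoleGroups{
		Etcd:   []string{"node1"}, // must not be empty
		Master: []string{"node1"}, // must not be empty
		Worker: []string{"node1", "node2"},
	}
	fmt.Printf("%d hosts, roleGroups: %+v\n", len(hosts), roleGroups)
}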


@@ -140,7 +140,9 @@ func (in *ClusterSpec) DeepCopyInto(out *ClusterSpec) {
if in.Hosts != nil {
in, out := &in.Hosts, &out.Hosts
*out = make([]HostCfg, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
in.RoleGroups.DeepCopyInto(&out.RoleGroups)
out.ControlPlaneEndpoint = in.ControlPlaneEndpoint
@@ -282,6 +284,13 @@ func (in *FlannelCfg) DeepCopy() *FlannelCfg {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *HostCfg) DeepCopyInto(out *HostCfg) {
*out = *in
if in.Labels != nil {
in, out := &in.Labels, &out.Labels
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HostCfg.
@@ -300,27 +309,37 @@ func (in *HostGroups) DeepCopyInto(out *HostGroups) {
if in.All != nil {
in, out := &in.All, &out.All
*out = make([]HostCfg, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Etcd != nil {
in, out := &in.Etcd, &out.Etcd
*out = make([]HostCfg, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Master != nil {
in, out := &in.Master, &out.Master
*out = make([]HostCfg, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Worker != nil {
in, out := &in.Worker, &out.Worker
*out = make([]HostCfg, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.K8s != nil {
in, out := &in.K8s, &out.K8s
*out = make([]HostCfg, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
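
The switch from copy() to element-wise DeepCopyInto is forced by the new Labels map on HostCfg: copying struct values duplicates the map header but not its backing storage, so the old generated code would have left every copy sharing one map. A self-contained demonstration of the aliasing problem (plain Go, not KubeKey code):

package main

import "fmt"

type host struct {
	Name   string
	Labels map[string]string
}

func main() {
	in := []host{{Name: "node1", Labels: map[string]string{"role": "worker"}}}

	// Shallow: copy() duplicates the structs, but both slices still
	// point at the same underlying map.
	shallow := make([]host, len(in))
	copy(shallow, in)

	// Deep: clone the map per element, as the regenerated code does.
	deep := make([]host, len(in))
	for i := range in {
		deep[i] = host{Name: in[i].Name, Labels: map[string]string{}}
		for k, v := range in[i].Labels {
			deep[i].Labels[k] = v
		}
	}

	in[0].Labels["role"] = "master"
	fmt.Println(shallow[0].Labels["role"]) // "master": the copy was aliased
	fmt.Println(deep[0].Labels["role"])    // "worker": the copy is independent
}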


@@ -1,3 +1,19 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (


@@ -94,8 +94,14 @@ spec:
type: string
arch:
type: string
id:
type: string
internalAddress:
type: string
labels:
additionalProperties:
type: string
type: object
name:
type: string
password:
@@ -118,14 +124,16 @@ spec:
type: array
clusterName:
type: string
containerManager:
type: string
containerRuntimeEndpoint:
type: string
etcdBackupDir:
type: string
etcdBackupPeriod:
type: integer
etcdBackupScript:
type: string
imageRepo:
type: string
keepBackupNumber:
type: integer
masqueradeAll:
@@ -168,6 +176,35 @@ spec:
type: string
kubeServiceCIDR:
type: string
kubeovn:
properties:
dpdkMode:
type: boolean
dpdkVersion:
type: string
enableMirror:
type: boolean
enableSSL:
type: boolean
hwOffload:
type: boolean
iface:
type: string
joinCIDR:
type: string
label:
type: string
networkType:
type: string
pingerExternalAddress:
type: string
pingerExternalDomain:
type: string
vlanID:
type: string
vlanInterfaceName:
type: string
type: object
plugin:
type: string
type: object
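
The new kubeovn block in the CRD implies a matching Go type on the network spec; that type is not part of this diff, so the following mirror is an assumption reconstructed from the schema's property names and types, and the real type in the KubeKey source may differ:

package sketch

// KubeovnCfg is a hypothetical mirror of the kubeovn CRD properties above.
type KubeovnCfg struct {
	DpdkMode              bool   `yaml:"dpdkMode" json:"dpdkMode,omitempty"`
	DpdkVersion           string `yaml:"dpdkVersion" json:"dpdkVersion,omitempty"`
	EnableMirror          bool   `yaml:"enableMirror" json:"enableMirror,omitempty"`
	EnableSSL             bool   `yaml:"enableSSL" json:"enableSSL,omitempty"`
	HwOffload             bool   `yaml:"hwOffload" json:"hwOffload,omitempty"`
	Iface                 string `yaml:"iface" json:"iface,omitempty"`
	JoinCIDR              string `yaml:"joinCIDR" json:"joinCIDR,omitempty"`
	Label                 string `yaml:"label" json:"label,omitempty"`
	NetworkType           string `yaml:"networkType" json:"networkType,omitempty"`
	PingerExternalAddress string `yaml:"pingerExternalAddress" json:"pingerExternalAddress,omitempty"`
	PingerExternalDomain  string `yaml:"pingerExternalDomain" json:"pingerExternalDomain,omitempty"`
	VlanID                string `yaml:"vlanID" json:"vlanID,omitempty"`
	VlanInterfaceName     string `yaml:"vlanInterfaceName" json:"vlanInterfaceName,omitempty"`
}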


@@ -17,20 +17,27 @@ limitations under the License.
package kubekey
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"github.com/go-logr/logr"
"github.com/kubesphere/kubekey/pkg/addons/manifests"
yamlV2 "gopkg.in/yaml.v2"
kubeErr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"time"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/apis/kubekey/v1alpha1"
@@ -87,7 +94,10 @@ func (r *ClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
cluster := &kubekeyv1alpha1.Cluster{}
cmFound := &corev1.ConfigMap{}
jobFound := &batchv1.Job{}
var (
clusterAlreadyExist bool
addHosts, removeHosts []kubekeyv1alpha1.HostCfg
)
// Fetch the Cluster object
if err := r.Get(ctx, req.NamespacedName, cluster); err != nil {
if kubeErr.IsNotFound(err) {
@@ -98,23 +108,30 @@ func (r *ClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}
if kscluster, err := manifests.GetCluster(cluster.Name); err == nil {
ownerReferencePatch := fmt.Sprintf(`{"metadata": {"ownerReferences": [{"apiVersion": "%s", "kind": "%s", "name": "%s", "uid": "%s"}]}}`, kscluster.GetAPIVersion(), kscluster.GetKind(), kscluster.GetName(), kscluster.GetUID())
_ = r.Patch(context.TODO(), cluster, client.RawPatch(types.MergePatchType, []byte(ownerReferencePatch)))
// Check if the configMap already exists
if err := r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: "kubekey-system"}, cmFound); err == nil {
clusterAlreadyExist = true
}
// create a new cluster
if cluster.Status.NodesCount == 0 {
// create kubesphere cluster
if err := newKubeSphereCluster(r, cluster); err != nil {
return ctrl.Result{}, err
if !clusterAlreadyExist {
// create kubesphere cluster
if err := newKubeSphereCluster(r, cluster); err != nil {
return ctrl.Result{RequeueAfter: 2 * time.Second}, err
}
if err := updateClusterConfigMap(r, ctx, cluster, cmFound, log); err != nil {
return ctrl.Result{RequeueAfter: 2 * time.Second}, err
}
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
if err := updateClusterConfigMap(r, ctx, cluster, cmFound, log); err != nil {
return ctrl.Result{}, err
}
if err := updateRunJob(r, ctx, cluster, jobFound, log, CreateCluster); err != nil {
return ctrl.Result{Requeue: true}, err
if err := updateRunJob(r, req, ctx, cluster, jobFound, log, CreateCluster); err != nil {
return ctrl.Result{RequeueAfter: 2 * time.Second}, err
}
addHosts = cluster.Spec.Hosts
sendHostsAction(1, addHosts, log)
}
// add nodes to cluster
@@ -122,12 +139,128 @@ func (r *ClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if err := updateClusterConfigMap(r, ctx, cluster, cmFound, log); err != nil {
return ctrl.Result{}, err
}
if err := updateRunJob(r, ctx, cluster, jobFound, log, AddNodes); err != nil {
if err := updateRunJob(r, req, ctx, cluster, jobFound, log, AddNodes); err != nil {
return ctrl.Result{Requeue: true}, err
}
currentNodes := map[string]string{}
for _, node := range cluster.Status.Nodes {
currentNodes[node.Hostname] = node.Hostname
}
for _, host := range cluster.Spec.Hosts {
if _, ok := currentNodes[host.Name]; !ok {
addHosts = append(addHosts, host)
}
}
sendHostsAction(1, addHosts, log)
}
return ctrl.Result{}, nil
// Synchronizing Node Information
if err := r.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-kubeconfig", cluster.Name), Namespace: "kubekey-system"}, cmFound); err == nil && len(cluster.Status.Nodes) != 0 {
cmFound.OwnerReferences = []metav1.OwnerReference{{
APIVersion: cluster.APIVersion,
Kind: cluster.Kind,
Name: cluster.Name,
UID: cluster.UID,
}}
if err := r.Update(ctx, cmFound); err != nil {
return ctrl.Result{Requeue: true}, err
}
kubeconfig, err := base64.StdEncoding.DecodeString(cmFound.Data["kubeconfig"])
if err != nil {
return ctrl.Result{Requeue: true}, err
}
config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig)
if err != nil {
return ctrl.Result{Requeue: true}, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return ctrl.Result{Requeue: true}, err
}
nodeList, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return ctrl.Result{Requeue: true}, err
}
currentNodes := map[string]string{}
for _, node := range nodeList.Items {
currentNodes[node.Name] = node.Name
}
for _, etcd := range cluster.Spec.RoleGroups.Etcd {
if _, ok := currentNodes[etcd]; !ok {
currentNodes[etcd] = etcd
}
}
nodes := cluster.Status.Nodes
newNodes := []kubekeyv1alpha1.NodeStatus{}
for _, node := range nodes {
if _, ok := currentNodes[node.Hostname]; ok {
newNodes = append(newNodes, node)
}
}
hosts := cluster.Spec.Hosts
newHosts := []kubekeyv1alpha1.HostCfg{}
for _, host := range hosts {
if _, ok := currentNodes[host.Name]; ok {
newHosts = append(newHosts, host)
} else {
removeHosts = append(removeHosts, host)
}
}
sendHostsAction(0, removeHosts, log)
var newEtcd, newMaster, newWorker []string
for _, node := range newNodes {
if node.Roles["etcd"] {
newEtcd = append(newEtcd, node.Hostname)
}
if node.Roles["master"] {
newMaster = append(newMaster, node.Hostname)
}
if node.Roles["worker"] {
newWorker = append(newWorker, node.Hostname)
}
}
cluster.Spec.Hosts = newHosts
cluster.Spec.RoleGroups = kubekeyv1alpha1.RoleGroups{
Etcd: newEtcd,
Master: newMaster,
Worker: newWorker,
}
if err := r.Update(ctx, cluster); err != nil {
return ctrl.Result{Requeue: true}, nil
}
// Fetch the Cluster object
if err := r.Get(ctx, req.NamespacedName, cluster); err != nil {
if kubeErr.IsNotFound(err) {
log.Info("Cluster resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get Cluster")
return ctrl.Result{}, err
}
cluster.Status.Nodes = newNodes
cluster.Status.NodesCount = len(newNodes)
cluster.Status.MasterCount = len(newMaster)
cluster.Status.WorkerCount = len(newWorker)
if err := r.Status().Update(ctx, cluster); err != nil {
return ctrl.Result{Requeue: true}, nil
}
}
return ctrl.Result{RequeueAfter: 2 * time.Minute}, nil
}
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
@@ -226,30 +359,61 @@ func (r *ClusterReconciler) jobForCluster(c *kubekeyv1alpha1.Cluster, action str
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{},
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{{
Name: "config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: c.Name,
Volumes: []corev1.Volume{
corev1.Volume{
Name: "config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: c.Name,
},
Items: []corev1.KeyToPath{{
Key: "cluster.yaml",
Path: "cluster.yaml",
}},
},
Items: []corev1.KeyToPath{{
Key: "cluster.yaml",
Path: "cluster.yaml",
}},
},
},
}},
corev1.Volume{
Name: "kube-binaries",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
InitContainers: []corev1.Container{
corev1.Container{
Name: "kube-binaries",
Image: fmt.Sprintf("pixiake/kube-binaries:%s", c.Spec.Kubernetes.Version),
Command: []string{
"sh",
"-c",
"cp -r -f /kubekey/* /home/kubekey/kubekey/",
},
VolumeMounts: []corev1.VolumeMount{
corev1.VolumeMount{
Name: "kube-binaries",
MountPath: "/home/kubekey/kubekey",
},
},
},
},
Containers: []corev1.Container{{
Name: "runner",
Image: image,
ImagePullPolicy: "Always",
Command: []string{"/home/kubekey/kk"},
Args: args,
VolumeMounts: []corev1.VolumeMount{{
Name: "config",
MountPath: "/home/kubekey/config",
}},
VolumeMounts: []corev1.VolumeMount{
corev1.VolumeMount{
Name: "config",
MountPath: "/home/kubekey/config",
},
corev1.VolumeMount{
Name: "kube-binaries",
MountPath: "/home/kubekey/kubekey",
},
},
}},
NodeName: nodeName,
ServiceAccountName: "default",
@@ -262,7 +426,7 @@
return job
}
func updateStatusRunner(r *ClusterReconciler, cluster *kubekeyv1alpha1.Cluster, action string) error {
func updateStatusRunner(r *ClusterReconciler, req ctrl.Request, cluster *kubekeyv1alpha1.Cluster, action string) error {
var (
name string
)
@@ -280,6 +444,14 @@
for i := 0; i < 100; i++ {
_ = r.List(context.TODO(), podlist, listOpts...)
if len(podlist.Items) == 1 {
// Fetch the Cluster object
if err := r.Get(context.TODO(), req.NamespacedName, cluster); err != nil {
if kubeErr.IsNotFound(err) {
return nil
}
return err
}
if len(podlist.Items[0].ObjectMeta.GetName()) != 0 && len(podlist.Items[0].Status.ContainerStatuses[0].Name) != 0 && podlist.Items[0].Status.Phase != "Pending" {
cluster.Status.JobInfo = kubekeyv1alpha1.JobInfo{
Namespace: "kubekey-system",
@@ -325,7 +497,7 @@ func updateClusterConfigMap(r *ClusterReconciler, ctx context.Context, cluster *
return nil
}
func updateRunJob(r *ClusterReconciler, ctx context.Context, cluster *kubekeyv1alpha1.Cluster, jobFound *batchv1.Job, log logr.Logger, action string) error {
func updateRunJob(r *ClusterReconciler, req ctrl.Request, ctx context.Context, cluster *kubekeyv1alpha1.Cluster, jobFound *batchv1.Job, log logr.Logger, action string) error {
var (
name string
)
@@ -369,8 +541,40 @@
return err
}
}
if err := updateStatusRunner(r, cluster, action); err != nil {
if err := updateStatusRunner(r, req, cluster, action); err != nil {
return err
}
return nil
}
func sendHostsAction(action int, hosts []kubekeyv1alpha1.HostCfg, log logr.Logger) {
if os.Getenv("HOSTS_MANAGER") == "true" {
type HostsAction struct {
Hosts []kubekeyv1alpha1.HostCfg `json:"hosts,omitempty"`
Action int `json:"action,omitempty"`
}
newHostsAction := HostsAction{
Hosts: hosts,
Action: action,
}
fmt.Println(newHostsAction)
hostsInfoBytes, err := json.Marshal(newHostsAction)
if err != nil {
log.Error(err, "Failed to marshal hosts info")
}
fmt.Println(string(hostsInfoBytes))
req, err := http.NewRequest("POST", "http://localhost:8090/api/v1alpha1/hosts", bytes.NewReader(hostsInfoBytes))
if err != nil {
log.Error(err, "Failed to create request")
}
req.Header.Add("Content-Type", "application/json")
_, err = http.DefaultClient.Do(req)
if err != nil {
log.Error(err, "Failed to send hosts info")
}
}
}
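
sendHostsAction only fires when the HOSTS_MANAGER environment variable is set to true; the reconcile paths above pass action 1 for hosts that were added and 0 for hosts that were removed, then POST the result to a local hosts-manager endpoint. A standalone sketch of the wire format (the named HostsAction type below mirrors the anonymous struct declared inside sendHostsAction):

package main

import (
	"encoding/json"
	"fmt"

	kubekeyv1alpha1 "github.com/kubesphere/kubekey/apis/kubekey/v1alpha1"
)

// HostsAction mirrors the anonymous struct inside sendHostsAction.
type HostsAction struct {
	Hosts  []kubekeyv1alpha1.HostCfg `json:"hosts,omitempty"`
	Action int                       `json:"action,omitempty"`
}

func main() {
	payload := HostsAction{
		Hosts:  []kubekeyv1alpha1.HostCfg{{Name: "node3", Address: "172.16.0.4"}},
		Action: 1, // 1: hosts added; 0: hosts removed
	}
	b, _ := json.Marshal(payload)
	fmt.Println(string(b))
	// {"hosts":[{"name":"node3","address":"172.16.0.4"}],"action":1}
}

Note that because Action is tagged omitempty, the remove case (action 0) serializes without an action field at all, so a receiver has to treat a missing action as the zero value.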


@@ -20,6 +20,10 @@ import (
"context"
"encoding/base64"
"fmt"
"io/ioutil"
"path/filepath"
"text/template"
kubekeyapiv1alpha1 "github.com/kubesphere/kubekey/apis/kubekey/v1alpha1"
kubekeyclientset "github.com/kubesphere/kubekey/clients/clientset/versioned"
"github.com/kubesphere/kubekey/pkg/addons/manifests"
@@ -27,7 +31,6 @@ import (
"github.com/kubesphere/kubekey/pkg/util/manager"
"github.com/lithammer/dedent"
"gopkg.in/yaml.v2"
"io/ioutil"
corev1 "k8s.io/api/core/v1"
kubeErr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -35,15 +38,16 @@ import (
kube "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"path/filepath"
"sigs.k8s.io/controller-runtime/pkg/client"
"text/template"
)
const (
// Pending represents that the cluster is being created.
Pending = "pending"
// Success represents that the cluster has been created successfully.
Success = "success"
Failed = "failed"
// Failed represents that the cluster creation failed.
Failed = "failed"
)
var (
@@ -77,6 +81,8 @@ func generateClusterKubeSphere(name string, kubeconfig string, enable, joinFedra
"JoinFedration": joinFedration,
})
}
// CheckClusterRole is used to check the cluster's role (host or member).
func CheckClusterRole() (bool, *rest.Config, error) {
// creates the in-cluster config
config, err := rest.InClusterConfig()
@@ -97,6 +103,7 @@
}
return hostClusterFlag, config, nil
}
func newKubeSphereCluster(r *ClusterReconciler, c *kubekeyapiv1alpha1.Cluster) error {
if hostClusterFlag, config, err := CheckClusterRole(); err != nil {
return err
@@ -122,6 +129,7 @@
return nil
}
// UpdateKubeSphereCluster is used to update the cluster object of KubeSphere's multicluster.
func UpdateKubeSphereCluster(mgr *manager.Manager) error {
if hostClusterFlag, config, err := CheckClusterRole(); err != nil {
return err
@@ -137,7 +145,8 @@
return nil
}
func KubekeyClient() (*kubekeyclientset.Clientset, error) {
// NewKubekeyClient is used to create a kubekey cluster client.
func NewKubekeyClient() (*kubekeyclientset.Clientset, error) {
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
@@ -150,7 +159,7 @@
}
func getCluster(name string) (*kubekeyapiv1alpha1.Cluster, error) {
clientset, err := KubekeyClient()
clientset, err := NewKubekeyClient()
if err != nil {
return nil, err
}
@@ -161,6 +170,7 @@
return clusterObj, nil
}
// UpdateClusterConditions is used for updating cluster installation process information or adding nodes.
func UpdateClusterConditions(mgr *manager.Manager, step string, startTime, endTime metav1.Time, status bool, index int) error {
condition := kubekeyapiv1alpha1.Condition{
Step: step,
@@ -187,6 +197,7 @@
return nil
}
// UpdateStatus is used to update status for a new or expanded cluster.
func UpdateStatus(mgr *manager.Manager) error {
cluster, err := getCluster(mgr.ObjName)
if err != nil {
@@ -271,6 +282,7 @@
return node
}
// CreateNodeForCluster is used to create node objects for the nodes being added to the cluster.
func CreateNodeForCluster(mgr *manager.Manager) error {
clientsetForCluster, err := getClusterClientSet(mgr)
if err != nil && !kubeErr.IsNotFound(err) {
@@ -304,6 +316,7 @@
return nil
}
// PatchNodeImportStatus is used to update a new node's import status.
func PatchNodeImportStatus(mgr *manager.Manager, status string) error {
clientsetForCluster, err := getClusterClientSet(mgr)
if err != nil && !kubeErr.IsNotFound(err) {
@@ -326,3 +339,45 @@
}
return nil
}
// SaveKubeConfig is used to save the kubeconfig for the new cluster.
func SaveKubeConfig(mgr *manager.Manager) error {
// creates the in-cluster config
inClusterConfig, err := rest.InClusterConfig()
if err != nil {
return err
}
// creates the clientset
clientset, err := kube.NewForConfig(inClusterConfig)
if err != nil {
return err
}
cmClientset := clientset.CoreV1().ConfigMaps("kubekey-system")
if _, err := cmClientset.Get(context.TODO(), fmt.Sprintf("%s-kubeconfig", mgr.ObjName), metav1.GetOptions{}); err != nil {
if kubeErr.IsNotFound(err) {
cmClientset.Create(context.TODO(), configMapForKubeconfig(mgr), metav1.CreateOptions{})
} else {
return err
}
} else {
kubeconfigStr := fmt.Sprintf(`{"kubeconfig": "%s"}`, mgr.Kubeconfig)
cmClientset.Patch(context.TODO(), mgr.ObjName, types.ApplyPatchType, []byte(kubeconfigStr), metav1.PatchOptions{})
}
// clientset.CoreV1().ConfigMaps("kubekey-system").Create(context.TODO(), kubeconfigConfigMap, metav1.CreateOptions{}
return nil
}
// configMapForKubeconfig is used to generate configmap scheme for cluster's kubeconfig.
func configMapForKubeconfig(mgr *manager.Manager) *corev1.ConfigMap {
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-kubeconfig", mgr.ObjName),
},
Data: map[string]string{
"kubeconfig": mgr.Kubeconfig,
},
}
return cm
}
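
SaveKubeConfig writes the cluster's kubeconfig into a "<name>-kubeconfig" ConfigMap in kubekey-system, and the reconciler's node-sync step reads it back with a base64 decode, which implies mgr.Kubeconfig holds the file base64-encoded. A sketch of the consumer side (the helper name is hypothetical), condensed from the Reconcile code above:

package kubekeysketch

import (
	"context"
	"encoding/base64"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// loadSavedKubeconfig fetches the "<cluster>-kubeconfig" ConfigMap written
// by SaveKubeConfig and turns it into a clientset for the target cluster,
// the same way the reconciler's node synchronization step does.
func loadSavedKubeconfig(ctx context.Context, inCluster *rest.Config, clusterName string) (*kubernetes.Clientset, error) {
	hostClient, err := kubernetes.NewForConfig(inCluster)
	if err != nil {
		return nil, err
	}
	cm, err := hostClient.CoreV1().ConfigMaps("kubekey-system").Get(ctx, fmt.Sprintf("%s-kubeconfig", clusterName), metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	// The stored value is base64-encoded, matching the decode in Reconcile.
	kubeconfig, err := base64.StdEncoding.DecodeString(cm.Data["kubeconfig"])
	if err != nil {
		return nil, err
	}
	config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig)
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(config)
}
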

go.mod (4 changes)

@@ -9,7 +9,7 @@ require (
github.com/lithammer/dedent v1.1.0
github.com/mitchellh/mapstructure v1.1.2
github.com/modood/table v0.0.0-20200225102042-88de94bb9876
github.com/onsi/ginkgo v1.14.2
github.com/onsi/ginkgo v1.12.1
github.com/onsi/gomega v1.10.1
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.12.0
@@ -24,6 +24,8 @@ require (
k8s.io/apimachinery v0.18.8
k8s.io/cli-runtime v0.18.8
k8s.io/client-go v0.18.8
k8s.io/code-generator v0.18.8
k8s.io/klog/v2 v2.0.0
k8s.io/kubectl v0.18.8
rsc.io/letsencrypt v0.0.3 // indirect
sigs.k8s.io/controller-runtime v0.6.2

go.sum (2 changes)

@@ -1077,6 +1077,8 @@ k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0 h1:Foj74zO6RbjjP4hBEKjnYtjjAhGg4jNynUdYF6fJrok=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.5.0 h1:8mOnjf1RmUPW6KRqQCfYSZq/K20Unmp3IhuZUhxl8KI=
k8s.io/klog/v2 v2.5.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6 h1:Oh3Mzx5pJ+yIumsAD0MOECPVeXsVot0UkiaCGVyfGQY=
k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E=
k8s.io/kubectl v0.18.4/go.mod h1:EzB+nfeUWk6fm6giXQ8P4Fayw3dsN+M7Wjy23mTRtB0=

hack/tools.go (new file, 22 lines)

@@ -0,0 +1,22 @@
// +build tools
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This package imports things required by build scripts, to force `go mod` to see them as dependencies
package tools
import _ "k8s.io/code-generator"

hack/update-codegen.sh (new executable file, 21 lines)

@@ -0,0 +1,21 @@
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
# generate the code with:
# --output-base because this script should also be able to run inside the vendor dir of
# k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir
# instead of the $GOPATH directly. For normal projects this can be dropped.
bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
github.com/kubesphere/kubekey/clients github.com/kubesphere/kubekey/apis \
kubekey:v1alpha1 \
--output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \
--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
# To use your own boilerplate text append:
# --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt

hack/verify-codegen.sh (new executable file, 34 lines)

@@ -0,0 +1,34 @@
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
DIFFROOT="${SCRIPT_ROOT}/pkg"
TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg"
_tmp="${SCRIPT_ROOT}/_tmp"
cleanup() {
rm -rf "${_tmp}"
}
trap "cleanup" EXIT SIGINT
cleanup
mkdir -p "${TMP_DIFFROOT}"
cp -a "${DIFFROOT}"/* "${TMP_DIFFROOT}"
"${SCRIPT_ROOT}/hack/update-codegen.sh"
echo "diffing ${DIFFROOT} against freshly generated codegen"
ret=0
diff -Naupr "${DIFFROOT}" "${TMP_DIFFROOT}" || ret=$?
cp -a "${TMP_DIFFROOT}"/* "${DIFFROOT}"
if [[ $ret -eq 0 ]]
then
echo "${DIFFROOT} up to date."
else
echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh"
exit 1
fi


@@ -63,7 +63,7 @@ func CreateCluster(clusterCfgFile, k8sVersion, ksVersion string, logger *log.Log
var clientset *kubekeyclientset.Clientset
if inCluster {
c, err := kubekeycontroller.KubekeyClient()
c, err := kubekeycontroller.NewKubekeyClient()
if err != nil {
return err
}
@@ -134,6 +134,9 @@ Please check the result using the command:
if err := kubekeycontroller.UpdateKubeSphereCluster(mgr); err != nil {
return err
}
if err := kubekeycontroller.SaveKubeConfig(mgr); err != nil {
return err
}
}
}
mgr.Logger.Infoln("Congratulations! Installation is successful.")


@@ -17,9 +17,10 @@ limitations under the License.
package ssh
import (
kubekeyapiv1alpha1 "github.com/kubesphere/kubekey/apis/kubekey/v1alpha1"
"sync"
"time"
kubekeyapiv1alpha1 "github.com/kubesphere/kubekey/apis/kubekey/v1alpha1"
)
type Dialer struct {
@@ -39,7 +40,7 @@ func (dialer *Dialer) Connect(host kubekeyapiv1alpha1.HostCfg) (Connection, erro
dialer.lock.Lock()
defer dialer.lock.Unlock()
conn, _ := dialer.connections[host.ID]
conn, _ := dialer.connections[host.Index]
opts := Cfg{
Username: host.User,
@@ -54,7 +55,7 @@
if err != nil {
return nil, err
}
dialer.connections[host.ID] = conn
dialer.connections[host.Index] = conn
return conn, nil
}
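
Because ID is now a user-facing string, the dialer keys its connection cache on the positional Index instead: an index assigned in GroupHosts is unique per run, while the new ID field may be empty. The Dialer's fields are trimmed out of this diff, so the following mirror of the caching pattern is assumed rather than verbatim:

package sshsketch

import "sync"

// Connection stands in for the package's SSH connection type, which
// is not shown in this diff.
type Connection interface{}

// Dialer is an assumed mirror of the struct whose fields the diff elides;
// what the hunks above do show is that the cache is keyed by HostCfg.Index.
type Dialer struct {
	lock        sync.Mutex
	connections map[int]Connection
}

// cached looks up a previously established connection by host index,
// matching the lookup at the top of Connect.
func (d *Dialer) cached(index int) (Connection, bool) {
	d.lock.Lock()
	defer d.lock.Unlock()
	conn, ok := d.connections[index]
	return conn, ok
}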