Merge pull request #328 from pixiake/devel

update controller
KubeSphere CI Bot 2020-10-07 22:14:54 +08:00 committed by GitHub
commit 559eddce3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 286 additions and 8 deletions

View File

@@ -67,10 +67,10 @@ type NodeStatus struct {
}
type Condition struct {
Step string `json:"step,omitempty"`
StartTime string `json:"startTime,omitempty"`
EndTime string `json:"endTime,omitempty"`
Status bool `json:"status,omitempty"`
Step string `json:"step,omitempty"`
StartTime metav1.Time `json:"startTime,omitempty"`
EndTime metav1.Time `json:"endTime,omitempty"`
Status bool `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
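Switching StartTime/EndTime from string to metav1.Time is what makes the `format: date-time` CRD validation later in this commit possible: metav1.Time marshals to RFC 3339. A minimal sketch (not part of the commit) of the new struct's wire format:

	package main

	import (
		"encoding/json"
		"fmt"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	// Condition mirrors the updated struct above.
	type Condition struct {
		Step      string      `json:"step,omitempty"`
		StartTime metav1.Time `json:"startTime,omitempty"`
		EndTime   metav1.Time `json:"endTime,omitempty"`
		Status    bool        `json:"status,omitempty"`
	}

	func main() {
		c := Condition{Step: "Init nodes", StartTime: metav1.Now()}
		b, _ := json.Marshal(c)
		// e.g. {"step":"Init nodes","startTime":"2020-10-07T14:14:54Z","endTime":null}
		// (omitempty never drops struct fields, so the zero EndTime serializes as null)
		fmt.Println(string(b))
	}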

View File

@@ -180,7 +180,9 @@ func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) {
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]Condition, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
@@ -197,6 +199,8 @@ func (in *ClusterStatus) DeepCopy() *ClusterStatus {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Condition) DeepCopyInto(out *Condition) {
*out = *in
in.StartTime.DeepCopyInto(&out.StartTime)
in.EndTime.DeepCopyInto(&out.EndTime)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Condition.
@@ -244,6 +248,21 @@ func (in *ExternalEtcd) DeepCopy() *ExternalEtcd {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FlannelCfg) DeepCopyInto(out *FlannelCfg) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlannelCfg.
func (in *FlannelCfg) DeepCopy() *FlannelCfg {
if in == nil {
return nil
}
out := new(FlannelCfg)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *HostCfg) DeepCopyInto(out *HostCfg) {
*out = *in
@@ -343,6 +362,7 @@ func (in *Kubernetes) DeepCopy() *Kubernetes {
func (in *NetworkConfig) DeepCopyInto(out *NetworkConfig) {
*out = *in
out.Calico = in.Calico
out.Flannel = in.Flannel
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NetworkConfig.
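Because Condition now contains a type with its own deepcopy methods (metav1.Time), controller-gen replaces the plain copy() over the Conditions slice with an element-wise DeepCopyInto. A quick check (hypothetical test, not in the commit) that the copy is independent of the original:

	package v1alpha1_test

	import (
		"testing"

		kubekeyv1alpha1 "github.com/kubesphere/kubekey/api/v1alpha1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	func TestConditionsDeepCopy(t *testing.T) {
		orig := kubekeyv1alpha1.ClusterStatus{
			Conditions: []kubekeyv1alpha1.Condition{
				{Step: "Init nodes", StartTime: metav1.Now()},
			},
		}
		dup := orig.DeepCopy()
		dup.Conditions[0].Step = "changed"
		// Mutating the copy must not leak back into the original.
		if orig.Conditions[0].Step != "Init nodes" {
			t.Fatal("DeepCopy aliased the Conditions slice")
		}
	}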

View File

@@ -157,6 +157,11 @@ spec:
vxlanMode:
type: string
type: object
flannel:
properties:
backendMode:
type: string
type: object
kubePodsCIDR:
type: string
kubeServiceCIDR:
@@ -200,8 +205,10 @@ spec:
items:
properties:
endTime:
format: date-time
type: string
startTime:
format: date-time
type: string
status:
type: boolean
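With the flannel block in the schema, a Cluster manifest can carry flannel settings and still pass validation. A hedged sketch of the relevant spec fragment — the flannel, backendMode, kubePodsCIDR, and kubeServiceCIDR names come from this hunk, while `plugin: flannel`, the vxlan backend, and the CIDR values are illustrative assumptions, not shown in the diff:

	apiVersion: kubekey.kubesphere.io/v1alpha1
	kind: Cluster
	metadata:
	  name: sample
	spec:
	  network:
	    plugin: flannel
	    flannel:
	      backendMode: vxlan
	    kubePodsCIDR: 10.233.64.0/18
	    kubeServiceCIDR: 10.233.0.0/18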

View File

@@ -1,2 +1,8 @@
resources:
- manager.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: pixiake/kubekey
newTag: test
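Kustomize's images transformer rewrites any container image whose name is `controller` in the listed resources. Assuming the stock kubebuilder manager.yaml (which ships `image: controller:latest`; the image line is not shown in this diff), a `kustomize build` of this directory would emit roughly:

	containers:
	- command:
	  - /manager
	  image: pixiake/kubekey:test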

View File

@@ -22,6 +22,7 @@ spec:
labels:
control-plane: controller-manager
spec:
hostNetwork: true
containers:
- command:
- /manager
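hostNetwork: true presumably lets the manager pod reach the target machines' SSH ports straight from the node's network namespace while it provisions them; the commit itself does not state the motivation, so treat this as an inference.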

View File

@@ -6,6 +6,18 @@ metadata:
creationTimestamp: null
name: manager-role
rules:
- apiGroups:
- ""
resources:
- secrets
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- kubekey.kubesphere.io
resources:

View File

@@ -18,6 +18,16 @@ package controllers
import (
"context"
"github.com/kubesphere/kubekey/pkg/cluster/etcd"
"github.com/kubesphere/kubekey/pkg/cluster/kubernetes"
"github.com/kubesphere/kubekey/pkg/cluster/preinstall"
"github.com/kubesphere/kubekey/pkg/container-engine/docker"
"github.com/kubesphere/kubekey/pkg/plugins/network"
"github.com/kubesphere/kubekey/pkg/util"
"github.com/kubesphere/kubekey/pkg/util/executor"
"github.com/kubesphere/kubekey/pkg/util/manager"
"github.com/pkg/errors"
kubeErr "k8s.io/apimachinery/pkg/api/errors"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
@@ -36,12 +46,32 @@ type ClusterReconciler struct {
// +kubebuilder:rbac:groups=kubekey.kubesphere.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kubekey.kubesphere.io,resources=clusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
func (r *ClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
_ = context.Background()
_ = r.Log.WithValues("cluster", req.NamespacedName)
ctx := context.Background()
log := r.Log.WithValues("cluster", req.NamespacedName)
logger := util.InitLogger(true)
// your logic here
cluster := &kubekeyv1alpha1.Cluster{}
newExecutor := executor.NewExecutor(&cluster.Spec, logger, "", true, true, false, false)
err := r.Get(ctx, req.NamespacedName, cluster)
if err != nil {
if kubeErr.IsNotFound(err) {
log.Info("Cluster resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get Cluster")
return ctrl.Result{}, err
}
if len(cluster.Status.Conditions) == 0 {
// install
if err := installTasks(r, ctx, cluster, newExecutor); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
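The len(cluster.Status.Conditions) == 0 check makes the install effectively run-once: only a Cluster whose status has never been written triggers installTasks, while later reconciles of an already-installed object fall through and return immediately.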
@@ -51,3 +81,44 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&kubekeyv1alpha1.Cluster{}).
Complete(r)
}
func runTasks(mgr *manager.Manager, tasks []manager.Task) error {
for _, step := range tasks {
if err := step.Run(mgr); err != nil {
return errors.Wrap(err, step.ErrMsg)
}
}
return nil
}
var (
initNodesTasks = []manager.Task{
{Task: preinstall.DownloadBinaries, ErrMsg: "Failed to download kube binaries"},
{Task: preinstall.InitOS, ErrMsg: "Failed to init OS"},
{Task: docker.InstallerDocker, ErrMsg: "Failed to install docker"},
}
pullImagesTaks = []manager.Task{
{Task: preinstall.PrePullImages, ErrMsg: "Failed to pre-pull images"},
}
initEtcdClusterTasks = []manager.Task{
{Task: etcd.GenerateEtcdCerts, ErrMsg: "Failed to generate etcd certs"},
{Task: etcd.SyncEtcdCertsToMaster, ErrMsg: "Failed to sync etcd certs"},
{Task: etcd.GenerateEtcdService, ErrMsg: "Failed to create etcd service"},
{Task: etcd.SetupEtcdCluster, ErrMsg: "Failed to start etcd cluster"},
{Task: etcd.RefreshEtcdConfig, ErrMsg: "Failed to refresh etcd configuration"},
{Task: etcd.BackupEtcd, ErrMsg: "Failed to backup etcd data"},
}
initControlPlaneTasks = []manager.Task{
{Task: kubernetes.GetClusterStatus, ErrMsg: "Failed to get cluster status"},
{Task: kubernetes.InstallKubeBinaries, ErrMsg: "Failed to install kube binaries"},
{Task: kubernetes.InitKubernetesCluster, ErrMsg: "Failed to init kubernetes cluster"},
{Task: network.DeployNetworkPlugin, ErrMsg: "Failed to deploy network plugin"},
}
joinNodesTasks = []manager.Task{
{Task: kubernetes.JoinNodesToCluster, ErrMsg: "Failed to join node"},
}
)
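Beyond their shared shape, the slices encode the install sequence installTasks walks through below: OS prep, binaries, and Docker first; images pre-pulled; etcd brought up before the control plane, since the API server needs its backing store; and worker join last.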

controllers/install.go (new file, 161 lines)
View File

@@ -0,0 +1,161 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
kubekeyv1alpha1 "github.com/kubesphere/kubekey/api/v1alpha1"
"github.com/kubesphere/kubekey/pkg/util/executor"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func installTasks(r *ClusterReconciler, ctx context.Context, cluster *kubekeyv1alpha1.Cluster, executor *executor.Executor) error {
mgr, err := executor.CreateManager()
if err != nil {
return err
}
// init nodes
initNodesCondition := kubekeyv1alpha1.Condition{
Step: "Init nodes",
StartTime: metav1.Now(),
EndTime: metav1.Now(),
Status: false,
}
cluster.Status.Conditions = append(cluster.Status.Conditions, initNodesCondition)
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
if err := runTasks(mgr, initNodesTasks); err != nil {
return err
}
cluster.Status.Conditions[0].EndTime = metav1.Now()
cluster.Status.Conditions[0].Status = true
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
// pull images
pullImagesCondition := kubekeyv1alpha1.Condition{
Step: "Pull images",
StartTime: metav1.Now(),
EndTime: metav1.Now(),
Status: false,
}
cluster.Status.Conditions = append(cluster.Status.Conditions, pullImagesCondition)
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
if err := runTasks(mgr, pullImagesTaks); err != nil {
return err
}
cluster.Status.Conditions[1].EndTime = metav1.Now()
cluster.Status.Conditions[1].Status = true
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
// init etcd cluster
initEtcdClusterCondition := kubekeyv1alpha1.Condition{
Step: "init etcd cluster",
StartTime: metav1.Now(),
EndTime: metav1.Now(),
Status: false,
}
cluster.Status.Conditions = append(cluster.Status.Conditions, initEtcdClusterCondition)
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
if err := runTasks(mgr, initEtcdClusterTasks); err != nil {
return err
}
cluster.Status.Conditions[2].EndTime = metav1.Now()
cluster.Status.Conditions[2].Status = true
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
// init control plane
initControlPlaneCondition := kubekeyv1alpha1.Condition{
Step: "init control plane",
StartTime: metav1.Now(),
EndTime: metav1.Now(),
Status: false,
}
cluster.Status.Conditions = append(cluster.Status.Conditions, initControlPlaneCondition)
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
if err := runTasks(mgr, initControlPlaneTasks); err != nil {
return err
}
cluster.Status.Conditions[3].EndTime = metav1.Now()
cluster.Status.Conditions[3].Status = true
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
// join nodes
joinNodesCondition := kubekeyv1alpha1.Condition{
Step: "join nodes",
StartTime: metav1.Now(),
EndTime: metav1.Now(),
Status: false,
}
cluster.Status.Conditions = append(cluster.Status.Conditions, joinNodesCondition)
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
if err := runTasks(mgr, joinNodesTasks); err != nil {
return err
}
cluster.Status.Conditions[4].EndTime = metav1.Now()
cluster.Status.Conditions[4].Status = true
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
cluster.Status.Version = mgr.Cluster.Kubernetes.Version
cluster.Status.NodesCount = len(mgr.AllNodes)
cluster.Status.MasterCount = len(mgr.MasterNodes)
cluster.Status.WorkerCount = len(mgr.WorkerNodes)
cluster.Status.EtcdCount = len(mgr.EtcdNodes)
cluster.Status.NetworkPlugin = mgr.Cluster.Network.Plugin
for _, node := range mgr.AllNodes {
cluster.Status.Nodes = append(cluster.Status.Nodes, kubekeyv1alpha1.NodeStatus{
InternalIP: node.InternalAddress,
Hostname: node.Name,
Roles: map[string]bool{"etcd": node.IsEtcd, "master": node.IsMaster, "worker": node.IsWorker},
})
}
if err := r.Status().Update(ctx, cluster); err != nil {
return err
}
return nil
}
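Every phase above follows the same append/update/run/update pattern, so it could be expressed once. A possible consolidation (hypothetical, not part of the commit; assumes the same imports as above plus "github.com/kubesphere/kubekey/pkg/util/manager"):

	func runPhase(r *ClusterReconciler, ctx context.Context, cluster *kubekeyv1alpha1.Cluster,
		mgr *manager.Manager, step string, tasks []manager.Task) error {
		// Record the phase as started (Status=false) before running it.
		cluster.Status.Conditions = append(cluster.Status.Conditions, kubekeyv1alpha1.Condition{
			Step:      step,
			StartTime: metav1.Now(),
			EndTime:   metav1.Now(),
			Status:    false,
		})
		if err := r.Status().Update(ctx, cluster); err != nil {
			return err
		}
		if err := runTasks(mgr, tasks); err != nil {
			return err
		}
		// Mark the phase as finished.
		i := len(cluster.Status.Conditions) - 1
		cluster.Status.Conditions[i].EndTime = metav1.Now()
		cluster.Status.Conditions[i].Status = true
		return r.Status().Update(ctx, cluster)
	}

installTasks would then shrink to five runPhase calls plus the final status summary.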