kubekey/controllers/kkinstance_controller.go
/*
Copyright 2022 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
cutil "sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
infrav1 "github.com/kubesphere/kubekey/api/v1beta1"
"github.com/kubesphere/kubekey/pkg/clients/ssh"
"github.com/kubesphere/kubekey/pkg/scope"
"github.com/kubesphere/kubekey/pkg/service"
"github.com/kubesphere/kubekey/pkg/service/binary"
"github.com/kubesphere/kubekey/pkg/service/bootstrap"
"github.com/kubesphere/kubekey/pkg/service/containermanager"
"github.com/kubesphere/kubekey/pkg/service/provisioning"
"github.com/kubesphere/kubekey/pkg/service/repository"
"github.com/kubesphere/kubekey/pkg/util"
)
const (
defaultRequeueWait = 30 * time.Second
defaultKKInstanceInterval = 5 * time.Second
defaultKKInstanceTimeout = 10 * time.Minute
)
// KKInstanceReconciler reconciles a KKInstance object
type KKInstanceReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
sshClientFactory func(scope *scope.InstanceScope) ssh.Interface
bootstrapFactory func(sshClient ssh.Interface, scope scope.LBScope, instanceScope *scope.InstanceScope) service.Bootstrap
repositoryFactory func(sshClient ssh.Interface, scope scope.KKInstanceScope, instanceScope *scope.InstanceScope) service.Repository
binaryFactory func(sshClient ssh.Interface, scope scope.KKInstanceScope, instanceScope *scope.InstanceScope, distribution string) service.BinaryService
containerManagerFactory func(sshClient ssh.Interface, scope scope.KKInstanceScope, instanceScope *scope.InstanceScope) service.ContainerManager
provisioningFactory func(sshClient ssh.Interface, format bootstrapv1.Format) service.Provisioning
WatchFilterValue string
DataDir string
WaitKKInstanceInterval time.Duration
WaitKKInstanceTimeout time.Duration
}
func (r *KKInstanceReconciler) getSSHClient(scope *scope.InstanceScope) ssh.Interface {
if r.sshClientFactory != nil {
return r.sshClientFactory(scope)
}
return ssh.NewClient(scope.KKInstance.Spec.Address, scope.KKInstance.Spec.Auth, &scope.Logger)
}
func (r *KKInstanceReconciler) getBootstrapService(sshClient ssh.Interface, scope scope.LBScope, instanceScope *scope.InstanceScope) service.Bootstrap {
if r.bootstrapFactory != nil {
return r.bootstrapFactory(sshClient, scope, instanceScope)
}
return bootstrap.NewService(sshClient, scope, instanceScope)
}
func (r *KKInstanceReconciler) getRepositoryService(sshClient ssh.Interface, scope scope.KKInstanceScope, instanceScope *scope.InstanceScope) service.Repository {
if r.repositoryFactory != nil {
return r.repositoryFactory(sshClient, scope, instanceScope)
}
return repository.NewService(sshClient, scope, instanceScope)
}
func (r *KKInstanceReconciler) getBinaryService(sshClient ssh.Interface, scope scope.KKInstanceScope, instanceScope *scope.InstanceScope, distribution string) service.BinaryService {
if r.binaryFactory != nil {
return r.binaryFactory(sshClient, scope, instanceScope, distribution)
}
return binary.NewService(sshClient, scope, instanceScope, distribution)
}
func (r *KKInstanceReconciler) getContainerManager(sshClient ssh.Interface, scope scope.KKInstanceScope, instanceScope *scope.InstanceScope) service.ContainerManager {
if r.containerManagerFactory != nil {
return r.containerManagerFactory(sshClient, scope, instanceScope)
}
return containermanager.NewService(sshClient, scope, instanceScope)
}
func (r *KKInstanceReconciler) getProvisioningService(sshClient ssh.Interface, format bootstrapv1.Format) service.Provisioning {
if r.provisioningFactory != nil {
return r.provisioningFactory(sshClient, format)
}
return provisioning.NewService(sshClient, format)
}
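// The unexported factory fields above, together with these getters, exist so that
// tests can substitute fakes for the SSH-backed services. A minimal sketch of such an
// override (fakeClient and fakeSSHClient are hypothetical test doubles, not part of
// this package):
//
//	r := &KKInstanceReconciler{
//		Client:   fakeClient,
//		Recorder: record.NewFakeRecorder(10),
//		sshClientFactory: func(_ *scope.InstanceScope) ssh.Interface {
//			return &fakeSSHClient{}
//		},
//	}
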
// SetupWithManager sets up the controller with the Manager.
func (r *KKInstanceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
log := ctrl.LoggerFrom(ctx)
if r.WaitKKInstanceInterval.Nanoseconds() == 0 {
r.WaitKKInstanceInterval = defaultKKInstanceInterval
}
if r.WaitKKInstanceTimeout.Nanoseconds() == 0 {
r.WaitKKInstanceTimeout = defaultKKInstanceTimeout
}
c, err := ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&infrav1.KKInstance{}).
Watches(
&source.Kind{Type: &infrav1.KKMachine{}},
handler.EnqueueRequestsFromMapFunc(r.KKMachineToKKInstanceMapFunc(log)),
).
Watches(
&source.Kind{Type: &infrav1.KKCluster{}},
handler.EnqueueRequestsFromMapFunc(r.KKClusterToKKInstances(log)),
).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(log, r.WatchFilterValue)).
WithEventFilter(
predicate.Funcs{
// Avoid reconciling if the event triggering the reconciliation is related to incremental status updates
// for KKInstance resources only
UpdateFunc: func(e event.UpdateEvent) bool {
log.V(5).Info("KKInstance controller update predicate")
if _, ok := e.ObjectOld.(*infrav1.KKInstance); !ok {
log.V(5).Info(fmt.Sprintf("gvk is %s, not equale KKInstance", e.ObjectOld.GetObjectKind().GroupVersionKind().Kind))
return true
}
oldInstance := e.ObjectOld.(*infrav1.KKInstance).DeepCopy()
newInstance := e.ObjectNew.(*infrav1.KKInstance).DeepCopy()
oldInstance.Status = infrav1.KKInstanceStatus{}
newInstance.Status = infrav1.KKInstanceStatus{}
oldInstance.ObjectMeta.ResourceVersion = ""
newInstance.ObjectMeta.ResourceVersion = ""
if cmp.Equal(oldInstance, newInstance) {
log.V(4).Info("oldInstance and newInstance are equaled, skip")
return false
}
log.V(4).Info("oldInstance and newInstance are not equaled, allowing further processing")
return true
},
},
).
Build(r)
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
handler.EnqueueRequestsFromMapFunc(r.requeueKKInstancesForUnpausedCluster(log)),
predicates.ClusterUnpausedAndInfrastructureReady(log),
)
if err != nil {
return err
}
return nil
}
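// SetupWithManager is expected to be wired up from the provider's main entry point,
// roughly as follows (a sketch; variable names, flags, and option values are
// illustrative, not taken verbatim from this repository):
//
//	if err := (&KKInstanceReconciler{
//		Client:           mgr.GetClient(),
//		Scheme:           mgr.GetScheme(),
//		Recorder:         mgr.GetEventRecorderFor("kkinstance-controller"),
//		WatchFilterValue: watchFilterValue,
//		DataDir:          dataDir,
//	}).SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: concurrency}); err != nil {
//		setupLog.Error(err, "unable to create controller", "controller", "KKInstance")
//		os.Exit(1)
//	}
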
//+kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=kkinstances,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=kkinstances/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=kkinstances/finalizers,verbs=update
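// Reconcile fetches the KKInstance and its owning KKMachine, Machine, and Cluster,
// builds an instance scope, and then hands off to reconcileNormal or reconcileDelete.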
func (r *KKInstanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, retErr error) {
log := ctrl.LoggerFrom(ctx)
// Fetch the KKInstance.
kkInstance := &infrav1.KKInstance{}
err := r.Get(ctx, req.NamespacedName, kkInstance)
if err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// Fetch the KKMachine.
kkMachine, err := util.GetOwnerKKMachine(ctx, r.Client, kkInstance.ObjectMeta)
if err != nil {
return ctrl.Result{}, err
}
if kkMachine == nil {
log.Info("KKMachine Controller has not yet set OwnerRef")
return ctrl.Result{}, nil
}
log = log.WithValues("kkMachine", kkMachine.Name)
// Fetch the Machine.
machine, err := cutil.GetOwnerMachine(ctx, r.Client, kkMachine.ObjectMeta)
if err != nil {
return ctrl.Result{}, err
}
if machine == nil {
log.Info("Machine Controller has not yet set OwnerRef")
return ctrl.Result{}, nil
}
log = log.WithValues("machine", machine.Name)
// Fetch the Cluster.
cluster, err := cutil.GetClusterFromMetadata(ctx, r.Client, kkInstance.ObjectMeta)
if err != nil {
log.Info("KKInstance is missing cluster label or cluster does not exist")
return ctrl.Result{}, nil
}
if annotations.IsPaused(cluster, kkInstance) {
log.Info("KKInstance or linked Cluster is marked as paused. Won't reconcile")
return ctrl.Result{}, nil
}
log = log.WithValues("cluster", cluster.Name)
infraCluster, err := util.GetInfraCluster(ctx, r.Client, log, cluster, "kkinstance", r.DataDir)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error getting infra provider cluster object")
}
if infraCluster == nil {
log.Info("KKCluster is not ready yet")
return ctrl.Result{}, nil
}
instanceScope, err := scope.NewInstanceScope(scope.InstanceScopeParams{
Client: r.Client,
Cluster: cluster,
Machine: machine,
InfraCluster: infraCluster,
KKInstance: kkInstance,
})
if err != nil {
log.Error(err, "failed to create instance scope")
return ctrl.Result{}, err
}
if err := r.reconcilePing(ctx, instanceScope); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to ping remote instance [%s]", kkInstance.Spec.Address)
}
// Always close the scope when exiting this function, so we can persist any KKInstance changes.
defer func() {
if err := instanceScope.Close(); err != nil && retErr == nil {
log.Error(err, "failed to patch object")
retErr = err
}
}()
if !kkInstance.ObjectMeta.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, instanceScope, infraCluster)
}
return r.reconcileNormal(ctx, instanceScope, infraCluster, infraCluster)
}
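// reconcileDelete cleans the remote node over SSH before deletion and, on success,
// marks the instance as cleaned and removes the finalizer so the KKInstance can be
// garbage collected.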
func (r *KKInstanceReconciler) reconcileDelete(ctx context.Context, instanceScope *scope.InstanceScope, lbScope scope.LBScope) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.V(4).Info("Reconcile KKInstance delete")
if conditions.Get(instanceScope.KKInstance, infrav1.KKInstanceDeletingBootstrapCondition) == nil {
conditions.MarkFalse(instanceScope.KKInstance, infrav1.KKInstanceDeletingBootstrapCondition,
infrav1.CleaningReason, clusterv1.ConditionSeverityInfo, "Cleaning the node before deletion")
}
if err := instanceScope.PatchObject(); err != nil {
instanceScope.Error(err, "unable to patch object")
return ctrl.Result{}, err
}
sshClient := r.getSSHClient(instanceScope)
if err := r.reconcileDeletingBootstrap(ctx, sshClient, instanceScope, lbScope); err != nil {
instanceScope.Error(err, "failed to reconcile deleting bootstrap")
return ctrl.Result{}, err
}
conditions.MarkTrue(instanceScope.KKInstance, infrav1.KKInstanceDeletingBootstrapCondition)
instanceScope.SetState(infrav1.InstanceStateCleaned)
instanceScope.Info("Reconcile KKInstance delete successful")
controllerutil.RemoveFinalizer(instanceScope.KKInstance, infrav1.InstanceFinalizer)
return ctrl.Result{}, nil
}
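// reconcileNormal provisions the remote node by running each phase returned by
// r.phaseFactory (defined elsewhere in this package) under wait.PollImmediate,
// using WaitKKInstanceInterval and WaitKKInstanceTimeout. Judging from the call
// below, each phase is assumed to be a function of the form:
//
//	func(ctx context.Context, sshClient ssh.Interface, instanceScope *scope.InstanceScope,
//		kkInstanceScope scope.KKInstanceScope, lbScope scope.LBScope) error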
func (r *KKInstanceReconciler) reconcileNormal(ctx context.Context, instanceScope *scope.InstanceScope, lbScope scope.LBScope, kkInstanceScope scope.KKInstanceScope) (ctrl.Result, error) {
instanceScope.Info("Reconcile KKInstance normal")
// If the KKInstance is in an error state, return early.
if instanceScope.HasFailed() {
instanceScope.Info("Error state detected, skipping reconciliation")
return ctrl.Result{}, nil
}
// If the KKInstance doesn't have our finalizer, add it.
controllerutil.AddFinalizer(instanceScope.KKInstance, infrav1.InstanceFinalizer)
// Register the finalizer after first read operation from KK to avoid orphaning KK resources on delete
if err := instanceScope.PatchObject(); err != nil {
instanceScope.Error(err, "unable to patch object")
return ctrl.Result{}, err
}
sshClient := r.getSSHClient(instanceScope)
phases := r.phaseFactory(kkInstanceScope)
for _, phase := range phases {
pollErr := wait.PollImmediate(r.WaitKKInstanceInterval, r.WaitKKInstanceTimeout, func() (done bool, err error) {
if err := phase(ctx, sshClient, instanceScope, kkInstanceScope, lbScope); err != nil {
return false, err
}
return true, nil
})
if pollErr != nil {
instanceScope.Error(pollErr, "failed to reconcile phase")
return ctrl.Result{RequeueAfter: defaultRequeueWait}, pollErr
}
}
instanceScope.SetState(infrav1.InstanceStateRunning)
instanceScope.Info("Reconcile KKInstance normal successful")
return ctrl.Result{}, nil
}
// KKClusterToKKInstances returns a handler.MapFunc that enqueues reconcile requests
// for the KKInstances belonging to a KKCluster's owning Cluster.
func (r *KKInstanceReconciler) KKClusterToKKInstances(log logr.Logger) handler.MapFunc {
log.V(4).Info("KKClusterToKKInstances")
return func(o client.Object) []ctrl.Request {
c, ok := o.(*infrav1.KKCluster)
if !ok {
panic(fmt.Sprintf("Expected a KKCluster but got a %T", o))
}
log := log.WithValues("objectMapper", "kkClusterToKKInstance", "namespace", c.Namespace, "kkCluster", c.Name)
// Don't handle deleted KKClusters
if !c.ObjectMeta.DeletionTimestamp.IsZero() {
log.V(4).Info("KKCluster has a deletion timestamp, skipping mapping.")
return nil
}
cluster, err := cutil.GetOwnerCluster(context.TODO(), r.Client, c.ObjectMeta)
switch {
case apierrors.IsNotFound(err) || cluster == nil:
log.V(4).Info("Cluster for KKCluster not found, skipping mapping.")
return nil
case err != nil:
log.Error(err, "Failed to get owning cluster, skipping mapping.")
return nil
}
return r.requestsForCluster(log, cluster.Namespace, cluster.Name)
}
}
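// requeueKKInstancesForUnpausedCluster returns a handler.MapFunc that enqueues all
// KKInstances belonging to a Cluster once that Cluster is unpaused and its
// infrastructure is ready (see the Cluster watch registered in SetupWithManager).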
func (r *KKInstanceReconciler) requeueKKInstancesForUnpausedCluster(log logr.Logger) handler.MapFunc {
log.V(4).Info("requeueKKInstancesForUnpausedCluster")
return func(o client.Object) []ctrl.Request {
c, ok := o.(*clusterv1.Cluster)
if !ok {
panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
}
log := log.WithValues("objectMapper", "clusterToKKInstance", "namespace", c.Namespace, "cluster", c.Name)
// Don't handle deleted clusters
if !c.ObjectMeta.DeletionTimestamp.IsZero() {
log.V(4).Info("Cluster has a deletion timestamp, skipping mapping.")
return nil
}
return r.requestsForCluster(log, c.Namespace, c.Name)
}
}
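// requestsForCluster lists the KKMachines labeled with the given cluster name and
// returns a reconcile request for each one that already has an instanceID set.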
func (r *KKInstanceReconciler) requestsForCluster(log logr.Logger, namespace, name string) []ctrl.Request {
labels := map[string]string{clusterv1.ClusterLabelName: name}
kkMachineList := &infrav1.KKMachineList{}
if err := r.Client.List(context.TODO(), kkMachineList, client.InNamespace(namespace), client.MatchingLabels(labels)); err != nil {
log.Error(err, "Failed to get owned Machines, skipping mapping.")
return nil
}
result := make([]ctrl.Request, 0, len(kkMachineList.Items))
for _, m := range kkMachineList.Items {
log := log.WithValues("kkMachine", m.Name)
if m.Spec.InstanceID == nil {
log.V(4).Info("KKMachine does not have an instanceID yet, will not add to reconciliation request.")
continue
}
log = log.WithValues("kkInstance", *m.Spec.InstanceID)
log.V(4).Info("Adding KKInstance to reconciliation request.")
result = append(result, ctrl.Request{NamespacedName: client.ObjectKey{Namespace: m.Namespace, Name: *m.Spec.InstanceID}})
}
return result
}
// KKMachineToKKInstanceMapFunc returns a handler.MapFunc that maps KKMachine
// events to reconcile requests for the corresponding KKInstance object.
func (r *KKInstanceReconciler) KKMachineToKKInstanceMapFunc(log logr.Logger) handler.MapFunc {
log.V(4).Info("KKMachineToKKInstanceMapFunc")
return func(o client.Object) []reconcile.Request {
m, ok := o.(*infrav1.KKMachine)
if !ok {
return nil
}
if m.Spec.InstanceID == nil {
return nil
}
return []reconcile.Request{
{
NamespacedName: client.ObjectKey{
Namespace: m.Namespace,
Name: *m.Spec.InstanceID,
},
},
}
}
}