/*
Copyright 2022 The KubeSphere Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers

import (
	"context"
	"strings"

	"github.com/pkg/errors"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/collections"
	"sigs.k8s.io/cluster-api/util/conditions"
	ctrl "sigs.k8s.io/controller-runtime"

	infracontrolplanev1 "github.com/kubesphere/kubekey/controlplane/k3s/api/v1beta1"
	k3sCluster "github.com/kubesphere/kubekey/controlplane/k3s/pkg/cluster"
)
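
// initializeControlPlane creates the very first control plane Machine for the
// cluster, after verifying via an uncached read that no KCP-owned Machines
// exist yet.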
func (r *K3sControlPlaneReconciler) initializeControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *infracontrolplanev1.K3sControlPlane, controlPlane *k3sCluster.ControlPlane) (ctrl.Result, error) {
	logger := controlPlane.Logger()

	// Perform an uncached read of all the owned machines. This check is in place to make sure
	// that the controller cache is not misbehaving and we end up initializing the cluster more than once.
	ownedMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, cluster, collections.OwnedMachines(kcp))
	if err != nil {
		logger.Error(err, "failed to perform an uncached read of control plane machines for cluster")
		return ctrl.Result{}, err
	}
	if len(ownedMachines) > 0 {
		return ctrl.Result{}, errors.Errorf(
			"control plane has already been initialized, found %d owned machines for cluster %s/%s: controller cache or management cluster is misbehaving",
			len(ownedMachines), cluster.Namespace, cluster.Name,
		)
	}

	bootstrapSpec := controlPlane.InitialControlPlaneConfig()
	fd := controlPlane.NextFailureDomainForScaleUp()
	if err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec, fd); err != nil {
		logger.Error(err, "Failed to create initial control plane Machine")
		r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedInitialization", "Failed to create initial control plane Machine for cluster %s/%s control plane: %v", cluster.Namespace, cluster.Name, err)
		return ctrl.Result{}, err
	}

	// Requeue the control plane, in case there are additional operations to perform
	return ctrl.Result{Requeue: true}, nil
}
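
// scaleUpControlPlane adds a new control plane Machine to an already
// initialized control plane, provided the preflight checks pass.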
func (r *K3sControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *infracontrolplanev1.K3sControlPlane, controlPlane *k3sCluster.ControlPlane) (ctrl.Result, error) {
	logger := controlPlane.Logger()

	// Run preflight checks to ensure that the control plane is stable before proceeding with a scale up/scale down operation; if not, wait.
	if result, err := r.preflightChecks(ctx, controlPlane); err != nil || !result.IsZero() {
		return result, err
	}

	// Create the bootstrap configuration
	bootstrapSpec := controlPlane.JoinControlPlaneConfig()
	fd := controlPlane.NextFailureDomainForScaleUp()
	if err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec, fd); err != nil {
		logger.Error(err, "Failed to create additional control plane Machine")
		r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleUp", "Failed to create additional control plane Machine for cluster %s/%s control plane: %v", cluster.Namespace, cluster.Name, err)
		return ctrl.Result{}, err
	}

	// Requeue the control plane, in case there are other operations to perform
	return ctrl.Result{Requeue: true}, nil
}
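
// scaleDownControlPlane removes one control plane Machine, preferring machines
// carrying the delete annotation and outdated machines, provided the preflight
// checks pass.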
func (r *K3sControlPlaneReconciler) scaleDownControlPlane(
	ctx context.Context,
	cluster *clusterv1.Cluster,
	kcp *infracontrolplanev1.K3sControlPlane,
	controlPlane *k3sCluster.ControlPlane,
	outdatedMachines collections.Machines,
) (ctrl.Result, error) {
	logger := controlPlane.Logger()

	// Pick the Machine that we should scale down.
	machineToDelete, err := selectMachineForScaleDown(controlPlane, outdatedMachines)
	if err != nil {
		return ctrl.Result{}, errors.Wrap(err, "failed to select machine for scale down")
	}
	// Bail out early if no machine could be picked; this also avoids passing a
	// nil Machine into the preflight checks' exclude list below.
	if machineToDelete == nil {
		logger.Info("Failed to pick control plane Machine to delete")
		return ctrl.Result{}, errors.New("failed to pick control plane Machine to delete")
	}

	// Run preflight checks ensuring the control plane is stable before proceeding with a scale up/scale down operation; if not, wait.
	// Given that we're scaling down, we can exclude the machineToDelete from the preflight checks.
	if result, err := r.preflightChecks(ctx, controlPlane, machineToDelete); err != nil || !result.IsZero() {
		return result, err
	}

	logger = logger.WithValues("machine", machineToDelete.Name)
	if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
		logger.Error(err, "Failed to delete control plane machine")
		r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleDown",
			"Failed to delete control plane Machine %s for cluster %s/%s control plane: %v", machineToDelete.Name, cluster.Namespace, cluster.Name, err)
		return ctrl.Result{}, err
	}

	// Requeue the control plane, in case there are additional operations to perform
	return ctrl.Result{Requeue: true}, nil
}

// preflightChecks checks if the control plane is stable before proceeding with a scale up/scale down operation,
// where stable means that:
// - There are no machine deletions in progress.
// - All the health conditions on KCP are true.
// - All the health conditions on the control plane machines are true.
// If the control plane is not passing preflight checks, it requeues.
//
// NOTE: this func uses KCP conditions, so reconcileControlPlaneConditions must be called before this.
func (r *K3sControlPlaneReconciler) preflightChecks(_ context.Context, controlPlane *k3sCluster.ControlPlane, excludeFor ...*clusterv1.Machine) (ctrl.Result, error) { //nolint:unparam
	logger := controlPlane.Logger()

	// If there are no KCP-owned control-plane machines, then the control plane has not been initialized yet,
	// so it is considered ok to proceed.
	if controlPlane.Machines.Len() == 0 {
		return ctrl.Result{}, nil
	}

	// If there are deleting machines, wait for the operation to complete.
	if controlPlane.HasDeletingMachine() {
		logger.Info("Waiting for machines to be deleted", "Machines", strings.Join(controlPlane.Machines.Filter(collections.HasDeletionTimestamp).Names(), ", "))
		return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
	}

	// Check machine health conditions; if there are conditions with False or Unknown, then wait.
	allMachineHealthConditions := []clusterv1.ConditionType{
		infracontrolplanev1.MachineAgentHealthyCondition,
	}
	var machineErrors []error
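
	// Check every owned machine, skipping those listed in excludeFor; the
	// label lets the inner loop continue the outer iteration directly.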
loopmachines:
	for _, machine := range controlPlane.Machines {
		for _, excluded := range excludeFor {
			// If this machine should be excluded from the individual
			// health check, continue the outer loop.
			if machine.Name == excluded.Name {
				continue loopmachines
			}
		}

		for _, condition := range allMachineHealthConditions {
			if err := preflightCheckCondition("machine", machine, condition); err != nil {
				machineErrors = append(machineErrors, err)
			}
		}
	}
	if len(machineErrors) > 0 {
		aggregatedError := kerrors.NewAggregate(machineErrors)
		r.recorder.Eventf(controlPlane.KCP, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
			"Waiting for control plane to pass preflight checks to continue reconciliation: %v", aggregatedError)
		logger.Info("Waiting for control plane to pass preflight checks", "failures", aggregatedError.Error())

		return ctrl.Result{RequeueAfter: preflightFailedRequeueAfter}, nil
	}

	return ctrl.Result{}, nil
}
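
// preflightCheckCondition returns an error if the given condition is missing,
// False, or Unknown on the object.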
func preflightCheckCondition(kind string, obj conditions.Getter, condition clusterv1.ConditionType) error {
	c := conditions.Get(obj, condition)
	if c == nil {
		return errors.Errorf("%s %s does not have %s condition", kind, obj.GetName(), condition)
	}
	if c.Status == corev1.ConditionFalse {
		return errors.Errorf("%s %s reports %s condition is false (%s, %s)", kind, obj.GetName(), condition, c.Severity, c.Message)
	}
	if c.Status == corev1.ConditionUnknown {
		return errors.Errorf("%s %s reports %s condition is unknown (%s)", kind, obj.GetName(), condition, c.Message)
	}
	return nil
}
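
// selectMachineForScaleDown narrows the candidate set in order of preference:
// outdated machines carrying the delete annotation, then any machine carrying
// the delete annotation, then outdated machines; within the chosen set it
// picks a machine from the failure domain with the most machines.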
func selectMachineForScaleDown(controlPlane *k3sCluster.ControlPlane, outdatedMachines collections.Machines) (*clusterv1.Machine, error) {
	machines := controlPlane.Machines
	switch {
	case controlPlane.MachineWithDeleteAnnotation(outdatedMachines).Len() > 0:
		machines = controlPlane.MachineWithDeleteAnnotation(outdatedMachines)
	case controlPlane.MachineWithDeleteAnnotation(machines).Len() > 0:
		machines = controlPlane.MachineWithDeleteAnnotation(machines)
	case outdatedMachines.Len() > 0:
		machines = outdatedMachines
	}
	return controlPlane.MachineInFailureDomainWithMostMachines(machines)
}