kubekey/pkg/controllers/pipeline_controller.go
II 865913fea9
feat: add more golangci lint rule. (#2366)
Signed-off-by: joyceliu <joyceliu@yunify.com>
Co-authored-by: joyceliu <joyceliu@yunify.com>
2024-08-21 16:33:36 +08:00

313 lines
9.5 KiB
Go

/*
Copyright 2023 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"os"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrlcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrlfinalizer "sigs.k8s.io/controller-runtime/pkg/finalizer"
kkcorev1 "github.com/kubesphere/kubekey/v4/pkg/apis/core/v1"
)
const (
// jobLabel set in job or cronJob. value is which pipeline belongs to.
jobLabel = "kubekey.kubesphere.io/pipeline"
defaultExecutorImage = "hub.kubesphere.com.cn/kubekey/executor:latest"
defaultPullPolicy = "IfNotPresent"
defaultServiceAccount = "kk-executor"
)
// PipelineReconciler reconcile pipeline
type PipelineReconciler struct {
*runtime.Scheme
ctrlclient.Client
record.EventRecorder
ctrlfinalizer.Finalizers
MaxConcurrentReconciles int
}
func (r PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// get pipeline
pipeline := &kkcorev1.Pipeline{}
err := r.Client.Get(ctx, req.NamespacedName, pipeline)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(5).InfoS("pipeline not found", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
if pipeline.DeletionTimestamp != nil {
klog.V(5).InfoS("pipeline is deleting", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return ctrl.Result{}, nil
}
switch pipeline.Status.Phase {
case "":
excepted := pipeline.DeepCopy()
pipeline.Status.Phase = kkcorev1.PipelinePhasePending
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(excepted)); err != nil {
klog.V(5).ErrorS(err, "update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return ctrl.Result{}, err
}
case kkcorev1.PipelinePhasePending:
excepted := pipeline.DeepCopy()
pipeline.Status.Phase = kkcorev1.PipelinePhaseRunning
if err := r.Client.Status().Patch(ctx, pipeline, ctrlclient.MergeFrom(excepted)); err != nil {
klog.V(5).ErrorS(err, "update pipeline error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline))
return ctrl.Result{}, err
}
case kkcorev1.PipelinePhaseRunning:
return r.dealRunningPipeline(ctx, pipeline)
case kkcorev1.PipelinePhaseFailed:
// do nothing
case kkcorev1.PipelinePhaseSucceed:
// do nothing
}
return ctrl.Result{}, nil
}
func (r *PipelineReconciler) dealRunningPipeline(ctx context.Context, pipeline *kkcorev1.Pipeline) (ctrl.Result, error) {
if err := r.checkServiceAccount(ctx, *pipeline); err != nil {
return ctrl.Result{}, err
}
// check if job is exist
switch pipeline.Spec.JobSpec.Schedule {
case "": // pipeline will create job
jobs := &batchv1.JobList{}
if err := r.Client.List(ctx, jobs, ctrlclient.InNamespace(pipeline.Namespace), ctrlclient.MatchingLabels{
jobLabel: pipeline.Name,
}); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
} else if len(jobs.Items) != 0 {
// could find exist job
return ctrl.Result{}, nil
}
// create job
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: pipeline.Name + "-",
Namespace: pipeline.Namespace,
Labels: map[string]string{
jobLabel: pipeline.Name,
},
},
Spec: r.GenerateJobSpec(*pipeline),
}
if err := controllerutil.SetControllerReference(pipeline, job, r.Scheme); err != nil {
return ctrl.Result{}, err
}
if err := r.Client.Create(ctx, job); err != nil {
return ctrl.Result{}, err
}
default: // pipeline will create cronJob
jobs := &batchv1.CronJobList{}
if err := r.Client.List(ctx, jobs, ctrlclient.InNamespace(pipeline.Namespace), ctrlclient.MatchingLabels{
jobLabel: pipeline.Name,
}); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
} else if len(jobs.Items) != 0 {
// could find exist cronJob
for _, job := range jobs.Items {
// update cronJob from pipeline, the pipeline status should always be running.
if pipeline.Spec.JobSpec.Suspend != job.Spec.Suspend {
cp := job.DeepCopy()
job.Spec.Suspend = pipeline.Spec.JobSpec.Suspend
// update pipeline status
if err := r.Client.Status().Patch(ctx, &job, ctrlclient.MergeFrom(cp)); err != nil {
klog.V(5).ErrorS(err, "update corn job error", "pipeline", ctrlclient.ObjectKeyFromObject(pipeline),
"cronJob", ctrlclient.ObjectKeyFromObject(&job))
}
}
}
return ctrl.Result{}, nil
}
// create cornJob
cornJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
GenerateName: pipeline.Name + "-",
Namespace: pipeline.Namespace,
Labels: map[string]string{
jobLabel: pipeline.Name,
},
},
Spec: batchv1.CronJobSpec{
Schedule: pipeline.Spec.JobSpec.Schedule,
JobTemplate: batchv1.JobTemplateSpec{
Spec: r.GenerateJobSpec(*pipeline),
},
Suspend: pipeline.Spec.JobSpec.Suspend,
SuccessfulJobsHistoryLimit: pipeline.Spec.JobSpec.SuccessfulJobsHistoryLimit,
FailedJobsHistoryLimit: pipeline.Spec.JobSpec.FailedJobsHistoryLimit,
},
}
if err := controllerutil.SetControllerReference(pipeline, cornJob, r.Scheme); err != nil {
return ctrl.Result{}, err
}
if err := r.Client.Create(ctx, cornJob); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// checkServiceAccount when ServiceAccount is not exist, create it.
func (r *PipelineReconciler) checkServiceAccount(ctx context.Context, pipeline kkcorev1.Pipeline) error {
// get ServiceAccount name for executor pod
saName, ok := os.LookupEnv("EXECUTOR_SERVICEACCOUNT")
if !ok {
saName = defaultServiceAccount
}
var sa = &corev1.ServiceAccount{}
if err := r.Client.Get(ctx, ctrlclient.ObjectKey{Namespace: pipeline.Namespace, Name: saName}, sa); err != nil {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "get service account", "pipeline", ctrlclient.ObjectKeyFromObject(&pipeline))
return err
}
// create sa
if err := r.Client.Create(ctx, &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{Name: saName, Namespace: pipeline.Namespace},
}); err != nil {
klog.ErrorS(err, "create service account error", "pipeline", ctrlclient.ObjectKeyFromObject(&pipeline))
return err
}
}
var rb = &rbacv1.ClusterRoleBinding{}
if err := r.Client.Get(ctx, ctrlclient.ObjectKey{Namespace: pipeline.Namespace, Name: saName}, rb); err != nil {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "create role binding error", "pipeline", ctrlclient.ObjectKeyFromObject(&pipeline))
return err
}
//create rolebinding
if err := r.Client.Create(ctx, &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{Namespace: pipeline.Namespace, Name: saName},
RoleRef: rbacv1.RoleRef{
APIGroup: rbacv1.GroupName,
Kind: "ClusterRole",
Name: saName,
},
Subjects: []rbacv1.Subject{
{
APIGroup: corev1.GroupName,
Kind: "ServiceAccount",
Name: saName,
Namespace: pipeline.Namespace,
},
},
}); err != nil {
klog.ErrorS(err, "create role binding error", "pipeline", ctrlclient.ObjectKeyFromObject(&pipeline))
return err
}
}
return nil
}
// GenerateJobSpec for pipeline
func (r *PipelineReconciler) GenerateJobSpec(pipeline kkcorev1.Pipeline) batchv1.JobSpec {
// get ServiceAccount name for executor pod
saName, ok := os.LookupEnv("EXECUTOR_SERVICEACCOUNT")
if !ok {
saName = defaultServiceAccount
}
// get image from env
image, ok := os.LookupEnv("EXECUTOR_IMAGE")
if !ok {
image = defaultExecutorImage
}
// get image from env
imagePullPolicy, ok := os.LookupEnv("EXECUTOR_IMAGE_PULLPOLICY")
if !ok {
imagePullPolicy = defaultPullPolicy
}
// create a job spec
jobSpec := batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
Completions: ptr.To[int32](1),
BackoffLimit: ptr.To[int32](0),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
ServiceAccountName: saName,
RestartPolicy: "Never",
Volumes: pipeline.Spec.JobSpec.Volumes,
Containers: []corev1.Container{
{
Name: "executor",
Image: image,
ImagePullPolicy: corev1.PullPolicy(imagePullPolicy),
Command: []string{"kk"},
Args: []string{"pipeline",
"--name", pipeline.Name,
"--namespace", pipeline.Namespace},
VolumeMounts: pipeline.Spec.JobSpec.VolumeMounts,
},
},
},
},
}
return jobSpec
}
// SetupWithManager sets up the controller with the Manager.
func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(ctrlcontroller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
}).
For(&kkcorev1.Pipeline{}).
Complete(r)
}