From 935f5f34866439dbe3d12afaec992711aeefbac7 Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Tue, 23 Jun 2026 19:51:08 +0800 Subject: [PATCH] feat: implement skip toleration mechanism for batch rolling When a batch is skipped, record the gap between expected and actual replicas as toleration. Subsequent batches can complete with a deficit within the accumulated toleration. On the last batch, toleration is not applied and strict check is required since there is no subsequent batch to compensate. Co-Authored-By: Claude Opus 4.6 --- go.mod | 2 +- go.sum | 4 +- pkg/controllers/rolloutrun/executor/batch.go | 15 +- .../rolloutrun/executor/batch_test.go | 197 ++++++++++++++++ pkg/controllers/rolloutrun/executor/canary.go | 2 +- .../rolloutrun/executor/do_command.go | 45 +++- .../rolloutrun/executor/do_command_test.go | 218 ++++++++++++++++++ pkg/workload/info.go | 17 +- pkg/workload/info_test.go | 164 +++++++++++++ 9 files changed, 653 insertions(+), 11 deletions(-) create mode 100644 pkg/controllers/rolloutrun/executor/do_command_test.go create mode 100644 pkg/workload/info_test.go diff --git a/go.mod b/go.mod index 9d5fce4..864f0bf 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( k8s.io/klog/v2 v2.130.1 k8s.io/kubernetes v1.22.2 k8s.io/utils v0.0.0-20241210054802-24370beab758 - kusionstack.io/kube-api v0.7.5-0.20260512114711-9570d38337c2 + kusionstack.io/kube-api v0.7.5-0.20260623095733-5ed62841343a kusionstack.io/kube-utils v0.2.1-0.20251125083928-1134a582b341 kusionstack.io/resourceconsist v0.0.4 sigs.k8s.io/controller-runtime v0.21.0 diff --git a/go.sum b/go.sum index d2f1479..bc92822 100644 --- a/go.sum +++ b/go.sum @@ -1021,8 +1021,8 @@ k8s.io/sample-apiserver v0.22.2/go.mod h1:h+/DIV5EmuNq4vfPr5TSXy9mIBVXXlPAKQMPbj k8s.io/system-validators v1.5.0/go.mod h1:bPldcLgkIUK22ALflnsXk8pvkTEndYdNuaHH6gRrl0Q= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -kusionstack.io/kube-api v0.7.5-0.20260512114711-9570d38337c2 h1:jrUVO6a6/fuwdfF8NnehVXGbiYV57AfIJvqnsYRB3sI= -kusionstack.io/kube-api v0.7.5-0.20260512114711-9570d38337c2/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.5-0.20260623095733-5ed62841343a h1:U7W5odmM823+jqcXLfkTbBkBzLoZ0VJxQ+XFER+WFz4= +kusionstack.io/kube-api v0.7.5-0.20260623095733-5ed62841343a/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-utils v0.2.1-0.20251125083928-1134a582b341 h1:dnMtHJvIpU3338WpqGiNN2qXWZFiXaoiuzR9jwhvWpg= kusionstack.io/kube-utils v0.2.1-0.20251125083928-1134a582b341/go.mod h1:Lz5SBYWg9+jw+kP0CAyf/b62D5DeUPf6+jE1d0WC4cI= kusionstack.io/resourceconsist v0.0.4 h1:wRqLJuNh8O4TT6p0uOklFpHUKiRdRxcAH71Sw/q9LhE= diff --git a/pkg/controllers/rolloutrun/executor/batch.go b/pkg/controllers/rolloutrun/executor/batch.go index 680e96c..9581fdd 100644 --- a/pkg/controllers/rolloutrun/executor/batch.go +++ b/pkg/controllers/rolloutrun/executor/batch.go @@ -217,7 +217,10 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat currentBatchExpectedReplicas, _ := workload.CalculateUpdatedReplicas(&status.Replicas, item.Replicas) - ready, reason := info.CheckUpdatedReady(currentBatchExpectedReplicas, isLastBatch) + // Find skip toleration for this workload + skipToleration := findSkipToleration(newStatus.BatchStatus.SkipTolerations, item.Cluster, item.Name) + + ready, reason := info.CheckUpdatedReady(currentBatchExpectedReplicas, isLastBatch, skipToleration) if ready { // if the target is ready, we will not change partition continue @@ -270,3 +273,13 @@ func (e *batchExecutor) calculateExpectedReplicasBySlidingWindow(status rolloutv expected = min(currentBatchExpectedReplicas, expected) return expected, nil } + +// findSkipToleration finds the accumulated skip toleration for the given workload +func findSkipToleration(skipTolerations []rolloutv1alpha1.WorkloadSkipToleration, cluster, name string) int32 { + for _, t := range skipTolerations { + if t.Cluster == cluster && t.Name == name { + return t.Toleration + } + } + return 0 +} diff --git a/pkg/controllers/rolloutrun/executor/batch_test.go b/pkg/controllers/rolloutrun/executor/batch_test.go index 69ca2b6..f69c3f7 100644 --- a/pkg/controllers/rolloutrun/executor/batch_test.go +++ b/pkg/controllers/rolloutrun/executor/batch_test.go @@ -318,6 +318,203 @@ func (s *batchExecutorTestSuite) Test_BatchExecutor_Do() { s.runBatchTestCases(tests) } +func (s *batchExecutorTestSuite) Test_BatchExecutor_Do_SkipToleration() { + tests := []batchExectorTestCase{ + { + name: "skip toleration allows batch to complete with deficit in middle batch", + getObjects: func() (*rolloutv1alpha1.Rollout, *rolloutv1alpha1.RolloutRun) { + rollout := s.rollout.DeepCopy() + rolloutRun := s.rolloutRun.DeepCopy() + + // 3 batches, currently on batch 2 (index 1), not the last batch + rolloutRun.Spec.Batch.Batches = []rolloutv1alpha1.RolloutRunStep{ + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)), + }}, + } + rolloutRun.Status.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing + rolloutRun.Status.BatchStatus = &rolloutv1alpha1.RolloutRunBatchStatus{ + RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{ + CurrentBatchIndex: 1, + CurrentBatchState: StepRunning, + }, + Records: []rolloutv1alpha1.RolloutRunStepStatus{ + {Index: ptr.To[int32](0), State: StepSkipped}, + {Index: ptr.To[int32](1), State: StepRunning, StartTime: ptr.To(metav1.Now())}, + {Index: ptr.To[int32](2), State: StepNone}, + }, + // Batch 0 was skipped: gap = 30 - 25 = 5 + SkipTolerations: []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 5}, + }, + } + return rollout, rolloutRun + }, + getWorkloads: func() []client.Object { + // UpdatedAvailableReplicas = 55, expected = 60, gap = 5 <= toleration(5) + return []client.Object{ + newFakeObject("cluster-a", "default", "test-a", 100, 55, 55), + } + }, + assertResult: func(done bool, result reconcile.Result, err error) { + s.Require().NoError(err) + s.False(done) // not all done, move to next batch + s.Equal(reconcile.Result{Requeue: true}, result) + }, + assertStatus: func(status *rolloutv1alpha1.RolloutRunStatus) { + s.Equal(StepPostBatchStepHook, status.BatchStatus.CurrentBatchState) + }, + }, + { + name: "skip toleration not enough, batch stays running", + getObjects: func() (*rolloutv1alpha1.Rollout, *rolloutv1alpha1.RolloutRun) { + rollout := s.rollout.DeepCopy() + rolloutRun := s.rolloutRun.DeepCopy() + + rolloutRun.Spec.Batch.Batches = []rolloutv1alpha1.RolloutRunStep{ + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)), + }}, + } + rolloutRun.Status.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing + rolloutRun.Status.BatchStatus = &rolloutv1alpha1.RolloutRunBatchStatus{ + RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{ + CurrentBatchIndex: 1, + CurrentBatchState: StepRunning, + }, + Records: []rolloutv1alpha1.RolloutRunStepStatus{ + {Index: ptr.To[int32](0), State: StepSkipped}, + {Index: ptr.To[int32](1), State: StepRunning, StartTime: ptr.To(metav1.Now())}, + {Index: ptr.To[int32](2), State: StepNone}, + }, + // gap = 60 - 52 = 8 > toleration(5) + SkipTolerations: []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 5}, + }, + } + return rollout, rolloutRun + }, + getWorkloads: func() []client.Object { + return []client.Object{ + newFakeObject("cluster-a", "default", "test-a", 100, 52, 52), + } + }, + assertResult: func(done bool, result reconcile.Result, err error) { + s.Require().NoError(err) + s.False(done) + s.Equal(reconcile.Result{RequeueAfter: retryDefault}, result) + }, + assertStatus: func(status *rolloutv1alpha1.RolloutRunStatus) { + s.Equal(StepRunning, status.BatchStatus.CurrentBatchState) + }, + }, + { + name: "skip toleration does not apply on last batch", + getObjects: func() (*rolloutv1alpha1.Rollout, *rolloutv1alpha1.RolloutRun) { + rollout := s.rollout.DeepCopy() + rolloutRun := s.rolloutRun.DeepCopy() + + rolloutRun.Spec.Batch.Batches = []rolloutv1alpha1.RolloutRunStep{ + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)), + }}, + } + rolloutRun.Status.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing + rolloutRun.Status.BatchStatus = &rolloutv1alpha1.RolloutRunBatchStatus{ + RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{ + CurrentBatchIndex: 2, + CurrentBatchState: StepRunning, + }, + Records: []rolloutv1alpha1.RolloutRunStepStatus{ + {Index: ptr.To[int32](0), State: StepSkipped}, + {Index: ptr.To[int32](1), State: StepSucceeded}, + {Index: ptr.To[int32](2), State: StepRunning, StartTime: ptr.To(metav1.Now())}, + }, + // Last batch: toleration should NOT apply + SkipTolerations: []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 5}, + }, + } + return rollout, rolloutRun + }, + getWorkloads: func() []client.Object { + // UpdatedAvailableReplicas = 96, expected = 100, gap = 4 <= toleration(5) + // but last batch, so toleration does NOT apply, needs strict check + return []client.Object{ + newFakeObject("cluster-a", "default", "test-a", 100, 96, 96), + } + }, + assertResult: func(done bool, result reconcile.Result, err error) { + s.Require().NoError(err) + s.False(done) + s.Equal(reconcile.Result{RequeueAfter: retryDefault}, result) + }, + assertStatus: func(status *rolloutv1alpha1.RolloutRunStatus) { + s.Equal(StepRunning, status.BatchStatus.CurrentBatchState) + }, + }, + { + name: "no skip toleration, behavior unchanged", + getObjects: func() (*rolloutv1alpha1.Rollout, *rolloutv1alpha1.RolloutRun) { + rollout := s.rollout.DeepCopy() + rolloutRun := s.rolloutRun.DeepCopy() + + rolloutRun.Spec.Batch.Batches = []rolloutv1alpha1.RolloutRunStep{ + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(10)), + }}, + } + rolloutRun.Status.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing + rolloutRun.Status.BatchStatus = &rolloutv1alpha1.RolloutRunBatchStatus{ + RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{ + CurrentBatchIndex: 0, + CurrentBatchState: StepRunning, + }, + Records: []rolloutv1alpha1.RolloutRunStepStatus{ + {Index: ptr.To[int32](0), State: StepRunning, StartTime: ptr.To(metav1.Now())}, + }, + SkipTolerations: nil, + } + return rollout, rolloutRun + }, + getWorkloads: func() []client.Object { + // UpdatedAvailableReplicas=8 < expected=10, no toleration -> not ready + return []client.Object{ + newFakeObject("cluster-a", "default", "test-a", 100, 8, 8), + } + }, + assertResult: func(done bool, result reconcile.Result, err error) { + s.Require().NoError(err) + s.False(done) + s.Equal(reconcile.Result{RequeueAfter: retryDefault}, result) + }, + assertStatus: func(status *rolloutv1alpha1.RolloutRunStatus) { + s.Equal(StepRunning, status.BatchStatus.CurrentBatchState) + }, + }, + } + + s.runBatchTestCases(tests) +} + func newRunStepTarget(cluster, name string, replicas intstr.IntOrString) rolloutv1alpha1.RolloutRunStepTarget { return newRunStepTargetWithSlidingWindow(cluster, name, replicas, nil) } diff --git a/pkg/controllers/rolloutrun/executor/canary.go b/pkg/controllers/rolloutrun/executor/canary.go index f465c7d..e9920a0 100644 --- a/pkg/controllers/rolloutrun/executor/canary.go +++ b/pkg/controllers/rolloutrun/executor/canary.go @@ -224,7 +224,7 @@ func (e *canaryExecutor) doCanary(ctx *ExecutorContext) (bool, time.Duration, er // 2.b. waiting canary workload ready for _, info := range canaryWorkloads { - if ready, _ := info.CheckUpdatedReady(info.Status.DesiredReplicas, false); !ready { + if ready, _ := info.CheckUpdatedReady(info.Status.DesiredReplicas, false, 0); !ready { // ready logger.Info("still waiting for canary target ready", "cluster", info.ClusterName, diff --git a/pkg/controllers/rolloutrun/executor/do_command.go b/pkg/controllers/rolloutrun/executor/do_command.go index 08a4123..719f8bd 100644 --- a/pkg/controllers/rolloutrun/executor/do_command.go +++ b/pkg/controllers/rolloutrun/executor/do_command.go @@ -4,6 +4,8 @@ import ( rolloutapis "kusionstack.io/kube-api/rollout" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" ctrl "sigs.k8s.io/controller-runtime" + + "kusionstack.io/rollout/pkg/workload" ) // doCommand @@ -28,18 +30,18 @@ func (r *Executor) doCommand(ctx *ExecutorContext) ctrl.Result { } case rolloutapis.AnnoManualCommandSkip: if batchError != nil { - handleBatchStatusWhenSkipped(newStatus, len(rolloutRun.Spec.Batch.Batches)) + handleBatchStatusWhenSkipped(newStatus, len(rolloutRun.Spec.Batch.Batches), rolloutRun.Spec.Batch.Batches, ctx.Workloads) } case rolloutapis.AnnoManualCommandCancel: newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseCanceling case rolloutapis.AnnoManualCommandForceSkipCurrentBatch: - handleBatchStatusWhenSkipped(newStatus, len(rolloutRun.Spec.Batch.Batches)) + handleBatchStatusWhenSkipped(newStatus, len(rolloutRun.Spec.Batch.Batches), rolloutRun.Spec.Batch.Batches, ctx.Workloads) } return ctrl.Result{Requeue: true} } -func handleBatchStatusWhenSkipped(newStatus *rolloutv1alpha1.RolloutRunStatus, batchSize int) { +func handleBatchStatusWhenSkipped(newStatus *rolloutv1alpha1.RolloutRunStatus, batchSize int, batches []rolloutv1alpha1.RolloutRunStep, workloads *workload.Set) { currentBatchIndex := newStatus.BatchStatus.CurrentBatchIndex if newStatus.Error != nil { newStatus.Error = nil @@ -50,5 +52,42 @@ func handleBatchStatusWhenSkipped(newStatus *rolloutv1alpha1.RolloutRunStatus, b newStatus.BatchStatus.Records[currentBatchIndex].State = StepSkipped newStatus.BatchStatus.CurrentBatchIndex = currentBatchIndex + 1 newStatus.BatchStatus.CurrentBatchState = StepNone + + // Calculate and accumulate skip toleration for each workload in current batch + if workloads != nil { + currentBatch := batches[currentBatchIndex] + for _, target := range currentBatch.Targets { + info := workloads.Get(target.Cluster, target.Name) + if info == nil { + continue + } + status := info.APIStatus() + currentBatchExpectedReplicas, _ := workload.CalculateUpdatedReplicas(&status.Replicas, target.Replicas) + gap := currentBatchExpectedReplicas - status.UpdatedAvailableReplicas + if gap <= 0 { + continue + } + + // Accumulate toleration + accumulateSkipToleration(newStatus, target.Cluster, target.Name, gap) + } + } + } +} + +func accumulateSkipToleration(newStatus *rolloutv1alpha1.RolloutRunStatus, cluster, name string, gap int32) { + if newStatus.BatchStatus.SkipTolerations == nil { + newStatus.BatchStatus.SkipTolerations = make([]rolloutv1alpha1.WorkloadSkipToleration, 0) + } + for i := range newStatus.BatchStatus.SkipTolerations { + if newStatus.BatchStatus.SkipTolerations[i].Cluster == cluster && newStatus.BatchStatus.SkipTolerations[i].Name == name { + newStatus.BatchStatus.SkipTolerations[i].Toleration += gap + return + } } + newStatus.BatchStatus.SkipTolerations = append(newStatus.BatchStatus.SkipTolerations, rolloutv1alpha1.WorkloadSkipToleration{ + Cluster: cluster, + Name: name, + Toleration: gap, + }) } diff --git a/pkg/controllers/rolloutrun/executor/do_command_test.go b/pkg/controllers/rolloutrun/executor/do_command_test.go new file mode 100644 index 0000000..4c8ca79 --- /dev/null +++ b/pkg/controllers/rolloutrun/executor/do_command_test.go @@ -0,0 +1,218 @@ +package executor + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" + "kusionstack.io/rollout/pkg/workload" +) + +func TestHandleBatchStatusWhenSkipped(t *testing.T) { + tests := []struct { + name string + batchIndex int32 + batchSize int + batches []rolloutv1alpha1.RolloutRunStep + workloads *workload.Set + existingSkipToleration []rolloutv1alpha1.WorkloadSkipToleration + expectedSkipToleration []rolloutv1alpha1.WorkloadSkipToleration + expectedCurrentBatchIndex int32 + expectedCurrentBatchState rolloutv1alpha1.RolloutStepState + }{ + { + name: "skip records toleration for workload with deficit", + batchIndex: 0, + batchSize: 3, + batches: []rolloutv1alpha1.RolloutRunStep{ + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)), + }}, + }, + workloads: newTestWorkloadSet("cluster-a", "test-a", 1, 100, 25), + existingSkipToleration: nil, + expectedSkipToleration: []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 5}, // 30 - 25 = 5 + }, + expectedCurrentBatchIndex: 1, + expectedCurrentBatchState: rolloutv1alpha1.RolloutStepNone, + }, + { + name: "skip accumulates toleration when existing toleration present", + batchIndex: 1, + batchSize: 3, + batches: []rolloutv1alpha1.RolloutRunStep{ + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)), + }}, + }, + workloads: newTestWorkloadSet("cluster-a", "test-a", 1, 100, 52), + existingSkipToleration: []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 5}, + }, + expectedSkipToleration: []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 13}, // 5 + (60-52=8) = 13 + }, + expectedCurrentBatchIndex: 2, + expectedCurrentBatchState: rolloutv1alpha1.RolloutStepNone, + }, + { + name: "skip does not advance when last batch", + batchIndex: 2, + batchSize: 3, + batches: []rolloutv1alpha1.RolloutRunStep{ + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)), + }}, + }, + workloads: newTestWorkloadSet("cluster-a", "test-a", 1, 100, 93), + existingSkipToleration: []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 5}, + }, + expectedSkipToleration: []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 5}, + }, + expectedCurrentBatchIndex: 2, // unchanged + expectedCurrentBatchState: rolloutv1alpha1.RolloutStepNone, + }, + { + name: "skip with no deficit does not add toleration", + batchIndex: 0, + batchSize: 2, + batches: []rolloutv1alpha1.RolloutRunStep{ + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)), + }}, + {Targets: []rolloutv1alpha1.RolloutRunStepTarget{ + newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)), + }}, + }, + workloads: newTestWorkloadSet("cluster-a", "test-a", 1, 100, 35), + existingSkipToleration: nil, + expectedSkipToleration: nil, // no gap, no toleration added + expectedCurrentBatchIndex: 1, + expectedCurrentBatchState: rolloutv1alpha1.RolloutStepNone, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + newStatus := &rolloutv1alpha1.RolloutRunStatus{ + BatchStatus: &rolloutv1alpha1.RolloutRunBatchStatus{ + RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{ + CurrentBatchIndex: tt.batchIndex, + }, + Records: make([]rolloutv1alpha1.RolloutRunStepStatus, tt.batchSize), + SkipTolerations: tt.existingSkipToleration, + }, + } + + handleBatchStatusWhenSkipped(newStatus, tt.batchSize, tt.batches, tt.workloads) + + if tt.expectedSkipToleration == nil { + if newStatus.BatchStatus.SkipTolerations != nil { + t.Errorf("expected nil SkipToleration, got %v", newStatus.BatchStatus.SkipTolerations) + } + } else { + if len(newStatus.BatchStatus.SkipTolerations) != len(tt.expectedSkipToleration) { + t.Errorf("expected %d SkipToleration entries, got %d", len(tt.expectedSkipToleration), len(newStatus.BatchStatus.SkipTolerations)) + } + for i, expected := range tt.expectedSkipToleration { + actual := newStatus.BatchStatus.SkipTolerations[i] + if actual.Cluster != expected.Cluster || actual.Name != expected.Name || actual.Toleration != expected.Toleration { + t.Errorf("SkipToleration[%d] = %+v, want %+v", i, actual, expected) + } + } + } + + if newStatus.BatchStatus.CurrentBatchIndex != tt.expectedCurrentBatchIndex { + t.Errorf("CurrentBatchIndex = %d, want %d", newStatus.BatchStatus.CurrentBatchIndex, tt.expectedCurrentBatchIndex) + } + if newStatus.BatchStatus.CurrentBatchState != tt.expectedCurrentBatchState { + t.Errorf("CurrentBatchState = %v, want %v", newStatus.BatchStatus.CurrentBatchState, tt.expectedCurrentBatchState) + } + }) + } +} + +func TestFindSkipToleration(t *testing.T) { + tolerations := []rolloutv1alpha1.WorkloadSkipToleration{ + {Cluster: "cluster-a", Name: "test-a", Toleration: 5}, + {Cluster: "cluster-b", Name: "test-b", Toleration: 8}, + } + + tests := []struct { + name string + tolerations []rolloutv1alpha1.WorkloadSkipToleration + cluster string + wlName string + expected int32 + }{ + { + name: "found workload", + tolerations: tolerations, + cluster: "cluster-a", + wlName: "test-a", + expected: 5, + }, + { + name: "not found workload", + tolerations: tolerations, + cluster: "cluster-c", + wlName: "test-c", + expected: 0, + }, + { + name: "nil tolerations", + tolerations: nil, + cluster: "cluster-a", + wlName: "test-a", + expected: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := findSkipToleration(tt.tolerations, tt.cluster, tt.wlName) + if result != tt.expected { + t.Errorf("findSkipToleration() = %d, want %d", result, tt.expected) + } + }) + } +} + +// newTestWorkloadSet creates a workload.Set with a single workload for testing +func newTestWorkloadSet(cluster, name string, generation int64, desiredReplicas, updatedAvailableReplicas int32) *workload.Set { + return workload.NewSet(&workload.Info{ + ClusterName: cluster, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + Generation: generation, + }, + Status: workload.InfoStatus{ + ObservedGeneration: generation, + DesiredReplicas: desiredReplicas, + UpdatedAvailableReplicas: updatedAvailableReplicas, + }, + }) +} diff --git a/pkg/workload/info.go b/pkg/workload/info.go index 3cc29db..a8ed1a7 100644 --- a/pkg/workload/info.go +++ b/pkg/workload/info.go @@ -103,14 +103,25 @@ func (o *Info) String() string { return rolloutv1alpha1.CrossClusterObjectNameReference{Cluster: o.ClusterName, Name: o.Name}.String() } -func (o *Info) CheckUpdatedReady(replicas int32, strictCheck bool) (bool, string) { +func (o *Info) CheckUpdatedReady(replicas int32, strictCheck bool, skipToleration int32) (bool, string) { if o.Generation != o.Status.ObservedGeneration { return false, "workload Generation and ObservedGeneration are mismatched" } - if o.Status.UpdatedAvailableReplicas < replicas { + + // When this is the last batch, toleration is not allowed. + // The last batch must strictly satisfy UpdatedAvailableReplicas >= currentBatchExpectedReplicas, + // because there is no subsequent batch to compensate for the deficit. + effectiveToleration := skipToleration + if strictCheck { + effectiveToleration = 0 + } + + gap := replicas - o.Status.UpdatedAvailableReplicas + if gap > effectiveToleration { return false, "workload updated available replicas is not satisfied" } - if strictCheck && (o.Status.ObservedReplicas > o.Status.DesiredReplicas || o.Status.TerminatingReplicas != 0) { + + if strictCheck && o.Status.ObservedReplicas > o.Status.DesiredReplicas || o.Status.TerminatingReplicas != 0 { return false, "workload observed replicas is more than desiredReplicas" } return true, "" diff --git a/pkg/workload/info_test.go b/pkg/workload/info_test.go new file mode 100644 index 0000000..bcfe229 --- /dev/null +++ b/pkg/workload/info_test.go @@ -0,0 +1,164 @@ +package workload + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestCheckUpdatedReady(t *testing.T) { + tests := []struct { + name string + generation int64 + observedGen int64 + updatedAvailable int32 + desiredReplicas int32 + observedReplicas int32 + replicas int32 + strictCheck bool + skipToleration int32 + expectedReady bool + expectedReason string + }{ + { + name: "generation mismatch returns not ready", + generation: 2, + observedGen: 1, + updatedAvailable: 10, + desiredReplicas: 100, + observedReplicas: 100, + replicas: 10, + strictCheck: false, + skipToleration: 0, + expectedReady: false, + expectedReason: "workload Generation and ObservedGeneration are mismatched", + }, + { + name: "no toleration, replicas satisfied, returns ready", + generation: 1, + observedGen: 1, + updatedAvailable: 10, + desiredReplicas: 100, + observedReplicas: 100, + replicas: 10, + strictCheck: false, + skipToleration: 0, + expectedReady: true, + expectedReason: "", + }, + { + name: "no toleration, replicas not satisfied, returns not ready", + generation: 1, + observedGen: 1, + updatedAvailable: 8, + desiredReplicas: 100, + observedReplicas: 100, + replicas: 10, + strictCheck: false, + skipToleration: 0, + expectedReady: false, + expectedReason: "workload updated available replicas is not satisfied", + }, + { + name: "toleration covers gap, not last batch, returns ready", + generation: 1, + observedGen: 1, + updatedAvailable: 8, + desiredReplicas: 100, + observedReplicas: 100, + replicas: 10, + strictCheck: false, + skipToleration: 3, + expectedReady: true, + expectedReason: "", + }, + { + name: "toleration exactly equals gap, not last batch, returns ready", + generation: 1, + observedGen: 1, + updatedAvailable: 8, + desiredReplicas: 100, + observedReplicas: 100, + replicas: 10, + strictCheck: false, + skipToleration: 2, + expectedReady: true, + expectedReason: "", + }, + { + name: "toleration insufficient, not last batch, returns not ready", + generation: 1, + observedGen: 1, + updatedAvailable: 8, + desiredReplicas: 100, + observedReplicas: 100, + replicas: 10, + strictCheck: false, + skipToleration: 1, + expectedReady: false, + expectedReason: "workload updated available replicas is not satisfied", + }, + { + name: "last batch ignores toleration, gap exists, returns not ready", + generation: 1, + observedGen: 1, + updatedAvailable: 96, + desiredReplicas: 100, + observedReplicas: 100, + replicas: 100, + strictCheck: true, + skipToleration: 5, + expectedReady: false, + expectedReason: "workload updated available replicas is not satisfied", + }, + { + name: "last batch no toleration needed, replicas satisfied, returns ready", + generation: 1, + observedGen: 1, + updatedAvailable: 100, + desiredReplicas: 100, + observedReplicas: 100, + replicas: 100, + strictCheck: true, + skipToleration: 5, + expectedReady: true, + expectedReason: "", + }, + { + name: "last batch observed replicas exceeds desired", + generation: 1, + observedGen: 1, + updatedAvailable: 100, + desiredReplicas: 100, + observedReplicas: 105, + replicas: 100, + strictCheck: true, + skipToleration: 0, + expectedReady: false, + expectedReason: "workload observed replicas is more than desiredReplicas", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + info := &Info{ + ObjectMeta: metav1.ObjectMeta{ + Generation: tt.generation, + }, + Status: InfoStatus{ + ObservedGeneration: tt.observedGen, + UpdatedAvailableReplicas: tt.updatedAvailable, + DesiredReplicas: tt.desiredReplicas, + ObservedReplicas: tt.observedReplicas, + }, + } + ready, reason := info.CheckUpdatedReady(tt.replicas, tt.strictCheck, tt.skipToleration) + if ready != tt.expectedReady { + t.Errorf("CheckUpdatedReady() ready = %v, want %v", ready, tt.expectedReady) + } + if reason != tt.expectedReason { + t.Errorf("CheckUpdatedReady() reason = %v, want %v", reason, tt.expectedReason) + } + }) + } +} \ No newline at end of file