Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
k8s.io/klog/v2 v2.130.1
k8s.io/kubernetes v1.22.2
k8s.io/utils v0.0.0-20241210054802-24370beab758
kusionstack.io/kube-api v0.7.5-0.20260512114711-9570d38337c2
kusionstack.io/kube-api v0.7.5-0.20260623095733-5ed62841343a
kusionstack.io/kube-utils v0.2.1-0.20251125083928-1134a582b341
kusionstack.io/resourceconsist v0.0.4
sigs.k8s.io/controller-runtime v0.21.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1021,8 +1021,8 @@ k8s.io/sample-apiserver v0.22.2/go.mod h1:h+/DIV5EmuNq4vfPr5TSXy9mIBVXXlPAKQMPbj
k8s.io/system-validators v1.5.0/go.mod h1:bPldcLgkIUK22ALflnsXk8pvkTEndYdNuaHH6gRrl0Q=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
kusionstack.io/kube-api v0.7.5-0.20260512114711-9570d38337c2 h1:jrUVO6a6/fuwdfF8NnehVXGbiYV57AfIJvqnsYRB3sI=
kusionstack.io/kube-api v0.7.5-0.20260512114711-9570d38337c2/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk=
kusionstack.io/kube-api v0.7.5-0.20260623095733-5ed62841343a h1:U7W5odmM823+jqcXLfkTbBkBzLoZ0VJxQ+XFER+WFz4=
kusionstack.io/kube-api v0.7.5-0.20260623095733-5ed62841343a/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk=
kusionstack.io/kube-utils v0.2.1-0.20251125083928-1134a582b341 h1:dnMtHJvIpU3338WpqGiNN2qXWZFiXaoiuzR9jwhvWpg=
kusionstack.io/kube-utils v0.2.1-0.20251125083928-1134a582b341/go.mod h1:Lz5SBYWg9+jw+kP0CAyf/b62D5DeUPf6+jE1d0WC4cI=
kusionstack.io/resourceconsist v0.0.4 h1:wRqLJuNh8O4TT6p0uOklFpHUKiRdRxcAH71Sw/q9LhE=
Expand Down
15 changes: 14 additions & 1 deletion pkg/controllers/rolloutrun/executor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat

currentBatchExpectedReplicas, _ := workload.CalculateUpdatedReplicas(&status.Replicas, item.Replicas)

ready, reason := info.CheckUpdatedReady(currentBatchExpectedReplicas, isLastBatch)
// Find skip toleration for this workload
skipToleration := findSkipToleration(newStatus.BatchStatus.SkipTolerations, item.Cluster, item.Name)

ready, reason := info.CheckUpdatedReady(currentBatchExpectedReplicas, isLastBatch, skipToleration)
if ready {
// if the target is ready, we will not change partition
continue
Expand Down Expand Up @@ -270,3 +273,13 @@ func (e *batchExecutor) calculateExpectedReplicasBySlidingWindow(status rolloutv
expected = min(currentBatchExpectedReplicas, expected)
return expected, nil
}

// findSkipToleration looks up the skip toleration recorded for a workload.
//
// It scans skipTolerations in order and returns the Toleration of the first
// entry whose Cluster and Name both equal the given cluster and name. When no
// entry matches (including a nil or empty slice) it returns 0.
func findSkipToleration(skipTolerations []rolloutv1alpha1.WorkloadSkipToleration, cluster, name string) int32 {
	for i := range skipTolerations {
		entry := &skipTolerations[i]
		if entry.Cluster != cluster || entry.Name != name {
			continue
		}
		return entry.Toleration
	}
	return 0
}
197 changes: 197 additions & 0 deletions pkg/controllers/rolloutrun/executor/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,203 @@ func (s *batchExecutorTestSuite) Test_BatchExecutor_Do() {
s.runBatchTestCases(tests)
}

// Test_BatchExecutor_Do_SkipToleration is a table-driven test that feeds
// RolloutRun fixtures carrying BatchStatus.SkipTolerations through
// s.runBatchTestCases and checks the resulting batch state. The cases cover:
// a deficit within the recorded toleration on a middle batch, a deficit larger
// than the toleration, the last batch (where the toleration is expected not to
// apply), and a run with no tolerations recorded at all.
// NOTE(review): the numeric arguments of newFakeObject are read here as
// (replicas, updated, updatedAvailable) based on the inline comments; confirm
// against the helper's definition.
func (s *batchExecutorTestSuite) Test_BatchExecutor_Do_SkipToleration() {
tests := []batchExectorTestCase{
// Case 1: current batch is index 1 of 3 and a toleration of 5 is recorded
// for cluster-a/test-a; the state is expected to advance to
// StepPostBatchStepHook with an immediate requeue.
{
name: "skip toleration allows batch to complete with deficit in middle batch",
getObjects: func() (*rolloutv1alpha1.Rollout, *rolloutv1alpha1.RolloutRun) {
rollout := s.rollout.DeepCopy()
rolloutRun := s.rolloutRun.DeepCopy()

// 3 batches, currently on batch 2 (index 1), not the last batch
rolloutRun.Spec.Batch.Batches = []rolloutv1alpha1.RolloutRunStep{
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)),
}},
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)),
}},
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)),
}},
}
rolloutRun.Status.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing
rolloutRun.Status.BatchStatus = &rolloutv1alpha1.RolloutRunBatchStatus{
RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{
CurrentBatchIndex: 1,
CurrentBatchState: StepRunning,
},
Records: []rolloutv1alpha1.RolloutRunStepStatus{
{Index: ptr.To[int32](0), State: StepSkipped},
{Index: ptr.To[int32](1), State: StepRunning, StartTime: ptr.To(metav1.Now())},
{Index: ptr.To[int32](2), State: StepNone},
},
// Batch 0 was skipped: gap = 30 - 25 = 5
SkipTolerations: []rolloutv1alpha1.WorkloadSkipToleration{
{Cluster: "cluster-a", Name: "test-a", Toleration: 5},
},
}
return rollout, rolloutRun
},
getWorkloads: func() []client.Object {
// UpdatedAvailableReplicas = 55, expected = 60, gap = 5 <= toleration(5)
return []client.Object{
newFakeObject("cluster-a", "default", "test-a", 100, 55, 55),
}
},
assertResult: func(done bool, result reconcile.Result, err error) {
s.Require().NoError(err)
s.False(done) // not all done, move to next batch
s.Equal(reconcile.Result{Requeue: true}, result)
},
assertStatus: func(status *rolloutv1alpha1.RolloutRunStatus) {
s.Equal(StepPostBatchStepHook, status.BatchStatus.CurrentBatchState)
},
},
// Case 2: same batches and toleration as case 1, but the workload fixture
// uses 52 instead of 55; the batch is expected to stay in StepRunning and
// be retried after retryDefault.
{
name: "skip toleration not enough, batch stays running",
getObjects: func() (*rolloutv1alpha1.Rollout, *rolloutv1alpha1.RolloutRun) {
rollout := s.rollout.DeepCopy()
rolloutRun := s.rolloutRun.DeepCopy()

rolloutRun.Spec.Batch.Batches = []rolloutv1alpha1.RolloutRunStep{
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)),
}},
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)),
}},
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)),
}},
}
rolloutRun.Status.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing
rolloutRun.Status.BatchStatus = &rolloutv1alpha1.RolloutRunBatchStatus{
RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{
CurrentBatchIndex: 1,
CurrentBatchState: StepRunning,
},
Records: []rolloutv1alpha1.RolloutRunStepStatus{
{Index: ptr.To[int32](0), State: StepSkipped},
{Index: ptr.To[int32](1), State: StepRunning, StartTime: ptr.To(metav1.Now())},
{Index: ptr.To[int32](2), State: StepNone},
},
// gap = 60 - 52 = 8 > toleration(5)
SkipTolerations: []rolloutv1alpha1.WorkloadSkipToleration{
{Cluster: "cluster-a", Name: "test-a", Toleration: 5},
},
}
return rollout, rolloutRun
},
getWorkloads: func() []client.Object {
return []client.Object{
newFakeObject("cluster-a", "default", "test-a", 100, 52, 52),
}
},
assertResult: func(done bool, result reconcile.Result, err error) {
s.Require().NoError(err)
s.False(done)
s.Equal(reconcile.Result{RequeueAfter: retryDefault}, result)
},
assertStatus: func(status *rolloutv1alpha1.RolloutRunStatus) {
s.Equal(StepRunning, status.BatchStatus.CurrentBatchState)
},
},
// Case 3: current batch is index 2, the last of the three batches; even
// though a toleration of 5 is recorded, the batch is expected to stay in
// StepRunning.
{
name: "skip toleration does not apply on last batch",
getObjects: func() (*rolloutv1alpha1.Rollout, *rolloutv1alpha1.RolloutRun) {
rollout := s.rollout.DeepCopy()
rolloutRun := s.rolloutRun.DeepCopy()

rolloutRun.Spec.Batch.Batches = []rolloutv1alpha1.RolloutRunStep{
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(30)),
}},
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(60)),
}},
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(100)),
}},
}
rolloutRun.Status.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing
rolloutRun.Status.BatchStatus = &rolloutv1alpha1.RolloutRunBatchStatus{
RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{
CurrentBatchIndex: 2,
CurrentBatchState: StepRunning,
},
Records: []rolloutv1alpha1.RolloutRunStepStatus{
{Index: ptr.To[int32](0), State: StepSkipped},
{Index: ptr.To[int32](1), State: StepSucceeded},
{Index: ptr.To[int32](2), State: StepRunning, StartTime: ptr.To(metav1.Now())},
},
// Last batch: toleration should NOT apply
SkipTolerations: []rolloutv1alpha1.WorkloadSkipToleration{
{Cluster: "cluster-a", Name: "test-a", Toleration: 5},
},
}
return rollout, rolloutRun
},
getWorkloads: func() []client.Object {
// UpdatedAvailableReplicas = 96, expected = 100, gap = 4 <= toleration(5)
// but last batch, so toleration does NOT apply, needs strict check
return []client.Object{
newFakeObject("cluster-a", "default", "test-a", 100, 96, 96),
}
},
assertResult: func(done bool, result reconcile.Result, err error) {
s.Require().NoError(err)
s.False(done)
s.Equal(reconcile.Result{RequeueAfter: retryDefault}, result)
},
assertStatus: func(status *rolloutv1alpha1.RolloutRunStatus) {
s.Equal(StepRunning, status.BatchStatus.CurrentBatchState)
},
},
// Case 4: a single batch with SkipTolerations left nil; the batch is
// expected to stay in StepRunning, i.e. the pre-existing behavior.
{
name: "no skip toleration, behavior unchanged",
getObjects: func() (*rolloutv1alpha1.Rollout, *rolloutv1alpha1.RolloutRun) {
rollout := s.rollout.DeepCopy()
rolloutRun := s.rolloutRun.DeepCopy()

rolloutRun.Spec.Batch.Batches = []rolloutv1alpha1.RolloutRunStep{
{Targets: []rolloutv1alpha1.RolloutRunStepTarget{
newRunStepTarget("cluster-a", "test-a", intstr.FromInt(10)),
}},
}
rolloutRun.Status.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing
rolloutRun.Status.BatchStatus = &rolloutv1alpha1.RolloutRunBatchStatus{
RolloutBatchStatus: rolloutv1alpha1.RolloutBatchStatus{
CurrentBatchIndex: 0,
CurrentBatchState: StepRunning,
},
Records: []rolloutv1alpha1.RolloutRunStepStatus{
{Index: ptr.To[int32](0), State: StepRunning, StartTime: ptr.To(metav1.Now())},
},
SkipTolerations: nil,
}
return rollout, rolloutRun
},
getWorkloads: func() []client.Object {
// UpdatedAvailableReplicas=8 < expected=10, no toleration -> not ready
return []client.Object{
newFakeObject("cluster-a", "default", "test-a", 100, 8, 8),
}
},
assertResult: func(done bool, result reconcile.Result, err error) {
s.Require().NoError(err)
s.False(done)
s.Equal(reconcile.Result{RequeueAfter: retryDefault}, result)
},
assertStatus: func(status *rolloutv1alpha1.RolloutRunStatus) {
s.Equal(StepRunning, status.BatchStatus.CurrentBatchState)
},
},
}

s.runBatchTestCases(tests)
}

// newRunStepTarget builds a RolloutRunStepTarget for the given cluster, name
// and replicas by delegating to newRunStepTargetWithSlidingWindow with nil as
// its final argument (presumably meaning "no sliding window" — confirm against
// that helper).
func newRunStepTarget(cluster, name string, replicas intstr.IntOrString) rolloutv1alpha1.RolloutRunStepTarget {
	target := newRunStepTargetWithSlidingWindow(cluster, name, replicas, nil)
	return target
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/rolloutrun/executor/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (e *canaryExecutor) doCanary(ctx *ExecutorContext) (bool, time.Duration, er

// 2.b. waiting canary workload ready
for _, info := range canaryWorkloads {
if ready, _ := info.CheckUpdatedReady(info.Status.DesiredReplicas, false); !ready {
if ready, _ := info.CheckUpdatedReady(info.Status.DesiredReplicas, false, 0); !ready {
// not ready yet
logger.Info("still waiting for canary target ready",
"cluster", info.ClusterName,
Expand Down
45 changes: 42 additions & 3 deletions pkg/controllers/rolloutrun/executor/do_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
rolloutapis "kusionstack.io/kube-api/rollout"
rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"

"kusionstack.io/rollout/pkg/workload"
)

// doCommand
Expand All @@ -28,18 +30,18 @@ func (r *Executor) doCommand(ctx *ExecutorContext) ctrl.Result {
}
case rolloutapis.AnnoManualCommandSkip:
if batchError != nil {
handleBatchStatusWhenSkipped(newStatus, len(rolloutRun.Spec.Batch.Batches))
handleBatchStatusWhenSkipped(newStatus, len(rolloutRun.Spec.Batch.Batches), rolloutRun.Spec.Batch.Batches, ctx.Workloads)
}
case rolloutapis.AnnoManualCommandCancel:
newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseCanceling
case rolloutapis.AnnoManualCommandForceSkipCurrentBatch:
handleBatchStatusWhenSkipped(newStatus, len(rolloutRun.Spec.Batch.Batches))
handleBatchStatusWhenSkipped(newStatus, len(rolloutRun.Spec.Batch.Batches), rolloutRun.Spec.Batch.Batches, ctx.Workloads)
}

return ctrl.Result{Requeue: true}
}

func handleBatchStatusWhenSkipped(newStatus *rolloutv1alpha1.RolloutRunStatus, batchSize int) {
func handleBatchStatusWhenSkipped(newStatus *rolloutv1alpha1.RolloutRunStatus, batchSize int, batches []rolloutv1alpha1.RolloutRunStep, workloads *workload.Set) {
currentBatchIndex := newStatus.BatchStatus.CurrentBatchIndex
if newStatus.Error != nil {
newStatus.Error = nil
Expand All @@ -50,5 +52,42 @@ func handleBatchStatusWhenSkipped(newStatus *rolloutv1alpha1.RolloutRunStatus, b
newStatus.BatchStatus.Records[currentBatchIndex].State = StepSkipped
newStatus.BatchStatus.CurrentBatchIndex = currentBatchIndex + 1
newStatus.BatchStatus.CurrentBatchState = StepNone

// Calculate and accumulate skip toleration for each workload in current batch
if workloads != nil {
currentBatch := batches[currentBatchIndex]
for _, target := range currentBatch.Targets {
info := workloads.Get(target.Cluster, target.Name)
if info == nil {
continue
}
status := info.APIStatus()
currentBatchExpectedReplicas, _ := workload.CalculateUpdatedReplicas(&status.Replicas, target.Replicas)
gap := currentBatchExpectedReplicas - status.UpdatedAvailableReplicas
if gap <= 0 {
continue
}

// Accumulate toleration
accumulateSkipToleration(newStatus, target.Cluster, target.Name, gap)
}
}
}
}

// accumulateSkipToleration adds gap to the skip toleration tracked in
// newStatus.BatchStatus.SkipTolerations for the workload identified by
// cluster and name.
//
// If an entry with the same Cluster and Name already exists, its Toleration is
// increased in place; otherwise a new entry carrying gap is appended.
// newStatus.BatchStatus is dereferenced without a nil check, so it must be
// non-nil.
func accumulateSkipToleration(newStatus *rolloutv1alpha1.RolloutRunStatus, cluster, name string, gap int32) {
	batchStatus := newStatus.BatchStatus

	// Take a pointer to each element so the update lands in the stored slice
	// rather than in a copy.
	for i := range batchStatus.SkipTolerations {
		entry := &batchStatus.SkipTolerations[i]
		if entry.Cluster != cluster || entry.Name != name {
			continue
		}
		entry.Toleration += gap
		return
	}

	// No entry yet for this workload; append works on a nil slice, so no
	// explicit initialization is required.
	added := rolloutv1alpha1.WorkloadSkipToleration{
		Cluster:    cluster,
		Name:       name,
		Toleration: gap,
	}
	batchStatus.SkipTolerations = append(batchStatus.SkipTolerations, added)
}
Loading