From 642fc514ca0f7247c2800a7d1f5d3ed1f89ec52e Mon Sep 17 00:00:00 2001 From: GoneLikeAir Date: Fri, 12 Mar 2021 12:40:56 +0800 Subject: [PATCH 1/3] feature: support global target. ps: global targets are those jobs the its name with the prefix 'kvass_global' --- pkg/coordinator/coordinator.go | 38 +++++++++++++++++++++++++++++---- pkg/coordinator/rebalance.go | 39 ++++++++++++++++++++++++++++------ pkg/sidecar/targets.go | 16 ++++++++++++-- 3 files changed, 80 insertions(+), 13 deletions(-) diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go index e880994f..0053a854 100644 --- a/pkg/coordinator/coordinator.go +++ b/pkg/coordinator/coordinator.go @@ -19,7 +19,9 @@ package coordinator import ( "context" + "fmt" "github.com/pkg/errors" + "strings" "time" "tkestack.io/kvass/pkg/discovery" "tkestack.io/kvass/pkg/shard" @@ -100,6 +102,8 @@ func (c *Coordinator) runOnce() error { active = c.getActive() shardsInfo = c.getShardInfos(shards) changeAbleShards = changeAbleShardsInfo(shardsInfo) + // global targets are those jobs that with "kvass_global_" as its name prefix + globalTargets = getGlobalTarget(active) ) if int32(len(changeAbleShards)) < c.minShard { // insure that scaling up to min shard @@ -110,15 +114,23 @@ func (c *Coordinator) runOnce() error { } lastGlobalScrapeStatus := c.globalScrapeStatus(active, shardsInfo) - c.gcTargets(changeAbleShards, active) - needSpace := c.alleviateShards(changeAbleShards) - needSpace += c.assignNoScrapingTargets(shardsInfo, active, lastGlobalScrapeStatus) + c.gcTargets(changeAbleShards, active, globalTargets) + needSpace := c.alleviateShards(changeAbleShards, globalTargets) + // TODO: assign global target + needSpace += c.assignNoScrapingTargets(shardsInfo, active, globalTargets, lastGlobalScrapeStatus) scale := int32(len(shardsInfo)) if needSpace != 0 { c.log.Infof("need space %d", needSpace) - scale = c.tryScaleUp(shardsInfo, needSpace) + globalSeries := getSeriesTotal(globalTargets, lastGlobalScrapeStatus) + c.log.Info(fmt.Sprintf("Global series: %d", globalSeries)) + if c.maxSeries - globalSeries <= 0 { + c.log.Error("There is no enough to assign other targets after global targets has been assigned.") + return fmt.Errorf("There is no enough to assign other targets after global targets has been assigned") + } + scale = c.tryScaleUp(shardsInfo, globalSeries, needSpace) } else if c.maxIdleTime != 0 { + // TODO: if there is only global target in the shard, need to scale down scale = c.tryScaleDown(shardsInfo) } @@ -143,3 +155,21 @@ func (c *Coordinator) runOnce() error { c.lastGlobalScrapeStatus = newLastGlobalScrapeStatus return nil } + +func getSeriesTotal(targets map[uint64]*discovery.SDTargets, globalScrapeStatus map[uint64]*target.ScrapeStatus,) int64 { + var total int64 = 0 + for h, _ := range targets { + total += globalScrapeStatus[h].Series + } + return total +} + +func getGlobalTarget(active map[uint64]*discovery.SDTargets) map[uint64]*discovery.SDTargets { + globalTargets := make(map[uint64]*discovery.SDTargets) + for k, v := range active { + if strings.HasPrefix(v.Job, "kvass_global_") { + globalTargets[k] = v + } + } + return globalTargets +} \ No newline at end of file diff --git a/pkg/coordinator/rebalance.go b/pkg/coordinator/rebalance.go index 8af71d7d..b99775ab 100644 --- a/pkg/coordinator/rebalance.go +++ b/pkg/coordinator/rebalance.go @@ -18,6 +18,7 @@ package coordinator import ( + "fmt" "github.com/prometheus/prometheus/scrape" "golang.org/x/sync/errgroup" "math/rand" @@ -147,13 +148,17 @@ func (c *Coordinator) applyShardsInfo(shards []*shardInfo) { // 1. not exist in active targets // 2. is in_transfer state and had been scraped by other shard // 3. is normal state and had been scraped by other shard with lower head series -func (c *Coordinator) gcTargets(changeAbleShards []*shardInfo, active map[uint64]*discovery.SDTargets) { +func (c *Coordinator) gcTargets(changeAbleShards []*shardInfo, active , globalTargets map[uint64]*discovery.SDTargets) { for _, s := range changeAbleShards { for h, tar := range s.scraping { // target not exist in active targets if _, exist := active[h]; !exist { delete(s.scraping, h) continue + } else { + if _, exist := globalTargets[h];exist { + continue + } } if tar.ScrapeTimes < minWaitScrapeTimes { @@ -185,7 +190,7 @@ func (c *Coordinator) gcTargets(changeAbleShards []*shardInfo, active map[uint64 // make expect series of targets less than maxSeries * 0.5 if current head series > maxSeries 1.4 // make expect series of targets less than maxSeries * 0.2 if current head series > maxSeries 1.6 // remove all targets if current head series > maxSeries 1.8 -func (c *Coordinator) alleviateShards(changeAbleShards []*shardInfo) (needSpace int64) { +func (c *Coordinator) alleviateShards(changeAbleShards []*shardInfo, globalTargets map[uint64]*discovery.SDTargets) (needSpace int64) { var threshold = []struct { maxSeriesRate float64 expectSeriesRate float64 @@ -212,7 +217,7 @@ func (c *Coordinator) alleviateShards(changeAbleShards []*shardInfo) (needSpace for _, t := range threshold { if s.runtime.HeadSeries >= seriesWithRate(c.maxSeries, t.maxSeriesRate) { c.log.Infof("%s need alleviate", s.shard.ID) - needSpace += c.alleviateShard(s, changeAbleShards, seriesWithRate(c.maxSeries, t.expectSeriesRate)) + needSpace += c.alleviateShard(s, changeAbleShards, seriesWithRate(c.maxSeries, t.expectSeriesRate), globalTargets) break } } @@ -221,13 +226,17 @@ func (c *Coordinator) alleviateShards(changeAbleShards []*shardInfo) (needSpace return needSpace } -func (c *Coordinator) alleviateShard(s *shardInfo, changeAbleShards []*shardInfo, expSeries int64) (needSpace int64) { +func (c *Coordinator) alleviateShard(s *shardInfo, changeAbleShards []*shardInfo, expSeries int64, globalTargets map[uint64]*discovery.SDTargets) (needSpace int64) { total := s.totalTargetsSeries() for hash, tar := range s.scraping { if total < expSeries { break } + if _, isGlobal := globalTargets[hash]; isGlobal { + continue + } + if tar.TargetState != target.StateNormal || tar.Health != scrape.HealthGood || tar.ScrapeTimes < minWaitScrapeTimes { continue } @@ -268,18 +277,32 @@ func seriesWithRate(series int64, rate float64) int64 { func (c *Coordinator) assignNoScrapingTargets( shards []*shardInfo, active map[uint64]*discovery.SDTargets, + globalTargets map[uint64]*discovery.SDTargets, globalScrapeStatus map[uint64]*target.ScrapeStatus, ) (needSpace int64) { healthShards := changeAbleShardsInfo(shards) scraping := map[uint64]bool{} for _, s := range shards { + // Check and assign global targets for every shard. + for h, _ := range globalTargets { + if _, exist := s.scraping[h]; s.scraping != nil && !exist { + s.scraping[h] = globalScrapeStatus[h] + s.runtime.HeadSeries += globalScrapeStatus[h].Series + } + } + for hash := range s.scraping { scraping[hash] = true } } for hash := range active { - if scraping[hash] { + //isGlobal := false + //if _, isGlobal = globalTargets[hash]; scraping[hash] && !isGlobal{ + // continue + //} + + if scraping[hash]{ continue } @@ -432,10 +455,12 @@ func (c *Coordinator) shardBecomeIdle(src *shardInfo, shards []*shardInfo) bool } // tryScaleUp calculate the expect scale according to 'needSpace' -func (c *Coordinator) tryScaleUp(shard []*shardInfo, needSpace int64) int32 { +func (c *Coordinator) tryScaleUp(shard []*shardInfo, globalSeries, needSpace int64) int32 { health := changeAbleShardsInfo(shard) + free := c.maxSeries - globalSeries + c.log.Info(fmt.Sprintf("After global target assigned, free space: %d", free)) exp := int32(len(health)) - exp += int32((needSpace / c.maxSeries) + 1) + exp += int32((needSpace / free) + 1) if exp < int32(len(shard)) { exp = int32(len(shard)) diff --git a/pkg/sidecar/targets.go b/pkg/sidecar/targets.go index c0ec9b67..a10b5155 100644 --- a/pkg/sidecar/targets.go +++ b/pkg/sidecar/targets.go @@ -19,11 +19,13 @@ package sidecar import ( "encoding/json" + "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" "io/ioutil" "os" "path" + "strings" "time" "tkestack.io/kvass/pkg/shard" "tkestack.io/kvass/pkg/target" @@ -44,6 +46,7 @@ type TargetsInfo struct { IdleAt *time.Time // Status is the runtime status of all targets Status map[uint64]*target.ScrapeStatus `json:"-"` + globalTargetMap map[uint64]bool } func newTargetsInfo() TargetsInfo { @@ -118,19 +121,27 @@ func (t *TargetsManager) UpdateTargets(req *shard.UpdateTargetsRequest) error { } func (t *TargetsManager) updateIdleState() { - if len(t.targets.Status) == 0 && t.targets.IdleAt == nil { + t.log.Info(fmt.Sprintf("Status len: %d", len(t.targets.Status))) + t.log.Info(fmt.Sprintf("Global target len: %d", len(t.targets.globalTargetMap))) + normalTarNum := len(t.targets.Status) - len(t.targets.globalTargetMap) + if normalTarNum == 0 && t.targets.IdleAt == nil { t.targets.IdleAt = types.TimePtr(time.Now()) + t.log.Info(fmt.Sprintf("Shard is idle. Time: %s", t.targets.IdleAt.String())) } - if len(t.targets.Status) != 0 { + if normalTarNum != 0 { t.targets.IdleAt = nil } } func (t *TargetsManager) updateStatus() { status := map[uint64]*target.ScrapeStatus{} + globalTargetMap := map[uint64]bool{} for job, ts := range t.targets.Targets { for _, tar := range ts { + if strings.HasPrefix(job, "kvass_global_") { + globalTargetMap[tar.Hash] = true + } if t.targets.Status[tar.Hash] == nil { status[tar.Hash] = target.NewScrapeStatus(tar.Series) } else { @@ -145,6 +156,7 @@ func (t *TargetsManager) updateStatus() { } } t.targets.Status = status + t.targets.globalTargetMap = globalTargetMap } func (t *TargetsManager) doCallbacks() error { From b25259a8a11242057462262f96ffab802bd67881 Mon Sep 17 00:00:00 2001 From: GoneLikeAir Date: Fri, 12 Mar 2021 12:42:44 +0800 Subject: [PATCH 2/3] fix: After scale down, the pod may be created again because the pod still terminating. --- pkg/shard/kubernetes/shardmanager.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/shard/kubernetes/shardmanager.go b/pkg/shard/kubernetes/shardmanager.go index 8419d513..3169fe74 100644 --- a/pkg/shard/kubernetes/shardmanager.go +++ b/pkg/shard/kubernetes/shardmanager.go @@ -70,12 +70,29 @@ func (s *shardManager) Shards() ([]*shard.Shard, error) { return nil, errors.Wrap(err, "list pod") } - ret := make([]*shard.Shard, 0) + sts, err := s.cli.AppsV1().StatefulSets(s.sts.Namespace).Get(context.TODO(), s.sts.Name, v12.GetOptions{}) + if err != nil { + return nil, err + } + + podMap := map[string]v1.Pod{} for _, p := range pods.Items { - url := fmt.Sprintf("http://%s:%d", p.Status.PodIP, s.port) - ret = append(ret, shard.NewShard(p.Name, url, k8sutil.IsPodReady(&p), s.lg.WithField("shard", p.Name))) + podMap[p.Name] = p } + ret := make([]*shard.Shard, 0) + for i := int32(0); i < *sts.Spec.Replicas; i ++ { + if p, ok := podMap[fmt.Sprintf("%s-%d", sts.Name, i)]; ok { + url := fmt.Sprintf("http://%s:%d", p.Status.PodIP, s.port) + ret = append(ret, shard.NewShard(p.Name, url, k8sutil.IsPodReady(&p), s.lg.WithField("shard", p.Name))) + } + } + + //for _, p := range pods.Items { + // url := fmt.Sprintf("http://%s:%d", p.Status.PodIP, s.port) + // ret = append(ret, shard.NewShard(p.Name, url, k8sutil.IsPodReady(&p), s.lg.WithField("shard", p.Name))) + //} + return ret, nil } From f8490d4b9d18e2b79fa3836fcd9cc7692dc7976c Mon Sep 17 00:00:00 2001 From: GoneLikeAir <49898234+GoneLikeAir@users.noreply.github.com> Date: Fri, 21 May 2021 10:44:50 +0800 Subject: [PATCH 3/3] Added Untitled Diagram.drawio --- Untitled Diagram.drawio | 1 + 1 file changed, 1 insertion(+) create mode 100644 Untitled Diagram.drawio diff --git a/Untitled Diagram.drawio b/Untitled Diagram.drawio new file mode 100644 index 00000000..84cd3ff6 --- /dev/null +++ b/Untitled Diagram.drawio @@ -0,0 +1 @@ +UzV2zq1wL0osyPDNT0nNUTV2VTV2LsrPL4GwciucU3NyVI0MMlNUjV1UjYwMgFjVyA2HrCFY1qAgsSg1rwSLBiADYTaQg2Y1AA== \ No newline at end of file