Skip to content

ruskaof/consumer-balancer

Repository files navigation

consumer-balancer

Load-aware Kafka consumer partition assignment driven by per-partition weights (default: Prometheus), plus optional proactive rebalance when an elected group member detects load imbalance.

Built-in Kafka assignors such as RangeAssignor and RoundRobinAssignor balance partition count (and sticky strategies minimize movement). They do not use external per-partition load signals. This library assigns partitions with a greedy least-loaded strategy using those weights, which reduces the worst consumer load when weights are skewed.

Modules

Module Purpose
consumer-balancer-core LoadAwarePartitionAssignor, Prometheus client, balancing, triggers
consumer-balancer-spring-boot-starter Spring Boot auto-configuration
test-listener Example Spring Boot app

Requirements

  • Spring Boot 4.0+ (Spring Framework 7, Spring for Apache Kafka 4.0, Apache Kafka clients 4.1+)
  • Java 17+

For Spring Boot 3.x, use the 1.0.0 release of this library.

Quickstart (Gradle)

Both modules are published to Maven Central. Most users only need the starter, which pulls in consumer-balancer-core transitively:

dependencies {
    implementation("io.github.ruskaof:consumer-balancer-spring-boot-starter:2.0.0")
}

Using the assignor without Spring Boot? Depend on the core module directly:

dependencies {
    implementation("io.github.ruskaof:consumer-balancer-core:2.0.0")
}

Minimal configuration:

spring:
  kafka:
    consumer:
      group-id: my-group
      properties:
        partition.assignment.strategy: io.github.ruskaof.balancer.LoadAwarePartitionAssignor

consumer-balancer:
  prometheus:
    weight-query-template: 'sum(rate(kafka_topic_partition_current_offset{topic=~"%s"}[1m])) by (topic, partition)'
    host: localhost
    port: 9090

Example using a hypothetical bytes metric instead:

consumer-balancer:
  prometheus:
    weight-query-template: 'sum(rate(kafka_consumer_fetch_bytes_total{topic=~"%s"}[1m])) by (topic, partition)'

Placeholder %s is replaced with a |‑separated, regex‑escaped list of subscribed topic names for the instant query.

consumer-balancer.prometheus.* is merged into assignor.load-aware.prometheus.* for the assignor. You can still set assignor.load-aware.prometheus.* under spring.kafka.consumer.properties explicitly; those values take precedence over consumer-balancer defaults where applicable.

Configuration reference (consumer-balancer)

Property Default Description
consumer-balancer.enabled true Master switch for balancer auto-configuration.
consumer-balancer.proactive-rebalance-enabled true When true, one elected consumer runs the threshold trigger and may call enforceRebalance() on listener containers.
consumer-balancer.rebalance-load-imbalance-threshold 1.1 Proactive rebalance when (max member load) / (optimal max load) > threshold (see ThresholdTrigger).
consumer-balancer.prometheus.weight-query-template Required when using the default PrometheusWeightService: PromQL with %s. Series must include topic and partition labels.
consumer-balancer.prometheus.scheme http Prometheus URL scheme.
consumer-balancer.prometheus.host localhost Prometheus host.
consumer-balancer.prometheus.port 9090 Prometheus port.
consumer-balancer.prometheus.connect-timeout 10s HTTP connect timeout.
consumer-balancer.prometheus.request-timeout 30s HTTP request timeout (per query).
consumer-balancer.coordinator.election-interval 30s How often coordinator election runs.
consumer-balancer.coordinator.trigger-check-interval 30s How often the coordinator evaluates the rebalance trigger.

Assignor keys (merged from consumer-balancer.prometheus when not set in YAML):

  • assignor.load-aware.prometheus.weight-query-template (required for load-aware assignor)
  • assignor.load-aware.prometheus.host
  • assignor.load-aware.prometheus.port
  • assignor.load-aware.prometheus.scheme
  • assignor.load-aware.prometheus.connect-timeout-ms
  • assignor.load-aware.prometheus.request-timeout-ms

Custom weight store

Implement io.github.ruskaof.balancer.weight.WeightService and expose it as a Spring @Bean. The default PrometheusWeightService + PrometheusClient beans are omitted when a WeightService bean is present.

Optionally provide your own io.github.ruskaof.balancer.prometheus.KafkaRatePromqlBuilder (or TemplatedKafkaRatePromqlBuilder) for custom PromQL while still using Prometheus.

Operations

  • Your PromQL must return series with topic and partition labels so weights can be mapped to TopicPartition.
  • If load-aware assignment throws, LoadAwarePartitionAssignor falls back to Kafka’s RoundRobinAssignor.
  • LoadAwarePartitionAssignor is a client-side assignor, so it applies only under the classic consumer group protocol (group.protocol=classic, the default on Kafka 4.x). If you opt into the new KIP-848 protocol (group.protocol=consumer), partitions are assigned broker-side and this assignor is bypassed.

Build

This repository is built with Gradle on the PATH (not the wrapper), for example:

gradle test

Performance test (Docker)

End-to-end run compares RoundRobin vs load-aware consumers with skewed synthetic load. It writes:

  • test-out/result-default.png — throughput rebalance spikes when Micrometer exposes kafka_consumer_*rebalance* counters.
docker compose --env-file docker/test-env.properties -f docker/docker-compose.yaml up --abort-on-container-exit

License

See LICENSE.

About

No description, website, or topics provided.

Resources

License

Contributing

Stars

Watchers

Forks

Packages

 
 
 

Contributors