Thoughts on Capacity Planning with multiple @KafkaListener in Spring Kafka

Thoughts on Capacity Planning with multiple @KafkaListener in Spring Kafka

Background

We had an application to create some materialized fields inside the database. Each collection has one topic inside Kafka and we use a single application to listen to them. So we created the following Consumer class.
@Component public class Consumer { @KafkaListener(..., concurrency=3) public listener1(...) {...} @KafkaListener(..., concurrency=2) public listener2(...) {...} @KafkaListener(..., concurrency=2) public listener1(...) {...} }
After application was developed, everything was fine. Until we start to do capacity planning for the produciton environment. How can we achieve that?

Kafka Partitions and Consumer Concurrency

For Kafka, inside each Consumer Group, only one consumer can consume from a partition. This means the following situation:
  1. Number of Consumers for this topic > Number of Partitions in this topic
    1. Some of the consumers will not be assigned any partitions.
  1. Number of Consumers for this topic < Number of Partitions in this topic
    1. Some of the consumers will be assigned with many partitions.

How Kafka Assign Consumers?

There are two modes to assign consumers to partitions, one is RangeAssignor and the other is RoundRobinAssignor.

Range Assignor

For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
The assignment will be:
  • C0: [t0p0, t0p1, t1p0, t1p1]
  • C1: [t0p2, t1p2]

Round Robin Assingor

For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
The assignment will be:
  • C0: [t0p0, t0p2, t1p1]
  • C1: [t0p1, t1p0, t1p2]
 

Static Membership

For both methods, before assigning, Kafka will sort the consumers with memberId and then assign from the start.
@Override public int compareTo(MemberInfo otherMemberInfo) { if (this.groupInstanceId.isPresent() && otherMemberInfo.groupInstanceId.isPresent()) { return this.groupInstanceId.get() .compareTo(otherMemberInfo.groupInstanceId.get()); } else if (this.groupInstanceId.isPresent()) { return -1; } else if (otherMemberInfo.groupInstanceId.isPresent()) { return 1; } else { return this.memberId.compareTo(otherMemberInfo.memberId); } }
Comparator of MemberInfo in Kafka Source Code
Since this issue: KIP-345 . Kafka has been using instance id as the first sorting key. This is to prevent too many switchings of partition assignments between consumers.
However, if group.instance.id was not specified for spring-Kafka, it will not use static membership to connect to Kafka. Thus the sorting key is memberId, which is issued based on the first-come-first-serve rule.

If We Want to Scale-Out Servers?

Because the consumers were directly committing changes to a SQL database, which cannot handle very high update throughput, we need to control the speed of consumption. The ideal situation would be: Total number of consumers == Topic partition number.

What’s wrong with multiple @KafkaListener

Inside the code, we set the concurrency option for every @KafkaListener. This creates a concurrency count of consumer threads and joins the Consumer group on Kafka. So if we scale out the server, concurrency * count of the server of threads will be created. There’s no easy way of setting:
  1. Server A will use 2 threads to listen to topic A, Server B will use 0 threads to listen to Topic Foo, and Server B will spin up 2 threads to listen to Topic Foo if Server A is down.
  1. Server A should only use 10 threads to listen to only Topic Foo and Topic Bar. Server B should only use 10 threads to listen to only Topic FooBar and Topic Bar. Such fine-grained control is not easily configurable with Spring-Kafka because the consumer container will not stop if it wasn’t assigned to a partition.
These two difficulties leads to we can’t match the concurrency of every @KafkaListener exactly with the partition (or, desired concurrency).

Controlling the Concurrency on Topic

To control the consumption speed of events (Max qps on the topic), we have two options:
  1. Control the number of consumers. (partition count > consumer count)
  1. Control the partition count. (consumer count > partition count)

Pros and Cons

So for option one, we have the following pros and cons:
  • Pros: The load is balanced between all servers.
  • Cons: Some partitions will have slower consumption speed and messages will lag longer.
 
And for option two the pros and cons are:
  • Pros: all partitions are balanced.
  • Cons: wasted resources and partitions will be assigned on servers that spin up first.
Because the assignment is based on the first-come-first-serve rule, when the concurrency of the consumer is more significant than the partition count, those consumers who connect late will not get assigned with any partition. Thus, the server that started up at last will be wholly idle and server start up in the front will have partitions assigned for all its consumer threads (Higher Load).

Solution

Letting messages to lag longer is not what we desired so option 2 is more reasonable. How to mitigate the server load not balanced issue?
We need two things:
  1. Establish the proportion between different topics (Traffic on topic 1 : Traffic on topic 2 = 1 : 2 → Topic 1 concurrency : Topic 2 concurrency = 1 : 2) and create corresponding partition counts on topics.
  1. Confirm the max capacity of a single server by doing a load test (Make sure it won’t break under 100% load on all topics) and scale the server till every topic has enough consumers (> partition count). ( So the proportion of consumer count is kept between different topics )
And we can ignore the wasted resources on those idle threads since our main target is that the server will not be overloaded and messages are all consumed ASAP.
But one thing to mention is that the lowest concurrency count is 1, so if all concurrency was set at 1, and the application still slows down on the load test. That indicates there were too many @KafkaListener in the application; some refactoring is needed to split those Kafka listeners.

Conclusion

In this case, we try to balance between load tilting and resource wasting. There is no perfect solution inside the production system, always some compromises and trade offs present.
Â