Background
We had an application that creates some materialized fields inside the database. Each collection has one topic in Kafka, and we use a single application to listen to all of them. So we created the following `Consumer` class:

```java
@Component
public class Consumer {

    @KafkaListener(..., concurrency = "3")
    public void listener1(...) { ... }

    @KafkaListener(..., concurrency = "2")
    public void listener2(...) { ... }

    @KafkaListener(..., concurrency = "2")
    public void listener3(...) { ... }
}
```
After the application was developed, everything was fine, until we started capacity planning for the production environment. How can we achieve that?
Kafka Partitions and Consumer Concurrency
In Kafka, within each consumer group, only one consumer can consume from a given partition. This leads to the following situations:

- Number of consumers for a topic > number of partitions in the topic: some of the consumers will not be assigned any partition.
- Number of consumers for a topic < number of partitions in the topic: some of the consumers will be assigned multiple partitions.

For example, with 3 partitions and 4 consumers in the group, one consumer stays idle; with 3 partitions and 2 consumers, one consumer handles two partitions.
How Does Kafka Assign Consumers?
There are two modes for assigning consumers to partitions: one is `RangeAssignor` and the other is `RoundRobinAssignor`.
Range Assignor
For example, suppose there are two consumers `C0` and `C1`, two topics `t0` and `t1`, and each topic has 3 partitions, resulting in partitions `t0p0`, `t0p1`, `t0p2`, `t1p0`, `t1p1`, and `t1p2`. The assignment will be:

C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
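To see where this split comes from, here is a small illustrative calculation of the range logic for a single topic (a sketch of the idea, not Kafka's actual code):

```java
// Illustrative only: how range assignment splits one topic's partitions across sorted consumers.
public class RangeSplitDemo {
    public static void main(String[] args) {
        int numPartitions = 3;
        int numConsumers = 2;
        int perConsumer = numPartitions / numConsumers; // 1: each consumer gets at least this many
        int extra = numPartitions % numConsumers;       // 1: the first 'extra' consumers get one more
        for (int c = 0, start = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            System.out.printf("C%d -> partitions %d..%d%n", c, start, start + count - 1);
            start += count;
        }
        // Prints: C0 -> partitions 0..1 and C1 -> partitions 2..2 (applied per topic, matching above)
    }
}
```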
Round Robin Assignor
For example, suppose there are two consumers `C0` and `C1`, two topics `t0` and `t1`, and each topic has 3 partitions, resulting in partitions `t0p0`, `t0p1`, `t0p2`, `t1p0`, `t1p1`, and `t1p2`. The assignment will be:

C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
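Which strategy is used is controlled by the consumer's `partition.assignment.strategy` property. A minimal sketch of selecting one explicitly (the bootstrap address and group id below are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

public class AssignorConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        // Select the round robin assignor instead of range-based assignment.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  RoundRobinAssignor.class.getName());
        // These props would then be passed to a KafkaConsumer or a Spring Kafka consumer factory.
    }
}
```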
Static Membership
For both methods, before assigning, Kafka will sort the consumers by `memberId` and then assign from the start:

```java
@Override
public int compareTo(MemberInfo otherMemberInfo) {
    if (this.groupInstanceId.isPresent() &&
            otherMemberInfo.groupInstanceId.isPresent()) {
        return this.groupInstanceId.get()
            .compareTo(otherMemberInfo.groupInstanceId.get());
    } else if (this.groupInstanceId.isPresent()) {
        return -1;
    } else if (otherMemberInfo.groupInstanceId.isPresent()) {
        return 1;
    } else {
        return this.memberId.compareTo(otherMemberInfo.memberId);
    }
}
```
Since KIP-345, Kafka has been using the group instance id as the first sorting key. This is to prevent too many switches of partition assignments between consumers.
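For reference, a minimal sketch of how static membership could be enabled in a Spring Kafka consumer factory (the bootstrap address, group id, and instance id below are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class StaticMembershipConfig {

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "materializer");            // hypothetical
        // Setting group.instance.id turns on static membership (KIP-345),
        // so this id becomes the first sorting key during assignment.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "materializer-1"); // hypothetical
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```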
However, if `group.instance.id` is not specified for Spring Kafka, it will not use static membership when connecting to Kafka. Thus the sorting key is `memberId`, which is issued based on the first-come-first-served rule.
If We Want to Scale Out Servers?
Because the consumers were directly committing changes to a SQL database, which cannot handle a very high update throughput, we need to control the speed of consumption. The ideal situation would be: total number of consumers == topic partition count.
What's Wrong with Multiple @KafkaListener?
Inside the code, we set the `concurrency` option for every `@KafkaListener`. Each listener creates `concurrency` consumer threads that join the consumer group on Kafka, so if we scale out the servers, `concurrency * server count` threads will be created per listener. (With the class above, each server starts 3 + 2 + 2 = 7 consumer threads, so three servers would put 9 consumers on the first topic and 6 on each of the other two.) There is no easy way of setting: `Server A` will use 2 threads to listen to `Topic Foo`, `Server B` will use 0 threads to listen to `Topic Foo`, and `Server B` will spin up 2 threads to listen to `Topic Foo` if `Server A` is down.
Nor is there an easy way to say that `Server A` should use only 10 threads to listen to only `Topic Foo` and `Topic Bar`, while `Server B` should use only 10 threads to listen to only `Topic FooBar` and `Topic Bar`. Such fine-grained control is not easily configurable with Spring Kafka, because the consumer container will not stop if it was not assigned a partition.
These two difficulties mean we cannot match the concurrency of every `@KafkaListener` exactly with the partition count (or the desired concurrency).
Controlling the Concurrency on a Topic
To control the consumption speed of events (the max QPS on a topic), we have two options:
- Control the number of consumers. (partition count > consumer count)
- Control the partition count. (consumer count > partition count)
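For the second option, recall that the partition count is chosen when the topic is created (it can be increased later, but never decreased). A minimal sketch using the Kafka `AdminClient`, with a hypothetical topic name, partition count, and replication factor:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions caps the consumer parallelism (and thus the max QPS) for this topic.
            NewTopic topic = new NewTopic("materialized-fields", 6, (short) 3); // hypothetical values
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```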
Pros and Cons
So for option one, we have the following pros and cons:
- Pros: The load is balanced between all servers.
- Cons: Some partitions will have slower consumption speed and messages will lag longer.
And for option two the pros and cons are:
- Pros: All partitions are consumed at a balanced speed, since each partition gets a dedicated consumer.
- Cons: Wasted resources, and partitions will be assigned to whichever servers spin up first.
Because the assignment is based on the first-come-first-served rule, when the total consumer concurrency is greater than the partition count, the consumers that connect late will not be assigned any partition. Thus, the server that started last will be completely idle, while the servers that started first will have partitions assigned to all of their consumer threads (higher load).
Solution
Letting messages lag longer is not what we want, so option two is more reasonable. How do we mitigate the unbalanced server load issue?
We need two things:
- Establish the proportion between different topics (`traffic on topic 1` : `traffic on topic 2` = 1 : 2 → `topic 1 concurrency` : `topic 2 concurrency` = 1 : 2) and create corresponding partition counts on the topics (see the sketch after this list).
- Confirm the max capacity of a single server by doing a load test (make sure it won't break under 100% load on all topics), and scale out servers until every topic has enough consumers (> partition count), so the proportion of consumer counts is kept between the different topics.
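As a sketch of what that proportion could look like in the listener code (the topic names, partition counts, and concurrency values below are hypothetical, chosen to follow a 1 : 2 traffic ratio):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ProportionalConsumer {

    // topic-1: traffic share 1, e.g. 4 partitions
    @KafkaListener(topics = "topic-1", concurrency = "1")
    public void onTopic1(String message) {
        // handle the event
    }

    // topic-2: traffic share 2, e.g. 8 partitions
    @KafkaListener(topics = "topic-2", concurrency = "2")
    public void onTopic2(String message) {
        // handle the event
    }
}
// With N servers, topic-1 gets N consumers and topic-2 gets 2N, so the 1 : 2 proportion is preserved;
// once the consumer count exceeds a topic's partition count, the extra threads simply stay idle.
```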
And we can ignore the resources wasted on those idle threads, since our main goal is that the servers are not overloaded and messages are all consumed as soon as possible.
One thing to mention is that the lowest concurrency count is 1, so if every listener's concurrency is set to 1 and the application still slows down in the load test, that indicates there are too many `@KafkaListener`s in the application; some refactoring is needed to split those Kafka listeners.
Conclusion
In this case, we tried to balance load tilting against wasted resources. There is no perfect solution in a production system; some compromises and trade-offs are always present.