Hi all,
I've noticed a couple of edge cases in the Sticky Partitioner and I'd like
to discuss introducing a new KIP to fix them.
Behavior
1. Low throughput producers
The first edge case occurs when a broker becomes temporarily unavailable
for a period less than replica.lag.time.max.ms. If you have a low
throughput producer generating records without a key and using a small
value of linger.ms, you will quickly hit the
max.in.flight.requests.per.connection limit for that broker or another
broker which depends on the unavailable broker to achieve acks=all.
At this point, all records will be redirected to whichever broker hits
max.in.flight.requests.per.connection first and if the producer has low
enough throughput compared to batch.size this will result in no records
being sent to any broker until the failing broker becomes available
again. Effectively this transforms a short broker failure into a cluster
failure. Ideally, we'd see all records redirected away from these
brokers rather than to them.
2. Overwhelmed brokers
The second edge case occurs when an individual broker begins
underperforming and cannot keep up with the producers. Once the broker
hits max.in.flight.requests.per.connection the producer will begin
redirecting all records without keys to the broker. This results in a
disproportionate percentage of the cluster load going to the failing
broker and begins a death spiral in which the broker becomes more and
more overwhelmed resulting in the producers redirecting more and more of
the cluster's load towards it.
Proposed Changes
We need a solution which
fixes the interaction between the back pressure mechanism
max.in.flight.requests.per.connection and the sticky partitioner.
My current thought is we should remove partitions associated with
brokers which have hit max.in.flight.requests.per.connection from the
available choices for the sticky partitioner. Once they are below
max.in.flight.requests.per.connection they'd then be added back into the
available partition list.
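Roughly, I'm picturing something like the sketch below. It is not based on
the actual partitioner code: the class name and both input maps are made up
for illustration, and I'm assuming the producer can hand the partitioner a
per-broker in-flight request count.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Kafka client.
final class AvailablePartitionFilter {

    /**
     * Returns the partitions whose leader broker is still below the
     * in-flight limit, sorted by partition id.
     *
     * leaderByPartition: partition id -> leader broker id (assumed input)
     * inFlightByBroker:  broker id -> in-flight request count (assumed input)
     */
    static List<Integer> eligible(Map<Integer, Integer> leaderByPartition,
                                  Map<Integer, Integer> inFlightByBroker,
                                  int maxInFlight) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, Integer> entry : leaderByPartition.entrySet()) {
            // Brokers with no recorded requests count as having zero in flight.
            int inFlight = inFlightByBroker.getOrDefault(entry.getValue(), 0);
            if (inFlight < maxInFlight) {
                result.add(entry.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        // Partitions 0 and 2 are led by broker 1, partition 1 by broker 2.
        Map<Integer, Integer> leaders = Map.of(0, 1, 1, 2, 2, 1);
        // Broker 1 is at the limit of 5, broker 2 is not.
        Map<Integer, Integer> inFlight = Map.of(1, 5, 2, 3);
        System.out.println(eligible(leaders, inFlight, 5)); // prints [1]
    }
}
```

Note that this returns an empty list when every broker is at the limit.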
My one concern is that this could cause further edge case behavior for
producers with small values of linger.ms. In particular I could see a
scenario in which the producer hits
max.in.flight.requests.per.connection for all brokers and then blocks on
send() until a request returns rather than building up a new batch. It's
possible (I'd need to investigate the send loop further) the producer
could create a new batch as soon as a response arrives, add a single
record to it and immediately send it, then block on send() again. This
would result in the producer doing next to no batching and limiting its
throughput drastically.
If this is the case, I figure we can allow the sticky partitioner to use
all partitions if all brokers are at
max.in.flight.requests.per.connection. In such a case it would add
records to a single partition until a request completed or it hit
batch.size, and then pick a new partition at random.
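As a rough sketch of that fallback (again with made-up names, and assuming
the partitioner can see a per-broker in-flight count), candidate selection
might look like the code below. It only covers which partitions are
candidates; deciding when to re-pick (request completed or batch.size
reached) is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical helper, not part of the Kafka client.
final class StickyFallbackChooser {

    /**
     * Partitions led by brokers below the in-flight limit, or every
     * partition if all brokers are at the limit. Sorted by partition id.
     */
    static List<Integer> candidates(Map<Integer, Integer> leaderByPartition,
                                    Map<Integer, Integer> inFlightByBroker,
                                    int maxInFlight) {
        List<Integer> belowLimit = new ArrayList<>();
        for (Map.Entry<Integer, Integer> entry : leaderByPartition.entrySet()) {
            if (inFlightByBroker.getOrDefault(entry.getValue(), 0) < maxInFlight) {
                belowLimit.add(entry.getKey());
            }
        }
        // Fallback: nothing is below the limit, so allow all partitions.
        List<Integer> result = belowLimit.isEmpty()
                ? new ArrayList<>(leaderByPartition.keySet())
                : belowLimit;
        Collections.sort(result);
        return result;
    }

    /** Picks the next sticky partition uniformly at random from the candidates. */
    static int pickSticky(List<Integer> candidates, Random random) {
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("no partitions to choose from");
        }
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        Map<Integer, Integer> leaders = Map.of(0, 1, 1, 2);
        // Both brokers are at the limit of 5, so both partitions stay eligible.
        List<Integer> all = candidates(leaders, Map.of(1, 5, 2, 5), 5);
        System.out.println(all); // prints [0, 1]
        System.out.println(pickSticky(all, new Random()));
    }
}
```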
Feedback
Before writing a KIP I'd love to hear people's feedback, alternatives and
concerns.
Regards,
Evelyn.