Onur Karaman created KAFKA-4753:
-----------------------------------

             Summary: KafkaConsumer susceptible to FetchResponse starvation
                 Key: KAFKA-4753
                 URL: https://issues.apache.org/jira/browse/KAFKA-4753
             Project: Kafka
          Issue Type: Bug
            Reporter: Onur Karaman
            Assignee: Onur Karaman


FetchResponse starvation here means that the KafkaConsumer repeatedly fails to 
fully form FetchResponses from a subset of the brokers it is fetching from 
within the request timeout, while FetchResponses from the other brokers are 
fully formed and processed by the application.

In other words, this ticket is concerned with scenarios where fetching from 
some brokers hurts the progress of fetching from other brokers to the point of 
repeatedly hitting a request timeout.

Some FetchResponse starvation scenarios:
1. partition leadership of the consumer's assigned partitions is skewed across 
brokers, causing uneven FetchResponse sizes across brokers.
2. the consumer seeks back on partitions on some brokers but not others, 
causing uneven FetchResponse sizes across brokers.
3. the consumer's ability to keep up with various partitions across brokers is 
skewed, causing uneven FetchResponse sizes across brokers.

I personally saw scenario 1 happen to one of our users in prod this past week. 
They manually assigned partitions such that a few brokers led most of the 
partitions while other brokers led only a single partition. When NetworkClient 
sends FetchRequests to different brokers in parallel with an uneven partition 
distribution, FetchResponses from brokers that lead more partitions contain more 
data than FetchResponses from brokers that lead few partitions. This means the 
small FetchResponses are fully formed sooner than the larger ones. When the 
application eventually consumes a smaller, fully formed FetchResponse, the 
NetworkClient sends a new FetchRequest to the lightly loaded broker. Its response 
again comes back quickly, while only marginal progress has been made on the 
larger FetchResponse. Repeat this process several times and the application may 
have processed many smaller FetchResponses while the larger FetchResponse made 
minimal progress and hit the request timeout. The large FetchResponse then has 
to start over from scratch, which causes starvation.

To mitigate the problem in the short term, I've suggested to our user that 
they either:
1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB to 
something like 1 MB. This is the short-term solution I suggested they go with.
2. reduce "max.partition.fetch.bytes" from the current default of 1 MB to 
something like 100 KB. I didn't advise this option, as it could impact broker 
performance.
3. ask our SREs to run a partition reassignment to balance out the partition 
leadership (partitions were already being led by their preferred leaders).
4. bump up their request timeout. It was set to the former open-source default 
of 40 seconds.
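
A back-of-the-envelope model helps explain why mitigation 1 helps. The sketch 
below assumes, as a simplification, that TCP flow control lets at most about one 
socket receive buffer ("receive.buffer.bytes") of unread data build up per 
connection between reads, and that the user thread performs roughly one network 
read per application loop iteration. Under those assumptions a response needs 
ceil(responseBytes / receiveBufferBytes) iterations to be fully formed. The 2 MB 
response, 3000 ms iteration, and 40000 ms timeout mirror the reproduction steps; 
the names are hypothetical, and real reads can happen more often than the model 
allows.

```java
public class StarvationEstimate {
    // Reads needed if each read drains at most one receive buffer (ceiling division).
    public static long readsNeeded(long responseBytes, long receiveBufferBytes) {
        return (responseBytes + receiveBufferBytes - 1) / receiveBufferBytes;
    }

    // Estimated time to fully form a response, assuming one read per loop iteration.
    public static long estimatedMs(long responseBytes, long receiveBufferBytes, long iterationMs) {
        return readsNeeded(responseBytes, receiveBufferBytes) * iterationMs;
    }

    public static void main(String[] args) {
        long largeResponse = 2L * 1024 * 1024; // broker 0: two partitions x 1 MB (assumed full)
        long iterationMs = 3000;                // sleep between polls in the reproduction
        long requestTimeoutMs = 40_000;
        // Compare the 64 KB default against the suggested 1 MB receive buffer.
        for (long buffer : new long[] {64 * 1024, 1024 * 1024}) {
            long ms = estimatedMs(largeResponse, buffer, iterationMs);
            System.out.printf("receive.buffer.bytes=%d -> ~%d ms (%s)%n",
                    buffer, ms, ms > requestTimeoutMs ? "times out" : "completes");
        }
    }
}
```

With these inputs the model gives about 96000 ms for the 64 KB default (past the 
40000 ms timeout) and about 6000 ms for 1 MB. Under the same model, mitigation 2 
(100 KB per partition, so 204,800 bytes for two partitions) needs 4 reads at 
64 KB, roughly 12 seconds, while mitigation 4 only moves the threshold.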

Contributing factors:
1. uneven FetchResponse sizes across brokers.
2. processing time of the polled ConsumerRecords.
3. "max.poll.records" increases the number of polls needed to consume a 
FetchResponse, magnifying any constant per-poll overhead.
4. "max.poll.records" makes KafkaConsumer.poll bypass calls to 
ConsumerNetworkClient.poll.
5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and 
ConsumerNetworkClient.poll can return before the poll timeout as soon as a 
single channel is selected.
6. NetworkClient.poll is solely driven by the user thread with manual partition 
assignment.
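
Contributing factor 5 can be observed with plain java.nio, independently of 
Kafka. This sketch registers two non-blocking pipe source channels (stand-ins 
for two broker connections) with one Selector, writes a byte into only one of 
them, and times a select with a 10 second timeout. The class and method names 
are illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class EarlySelectDemo {
    // Returns {number of ready keys, elapsed ms} for a select where only one of two
    // registered channels is readable.
    public static long[] selectWithOneReadyChannel(long timeoutMs) throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe busy = Pipe.open();
            Pipe idle = Pipe.open();
            try {
                for (Pipe p : new Pipe[] {busy, idle}) {
                    p.source().configureBlocking(false); // required before register()
                    p.source().register(selector, SelectionKey.OP_READ);
                }
                // Only the "busy" connection receives data.
                busy.sink().write(ByteBuffer.wrap(new byte[] {1}));
                long start = System.nanoTime();
                int ready = selector.select(timeoutMs);
                long elapsedMs = (System.nanoTime() - start) / 1_000_000;
                return new long[] {ready, elapsedMs};
            } finally {
                busy.source().close();
                busy.sink().close();
                idle.source().close();
                idle.sink().close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        long[] r = selectWithOneReadyChannel(10_000);
        System.out.println("ready=" + r[0] + ", elapsed ms=" + r[1]);
    }
}
```

select reports a single ready channel almost immediately instead of waiting out 
the timeout, so a caller that returns as soon as anything is ready can loop back 
to the application long before the idle connection makes progress.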

So far I've only reproduced starvation scenario 1 locally and haven't attempted 
the other scenarios. Preventing the bypass of ConsumerNetworkClient.poll 
(contributing factor 4) mitigates the issue, but it seems starvation would still 
be possible.
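
The effect of the bypass (contributing factors 3 and 4) can be illustrated with a 
deliberately simplified model. The ToyConsumer below is hypothetical and is not 
KafkaConsumer's code: poll() hands out at most maxPollRecords already-buffered 
records and, in bypass mode, skips the network poll whenever records are 
buffered. The 10,000 buffered records are an assumed figure.

```java
public class PollBypassModel {
    // HYPOTHETICAL toy model of the poll path; not KafkaConsumer's actual implementation.
    static final class ToyConsumer {
        private final int maxPollRecords;
        private final boolean bypassWhenBuffered;
        private int bufferedRecords;
        private int networkPolls;

        ToyConsumer(int bufferedRecords, int maxPollRecords, boolean bypassWhenBuffered) {
            this.bufferedRecords = bufferedRecords;
            this.maxPollRecords = maxPollRecords;
            this.bypassWhenBuffered = bypassWhenBuffered;
        }

        // Stand-in for ConsumerNetworkClient.poll: in this model, the only point where
        // bytes of an in-flight FetchResponse are read from its socket.
        private void networkPoll() {
            networkPolls++;
        }

        // Stand-in for KafkaConsumer.poll: returns at most maxPollRecords buffered records.
        int poll() {
            boolean bypass = bypassWhenBuffered && bufferedRecords > 0;
            if (!bypass) {
                networkPoll();
            }
            int returned = Math.min(bufferedRecords, maxPollRecords);
            bufferedRecords -= returned;
            return returned;
        }

        boolean hasBufferedRecords() {
            return bufferedRecords > 0;
        }
    }

    // Counts network polls made while the application drains one fully formed response.
    public static int networkPollsWhileDraining(int bufferedRecords, int maxPollRecords, boolean bypass) {
        ToyConsumer consumer = new ToyConsumer(bufferedRecords, maxPollRecords, bypass);
        while (consumer.hasBufferedRecords()) {
            consumer.poll();
        }
        return consumer.networkPolls;
    }

    public static void main(String[] args) {
        System.out.println("with bypass:    " + networkPollsWhileDraining(10_000, 500, true));
        System.out.println("without bypass: " + networkPollsWhileDraining(10_000, 500, false));
    }
}
```

Running main reports 0 network polls with the bypass and 20 without it. In this 
model the larger in-flight FetchResponse gets no socket reads at all while the 
small response is drained, which is why removing the bypass helps but does not by 
itself guarantee the large response finishes within the request timeout.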

How to reproduce starvation scenario 1:
1. start up ZooKeeper
2. start up two brokers
3. create a topic t0 with two partitions led by broker 0 and create a topic t1 
with a single partition led by broker 1
{code}
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 --replica-assignment 0,0
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 --replica-assignment 1
{code}
4. Produce a lot of data into these topics
{code}
> ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 20000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 10000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
{code}
5. start up a consumer that consumes these 3 partitions with TRACE-level 
NetworkClient logging (arguments: poll timeout in ms, sleep between polls in ms, 
and receive.buffer.bytes)
{code}
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.StarvedFetchResponseTest 10000 3000 65536
{code}

The config/tools-log4j.properties:
{code}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=WARN, stderr

log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err

log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE, stderr
log4j.additivity.org.apache.kafka.clients.NetworkClient=false
{code}

The consumer code:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class StarvedFetchResponseTest {
    public static void main(String[] args) throws InterruptedException {
        // args: <poll timeout ms> <sleep between polls ms> <receive.buffer.bytes>
        long pollTimeout = Long.parseLong(args[0]);
        long sleepDuration = Long.parseLong(args[1]);
        String receiveBufferSize = args[2];

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fetch-response-starvation");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferSize);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);

        // Manually assign t0-0 and t0-1 (led by broker 0) and t1-0 (led by broker 1).
        List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            partitions.add(new TopicPartition("t0", i));
        }
        partitions.add(new TopicPartition("t1", 0));
        kafkaConsumer.assign(partitions);
        kafkaConsumer.seekToBeginning(partitions);

        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);
            System.out.println(recordsPerTopic(records));
            // Simulate per-batch processing time between polls.
            Thread.sleep(sleepDuration);
        }
    }

    // Counts the records returned by a single poll, keyed by partition.
    private static Map<TopicPartition, Integer> recordsPerTopic(ConsumerRecords<byte[], byte[]> records) {
        Map<TopicPartition, Integer> result = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            result.put(partition, records.records(partition).size());
        }
        return result;
    }
}
{code}

After running it for 30 minutes, about 33 FetchResponses from broker 1 were 
served to the application, while the many partially formed FetchResponses from 
broker 0 were cancelled because the request timeout forced a disconnect. It seems 
there was only one successful FetchResponse from broker 0 served to the 
application during this time.



