Luke Stephenson created KAFKA-8900:
--------------------------------------

             Summary: Stalled partitions
                 Key: KAFKA-8900
                 URL: https://issues.apache.org/jira/browse/KAFKA-8900
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.1.1
            Reporter: Luke Stephenson


I'm seeing behaviour where a KafkaConsumer in a Scala application has stalled on
one partition of a topic.  All other partitions of that topic are being consumed
successfully.

Restarting the consumer process does not resolve the issue.  The consumer is 
using version 2.3.0 ("org.apache.kafka" % "kafka-clients" % "2.3.0").

When the consumer starts, I see that it is assigned the partition.  However, it
then logs:
{code}
[Consumer clientId=kafka-bus-router-64c88855cf-hxck7.event-bus-router-consumer.1d1ed7ee-5038-4441-84eb-8080ac130e9a, groupId=event-bus-router] Setting offset for partition maxwell.transactions-22 to the committed offset FetchPosition{offset=275413397, offsetEpoch=Optional[271], currentLeader=LeaderAndEpoch{leader=:-1 (id: -1 rack: null), epoch=271}}
{code}

Note that the leader is logged as "-1", i.e. no leader is known.  Searching my
application logs for the past couple of days, this partition is the only one for
which the consumer ever logs this.

The Kafka broker is running version 2.1.1.  On the broker side, the logs show:
{code}
{"timeMillis":1568087844876,"thread":"kafka-request-handler-1","level":"WARN","loggerName":"state.change.logger","message":"[Broker id=5] Ignoring LeaderAndIsr request from controller 4 with correlation id 15943 epoch 155 for partition maxwell.transactions-22 since its associated leader epoch 270 is not higher than the current leader epoch 270","endOfBatch":false,"loggerFqcn":"org.slf4j.impl.Log4jLoggerAdapter","threadId":72,"threadPriority":5}
{"timeMillis":1568087844880,"thread":"kafka-request-handler-1","level":"INFO","loggerName":"kafka.server.ReplicaFetcherManager","message":"[ReplicaFetcherManager on broker 5] Removed fetcher for partitions Set(maxwell.transactions-22)","endOfBatch":false,"loggerFqcn":"org.slf4j.impl.Log4jLoggerAdapter","threadId":72,"threadPriority":5}
{"timeMillis":1568087844880,"thread":"kafka-request-handler-1","level":"INFO","loggerName":"kafka.cluster.Partition","message":"[Partition maxwell.transactions-22 broker=5] maxwell.transactions-22 starts at Leader Epoch 271 from offset 275403423. Previous Leader Epoch was: 270","endOfBatch":false,"loggerFqcn":"org.slf4j.impl.Log4jLoggerAdapter","threadId":72,"threadPriority":5}
{"timeMillis":1568087844891,"thread":"kafka-request-handler-1","level":"INFO","loggerName":"state.change.logger","message":"[Broker id=5] Skipped the become-leader state change after marking its partition as leader with correlation id 15945 from controller 4 epoch 155 for partition maxwell.transactions-22 (last update controller epoch 155) since it is already the leader for the partition.","endOfBatch":false,"loggerFqcn":"org.slf4j.impl.Log4jLoggerAdapter","threadId":72,"threadPriority":5}
{code}

As soon as I restart the broker that is the leader for that partition, messages
flow through to the consumer again.

Given that restarting the consumer doesn't help, but restarting the broker lets
the stalled partition resume, I'm inclined to think this is a broker-side issue.
Please let me know if I can assist further with investigating or resolving this.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)