Waleed Fateem created KAFKA-14520:
-------------------------------------

             Summary: TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
                 Key: KAFKA-14520
                 URL: https://issues.apache.org/jira/browse/KAFKA-14520
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 3.2.1
            Reporter: Waleed Fateem


I'm on the fence about whether this should be considered a bug, but decided to
open it as one from the perspective of a sink connector developer. Even though
the error points to a potential issue on the Kafka broker side, we depend on
Kafka Connect to be robust enough that we don't have to intervene manually to
restart the connector.

We don't have access to the Kafka broker cluster, so we don't know the
underlying cause of the following error, which occurred during a rebalance:
{code:java}
Nov 21, 2022 @ 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition topic-partition-2 could be determined {code}
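The 60000 ms in this message matches the default of the consumer's default.api.timeout.ms setting: KafkaConsumer.position(TopicPartition) without a Duration argument blocks for up to that long, while the overload position(TopicPartition, Duration) takes an explicit bound. The following is only a simplified, self-contained model of that deadline behaviour, not Kafka's implementation. The class PositionDeadlineModel, the method awaitPosition, and the local TimeoutException stand-in are hypothetical names introduced for illustration.

{code:java}
import java.time.Duration;
import java.util.OptionalLong;
import java.util.function.Supplier;

// Simplified model of a deadline-bounded position lookup.
// Hypothetical names; this is NOT KafkaConsumer's implementation.
final class PositionDeadlineModel {

    // Local stand-in for Kafka's unchecked org.apache.kafka.common.errors.TimeoutException.
    static final class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }

    // Asks `lookup` for the fetch position until it returns a value or `timeout`
    // elapses, then fails with a message shaped like the log line above.
    static long awaitPosition(String partition, Supplier<OptionalLong> lookup, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            OptionalLong pos = lookup.get();
            if (pos.isPresent()) {
                return pos.getAsLong();
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Timeout of " + timeout.toMillis()
                        + "ms expired before the position for partition " + partition
                        + " could be determined");
            }
            try {
                Thread.sleep(5); // back off briefly before asking again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for position", e);
            }
        }
    }

    public static void main(String[] args) {
        // A lookup that never resolves (e.g. the broker never answers) times out.
        try {
            awaitPosition("topic-partition-2", OptionalLong::empty, Duration.ofMillis(50));
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

In the model, as in the real consumer, the wait is bounded rather than indefinite, so a broker that never answers turns into an exception that the caller must handle.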
The TimeoutException then leads to the following error:
{code:java}
Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned for partitions [<list of partitions>] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-the-sink-1]
{code}
The call to KafkaConsumer's position() method in WorkerSinkTask's
HandleRebalance code that throws this TimeoutException is
[here|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697]:
{code:java}
    private class HandleRebalance implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
            for (TopicPartition tp : partitions) {
                // position(tp) without a Duration blocks for up to default.api.timeout.ms
                // (60000 ms by default) and throws TimeoutException if no position is found
                long pos = consumer.position(tp);
                lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
                currentOffsets.put(tp, new OffsetAndMetadata(pos));
                log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
            }
            // ... (rest of onPartitionsAssigned and HandleRebalance omitted)
{code}
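One possible direction for the question at the end of this report, sketched only as an illustration and not as Kafka Connect's behaviour or an agreed fix: retry the position lookup a bounded number of times before giving up, so that a single slow broker response during a rebalance does not immediately fail the task. In real code the lookup would be a call such as consumer.position(tp, timeout). Here it is abstracted behind a hypothetical PositionLookup interface, and a local PositionTimeoutException stands in for Kafka's unchecked org.apache.kafka.common.errors.TimeoutException. RetryingPositionSketch and positionWithRetries are likewise hypothetical names.

{code:java}
// Hypothetical sketch of a bounded retry around a partition position lookup.
final class RetryingPositionSketch {

    // Local stand-in for org.apache.kafka.common.errors.TimeoutException.
    static final class PositionTimeoutException extends RuntimeException {
        PositionTimeoutException(String message) { super(message); }
    }

    // Hypothetical abstraction over a call such as consumer.position(tp, timeout).
    @FunctionalInterface
    interface PositionLookup {
        long position(String partition);
    }

    // Tries the lookup up to maxAttempts times and rethrows the last timeout if every attempt fails.
    static long positionWithRetries(PositionLookup lookup, String partition, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        PositionTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.position(partition);
            } catch (PositionTimeoutException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // non-null here because maxAttempts >= 1
    }

    public static void main(String[] args) {
        // Simulated lookup that times out twice, then returns offset 1234.
        int[] calls = {0};
        PositionLookup flaky = tp -> {
            if (++calls[0] < 3) {
                throw new PositionTimeoutException("Timeout expired for " + tp);
            }
            return 1234L;
        };
        System.out.println(positionWithRetries(flaky, "topic-partition-2", 5)); // prints 1234
    }
}
{code}

Retrying only helps with transient delays. If the broker stays unresponsive, the time spent blocked in the listener grows to roughly maxAttempts times the per-call timeout, so whether this belongs in HandleRebalance or in a task-level restart policy remains the open design question.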
The exception is then treated as unrecoverable
[here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:
{code:java}
Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1] {code}
Is this TimeoutException expected to kill the task, or should it ideally be
handled in WorkerSinkTask's HandleRebalance code?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
