[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Waleed Fateem resolved KAFKA-14520.
-----------------------------------
Resolution: Duplicate
> TimeoutException Raised by KafkaConsumer Leads to: User provided listener
> org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on
> invocation of onPartitionsAssigned
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-14520
> URL: https://issues.apache.org/jira/browse/KAFKA-14520
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.2.1
> Reporter: Waleed Fateem
> Priority: Minor
>
> I'm on the fence about whether this should be considered a bug, but I decided
> to open it as one from the perspective of a sink developer. Even though the
> error points to a potential issue on the Kafka broker side, we depend on Kafka
> Connect to be robust enough that we don't have to intervene manually and
> restart the connector.
> We don't have access to the Kafka broker cluster, so we don't know which
> underlying issue caused the following error during a rebalance:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition topic-partition-2 could be determined
> {code}
> That leads to the following problem:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned for partitions [<list of partitions>] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-the-sink-1]
> {code}
> The TimeoutException is thrown by the KafkaConsumer.position() call in the
> WorkerSinkTask's HandleRebalance
> [code|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697]:
> {code:java}
> private class HandleRebalance implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>         for (TopicPartition tp : partitions) {
>             // position() blocks for up to default.api.timeout.ms and throws
>             // TimeoutException if the position cannot be determined in time
>             long pos = consumer.position(tp);
>             lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>             currentOffsets.put(tp, new OffsetAndMetadata(pos));
>             log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
>         }
>         // ... remainder of onPartitionsAssigned elided
>     }
>     // ... other callbacks elided
> }
> {code}
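The 60000ms in the first log matches the consumer's default for default.api.timeout.ms, which is the limit KafkaConsumer.position(TopicPartition) applies when no explicit Duration is passed. What follows is a minimal sketch of building a connector-level override that raises that limit. It assumes the worker's connector.client.config.override.policy permits consumer overrides. The class name, method name, and the 120000ms value are hypothetical. The override only widens the window; it does not address whatever is slowing the broker.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConsumerTimeoutOverride {
    // Consumer default for default.api.timeout.ms; position(TopicPartition)
    // without an explicit Duration waits at most this long.
    static final long DEFAULT_API_TIMEOUT_MS = 60_000L;

    // Builds connector-level properties. The "consumer.override." prefix is only
    // honoured if the worker's connector.client.config.override.policy allows it.
    static Map<String, String> consumerTimeoutOverride(long timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        Map<String, String> overrides = new HashMap<>();
        overrides.put("consumer.override.default.api.timeout.ms", Long.toString(timeoutMs));
        return overrides;
    }

    public static void main(String[] args) {
        // Hypothetical value: twice the consumer default.
        Map<String, String> cfg = consumerTimeoutOverride(2 * DEFAULT_API_TIMEOUT_MS);
        System.out.println(cfg); // prints {consumer.override.default.api.timeout.ms=120000}
    }
}
```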
> The exception is then treated as an unrecoverable error
> [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1]
> {code}
> Is the TimeoutException expected to kill the task, or should it ideally have
> been handled in the WorkerSinkTask's HandleRebalance code?
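One hypothetical way to handle the exception is a bounded retry around the position lookup. The sketch below is not Kafka code and makes two substitutions so it runs without kafka-clients on the classpath:

- PositionTimeoutException stands in for the unchecked org.apache.kafka.common.errors.TimeoutException.
- A LongSupplier stands in for consumer.position(tp).

```java
import java.util.function.LongSupplier;

public class PositionRetrySketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException,
    // which is also unchecked.
    static class PositionTimeoutException extends RuntimeException {
        PositionTimeoutException(String msg) {
            super(msg);
        }
    }

    // Calls lookup up to maxAttempts times, retrying only on PositionTimeoutException.
    // The last timeout is rethrown so the caller still sees the failure.
    static long positionWithRetry(LongSupplier lookup, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        PositionTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.getAsLong();
            } catch (PositionTimeoutException e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated lookup: times out twice, then returns offset 42.
        LongSupplier flaky = () -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new PositionTimeoutException("Timeout of 60000ms expired");
            }
            return 42L;
        };
        System.out.println(positionWithRetry(flaky, 3)); // prints 42
    }
}
```

A retry inside the rebalance callback keeps the consumer thread blocked for up to maxAttempts times default.api.timeout.ms. That cost is part of the question above: does the handling belong in HandleRebalance at all?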
>
>