vamossagar12 commented on code in PR #13504: URL: https://github.com/apache/kafka/pull/13504#discussion_r1207962014
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ########## @@ -695,9 +705,28 @@ private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions); - + Set<String> deletedTopics = new HashSet<>(); for (TopicPartition tp : partitions) { - long pos = consumer.position(tp); + if (deletedTopics.contains(tp.topic())) { + log.debug("Not assigning offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic()); + continue; + } + long pos; + try { + pos = consumer.position(tp); + } catch (TimeoutException e) { + log.error("TimeoutException occurred when fetching position for topic partition {}. " + + "Checking if the topic {} exists", tp, tp.topic()); + Map<String, TopicDescription> topic = topicAdmin.describeTopics(tp.topic()); Review Comment: > This adds new ACL requirements for sink connectors' admin clients, That's a good point and something I missed. > Probably safest to assume that the topic exists if we fail to describe it. Yeah.. In the case when let's say `TopicAuthorizationException` is thrown, it again becomes difficult to say if the topic exists or not. The `TopicAdmin#describeTopics` throws a `ConnectException` in this case which could be a false positive. And since this PR throws any exceptions from `TopicAdmin#describeTopics` the connector might now fail with ConnectException, albeit a different one. So, as you said might be safe to assume that topic exists then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org