yashmayya commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169647018


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -145,6 +154,10 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
+        Map<String, Object> adminProps = new HashMap<>();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, workerConfig.bootstrapServers());

Review Comment:
   Connect supports overriding Kafka client configs per connector - see [KIP-458](https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy). If we're creating an admin client for a specific connector (or task), it should respect these overrides - the connector may even be targeting a Kafka cluster different from the Connect cluster's backing Kafka cluster, so we can't use the worker configuration alone to create Kafka clients for connectors. It would probably be cleaner to instantiate this admin client in the `Worker` and pass it to the `WorkerSinkTask` during creation. Here's an example from `WorkerSourceTask` creation that might help - https://github.com/apache/kafka/blob/454b72161a76b1687a1263157d7cc30a1bdb2506/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1424-L1446
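   To make the idea concrete, here's a rough sketch (not the actual `Worker` code - the class and method names below are made up for illustration) of how per-connector admin client configs could be layered on top of the worker's `bootstrap.servers` so that KIP-458 `admin.override.`-prefixed overrides take effect:
   ```java
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;

   // Hypothetical helper, for illustration only - Connect's real implementation lives in Worker.
   public class SinkTaskAdminFactory {

       // KIP-458 prefix for admin client overrides in a connector config.
       private static final String ADMIN_OVERRIDE_PREFIX = "admin.override.";

       public static Admin createAdminFor(Map<String, String> connectorProps, String workerBootstrapServers) {
           Map<String, Object> adminProps = new HashMap<>();
           // Start from the worker's bootstrap servers...
           adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers);
           // ...then apply connector-level overrides, which take precedence. A connector targeting
           // a different Kafka cluster can supply its own bootstrap.servers, security configs, etc.
           connectorProps.forEach((key, value) -> {
               if (key.startsWith(ADMIN_OVERRIDE_PREFIX)) {
                   adminProps.put(key.substring(ADMIN_OVERRIDE_PREFIX.length()), value);
               }
           });
           return Admin.create(adminProps);
       }
   }
   ```
   (In practice the overrides would also need to go through the worker's configured `ConnectorClientConfigOverridePolicy` before being applied.)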



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {

Review Comment:
   This seems kinda weird to me too - why would the consumer time out trying to fetch the position for a topic partition that doesn't exist anymore? Is this expected / documented behavior, or just a poorly handled case in the consumer client?
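   If I remember correctly, `KafkaConsumer#position` can throw `org.apache.kafka.common.errors.TimeoutException` when the position can't be determined within `default.api.timeout.ms`, and there's an overload that takes an explicit timeout. A minimal standalone sketch of handling that, outside of Connect (the broker address, group id and topic name below are made up):
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;

   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.errors.TimeoutException;
   import org.apache.kafka.common.serialization.ByteArrayDeserializer;

   public class PositionTimeoutExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // made-up address
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "position-timeout-example");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

           try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
               TopicPartition tp = new TopicPartition("maybe-deleted-topic", 0); // made-up topic
               consumer.assign(Collections.singleton(tp));
               try {
                   // Explicit-timeout overload; the single-argument variant uses default.api.timeout.ms.
                   long pos = consumer.position(tp, Duration.ofSeconds(5));
                   System.out.println("Position of " + tp + " is " + pos);
               } catch (TimeoutException e) {
                   // TimeoutException is unchecked; thrown when the position can't be
                   // determined in time (e.g. the topic/partition can no longer be found).
                   System.out.println("Timed out fetching the position of " + tp);
               }
           }
       }
   }
   ```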



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
   We don't commit offsets in `onPartitionsAssigned`, so I'm not sure what the purpose of this log is?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                            "Checking if the topic {} exists", tp, tp.topic());
+                    Map<String, TopicDescription> topic = topicAdmin.describeTopics(tp.topic());
+                    if (topic.isEmpty()) {
+                        log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
   Same thoughts as above regarding the purpose of this log line.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
   I'm not sure I understand why the `onPartitionsAssigned` rebalance listener hook would be called for partitions of topics that have been deleted. Is this expected behavior or a bug in the consumer client? If it's expected, is there any documentation around it? If it's a bug, I think it might be better to address it in the consumer client rather than introducing a workaround for it in Connect?


