C0urante commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r1423211506


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -324,127 +363,151 @@ public void testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws
 
     private void alterAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
         int numPartitions = 3;
-        kafkaCluster.createTopic(TOPIC, numPartitions);
+        kafkaCluster.createTopic(topic, numPartitions);
 
         // Produce records to each partition
         for (int partition = 0; partition < numPartitions; partition++) {
             for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
-                kafkaCluster.produce(TOPIC, partition, "key", "value");
+                kafkaCluster.produce(topic, partition, "key", "value");
             }
         }
         // Create sink connector
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
                "Sink connector consumer group offsets should catch up to the topic end offsets");
 
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
         connect.assertions().assertConnectorIsStopped(
-                CONNECTOR_NAME,
+                connectorName,
                 "Connector did not stop in time"
         );
 
         // Delete the offset of one partition; alter the offsets of the others
         List<ConnectorOffset> offsetsToAlter = new ArrayList<>();
         Map<String, Object> partition = new HashMap<>();
-        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
         partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         offsetsToAlter.add(new ConnectorOffset(partition, null));
 
         for (int i = 1; i < numPartitions; i++) {
             partition = new HashMap<>();
-            partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+            partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
             partition.put(SinkUtils.KAFKA_PARTITION_KEY, i);
             offsetsToAlter.add(new ConnectorOffset(partition, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 5)));
         }
 
-        String response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+        AtomicReference<String> responseReference = new AtomicReference<>();
+        // Some retry logic is necessary to account for KAFKA-15826,
+        // where laggy sink task startup/shutdown can leave consumers running
+        waitForCondition(
+                () -> {
+                    try {
+                        responseReference.set(connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)));
+                        return true;
+                    } catch (ConnectRestException e) {
+                        boolean internalServerError = e.statusCode() == INTERNAL_SERVER_ERROR.getStatusCode();
+
+                        String message = Optional.ofNullable(e.getMessage()).orElse("");
+                        boolean failedToResetConsumerOffsets = message.contains("Failed to reset consumer group offsets for connector");
+                        boolean canBeRetried = message.contains("If the connector is in a stopped state, this operation can be safely retried");
+
+                        boolean retriable = internalServerError && failedToResetConsumerOffsets && canBeRetried;
+                        if (retriable) {
+                            return false;
+                        } else {
+                            throw new NoRetryException(e);
+                        }
+                    } catch (Throwable t) {
+                        throw new NoRetryException(t);
+                    }
+                },
+                30_000,
+                "Failed to reset sink connector offsets in time"
+        );
+        String response = responseReference.get();

Review Comment:
   Sure, done.
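
   For readers following along: the retry added above only loops while the failure matches the known KAFKA-15826 symptom (an HTTP 500 whose message carries the "can be safely retried" hint); any other failure aborts immediately via NoRetryException. A minimal, self-contained sketch of that general shape, using a hypothetical retryWhile helper rather than the test framework's waitForCondition utility, might look like:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.concurrent.Callable;
    import java.util.function.Predicate;

    public final class RetrySketch {

        // Retry the call while the thrown exception is considered retriable,
        // up to the given timeout; rethrow immediately on non-retriable
        // failures or once the deadline has passed.
        static <T> T retryWhile(Callable<T> call,
                                Predicate<Exception> retriable,
                                Duration timeout) throws Exception {
            Instant deadline = Instant.now().plus(timeout);
            while (true) {
                try {
                    return call.call();
                } catch (Exception e) {
                    if (!retriable.test(e) || Instant.now().isAfter(deadline)) {
                        throw e;
                    }
                    Thread.sleep(100); // brief back-off before the next attempt
                }
            }
        }

        public static void main(String[] args) throws Exception {
            // Hypothetical usage: treat the documented "safe to retry" hint as
            // transient and everything else as fatal.
            String response = retryWhile(
                    () -> "ok", // stand-in for connect.alterConnectorOffsets(...)
                    e -> e.getMessage() != null
                            && e.getMessage().contains("this operation can be safely retried"),
                    Duration.ofSeconds(30));
            System.out.println(response);
        }
    }

   Failing fast on unexpected errors keeps the retry loop from masking genuine bugs, which appears to be the same design choice the test makes by wrapping unknown failures in NoRetryException.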


