C0urante commented on code in PR #14966: URL: https://github.com/apache/kafka/pull/14966#discussion_r1423211506
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -324,127 +363,151 @@ public void testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws
     private void alterAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
         int numPartitions = 3;
-        kafkaCluster.createTopic(TOPIC, numPartitions);
+        kafkaCluster.createTopic(topic, numPartitions);
         // Produce records to each partition
         for (int partition = 0; partition < numPartitions; partition++) {
             for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
-                kafkaCluster.produce(TOPIC, partition, "key", "value");
+                kafkaCluster.produce(topic, partition, "key", "value");
             }
         }
         // Create sink connector
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
                 "Sink connector consumer group offsets should catch up to the topic end offsets");
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
         connect.assertions().assertConnectorIsStopped(
-                CONNECTOR_NAME,
+                connectorName,
                 "Connector did not stop in time"
         );
         // Delete the offset of one partition; alter the offsets of the others
         List<ConnectorOffset> offsetsToAlter = new ArrayList<>();
         Map<String, Object> partition = new HashMap<>();
-        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
         partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         offsetsToAlter.add(new ConnectorOffset(partition, null));
         for (int i = 1; i < numPartitions; i++) {
             partition = new HashMap<>();
-            partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+            partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
             partition.put(SinkUtils.KAFKA_PARTITION_KEY, i);
             offsetsToAlter.add(new ConnectorOffset(partition, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 5)));
         }
-        String response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+        AtomicReference<String> responseReference = new AtomicReference<>();
+        // Some retry logic is necessary to account for KAFKA-15826,
+        // where laggy sink task startup/shutdown can leave consumers running
+        waitForCondition(
+                () -> {
+                    try {
+                        responseReference.set(connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)));
+                        return true;
+                    } catch (ConnectRestException e) {
+                        boolean internalServerError = e.statusCode() == INTERNAL_SERVER_ERROR.getStatusCode();
+
+                        String message = Optional.of(e.getMessage()).orElse("");
+                        boolean failedToResetConsumerOffsets = message.contains("Failed to reset consumer group offsets for connector");
+                        boolean canBeRetried = message.contains("If the connector is in a stopped state, this operation can be safely retried");
+
+                        boolean retriable = internalServerError && failedToResetConsumerOffsets && canBeRetried;
+                        if (retriable) {
+                            return false;
+                        } else {
+                            throw new NoRetryException(e);
+                        }
+                    } catch (Throwable t) {
+                        throw new NoRetryException(t);
+                    }
+                },
+                30_000,
+                "Failed to reset sink connector offsets in time"
+        );
+        String response = responseReference.get();

Review Comment:
Sure, done.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org