lucasbru commented on code in PR #20284:
URL: https://github.com/apache/kafka/pull/20284#discussion_r2247861071


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
 
+    // Missing source topic timeout tracking
+    private long firstMissingSourceTopicTime = -1L;

Review Comment:
   Maybe it would be slightly easier to read if we used `org.apache.kafka.common.utils.Timer` for this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1534,15 +1538,28 @@ private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
 
     public void handleStreamsRebalanceData() {
         if (streamsRebalanceData.isPresent()) {
+            boolean hasMissingSourceTopics = false;
+            String missingTopicsDetail = null;
+
             for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
                 if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
                     shutdownErrorHook.run();
                 } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
-                    final String errorMsg = String.format("Missing source topics: %s", status.statusDetail());
+                    hasMissingSourceTopics = true;
+                    missingTopicsDetail = status.statusDetail();
+                } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code()) {
+                    final String errorMsg = status.statusDetail();
                     log.error(errorMsg);
-                    throw new MissingSourceTopicException(errorMsg);
+                    throw new TopologyException(errorMsg);
                 }
             }
+
+            if (hasMissingSourceTopics) {
+                handleMissingSourceTopicsWithTimeout(missingTopicsDetail);
+            } else {
+                // Reset timeout tracking when no missing source topics are reported
+                firstMissingSourceTopicTime = -1L;

Review Comment:
   I think if you use `org.apache.kafka.common.utils.Timer` and call reset here, you don't need the inline comment.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
 
+    // Missing source topic timeout tracking
+    private long firstMissingSourceTopicTime = -1L;

Review Comment:
   Also, can we rename this to a more generic `topicsReadyTimer`? I think we may want to reuse the timer to also time out when internal topics are not created in time.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1561,6 +1578,33 @@ public void handleStreamsRebalanceData() {
         }
     }
 
+    private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {

Review Comment:
   Yes, using `org.apache.kafka.common.utils.Timer` should simplify this.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
 
+    // Missing source topic timeout tracking

Review Comment:
   If you describe a member, I'd use a javadoc comment. But this comment isn't adding anything on top of the variable name, so maybe we can drop it altogether?

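For illustration, the timer-based shape suggested in the comments above could look roughly like the sketch below. It is not the PR's code: `TopicsReadyTimeoutSketch`, `DeadlineTimer`, `onStatus`, and the injected clock are hypothetical names, and `DeadlineTimer` is only a stand-in so the snippet compiles without kafka-clients. The real `org.apache.kafka.common.utils.Timer` has a different API (it is obtained from a `Time` instance and advanced with `update()`), and the exception type here is a placeholder for `MissingSourceTopicException`.

```java
import java.util.function.LongSupplier;

public class TopicsReadyTimeoutSketch {

    /** Hypothetical stand-in for a deadline timer; this is NOT Kafka's Timer class. */
    public static final class DeadlineTimer {
        private final LongSupplier clockMs;
        private final long timeoutMs;
        private boolean running = false;
        private long deadlineMs = 0L;

        public DeadlineTimer(final LongSupplier clockMs, final long timeoutMs) {
            this.clockMs = clockMs;
            this.timeoutMs = timeoutMs;
        }

        /** Starts the countdown on the first call; later calls keep the original deadline. */
        public void startIfNotRunning() {
            if (!running) {
                running = true;
                deadlineMs = clockMs.getAsLong() + timeoutMs;
            }
        }

        /** Stops the countdown so the next missing-topics report starts a fresh one. */
        public void reset() {
            running = false;
        }

        public boolean isExpired() {
            return running && clockMs.getAsLong() >= deadlineMs;
        }
    }

    private final DeadlineTimer topicsReadyTimer;

    public TopicsReadyTimeoutSketch(final LongSupplier clockMs, final long timeoutMs) {
        this.topicsReadyTimer = new DeadlineTimer(clockMs, timeoutMs);
    }

    /** Called once per heartbeat response; throws only after topics stay missing for the full timeout. */
    public void onStatus(final boolean topicsMissing, final String detail) {
        if (!topicsMissing) {
            topicsReadyTimer.reset();
            return;
        }
        topicsReadyTimer.startIfNotRunning();
        if (topicsReadyTimer.isExpired()) {
            // Placeholder exception type; the PR throws MissingSourceTopicException.
            throw new IllegalStateException("Missing source topics: " + detail);
        }
    }

    public static void main(final String[] args) {
        final long[] nowMs = {0L}; // fake clock so the example is deterministic
        final TopicsReadyTimeoutSketch sketch = new TopicsReadyTimeoutSketch(() -> nowMs[0], 1_000L);
        sketch.onStatus(true, "topic-a"); // first report starts the countdown
        nowMs[0] = 999L;
        sketch.onStatus(true, "topic-a"); // still within the timeout
        nowMs[0] = 1_000L;
        try {
            sketch.onStatus(true, "topic-a"); // deadline reached
        } catch (final IllegalStateException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

With the state held in a single timer field, the "no missing topics" branch reduces to one `reset()` call, which is why the inline comment and the `-1L` sentinel become unnecessary.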


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java:
##########
@@ -40,46 +67,21 @@
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Repartitioned;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import org.apache.kafka.test.TestUtils;
-
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.io.File;

Review Comment:
   Can you please revert the import reordering?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org