lucasbru commented on code in PR #20284:
URL: https://github.com/apache/kafka/pull/20284#discussion_r2247861071


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
 
+    // Missing source topic timeout tracking
+    private long firstMissingSourceTopicTime = -1L;

Review Comment:
   Maybe this would be slightly easier to read if we used 
   `org.apache.kafka.common.utils.Timer` for it?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1534,15 +1538,28 @@ private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
 
     public void handleStreamsRebalanceData() {
         if (streamsRebalanceData.isPresent()) {
+            boolean hasMissingSourceTopics = false;
+            String missingTopicsDetail = null;
+            
             for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
                 if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
                     shutdownErrorHook.run();
                 } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
-                    final String errorMsg = String.format("Missing source topics: %s", status.statusDetail());
+                    hasMissingSourceTopics = true;
+                    missingTopicsDetail = status.statusDetail();
+                } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code()) {
+                    final String errorMsg = status.statusDetail();
                     log.error(errorMsg);
-                    throw new MissingSourceTopicException(errorMsg);
+                    throw new TopologyException(errorMsg);
                 }
             }
+            
+            if (hasMissingSourceTopics) {
+                handleMissingSourceTopicsWithTimeout(missingTopicsDetail);
+            } else {
+                // Reset timeout tracking when no missing source topics are reported
+                firstMissingSourceTopicTime = -1L;

Review Comment:
   I think if you use `org.apache.kafka.common.utils.Timer` and call `reset` 
here, you don't need the inline comment.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
 
+    // Missing source topic timeout tracking
+    private long firstMissingSourceTopicTime = -1L;

Review Comment:
   Also, can we rename this to a more generic `topicsReadyTimer`? I think we 
may want to reuse the timer to also time out when internal topics are not 
created in time. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1561,6 +1578,33 @@ public void handleStreamsRebalanceData() {
         }
     }
 
+    private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {

Review Comment:
   Yes, using `org.apache.kafka.common.utils.Timer` should simplify this.
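
   To illustrate the idea, here is a minimal sketch of deadline-based timeout 
tracking. It assumes a hypothetical `DeadlineTimer` stand-in that copies only 
the `update()` / `reset(long)` / `isExpired()` shape of 
`org.apache.kafka.common.utils.Timer`, so the example runs without Kafka on the 
classpath. `TopicsReadyTracker`, `onMissingTopics`, and `onTopicsReady` are 
illustrative names, not part of the PR.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a deadline timer; NOT Kafka's Timer class. */
final class DeadlineTimer {
    private final LongSupplier clock;
    private long currentTimeMs;
    private long deadlineMs;

    DeadlineTimer(final LongSupplier clock, final long timeoutMs) {
        this.clock = clock;
        reset(timeoutMs);
    }

    /** Refresh the cached current time from the clock. */
    void update() {
        currentTimeMs = clock.getAsLong();
    }

    /** Restart the countdown from the current time. */
    void reset(final long timeoutMs) {
        update();
        deadlineMs = currentTimeMs + timeoutMs;
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }
}

/** Illustrative tracker: times out if topics stay missing for too long. */
final class TopicsReadyTracker {
    private final LongSupplier clock;
    private final long timeoutMs;
    private DeadlineTimer topicsReadyTimer; // null while no topics are missing

    TopicsReadyTracker(final LongSupplier clock, final long timeoutMs) {
        this.clock = clock;
        this.timeoutMs = timeoutMs;
    }

    /** Call when a status reports missing topics; returns true once the timeout has elapsed. */
    boolean onMissingTopics() {
        if (topicsReadyTimer == null) {
            topicsReadyTimer = new DeadlineTimer(clock, timeoutMs);
        } else {
            topicsReadyTimer.update();
        }
        return topicsReadyTimer.isExpired();
    }

    /** Call when no missing topics are reported; clears the pending deadline. */
    void onTopicsReady() {
        topicsReadyTimer = null;
    }
}
```

   With this shape, the caller would throw `MissingSourceTopicException` only 
when `onMissingTopics()` returns `true`, and the sentinel value `-1L` 
disappears.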



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
 
+    // Missing source topic timeout tracking

Review Comment:
   If you describe a member, I'd use a Javadoc comment. But this comment isn't 
adding anything beyond the variable name, so maybe we can drop it altogether?



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java:
##########
@@ -40,46 +67,21 @@
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Repartitioned;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import org.apache.kafka.test.TestUtils;
-
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.io.File;

Review Comment:
   Can you please revert the import reordering?


