cadonna commented on code in PR #17721:
URL: https://github.com/apache/kafka/pull/17721#discussion_r1863242293


##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -30,11 +30,11 @@
   // - FENCED_MEMBER_EPOCH (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
-  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+) 
   // - CLUSTER_AUTHORIZATION_FAILED (version 0+)
   // - STREAMS_INVALID_TOPOLOGY (version 0+)
-  // - STREAMS_MISSING_SOURCE_TOPICS (version 0+)
-  // - STREAMS_INCONSISTENT_INTERNAL_TOPICS (version 0+)
+  // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)
+  // - STREAMS_TOPOLOGY_FENCED (version 0+)
   "fields": [
     // Same as consumer group heart beat

Review Comment:
   That comment lies since `AcceptableRecoveryLag` and `TaskOffsetIntervalMs` 
are not part of the consumer group heartbeat.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java:
##########
@@ -73,32 +77,39 @@
 @Timeout(600)
 @Tag("integration")
 public class InternalTopicIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    public static EmbeddedKafkaCluster cluster = null;
 
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException 
{
-        CLUSTER.start();
-        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
+
+        final Properties props = new Properties();
+        
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,streams");
+        props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true");
+
+        cluster = new EmbeddedKafkaCluster(1, props);

Review Comment:
   What is the difference between creating the embedded cluster here or on line 
81?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2367,6 +2391,14 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
             group.setMetadataRefreshDeadline(currentTimeMs + 
streamsGroupMetadataRefreshIntervalMs, groupEpoch);
         }
 
+        // If we updated the groupEpoch, we may need to reconfigure the 
topology
+        ConfiguredTopology configuredTopology = group.configuredTopology();
+        if (bumpGroupEpoch) {
+            log.info("[GroupId {}] Configuring the topology {}", groupId, 
topology);
+            configuredTopology =
+                InternalTopicManager.configureTopics(logContext, topology, 
partitionMetadata);
+        }
+
         // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member

Review Comment:
   The number doesn't fit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to