cadonna commented on code in PR #17721:
URL: https://github.com/apache/kafka/pull/17721#discussion_r1863242293
##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -30,11 +30,11 @@
     // - FENCED_MEMBER_EPOCH (version 0+)
     // - UNRELEASED_INSTANCE_ID (version 0+)
     // - GROUP_MAX_SIZE_REACHED (version 0+)
-    // - TOPIC_AUTHORIZATION_FAILED (version 0+)
+    // - TOPIC_AUTHORIZATION_FAILED (version 0+)
     // - CLUSTER_AUTHORIZATION_FAILED (version 0+)
     // - STREAMS_INVALID_TOPOLOGY (version 0+)
-    // - STREAMS_MISSING_SOURCE_TOPICS (version 0+)
-    // - STREAMS_INCONSISTENT_INTERNAL_TOPICS (version 0+)
+    // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)
+    // - STREAMS_TOPOLOGY_FENCED (version 0+)
     "fields": [
         // Same as consumer group heart beat

Review Comment:
   That comment lies, since `AcceptableRecoveryLag` and `TaskOffsetIntervalMs` are not part of the consumer group heartbeat.

##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java:
##########
@@ -73,32 +77,39 @@
 @Timeout(600)
 @Tag("integration")
 public class InternalTopicIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    public static EmbeddedKafkaCluster cluster = null;
 
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
-        CLUSTER.start();
-        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
+
+        final Properties props = new Properties();
+        props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
+        props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
+
+        cluster = new EmbeddedKafkaCluster(1, props);

Review Comment:
   What is the difference between creating the embedded cluster here or on line 81?
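For context on the review question above: one practical difference between the two construction sites is initialization order. A `static final` field is constructed at class-load time, before any `@BeforeAll`-style setup method runs, so broker `Properties` computed in the setup method cannot be passed to its constructor. The following is a minimal sketch in plain Java (no Kafka or JUnit dependencies; `FakeCluster` and the `LOG` buffer are stand-ins invented for illustration, not the actual test classes):

```java
import java.util.Properties;

public class InitOrderSketch {
    static final StringBuilder LOG = new StringBuilder();

    // Eager: constructed when the class is loaded, before any setup
    // method can run -- at this point no override props exist yet.
    static final FakeCluster EAGER = new FakeCluster(1, new Properties());

    // Lazy: assigned later, like `cluster` in the reviewed test.
    static FakeCluster lazy = null;

    static class FakeCluster {
        final int brokers;
        final Properties props;

        FakeCluster(final int brokers, final Properties props) {
            this.brokers = brokers;
            this.props = props;
            LOG.append("created with ").append(props.size()).append(" props; ");
        }
    }

    public static void main(final String[] args) {
        // Simulates the @BeforeAll method: only now are the broker
        // overrides known, so only the lazy field can receive them.
        final Properties props = new Properties();
        props.setProperty("group.coordinator.rebalance.protocols", "classic,consumer,streams");
        props.setProperty("unstable.api.versions.enable", "true");
        lazy = new FakeCluster(1, props);
        System.out.println(LOG);
        // prints: created with 0 props; created with 2 props;
    }
}
```

Under this assumption, moving construction into the setup method is what allows the rebalance-protocol and unstable-API properties to reach the embedded cluster at all.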
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2367,6 +2391,14 @@
 private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
             group.setMetadataRefreshDeadline(currentTimeMs + streamsGroupMetadataRefreshIntervalMs, groupEpoch);
         }
 
+        // If we updated the groupEpoch, we may need to reconfigure the topology
+        ConfiguredTopology configuredTopology = group.configuredTopology();
+        if (bumpGroupEpoch) {
+            log.info("[GroupId {}] Configuring the topology {}", groupId, topology);
+            configuredTopology =
+                InternalTopicManager.configureTopics(logContext, topology, partitionMetadata);
+        }
+
         // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member

Review Comment:
   The number doesn't fit.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
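The guarded reconfiguration in the GroupMetadataManager hunk above follows a common pattern: recompute a derived value only when the epoch was bumped, and reuse the cached value otherwise. A minimal sketch with hypothetical names (plain Java, not the actual Kafka API; `configureTopics` here just concatenates for illustration):

```java
public class EpochGuardSketch {
    // Counts how often the expensive recomputation actually runs.
    static int configureCalls = 0;

    // Stand-in for InternalTopicManager.configureTopics: derives a
    // "configured topology" from the topology and partition metadata.
    static String configureTopics(final String topology, final int partitions) {
        configureCalls++;
        return topology + "@" + partitions;
    }

    // Stand-in for the heartbeat path: only reconfigure when the
    // group epoch was bumped, otherwise keep the cached value.
    static String heartbeat(final String cached,
                            final String topology,
                            final int partitions,
                            final boolean bumpGroupEpoch) {
        String configured = cached;
        if (bumpGroupEpoch) {
            configured = configureTopics(topology, partitions);
        }
        return configured;
    }

    public static void main(final String[] args) {
        // First heartbeat bumps the epoch: topology gets configured.
        String configured = heartbeat(null, "topo", 3, true);
        // Second heartbeat without a bump reuses the cached result.
        configured = heartbeat(configured, "topo", 3, false);
        System.out.println(configured + " calls=" + configureCalls);
        // prints: topo@3 calls=1
    }
}
```

The design choice this sketches: topology configuration is tied to group-epoch bumps so steady-state heartbeats skip the recomputation entirely.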