lucasbru commented on code in PR #19802: URL: https://github.com/apache/kafka/pull/19802#discussion_r2121800290
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java: ########## @@ -54,7 +54,7 @@ public void testConstructorWithNullInternalTopicsToBeCreated() { assertThrows(NullPointerException.class, () -> new ConfiguredTopology( 0, - Optional.of(new TreeMap<>()), + 0, Optional.of(new TreeMap<>()), Review Comment: ```suggestion 0, Optional.of(new TreeMap<>()), ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java: ########## @@ -78,7 +78,7 @@ public void testConstructorWithInvalidTopologyEpoch() { assertThrows(IllegalArgumentException.class, () -> new ConfiguredTopology( -1, - Optional.of(new TreeMap<>()), + 0, Optional.of(new TreeMap<>()), Review Comment: ```suggestion 0, Optional.of(new TreeMap<>()), ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -16707,13 +16732,18 @@ public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() { TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) .withTargetAssignmentEpoch(10) .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) - .withPartitionMetadata(Map.of( - fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) - )) + .withMetadataHash(groupMetadataHash) ) .build(); + context.groupMetadataManager.getStreamsGroupOrThrow(groupId) Review Comment: Same as above. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -17158,13 +17214,17 @@ public void testStreamsReconciliationProcess() { TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5), TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) .withTargetAssignmentEpoch(10) - .withPartitionMetadata(Map.of( - fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) - )) + .withMetadataHash(groupMetadataHash) ) .build(); + context.groupMetadataManager.getStreamsGroupOrThrow(groupId) Review Comment: Same as above. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java: ########## @@ -42,7 +42,7 @@ public void testConstructorWithNullSubtopologies() { assertThrows(NullPointerException.class, () -> new ConfiguredTopology( 0, - null, + 0, null, Review Comment: ```suggestion 0, null, ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -4434,6 +4435,15 @@ public void testUpdateStreamsGroupSizeCounter() { .build())) .build(); + for (int i = 1; i < 4; i++) { Review Comment: It's not so nice to call `InternalTopicManager` from within this test. Could we turn this into explicit `.setConfiguredTopology` calls above? That would seem more idiomatic. EDIT: I see that you do that a couple of times below. I guess we can keep it this way if we don't find an easy way to define those topologies statically. If this will take too much time, we could define a little follow-up ticket to get rid of the `InternalTopicManager` calls in here. Definitely get rid of the loop here. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -16526,12 +16539,18 @@ public void testStreamsGroupMemberRequestingShutdownApplication() { TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5))) .withTargetAssignmentEpoch(10) .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) - .withPartitionMetadata(Map.of( - fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6) - )) + .withMetadataHash(groupMetadataHash) ) .build(); + context.groupMetadataManager.getStreamsGroupOrThrow(groupId) Review Comment: Same here. If possible, let's make the ConfiguredTopology explicit here. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java: ########## @@ -90,7 +90,7 @@ public void testNoExceptionButNoSubtopologies() { final IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> new ConfiguredTopology( 1, - Optional.empty(), + 0, Optional.empty(), Review Comment: ```suggestion 0, Optional.empty(), ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -17593,16 +17663,16 @@ public void testStreamsStreamsGroupStates() { context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1) .build())); - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash)); assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId)); - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, - Map.of( - fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) - ) - )); + context.groupMetadataManager.getStreamsGroupOrThrow(groupId) Review Comment: Same as above. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -16807,13 +16848,18 @@ public void testStreamsUpdatingPartitionMetadataTriggersNewTargetAssignment() { TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) .withTargetAssignmentEpoch(10) .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) - .withPartitionMetadata(Map.of( - fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) - )) + .withMetadataHash(oldGroupMetadataHash) ) .build(); + context.groupMetadataManager.getStreamsGroupOrThrow(groupId) Review Comment: Same as above. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java: ########## @@ -66,7 +66,7 @@ public void testConstructorWithNullTopicConfigurationException() { assertThrows(NullPointerException.class, () -> new ConfiguredTopology( 0, - Optional.empty(), + 0, Optional.empty(), Review Comment: ```suggestion 0, Optional.empty(), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org