lucasbru commented on code in PR #19802:
URL: https://github.com/apache/kafka/pull/19802#discussion_r2121800290


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -54,7 +54,7 @@ public void 
testConstructorWithNullInternalTopicsToBeCreated() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredTopology(
                 0,
-                Optional.of(new TreeMap<>()),
+                0, Optional.of(new TreeMap<>()),

Review Comment:
   ```suggestion
                   0, 
                   Optional.of(new TreeMap<>()),
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -78,7 +78,7 @@ public void testConstructorWithInvalidTopologyEpoch() {
         assertThrows(IllegalArgumentException.class,
             () -> new ConfiguredTopology(
                 -1,
-                Optional.of(new TreeMap<>()),
+                0, Optional.of(new TreeMap<>()),

Review Comment:
   ```suggestion
                   0, 
                   Optional.of(new TreeMap<>()),
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16707,13 +16732,18 @@ public void 
testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)))
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                    barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-                ))
+                .withMetadataHash(groupMetadataHash)
             )
             .build();
 
+        context.groupMetadataManager.getStreamsGroupOrThrow(groupId)

Review Comment:
   Same as above.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17158,13 +17214,17 @@ public void testStreamsReconciliationProcess() {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
                     TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
                 .withTargetAssignmentEpoch(10)
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                    barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-                ))
+                .withMetadataHash(groupMetadataHash)
             )
             .build();
 
+        context.groupMetadataManager.getStreamsGroupOrThrow(groupId)

Review Comment:
   Same as above.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -42,7 +42,7 @@ public void testConstructorWithNullSubtopologies() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredTopology(
                 0,
-                null,
+                0, null,

Review Comment:
   ```suggestion
                   0, 
                   null,
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -4434,6 +4435,15 @@ public void testUpdateStreamsGroupSizeCounter() {
                     .build()))
             .build();
 
+        for (int i = 1; i < 4; i++) {

Review Comment:
   It's not so nice to call `InternalTopicManager` from within this test. Could 
we turn this into explicit `.setConfiguredTopology` calls above? That would 
seem more idiomatic.
   
   EDIT: I see that you do that a couple of times below. I guess we can keep it 
this way if we don't find an easy way to define those topologies statically. If 
this will take too much time, we could define a little follow-up ticket to get 
rid of the `InternalTopicManager` calls in here. Definitely get rid of the loop 
here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16526,12 +16539,18 @@ public void 
testStreamsGroupMemberRequestingShutdownApplication() {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
-                ))
+                .withMetadataHash(groupMetadataHash)
             )
             .build();
 
+        context.groupMetadataManager.getStreamsGroupOrThrow(groupId)

Review Comment:
   Same here. If possible, let's make the ConfiguredTopology explicit here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -90,7 +90,7 @@ public void testNoExceptionButNoSubtopologies() {
         final IllegalArgumentException ex = 
assertThrows(IllegalArgumentException.class,
             () -> new ConfiguredTopology(
                 1,
-                Optional.empty(),
+                0, Optional.empty(),

Review Comment:
   ```suggestion
                   0, 
                   Optional.empty(),
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17593,16 +17663,16 @@ public void testStreamsStreamsGroupStates() {
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 topology));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberId1)
             .build()));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
 11, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
 11, groupMetadataHash));
 
         assertEquals(StreamsGroupState.NOT_READY, 
context.streamsGroupState(groupId));
 
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
-            Map.of(
-                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-            )
-        ));
+        context.groupMetadataManager.getStreamsGroupOrThrow(groupId)

Review Comment:
   Same as above.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16807,13 +16848,18 @@ public void 
testStreamsUpdatingPartitionMetadataTriggersNewTargetAssignment() {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)))
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                    barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-                ))
+                .withMetadataHash(oldGroupMetadataHash)
             )
             .build();
 
+        context.groupMetadataManager.getStreamsGroupOrThrow(groupId)

Review Comment:
   Same as above.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -66,7 +66,7 @@ public void 
testConstructorWithNullTopicConfigurationException() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredTopology(
                 0,
-                Optional.empty(),
+                0, Optional.empty(),

Review Comment:
   ```suggestion
                   0, 
                   Optional.empty(),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to