chia7712 commented on code in PR #19181:
URL: https://github.com/apache/kafka/pull/19181#discussion_r1989690501


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -119,9 +146,9 @@ private static 
List<StreamsGroupHeartbeatRequestData.TaskIds> convertTaskIdColle
                 .map(entry -> {
                     StreamsGroupHeartbeatRequestData.TaskIds ids = new 
StreamsGroupHeartbeatRequestData.TaskIds();
                     ids.setSubtopologyId(entry.getKey());
-                    ids.setPartitions(entry.getValue());
+                    
ids.setPartitions(entry.getValue().stream().sorted().collect(Collectors.toList()));

Review Comment:
   Excuse me, is the sorting operation necessary in production, or is it 
primarily used for testing purposes?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -82,29 +97,41 @@ public StreamsGroupHeartbeatRequestData buildRequestData() {
             data.setMemberId(membershipManager.memberId());
             data.setMemberEpoch(membershipManager.memberEpoch());
             membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-            StreamsGroupHeartbeatRequestData.Topology topology = new 
StreamsGroupHeartbeatRequestData.Topology();
-            
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
-            topology.setEpoch(streamsRebalanceData.topologyEpoch());
-            data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
-            data.setTopology(topology);
-            data.setProcessId(streamsRebalanceData.processId().toString());
-            streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
-                data.setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint()
-                    .setHost(userEndpoint.host())
-                    .setPort(userEndpoint.port())
-                );
-            });
-            
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
-                .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue()
-                    .setKey(entry.getKey())
-                    .setValue(entry.getValue())
-                )
-                .collect(Collectors.toList()));
+
+            boolean joining = membershipManager.state() == MemberState.JOINING;
+
+            if (joining) {
+                StreamsGroupHeartbeatRequestData.Topology topology = new 
StreamsGroupHeartbeatRequestData.Topology();
+                
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
+                topology.setEpoch(streamsRebalanceData.topologyEpoch());
+                data.setTopology(topology);
+                data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+                data.setProcessId(streamsRebalanceData.processId().toString());

Review Comment:
   Out of curiosity, what is the rationale for using `String` instead of `Uuid` 
as the data type for `processId`? By contrast, `SubscriptionInfoData` uses 
`Uuid`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     static class HeartbeatState {
 
+        // Fields of StreamsGroupHeartbeatRequest sent in the most recent 
request
+        static class LastSentFields {
+
+            private StreamsRebalanceData.Assignment assignment = null;

Review Comment:
   Maybe we can initialize it to `Assignment.EMPTY` to avoid NPE in the future?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     static class HeartbeatState {
 
+        // Fields of StreamsGroupHeartbeatRequest sent in the most recent 
request
+        static class LastSentFields {

Review Comment:
   Given that this struct contains only one field, perhaps we could directly 
incorporate the `assignment` field into `HeartbeatState`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -476,20 +492,25 @@ public void 
testNotSendingLeaveHeartbeatIfPollTimerExpiredAndMemberIsLeaving() {
     }
 
     @Test
-    public void testSendingFullHeartbeatRequest() {
+    public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
         try (
             final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> {
                     
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
-                })
+                });
+            final MockedConstruction<Timer> pollTimerMockedConstruction = 
mockConstruction(
+                Timer.class,
+                (mock, context) -> {
+                    when(mock.isExpired()).thenReturn(true);
+                });

Review Comment:
   `;` is unnecessary



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -82,29 +97,41 @@ public StreamsGroupHeartbeatRequestData buildRequestData() {
             data.setMemberId(membershipManager.memberId());
             data.setMemberEpoch(membershipManager.memberEpoch());
             membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-            StreamsGroupHeartbeatRequestData.Topology topology = new 
StreamsGroupHeartbeatRequestData.Topology();
-            
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
-            topology.setEpoch(streamsRebalanceData.topologyEpoch());
-            data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
-            data.setTopology(topology);
-            data.setProcessId(streamsRebalanceData.processId().toString());
-            streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
-                data.setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint()
-                    .setHost(userEndpoint.host())
-                    .setPort(userEndpoint.port())
-                );
-            });
-            
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
-                .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue()
-                    .setKey(entry.getKey())
-                    .setValue(entry.getValue())
-                )
-                .collect(Collectors.toList()));
+
+            boolean joining = membershipManager.state() == MemberState.JOINING;
+
+            if (joining) {
+                StreamsGroupHeartbeatRequestData.Topology topology = new 
StreamsGroupHeartbeatRequestData.Topology();
+                
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
+                topology.setEpoch(streamsRebalanceData.topologyEpoch());
+                data.setTopology(topology);
+                data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+                data.setProcessId(streamsRebalanceData.processId().toString());
+                streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
+                    data.setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint()
+                        .setHost(userEndpoint.host())
+                        .setPort(userEndpoint.port())
+                    );
+                });
+                
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
+                    .map(entry -> new 
StreamsGroupHeartbeatRequestData.KeyValue()
+                        .setKey(entry.getKey())
+                        .setValue(entry.getValue())
+                    )
+                    .collect(Collectors.toList()));
+                data.setActiveTasks(convertTaskIdCollection(Set.of()));

Review Comment:
   Excuse me, what is the rationale for initializing these fields as empty 
lists, rather than leaving them as `null`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to