dajac commented on code in PR #20055:
URL: https://github.com/apache/kafka/pull/20055#discussion_r2318877931


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3089,9 +3111,9 @@ private static boolean isNotEmpty(String value) {
      * @param member        The old member.
      * @param updatedMember The new member.
      * @param records       The records accumulator.
-     * @return Whether a rebalance must be triggered.
+     * @return The result of the update.
      */
-    private boolean maybeUpdateRegularExpressions(
+    private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(

Review Comment:
   Have you considered returning an enum here? We could have three states:
   1) no change;
   2) regex updated;
   3) regex updated and resolved.
   It may be easier to read. Thoughts?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3463,15 +3488,16 @@ private ConsumerGroupMember maybeReconcile(
         BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
         int targetAssignmentEpoch,
         Assignment targetAssignment,
+        Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
+        boolean hasSubscriptionChanged,
         List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions,
         List<CoordinatorRecord> records
     ) {
-        if (member.isReconciledTo(targetAssignmentEpoch)) {

Review Comment:
   I think that my ask was to add the condition within CurrentAssignmentBuilder 
in order to have the full state machine implemented in the class. It make it 
easier to reason about it. However, we could also short-cut here in order to 
not allocate CurrentAssignmentBuilder.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2239,21 +2239,30 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
         // epoch 0 and that it is fully initialized.
         boolean bumpGroupEpoch = group.groupEpoch() == 0;
 
-        bumpGroupEpoch |= hasMemberSubscriptionChanged(
+        boolean subscribedTopicNamesChanged = hasMemberSubscriptionChanged(
             groupId,
             member,
             updatedMember,
             records
         );
-
-        bumpGroupEpoch |= maybeUpdateRegularExpressions(
+        UpdateRegularExpressionsResult updateRegularExpressionsResult = 
maybeUpdateRegularExpressions(
             context,
             group,
             member,
             updatedMember,
             records
         );
 
+        // The subscription has changed when either the subscribed topic names 
or subscribed topic
+        // regex has changed.
+        boolean hasSubscriptionChanged = subscribedTopicNamesChanged || 
updateRegularExpressionsResult.subscribedTopicRegexChanged;
+        // Bumping the group epoch signals that the target assignment should 
be updated. We bump the
+        // group epoch when the member has changed its subscribed topic names 
or the member has
+        // changed its subscribed topic regex to a regex that is already 
resolved. We explicitly
+        // avoid bumping the group epoch when the new subscribed topic regex 
has not been resolved
+        // yet, since we will have to update the target assignment again later.
+        bumpGroupEpoch |= subscribedTopicNamesChanged || 
updateRegularExpressionsResult.bumpGroupEpoch;

Review Comment:
   Should we inline `group.groupEpoch() == 0` (with the comment)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to