chia7712 commented on code in PR #22197:
URL: https://github.com/apache/kafka/pull/22197#discussion_r3180752625


##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java:
##########
@@ -48,14 +48,19 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
     private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client.";
     private static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at the time, and instances must be inactive." + NL +
         "Has 2 execution options: --dry-run to plan which offsets to reset, and --execute to reset the offsets. " + NL +
-        "You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest." + NL +
-        "To define the scope use --all-topics or --topic." + NL +
+        "Additionally, the --export option is used to export the offsets in CSV format." + NL +
+        "You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest, --from-file, --to-current, --to-offset." + NL +
+        "To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'." + NL +

Review Comment:
   > To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'
   
   It looks like we should remove the condition `if (!options.has(topicOpt) && !options.has(allTopicsOpt))` from line 226, right?
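
For illustration, the rule stated in the doc string (a scope is required unless `--from-file` is given) can be sketched as below. This is a minimal sketch, not the code in the PR: the class `ScopeCheckSketch`, the method `checkScope`, and the modeling of parsed options as a set of strings are all hypothetical, and the real tool checks a parsed option set via `options.has(...)` instead.

```java
import java.util.Set;

// Hypothetical stand-in for the option validation; names are not from the PR.
final class ScopeCheckSketch {

    // 'given' holds the names of the options present on the command line (assumption).
    static void checkScope(Set<String> given) {
        boolean hasScope = given.contains("topic") || given.contains("all-topics");
        boolean fromFile = given.contains("from-file");

        // A scope is only mandatory when the offsets are not read from a file.
        if (!hasScope && !fromFile) {
            throw new IllegalArgumentException(
                "One of --topic or --all-topics must be specified unless --from-file is used");
        }
    }
}
```

With this shape, `--from-file` alone passes the check, while a reset specification such as `--to-earliest` without any scope is rejected.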



##########
tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java:
##########
@@ -439,6 +439,42 @@ public Map<TopicPartition, OffsetAndMetadata> resetToCurrent(Collection<TopicPar
         return preparedOffsetsForPartitionsWithCommittedOffset;
     }
 
+    public Map<TopicPartition, OffsetAndMetadata> resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset, Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) {

Review Comment:
   I understand this implementation aligns with `resetToCurrent`, but it feels a bit old-school to me. We could simplify it along the following lines:
   ```java
       public Map<TopicPartition, OffsetAndMetadata> resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset, Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) {
           var partitioned = partitionsToReset.stream().collect(Collectors.partitioningBy(currentOffsetInfo::containsKey));
           var partitionsToResetWithStartOffset = partitioned.get(true);
           var partitionsToResetWithoutStartOffset = partitioned.get(false);
   
           var preparedOffsetsForPartitionsWithStartOffset = partitionsToResetWithStartOffset.stream()
               .collect(Collectors.toMap(Function.identity(), topicPartition -> new OffsetAndMetadata(currentOffsetInfo.get(topicPartition).startOffset())));
   
           getLogEndOffsets(partitionsToResetWithoutStartOffset).forEach((tp, logOffsetResult) -> {
               if (!(logOffsetResult instanceof OffsetsUtils.LogOffset logOffset)) {
                   throw new IllegalStateException("Error getting ending offset of topic partition: " + tp);
               }
               preparedOffsetsForPartitionsWithStartOffset.put(tp, new OffsetAndMetadata(logOffset.value));
           });
   
           return preparedOffsetsForPartitionsWithStartOffset;
       }
   ```
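
To try the `partitioningBy` pattern outside the Kafka code base, here is a self-contained sketch under simplifying assumptions: partitions are plain strings, offsets are `Long` values, and the `endOffsets` map is a hypothetical stand-in for the `getLogEndOffsets` lookup. The class and method names are illustrative only. One deliberate difference from the snippet above: it uses the four-argument `Collectors.toMap` with `HashMap::new`, because the two-argument form makes no guarantee about the mutability of the returned map, and the result is modified afterwards.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative stand-in; not part of OffsetsUtils.
final class PartitioningSketch {

    static Map<String, Long> resolve(Collection<String> partitions,
                                     Map<String, Long> startOffsets,
                                     Map<String, Long> endOffsets) {
        // true -> partitions with a known start offset, false -> the rest
        Map<Boolean, List<String>> split = partitions.stream()
            .collect(Collectors.partitioningBy(startOffsets::containsKey));

        // Explicit HashMap supplier so that the later put() calls are guaranteed to work.
        Map<String, Long> resolved = split.get(true).stream()
            .collect(Collectors.toMap(Function.identity(), startOffsets::get, (a, b) -> a, HashMap::new));

        // Fall back to the end offset for partitions without a start offset.
        for (String tp : split.get(false)) {
            Long end = endOffsets.get(tp);
            if (end == null) {
                throw new IllegalStateException("Error getting ending offset of topic partition: " + tp);
            }
            resolved.put(tp, end);
        }
        return resolved;
    }
}
```

`partitioningBy` always returns a map containing both the `true` and the `false` key, so neither `get` call can return `null` even when one side is empty.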



##########
tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java:
##########
@@ -439,6 +439,42 @@ public Map<TopicPartition, OffsetAndMetadata> resetToCurrent(Collection<TopicPar
         return preparedOffsetsForPartitionsWithCommittedOffset;
     }
 
+    public Map<TopicPartition, OffsetAndMetadata> resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset, Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) {

Review Comment:
   BTW, I'm fine with addressing this in a separate PR


