chia7712 commented on code in PR #22197:
URL: https://github.com/apache/kafka/pull/22197#discussion_r3180752625
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java:
##########
@@ -48,14 +48,19 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
private static final String COMMAND_CONFIG_DOC = "Property file containing
configs to be passed to Admin Client.";
private static final String RESET_OFFSETS_DOC = "Reset offsets of share
group. Supports one share group at the time, and instances must be inactive." +
NL +
"Has 2 execution options: --dry-run to plan which offsets to reset,
and --execute to reset the offsets. " + NL +
- "You must choose one of the following reset specifications:
--to-datetime, --to-earliest, --to-latest." + NL +
- "To define the scope use --all-topics or --topic." + NL +
+ "Additionally, the --export option is used to export the offsets in
CSV format." + NL +
+ "You must choose one of the following reset specifications:
--to-datetime, --to-earliest, --to-latest, --from-file, --to-current,
--to-offset." + NL +
+ "To define the scope use --all-topics or --topic. One scope must be
specified unless you use '--from-file'." + NL +
Review Comment:
> To define the scope use --all-topics or --topic. One scope must be
specified unless you use '--from-file'
It looks like we should remove the condition `if (!options.has(topicOpt) &&
!options.has(allTopicsOpt))` from line 226, right?
##########
tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java:
##########
@@ -439,6 +439,42 @@ public Map<TopicPartition, OffsetAndMetadata>
resetToCurrent(Collection<TopicPar
return preparedOffsetsForPartitionsWithCommittedOffset;
}
+ public Map<TopicPartition, OffsetAndMetadata>
resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset,
Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) {
Review Comment:
I understand this implementation aligns with `resetToCurrent`, but it feels
a bit old-school to me. We could simplify it using the following style
```java
public Map<TopicPartition, OffsetAndMetadata>
resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset,
Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) {
var partitioned =
partitionsToReset.stream().collect(Collectors.partitioningBy(currentOffsetInfo::containsKey));
var partitionsToResetWithStartOffset = partitioned.get(true);
var partitionsToResetWithoutStartOffset = partitioned.get(false);
var preparedOffsetsForPartitionsWithStartOffset =
partitionsToResetWithStartOffset.stream()
.collect(Collectors.toMap(Function.identity(), topicPartition ->
new OffsetAndMetadata(currentOffsetInfo.get(topicPartition).startOffset())));
getLogEndOffsets(partitionsToResetWithoutStartOffset).forEach((tp,
logOffsetResult) -> {
if (!(logOffsetResult instanceof OffsetsUtils.LogOffset
logOffset)) {
throw new IllegalStateException("Error getting ending offset
of topic partition: " + tp);
}
preparedOffsetsForPartitionsWithStartOffset.put(tp, new
OffsetAndMetadata(logOffset.value));
});
return preparedOffsetsForPartitionsWithStartOffset;
}
```
##########
tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java:
##########
@@ -439,6 +439,42 @@ public Map<TopicPartition, OffsetAndMetadata>
resetToCurrent(Collection<TopicPar
return preparedOffsetsForPartitionsWithCommittedOffset;
}
+ public Map<TopicPartition, OffsetAndMetadata>
resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset,
Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) {
Review Comment:
BTW, I'm fine with addressing this in a separate PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]