cadonna commented on code in PR #19433: URL: https://github.com/apache/kafka/pull/19433#discussion_r2037630380
########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -154,6 +166,173 @@ private void printGroupInfo(List<GroupListing> groups) { } } + public void describeGroups() throws ExecutionException, InterruptedException { + String group = opts.options.valueOf(opts.groupOpt); + StreamsGroupDescription description = getDescribeGroup(group); + if (description == null) + return; + boolean verbose = opts.options.has(opts.verboseOpt); + if (opts.options.has(opts.membersOpt)) { + printMembers(description, verbose); + } else if (opts.options.has(opts.stateOpt)) { + printStates(description, verbose); + } else { + printOffsets(description, verbose); + } + } + + StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException { + DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups(List.of(group)); + Map<String, StreamsGroupDescription> descriptionMap = result.all().get(); + return descriptionMap.get(group); + } + + private void printMembers(StreamsGroupDescription description, boolean verbose) { + int groupLen = Math.max(15, description.groupId().length()); + int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15; + Collection<StreamsGroupMemberDescription> members = description.members(); + if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), description.members().size())) { + for (StreamsGroupMemberDescription member : members) { + maxMemberIdLen = Math.max(maxMemberIdLen, member.memberId().length()); + maxHostLen = Math.max(maxHostLen, member.processId().length()); + maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length()); + } + + if (!verbose) { + String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"; + for (StreamsGroupMemberDescription member : members) { + System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID"); + System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId()); + printTasks(member.assignment(), false); + System.out.println(); + } + } else { + String fmt = "%" + -groupLen + "s %s %-15s%" + -maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"; + for (StreamsGroupMemberDescription member : members) { + System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID"); + System.out.printf(fmt, description.groupId(), description.targetAssignmentEpoch(), description.topologyEpoch(), member.memberId(), + member.isClassic() ? "classic" : "streams", member.memberEpoch(), member.processId(), member.clientId()); + printTasks(member.assignment(), false); + printTasks(member.targetAssignment(), true); + System.out.println(); + } + } + } + } + + private void printTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) { + System.out.printf("%s%n", taskType + ": " + tasks.stream().map(taskId -> taskId.subtopologyId() + ": [" + taskId.partitions()).collect(Collectors.joining(",")) + "] "); + } + + private void printTasks(StreamsGroupMemberAssignment assignment, boolean isTarget) { + String typePrefix = isTarget ? 
"TARGET-" : ""; + printTaskType(assignment.activeTasks(), typePrefix + "ACTIVE-TASKS:"); + printTaskType(assignment.standbyTasks(), typePrefix + "STANDBY-TASKS:"); + printTaskType(assignment.warmupTasks(), typePrefix + "WARMUP-TASKS:"); + } + + private void printStates(StreamsGroupDescription description, boolean verbose) { + maybePrintEmptyGroupState(description.groupId(), description.groupState(), 1); + + int groupLen = Math.max(15, description.groupId().length()); + String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")"; + int coordinatorLen = Math.max(25, coordinator.length()); + + if (!verbose) { + String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s\n"; + System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS"); + System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size()); + } else { + String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %-15s %-15s %s\n"; + System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS"); + System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.groupEpoch(), description.targetAssignmentEpoch(), description.members().size()); + } + } + + private void printOffsets(StreamsGroupDescription description, boolean verbose) throws ExecutionException, InterruptedException { + Map<TopicPartition, Long> offsets = getOffsets(description.members(), description); + if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), offsets.size())) { + int groupLen = Math.max(15, description.groupId().length()); + int maxTopicLen = 15; + for (TopicPartition topicPartition : offsets.keySet()) { + maxTopicLen = Math.max(maxTopicLen, topicPartition.topic().length()); + } + + if (!verbose) { + String fmt = "%" + (-groupLen) + "s %" 
+ (-maxTopicLen) + "s %-10s %s\n"; + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET-LAG"); + for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) { + System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue()); + } + } else { + String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %s\n"; + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "OFFSET-LAG"); + for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) { + System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue()); + } + } + } + } + + Map<TopicPartition, Long> getOffsets(Collection<StreamsGroupMemberDescription> members, StreamsGroupDescription description) throws ExecutionException, InterruptedException { + Set<TopicPartition> allTp = new HashSet<>(); + for (StreamsGroupMemberDescription memberDescription : members) { + allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks(), description)); + } + // fetch latest and earliest offsets + Map<TopicPartition, OffsetSpec> earliest = new HashMap<>(); + Map<TopicPartition, OffsetSpec> latest = new HashMap<>(); + + for (TopicPartition tp : allTp) { + earliest.put(tp, OffsetSpec.earliest()); + latest.put(tp, OffsetSpec.latest()); + } + Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get(); + Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get(); + + Map<TopicPartition, Long> lag = new HashMap<>(); + for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) { + lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset()); + } + return lag; + } + + /** + * Prints a summary of the state for situations where the group is empty or 
dead. + * + * @return Whether the group detail should be printed + */ + public static boolean maybePrintEmptyGroupState(String group, GroupState state, int numRows) { Review Comment: The name of this method is a bit confusing. I would separate verifying the state of the group from printing the errors. BTW, why are the error messages printed to two different output streams? I am fine with leaving it how it is, but I do not find it easily readable. ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -154,6 +166,173 @@ private void printGroupInfo(List<GroupListing> groups) { } } + public void describeGroups() throws ExecutionException, InterruptedException { + String group = opts.options.valueOf(opts.groupOpt); + StreamsGroupDescription description = getDescribeGroup(group); + if (description == null) + return; + boolean verbose = opts.options.has(opts.verboseOpt); + if (opts.options.has(opts.membersOpt)) { + printMembers(description, verbose); + } else if (opts.options.has(opts.stateOpt)) { + printStates(description, verbose); + } else { + printOffsets(description, verbose); + } + } + + StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException { + DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups(List.of(group)); + Map<String, StreamsGroupDescription> descriptionMap = result.all().get(); + return descriptionMap.get(group); + } + + private void printMembers(StreamsGroupDescription description, boolean verbose) { + int groupLen = Math.max(15, description.groupId().length()); + int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15; + Collection<StreamsGroupMemberDescription> members = description.members(); + if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), description.members().size())) { + for (StreamsGroupMemberDescription member : members) { + maxMemberIdLen = Math.max(maxMemberIdLen, member.memberId().length()); +
maxHostLen = Math.max(maxHostLen, member.processId().length()); + maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length()); + } + + if (!verbose) { + String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"; + for (StreamsGroupMemberDescription member : members) { + System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID"); + System.out.printf(fmt, description.groupId(), member.memberId(), member.processId(), member.clientId()); + printTasks(member.assignment(), false); + System.out.println(); + } + } else { + String fmt = "%" + -groupLen + "s %s %-15s%" + -maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"; Review Comment: Wouldn't it be more readable if you used variables like `targetAssignmentEpochLength` and `topologyEpochLength`, etc. instead of numbers? Here and in other places. ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -154,6 +166,173 @@ private void printGroupInfo(List<GroupListing> groups) { } } + public void describeGroups() throws ExecutionException, InterruptedException { + String group = opts.options.valueOf(opts.groupOpt); + StreamsGroupDescription description = getDescribeGroup(group); + if (description == null) + return; + boolean verbose = opts.options.has(opts.verboseOpt); + if (opts.options.has(opts.membersOpt)) { + printMembers(description, verbose); + } else if (opts.options.has(opts.stateOpt)) { + printStates(description, verbose); + } else { + printOffsets(description, verbose); + } + } + + StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException { + DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups(List.of(group)); + Map<String, StreamsGroupDescription> descriptionMap = result.all().get(); + return descriptionMap.get(group); + } + + private void printMembers(StreamsGroupDescription description, boolean verbose) { + 
int groupLen = Math.max(15, description.groupId().length()); + int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15; + Collection<StreamsGroupMemberDescription> members = description.members(); + if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), description.members().size())) { + for (StreamsGroupMemberDescription member : members) { + maxMemberIdLen = Math.max(maxMemberIdLen, member.memberId().length()); + maxHostLen = Math.max(maxHostLen, member.processId().length()); + maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length()); + } + + if (!verbose) { + String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"; + for (StreamsGroupMemberDescription member : members) { + System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID"); + System.out.printf(fmt, description.groupId(), member.memberId(), member.processId(), member.clientId()); + printTasks(member.assignment(), false); + System.out.println(); + } + } else { + String fmt = "%" + -groupLen + "s %s %-15s%" + -maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"; + for (StreamsGroupMemberDescription member : members) { + System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID"); + System.out.printf(fmt, description.groupId(), description.targetAssignmentEpoch(), description.topologyEpoch(), member.memberId(), + member.isClassic() ? 
"classic" : "streams", member.memberEpoch(), member.processId(), member.clientId()); + printTasks(member.assignment(), false); + printTasks(member.targetAssignment(), true); + System.out.println(); + } + } + } + } + + private void printTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) { + System.out.printf("%s%n", taskType + ": " + tasks.stream().map(taskId -> taskId.subtopologyId() + ": [" + taskId.partitions()).collect(Collectors.joining(",")) + "] "); + } + + private void printTasks(StreamsGroupMemberAssignment assignment, boolean isTarget) { + String typePrefix = isTarget ? "TARGET-" : ""; + printTaskType(assignment.activeTasks(), typePrefix + "ACTIVE-TASKS:"); + printTaskType(assignment.standbyTasks(), typePrefix + "STANDBY-TASKS:"); + printTaskType(assignment.warmupTasks(), typePrefix + "WARMUP-TASKS:"); + } + + private void printStates(StreamsGroupDescription description, boolean verbose) { + maybePrintEmptyGroupState(description.groupId(), description.groupState(), 1); + + int groupLen = Math.max(15, description.groupId().length()); + String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")"; + int coordinatorLen = Math.max(25, coordinator.length()); + + if (!verbose) { + String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s\n"; + System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS"); + System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size()); + } else { + String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %-15s %-15s %s\n"; + System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS"); + System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.groupEpoch(), description.targetAssignmentEpoch(), description.members().size()); 
+ } + } + + private void printOffsets(StreamsGroupDescription description, boolean verbose) throws ExecutionException, InterruptedException { + Map<TopicPartition, Long> offsets = getOffsets(description.members(), description); + if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), offsets.size())) { + int groupLen = Math.max(15, description.groupId().length()); + int maxTopicLen = 15; + for (TopicPartition topicPartition : offsets.keySet()) { + maxTopicLen = Math.max(maxTopicLen, topicPartition.topic().length()); + } + + if (!verbose) { + String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s\n"; + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET-LAG"); + for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) { + System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue()); + } + } else { + String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %s\n"; + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "OFFSET-LAG"); + for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) { + System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue()); + } + } + } + } + + Map<TopicPartition, Long> getOffsets(Collection<StreamsGroupMemberDescription> members, StreamsGroupDescription description) throws ExecutionException, InterruptedException { + Set<TopicPartition> allTp = new HashSet<>(); + for (StreamsGroupMemberDescription memberDescription : members) { + allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks(), description)); + } + // fetch latest and earliest offsets + Map<TopicPartition, OffsetSpec> earliest = new HashMap<>(); + Map<TopicPartition, OffsetSpec> latest = new HashMap<>(); + + for (TopicPartition tp : allTp) { + earliest.put(tp, OffsetSpec.earliest()); + latest.put(tp, OffsetSpec.latest()); + } + 
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get(); + Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get(); + + Map<TopicPartition, Long> lag = new HashMap<>(); + for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) { + lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset()); + } + return lag; + } + + /** + * Prints a summary of the state for situations where the group is empty or dead. + * + * @return Whether the group detail should be printed + */ + public static boolean maybePrintEmptyGroupState(String group, GroupState state, int numRows) { Review Comment: Is this `public` just for testing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org