IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r650434257



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], 
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- 
consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = 
getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       @dajac 
   That approach seems nicer because it keeps the change very narrow.
   Thanks for your comment/feedback. 
   Updated the PR. 
   
   To be completely honest, I've been struggling all day with the unit test, 
because I haven't been able to force a KafkaConsumer to start at a -1 offset 
in a unit test (it always starts at 0, which makes sense given 
OffsetAndMetadata's _IllegalArgumentException("Invalid negative offset")_). I 
don't know why that consumer group ended up in that condition:
   
   a) GroupID information (from describeConsumerGroups() method):
   (groupId=order-validations, isSimpleConsumerGroup=false, 
members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, 
groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, 
assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)),
 partitionAssignor=RoundRobinAssigner, state=Stable, 
coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])
   
   b) Committed offsets information (from getCommittedOffsets() method):
   Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> 
OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 
-> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, 
metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, 
rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)
   
   Without the change:
   
![image](https://user-images.githubusercontent.com/31544929/121788612-99c39300-cb9c-11eb-8aea-4d8ed1df97a9.png)
   
   With the change:
   
![image](https://user-images.githubusercontent.com/31544929/121788562-228dff00-cb9c-11eb-9c3f-985ef02d26e2.png)
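   
   For reference, a minimal standalone sketch of why the filter in the diff 
drops the null entries shown in (b). It does not use the Kafka classes: the 
`OffsetAndMetadata` case class and the `NullOffsetFilterSketch` object below 
are hypothetical stand-ins written for illustration only, and the map is a 
shortened copy of the output above. The point is that `isInstanceOf` is false 
for `null`, so partitions without a committed offset are filtered out:

```scala
// Hypothetical, simplified stand-in for Kafka's OffsetAndMetadata.
// Like the real class, it rejects negative offsets with an
// IllegalArgumentException (here via require).
final case class OffsetAndMetadata(offset: Long, metadata: String = "") {
  require(offset >= 0, "Invalid negative offset")
}

object NullOffsetFilterSketch {
  // Shortened copy of the map in (b): partitions without a committed
  // offset appear with a null value.
  val committed: Map[String, OffsetAndMetadata] = Map(
    "rtl_orderReceive-0" -> null,
    "rtl_orderReceive-1" -> OffsetAndMetadata(39),
    "rtl_orderReceive-2" -> null,
    "rtl_orderReceive-3" -> OffsetAndMetadata(33)
  )

  // Same predicate as in the diff: isInstanceOf returns false for null,
  // so only entries with a real OffsetAndMetadata value survive.
  val filtered: Map[String, OffsetAndMetadata] =
    committed.filter(_._2.isInstanceOf[OffsetAndMetadata])

  def main(args: Array[String]): Unit =
    filtered.toSeq.sortBy(_._1).foreach { case (tp, om) =>
      println(s"$tp -> offset=${om.offset}")
    }
}
```

   In this sketch only rtl_orderReceive-1 and rtl_orderReceive-3 remain after 
the filter. An equivalent predicate would be `_._2 != null`; which one reads 
better is a matter of taste.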
   

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], 
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- 
consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = 
getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       I was stuck with DescribeConsumerGroupTest. You are right, looking at 
ConsumerGroupServiceTest, it should do the trick. I will try that.
   Thanks for the tip.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

