dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r651515816
##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -580,7 +581,8 @@ object ConsumerGroupCommand extends Logging {
           groupId,
           Option(consumerGroup.coordinator),
           unassignedPartitions.keySet.toSeq,
-          unassignedPartitions.map { case (tp, offset) => tp -> Some(offset.offset) },
+          // Because there could be cases where the last committed offset is a negative integer, which the admin client returns as null (see KAFKA-9507), the following map could throw a NullPointerException. Two types are possible: OffsetAndMetadata or null (which is translated to None to avoid further exceptions).
+          unassignedPartitions.map { case (tp, offset) => tp -> (if (offset.isInstanceOf[OffsetAndMetadata]) Some(offset.offset) else None) },

Review comment:
   I am still not a fan of using `offset.isInstanceOf[OffsetAndMetadata]` in this case... Now that we need the same logic twice, what about defining a small helper function which looks up the `OffsetAndMetadata` for a partition and returns its offset if it is not null?
   ```
   // The admin client returns `null` as a value to indicate that there is no committed offset for a partition.
   def getPartitionOffset(tp: TopicPartition): Option[Long] = committedOffsets.get(tp).filter(_ != null).map(_.offset)
   ```
   We could use it in both places.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -106,7 +106,8 @@ class ConsumerGroupServiceTest {
   }

   private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
-    val offsets = topicPartitions.map(_ -> new OffsetAndMetadata(100)).toMap.asJava
+    // Half of the partitions of the test topics are given a null offset value, i.e. no committed offset (see KAFKA-9507).
+    val offsets = topicPartitions.zipWithIndex.map { case (tp, i) => tp -> (if (i % 2 == 0) null else new OffsetAndMetadata(100)) }.toMap.asJava

Review comment:
   I am not comfortable with hijacking all the existing tests like this. Would it be possible to define new unit test(s) which trigger the identified issue(s)? I think explicit test(s) will help us avoid regressing in the future.
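
For illustration, a minimal self-contained sketch of how the `getPartitionOffset` helper suggested in the first comment could behave, assuming `committedOffsets` is the Scala map built from the admin client's `listConsumerGroupOffsets` result; the object name and sample data here are made up:

```
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

object PartitionOffsetExample {
  // Stand-in for the map obtained from Admin#listConsumerGroupOffsets; a null value
  // means the partition has no committed offset (see KAFKA-9507).
  val committedOffsets: Map[TopicPartition, OffsetAndMetadata] = Map(
    new TopicPartition("foo", 0) -> new OffsetAndMetadata(100L),
    new TopicPartition("foo", 1) -> null
  )

  // The helper suggested in the review: the null value is filtered out before
  // `.offset` is ever called, so no NullPointerException can occur.
  def getPartitionOffset(tp: TopicPartition): Option[Long] =
    committedOffsets.get(tp).filter(_ != null).map(_.offset)

  // Call-site shape similar to the diff: every partition maps to an Option[Long],
  // without any isInstanceOf check.
  val offsets: Map[TopicPartition, Option[Long]] =
    committedOffsets.keys.map(tp => tp -> getPartitionOffset(tp)).toMap

  def main(args: Array[String]): Unit =
    // Expected: Some(100) for partition 0, None for partition 1.
    offsets.foreach { case (tp, offset) => println(s"$tp -> $offset") }
}
```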
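
And a rough sketch of the kind of dedicated unit test the second comment asks for, exercising only the null-handling behaviour; the class, method, and topic names are hypothetical, and the helper is inlined rather than taken from `ConsumerGroupCommand`:

```
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class PartitionOffsetHelperTest {

  @Test
  def testPartitionWithoutCommittedOffsetMapsToNone(): Unit = {
    val committed = new TopicPartition("testTopic", 0)
    val uncommitted = new TopicPartition("testTopic", 1)

    // A null value mirrors what the admin client returns for a partition
    // without a committed offset (see KAFKA-9507).
    val committedOffsets: Map[TopicPartition, OffsetAndMetadata] = Map(
      committed -> new OffsetAndMetadata(100L),
      uncommitted -> null
    )

    def getPartitionOffset(tp: TopicPartition): Option[Long] =
      committedOffsets.get(tp).filter(_ != null).map(_.offset)

    // A committed offset is surfaced, a null or missing one becomes None.
    assertEquals(Some(100L), getPartitionOffset(committed))
    assertEquals(None, getPartitionOffset(uncommitted))
    assertEquals(None, getPartitionOffset(new TopicPartition("testTopic", 2)))
  }
}
```

Keeping such a test separate from the existing ones would cover the null case explicitly without changing the data that the other tests rely on.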