dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r651515816



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -580,7 +581,8 @@ object ConsumerGroupCommand extends Logging {
             groupId,
             Option(consumerGroup.coordinator),
             unassignedPartitions.keySet.toSeq,
-            unassignedPartitions.map { case (tp, offset) => tp -> 
Some(offset.offset) },
+            // Because there could be cases where the last commited offset is 
a negative integer (which translates to null [KAFKA-9507 for reference]), the 
following map function could lead to a NullPointerException. Two possible types 
are possible: OffsetsAndMetadata or null (which gets translated to None to 
avoid further exceptions)
+            unassignedPartitions.map { case (tp, offset) => tp -> ( 
if(offset.isInstanceOf[OffsetAndMetadata]) Some(offset.offset) else None ) },

Review comment:
       I am still not a fan of using `offset.isInstanceOf[OffsetAndMetadata]` 
in this case...
   
   Now that we need the same logic twice, what about defining a small helper 
function which takes an `OffsetAndMetadata` and return its offset if not null?
   
   ```
   // The admin client returns `null` as a value to indicate that there is not 
committed offset for a partition. 
   def getPartitionOffset(tp: TopicPartition): Option[Long] = 
committedOffsets.get(tp).filter(_ != null).map(_.offset)
   ```
   
   We could use it in both places.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -106,7 +106,8 @@ class ConsumerGroupServiceTest {
   }
 
   private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
-    val offsets = topicPartitions.map(_ -> new 
OffsetAndMetadata(100)).toMap.asJava
+    // Half of the partitions of the testing topics are set to have a negative 
integer offset (null value [KAFKA-9507 for reference])
+    val offsets = topicPartitions.zipWithIndex.map{ case (tp, i) => tp -> ( 
if(i % 2 == 0) null else new OffsetAndMetadata(100) ) }.toMap.asJava

Review comment:
       I am not comfortable with hijacking all existing tests like this. Would 
it be possible to define a new unit test(s) which triggers the identified 
issue(s)? I think that it will help to not regress in the future if we have 
explicit test(s).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to