Ignacio Acuna created KAFKA-12926:
-------------------------------------

Summary: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh
Key: KAFKA-12926
URL: https://issues.apache.org/jira/browse/KAFKA-12926
Project: Kafka
Issue Type: Bug
Components: admin, clients
Reporter: Ignacio Acuna
Assignee: Ignacio Acuna

Hi everybody, hope everyone is doing great.

*i) Introduction:*

I noticed the following exception (on a cluster with brokers running 2.3.1) when trying to describe a consumer group (using kafka-consumer-groups.sh from Kafka 2.7.1):
{code:java}
./kafka-consumer-groups.sh --describe --group order-validations{code}
{code:java}
Error: Executing consumer group command failed due to null
java.lang.NullPointerException
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$6(ConsumerGroupCommand.scala:579)
 at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
 at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
 at scala.collection.convert.JavaCollectionWrappers$JSetWrapper.map(JavaCollectionWrappers.scala:180)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$5(ConsumerGroupCommand.scala:578)
 at scala.collection.immutable.List.flatMap(List.scala:293)
 at scala.collection.immutable.List.flatMap(List.scala:79)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:574)
 at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
 at scala.collection.mutable.Growable.addAll(Growable.scala:62)
 at scala.collection.mutable.Growable.addAll$(Growable.scala:59)
 at scala.collection.mutable.HashMap.addAll(HashMap.scala:117)
 at scala.collection.mutable.HashMap$.from(HashMap.scala:570)
 at scala.collection.mutable.HashMap$.from(HashMap.scala:563)
 at scala.collection.MapOps$WithFilter.map(Map.scala:358)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:569)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:369)
 at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:76)
 at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
 at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala){code}
When trying with an older version of AdminClient (2.3.1):
{code:java}
Error: Executing consumer group command failed due to java.lang.IllegalArgumentException: Invalid negative offset
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Invalid negative offset
 at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
 at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
 at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
 at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getCommittedOffsets(ConsumerGroupCommand.scala:595)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:421)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$Lambda$131/000000004CB1EFD0.apply(Unknown Source)
 at scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:827)
 at scala.collection.TraversableLike$WithFilter$$Lambda$132/000000004CD49E20.apply(Unknown Source)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
 at scala.collection.mutable.HashMap$$Lambda$133/000000004CD4A4F0.apply(Unknown Source)
 at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
 at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
 at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:826)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:419)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:312)
 at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
 at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
 at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
 at org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
 at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
 at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160)
 at java.lang.Thread.run(Thread.java:820){code}
The difference between these two outputs comes from the change made in KAFKA-9507.

*ii) Problem:*

The committed offsets for some partitions arrive as null at _ConsumerGroupCommand_. Then, for consumers assigned to partitions whose OffsetAndMetadata is null, reading the offset's value throws a java.lang.NullPointerException, because ConsumerGroupCommand tries to map over a null value.

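To make the failure mode concrete, here is a minimal, self-contained Java sketch. It is not the actual ConsumerGroupCommand code. Committed is a hypothetical stand-in for OffsetAndMetadata, and committedOffset is an illustrative helper that assumes, as the failing code path appears to, that any partition present in the map has a non-null value.

```java
import java.util.HashMap;
import java.util.Map;

public class NullOffsetSketch {

    // Hypothetical stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata.
    static final class Committed {
        private final long offset;

        Committed(long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Illustrative helper (an assumption, not Kafka code): handles a missing
    // partition, but dereferences the value without checking it for null.
    static Long committedOffset(Map<String, Committed> offsets, String partition) {
        if (!offsets.containsKey(partition)) {
            return null; // no entry reported for this partition
        }
        return offsets.get(partition).offset(); // throws NullPointerException when the value is null
    }

    public static void main(String[] args) {
        Map<String, Committed> offsets = new HashMap<>();
        offsets.put("rtl_orderReceive-0", null);              // negative offset mapped to null
        offsets.put("rtl_orderReceive-1", new Committed(39)); // valid committed offset

        System.out.println(committedOffset(offsets, "rtl_orderReceive-1"));
        System.out.println(committedOffset(offsets, "rtl_orderReceive-6"));
        try {
            committedOffset(offsets, "rtl_orderReceive-0");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException for rtl_orderReceive-0");
        }
    }
}
```

The sketch shows that a partition missing from the map and a partition present with a null value are different cases: the first is handled, the second is not.
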
*iii) Example:*

a) +GroupID information (from describeConsumerGroups() method):+

(groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])

b) +Committed offsets information (from getCommittedOffsets() method):+

Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata\{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata\{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)

As seen, member order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6 is assigned to all partitions, but the committed offsets reported for partitions 0, 2, 4, 5, 7 and 8 are null. Getting the committed offset for rtl_orderReceive-0 then throws an error at .map(_.offset), because it translates to null.map(_.offset). This happens because the offset of those partitions is -1, which gets mapped to null (as introduced in KAFKA-9507).

*iv) Proposals:*

a) +Fix locally in ConsumerGroupCommand:+

Add a filter to the committed offsets arriving from upstream to catch this edge case. That way, even if upstream returns null values instead of an OffsetAndMetadata, describeGroups would still work and return the consumer group description.

From:
{code:java}
val committedOffsets = getCommittedOffsets(groupId){code}
To:
{code:java}
val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata]){code}

b) +Fix upstream in KafkaAdminClient's listConsumerGroupOffsets method:+

Related to KAFKA-9507. In that issue, the solution for handling negative offsets was to explicitly set null for the topicPartition:
{code:java}
if (offset < 0) {
    groupOffsetsListing.put(topicPartition, null);
} else {
    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
}{code}
That approach stops org.apache.kafka.clients.consumer.OffsetAndMetadata from throwing an _'Invalid negative offset'_ error, but it affects downstream methods that use KafkaAdminClient's listConsumerGroupOffsets method (such as the one behind kafka-consumer-groups.sh).

The proposal is to skip returning an offset for topic partitions whose offsets are negative:
{code:java}
if (offset >= 0) {
    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
}{code}
This would remove the negative offsets from the listConsumerGroupOffsets result and guarantee that every returned value is a valid OffsetAndMetadata (handling negative offsets as KAFKA-9507 does, but without impacting downstream methods that expect an OffsetAndMetadata rather than a null value).

I think the second approach is cleaner because downstream methods do not have to handle the null edge case, which may lead to exceptions (as seen).

I have been working on both approaches and am ready to prepare a PR.

What do you think?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)