[ https://issues.apache.org/jira/browse/KAFKA-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695320#comment-16695320 ]
ASF GitHub Bot commented on KAFKA-5110:
---------------------------------------

hachikuji closed pull request #2892: KAFKA-5110: Check for errors when fetching the log end offset in ConsumerGroupCommand
URL: https://github.com/apache/kafka/pull/2892

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

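In short, the change replaces the unchecked .offsets.head access in ZkConsumerGroupService.getLogEndOffset with a helper that inspects the partition error code (and whether any offset came back at all) before dereferencing. A minimal, self-contained sketch of that pattern follows; FetchLogEndOffsetSketch and PartitionOffsets are illustrative stand-ins invented for this comment, while fetchLogEndOffset and LogEndOffsetResult mirror the names used in the diff:

{code}
// Sketch only: PartitionOffsets stands in for the real ListOffsets partition
// response, and the error is modelled as a plain string; the real helper talks
// to a SimpleConsumer and builds a kafka.api.OffsetRequest.
object FetchLogEndOffsetSketch extends App {

  sealed trait LogEndOffsetResult
  object LogEndOffsetResult {
    final case class LogEndOffset(value: Long) extends LogEndOffsetResult
    case object Ignore extends LogEndOffsetResult
  }

  final case class PartitionOffsets(error: String, offsets: Seq[Long])

  // Check the error code and the returned offsets before taking offsets.head;
  // on failure, return Ignore together with a printable error message.
  def fetchLogEndOffset(partition: String, response: PartitionOffsets): (Option[String], LogEndOffsetResult) =
    response.error match {
      case "NONE" if response.offsets.nonEmpty =>
        (None, LogEndOffsetResult.LogEndOffset(response.offsets.head))
      case "NONE" =>
        (Some(s"No log end offset returned for partition $partition"), LogEndOffsetResult.Ignore)
      case error =>
        (Some(s"Error fetching log end offset for partition $partition: $error"), LogEndOffsetResult.Ignore)
    }

  println(fetchLogEndOffset("test-0", PartitionOffsets("NONE", Seq(42L))))
  println(fetchLogEndOffset("test-0", PartitionOffsets("UNKNOWN_TOPIC_OR_PARTITION", Seq.empty)))
}
{code}

The real helper returns the error message to its caller instead of printing it directly, which is what lets the two new unit tests in the diff exercise it against a mocked SimpleConsumer.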

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index caad62a8372..b6646305f7f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -23,7 +23,7 @@ import joptsimple.{OptionParser, OptionSpec}
 
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.client.ClientUtils
-import kafka.common.{TopicAndPartition, _}
+import kafka.common._
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
 
@@ -50,7 +50,7 @@ object ConsumerGroupCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer 
groups, describe a consumer group, or delete consumer group info.")
 
     // should have exactly one action
-    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
+    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has)
     if (actions != 1)
       CommandLineUtils.printUsageAndDie(opts.parser, "Command must include 
exactly one action: --list, --describe, --delete")
 
@@ -126,19 +126,21 @@ object ConsumerGroupCommand extends Logging {
 
     groupAssignment.foreach { consumerAssignment =>
       print("%-30s %-10s %-15s %-15s %-10s %-50s".format(
-        consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
-        consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
-        consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
+        consumerAssignment.topicPartition.map(_.topic).getOrElse(MISSING_COLUMN_VALUE),
+        consumerAssignment.topicPartition.map(_.partition).getOrElse(MISSING_COLUMN_VALUE),
+        consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE),
+        consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
+        consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE),
+        consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
       if (useNewConsumer)
         print("%-30s 
%s".format(consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), 
consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
       println()
     }
   }
 
-  protected case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String],
-                                                partition: Option[Int], offset: Option[Long], lag: Option[Long],
-                                                consumerId: Option[String], host: Option[String],
-                                                clientId: Option[String], logEndOffset: Option[Long])
+  protected case class PartitionAssignmentState(group: String, coordinator: Option[Node], topicPartition: Option[TopicPartition],
+                                                offset: Option[Long], lag: Option[Long], consumerId: Option[String],
+                                                host: Option[String], clientId: Option[String], logEndOffset: Option[Long])
 
   sealed trait ConsumerGroupService {
 
@@ -158,24 +160,20 @@ object ConsumerGroupCommand extends Logging {
 
     protected def collectConsumerAssignment(group: String,
                                             coordinator: Option[Node],
-                                            topicPartitions: Seq[TopicAndPartition],
-                                            getPartitionOffset: TopicAndPartition => Option[Long],
+                                            topicPartitions: Seq[TopicPartition],
+                                            getPartitionOffset: TopicPartition => Option[Long],
                                             consumerIdOpt: Option[String],
                                             hostOpt: Option[String],
                                             clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
       if (topicPartitions.isEmpty)
         Array[PartitionAssignmentState](
-          PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None)
+          PartitionAssignmentState(group, coordinator, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None)
         )
       else {
-        var assignmentRows: Array[PartitionAssignmentState] = Array()
-        topicPartitions
-          .sortBy(_.partition)
-          .foreach { topicPartition =>
-            assignmentRows = assignmentRows :+ describePartition(group, coordinator, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition),
-              consumerIdOpt, hostOpt, clientIdOpt)
-          }
-        assignmentRows
+        topicPartitions.sortBy(_.partition).map { topicPartition =>
+          describePartition(group, coordinator, topicPartition, getPartitionOffset(topicPartition),
+            consumerIdOpt, hostOpt, clientIdOpt)
+        }.toArray
       }
     }
 
@@ -184,18 +182,17 @@ object ConsumerGroupCommand extends Logging {
 
     private def describePartition(group: String,
                                   coordinator: Option[Node],
-                                  topic: String,
-                                  partition: Int,
+                                  topicPartition: TopicPartition,
                                   offsetOpt: Option[Long],
                                   consumerIdOpt: Option[String],
                                   hostOpt: Option[String],
                                   clientIdOpt: Option[String]): PartitionAssignmentState = {
       def getDescribePartitionResult(logEndOffsetOpt: Option[Long]): PartitionAssignmentState =
-        PartitionAssignmentState(group, coordinator, Option(topic), Option(partition), offsetOpt,
+        PartitionAssignmentState(group, coordinator, Some(topicPartition), offsetOpt,
                                  getLag(offsetOpt, logEndOffsetOpt), consumerIdOpt, hostOpt,
                                  clientIdOpt, logEndOffsetOpt)
 
-      getLogEndOffset(new TopicPartition(topic, partition)) match {
+      getLogEndOffset(topicPartition) match {
         case LogEndOffsetResult.LogEndOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
         case LogEndOffsetResult.Unknown => getDescribePartitionResult(None)
         case LogEndOffsetResult.Ignore => null
@@ -264,18 +261,19 @@ object ConsumerGroupCommand extends Logging {
         topicsByConsumerId(consumerId).flatMap { _ =>
           // since consumers with no topic partitions are processed here, we pass empty for topic partitions and offsets
           // since consumer id is repeated in client id, leave host and client id empty
-          collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None)
+          collectConsumerAssignment(group, None, Array[TopicPartition](), Map[TopicPartition, Option[Long]](),
+            Some(consumerId), None, None)
         }
       }
 
       (None, Some(assignmentRows))
     }
 
-    private def getAllTopicPartitions(topics: Seq[String]): Seq[TopicAndPartition] = {
+    private def getAllTopicPartitions(topics: Seq[String]): Seq[TopicPartition] = {
       val topicPartitionMap = zkUtils.getPartitionsForTopics(topics)
       topics.flatMap { topic =>
         val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
-        partitions.map(TopicAndPartition(topic, _))
+        partitions.map(new TopicPartition(topic, _))
       }
     }
 
@@ -284,11 +282,11 @@ object ConsumerGroupCommand extends Logging {
         case Some(-1) => LogEndOffsetResult.Unknown
         case Some(brokerId) =>
           getZkConsumer(brokerId).map { consumer =>
-            val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
-            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-            val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-            consumer.close()
-            LogEndOffsetResult.LogEndOffset(logEndOffset)
+            try {
+              val (maybeErrMsg, result) = fetchLogEndOffset(topicPartition, consumer)
+              maybeErrMsg.foreach(printError(_))
+              result
+            } finally consumer.close()
           }.getOrElse(LogEndOffsetResult.Ignore)
         case None =>
           printError(s"No broker for partition '$topicPartition'")
@@ -296,13 +294,34 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
+    private[admin] def fetchLogEndOffset(topicPartition: TopicPartition,
+                                         consumer: SimpleConsumer): (Option[String], LogEndOffsetResult) = {
+      val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
+      val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+      val response = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+
+      response.error match {
+        case Errors.NONE =>
+          if (response.offsets.nonEmpty) {
+            (None, LogEndOffsetResult.LogEndOffset(response.offsets.head))
+          } else {
+            val errorMsg = s"No log end offset returned in ListOffsets response for partition $topicPartition"
+            (Some(errorMsg), LogEndOffsetResult.Ignore)
+          }
+
+        case error =>
+          val errorMsg = s"Error fetching log end offset for partition $topicPartition: ${error.message}"
+          (Some(errorMsg), LogEndOffsetResult.Ignore)
+      }
+    }
+
     private def getPartitionOffsets(group: String,
-                                    topicPartitions: Seq[TopicAndPartition],
+                                    topicPartitions: Seq[TopicPartition],
                                     channelSocketTimeoutMs: Int,
-                                    channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
-      val offsetMap = mutable.Map[TopicAndPartition, Long]()
+                                    channelRetryBackoffMs: Int): Map[TopicPartition, Long] = {
+      val offsetMap = mutable.Map[TopicPartition, Long]()
       val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
-      channel.send(OffsetFetchRequest(group, topicPartitions))
+      channel.send(OffsetFetchRequest(group, topicPartitions.map(new TopicAndPartition(_))))
       val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
 
       offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
@@ -313,13 +332,13 @@ object ConsumerGroupCommand extends Logging {
             // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
             try {
               val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
-              offsetMap.put(topicAndPartition, offset)
+              offsetMap.put(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition), offset)
             } catch {
               case z: ZkNoNodeException =>
                 printError(s"Could not fetch offset from zookeeper for group 
'$group' partition '$topicAndPartition' due to missing offset data in 
zookeeper.", Some(z))
             }
           case offsetAndMetaData if offsetAndMetaData.error == Errors.NONE =>
-            offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
+            offsetMap.put(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition), offsetAndMetadata.offset)
           case _ =>
             printError(s"Could not fetch offset from kafka for group '$group' 
partition '$topicAndPartition' due to ${offsetAndMetadata.error.message}.")
         }
@@ -403,32 +422,26 @@ object ConsumerGroupCommand extends Logging {
             None
           case Some(consumers) =>
             var assignedTopicPartitions = Array[TopicPartition]()
-            val offsets = adminClient.listGroupOffsets(group)
+            val offsets = adminClient.listGroupOffsets(group).mapValues(Some(_))
             val rowsWithConsumer =
               if (offsets.isEmpty)
                 List[PartitionAssignmentState]()
               else {
                 consumers.sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
-                  val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
-                  assignedTopicPartitions = assignedTopicPartitions ++ consumerSummary.assignment
-                  val partitionOffsets: Map[TopicAndPartition, Option[Long]] = consumerSummary.assignment.map { topicPartition =>
-                    new TopicAndPartition(topicPartition) -> offsets.get(topicPartition)
-                  }.toMap
-                  collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions,
-                    partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
+                  assignedTopicPartitions ++= consumerSummary.assignment
+                  collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), consumerSummary.assignment,
+                    offsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
                     Some(s"${consumerSummary.clientId}"))
                 }
               }
 
             val rowsWithoutConsumer = offsets.filterNot {
               case (topicPartition, offset) => assignedTopicPartitions.contains(topicPartition)
-              }.flatMap {
-                case (topicPartition, offset) =>
-                  val topicAndPartition = new TopicAndPartition(topicPartition)
-                  collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
-                      Map(topicAndPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
-                      Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
-                }
+            }.flatMap {
+              case (topicPartition, offset) =>
+                collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicPartition),
+                  offsets, Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
+            }
 
             Some(rowsWithConsumer ++ rowsWithoutConsumer)
       }
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 2bceeaaadef..daa5723e277 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -26,21 +26,20 @@ import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
-import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
-import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
-import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
-import kafka.consumer.OldConsumer
-import kafka.consumer.Whitelist
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService, LogEndOffsetResult, ZkConsumerGroupService}
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.consumer.{OldConsumer, SimpleConsumer, Whitelist}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.{CoordinatorNotAvailableException, WakeupException}
+import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.serialization.StringDeserializer
 
-
 class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
   val overridingProps = new Properties()
@@ -159,8 +158,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         val (_, assignments) = consumerGroupCommand.describeGroup()
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
-        assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
-        assignments.get.count { x => x.group == group && !x.partition.isDefined } == 1
+        assignments.get.count { x => x.group == group && x.topicPartition.isDefined } == 1 &&
+        assignments.get.count { x => x.group == group && !x.topicPartition.isDefined } == 1
       }, "Expected rows for consumers with no assigned partitions in describe group results.")
 
     // cleanup
@@ -169,6 +168,51 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     consumer2Mock.stop()
   }
 
+  @Test
+  def testErrorFetchingLogEndOffset() {
+    props.setProperty("zookeeper.connect", zkConnect)
+
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
+    val consumerGroupService = new ZkConsumerGroupService(opts)
+    val consumer = EasyMock.createMock(classOf[SimpleConsumer])
+    val topicAndPartition = TopicAndPartition(topic, 0)
+
+    val listOffsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+    val partitionResponse = PartitionOffsetsResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, Seq.empty[Long])
+    EasyMock.expect(consumer.getOffsetsBefore(listOffsetRequest))
+      .andReturn(OffsetResponse(0, Map(topicAndPartition -> partitionResponse)))
+
+    EasyMock.replay(consumer)
+
+    val (maybeErrMsg, result) = consumerGroupService.fetchLogEndOffset(new TopicPartition(topic, 0), consumer)
+    assertTrue(maybeErrMsg.isDefined)
+    assertEquals(LogEndOffsetResult.Ignore, result)
+  }
+
+  @Test
+  def testFetchingLogEndOffsetWithNoReturnedOffsets() {
+    // if there are no errors, we should get a valid offset, but this case ensures that the command
+    // does not just blow up if we don't
+
+    props.setProperty("zookeeper.connect", zkConnect)
+
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
+    val consumerGroupService = new ZkConsumerGroupService(opts)
+    val consumer = EasyMock.createMock(classOf[SimpleConsumer])
+    val topicAndPartition = TopicAndPartition(topic, 0)
+
+    val listOffsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+    val partitionResponse = PartitionOffsetsResponse(Errors.NONE, Seq.empty[Long])
+    EasyMock.expect(consumer.getOffsetsBefore(listOffsetRequest))
+      .andReturn(OffsetResponse(0, Map(topicAndPartition -> partitionResponse)))
+
+    EasyMock.replay(consumer)
+
+    val (maybeErrMsg, result) = consumerGroupService.fetchLogEndOffset(new TopicPartition(topic, 0), consumer)
+    assertTrue(maybeErrMsg.isDefined)
+    assertEquals(LogEndOffsetResult.Ignore, result)
+  }
+
   @Test
   def testDescribeNonExistingGroupWithNewConsumer() {
     // run one consumer in the group consuming from a single-partition topic
@@ -252,8 +296,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         state == Some("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
-        assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
-        assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
+        assignments.get.count{ x => x.group == group && x.topicPartition.isDefined} == 1 &&
+        assignments.get.count{ x => x.group == group && !x.topicPartition.isDefined} == 1
     }, "Expected rows for consumers with no assigned partitions in describe group results.")
 
     consumerGroupCommand.close()
@@ -277,8 +321,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
       state == Some("Stable") &&
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 2 &&
-      assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
-      assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
+      assignments.get.count{ x => x.group == group && x.topicPartition.isDefined} == 2 &&
+      assignments.get.count{ x => x.group == group && !x.topicPartition.isDefined} == 0
     }, "Expected two rows (one row per consumer) in describe group results.")
 
     consumerGroupCommand.close()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConsumerGroupCommand error handling improvement
> -----------------------------------------------
>
>                 Key: KAFKA-5110
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5110
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.10.1.1
>            Reporter: Dustin Cote
>            Assignee: Jason Gustafson
>            Priority: Major
>
> The ConsumerGroupCommand isn't handling partition errors properly. It throws 
> the following:
> {code}
> kafka-consumer-groups.sh --zookeeper 10.10.10.10:2181 --group mygroup 
> --describe
> GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
> Error while executing consumer group command empty.head
> java.lang.UnsupportedOperationException: empty.head
> at scala.collection.immutable.Vector.head(Vector.scala:193)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:197)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:194)
> at scala.Option.map(Option.scala:146)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.getLogEndOffset(ConsumerGroupCommand.scala:194)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.kafka$admin$ConsumerGroupCommand$ConsumerGroupService$$describePartition(ConsumerGroupCommand.scala:125)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:107)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:106)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeTopicPartition(ConsumerGroupCommand.scala:106)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeTopicPartition(ConsumerGroupCommand.scala:134)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.kafka$admin$ConsumerGroupCommand$ZkConsumerGroupService$$describeTopic(ConsumerGroupCommand.scala:181)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:166)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:166)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:166)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describe(ConsumerGroupCommand.scala:134)
> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}
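
For reference, the "empty.head" above is Scala's Vector.head failing on an empty collection: when the broker reports an error for the partition, the ListOffsets response carries no offsets, and the pre-patch code called .offsets.head on it regardless. A standalone reproduction of just that failure mode (no Kafka classes involved):

{code}
object EmptyHeadRepro extends App {
  // On a partition-level error the ListOffsets response contains no offsets,
  // so the old unchecked .offsets.head hit exactly this exception.
  val offsets = Vector.empty[Long]
  try {
    println(offsets.head)
  } catch {
    // On the Scala version in the stack trace (2.11/2.12) this is an
    // UnsupportedOperationException with message "empty.head".
    case e: RuntimeException =>
      println(s"Error while executing consumer group command ${e.getMessage}")
  }
}
{code}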



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
