[ https://issues.apache.org/jira/browse/KAFKA-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695320#comment-16695320 ]
ASF GitHub Bot commented on KAFKA-5110:
---------------------------------------

hachikuji closed pull request #2892: KAFKA-5110: Check for errors when fetching the log end offset in ConsumerGroupCommand
URL: https://github.com/apache/kafka/pull/2892

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index caad62a8372..b6646305f7f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -23,7 +23,7 @@
 import joptsimple.{OptionParser, OptionSpec}
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.client.ClientUtils
-import kafka.common.{TopicAndPartition, _}
+import kafka.common._
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
@@ -50,7 +50,7 @@ object ConsumerGroupCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")

     // should have exactly one action
-    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
+    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has)
     if (actions != 1)
       CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
@@ -126,19 +126,21 @@ object ConsumerGroupCommand extends Logging {
       groupAssignment.foreach { consumerAssignment =>
         print("%-30s %-10s %-15s %-15s %-10s %-50s".format(
-          consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
-          consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
-          consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
+          consumerAssignment.topicPartition.map(_.topic).getOrElse(MISSING_COLUMN_VALUE),
+          consumerAssignment.topicPartition.map(_.partition).getOrElse(MISSING_COLUMN_VALUE),
+          consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE),
+          consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
+          consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE),
+          consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
         if (useNewConsumer)
           print("%-30s %s".format(consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
         println()
       }
   }

-  protected case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String],
-                                                partition: Option[Int], offset: Option[Long], lag: Option[Long],
-                                                consumerId: Option[String], host: Option[String],
-                                                clientId: Option[String], logEndOffset: Option[Long])
+  protected case class PartitionAssignmentState(group: String, coordinator: Option[Node], topicPartition: Option[TopicPartition],
+                                                offset: Option[Long], lag: Option[Long], consumerId: Option[String],
+                                                host: Option[String], clientId: Option[String], logEndOffset: Option[Long])

   sealed trait ConsumerGroupService {
@@ -158,24 +160,20 @@ object ConsumerGroupCommand extends Logging {
     protected def collectConsumerAssignment(group: String,
                                             coordinator: Option[Node],
-                                            topicPartitions: Seq[TopicAndPartition],
-                                            getPartitionOffset: TopicAndPartition => Option[Long],
+                                            topicPartitions: Seq[TopicPartition],
+                                            getPartitionOffset: TopicPartition => Option[Long],
                                             consumerIdOpt: Option[String],
                                             hostOpt: Option[String],
                                             clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
       if (topicPartitions.isEmpty)
         Array[PartitionAssignmentState](
-          PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None)
+          PartitionAssignmentState(group, coordinator, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None)
         )
       else {
-        var assignmentRows: Array[PartitionAssignmentState] = Array()
-        topicPartitions
-          .sortBy(_.partition)
-          .foreach { topicPartition =>
-            assignmentRows = assignmentRows :+ describePartition(group, coordinator, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition),
-              consumerIdOpt, hostOpt, clientIdOpt)
-          }
-        assignmentRows
+        topicPartitions.sortBy(_.partition).map { topicPartition =>
+          describePartition(group, coordinator, topicPartition, getPartitionOffset(topicPartition),
+            consumerIdOpt, hostOpt, clientIdOpt)
+        }.toArray
       }
     }
@@ -184,18 +182,17 @@
     private def describePartition(group: String,
                                   coordinator: Option[Node],
-                                  topic: String,
-                                  partition: Int,
+                                  topicPartition: TopicPartition,
                                   offsetOpt: Option[Long],
                                   consumerIdOpt: Option[String],
                                   hostOpt: Option[String],
                                   clientIdOpt: Option[String]): PartitionAssignmentState = {
       def getDescribePartitionResult(logEndOffsetOpt: Option[Long]): PartitionAssignmentState =
-        PartitionAssignmentState(group, coordinator, Option(topic), Option(partition), offsetOpt,
+        PartitionAssignmentState(group, coordinator, Some(topicPartition), offsetOpt,
           getLag(offsetOpt, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)

-      getLogEndOffset(new TopicPartition(topic, partition)) match {
+      getLogEndOffset(topicPartition) match {
         case LogEndOffsetResult.LogEndOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
         case LogEndOffsetResult.Unknown => getDescribePartitionResult(None)
         case LogEndOffsetResult.Ignore => null
@@ -264,18 +261,19 @@ object ConsumerGroupCommand extends Logging {
         topicsByConsumerId(consumerId).flatMap { _ =>
           // since consumers with no topic partitions are processed here, we pass empty for topic partitions and offsets
           // since consumer id is repeated in client id, leave host and client id empty
-          collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None)
+          collectConsumerAssignment(group, None, Array[TopicPartition](), Map[TopicPartition, Option[Long]](),
+            Some(consumerId), None, None)
         }
       }
       (None, Some(assignmentRows))
     }

-    private def getAllTopicPartitions(topics: Seq[String]): Seq[TopicAndPartition] = {
+    private def getAllTopicPartitions(topics: Seq[String]): Seq[TopicPartition] = {
       val topicPartitionMap = zkUtils.getPartitionsForTopics(topics)
       topics.flatMap { topic =>
         val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
-        partitions.map(TopicAndPartition(topic, _))
+        partitions.map(new TopicPartition(topic, _))
       }
     }
@@ -284,11 +282,11 @@
         case Some(-1) => LogEndOffsetResult.Unknown
         case Some(brokerId) =>
           getZkConsumer(brokerId).map { consumer =>
-            val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
-            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-            val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-            consumer.close()
-            LogEndOffsetResult.LogEndOffset(logEndOffset)
+            try {
+              val (maybeErrMsg, result) = fetchLogEndOffset(topicPartition, consumer)
+              maybeErrMsg.foreach(printError(_))
+              result
+            } finally consumer.close()
           }.getOrElse(LogEndOffsetResult.Ignore)
         case None =>
           printError(s"No broker for partition '$topicPartition'")
@@ -296,13 +294,34 @@
       }
     }

+    private[admin] def fetchLogEndOffset(topicPartition: TopicPartition,
+                                         consumer: SimpleConsumer): (Option[String], LogEndOffsetResult) = {
+      val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
+      val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+      val response = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+
+      response.error match {
+        case Errors.NONE =>
+          if (response.offsets.nonEmpty) {
+            (None, LogEndOffsetResult.LogEndOffset(response.offsets.head))
+          } else {
+            val errorMsg = s"No log end offset returned in ListOffsets response for partition $topicPartition"
+            (Some(errorMsg), LogEndOffsetResult.Ignore)
+          }
+
+        case error =>
+          val errorMsg = s"Error fetching log end offset for partition $topicPartition: ${error.message}"
+          (Some(errorMsg), LogEndOffsetResult.Ignore)
+      }
+    }
+
     private def getPartitionOffsets(group: String,
-                                    topicPartitions: Seq[TopicAndPartition],
+                                    topicPartitions: Seq[TopicPartition],
                                     channelSocketTimeoutMs: Int,
-                                    channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
-      val offsetMap = mutable.Map[TopicAndPartition, Long]()
+                                    channelRetryBackoffMs: Int): Map[TopicPartition, Long] = {
+      val offsetMap = mutable.Map[TopicPartition, Long]()
       val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
-      channel.send(OffsetFetchRequest(group, topicPartitions))
+      channel.send(OffsetFetchRequest(group, topicPartitions.map(new TopicAndPartition(_))))
       val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())

       offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
@@ -313,13 +332,13 @@
           // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
           try {
             val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
-            offsetMap.put(topicAndPartition, offset)
+            offsetMap.put(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition), offset)
           } catch {
             case z: ZkNoNodeException =>
               printError(s"Could not fetch offset from zookeeper for group '$group' partition '$topicAndPartition' due to missing offset data in zookeeper.", Some(z))
           }
         case offsetAndMetaData if offsetAndMetaData.error == Errors.NONE =>
-          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
+          offsetMap.put(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition), offsetAndMetadata.offset)
         case _ =>
           printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition' due to ${offsetAndMetadata.error.message}.")
       }
@@ -403,32 +422,26 @@ object ConsumerGroupCommand extends Logging {
           None
         case Some(consumers) =>
           var assignedTopicPartitions = Array[TopicPartition]()
-          val offsets = adminClient.listGroupOffsets(group)
+          val offsets = adminClient.listGroupOffsets(group).mapValues(Some(_))
          val rowsWithConsumer =
            if (offsets.isEmpty)
              List[PartitionAssignmentState]()
            else {
              consumers.sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
-                val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
-                assignedTopicPartitions = assignedTopicPartitions ++ consumerSummary.assignment
-                val partitionOffsets: Map[TopicAndPartition, Option[Long]] = consumerSummary.assignment.map { topicPartition =>
-                  new TopicAndPartition(topicPartition) -> offsets.get(topicPartition)
-                }.toMap
-                collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions,
-                  partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
+                assignedTopicPartitions ++= consumerSummary.assignment
+                collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), consumerSummary.assignment,
+                  offsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
                   Some(s"${consumerSummary.clientId}"))
              }
            }

          val rowsWithoutConsumer = offsets.filterNot {
            case (topicPartition, offset) => assignedTopicPartitions.contains(topicPartition)
-          }.flatMap {
-            case (topicPartition, offset) =>
-              val topicAndPartition = new TopicAndPartition(topicPartition)
-              collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
-                Map(topicAndPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
-                Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
-          }
+          }.flatMap {
+            case (topicPartition, offset) =>
+              collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicPartition),
+                offsets, Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
+          }

          Some(rowsWithConsumer ++ rowsWithoutConsumer)
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 2bceeaaadef..daa5723e277 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -26,21 +26,20 @@
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
-import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
-import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
-import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
-import kafka.consumer.OldConsumer
-import kafka.consumer.Whitelist
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService, LogEndOffsetResult, ZkConsumerGroupService}
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.consumer.{OldConsumer, SimpleConsumer, Whitelist}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.{CoordinatorNotAvailableException, WakeupException}
+import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.serialization.StringDeserializer
-
 class DescribeConsumerGroupTest extends KafkaServerTestHarness {

   val overridingProps = new Properties()
@@ -159,8 +158,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
       val (_, assignments) = consumerGroupCommand.describeGroup()
       assignments.isDefined &&
      assignments.get.count(_.group == group) == 2 &&
-      assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
-      assignments.get.count { x => x.group == group && !x.partition.isDefined } == 1
+      assignments.get.count { x => x.group == group && x.topicPartition.isDefined } == 1 &&
+      assignments.get.count { x => x.group == group && !x.topicPartition.isDefined } == 1
     }, "Expected rows for consumers with no assigned partitions in describe group results.")

     // cleanup
@@ -169,6 +168,51 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     consumer2Mock.stop()
   }

+  @Test
+  def testErrorFetchingLogEndOffset() {
+    props.setProperty("zookeeper.connect", zkConnect)
+
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
+    val consumerGroupService = new ZkConsumerGroupService(opts)
+    val consumer = EasyMock.createMock(classOf[SimpleConsumer])
+    val topicAndPartition = TopicAndPartition(topic, 0)
+
+    val listOffsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+    val partitionResponse = PartitionOffsetsResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, Seq.empty[Long])
+    EasyMock.expect(consumer.getOffsetsBefore(listOffsetRequest))
+      .andReturn(OffsetResponse(0, Map(topicAndPartition -> partitionResponse)))
+
+    EasyMock.replay(consumer)
+
+    val (maybeErrMsg, result) = consumerGroupService.fetchLogEndOffset(new TopicPartition(topic, 0), consumer)
+    assertTrue(maybeErrMsg.isDefined)
+    assertEquals(LogEndOffsetResult.Ignore, result)
+  }
+
+  @Test
+  def testFetchingLogEndOffsetWithNoReturnedOffsets() {
+    // if there are no errors, we should get a valid offset, but this case ensures that the command
+    // does not just blow up if we don't
+
+    props.setProperty("zookeeper.connect", zkConnect)
+
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
+    val consumerGroupService = new ZkConsumerGroupService(opts)
+    val consumer = EasyMock.createMock(classOf[SimpleConsumer])
+    val topicAndPartition = TopicAndPartition(topic, 0)
+
+    val listOffsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+    val partitionResponse = PartitionOffsetsResponse(Errors.NONE, Seq.empty[Long])
+    EasyMock.expect(consumer.getOffsetsBefore(listOffsetRequest))
+      .andReturn(OffsetResponse(0, Map(topicAndPartition -> partitionResponse)))
+
+    EasyMock.replay(consumer)
+
+    val (maybeErrMsg, result) = consumerGroupService.fetchLogEndOffset(new TopicPartition(topic, 0), consumer)
+    assertTrue(maybeErrMsg.isDefined)
+    assertEquals(LogEndOffsetResult.Ignore, result)
+  }
+
   @Test
   def testDescribeNonExistingGroupWithNewConsumer() {
     // run one consumer in the group consuming from a single-partition topic
@@ -252,8 +296,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
       state == Some("Stable") &&
      assignments.isDefined &&
      assignments.get.count(_.group == group) == 2 &&
-      assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
-      assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
+      assignments.get.count{ x => x.group == group && x.topicPartition.isDefined} == 1 &&
+      assignments.get.count{ x => x.group == group && !x.topicPartition.isDefined} == 1
     }, "Expected rows for consumers with no assigned partitions in describe group results.")

     consumerGroupCommand.close()
@@ -277,8 +321,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
       state == Some("Stable") &&
      assignments.isDefined &&
      assignments.get.count(_.group == group) == 2 &&
-      assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
-      assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
+      assignments.get.count{ x => x.group == group && x.topicPartition.isDefined} == 2 &&
+      assignments.get.count{ x => x.group == group && !x.topicPartition.isDefined} == 0
     }, "Expected two rows (one row per consumer) in describe group results.")

     consumerGroupCommand.close()

> ConsumerGroupCommand error handling improvement
> -----------------------------------------------
>
>                 Key: KAFKA-5110
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5110
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.10.1.1
>            Reporter: Dustin Cote
>            Assignee: Jason Gustafson
>            Priority: Major
>
> The ConsumerGroupCommand isn't handling partition errors properly. It throws the following:
> {code}
> kafka-consumer-groups.sh --zookeeper 10.10.10.10:2181 --group mygroup --describe
> GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
> Error while executing consumer group command empty.head
> java.lang.UnsupportedOperationException: empty.head
> 	at scala.collection.immutable.Vector.head(Vector.scala:193)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:197)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:194)
> 	at scala.Option.map(Option.scala:146)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.getLogEndOffset(ConsumerGroupCommand.scala:194)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.kafka$admin$ConsumerGroupCommand$ConsumerGroupService$$describePartition(ConsumerGroupCommand.scala:125)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:107)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:106)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeTopicPartition(ConsumerGroupCommand.scala:106)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeTopicPartition(ConsumerGroupCommand.scala:134)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.kafka$admin$ConsumerGroupCommand$ZkConsumerGroupService$$describeTopic(ConsumerGroupCommand.scala:181)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:166)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:166)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:166)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
> 	at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describe(ConsumerGroupCommand.scala:134)
> 	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
> 	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}
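For anyone tracing the stack above: the old ZkConsumerGroupService.getLogEndOffset read offsets.head from the ListOffsets response without first checking the partition's error code, and a broker that reports an error returns an empty offsets list, so Vector.head throws UnsupportedOperationException. A minimal standalone sketch of that failure mode and the safe alternative (plain Scala, not the Kafka source; the object name is invented for illustration):

{code}
object EmptyHeadDemo extends App {
  // What the broker effectively hands back for the partition on error:
  // an error code plus an empty offsets list.
  val offsets: Vector[Long] = Vector.empty

  // Old pattern: throws java.lang.UnsupportedOperationException: empty.head
  // val logEndOffset = offsets.head

  // Safe pattern, in the spirit of the fix: make absence explicit and report it.
  offsets.headOption match {
    case Some(offset) => println(s"log end offset: $offset")
    case None         => Console.err.println("no log end offset returned for partition")
  }
}
{code}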
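The patch addresses this by routing the response through the new fetchLogEndOffset, which returns a printable error message alongside a LogEndOffsetResult instead of throwing. Below is a simplified, self-contained sketch of that shape; PartitionOffsets and the object name are stand-ins invented for illustration, whereas the real code uses kafka.api.OffsetRequest and SimpleConsumer as shown in the diff:

{code}
object FetchLogEndOffsetSketch extends App {
  sealed trait LogEndOffsetResult
  case class LogEndOffset(offset: Long) extends LogEndOffsetResult
  case object Ignore extends LogEndOffsetResult

  // Stand-in for the per-partition ListOffsets response (error plus offsets).
  case class PartitionOffsets(error: Option[String], offsets: Seq[Long])

  def fetchLogEndOffset(partition: String,
                        response: PartitionOffsets): (Option[String], LogEndOffsetResult) =
    response.error match {
      case None if response.offsets.nonEmpty =>
        (None, LogEndOffset(response.offsets.head)) // safe: nonEmpty checked above
      case None =>
        (Some(s"No log end offset returned in ListOffsets response for partition $partition"), Ignore)
      case Some(message) =>
        (Some(s"Error fetching log end offset for partition $partition: $message"), Ignore)
    }

  // Broker reported an error: the command now prints a message and skips the row
  // instead of dying with empty.head.
  println(fetchLogEndOffset("mytopic-0", PartitionOffsets(Some("UNKNOWN_TOPIC_OR_PARTITION"), Seq.empty)))
}
{code}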