dajac commented on a change in pull request #11173: URL: https://github.com/apache/kafka/pull/11173#discussion_r828928713
########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -103,59 +104,77 @@ object GetOffsetShell { throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions") } - val listOffsetsTimestamp = options.valueOf(timeOpt).longValue + val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt)) val topicPartitionFilter = if (options.has(topicPartitionsOpt)) { - createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), excludeInternalTopics) + createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt)) } else { - val partitionIdsRequested = createPartitionSet(options.valueOf(partitionsOpt)) - createTopicPartitionFilterWithTopicAndPartitionPattern( if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None, - excludeInternalTopics, - partitionIdsRequested + options.valueOf(partitionsOpt) ) } val config = if (options.has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) - val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) + val adminClient = Admin.create(config) try { - val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter) + val partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics) if (partitionInfos.isEmpty) { throw new IllegalArgumentException("Could not match any topic-partitions with the specified filters") } - val topicPartitions = partitionInfos.flatMap { p => - if (p.leader == null) { - System.err.println(s"Error: topic-partition ${p.topic}:${p.partition} does not have a leader. Skip getting offsets") - None - } else - Some(new TopicPartition(p.topic, p.partition)) - } + val timestampsToSearch = partitionInfos.map(tp => tp -> offsetSpec).toMap.asJava - /* Note that the value of the map can be null */ - val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match { - case ListOffsetsRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala - case ListOffsetsRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala - case _ => - val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava - consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) => - if (x == null) (k, null) else (k, x.offset: java.lang.Long) - } + val listOffsetsResult = adminClient.listOffsets(timestampsToSearch) + val partitionOffsets = partitionInfos.flatMap { tp => + try { + val partitionInfo = listOffsetsResult.partitionResult(tp).get + Some((tp, partitionInfo.offset)) + } catch { + case e: ExecutionException => + e.getCause match { + case _: LeaderNotAvailableException => + System.err.println(s"Skip getting offsets for: topic-partition ${tp.topic}:${tp.partition} since it does not have a leader right now.") + case _ => + System.err.println(s"Error while getting offset for topic-partition ${tp.topic}:${tp.partition}") + e.printStackTrace() Review comment: We should not print the stack trace like this with `e.printStackTrace()`. It would be better to include the error name or the error message in the `System.err.println`. How about having a generic error for all `KafkaException`? Something like `Error: Skip getting offsets for topic-partition ${p.topic}:${p.partition due to: ${e.getMessage}. If we would get any non `KafkaException` in the cause, we should throw if further, in my opinion. I have not sure if it can really happen but let's be on the safe side. ########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -109,6 +111,61 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging { ) } + @ParameterizedTest + @ValueSource(strings = Array("-1", "latest")) + def testGetLatestOffsets(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) + assertEquals( + List( + ("topic1", 0, Some(1)), + ("topic2", 0, Some(2)), + ("topic3", 0, Some(3)), + ("topic4", 0, Some(4)) + ), + offsets + ) + } + + @ParameterizedTest + @ValueSource(strings = Array("-2", "earliest")) + def testGetEarliestOffsets(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) + assertEquals( + List( + ("topic1", 0, Some(0)), + ("topic2", 0, Some(0)), + ("topic3", 0, Some(0)), + ("topic4", 0, Some(0)) + ), + offsets + ) + } + + @ParameterizedTest + @ValueSource(strings = Array("-3", "max-timestamp")) + def testGetOffsetsByMaxTimestamp(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time)) + offsets.foreach{ case (topic, _, timestampOpt) => Review comment: nit: A space is missing before `{` and there is an extra one after. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org