dajac commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r818717336



##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -109,6 +111,46 @@ class GetOffsetShellTest extends KafkaServerTestHarness 
with Logging {
     )
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("-1", "latest"))
+  def testGetLatestOffsets(time: String): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", 
"--time", time))
+    assertEquals(
+      List(
+        ("topic1", 0, Some(1)),
+        ("topic2", 0, Some(2)),
+        ("topic3", 0, Some(3)),
+        ("topic4", 0, Some(4))
+      ),
+      offsets
+    )
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("-2", "earliest"))
+  def testGetEarliestOffsets(time: String): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", 
"--time", time))
+    assertEquals(
+      List(
+        ("topic1", 0, Some(0)),
+        ("topic2", 0, Some(0)),
+        ("topic3", 0, Some(0)),
+        ("topic4", 0, Some(0))
+      ),
+      offsets
+    )
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("-3", "max-timestamp"))
+  def testGetOffsetsByMaxTimestamp(time: String): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*", 
"--time", time))
+    offsets.foreach{ offset =>

Review comment:
       nit: Could we deconstruct offset? `offsets.foreach { case (topic, 
partition, timestamp) =>`

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -109,6 +111,46 @@ class GetOffsetShellTest extends KafkaServerTestHarness 
with Logging {
     )
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("-1", "latest"))
+  def testGetLatestOffsets(time: String): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", 
"--time", time))
+    assertEquals(
+      List(
+        ("topic1", 0, Some(1)),
+        ("topic2", 0, Some(2)),
+        ("topic3", 0, Some(3)),
+        ("topic4", 0, Some(4))
+      ),
+      offsets
+    )
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("-2", "earliest"))
+  def testGetEarliestOffsets(time: String): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", 
"--time", time))
+    assertEquals(
+      List(
+        ("topic1", 0, Some(0)),
+        ("topic2", 0, Some(0)),
+        ("topic3", 0, Some(0)),
+        ("topic4", 0, Some(0))
+      ),
+      offsets
+    )
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("-3", "max-timestamp"))
+  def testGetOffsetsByMaxTimestamp(time: String): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*", 
"--time", time))
+    offsets.foreach{ offset =>
+      // We can't know the exact offsets with max timestamp
+      assertTrue(offset._3.get >= 0 && offset._3.get <= 
offset._1.replace("topic", "").toInt)
+    }
+  }
+
   @Test
   def testTopicPartitionsArgWithInternalExcluded(): Unit = {

Review comment:
       Do we have a unit test which verifies that internal topics are still 
included?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -103,59 +104,76 @@ object GetOffsetShell {
       throw new IllegalArgumentException("--topic-partitions cannot be used 
with --topic or --partitions")
     }
 
-    val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+    val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt))
 
     val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
-      
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), 
excludeInternalTopics)
+      
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt))
     } else {
-      val partitionIdsRequested = 
createPartitionSet(options.valueOf(partitionsOpt))
-
       createTopicPartitionFilterWithTopicAndPartitionPattern(
         if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
-        excludeInternalTopics,
-        partitionIdsRequested
+        options.valueOf(partitionsOpt)
       )
     }
 
     val config = if (options.has(commandConfigOpt))
       Utils.loadProps(options.valueOf(commandConfigOpt))
     else
       new Properties
-    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-    val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+    config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
+    val client = Admin.create(config)
 
     try {
-      val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
+      val partitionInfos = listPartitionInfos(client, topicPartitionFilter, 
excludeInternalTopics)
 
       if (partitionInfos.isEmpty) {
         throw new IllegalArgumentException("Could not match any 
topic-partitions with the specified filters")
       }
 
-      val topicPartitions = partitionInfos.flatMap { p =>
-        if (p.leader == null) {
-          System.err.println(s"Error: topic-partition 
${p.topic}:${p.partition} does not have a leader. Skip getting offsets")
-          None
-        } else
-          Some(new TopicPartition(p.topic, p.partition))
-      }
+      val timestampsToSearch = partitionInfos.map(tp => tp -> 
offsetSpec).toMap.asJava
 
-      /* Note that the value of the map can be null */
-      val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = 
listOffsetsTimestamp match {
-        case ListOffsetsRequest.EARLIEST_TIMESTAMP => 
consumer.beginningOffsets(topicPartitions.asJava).asScala
-        case ListOffsetsRequest.LATEST_TIMESTAMP => 
consumer.endOffsets(topicPartitions.asJava).asScala
-        case _ =>
-          val timestampsToSearch = topicPartitions.map(tp => tp -> 
(listOffsetsTimestamp: java.lang.Long)).toMap.asJava
-          consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, 
x) =>
-            if (x == null) (k, null) else (k, x.offset: java.lang.Long)
-          }
+      val listOffsetsResult = client.listOffsets(timestampsToSearch)
+      val partitionOffsets = partitionInfos.flatMap { tp =>
+        try {
+          val partitionInfo = listOffsetsResult.partitionResult(tp).get
+          Some((tp, partitionInfo.offset))
+        } catch {
+          case e: ExecutionException =>
+            e.getCause match {
+              case _: LeaderNotAvailableException =>
+                System.err.println(s"Error: topic-partition 
${tp.topic}:${tp.partition} does not have a leader. Skip getting offsets")
+              case _ =>
+                System.err.println(s"Error while getting end offsets for 
topic-partition ${tp.topic}:${tp.partition}")

Review comment:
       I wonder if there is any real value in printing this out if we just skip 
the partition in the end. Is there any? We did not do anything in this case 
prior to your PR either.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to