jiameixie commented on a change in pull request #8836:
URL: https://github.com/apache/kafka/pull/8836#discussion_r437180584
##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
The Javadoc for `onPartitionsRevoked` says: "In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data. In cooperative rebalancing, it will be called at the end of a rebalance on the set of partitions being revoked iff the set is non-empty" (https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned-java.util.Collection-).

Review comment:
Below is the output of kafka-consumer-perf-test.sh, where fetch.time.ms is negative.
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-06-03 02:50:50:008, 2020-06-03 02:52:41:376, 19073.5445, 171.2659, 20000061, 179585.3477, 1591123850918, -1591123739550, -0.0000, -0.0126

Review comment:
I added logging to `onPartitionsAssigned` and `onPartitionsRevoked` and confirmed that each of them is called just once in normal cases.
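The negative fetch.time.ms in the output above can be reproduced from the reported values alone. The following is a minimal sketch, not the tool's actual code: the object and value names are hypothetical, the elapsed time is derived by hand from start.time and end.time, and the `joinDelta` helper only illustrates what happens if the join start is still 0 when the join time is accumulated.

```scala
// Hypothetical names; the numbers are copied from the perf-test output line above.
object NegativeFetchTime {
  // end.time minus start.time: 02:52:41:376 - 02:50:50:008 = 111.368 s
  val elapsedMs: Long = 111368L

  // Reported rebalance.time.ms; it looks like a wall-clock timestamp, not a duration.
  val reportedRebalanceMs: Long = 1591123850918L

  // Same shape as the quoted expression (endMs - startMs) - joinGroupTimeInMs.get
  val fetchTimeInMs: Long = elapsedMs - reportedRebalanceMs

  // Illustration only: with joinStart == 0 the "elapsed" join time is simply
  // the current wall-clock time in milliseconds.
  def joinDelta(nowMs: Long, joinStart: Long): Long = nowMs - joinStart

  def main(args: Array[String]): Unit = {
    println(fetchTimeInMs)                      // -1591123739550
    println(joinDelta(reportedRebalanceMs, 0L)) // 1591123850918
  }
}
```

The computed value matches the fetch.time.ms column, which is consistent with rebalance.time.ms having been accumulated from a zero start time.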
Review comment:
@chia7712 So I think `joinStart = System.currentTimeMillis` in `onPartitionsRevoked` is not required, since at that point `joinStart` is close to `endMs`.

Review comment:
@chia7712 I can't find a good place to update it. Perhaps we should remove this metric. The way the `fetchTimeInMs` value is computed is also not ideal.
`val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get`. The elapsed time may also include time spent waiting for a connection.

Review comment:
For `onPartitionsRevoked`, the Javadoc says: "In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data. In cooperative rebalancing, it will be called at the end of a rebalance on the set of partitions being revoked iff the set is non-empty" (https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned-java.util.Collection-).

`onPartitionsRevoked` is called after `onPartitionsAssigned`, so `joinStart` is still 0 when `onPartitionsAssigned` is called. `joinTime.addAndGet(System.currentTimeMillis - joinStart)` therefore adds the full value of `System.currentTimeMillis`, which makes `val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get` negative.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org