jiameixie commented on a change in pull request #8836:
URL: https://github.com/apache/kafka/pull/8836#discussion_r437180584



##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
 
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
       The Javadoc for `onPartitionsRevoked` says: "In eager rebalancing, it 
will always be called at the start of a rebalance and after the consumer stops 
fetching data. In cooperative rebalancing, it will be called at the end of a 
rebalance on the set of partitions being revoked iff the set is non-empty." See 
https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned-java.util.Collection-.
 

##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
 
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
       Below is the output of kafka-consumer-perf-test.sh, where fetch.time.ms 
is negative.
   start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, 
nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
   2020-06-03 02:50:50:008, 2020-06-03 02:52:41:376, 19073.5445, 171.2659, 
20000061, 179585.3477, 1591123850918, -1591123739550, -0.0000, -0.0126
   

##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
 
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
       I added logs in methods onPartitionsAssigned and onPartitionsRevoked and 
verified that both of them are called just once in normal cases.

##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
 
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
       @chia7712 So I think `joinStart = System.currentTimeMillis` in 
onPartitionsAssigned is not required, where joinStart is close to endMs.  

##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
 
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
       @chia7712 So I think `joinStart = System.currentTimeMillis` in 
onPartitionsAssigned is not required, where joinStart is close to endMs.  

##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
 
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
       @chia7712 So I think `joinStart = System.currentTimeMillis` in 
onPartitionsRevoked is not required, where joinStart is close to endMs.  

##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
 
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
       @chia7712 I can't find a good place to update it. Perhaps we should 
remove this metric. The way the fetchTimeInMs value is computed is also not 
ideal: `val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get`. Some of 
that time might be spent waiting for a connection.

##########
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##########
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var joinStart = 0L
     var joinTimeMsInSingleRound = 0L
 
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
       For `onPartitionsRevoked`, the Javadoc says: "In eager rebalancing, it 
will always be called at the start of a rebalance and after the consumer stops 
fetching data. In cooperative rebalancing, it will be called at the end of a 
rebalance on the set of partitions being revoked iff the set is non-empty." See 
https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned-java.util.Collection-.
 
   `onPartitionsRevoked` is called after `onPartitionsAssigned`. `joinStart` 
is 0 when onPartitionsAssigned is called, so 
`joinTime.addAndGet(System.currentTimeMillis - joinStart)` adds 
`System.currentTimeMillis` itself, which makes `val fetchTimeInMs = (endMs - 
startMs) - joinGroupTimeInMs.get` negative.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to