junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r478604292



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -100,7 +100,8 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          offsetsMonotonic: Boolean,
                          lastOffsetOfFirstBatch: Long,
                          recordErrors: Seq[RecordError] = List(),
-                         errorMessage: String = null) {
+                         errorMessage: String = null,
+                         leaderHWIncremented: Option[Boolean] = None) {

Review comment:
       Could we add the new param to the javadoc?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -585,6 +597,27 @@ class ReplicaManager(val config: KafkaConfig,
                     result.info.logStartOffset, 
result.info.recordErrors.asJava, result.info.errorMessage)) // response status
       }
 
+      delayedActions.put {
+        () =>
+          localProduceResults.foreach {
+            case (topicPartition, result) =>
+              result.info.leaderHWIncremented.foreach {
+                incremented =>
+                  val requestKey = TopicPartitionOperationKey(topicPartition)
+                  if (incremented) {
+                    // some delayed operations may be unblocked after HW 
changed
+                    delayedProducePurgatory.checkAndComplete(requestKey)
+                    delayedFetchPurgatory.checkAndComplete(requestKey)
+                    delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+                  } else {
+                    // probably unblock some follower fetch requests since log 
end offset has been updated
+                    delayedFetchPurgatory.checkAndComplete(requestKey)
+                  }
+              }
+          }
+      }
+
+

Review comment:
       Unneeded new line.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -187,6 +187,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       // The local completion time may be set while processing the request. 
Only record it if it's unset.
       if (request.apiLocalCompleteTimeNanos < 0)
         request.apiLocalCompleteTimeNanos = time.nanoseconds
+
+      replicaManager.tryCompleteDelayedAction()

Review comment:
       This probably should be included in the local time as before.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -36,8 +36,13 @@ private[group] class DelayedJoin(coordinator: 
GroupCoordinator,
                                  rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, 
forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
+  override def onExpiration() = {
+    coordinator.onExpireJoin()
+    tryToCompleteDelayedAction()

Review comment:
       We probably want to add a comment why this is needed.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -57,6 +62,8 @@ private[group] class InitialDelayedJoin(coordinator: 
GroupCoordinator,
 
   override def tryComplete(): Boolean = false
 
+  override def onExpiration(): Unit = tryToCompleteDelayedAction()

Review comment:
       Hmm, why do we need to override this instead of using the one defined in 
DelayedJoin?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to