wcarlson5 commented on a change in pull request #9863:
URL: https://github.com/apache/kafka/pull/9863#discussion_r555168775



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -249,20 +249,31 @@ public boolean isValidTransition(final State newState) {
     private boolean waitOnState(final State targetState, final long waitMs) {
         final long begin = time.milliseconds();
         synchronized (stateLock) {
+            boolean interrupted = false;
             long elapsedMs = 0L;
-            while (state != targetState) {
-                if (waitMs > elapsedMs) {
-                    final long remainingMs = waitMs - elapsedMs;
-                    try {
-                        stateLock.wait(remainingMs);
-                    } catch (final InterruptedException e) {
-                        // it is ok: just move on to the next iteration
+            try {
+                while (state != targetState) {
+                    if (waitMs > elapsedMs) {
+                        final long remainingMs = waitMs - elapsedMs;
+                        try {
+                            stateLock.wait(remainingMs);
+                        } catch (final InterruptedException e) {
+                            interrupted = true;
+                        }
+                    } else {
+                        log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
+                        return false;
                     }
-                } else {
-                    log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
-                    return false;
+                    elapsedMs = time.milliseconds() - begin;
+                }
+            } finally {
+                // Make sure to restore the interruption status before returning.

Review comment:
       Do we want to interrupt before we return? If that is the case, why wait
until the condition is fulfilled?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -249,20 +249,31 @@ public boolean isValidTransition(final State newState) {
     private boolean waitOnState(final State targetState, final long waitMs) {
         final long begin = time.milliseconds();
         synchronized (stateLock) {
+            boolean interrupted = false;
             long elapsedMs = 0L;
-            while (state != targetState) {
-                if (waitMs > elapsedMs) {
-                    final long remainingMs = waitMs - elapsedMs;
-                    try {
-                        stateLock.wait(remainingMs);
-                    } catch (final InterruptedException e) {
-                        // it is ok: just move on to the next iteration
+            try {
+                while (state != targetState) {
+                    if (waitMs > elapsedMs) {
+                        final long remainingMs = waitMs - elapsedMs;
+                        try {
+                            stateLock.wait(remainingMs);
+                        } catch (final InterruptedException e) {
+                            interrupted = true;
+                        }
+                    } else {
+                        log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
+                        return false;
                     }
-                } else {
-                    log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
-                    return false;
+                    elapsedMs = time.milliseconds() - begin;
+                }
+            } finally {
+                // Make sure to restore the interruption status before returning.
+                // We do not always own the current thread that executes this method, i.e., we do not know the
+                // interruption policy of the thread. The least we can do is restore the interruption status before
+                // the current thread exits this method.
+                if (interrupted) {
+                    Thread.currentThread().interrupt();

Review comment:
       I am not 100% sure this is the purpose of the interrupt flag, but I think
that this will do.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -249,20 +249,31 @@ public boolean isValidTransition(final State newState) {
     private boolean waitOnState(final State targetState, final long waitMs) {
         final long begin = time.milliseconds();
         synchronized (stateLock) {
+            boolean interrupted = false;
             long elapsedMs = 0L;
-            while (state != targetState) {
-                if (waitMs > elapsedMs) {
-                    final long remainingMs = waitMs - elapsedMs;
-                    try {
-                        stateLock.wait(remainingMs);
-                    } catch (final InterruptedException e) {
-                        // it is ok: just move on to the next iteration
+            try {
+                while (state != targetState) {
+                    if (waitMs > elapsedMs) {
+                        final long remainingMs = waitMs - elapsedMs;
+                        try {
+                            stateLock.wait(remainingMs);
+                        } catch (final InterruptedException e) {
+                            interrupted = true;
+                        }
+                    } else {
+                        log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
+                        return false;
                     }
-                } else {
-                    log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
-                    return false;
+                    elapsedMs = time.milliseconds() - begin;
+                }
+            } finally {
+                // Make sure to restore the interruption status before returning.
+                // We do not always own the current thread that executes this method, i.e., we do not know the
+                // interruption policy of the thread. The least we can do is restore the interruption status before
+                // the current thread exits this method.
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
                 }
-                elapsedMs = time.milliseconds() - begin;
             }
             return true;

Review comment:
       If we move this `return true` inside the try as well, then we return and
still set the interrupted status in the finally block. That should be better for
the user if they want to ignore the interruption, because they can be sure of
the state change and avoid making this call again.
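
       To make the suggestion concrete, here is a minimal standalone sketch. It
is not the actual KafkaStreams code: the class name `InterruptRestoringWaiter`,
the String-typed state, the `setState` helper, and the use of
`System.nanoTime()` in place of the `Time` abstraction are all assumptions made
for illustration. The point is that `return true` sits inside the try, so the
finally block restores the interrupt flag on both the success and the timeout
path.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the waiting logic under review; not Kafka code.
public class InterruptRestoringWaiter {
    private final Object stateLock = new Object();
    private String state = "CREATED";

    // Hypothetical helper: change the state and wake up all waiters.
    public void setState(final String newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // Waits up to waitMs for targetState. Returns true if the state was
    // reached, false on timeout. An interrupt does not abort the wait; it is
    // remembered and re-asserted on the calling thread before returning.
    public boolean waitOnState(final String targetState, final long waitMs) {
        final long beginNs = System.nanoTime();
        boolean interrupted = false;
        synchronized (stateLock) {
            try {
                while (!state.equals(targetState)) {
                    final long elapsedMs =
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginNs);
                    if (elapsedMs >= waitMs) {
                        return false;
                    }
                    try {
                        // The remaining time is strictly positive here, so this
                        // never degenerates into wait(0), which waits forever.
                        stateLock.wait(waitMs - elapsedMs);
                    } catch (final InterruptedException e) {
                        // wait() cleared the flag; remember to restore it.
                        interrupted = true;
                    }
                }
                return true;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
```

With this shape, a caller that ignores the interruption still gets a reliable
return value, and a caller that cares can check
`Thread.currentThread().isInterrupted()` afterwards.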




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

