ableegoldman commented on a change in pull request #9863:
URL: https://github.com/apache/kafka/pull/9863#discussion_r556176651
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -249,20 +249,31 @@ public boolean isValidTransition(final State newState) {
     private boolean waitOnState(final State targetState, final long waitMs) {
         final long begin = time.milliseconds();
         synchronized (stateLock) {
+            boolean interrupted = false;
             long elapsedMs = 0L;
-            while (state != targetState) {
-                if (waitMs > elapsedMs) {
-                    final long remainingMs = waitMs - elapsedMs;
-                    try {
-                        stateLock.wait(remainingMs);
-                    } catch (final InterruptedException e) {
-                        // it is ok: just move on to the next iteration
+            try {
+                while (state != targetState) {
+                    if (waitMs > elapsedMs) {
+                        final long remainingMs = waitMs - elapsedMs;
+                        try {
+                            stateLock.wait(remainingMs);
+                        } catch (final InterruptedException e) {
+                            interrupted = true;
Review comment:
Maybe we should have a quick sync on this. My understanding is that an
interrupt means that the thread wants to regain control somewhere along the
call stack. So the only way I can see to interpret it is as "(1) stop
blocking/waiting on this, (2) get the system back into a consistent state, and
then (3) restore the interrupt flag so the interrupt can be handled (or not) by
the caller". Before this PR we were doing (1) and (2), now we're doing (2) and
(3), but why not all three?
If we don't break out of the loop then we've effectively ignored (but not
swallowed) the interrupt, since we will go on waiting for the thread to reach
DEAD. Worse, the thread will now be in a busy loop, as you mentioned in another
comment.
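
To make the three steps concrete, here is a minimal, self-contained sketch of a
wait loop that does all of them. It is not the KafkaStreams code: the class
`StateWaiter`, its two-value `State` enum, and `setState` are hypothetical
stand-ins, and it uses `System.nanoTime()` in place of the `time` field from
the diff.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a state-holding class; NOT the real KafkaStreams code.
final class StateWaiter {
    public enum State { RUNNING, DEAD }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    // Hypothetical helper: change the state and wake up any waiters.
    public void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // Returns true if targetState was reached; false on timeout or interrupt.
    public boolean waitOnState(final State targetState, final long waitMs) {
        final long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMs);
        boolean interrupted = false;
        synchronized (stateLock) {
            try {
                while (state != targetState) {
                    final long remainingMs =
                        TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime());
                    if (remainingMs <= 0) {
                        break; // timed out (also avoids wait(0), which waits forever)
                    }
                    try {
                        stateLock.wait(remainingMs);
                    } catch (final InterruptedException e) {
                        interrupted = true;
                        break; // (1) stop blocking/waiting
                    }
                }
                // (2) nothing to undo in this sketch: the state is only read here
                return state == targetState;
            } finally {
                if (interrupted) {
                    // (3) restore the flag so the caller can handle the interrupt (or not)
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
```

With this shape, a caller that gets `false` back can check
`Thread.currentThread().isInterrupted()` to tell an interrupt apart from a
plain timeout. Because the flag is only restored after leaving the loop,
`wait` is never re-entered with the flag set, so there is no busy loop.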