akalash commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1111988132


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -949,7 +952,9 @@ public final void cleanUp(Throwable throwable) throws Exception {
         // disabled the interruptions or not.
         getCompletionFuture().exceptionally(unused -> null).join();
         // clean up everything we initialized
-        isRunning = false;
+        if (!isCanceled() && !isFailing()) {

Review Comment:
   I have a proposal. Since we can only move to a final state if the current
   state is not already final, maybe we can implement a `transfer` method or
   something similar:
   ```
   void transfer(TaskState newState) {
     if (!taskState.isFinal()) {
       taskState = newState;
     }
   }
   ```
   Anyway, I don't have a strong opinion here.
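
   A slightly fuller sketch of the same idea, in case it helps the discussion.
   The `isFinal()` helper and the `TaskStateHolder` class are hypothetical and
   are not part of the PR, and treating `FAILING` as final is an assumption.
   Note that this is still a plain check-then-set, so it is not atomic:

   ```java
   // Hypothetical stand-in for the TaskState enum in the PR.
   // isFinal() is an assumed helper; marking FAILING as final is an assumption.
   enum TaskState {
       INITIALIZED(false),
       RUNNING(false),
       FAILING(true),
       CANCELED(true),
       FINISHED(true);

       private final boolean isFinal;

       TaskState(boolean isFinal) {
           this.isFinal = isFinal;
       }

       boolean isFinal() {
           return isFinal;
       }
   }

   // Hypothetical holder used only for illustration; not the real StreamTask.
   final class TaskStateHolder {
       private volatile TaskState taskState = TaskState.INITIALIZED;

       // Changes the state only if the current state is not final yet.
       // The check and the write are two separate steps, so a concurrent
       // writer can still slip in between them.
       void transfer(TaskState newState) {
           if (!taskState.isFinal()) {
               taskState = newState;
           }
       }

       TaskState current() {
           return taskState;
       }
   }
   ```

   With this, `cleanUp` could call `transfer(TaskState.FINISHED)` without
   checking `isCanceled()` and `isFailing()` first.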



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -949,7 +952,9 @@ public final void cleanUp(Throwable throwable) throws Exception {
         // disabled the interruptions or not.
         getCompletionFuture().exceptionally(unused -> null).join();
         // clean up everything we initialized
-        isRunning = false;
+        if (!isCanceled() && !isFailing()) {

Review Comment:
   Technically, there is a race condition here (the same as in the `failing`
   check several lines above): we check the current state and only then set a
   new one. I don't think it is a big problem since the task has already
   finished, but it would be good to clarify this logic anyway.
   
   Right now, we set the final state unconditionally in a couple of places,
   while in a couple of other places we first check that the state is not
   final yet.
   
   But I think we actually have a pretty clear state machine here:
   - init -> running
   - running -> running(maybe)
   - running -> canceled
   - running -> failing
   - running -> finished
   
   All other transitions should be ignored, so maybe we should come up with
   some kind of `compareAndSet` solution?
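
   To make the idea concrete, here is a minimal sketch built on
   `java.util.concurrent.atomic.AtomicReference`. The `TaskStateMachine`
   class, its nested `State` enum, and the method names are hypothetical and
   not taken from the PR. It encodes only the transitions listed above, so a
   cancel that arrives before the task starts running would need an extra
   rule:

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical illustration; not the real StreamTask.
   final class TaskStateMachine {
       enum State { INITIALIZED, RUNNING, CANCELED, FAILING, FINISHED }

       private final AtomicReference<State> state =
               new AtomicReference<>(State.INITIALIZED);

       // Allowed transitions: init -> running, and running -> running,
       // canceled, failing, or finished. Everything else is rejected.
       private static boolean isAllowed(State from, State to) {
           switch (from) {
               case INITIALIZED:
                   return to == State.RUNNING;
               case RUNNING:
                   return to != State.INITIALIZED;
               default:
                   return false; // CANCELED, FAILING, FINISHED are final here
           }
       }

       // Atomically moves to the new state if the transition is allowed.
       // Returns false, leaving the state untouched, otherwise.
       boolean transitionTo(State to) {
           while (true) {
               State current = state.get();
               if (!isAllowed(current, to)) {
                   return false;
               }
               if (state.compareAndSet(current, to)) {
                   return true;
               }
               // Another thread changed the state in between; re-check.
           }
       }

       State current() {
           return state.get();
       }
   }
   ```

   Because the check and the write happen in a single `compareAndSet`, two
   threads racing to set different final states cannot overwrite each other;
   only the first one wins and the second call returns `false`.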
   
   @pnowojski , @rkhachatryan , WDYT?
   
   P.S. Since the race condition is present in the current code as well, we
   can implement this in a follow-up ticket rather than in this one.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -256,19 +259,17 @@
     private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
 
     /**
-     * Flag to mark the task "in operation", in which case check needs to be initialized to true, so
-     * that early cancel() before invoke() behaves correctly.
-     */
-    private volatile boolean isRunning;
-
-    /** Flag to mark this task as canceled. */
-    private volatile boolean canceled;
-
-    /**
-     * Flag to mark this task as failing, i.e. if an exception has occurred inside {@link
-     * #invoke()}.
+     * INITIALIZED: task constructor was called or on init state. RUNNING: task is in operation.
+     * FAILING: task is failing e.g., if an exception has occurred inside {@link #invoke()}.
+     * CANCELED: when this task is canceled. FINISHED: task successfully terminated.
      */
-    private volatile boolean failing;
+    private enum TaskState {
+        INITIALIZED,
+        RUNNING,
+        FAILING,

Review Comment:
   I actually wonder why it is `FAILING` and not `FAILED`. Is it supposed to
   NOT be a final state? Maybe @rkhachatryan knows the answer to this question.


