[ https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798003#comment-17798003 ]
Rui Fan edited comment on FLINK-33565 at 12/18/23 4:21 AM:
-----------------------------------------------------------
Hi [~mapohl], I have finished the POC and would like to check whether my solution works for you.

The POC can be found in this [PR|https://github.com/apache/flink/pull/23867]. Because this PR will keep being updated, I have also backed up a permanent branch here: [https://github.com/1996fanrui/flink/commits/33565/permanent-backup1/]

Core idea: for each failure, the RestartStrategy returns whether the current failure is a new attempt (based on FLIP-364).
* If it's a new attempt, the failure is a root exception, and it becomes the latest root exception.
* If it's not a new attempt, it is a concurrentException and is added to the latest RootException.

Core changes: my POC branch has 3 commits related to this JIRA:
# [FLINK-33565][Exception] Archive exceptions into the exception history immediately when they occur, instead of archiving them when restarting
# [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt
# [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging

The first commit is a refactoring. I'm not sure why exceptions are currently archived when the task is restarted rather than immediately: it means that when a task fails, its exception only shows up in the exception history after Flink restarts that task. The first commit changes only this: it archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting.

The second commit is related to the restart strategy: it adds a return value that indicates whether the current failure is a new attempt.

The third commit is the core solution of this JIRA:
* If it's a new attempt, the failure is a root exception, and it becomes the latest root exception.
* If it's not a new attempt, it is a concurrentException and is added to the latest RootException.
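To make the core idea concrete, here is a minimal Java sketch under stated assumptions. All names in it (ExceptionMergingSketch, AttemptTrackingRestartStrategy, ExceptionHistory, RootExceptionEntry) are hypothetical and are not Flink's scheduler API. It also assumes, for illustration only, that every failure reported while a restart is still pending belongs to the same attempt; FLIP-364 defines the actual rule used by the restart strategy.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of exception merging; not Flink's actual classes. */
public class ExceptionMergingSketch {

    /** Assumed rule: failures reported before the pending restart is handled share one attempt. */
    static final class AttemptTrackingRestartStrategy {
        private boolean restartPending = false;

        /** Returns true if this failure starts a new restart attempt. */
        boolean notifyFailure(Throwable cause) {
            if (restartPending) {
                return false; // merged into the attempt that is already pending
            }
            restartPending = true;
            return true;
        }

        /** Called once the failed tasks have actually been restarted. */
        void onRestartCompleted() {
            restartPending = false;
        }
    }

    /** One exception-history entry: a root exception plus its concurrent exceptions. */
    static final class RootExceptionEntry {
        final Throwable root;
        final List<Throwable> concurrentExceptions = new ArrayList<>();

        RootExceptionEntry(Throwable root) {
            this.root = root;
        }
    }

    static final class ExceptionHistory {
        final List<RootExceptionEntry> entries = new ArrayList<>();

        /** Archives a failure as soon as it is reported (no waiting for the restart). */
        void archive(Throwable cause, boolean isNewAttempt) {
            if (isNewAttempt || entries.isEmpty()) {
                entries.add(new RootExceptionEntry(cause));
            } else {
                entries.get(entries.size() - 1).concurrentExceptions.add(cause);
            }
        }
    }

    public static void main(String[] args) {
        AttemptTrackingRestartStrategy strategy = new AttemptTrackingRestartStrategy();
        ExceptionHistory history = new ExceptionHistory();
        // Six tasks fail before the restart is handled, like the 6-region demo job.
        for (int i = 0; i < 6; i++) {
            Throwable cause = new RuntimeException("task " + i + " failed");
            history.archive(cause, strategy.notifyFailure(cause));
        }
        strategy.onRestartCompleted();
        RootExceptionEntry latest = history.entries.get(history.entries.size() - 1);
        System.out.println(latest.root.getMessage() + " + "
                + latest.concurrentExceptions.size() + " concurrent");
    }
}
```

Running main prints "task 0 failed + 5 concurrent". Because the exception is archived at failure time, the only extra input the history needs is the boolean returned by the restart strategy.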
The last commit adds a demo job with 6 regions in which all tasks fail when processing the first record; this demo job can be run directly. Here is the result: all failed tasks are visible in the WebUI.

If you agree with my solution, I can go ahead. If not, we can discuss it first. Looking forward to your opinions, thanks~

!screenshot-1.png|width=923,height=468!

> The concurrentExceptions doesn't work
> -------------------------------------
>
> Key: FLINK-33565
> URL: https://issues.apache.org/jira/browse/FLINK-33565
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.18.0, 1.17.1
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> First of all, thanks to [~mapohl] for helping double-check in advance that this was indeed a bug.
> Displaying the exception history in the WebUI is supported since FLINK-6042.
> h1. What are concurrentExceptions?
> When an execution fails due to an exception, the other executions in the same region are restarted as well, and the first exception is the rootException. If the other restarted executions also report exceptions at this time, we want to collect these exceptions and display them to the user as concurrentExceptions.
> h2. What's this bug?
> The concurrentExceptions list is always empty in production, even if other executions report exceptions at very close times.
> h1. Why doesn't it work?
> If a job has an all-to-all shuffle, it has only one region, and this region has many executions. If one execution throws an exception:
> * JobMaster marks this execution as FAILED.
> * The remaining executions of this region are marked as CANCELING.
> ** This call stack can be found in FLIP-364 [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover]
>
> When these executions throw exceptions as well, JobMaster changes their state from CANCELING to CANCELED instead of FAILED.
> The CANCELED executions don't go through the FAILED logic, so their exceptions are ignored.
> Note: all reports are handled in the JobMaster RPC thread, which is a single thread, so they are processed serially. As a result, only one execution is marked as FAILED, and the remaining executions are marked as CANCELED later.
> h1. How to fix it?
> After an offline discussion with [~mapohl], we first need to discuss with the community whether we should keep the concurrentExceptions.
> * If not, we can remove the related logic directly.
> * If yes, we will discuss how to fix it later.
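The failure path described under "Why doesn't it work?" can be reproduced with a small single-threaded simulation. It is a hypothetical, simplified model: the State enum, Execution, RootException and onFailureReported are illustrative stand-ins rather than Flink's ExecutionState or JobMaster code, and the sequential loop stands in for the single JobMaster RPC thread that processes the reports serially.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of why concurrentExceptions stays empty. */
public class ConcurrentExceptionsBugSketch {

    enum State { RUNNING, CANCELING, CANCELED, FAILED }

    static final class Execution {
        State state = State.RUNNING;
    }

    static final class RootException {
        final Throwable root;
        final List<Throwable> concurrentExceptions = new ArrayList<>();

        RootException(Throwable root) {
            this.root = root;
        }
    }

    /** Handles one failure report; reports are processed one at a time. */
    static RootException onFailureReported(
            List<Execution> region, int failedIndex, Throwable cause, RootException current) {
        Execution failed = region.get(failedIndex);
        if (failed.state != State.RUNNING) {
            // A failure reported while CANCELING only completes the cancellation.
            // The FAILED branch never runs, so this cause is dropped.
            if (failed.state == State.CANCELING) {
                failed.state = State.CANCELED;
            }
            return current;
        }
        failed.state = State.FAILED;
        // One region (all-to-all shuffle): every other running execution is canceled.
        for (Execution other : region) {
            if (other != failed && other.state == State.RUNNING) {
                other.state = State.CANCELING;
            }
        }
        return new RootException(cause);
    }

    public static void main(String[] args) {
        List<Execution> region = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            region.add(new Execution());
        }
        RootException root = null;
        // All six executions report a failure at nearly the same time.
        for (int i = 0; i < region.size(); i++) {
            root = onFailureReported(region, i, new RuntimeException("task " + i), root);
        }
        System.out.println("root=" + root.root.getMessage()
                + ", concurrent=" + root.concurrentExceptions.size());
    }
}
```

Running main prints "root=task 0, concurrent=0": only the first report reaches the FAILED branch, and every later cause arrives while its execution is already CANCELING.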