Thanks for your replies. To Peter: heartbeat.timeout had already been increased to 3 minutes, but the JobManager timeout still occurs (a config sketch is included right below for reference). For now the following logic has been added: when the JM has timed out, onFatalError is called, which ensures that the failed job exits quickly. Does this approach have any side effects? The modified method follows the sketch:
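For reference, a minimal sketch of the timeout change, assuming the HeartbeatManagerOptions API from this Flink version and a programmatic setup (the class name below is made up; in practice the value was raised via heartbeat.timeout in flink-conf.yaml):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;

// Sketch: raise the RM<->JM heartbeat timeout from the 50s default to 3 minutes.
// Values are in milliseconds; "heartbeat.timeout: 180000" in flink-conf.yaml has the same effect.
public class HeartbeatTimeoutSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 180_000L);
        // Keep the interval (default 10s) well below the timeout so several heartbeats
        // can be missed before the peer is declared dead.
        conf.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 10_000L);
        System.out.println(conf);
    }
}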
@Override
public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
    validateRunsInMainThread();
    log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);

    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
    if (jobManagerRegistration != null) {
        jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
    } else {
        // The JM connection is already gone (e.g. after a heartbeat timeout), so the failure
        // cannot be forwarded; fail the process fast instead of letting the job hang.
        ResourceManagerException exception = new ResourceManagerException("Job Manager is lost, can not notify allocation failure.");
        onFatalError(exception);
    }
}

Yours,
Anyang

Peter Huang <huangzhenqiu0...@gmail.com> wrote on Tue, Jul 2, 2019, at 2:43 AM:

> Hi Anyang,
>
> Thanks for raising the question. I didn't test the PR in batch mode; your
> observation helps me improve the implementation. From my understanding,
> if the RM-to-JobManager heartbeat times out, the job manager connection will
> be closed, so it will not be reconnected. Are you running the batch job in a
> per-job cluster or a session cluster? To temporarily mitigate the issue you are
> facing, you can probably tune heartbeat.timeout (default 50s) to a
> larger value.
>
>
> Best Regards
> Peter Huang
>
> On Mon, Jul 1, 2019 at 7:50 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Anyang,
>>
>> as far as I can tell, FLINK-10868 has not been merged into Flink yet.
>> Thus, I cannot tell much about how well it works. The case you are
>> describing should be properly handled in a version which gets merged,
>> though. I guess what needs to happen is that once the JM reconnects to the
>> RM it should synchronize the pending slot requests with the registered slot
>> requests on the RM. But this should be a follow-up issue to FLINK-10868,
>> because it would widen the scope too much.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 26, 2019 at 10:52 AM Anyang Hu <huanyang1...@gmail.com>
>> wrote:
>>
>>> Hi ZhenQiu && Rohrmann:
>>>
>>> Currently I have backported FLINK-10868 to flink-1.5. Most of my jobs (all
>>> batch jobs) can exit immediately once the number of failed container
>>> requests reaches the upper limit, but some jobs still cannot exit
>>> immediately. From the logs, it can be seen that in these jobs the job
>>> manager times out first, for unknown reasons. Code segment 1 is executed
>>> after the job manager has timed out but before it has reconnected, so the
>>> suspicion is that the job manager registration is out of sync and the
>>> notifyAllocationFailure() method in code segment 2 is never executed.
>>>
>>>
>>> I'm wondering whether you have encountered similar problems and whether
>>> there is a solution. To solve the problem of jobs not exiting immediately,
>>> the current idea is: if (jobManagerRegistration == null), execute the
>>> onFatalError() method to exit the process at once; it is currently unclear
>>> whether this brute-force approach has any side effects.
>>>
>>>
>>> Thanks,
>>> Anyang
>>>
>>>
>>> code segment 1 in ResourceManager.java:
>>>
>>> private void cancelAllPendingSlotRequests(Exception cause) {
>>>     slotManager.cancelAllPendingSlotRequests(cause);
>>> }
>>>
>>>
>>> code segment 2 in ResourceManager.java:
>>>
>>> @Override
>>> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
>>>     validateRunsInMainThread();
>>>     log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
>>>
>>>     JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
>>>     if (jobManagerRegistration != null) {
>>>         jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
>>>     }
>>> }
>>>
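As for Till's earlier point about synchronizing the pending slot requests once the JM reconnects to the RM, a very rough sketch of that direction might look like the following. The hook name and the pendingSlotRequests bookkeeping are hypothetical and this is not the actual FLINK-10868 code; only ResourceManagerGateway#requestSlot is an existing RPC as far as I can see:

// Hypothetical hook on the JobMaster side, invoked after the RM connection has been
// re-established; it only illustrates the idea of re-declaring pending requests.
private void onResourceManagerReconnected(ResourceManagerGateway resourceManagerGateway) {
    // Re-send every slot request that is still pending locally, so that the RM's
    // SlotManager and the JM's slot pool agree again after the heartbeat timeout.
    for (SlotRequest pendingRequest : pendingSlotRequests.values()) {
        resourceManagerGateway.requestSlot(getFencingToken(), pendingRequest, rpcTimeout);
    }
}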