Thanks for your replies.

To Peter:
The heartbeat.timeout had already been increased to 3 minutes, but the job
manager timeout still occurs. At present, the following logic has been added:
when the JM times out, onFatalError is called, which ensures that the job
fails and exits quickly. Does this method have side effects?


@Override
public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
   validateRunsInMainThread();
   log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);

   JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
   if (jobManagerRegistration != null) {
      jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
   } else {
      // The JM registration is gone (e.g. heartbeat timeout before reconnect):
      // fail fast instead of silently dropping the allocation failure.
      ResourceManagerException exception = new ResourceManagerException(
         "Job Manager is lost, can not notify allocation failure.");
      onFatalError(exception);
   }
}
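For reference, the heartbeat timeout mentioned above was raised via the cluster configuration. A minimal sketch, assuming the value is set in flink-conf.yaml (heartbeat.timeout is specified in milliseconds, so 3 minutes corresponds to 180000):

```yaml
# flink-conf.yaml (sketch): raise the RM<->JM heartbeat timeout to 3 minutes.
# heartbeat.timeout is in milliseconds; the default is 50000 (50s).
heartbeat.timeout: 180000
```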



Yours,
Anyang


Peter Huang <huangzhenqiu0...@gmail.com> 于2019年7月2日周二 上午2:43写道:

> Hi Anyang,
>
> Thanks for raising the question. I didn't test the PR in batch mode; the
> observation helps me arrive at a better implementation. From my understanding,
> if the RM-to-job-manager heartbeat times out, the job manager connection will
> be closed, so it will not be reconnected. Are you running the batch job in a
> per-job cluster or a session cluster? To temporarily mitigate the issue you
> are facing, you can probably tune heartbeat.timeout (default 50s) to a
> larger value.
>
>
> Best Regards
> Peter Huang
>
> On Mon, Jul 1, 2019 at 7:50 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Anyang,
>>
>> as far as I can tell, FLINK-10868 has not been merged into Flink yet.
>> Thus, I cannot tell much about how well it works. The case you are
>> describing should be properly handled in the version that gets merged,
>> though. I guess what needs to happen is that once the JM reconnects to the
>> RM, it should synchronize its pending slot requests with the registered slot
>> requests on the RM. But this should be a follow-up issue to FLINK-10868,
>> because it would widen the scope too much.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 26, 2019 at 10:52 AM Anyang Hu <huanyang1...@gmail.com>
>> wrote:
>>
>>> Hi ZhenQiu && Rohrmann:
>>>
>>> Currently I have backported FLINK-10868 to Flink 1.5. Most of my jobs (all
>>> batch jobs) exit immediately once the number of failed container requests
>>> reaches the upper limit, but some jobs still cannot exit immediately.
>>> From the logs, it is observed that in these jobs the job manager timed
>>> out first, for unknown reasons. Code segment 1 executes after the job
>>> manager has timed out but before it has reconnected, so I suspect the job
>>> manager registration is out of sync and the notifyAllocationFailure()
>>> method in code segment 2 is never executed.
>>>
>>>
>>> I'm wondering whether you have encountered similar problems and whether
>>> there is a solution. To solve the problem of jobs not exiting immediately,
>>> I am currently considering the following: if (jobManagerRegistration ==
>>> null), execute the onFatalError() method to exit the process immediately.
>>> It is temporarily unclear whether this drastic approach has any side
>>> effects.
>>>
>>>
>>> Thanks,
>>> Anyang
>>>
>>>
>>> code segment 1 in ResourceManager.java:
>>>
>>> private void cancelAllPendingSlotRequests(Exception cause) {
>>>    slotManager.cancelAllPendingSlotRequests(cause);
>>> }
>>>
>>>
>>> code segment 2 in ResourceManager.java:
>>>
>>> @Override
>>> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
>>>    validateRunsInMainThread();
>>>    log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
>>>
>>>    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
>>>    if (jobManagerRegistration != null) {
>>>       jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
>>>    }
>>> }
>>>
>>>