The default number of restart attempts <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#restart-strategy-fixed-delay-attempts> is 1. You need to explicitly configure it to allow more failures.

On 6/7/2021 11:53 AM, 1095193...@qq.com wrote:
Hi community,
I have  a job which read data from Datahub and sink data to Elasticsearch. The Elasticsearch frequently timeout which lead to Flink job failed and stopped, then a manually restart is needed.  After investigate checkpoint strategy, I believe checkpoint can restart job automaically and avoid a manually restart when job failed. However,  the job still failed and stopped when Elasticsearch timeout although I have configure checkpoint in flink-conf.yaml
*flink-conf.yaml*
/state.checkpoints.dir: hdfs://172.16.1.192:9000/flink-checkpoints/
/execution.checkpointing.interval: 10 s/
/state.savepoints.dir: hdfs://172.16.1.192:9000/flink-savepoints/
/restart-strategy: fixed-delay/
/restart-strategy.fixed-delay.delay: 1 min/
*flink log *
see attachment for full log.
/[INFO ] 2021-06-05 10:35:59.020 [flink-akka.actor.default-dispatcher-19] o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy-[getTasksNeedingRestart] - 115 - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0./ /[INFO ] 2021-06-05 10:35:59.020 [flink-akka.actor.default-dispatcher-19] o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy-[getTasksNeedingRestart] - 152 - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0. /
*the above log  shows restarted strategy works after Exception occurs. *
/[INFO ] 2021-06-05 10:38:09.428 [flink-akka.actor.default-dispatcher-4] o.a.f.r.c.CheckpointCoordinator-[shutdown] - 405 - Stopping checkpoint coordinator for job 63c270e00b69eb967f59479bb1c84113./ /[INFO ] 2021-06-05 10:38:09.428 [flink-akka.actor.default-dispatcher-4] o.a.f.r.c.StandaloneCompletedCheckpointStore-[shutdown] - 96 - Shutting down/ /[INFO ] 2021-06-05 10:38:09.451 [flink-akka.actor.default-dispatcher-2] o.a.f.r.dispatcher.MiniDispatcher-[jobReachedGloballyTerminalState] - 827 - Job 63c270e00b69eb967f59479bb1c84113 reached globally terminal state FAILED./ /[INFO ] 2021-06-05 10:38:09.452 [flink-akka.actor.default-dispatcher-2] o.a.f.r.dispatcher.MiniDispatcher-[jobReachedGloballyTerminalState] - 132 - Shutting down cluster with state FAILED, jobCancelled: false, executionMode: DETACHED/ /[INFO ] 2021-06-05 10:38:09.453 [flink-akka.actor.default-dispatcher-2] o.a.f.r.entrypoint.ClusterEntrypoint-[shutDownAsync] - 481 - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics null./ /[INFO ] 2021-06-05 10:38:09.453 [flink-akka.actor.default-dispatcher-2] o.a.f.r.j.MiniDispatcherRestEndpoint-[closeAsync] - 309 - Shutting down rest endpoint./ /[INFO ] 2021-06-05 10:38:09.463 [flink-akka.actor.default-dispatcher-4] o.a.f.runtime.jobmaster.JobMaster-[onStop] - 395 - Stopping the JobMaster for job insert-into_default_catalog.default_database.table3(63c270e00b69eb967f59479bb1c84113)./ *However, flink cluster eventually shutdown after serveral restarts failed. Why my flink job eventually failed even though checkpoint is enabled and restart-strategy is set to fixed-delay?*


------------------------------------------------------------------------
1095193...@qq.com


Reply via email to