Hi Xiangyu/Dev,

Does anyone have a solution for the following StreamingFileSink error and the related important note:
Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)

Important Note 3: Flink and the StreamingFileSink never overwrite committed data. Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file which was committed by subsequent successful checkpoints, *Flink will refuse to resume and it will throw an exception as it cannot locate the in-progress file*.

I am currently facing the same issue in production.

Regards,
Nagireddy Y.

On Fri, Aug 4, 2023 at 12:11 PM xiangyu feng <xiangyu...@gmail.com> wrote:

> Hi ynagireddy4u,
>
> From the exception info, I think your application hit an HDFS file issue
> during the commit phase of the checkpoint. Can you check why the staging
> file does not exist in the first place?
>
> Regards,
> Xiangyu
>
> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Fri, Aug 4, 2023 at 12:21:
>
>> Hi Xiangyu/Dev Team,
>>
>> Thanks for the reply.
>>
>> In our Flink job, we increased the *checkpoint timeout to 30 min*,
>> and the *checkpoint interval is 10 min*.
>>
>> But while running the job, we got the exception below.
>>
>> java.lang.RuntimeException: Error while confirming checkpoint
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:952)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
>>     at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onSuccessfulCompletionOfCheckpoint(Bucket.java:300)
>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.commitUpToCheckpoint(Buckets.java:216)
>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.notifyCheckpointComplete(StreamingFileSink.java:415)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointComplete$8(StreamTask.java:936)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:101)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:930)
>>     ... 12 more
>>
>> It would be great if you could suggest a workaround.
>>
>> Regards,
>> Nagireddy Y.
>>
>> On Thu, Aug 3, 2023 at 7:24 AM xiangyu feng <xiangyu...@gmail.com> wrote:
>>
>>> Hi ynagireddy4u,
>>>
>>> We have met this exception before. It is usually caused by one of the
>>> following:
>>>
>>> 1) The TaskManager is too busy with other work to send its heartbeat to
>>>    the JobMaster, or the TaskManager process has already exited;
>>> 2) There are network issues between this TaskManager and the JobMaster;
>>> 3) In certain cases, the JobMaster actor is also too busy to process the
>>>    RPC requests from the TaskManager.
>>>
>>> Please check whether your problem fits one of the above situations.
>>>
>>> Best,
>>> Xiangyu
>>>
>>> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Mon, Jul 31, 2023 at 20:49:
>>>
>>>> Hi Team,
>>>>
>>>> Has anyone faced the exception below?
>>>> If yes, please share the resolution.
>>>>
>>>> 2023-07-28 22:04:16
>>>> *java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e19_1690528962823_0382_01_000005 timed out.*
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Any suggestions are welcome.
>>>>
>>>> Regards,
>>>> Nagireddy Y
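Xiangyu's three causes for the heartbeat timeout share one mechanism: the JobMaster marks a TaskManager as lost once no heartbeat has been received from it for longer than the heartbeat timeout, whether the delay came from a busy TaskManager, the network, or a busy JobMaster. Below is a minimal, stdlib-only Java sketch of that bookkeeping. `HeartbeatTracker` and its methods are hypothetical names for illustration, not Flink's `HeartbeatMonitorImpl`; the 50 s timeout in `main` assumes Flink's documented `heartbeat.timeout` default of 50000 ms, which should be checked against the Flink version in use.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical, not Flink code) of heartbeat-timeout bookkeeping. */
public class HeartbeatTracker {
    private final long timeoutMillis;
    // Last time (in ms) a heartbeat was received from each TaskManager id.
    private final Map<String, Long> lastSeen = new HashMap<>();

    public HeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a heartbeat from the given TaskManager at time nowMillis. */
    public void onHeartbeat(String taskManagerId, long nowMillis) {
        lastSeen.put(taskManagerId, nowMillis);
    }

    /**
     * True if no heartbeat arrived within the timeout window.
     * An id that never sent a heartbeat is treated as timed out.
     */
    public boolean isTimedOut(String taskManagerId, long nowMillis) {
        Long last = lastSeen.get(taskManagerId);
        return last == null || nowMillis - last > timeoutMillis;
    }

    public static void main(String[] args) {
        // Assumed 50 s timeout (hypothetical value for this sketch).
        HeartbeatTracker tracker = new HeartbeatTracker(50_000L);
        String tm = "container_e19_1690528962823_0382_01_000005";
        tracker.onHeartbeat(tm, 0L);
        // Still inside the window 40 s later.
        System.out.println(tracker.isTimedOut(tm, 40_000L)); // false
        // A stall (busy TaskManager, network, or busy JobMaster) delays the
        // next heartbeat past 50 s, so the TaskManager is declared lost.
        System.out.println(tracker.isTimedOut(tm, 60_000L)); // true
    }
}
```

In this model, raising the timeout only widens the window: if the TaskManager is stalled for longer than the new value, the timeout still fires, so finding the cause of the stall matters more than the exact number.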
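The "Cannot clean commit: Staging file does not exist." error and Important Note 3 follow from the same design: the HDFS writer in the stack trace (`HadoopRecoverableFsDataOutputStream`) writes each part file to a staging (in-progress) file and, as far as I understand it, publishes it on checkpoint completion by renaming it to its final name. Once that rename has happened, or if something outside Flink has deleted the staging file, there is nothing left to rename. The stdlib-only Java sketch below is a toy model of that behaviour, not Flink's code: the class, method, and file names are hypothetical, and the split into a strict `commit` and a lenient `commitAfterRecovery` only borrows the method names of Flink's `RecoverableFsDataOutputStream.Committer` interface, not its implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Toy model (hypothetical names, not Flink code) of a rename-based commit:
 * data is written to a hidden staging file and made visible by renaming it.
 */
public class StagingCommitDemo {

    /** Strict commit: the staging file must still exist, otherwise fail. */
    public static void commit(Path staging, Path target) throws IOException {
        if (!Files.exists(staging)) {
            throw new IOException("Cannot clean commit: Staging file does not exist.");
        }
        Files.move(staging, target);
    }

    /**
     * Lenient commit used after a restore: an already-published target counts
     * as success. Returns false when neither file exists (the data is gone).
     */
    public static boolean commitAfterRecovery(Path staging, Path target) throws IOException {
        if (Files.exists(staging)) {
            Files.move(staging, target, StandardCopyOption.REPLACE_EXISTING);
            return true;
        }
        return Files.exists(target);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        // Illustrative file names only.
        Path staging = dir.resolve(".part-0-0.inprogress");
        Path target = dir.resolve("part-0-0");
        Files.write(staging, "record\n".getBytes(StandardCharsets.UTF_8));

        commit(staging, target); // first commit publishes the file
        try {
            commit(staging, target); // replaying the same commit fails
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
        // The lenient path accepts the already-published file.
        System.out.println(commitAfterRecovery(staging, target));
    }
}
```

Running `main` prints the strict-commit error message followed by `true`. In this model the strict path fails exactly when a commit is replayed for a file that was already published, or when the staging file was removed by something else, which is why finding out what removed the staging file is the first thing to check.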