Hi Nagireddy,

I'm not particularly familiar with StreamingFileSink, but I checked the implementation of HadoopFsCommitter. AFAIK, when committing files to HDFS, the committer first checks whether the temp file exists.

[image: image.png]
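
To make that check concrete, here is a minimal sketch of the "verify the staging file, then rename" pattern. It is not Flink's actual code: it uses the local file system through java.nio instead of HDFS, and the class name StagingCommitSketch, the commit method, and the file names are all made up for illustration. Only the exception message is taken from the stack trace in this thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical illustration only; this is NOT Flink's HadoopFsCommitter. */
public class StagingCommitSketch {

    /**
     * Publishes a staging (temp) file under its final name.
     * Fails fast when the staging file is missing, which is the
     * situation reported in this thread.
     */
    static void commit(Path stagingFile, Path targetFile) throws IOException {
        // Check the staging file first: if another process deleted or
        // moved it, there is nothing left to publish.
        if (!Files.exists(stagingFile)) {
            throw new IOException("Cannot clean commit: Staging file does not exist.");
        }
        // Publish the data by renaming the staging file to its final name.
        Files.move(stagingFile, targetFile);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("commit-sketch");
        Path staging = dir.resolve(".part-0-0.inprogress"); // assumed naming
        Path target = dir.resolve("part-0-0");

        Files.write(staging, "hello".getBytes());
        commit(staging, target);                   // staging file is present
        System.out.println(Files.exists(target));  // prints "true"

        try {
            commit(staging, target);               // staging file is gone now
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The second call fails because the first one already moved the staging file away. An external process that deletes or merges the temp file between the write and the commit would trigger the same failure.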

In your case, could you check why the temp file being committed does not exist on HDFS? Were these files deleted by mistake?

From what I found while searching, this error may occur when a small-file merge process merges a file that is still being written. You can disable small-file merging while these files are being written.

Hope this helps.

Regards,
Xiangyu

Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Sat, Aug 5, 2023 at 18:22:

> Hi Xiangyu/Dev,
>
> Does anyone have a solution for handling the important note below in StreamingFileSink?
>
> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>
> Important Note 3: Flink and the StreamingFileSink never overwrites committed data. Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file which was committed by subsequent successful checkpoints, *Flink will refuse to resume and it will throw an exception as it cannot locate the in-progress file*.
>
> Currently I am facing the same issue in our PROD code.
>
> Regards,
> Nagireddy Y.
>
> On Fri, Aug 4, 2023 at 12:11 PM xiangyu feng <xiangyu...@gmail.com> wrote:
>
>> Hi ynagireddy4u,
>>
>> From the exception info, I think your application hit an HDFS file issue during the commit phase of the checkpoint. Can you check why 'Staging file does not exist' in the first place?
>>
>> Regards,
>> Xiangyu
>>
>> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Fri, Aug 4, 2023 at 12:21:
>>
>>> Hi Xiangyu/Dev Team,
>>>
>>> Thanks for the reply.
>>>
>>> In our Flink job, we increased the *checkpoint timeout to 30 min.*
>>> And the *checkpoint interval is 10 min.*
>>>
>>> But while running the job we got the exception below.
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:952)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
>>>     at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onSuccessfulCompletionOfCheckpoint(Bucket.java:300)
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.commitUpToCheckpoint(Buckets.java:216)
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.notifyCheckpointComplete(StreamingFileSink.java:415)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointComplete$8(StreamTask.java:936)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:101)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:930)
>>>     ... 12 more
>>>
>>> It would be great if you have any workaround for that.
>>>
>>> Regards,
>>> Nagireddy Y.
>>>
>>> On Thu, Aug 3, 2023 at 7:24 AM xiangyu feng <xiangyu...@gmail.com> wrote:
>>>
>>>> Hi ynagireddy4u,
>>>>
>>>> We have met this exception before. It is usually caused by one of the following:
>>>>
>>>> 1) The TaskManager is too busy with other work to send heartbeats to the JobMaster, or the TaskManager process may already have exited;
>>>> 2) There may be a network issue between this TaskManager and the JobMaster;
>>>> 3) In certain cases, the JobMaster actor may also be too busy to process RPC requests from the TaskManager.
>>>>
>>>> Please check whether your problem fits any of the above situations.
>>>>
>>>> Best,
>>>> Xiangyu
>>>>
>>>> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Mon, Jul 31, 2023 at 20:49:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> Has anyone faced the exception below?
>>>>> If yes, please share the resolution.
>>>>>
>>>>> 2023-07-28 22:04:16
>>>>> *java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e19_1690528962823_0382_01_000005 timed out.*
>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> If you have any suggestions, please share them with me.
>>>>>
>>>>> Regards,
>>>>> Nagireddy Y