[ https://issues.apache.org/jira/browse/FLINK-22685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Moses updated FLINK-22685:
--------------------------
    Description: 
h3. Scenario

I want to launch a batch job that processes Hive table data and writes the result to another table (*T1*). My SQL statements are written as below:
{code:sql}
-- use hive dialect
SET table.sql-dialect=hive;

-- insert into hive table
insert overwrite table T1 partition (p_day_id,p_file_id)
select distinct ....
{code}
The job launched successfully, but it failed on the *Sink* operator. On the Flink UI page all tasks were in state `*FINISHED*`, yet *the job failed and restarted again*.
The exception is shown below (*the path has been masked*):
{code:java}
java.lang.Exception: Failed to finalize execution on master
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1291)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:870)
    at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1125)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1491)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1464)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:497)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
    at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.table.api.TableException: Exception in finalizeGlobal
    at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:97)
    at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
    ... 31 more
Caused by: java.io.FileNotFoundException: File /XXXX/XX/XXX/.staging_1621244168369 does not exist.
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:814)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:872)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:868)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:868)
    at org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:246)
    at org.apache.hadoop.fs.viewfs.ChRootedFileSystem.listStatus(ChRootedFileSystem.java:246)
    at org.apache.hadoop.fs.viewfs.ViewFileSystem.listStatus(ViewFileSystem.java:395)
    at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:169)
    at org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
    at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
    at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
    ... 33 more
{code}
h3. Assumption

Following the exception, I traced through the code and found that the `.staging_` directory may never have been created. So I tried adding the following code to fix it, in *org.apache.flink.table.filesystem.FileSystemOutputFormat*:
{code:java}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        // ---- Create the staging directory here if it does not exist
        final FileSystem fs = fsFactory.create(tmpPath.toUri());
        if (!fs.exists(tmpPath)) {
            fs.mkdirs(tmpPath);
        }

        PartitionTempFileManager fileManager =
                new PartitionTempFileManager(
                        fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
        PartitionWriter.Context<T> context =
                new PartitionWriter.Context<>(parameters, formatFactory);
        writer =
                PartitionWriterFactory.<T>get(
                                partitionColumns.length - staticPartitions.size() > 0,
                                dynamicGrouped,
                                staticPartitions)
                        .create(context, fileManager, computer);
    } catch (Exception e) {
        throw new TableException("Exception in open", e);
    }
}
{code}
I rebuilt Flink, executed my statements, and *it really worked*.

Did I use the Flink SQL Client *incorrectly*? Or is this a *BUG*?
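As a side note on the fix above, here is a minimal sketch of the same idea, assuming Flink's `FileSystem` keeps Hadoop's `mkdirs()` semantics (it returns `true` when the directory already exists). Under that assumption the `exists()` pre-check is unnecessary, and dropping it avoids a check-then-act race between parallel sink subtasks opening at the same time (the helper name is mine, for illustration only):
{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

final class StagingDirs {

    private StagingDirs() {}

    // Assumes Hadoop-style semantics: mkdirs() returns true when the
    // directory was created or already exists, so concurrent callers
    // cannot race between an exists() check and the mkdirs() call.
    static void ensureDirExists(FileSystem fs, Path dir) throws IOException {
        if (!fs.mkdirs(dir)) {
            throw new IOException("Could not create staging directory " + dir);
        }
    }
}
{code}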
Also, I found the following code in *org.apache.flink.connectors.hive.HiveTableSink*:
{code:java}
private String toStagingDir(String finalDir, Configuration conf) throws IOException {
    String res = finalDir;
    if (!finalDir.endsWith(Path.SEPARATOR)) {
        res += Path.SEPARATOR;
    }
    // TODO: may append something more meaningful than a timestamp, like query ID
    res += ".staging_" + System.currentTimeMillis();
    Path path = new Path(res);
    FileSystem fs = path.getFileSystem(conf);
    Preconditions.checkState(
            fs.exists(path) || fs.mkdirs(path), "Failed to create staging dir " + path);
    fs.deleteOnExit(path);
    return res;
}
{code}
I guess the purpose of `fs.deleteOnExit(path)` is to clean up the dirty data when the client shuts down. So shall I keep my Flink client alive until the job finishes?
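To illustrate that guess, here is a minimal standalone sketch against the plain Hadoop API (the path and class name are hypothetical): `deleteOnExit()` removes the staging directory as soon as the client-side `FileSystem` is closed, so a JobManager that later lists the directory in `finalizeGlobal` would hit exactly the `FileNotFoundException` above:
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteOnExitDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path staging = new Path("/tmp/.staging_demo"); // hypothetical path

        // "Client side": create the staging dir and register it for
        // deletion, as HiveTableSink#toStagingDir does.
        FileSystem client = FileSystem.newInstance(staging.toUri(), conf);
        client.mkdirs(staging);
        client.deleteOnExit(staging);

        // close() processes the delete-on-exit list: the directory is
        // removed although delete() was never called explicitly.
        client.close();

        // "JobManager side": a separate FileSystem instance no longer
        // sees the directory; calling listStatus() on it would throw
        // FileNotFoundException, matching the stack trace above.
        FileSystem jobManager = FileSystem.newInstance(staging.toUri(), conf);
        System.out.println(jobManager.exists(staging)); // prints: false
        jobManager.close();
    }
}
{code}
If that is what happens, the failure would only occur when the SQL client exits before the batch job is finalized on the JobManager.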
> Write data to hive table in batch mode throws FileNotFoundException.
> --------------------------------------------------------------------
>
>                 Key: FLINK-22685
>                 URL: https://issues.apache.org/jira/browse/FLINK-22685
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>        Environment: Based on Flink 1.11.1.
>            Reporter: Moses
>            Priority: Minor
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)