Hi,

Sorry about this. The workaround only works with Hive 2+, so we can only wait for 1.11.2.
Best,
Jingsong

On Tue, Jul 21, 2020 at 6:15 PM Rui Li <lirui.fu...@gmail.com> wrote:

> Hi Paul,
>
> I believe Jingsong meant to try the native writer, for which the option
> key is `table.exec.hive.fallback-mapred-writer`; it is set to true by
> default. You can set it to false like this:
>
> ```
> tableEnv.getConfig().getConfiguration().set(
>         HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
> ```
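>
> For reference, a minimal compilable sketch of that call (the class name is
> made up for illustration; the import paths are as in Flink 1.11 and worth
> double-checking for your version):
>
> ```
> import org.apache.flink.connectors.hive.HiveOptions;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class NativeWriterConfig {
>     public static void main(String[] args) {
>         TableEnvironment tableEnv = TableEnvironment.create(
>                 EnvironmentSettings.newInstance()
>                         .useBlinkPlanner()
>                         .inStreamingMode()
>                         .build());
>         // Disable the fallback to Hive's mapred record writer so the
>         // Hive sink uses Flink's native ORC/Parquet writers instead.
>         tableEnv.getConfig().getConfiguration().set(
>                 HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
>     }
> }
> ```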
>
> On Tue, Jul 21, 2020 at 6:07 PM Paul Lam <paullin3...@gmail.com> wrote:
>
>> Hi Jingsong,
>>
>> Thanks for your advice! But IIUC, it seems that
>> `table.exec.hive.fallback-mapred-reader` is false by default?
>>
>> Moreover, explicitly setting this option might cause a serialization
>> issue. Wonder if I'm setting it in the right way?
>>
>> ```
>> tableEnv.getConfig().getConfiguration().setString(
>>         "table.exec.hive.fallback-mapred-writer", "false");
>> ```
>>
>> The error it caused:
>>
>> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
>>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>>   at com.my.package.class(JobEntry.java:65)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>   ... 11 more
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
>>   at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
>>   at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
>>   at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
>>   at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>>   at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>>   at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>>   at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
>>   at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
>>   at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
>>   at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
>>   at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>>   at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>>   at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
>>   at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
>>   at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>>   at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>>   ... 19 more
>> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>>   at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
>>   at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:260)
>>   ... 36 more
>>
>> Best,
>> Paul Lam
>>
>> On Jul 21, 2020, at 16:59, Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>> Hi Paul,
>>
>> If your ORC table has no complex types (list, map, row), you can try
>> setting `table.exec.hive.fallback-mapred-writer` to false in TableConfig.
>> The Hive sink will then use the native ORC writer, as a workaround.
>>
>> About this error, I think it is a bug in Hive 1.1 ORC. I will try to
>> reproduce it.
>>
>> I created https://issues.apache.org/jira/browse/FLINK-18659 to track
>> this. If it is a bug, it should be fixed in 1.11.2.
>>
>> Best,
>> Jingsong
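>>
>> A rough end-to-end sketch of the workaround (the catalog name, database,
>> conf dir, and table names below are hypothetical; the option setting is
>> the only essential part):
>>
>> ```
>> import org.apache.flink.connectors.hive.HiveOptions;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.TableEnvironment;
>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>
>> public class HiveOrcWorkaround {
>>     public static void main(String[] args) {
>>         TableEnvironment tableEnv = TableEnvironment.create(
>>                 EnvironmentSettings.newInstance().useBlinkPlanner().build());
>>         // Hypothetical catalog setup; "1.1.0" matches the Hive version
>>         // in this thread.
>>         HiveCatalog hive = new HiveCatalog(
>>                 "myhive", "default", "/path/to/hive-conf", "1.1.0");
>>         tableEnv.registerCatalog("myhive", hive);
>>         tableEnv.useCatalog("myhive");
>>         // The workaround: bypass Hive's mapred record writer and use the
>>         // native ORC writer; only safe for tables without list/map/row
>>         // columns.
>>         tableEnv.getConfig().getConfiguration().set(
>>                 HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
>>         tableEnv.executeSql(
>>                 "INSERT INTO orc_table SELECT * FROM source_table");
>>     }
>> }
>> ```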
>>
>> On Tue, Jul 21, 2020 at 4:25 PM Rui Li <lirui.fu...@gmail.com> wrote:
>>
>>> Hey Paul,
>>>
>>> Could you please share more about your job, e.g. the schema of your Hive
>>> table, whether it's partitioned, and the table properties you've set?
>>>
>>> On Tue, Jul 21, 2020 at 4:02 PM Paul Lam <paullin3...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm doing a POC on the Hive connectors and found that when writing
>>>> ORC-format Hive tables, the job fails with a FileNotFoundException right
>>>> after ingesting data (full stacktrace at the bottom of this mail).
>>>>
>>>> The error can be steadily reproduced in my environment, which is Hadoop
>>>> 2.6.5 (CDH-5.6.0), Hive 1.1.0 (CDH-5.6.0) and Flink 1.11.0. It happens
>>>> only with ORC tables; other bulk formats are fine.
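>>>>
>>>> The job is essentially of the following shape (a simplified sketch; the
>>>> column names, topic, and connector options are placeholders, not the
>>>> real ones):
>>>>
>>>> ```
>>>> // Kafka 0.10 source -> ORC Hive table, per the stacktrace below.
>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>>>> tableEnv.executeSql(
>>>>         "CREATE TABLE kafka_source (f0 STRING, f1 BIGINT) WITH ("
>>>>                 + " 'connector' = 'kafka-0.10',"
>>>>                 + " 'topic' = '...',"
>>>>                 + " 'properties.bootstrap.servers' = '...',"
>>>>                 + " 'format' = 'json')");
>>>> // The Hive DDL needs the Hive dialect.
>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>>>> tableEnv.executeSql(
>>>>         "CREATE TABLE tmp_table (f0 STRING, f1 BIGINT) STORED AS ORC");
>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>>>> tableEnv.executeSql(
>>>>         "INSERT INTO tmp_table SELECT f0, f1 FROM kafka_source");
>>>> ```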
>>>>
>>>> Does anyone have an idea about this error? Any comments and suggestions
>>>> are appreciated. Thanks!
>>>>
>>>> Stacktrace:
>>>>
>>>> Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
>>>>   at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
>>>>   at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
>>>>   at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>   at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
>>>>   at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
>>>>   at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
>>>>   at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
>>>>   at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
>>>>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
>>>>   at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>>>>   at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>>>>   at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>>>   at StreamExecCalc$2.processElement(Unknown Source)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>>>   at SourceConversion$1.processElement(Unknown Source)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>>>>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>
>>>> Best,
>>>> Paul Lam
>>>
>>> --
>>> Best regards!
>>> Rui Li
>>
>> --
>> Best, Jingsong Lee
>
> --
> Best regards!
> Rui Li

--
Best, Jingsong Lee