Hi JingSong,
Thanks for your advice! But IIUC, it seems that
`table.exec.hive.fallback-mapred-reader` is false by default?
Moreover, explicitly setting this option might cause a serialization issue.
Wonder if I’m setting it in the right way?
```
tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer",
"false”);
```
The error it caused:
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.my.package.class(JobEntry.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
serialize operator object class
org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
at
org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
at
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
at
org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at
org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at
org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
at
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at
org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
... 19 more
Caused by: java.io.NotSerializableException:
org.apache.hadoop.conf.Configuration
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at
org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
at
org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:260)
... 36 more
Best,
Paul Lam
> 2020年7月21日 16:59,Jingsong Li <[email protected]> 写道:
>
> Hi Paul,
>
> If your orc table has no complex(list,map,row) types, you can try to set
> `table.exec.hive.fallback-mapred-writer` to false in TableConfig. And Hive
> sink will use ORC native writer, it is a work-around way.
>
> About this error, I think this is a bug for Hive 1.1 ORC. I will try to
> re-produce it.
>
> I created https://issues.apache.org/jira/browse/FLINK-18659
> <https://issues.apache.org/jira/browse/FLINK-18659> to track this. If it is a
> bug, it should be fixed in 1.11.2
>
> Best,
> Jingsong
>
> On Tue, Jul 21, 2020 at 4:25 PM Rui Li <[email protected]
> <mailto:[email protected]>> wrote:
> Hey Paul,
>
> Could you please share more about your job, e.g. the schema of your Hive
> table, whether it's partitioned, and the table properties you've set?
>
> On Tue, Jul 21, 2020 at 4:02 PM Paul Lam <[email protected]
> <mailto:[email protected]>> wrote:
> Hi,
>
> I'm doing a POC on Hive connectors and find that when writing orc format Hive
> tables, the job failed with FileNotFoundException right after ingesting data
> (full stacktrace at the bottom of the mail).
>
> The error can be steadily reproduced in my environment, which is Hadoop
> 2.6.5(CDH-5.6.0), Hive 1.1.0(CDH-5.6.0) and Flink 1.11.0. It only happens in
> orc tables, while other bulk formats are fine.
>
> Does anyone have an idea about this error? Any comment and suggestions are
> appreciated. Thanks!
>
> Stacktrace:
>
> Caused by: java.io.FileNotFoundException: File does not exist:
> hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
> <>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
> at
> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
> at
> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> at
> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at StreamExecCalc$2.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$1.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
> Best,
> Paul Lam
>
>
>
> --
> Best regards!
> Rui Li
>
>
> --
> Best, Jingsong Lee