Hi Paul,

If your orc table has no complex(list,map,row) types, you can try to set
`table.exec.hive.fallback-mapred-writer` to false in TableConfig. And Hive
sink will use ORC native writer, it is a work-around way.

About this error, I think this is a bug for Hive 1.1 ORC. I will try to
re-produce it.

I created https://issues.apache.org/jira/browse/FLINK-18659 to track this.
If it is a bug, it should be fixed in 1.11.2

Best,
Jingsong

On Tue, Jul 21, 2020 at 4:25 PM Rui Li <lirui.fu...@gmail.com> wrote:

> Hey Paul,
>
> Could you please share more about your job, e.g. the schema of your Hive
> table, whether it's partitioned, and the table properties you've set?
>
> On Tue, Jul 21, 2020 at 4:02 PM Paul Lam <paullin3...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm doing a POC on Hive connectors and find that when writing orc format
>> Hive tables, the job failed with FileNotFoundException right after
>> ingesting data (full stacktrace at the bottom of the mail).
>>
>> The error can be steadily reproduced in my environment, which is Hadoop
>> 2.6.5(CDH-5.6.0), Hive 1.1.0(CDH-5.6.0) and Flink 1.11.0. It only happens
>> in orc tables, while other bulk formats are fine.
>>
>> Does anyone have an idea about this error? Any comment and suggestions
>> are appreciated. Thanks!
>>
>> Stacktrace:
>>
>> Caused by: java.io.FileNotFoundException: File does not exist:
>> hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
>> at
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
>> at
>> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
>> at
>> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>> at
>> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at StreamExecCalc$2.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at SourceConversion$1.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>
>>
>> Best,
>> Paul Lam
>>
>>
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee

Reply via email to