Hi,

> MessageType schema = reader.getFooter().getFileMetaData().getSchema();
The first thing I'd suggest is to verify that the file contains a valid
schema and can be read by some other program, e.g. parquet-tools schema
or parquet-tools cat [1].
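
If parquet-tools is not available, an untested sketch along these lines
should do the same check by printing the footer schema (it just reuses
the classes from the snippet in your mail):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    // Open the file directly; a corrupt or unreadable file should fail
    // here rather than later inside the Flink job.
    ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(
            new org.apache.hadoop.fs.Path("hdfs:///path/to/single/file.parquet"),
            new Configuration()));
    System.out.println(reader.getFooter().getFileMetaData().getSchema());
    reader.close();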

Regards,
Roman


On Thu, Jul 2, 2020 at 11:36 PM Jesse Lord <jl...@vectra.ai> wrote:

> I am trying to read a Parquet file into a DataStream and then register
> that stream as a temporary table. The file is created by Spark 2.4 in
> HDFS on AWS EMR. I am using Flink 1.10.0 with EMR 5.30.
>
>
>
> I am getting the following error:
>
>
>
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
>         at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
>         at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>         at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>         at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>         at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>         at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
>         at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
>         at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>         at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>         at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
>
>
>
> Below is a snippet of code that shows how I am trying to read the
> Parquet file:
>
>
>
>     String filePath = "hdfs:///path/to/single/file.parquet";
>
>     ParquetFileReader reader = ParquetFileReader.open(
>         HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(filePath),
>             new Configuration()));
>     MessageType schema = reader.getFooter().getFileMetaData().getSchema();
>
>     String parquetPath = "hdfs:///path/to/parquet/directory";
>
>     DataStream<Row> parquetStream = env.readFile(
>         new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(parquetPath),
>             schema),
>         parquetPath);
>
>     Table parquetTable = tEnv.fromDataStream(parquetStream);
>     tEnv.createTemporaryView("isession", parquetTable);
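>
> For context, downstream the view is only consumed with plain SQL,
> along these lines (simplified from the real job):
>
>     Table result = tEnv.sqlQuery("SELECT * FROM isession");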
>
>
>
> Thanks,
>
> Jesse
>
