Hi,

> MessageType schema = reader.getFooter().getFileMetaData().getSchema();

The first thing I'd suggest is to verify that the file contains a valid schema and can be read by some other program, e.g. parquet-tools schema or cat [1].
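For example, something along these lines (a quick sketch; it assumes the parquet-tools jar is available on the EMR master node, and the jar name and path are placeholders):

    hadoop jar parquet-tools.jar schema hdfs:///path/to/single/file.parquet
    hadoop jar parquet-tools.jar cat hdfs:///path/to/single/file.parquet | head

If the schema prints cleanly and cat returns records, the file itself is probably fine and the problem is more likely on the Flink side.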
Regards,
Roman

On Thu, Jul 2, 2020 at 11:36 PM Jesse Lord <jl...@vectra.ai> wrote:
> I am trying to read a parquet file into a datastream and then register
> that stream as a temporary table. This file is created by Spark 2.4 in HDFS
> on AWS EMR. I am using Flink version 1.10.0 with EMR 5.30.
>
> I am getting the following error:
>
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> Caught exception when processing split: null
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
> Caused by: java.lang.ClassCastException: Expected instance of group converter
> but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
>     at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>     at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>     at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>     at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>     at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
>
> Below is a snippet of code that shows how I am trying to read the parquet
> file:
>
> String filePath = "hdfs:///path/to/single/file.parquet";
>
> ParquetFileReader reader = ParquetFileReader.open(
>     HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(filePath), new Configuration()));
> MessageType schema = reader.getFooter().getFileMetaData().getSchema();
>
> String parquetPath = "hdfs:///path/to/parquet/directory";
>
> DataStream<Row> parquetStream = env.readFile(
>     new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(parquetPath), schema),
>     parquetPath);
>
> Table parquetTable = tEnv.fromDataStream(parquetStream);
> tEnv.createTemporaryView("isession", parquetTable);
>
> Thanks,
>
> Jesse
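For reference, a self-contained version of the footer check from the snippet above, runnable outside the Flink job (the class name SchemaCheck and the path are placeholders; assumes parquet-hadoop and the Hadoop client are on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class SchemaCheck {
    public static void main(String[] args) throws Exception {
        String filePath = "hdfs:///path/to/single/file.parquet"; // placeholder path
        // Read only the footer; this fails fast if the file is unreadable.
        try (ParquetFileReader reader = ParquetFileReader.open(
                HadoopInputFile.fromPath(new Path(filePath), new Configuration()))) {
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            // The ClassCastException in the stack trace above is thrown by
            // Converter.asGroupConverter, so nested (group) fields in this
            // printed schema are worth looking for.
            System.out.println(schema);
        }
    }
}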