Hi Jing,

Thanks for the reply. I had two doubts related to your answer:

1. "There was a conversion from Flink GroupType to Parquet MessageType. It
might be possible to build the conversion the other way around." -> Both
GroupType and MessageType are Parquet data structures, I believe, present
in the org.apache.parquet.schema package. What I am actually looking for is
whether it is possible to convert the MessageType into a Flink data type,
such as RowType.
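For illustration, this is roughly the shape of the conversion we are hoping
for. To be clear, toRowType below is a hypothetical helper we sketched
ourselves, not an existing Flink API; it assumes a flat schema and maps only
a handful of primitive types:

import org.apache.flink.table.types.logical.*;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import java.util.ArrayList;
import java.util.List;

public class ParquetToFlinkSchema {

    // Hypothetical helper: maps a flat Parquet MessageType to a Flink RowType.
    // Nested groups and logical annotations (DECIMAL, DATE, ...) are not handled.
    public static RowType toRowType(MessageType schema) {
        List<RowType.RowField> fields = new ArrayList<>();
        for (Type field : schema.getFields()) {
            if (!field.isPrimitive()) {
                throw new UnsupportedOperationException(
                        "Nested field not supported: " + field.getName());
            }
            LogicalType flinkType;
            switch (field.asPrimitiveType().getPrimitiveTypeName()) {
                case BOOLEAN: flinkType = new BooleanType(); break;
                case INT32:   flinkType = new IntType(); break;
                case INT64:   flinkType = new BigIntType(); break;
                case FLOAT:   flinkType = new FloatType(); break;
                case DOUBLE:  flinkType = new DoubleType(); break;
                case BINARY:  flinkType = new VarCharType(VarCharType.MAX_LENGTH); break;
                default:
                    throw new UnsupportedOperationException(field.toString());
            }
            fields.add(new RowType.RowField(field.getName(), flinkType));
        }
        return new RowType(fields);
    }
}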
2. "The fieldTypes are required in case the given fields cannot be found in
the parquet footer, e.g. due to a typo." -> Does this mean that fieldTypes
are not required to be given during the construction of the RowType? I
tried leaving them empty as below, but it gave an exception:

Caused by: java.lang.ClassNotFoundException:
org.apache.flink.table.data.vector.ColumnVector

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(
                        new LogicalType[] {},
                        new String[] {"field_name_1", "field_name_2"}),
                500,
                false,
                true);
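Presumably the intended usage is to pass exactly one LogicalType per
projected field name, as in the documentation example quoted further below.
This is the shape we would expect to work (the two VarCharType entries are
just placeholders for the actual field types):

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(
                        // one LogicalType per field name; placeholders only
                        new LogicalType[] {new VarCharType(), new VarCharType()},
                        new String[] {"field_name_1", "field_name_2"}),
                500,
                false,
                true);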
Regards,
Meghajit

On Thu, Jan 6, 2022 at 3:43 PM Jing Ge <j...@ververica.com> wrote:

> Hi Meghajit,
>
> thanks for asking. If you take a look at the source code
> https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
> you will see that the Parquet MessageType is read from the footer and
> used. There was a conversion from Flink GroupType to Parquet MessageType.
> It might be possible to build the conversion the other way around. But the
> question is about performance: only the required columns should be read,
> therefore the column names should be given by the user. The fieldTypes are
> required in case the given fields cannot be found in the parquet footer,
> e.g. due to a typo.
>
> Best regards
> Jing
>
> On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hello,
>>
>> We want to read and process Parquet files using a FileSource and the
>> DataStream API.
>>
>> Currently, as referenced from the documentation
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/>,
>> this is the way in which a FileSource for Parquet is created. As can be
>> seen, it requires the construction of a RowType like this:
>>
>> RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
>>
>> where fieldTypes is created like this:
>>
>> final LogicalType[] fieldTypes =
>>         new LogicalType[] {
>>             new DoubleType(), new IntType(), new VarCharType()
>>         };
>>
>> Ideally, instead of specifying the column names (f7, f99, ...) and their
>> data types (DoubleType, VarCharType, ...), we would like to use the
>> schema of the Parquet file itself to create a RowType.
>>
>> The schema is present in the footer of the Parquet file, inside the
>> metadata.
>>
>> We wanted to know if there is an easy way by which we can convert a
>> parquet schema, i.e. a *MessageType*, into a Flink *RowType* directly?
>>
>> The parquet schema of the file can be easily obtained by using
>> *org.apache.parquet.hadoop.ParquetFileReader* as follows:
>>
>> ParquetFileReader reader =
>>         ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>
>> // this schema has the field names as well as the data types
>> // of the parquet records
>> MessageType schema = reader.getFileMetaData().getSchema();
>>
>> As of now, because we couldn’t find a way to convert the schema into a
>> RowType directly, we resorted to writing our own custom parser to parse
>> a Parquet SimpleGroup into a Flink Row, like this:
>>
>> ParquetFileReader reader =
>>         ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>> PageReadStore nextPage = reader.readNextRowGroup();
>> Row row = parseToRow(simpleGroup); // custom parser function
>>
>> Looking forward to an answer from the community. Thanks!
>>
>> Regards,
>>
>> Meghajit
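PS: In case it helps to see the full shape of the workaround mentioned in my
original mail quoted above, here is a condensed sketch of how we go from row
groups to SimpleGroups before calling our parseToRow function. Error
handling is omitted; parseToRow is our own code, and path/conf are the
Hadoop Path and Configuration of the file:

import org.apache.flink.types.Row;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
    MessageType schema = reader.getFileMetaData().getSchema();
    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
    PageReadStore pages;
    while ((pages = reader.readNextRowGroup()) != null) {
        RecordReader<Group> recordReader =
                columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
        for (long i = 0; i < pages.getRowCount(); i++) {
            SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
            Row row = parseToRow(simpleGroup); // custom parser function
        }
    }
}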