Hello,

We want to read and process Parquet files using a FileSource and the DataStream API.
Currently, as described in the documentation <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/#:~:text=contain%20event%20timestamps.-,final%20LogicalType%5B%5D%20fieldTypes%20%3D%0A%20%20new%20LogicalType%5B%5D%20%7B%0A%20%20new%20DoubleType()%2C%20new%20IntType()%2C%20new,DataStream%3CRowData%3E%20stream%20%3D%0A%20%20env.fromSource(source%2C%20WatermarkStrategy.noWatermarks()%2C%20%22file%2Dsource%22)%3B,-Continuous%20read%20example>, creating a FileSource for Parquet requires constructing a RowType like this:

    RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});

where fieldTypes is created like this:

    final LogicalType[] fieldTypes =
        new LogicalType[] {new DoubleType(), new IntType(), new VarCharType()};

Ideally, instead of specifying the column names (f7, f99, ...) and their data types (DoubleType, VarCharType, ...) by hand, we would like to use the schema of the Parquet file itself to create the RowType. The schema is present in the footer of the Parquet file, inside the metadata.

Is there an easy way to convert a Parquet schema, i.e. a MessageType, into a Flink RowType directly?
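To make the request concrete, here is a minimal sketch of the kind of per-field mapping we have in mind. It deliberately uses stand-in types (a small enum mirroring some of the constants of Parquet's PrimitiveType.PrimitiveTypeName, and plain strings naming the Flink types) rather than the real Parquet and Flink classes, so that it runs on its own. The class ParquetToFlinkSketch and its methods are hypothetical and are not part of either library, and treating BINARY as text is an assumption that only holds for UTF-8 string columns.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical sketch: maps Parquet primitive type names to Flink type names. */
final class ParquetToFlinkSketch {

    /** Stand-in for a subset of Parquet's PrimitiveType.PrimitiveTypeName constants. */
    enum ParquetPrimitive { BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BINARY }

    /** Returns the name of the Flink type we would choose for one Parquet primitive. */
    static String toFlinkTypeName(ParquetPrimitive primitive) {
        switch (primitive) {
            case BOOLEAN: return "BOOLEAN"; // would be BooleanType in Flink
            case INT32:   return "INT";     // would be IntType
            case INT64:   return "BIGINT";  // would be BigIntType
            case FLOAT:   return "FLOAT";   // would be FloatType
            case DOUBLE:  return "DOUBLE";  // would be DoubleType
            case BINARY:  return "STRING";  // assumption: UTF-8 text, i.e. VarCharType
            default:
                throw new IllegalArgumentException("Unsupported type: " + primitive);
        }
    }

    /**
     * Builds a plain description of the row from (field name -> Parquet type),
     * preserving field order. This is our own string format, not Flink's.
     */
    static String toRowTypeString(Map<String, ParquetPrimitive> fields) {
        StringJoiner joiner = new StringJoiner(", ", "ROW<", ">");
        for (Map.Entry<String, ParquetPrimitive> field : fields.entrySet()) {
            joiner.add(field.getKey() + " " + toFlinkTypeName(field.getValue()));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Same columns as in the documentation example: f7, f4, f99.
        Map<String, ParquetPrimitive> schema = new LinkedHashMap<>();
        schema.put("f7", ParquetPrimitive.DOUBLE);
        schema.put("f4", ParquetPrimitive.INT32);
        schema.put("f99", ParquetPrimitive.BINARY);
        System.out.println(toRowTypeString(schema)); // prints: ROW<f7 DOUBLE, f4 INT, f99 STRING>
    }
}
```

In a real implementation the loop would presumably run over the fields of the MessageType and collect LogicalType instances plus field names for RowType.of(...); nested groups, logical-type annotations, and the remaining primitives (such as INT96) are left out of this sketch.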
The Parquet schema of the file can easily be obtained by using org.apache.parquet.hadoop.ParquetFileReader as follows:

    ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
    // this schema has the field names as well as the data types of the Parquet records
    MessageType schema = reader.getFileMetaData().getSchema();

As of now, because we couldn't find a way to convert the schema into a RowType directly, we resorted to writing our own custom parser that turns a Parquet SimpleGroup into a Flink Row, like this:

    ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
    PageReadStore nextPage = reader.readNextRowGroup();
    Row row = parseToRow(g); // custom parser function; g is a SimpleGroup

Looking forward to an answer from the community. Thanks!

Regards,
Meghajit
