Hello,

We want to read and process Parquet Files using a FileSource and the
DataStream API.


Currently, as referenced from the documentation
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/#:~:text=contain%20event%20timestamps.-,final%20LogicalType%5B%5D%20fieldTypes%20%3D%0A%20%20new%20LogicalType%5B%5D%20%7B%0A%20%20new%20DoubleType()%2C%20new%20IntType()%2C%20new,DataStream%3CRowData%3E%20stream%20%3D%0A%20%20env.fromSource(source%2C%20WatermarkStrategy.noWatermarks()%2C%20%22file%2Dsource%22)%3B,-Continuous%20read%20example>,
this is the way in which a FileSource for Parquet is created. As can
be seen, it requires the construction of a RowType like this


RowType*.*of*(*fieldTypes*,* *new* String*[]* *{*"f7"*,* "f4"*,* "f99”*});*


where fieldTypes is created like this:


*final* LogicalType*[]* fieldTypes *=*

  *new* LogicalType*[]* *{*

  *new* DoubleType*(),* *new* IntType*(),* *new* VarCharType*()*

  *};*


Ideally, instead of specifying the column names( f7, f99,...) and
their data types(DoubleType, VarCharType, ...), we would like to use
the schema of the Parquet File itself to create a RowType.

The schema is present in the footer of the Parquet file, inside the metadata.

We wanted to know if there is an easy way by which way we can convert
a parquet schema, i:e, *MessageType* into a Flink *RowType* directly ?

The parquet schema of the file can be easily obtained by using
*org.apache.parquet.hadoop.ParquetFileReader* as follows:


ParquetFileReader reader =
ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));

MessageType schema = reader.getFileMetaData().getSchema(); // this
schema has the field names as well as the data types of the parquet
records


As of now, because we couldn’t find a way to convert the schema into a
RowType directly, we resorted to writing our own custom parser to
parse a Parquet SimpleGroup into a Flink Row like this:


ParquetFileReader reader =
ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));

PageReadStore nextPage = reader.readNextRowGroup();

Row row = parseToRow(SimpleGroup g); // custom parser function


Looking forward to an answer from the community. Thanks !


Regards,

Meghajit

Reply via email to