Hi Jing,

Thanks for the reply.
Had 2 doubts related to your answer :

1. There was a conversion from Flink GroupType to Parquet MessageType. It
might be possible to build the conversion the other way around.
-> Both GroupType and MessageType are parquet data structures I believe,
present in the org.apache.parquet.schema package. I am actually looking if
it is possible to convert it into a Flink data type, such as RowType.

2. The fieldTypes are required in case the given fields could not be found
in the parquet footer, like for example typo.
-> Does this mean that fieldTypes are not required to be given during the
construction of RowType ? I tried leaving it empty as below, but it gave an
exception *Caused by: java.lang.ClassNotFoundException:
org.apache.flink.table.data.vector.ColumnVector*

            final ParquetColumnarRowInputFormat<FileSourceSplit> format =
                    new ParquetColumnarRowInputFormat<>(
                            new Configuration(),
                            RowType.of(new LogicalType[]{}, new
String[]{"field_name_1", "field_name_2"}),
                            500,
                            false,
                            true);

Regards,
Meghajit

On Thu, Jan 6, 2022 at 3:43 PM Jing Ge <j...@ververica.com> wrote:

> Hi Meghajit,
>
> thanks for asking. If you took a look at the source code
> https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
> you should see Parquet MessageType has been read from the footer and used.
> There was a conversion from Flink GroupType to Parquet MessageType. It
> might be possible to build the conversion the other way around. But the
> question is about the performance, because only the required columns should
> be read, therefore the column names should be given by the user. The
> fieldTypes are required in case the given fields could not be found in the
> parquet footer, like for example typo.
>
> Best regards
> Jing
>
> On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hello,
>>
>> We want to read and process Parquet Files using a FileSource and the 
>> DataStream API.
>>
>>
>> Currently, as referenced from the documentation 
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/#:~:text=contain%20event%20timestamps.-,final%20LogicalType%5B%5D%20fieldTypes%20%3D%0A%20%20new%20LogicalType%5B%5D%20%7B%0A%20%20new%20DoubleType()%2C%20new%20IntType()%2C%20new,DataStream%3CRowData%3E%20stream%20%3D%0A%20%20env.fromSource(source%2C%20WatermarkStrategy.noWatermarks()%2C%20%22file%2Dsource%22)%3B,-Continuous%20read%20example>,
>>  this is the way in which a FileSource for Parquet is created. As can be 
>> seen, it requires the construction of a RowType like this
>>
>>
>> RowType*.*of*(*fieldTypes*,* *new* String*[]* *{*"f7"*,* "f4"*,* "f99”*});*
>>
>>
>> where fieldTypes is created like this:
>>
>>
>> *final* LogicalType*[]* fieldTypes *=*
>>
>>   *new* LogicalType*[]* *{*
>>
>>   *new* DoubleType*(),* *new* IntType*(),* *new* VarCharType*()*
>>
>>   *};*
>>
>>
>> Ideally, instead of specifying the column names( f7, f99,...) and their data 
>> types(DoubleType, VarCharType, ...), we would like to use the schema of the 
>> Parquet File itself to create a RowType.
>>
>> The schema is present in the footer of the Parquet file, inside the metadata.
>>
>> We wanted to know if there is an easy way by which way we can convert a 
>> parquet schema, i:e, *MessageType* into a Flink *RowType* directly ?
>>
>> The parquet schema of the file can be easily obtained by using 
>> *org.apache.parquet.hadoop.ParquetFileReader* as follows:
>>
>>
>> ParquetFileReader reader = 
>> ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>
>> MessageType schema = reader.getFileMetaData().getSchema(); // this schema 
>> has the field names as well as the data types of the parquet records
>>
>>
>> As of now, because we couldn’t find a way to convert the schema into a 
>> RowType directly, we resorted to writing our own custom parser to parse a 
>> Parquet SimpleGroup into a Flink Row like this:
>>
>>
>> ParquetFileReader reader = 
>> ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>
>> PageReadStore nextPage = reader.readNextRowGroup();
>>
>> Row row = parseToRow(SimpleGroup g); // custom parser function
>>
>>
>> Looking forward to an answer from the community. Thanks !
>>
>>
>> Regards,
>>
>> Meghajit
>>
>>
>>

Reply via email to