Hi Jing,

Thanks for the explanation, this helps.

As you suggested, I tried specifying the field names along with their field
types for my parquet files, and it works: I am able to read the specified
fields.
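
For reference, the flat projection that worked looks roughly like this (the
field names below are placeholders for my actual columns):

            final LogicalType[] fieldTypes =
                    new LogicalType[] {new DoubleType(), new VarCharType()};
            final RowType producedRowType =
                    RowType.of(fieldTypes, new String[] {"field_name_1", "field_name_2"});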

However, my parquet schema also contains some nested fields, like the one
below, which I want to read:

  optional group location = 11 {
    optional double latitude = 1;
    optional double longitude = 2;
  }

How do you suggest I create a RowType for this? I did something like the
code below, but I got the exception `Caused by:
java.lang.UnsupportedOperationException: Complex types not supported`

            RowType nestedRowType =
                    RowType.of(
                            new LogicalType[] {new DoubleType(), new DoubleType()},
                            new String[] {"latitude", "longitude"});
            final LogicalType[] fieldTypes = new LogicalType[] {nestedRowType};
            final ParquetColumnarRowInputFormat<FileSourceSplit> format =
                    new ParquetColumnarRowInputFormat<>(
                            new Configuration(),
                            RowType.of(fieldTypes, new String[] {"location"}),
                            500,
                            false,
                            true);
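
If the vectorized format simply does not support complex types, the fallback
I have in mind is the hand-rolled approach from my first mail below: read the
row groups with the low-level parquet API and pull the nested group out of
each record manually. A rough sketch (assuming the location schema above, and
that the optional group is present in each record):

            import org.apache.parquet.column.page.PageReadStore;
            import org.apache.parquet.example.data.Group;
            import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
            import org.apache.parquet.hadoop.ParquetFileReader;
            import org.apache.parquet.hadoop.util.HadoopInputFile;
            import org.apache.parquet.io.ColumnIOFactory;
            import org.apache.parquet.io.MessageColumnIO;
            import org.apache.parquet.io.RecordReader;
            import org.apache.parquet.schema.MessageType;

            try (ParquetFileReader reader =
                    ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
                MessageType schema = reader.getFileMetaData().getSchema();
                MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                PageReadStore pages;
                while ((pages = reader.readNextRowGroup()) != null) {
                    RecordReader<Group> recordReader =
                            columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                    for (long i = 0; i < pages.getRowCount(); i++) {
                        Group g = recordReader.read();
                        // nested fields come back as sub-groups; this assumes the
                        // optional "location" group is populated for the record
                        Group location = g.getGroup("location", 0);
                        double latitude = location.getDouble("latitude", 0);
                        double longitude = location.getDouble("longitude", 0);
                        // ... build a Flink Row from the extracted values ...
                    }
                }
            }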

On Thu, Jan 6, 2022 at 6:54 PM Jing Ge <j...@ververica.com> wrote:

> Hi Meghajit,
>
> good catch! Thanks for correcting me. The question is about how to use a
> column-oriented storage format like Parquet. What I tried to explain was
> that the original MessageType has been used to build a projected
> MessageType, since only the required columns should be read. Without input
> from the user, there is no way to build the projected schema except to
> read all columns. Even if we could convert the MessageType to RowType, we
> would still need the user's input. The fieldTypes are therefore
> (mandatorily) required with the current implementation because, when the
> given fields cannot be found by the ParquetVectorizedInputFormat in the
> parquet footer, type info is still needed to build the projected schema.
>
> Best regards
> Jing
>
> On Thu, Jan 6, 2022 at 12:38 PM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hi Jing,
>>
>> Thanks for the reply.
>> I had 2 doubts related to your answer:
>>
>> 1. There was a conversion from Flink GroupType to Parquet MessageType. It
>> might be possible to build the conversion the other way around.
>> -> Both GroupType and MessageType are parquet data structures, I believe,
>> present in the org.apache.parquet.schema package. I am actually looking to
>> see whether it is possible to convert a MessageType into a Flink data
>> type, such as RowType.
>>
>> 2. The fieldTypes are required in case the given fields could not be
>> found in the parquet footer, e.g. due to a typo.
>> -> Does this mean that fieldTypes are not required to be given during the
>> construction of the RowType? I tried leaving them empty as below, but it
>> gave an exception: Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.table.data.vector.ColumnVector
>>
>>             final ParquetColumnarRowInputFormat<FileSourceSplit> format =
>>                     new ParquetColumnarRowInputFormat<>(
>>                             new Configuration(),
>>                             RowType.of(
>>                                     new LogicalType[] {},
>>                                     new String[] {"field_name_1", "field_name_2"}),
>>                             500,
>>                             false,
>>                             true);
>>
>> Regards,
>> Meghajit
>>
>> On Thu, Jan 6, 2022 at 3:43 PM Jing Ge <j...@ververica.com> wrote:
>>
>>> Hi Meghajit,
>>>
>>> thanks for asking. If you take a look at the source code
>>> https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
>>> you will see that the Parquet MessageType is read from the footer and
>>> used. There was a conversion from Flink GroupType to Parquet MessageType.
>>> It might be possible to build the conversion the other way around. But
>>> the question is about performance: because only the required columns
>>> should be read, the column names should be given by the user. The
>>> fieldTypes are required in case the given fields could not be found in
>>> the parquet footer, e.g. due to a typo.
>>>
>>> Best regards
>>> Jing
>>>
>>> On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
>>> meghajit.mazum...@gojek.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> We want to read and process Parquet files using a FileSource and the
>>>> DataStream API.
>>>>
>>>>
>>>> Currently, as described in the documentation
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/#:~:text=contain%20event%20timestamps.-,final%20LogicalType%5B%5D%20fieldTypes%20%3D%0A%20%20new%20LogicalType%5B%5D%20%7B%0A%20%20new%20DoubleType()%2C%20new%20IntType()%2C%20new,DataStream%3CRowData%3E%20stream%20%3D%0A%20%20env.fromSource(source%2C%20WatermarkStrategy.noWatermarks()%2C%20%22file%2Dsource%22)%3B,-Continuous%20read%20example>,
>>>> this is the way in which a FileSource for Parquet is created. As can be
>>>> seen, it requires the construction of a RowType like this:
>>>>
>>>>
>>>> RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
>>>>
>>>>
>>>> where fieldTypes is created like this:
>>>>
>>>>
>>>> final LogicalType[] fieldTypes =
>>>>         new LogicalType[] {
>>>>                 new DoubleType(), new IntType(), new VarCharType()
>>>>         };
>>>>
>>>>
>>>> Ideally, instead of specifying the column names (f7, f99, ...) and their
>>>> data types (DoubleType, VarCharType, ...), we would like to use the
>>>> schema of the Parquet file itself to create a RowType.
>>>>
>>>> The schema is present in the footer of the Parquet file, inside the 
>>>> metadata.
>>>>
>>>> We wanted to know if there is an easy way by which we can convert a
>>>> parquet schema, i.e. a MessageType, into a Flink RowType directly?
>>>>
>>>> The parquet schema of the file can be easily obtained by using
>>>> org.apache.parquet.hadoop.ParquetFileReader as follows:
>>>>
>>>>
>>>> ParquetFileReader reader =
>>>>         ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>>>
>>>> // this schema has the field names as well as the data types of the
>>>> // parquet records
>>>> MessageType schema = reader.getFileMetaData().getSchema();
>>>>
>>>>
>>>> As of now, because we couldn’t find a way to convert the schema into a 
>>>> RowType directly, we resorted to writing our own custom parser to parse a 
>>>> Parquet SimpleGroup into a Flink Row like this:
>>>>
>>>>
>>>> ParquetFileReader reader =
>>>>         ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>>>
>>>> PageReadStore nextPage = reader.readNextRowGroup();
>>>>
>>>> Row row = parseToRow(g); // custom parser; g is a SimpleGroup from the row group
>>>>
>>>>
>>>> Looking forward to an answer from the community. Thanks!
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Meghajit
>>>>
>>>>
>>>>
