Hi Jing,

Thanks for explaining this. This helps.
As you suggested, I tried specifying some of the field names along with their field types for my parquet files, and it works. I am able to read those specific fields.

However, my parquet schema also has some nested fields which I want to read, like this:

optional group location = 11 {
  optional double latitude = 1;
  optional double longitude = 2;
}

How would you suggest I create a RowType for this? I did something like the below, but I got the exception `Caused by: java.lang.UnsupportedOperationException: Complex types not supported`

RowType nestedRowType = RowType.of(
        new LogicalType[] {new DoubleType(), new DoubleType()},
        new String[] {"latitude", "longitude"});

final LogicalType[] fieldTypes = new LogicalType[] {nestedRowType};

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(fieldTypes, new String[] {"location"}),
                500,
                false,
                true);
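
For completeness, here is a self-contained version of what I am running that reproduces the exception. It follows the flat-schema example from the documentation, just with the nested RowType swapped in; the file path, the print sink, and the environment setup are only placeholders for illustration:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

// Row type mirroring the nested group: location { latitude, longitude }
RowType nestedRowType = RowType.of(
        new LogicalType[] {new DoubleType(), new DoubleType()},
        new String[] {"latitude", "longitude"});

// Projection with a single top-level column "location" of the nested row type.
RowType projectedType = RowType.of(
        new LogicalType[] {nestedRowType},
        new String[] {"location"});

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(), projectedType, 500, false, true);

// Placeholder path; in the real job this points at our parquet directory.
final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, new Path("/path/to/parquet/dir")).build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
stream.print();

// The UnsupportedOperationException mentioned above is thrown when this job runs.
env.execute("parquet-nested-read");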

On Thu, Jan 6, 2022 at 6:54 PM Jing Ge <j...@ververica.com> wrote:

> Hi Meghajit,
>
> good catch! Thanks for correcting me. The question is about how to use a column-oriented storage format like Parquet. What I tried to explain was that the original MessageType has been used to build a projected MessageType, since only required columns should be read. Without the input from the user, there is no way to build the projected schema except to read all columns. Even if we could convert the MessageType to RowType, we would still need the user's input. The fieldTypes are therefore (mandatorily) required with the current implementation because, when the given fields could not be found by the ParquetVectorizedInputFormat in the parquet footer, a type info is still needed to build the projected schema.
>
> Best regards
> Jing
>
> On Thu, Jan 6, 2022 at 12:38 PM Meghajit Mazumdar <meghajit.mazum...@gojek.com> wrote:
>
>> Hi Jing,
>>
>> Thanks for the reply.
>> I had 2 doubts related to your answer:
>>
>> 1. "There was a conversion from Flink GroupType to Parquet MessageType. It might be possible to build the conversion the other way around."
>> -> Both GroupType and MessageType are parquet data structures I believe, present in the org.apache.parquet.schema package. I am actually looking at whether it is possible to convert it into a Flink data type, such as RowType.
>>
>> 2. "The fieldTypes are required in case the given fields could not be found in the parquet footer, for example due to a typo."
>> -> Does this mean that fieldTypes are not required to be given during the construction of RowType? I tried leaving it empty as below, but it gave an exception: Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.data.vector.ColumnVector
>>
>> final ParquetColumnarRowInputFormat<FileSourceSplit> format =
>>         new ParquetColumnarRowInputFormat<>(
>>                 new Configuration(),
>>                 RowType.of(new LogicalType[]{}, new String[]{"field_name_1", "field_name_2"}),
>>                 500,
>>                 false,
>>                 true);
>>
>> Regards,
>> Meghajit
>>
>> On Thu, Jan 6, 2022 at 3:43 PM Jing Ge <j...@ververica.com> wrote:
>>
>>> Hi Meghajit,
>>>
>>> thanks for asking. If you take a look at the source code https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174, you should see that the Parquet MessageType has been read from the footer and used. There was a conversion from Flink GroupType to Parquet MessageType. It might be possible to build the conversion the other way around. But the question is about the performance: because only the required columns should be read, the column names should be given by the user. The fieldTypes are required in case the given fields could not be found in the parquet footer, for example due to a typo.
>>>
>>> Best regards
>>> Jing
>>>
>>> On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <meghajit.mazum...@gojek.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> We want to read and process Parquet files using a FileSource and the DataStream API.
>>>>
>>>> Currently, as referenced from the documentation <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/#:~:text=contain%20event%20timestamps.-,final%20LogicalType%5B%5D%20fieldTypes%20%3D%0A%20%20new%20LogicalType%5B%5D%20%7B%0A%20%20new%20DoubleType()%2C%20new%20IntType()%2C%20new,DataStream%3CRowData%3E%20stream%20%3D%0A%20%20env.fromSource(source%2C%20WatermarkStrategy.noWatermarks()%2C%20%22file%2Dsource%22)%3B,-Continuous%20read%20example>, this is the way in which a FileSource for Parquet is created. As can be seen, it requires the construction of a RowType like this:
>>>>
>>>> RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
>>>>
>>>> where fieldTypes is created like this:
>>>>
>>>> final LogicalType[] fieldTypes =
>>>>         new LogicalType[] {
>>>>                 new DoubleType(), new IntType(), new VarCharType()
>>>>         };
>>>>
>>>> Ideally, instead of specifying the column names (f7, f99, ...) and their data types (DoubleType, VarCharType, ...), we would like to use the schema of the Parquet file itself to create a RowType.
>>>>
>>>> The schema is present in the footer of the Parquet file, inside the metadata.
>>>>
>>>> We wanted to know if there is an easy way by which we can convert a parquet schema, i.e. MessageType, into a Flink RowType directly?
>>>>
>>>> The parquet schema of the file can be easily obtained by using org.apache.parquet.hadoop.ParquetFileReader as follows:
>>>>
>>>> ParquetFileReader reader =
>>>>         ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>>> MessageType schema = reader.getFileMetaData().getSchema();
>>>> // this schema has the field names as well as the data types of the parquet records
>>>>
>>>> As of now, because we couldn't find a way to convert the schema into a RowType directly, we resorted to writing our own custom parser to parse a Parquet SimpleGroup into a Flink Row like this:
>>>>
>>>> ParquetFileReader reader =
>>>>         ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>>> PageReadStore nextPage = reader.readNextRowGroup();
>>>> Row row = parseToRow(simpleGroup); // custom parser function
>>>>
>>>> Looking forward to an answer from the community. Thanks!
>>>>
>>>> Regards,
>>>> Meghajit
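
P.S. For reference, the read loop around the custom parseToRow helper mentioned in my original mail (quoted above) looks roughly like this. This is only a sketch: parseToRow is our own parser and is not shown, and the file path is a placeholder.

import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

Configuration conf = new Configuration();
Path path = new Path("/path/to/file.parquet"); // placeholder path

try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
    // Schema (field names and types) read from the file footer.
    MessageType schema = reader.getFileMetaData().getSchema();
    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);

    PageReadStore rowGroup;
    while ((rowGroup = reader.readNextRowGroup()) != null) {
        RecordReader<Group> recordReader =
                columnIO.getRecordReader(rowGroup, new GroupRecordConverter(schema));
        for (long i = 0; i < rowGroup.getRowCount(); i++) {
            SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
            Row row = parseToRow(simpleGroup); // our custom parser, not shown
            // ... hand the Row over to downstream processing ...
        }
    }
}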