[ 
https://issues.apache.org/jira/browse/FLINK-25416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Guardiani updated FLINK-25416:
----------------------------------------
    Component/s: Formats (JSON, Avro, Parquet, ORC, SequenceFile)

> Build unified Parquet BulkFormat for both Table API and DataStream API
> ----------------------------------------------------------------------
>
>                 Key: FLINK-25416
>                 URL: https://issues.apache.org/jira/browse/FLINK-25416
>             Project: Flink
>          Issue Type: New Feature
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Jing Ge
>            Assignee: Jing Ge
>            Priority: Major
>
> *Background information*
> The current AvroParquet implementation, {{AvroParquetRecordFormat}}, uses the 
> high-level API {{ParquetReader}}, which does not provide offset information. 
> As a result, the restoreReader logic has considerable room for improvement.
> Besides {{AvroParquetRecordFormat}}, there is another Parquet format 
> implementation, {{ParquetVectorizedInputFormat}}, which is tightly coupled 
> with the Table API.
> It would be better to provide a unified Parquet BulkFormat with one 
> implementation that supports both the Table API and the DataStream API.
>  
> *Some thoughts*
> Use the low-level API {{ParquetFileReader}} with {{BulkFormat}} directly, as 
> {{ParquetVectorizedInputFormat}} does, instead of with {{StreamFormat}}, for 
> the following reasons:
>  * The read logic is built into the internal low-level class 
> {{InternalParquetRecordReader}}, which has package-private visibility in the 
> parquet-hadoop library and internally uses another low-level class, 
> {{ParquetFileReader}}. This makes a StreamFormat implementation very 
> complicated. I think the design goal of StreamFormat is to simplify 
> implementations, so the two do not seem to work well together.
>  * {{ParquetFileReader}} reads data in batch mode, i.e. 
> {{PageReadStore pages = reader.readNextFilteredRowGroup();}}. If we build 
> this logic into a StreamFormat ({{AvroParquetRecordFormat}} in this case), 
> {{AvroParquetRecordFormat}} has to take over the role that 
> {{InternalParquetRecordReader}} plays, including but not limited to:
>  ## reading {{PageReadStore}} in batch mode;
>  ## managing {{PageReadStore}}, i.e. reading the next page when all records 
> in the current page have been consumed, and caching it;
>  ## managing the read index within the current {{PageReadStore}}, because 
> StreamFormat has its own setting for read size, etc.
> All of this effectively turns {{AvroParquetRecordFormat}} into a 
> {{BulkFormat}} rather than a {{StreamFormat}}.
>  * {{StreamFormat}} can only be used via {{StreamFormatAdapter}}, which 
> means that everything we do with the low-level APIs of the parquet-hadoop 
> library must not conflict with the built-in logic provided by 
> {{StreamFormatAdapter}}.
> We can now see that if we build this logic into a {{StreamFormat}} 
> implementation, i.e. {{AvroParquetRecordFormat}}, all the convenient built-in 
> logic provided by {{StreamFormatAdapter}} turns into an obstacle. It also 
> violates the single responsibility principle, because 
> {{AvroParquetRecordFormat}} would take on some of the responsibilities of 
> {{BulkFormat}}. These might be the reasons why 
> {{ParquetVectorizedInputFormat}} implements {{BulkFormat}} instead of 
> {{StreamFormat}}.
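
To make the bookkeeping from the nested list above concrete, here is a minimal, library-free sketch. `BatchSource` and `RecordAtATimeReader` are hypothetical names used only for illustration; they are not part of Flink or parquet-hadoop, and a plain `List` stands in for a `PageReadStore`.

```java
import java.util.List;

/** Hypothetical stand-in for a batch-oriented reader; returns null at end of input. */
interface BatchSource<T> {
    List<T> nextBatch();
}

/** Record-at-a-time view over a batch source, showing the three duties listed above. */
final class RecordAtATimeReader<T> {
    private final BatchSource<T> source;
    private List<T> currentBatch; // (2) the cached batch
    private int indexInBatch;     // (3) read index within the cached batch
    private int batchesRead;

    RecordAtATimeReader(BatchSource<T> source) {
        this.source = source;
    }

    /** Returns the next record, or null once the source is exhausted. */
    T read() {
        // Fetch a new batch whenever the cached one is missing or fully consumed.
        while (currentBatch == null || indexInBatch >= currentBatch.size()) {
            currentBatch = source.nextBatch(); // (1) read in batch mode
            if (currentBatch == null) {
                return null;
            }
            indexInBatch = 0;
            batchesRead++;
        }
        return currentBatch.get(indexInBatch++);
    }

    int batchesRead() {
        return batchesRead;
    }
}
```

Even in this toy form the per-record reader has to own batch fetching, caching, and indexing, which is the work a bulk-oriented reader is designed around.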
> In order to build a unified Parquet implementation for both the Table API and 
> the DataStream API, it makes more sense to build this code into a 
> {{BulkFormat}} implementation class. Since the output data types differ, 
> {{RowData}} vs. {{Avro}}, extra converter logic should be introduced into the 
> architecture design. Depending on how complicated the issue turns out to be 
> and how big its impact on the current code base is, a new FLIP might be 
> required. 
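
One way to picture the converter idea is a single shared read path that is parameterized by a per-API converter. The sketch below is only an illustration under stated assumptions: `RawRecord`, `UnifiedBatchReader`, and `Converters` are hypothetical names, an `Object[]` stands in for `RowData`, and a `Map` stands in for an Avro record. None of these types exist in Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical format-neutral record produced by the shared read path. */
final class RawRecord {
    final String[] fieldNames;
    final Object[] values;

    RawRecord(String[] fieldNames, Object[] values) {
        this.fieldNames = fieldNames;
        this.values = values;
    }
}

/** One shared batch-reading implementation; only the converter differs per API. */
final class UnifiedBatchReader<T> {
    private final Function<RawRecord, T> converter;

    UnifiedBatchReader(Function<RawRecord, T> converter) {
        this.converter = converter;
    }

    /** Converts every record of a batch into the output type of the target API. */
    List<T> convertBatch(List<RawRecord> batch) {
        List<T> out = new ArrayList<>(batch.size());
        for (RawRecord record : batch) {
            out.add(converter.apply(record));
        }
        return out;
    }
}

/** Hypothetical converters: positional (row-like) and named (Avro-like) output. */
final class Converters {
    private Converters() {}

    static Object[] toRow(RawRecord record) {
        return record.values.clone();
    }

    static Map<String, Object> toMap(RawRecord record) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (int i = 0; i < record.fieldNames.length; i++) {
            result.put(record.fieldNames[i], record.values[i]);
        }
        return result;
    }
}
```

A Table API caller would construct `new UnifiedBatchReader<>(Converters::toRow)` and a DataStream caller `new UnifiedBatchReader<>(Converters::toMap)`, so the reading code itself is written once.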
> The following code snippet was suggested by Arvid Heise for the next 
> optimized AvroParquetReader:
> {code:java}
> // Injected
> GenericData model = GenericData.get();
> org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
>
> // Low-level reader - fetch metadata
> // (placeholder: the reader has to be opened on the actual file before use)
> ParquetFileReader reader = null;
> MessageType fileSchema = reader.getFileMetaData().getSchema();
> Map<String, String> metaData = reader.getFileMetaData().getKeyValueMetaData();
>
> // Init Avro-specific things
> AvroReadSupport<T> readSupport = new AvroReadSupport<>(model);
> ReadSupport.ReadContext readContext =
>         readSupport.init(
>                 new InitContext(
>                         conf,
>                         metaData.entrySet().stream()
>                                 .collect(Collectors.toMap(
>                                         e -> e.getKey(),
>                                         e -> Collections.singleton(e.getValue()))),
>                         fileSchema));
> RecordMaterializer<T> recordMaterializer =
>         readSupport.prepareForRead(conf, metaData, fileSchema, readContext);
> MessageType requestedSchema = readContext.getRequestedSchema();
>
> // Prepare record reader
> ColumnIOFactory columnIOFactory =
>         new ColumnIOFactory(reader.getFileMetaData().getCreatedBy());
> MessageColumnIO columnIO =
>         columnIOFactory.getColumnIO(requestedSchema, fileSchema, true);
>
> // For recovery: skip the row groups that were already consumed
> while (...) {
>     reader.skipNextRowGroup();
> }
>
> // For reading: one RecordReader per row group
> PageReadStore pages;
> for (int block = 0; (pages = reader.readNextRowGroup()) != null; block++) {
>     RecordReader<T> recordReader =
>             columnIO.getRecordReader(pages, recordMaterializer);
>     for (int i = 0; i < pages.getRowCount(); i++) {
>         T record = recordReader.read();
>         // emit record
>     }
> }
> {code}
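
The recovery step in the snippet above (skipping already consumed row groups) implies that a read position has to be tracked and restored. A minimal, library-free sketch of that idea follows. `ReadPosition` and `ResumableReader` are hypothetical names, nested lists stand in for the row groups of a file, and setting the start index plays the role of the repeated `skipNextRowGroup()` calls. It assumes a position can be expressed as "row groups completed plus records consumed in the current row group".

```java
import java.util.List;

/** Hypothetical checkpointable position: completed row groups plus records read in the current one. */
final class ReadPosition {
    final int rowGroupsCompleted;
    final int recordsReadInCurrentGroup;

    ReadPosition(int rowGroupsCompleted, int recordsReadInCurrentGroup) {
        this.rowGroupsCompleted = rowGroupsCompleted;
        this.recordsReadInCurrentGroup = recordsReadInCurrentGroup;
    }
}

/** Reader over in-memory "row groups" that can resume from a stored position. */
final class ResumableReader<T> {
    private final List<List<T>> rowGroups;
    private int groupIndex;
    private int recordIndex;

    ResumableReader(List<List<T>> rowGroups, ReadPosition restoreFrom) {
        this.rowGroups = rowGroups;
        // Recovery: jump past whole row groups, then past records already emitted.
        this.groupIndex = restoreFrom.rowGroupsCompleted;
        this.recordIndex = restoreFrom.recordsReadInCurrentGroup;
    }

    /** Returns the next record, or null when all row groups are consumed. */
    T read() {
        while (groupIndex < rowGroups.size()) {
            List<T> group = rowGroups.get(groupIndex);
            if (recordIndex < group.size()) {
                return group.get(recordIndex++);
            }
            groupIndex++;
            recordIndex = 0;
        }
        return null;
    }

    /** Position to store in a checkpoint. */
    ReadPosition position() {
        return new ReadPosition(groupIndex, recordIndex);
    }
}
```

Restoring at row-group granularity keeps the skip cheap, and only the records inside the current row group have to be re-read and discarded.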



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
