[ https://issues.apache.org/jira/browse/FLINK-25416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Francesco Guardiani updated FLINK-25416:
----------------------------------------
    Component/s: Formats (JSON, Avro, Parquet, ORC, SequenceFile)

> Build unified Parquet BulkFormat for both Table API and DataStream API
> ----------------------------------------------------------------------
>
>                 Key: FLINK-25416
>                 URL: https://issues.apache.org/jira/browse/FLINK-25416
>             Project: Flink
>          Issue Type: New Feature
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Jing Ge
>            Assignee: Jing Ge
>            Priority: Major
>
> *Background information*
> The current AvroParquet implementation, AvroParquetRecordFormat, uses the high-level API ParquetReader, which does not expose any offset information; as a consequence the restoreReader logic has a lot of room for improvement (the high-level read path is illustrated further below).
> Besides AvroParquetRecordFormat there is another Parquet format implementation, ParquetVectorizedInputFormat, which is tightly coupled with the Table API.
> It would be better to provide a unified Parquet BulkFormat with a single implementation that supports both the Table API and the DataStream API.
>
> *Some thoughts*
> Use the low-level API {{ParquetFileReader}} with {{BulkFormat}} directly, as 'ParquetVectorizedInputFormat' does, instead of with {{StreamFormat}}, for the following reasons:
> * The read logic lives in the internal low-level class {{InternalParquetRecordReader}}, which has package-private visibility in the parquet-hadoop lib and internally uses another low-level class, {{ParquetFileReader}}. This makes a StreamFormat implementation very complicated. The design idea of StreamFormat is to simplify the implementation; the two do not seem to work together.
> * {{ParquetFileReader}} reads data in batch mode, i.e. {{PageReadStore pages = reader.readNextFilteredRowGroup();}}. If we build this logic into a StreamFormat ({{AvroParquetRecordFormat}} in this case), {{AvroParquetRecordFormat}} has to take over the role of {{InternalParquetRecordReader}}, including but not limited to:
> ## reading {{PageReadStore}} in batch mode;
> ## managing the {{PageReadStore}}, i.e. reading the next page once all records in the current page have been consumed, and caching it;
> ## managing the read index within the current {{PageReadStore}}, because StreamFormat has its own setting for the read size, etc.
> All of this effectively turns {{AvroParquetRecordFormat}} into a {{BulkFormat}} rather than a {{StreamFormat}}.
> * {{StreamFormat}} can only be used via {{StreamFormatAdapter}}, which means everything we do with the low-level parquet-hadoop APIs must not conflict with the built-in logic provided by {{StreamFormatAdapter}}.
> In short, if we build this logic into a {{StreamFormat}} implementation, i.e. {{AvroParquetRecordFormat}}, all the convenient built-in logic provided by the {{StreamFormatAdapter}} turns into an obstacle. It would also violate the single responsibility principle, because {{AvroParquetRecordFormat}} would take over part of the responsibility of a {{BulkFormat}}. These might be the reasons why 'ParquetVectorizedInputFormat' implements {{BulkFormat}} instead of {{StreamFormat}}.
> In order to build a unified Parquet implementation for both the Table API and the DataStream API, it makes more sense to build this code into a {{BulkFormat}} implementation class. Since the output data types are different, {{RowData}} vs. Avro, extra converter logic should be introduced into the architecture design; a rough sketch of such a converter hook is shown right below.
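> The following is only a minimal sketch of that converter idea, written for this discussion; the interface and class names are hypothetical and do not exist in Flink or parquet-hadoop today:
> {code:java}
> // Hypothetical sketch - none of these names exist in Flink or parquet-hadoop.
> // Idea: the unified BulkFormat keeps the low-level reading (ParquetFileReader,
> // row groups, RecordMaterializer) in one place and delegates the mapping to the
> // produced type to a pluggable converter, so one reader can serve both APIs.
> import java.io.Serializable;
>
> import org.apache.avro.generic.GenericRecord;
>
> public interface ParquetRecordConverter<IN, OUT> extends Serializable {
>
>     /** Maps one materialized Parquet record to the format's produced type. */
>     OUT convert(IN materializedRecord);
> }
>
> /**
>  * DataStream API flavor: the Avro RecordMaterializer already yields
>  * GenericRecord, so the converter is the identity. A Table API flavor would
>  * map the materialized representation to RowData instead.
>  */
> class AvroIdentityConverter implements ParquetRecordConverter<GenericRecord, GenericRecord> {
>     @Override
>     public GenericRecord convert(GenericRecord materializedRecord) {
>         return materializedRecord;
>     }
> }
> {code}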
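> For contrast with the low-level code piece further below, the high-level read path mentioned in the background section roughly looks like the following. This is a hand-written illustration using parquet-avro's public reader API, not code copied from AvroParquetRecordFormat:
> {code:java}
> // Hand-written illustration of the high-level parquet-avro read path; not copied
> // from Flink. The point: ParquetReader exposes no row-group or record offset, so
> // checkpoint/restore can only be implemented by skipping records from the start.
> import java.io.IOException;
>
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetReader;
> import org.apache.parquet.hadoop.ParquetReader;
> import org.apache.parquet.hadoop.util.HadoopInputFile;
>
> public final class HighLevelReadPathSketch {
>
>     public static void main(String[] args) throws IOException {
>         Path path = new Path(args[0]); // any local or HDFS Parquet file
>         Configuration conf = new Configuration();
>
>         try (ParquetReader<GenericRecord> reader =
>                 AvroParquetReader.<GenericRecord>builder(HadoopInputFile.fromPath(path, conf))
>                         .withDataModel(GenericData.get())
>                         .build()) {
>             GenericRecord record;
>             while ((record = reader.read()) != null) {
>                 // Records arrive one by one; there is no API to query the current
>                 // offset, which is why the current restoreReader logic is expensive.
>                 System.out.println(record);
>             }
>         }
>     }
> }
> {code}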
> Depending on how complicated the issue turns out to be and how big the impact on the current code base is, a new FLIP might be required.
> The following code piece was suggested by Arvid Heise for the next, optimized AvroParquetReader:
> {code:java}
> // Injected
> GenericData model = GenericData.get();
> org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
>
> // Low level reader - fetch metadata
> ParquetFileReader reader = null; // placeholder in this sketch, e.g. obtained via ParquetFileReader.open(...)
> MessageType fileSchema = reader.getFileMetaData().getSchema();
> Map<String, String> metaData = reader.getFileMetaData().getKeyValueMetaData();
>
> // init Avro specific things
> AvroReadSupport<T> readSupport = new AvroReadSupport<>(model);
> ReadSupport.ReadContext readContext =
>         readSupport.init(
>                 new InitContext(
>                         conf,
>                         metaData.entrySet().stream()
>                                 .collect(Collectors.toMap(e -> e.getKey(), e -> Collections.singleton(e.getValue()))),
>                         fileSchema));
> RecordMaterializer<T> recordMaterializer = readSupport.prepareForRead(conf, metaData, fileSchema, readContext);
> MessageType requestedSchema = readContext.getRequestedSchema();
>
> // prepare record reader
> ColumnIOFactory columnIOFactory = new ColumnIOFactory(reader.getFileMetaData().getCreatedBy());
> MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, true);
>
> // for recovery: skip the row groups that were already fully consumed
> // (loop condition left open in this sketch)
> while (...) {
>     reader.skipNextRowGroup();
> }
>
> // for reading: fetch one row group at a time and materialize its records
> PageReadStore pages;
> for (int block = 0; (pages = reader.readNextRowGroup()) != null; block++) {
>     RecordReader<T> recordReader = columnIO.getRecordReader(pages, recordMaterializer);
>     for (int i = 0; i < pages.getRowCount(); i++) {
>         T record = recordReader.read();
>         emit record; // pseudo code: hand the record to the downstream collector
>     }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)