JingGe commented on pull request #17520: URL: https://github.com/apache/flink/pull/17520#issuecomment-948391521
@tsreaper

> Are you suggesting an avro `StreamFormat` which produces an avro block, instead of a Flink row data, at a time? If yes we'll need another operator after the source to break the block into several row data. Why not leave all these work inside the source?

It is an option. Even with Avro records, it should be fine to use `StreamFormat`. You can use the Avro reader in your own `StreamFormat` implementation, according to [the code](https://github.com/apache/avro/blob/42822886c28ea74a744abb7e7a80a942c540faa5/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L213) you referred to (see the rough sketch at the bottom of this comment). I don't see any conflict between `StreamFormat` reading records one by one and the Avro reader reading blocks. Maybe I didn't get your point.

> I guess by "overfulfil the quota a little bit" you mean the number of bytes read from the stream. This is true but what I'm considering is that `StreamFormatAdapter.Reader` is storing all the results in a batch in memory at the same time (see [this code](https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java#L202) and also my reply to @slinkydeveloper). This might cause OOM for a highly compressed file.

If you can use `StreamFormat.FETCH_IO_SIZE` to control the uncompressed data size, how could OOM happen?

> Avro is not a compression algorithm or such. It is a type of row-oriented file format and you can see it as normal files like .xls or .json. We won't say this is a xls compression file because xls is not for compression but for recording data, although it uses some compression to help shrink the file size.
>
> This is why avro exists as a sole module in `flink-formats`, instead of staying with the compressors in `StandardDeCompression` (you don't want to put a xls resolver in that file, do you). If you really do this you'll be essentially moving the whole avro module into `StandardDeCompression`.

It is right to have Avro as a submodule in `flink-formats`. What is `StandardDeCompression`? Do you mean `StandardDeCompressors`? That is a class, not a module. I know what Avro is, and it gets even more complicated with Parquet, where each column may have its own compression codec. But that is another story. Since the Avro reader handles the decompression logic itself, i.e. `block.decompressUsing(codec)`, we don't even need to touch the decompression logic built into `StreamFormatAdapter`. We can delegate the decompression work to the Avro reader and focus on working with the uncompressed data. In the worst case (which does not exist), say the Avro reader did not handle decompression, then `StandardDeCompressors` would be the place to handle it ourselves. After all, `StandardDeCompressors` and the related factories are the abstraction designed for exactly that; they are not limited to the Apache Commons compressors and are extensible.
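To make the idea concrete, here is a rough, untested sketch of how such a `StreamFormat` could wrap the Avro reader. The class name `AvroGenericRecordStreamFormat`, the non-splittable setting, the simplified `restoreReader`, and the generic produced type are illustrative assumptions only, not the actual code of this PR:

```java
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;

import javax.annotation.Nullable;
import java.io.IOException;

/** Illustrative sketch only: an Avro StreamFormat that emits one GenericRecord per read() call. */
public class AvroGenericRecordStreamFormat implements StreamFormat<GenericRecord> {

    @Override
    public Reader<GenericRecord> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException {
        // DataFileStream reads the codec from the file header and decompresses each
        // block internally, so this reader only ever sees uncompressed records.
        return new AvroReader(new DataFileStream<>(stream, new GenericDatumReader<GenericRecord>()));
    }

    @Override
    public Reader<GenericRecord> restoreReader(
            Configuration config, FSDataInputStream stream,
            long restoredOffset, long fileLen, long splitEnd) throws IOException {
        // Simplification for the sketch: start over from the beginning of the file.
        // A real implementation would seek to a sync marker and skip already emitted records.
        return createReader(config, stream, fileLen, splitEnd);
    }

    @Override
    public boolean isSplittable() {
        // Kept non-splittable to keep the sketch short; Avro sync markers would
        // allow splitting in a real implementation.
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // A real implementation would return proper Avro type information carrying the schema.
        return TypeInformation.of(GenericRecord.class);
    }

    /** Reads records one by one; StreamFormatAdapter takes care of batching them. */
    private static final class AvroReader implements StreamFormat.Reader<GenericRecord> {

        private final DataFileStream<GenericRecord> dataFileStream;

        AvroReader(DataFileStream<GenericRecord> dataFileStream) {
            this.dataFileStream = dataFileStream;
        }

        @Nullable
        @Override
        public GenericRecord read() throws IOException {
            return dataFileStream.hasNext() ? dataFileStream.next() : null;
        }

        @Override
        public void close() throws IOException {
            dataFileStream.close();
        }
    }
}
```

With something like this, all block decompression stays inside `DataFileStream`, and the `StreamFormatAdapter` batching and fetch-size handling discussed above happen around the already-decompressed records this reader returns.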