JingGe commented on pull request #17520:
URL: https://github.com/apache/flink/pull/17520#issuecomment-948391521


   @tsreaper 
   
   > Are you suggesting an avro `StreamFormat` which produces an avro block, 
instead of a Flink row data, at a time? If yes we'll need another operator 
after the source to break the block into several row data. Why not leave all 
these work inside the source?
   > 
   
   It is an option. Even with avro records, it should be fine to use 
`StreamFormat`. You can use the avro reader in your own `StreamFormat` 
implementation, following [the code](https://github.com/apache/avro/blob/42822886c28ea74a744abb7e7a80a942c540faa5/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L213) 
you referred to. I don't see any conflict between `StreamFormat` reading 
records one by one and the avro reader reading blocks. Maybe I didn't get your 
point.
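
   To make the idea concrete, here is a minimal sketch (not the code of this 
PR) of how an avro `StreamFormat` could wrap Avro's `DataFileStream` and still 
emit records to Flink one at a time. The class name and the restore/split 
handling are made up for illustration; only the Flink `StreamFormat` and Avro 
`DataFileStream` APIs are real.

```java
import java.io.IOException;
import javax.annotation.Nullable;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;

/** Hypothetical example, not the implementation proposed in this PR. */
public class AvroGenericRecordStreamFormat implements StreamFormat<GenericRecord> {

    @Override
    public Reader<GenericRecord> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException {
        // DataFileStream reads and decompresses avro blocks internally,
        // but exposes the records one by one via hasNext()/next().
        DataFileStream<GenericRecord> avroReader =
                new DataFileStream<>(stream, new GenericDatumReader<>());
        return new Reader<GenericRecord>() {
            @Override
            @Nullable
            public GenericRecord read() throws IOException {
                return avroReader.hasNext() ? avroReader.next() : null;
            }

            @Override
            public void close() throws IOException {
                avroReader.close();
            }
        };
    }

    @Override
    public Reader<GenericRecord> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException {
        // A real implementation would seek back to the restored position;
        // omitted to keep the sketch short.
        return createReader(config, stream, fileLen, splitEnd);
    }

    @Override
    public boolean isSplittable() {
        // Splitting on avro sync markers is possible but out of scope here.
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // For production use, a schema-aware type info (e.g. GenericRecordAvroTypeInfo
        // from flink-avro) would be the better choice.
        return TypeInformation.of(GenericRecord.class);
    }
}
```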
   
   > I guess by "overfulfil the quota a little bit" you mean the number of 
bytes read from the stream. This is true but what I'm considering is that 
`StreamFormatAdapter.Reader` is storing all the results in a batch in memory at 
the same time (see [this 
code](https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java#L202)
 and also my reply to @slinkydeveloper). This might cause OOM for a highly 
compressed file.
   > 
   
   If you can use `StreamFormat.FETCH_IO_SIZE` to control the uncompressed 
data size, how could an OOM happen?
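
   For reference, a minimal sketch of tuning that option (the 4 MiB value is 
only an example, and how the `Configuration` reaches the reader depends on the 
setup):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.reader.StreamFormat;

Configuration config = new Configuration();
// Target size of each fetch batch produced by the StreamFormatAdapter.Reader.
config.set(StreamFormat.FETCH_IO_SIZE, MemorySize.ofMebiBytes(4));
```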
   
   > Avro is not a compression algorithm or such. It is a type of row-oriented 
file format and you can see it as normal files like .xls or .json. We won't say 
this is a xls compression file because xls is not for compression but for 
recording data, although it uses some compression to help shrink the file size.
   > 
   > This is why avro exists as a sole module in `flink-formats`, instead of 
staying with the compressors in `StandardDeCompression` (you don't want to put 
a xls resolver in that file, do you). If you really do this you'll be 
essentially moving the whole avro module into `StandardDeCompression`.
   
   It is right to have avro as a submodule in `flink-formats`. What is 
`StandardDeCompression`? Do you mean `StandardDeCompressors`? That is a class, 
not a module. I know what avro is, and it gets even more complicated with 
parquet, where each column may have its own compression codec, but that is 
another story. Since the avro reader handles the decompression logic itself, 
i.e. `block.decompressUsing(codec)`, we don't even need to touch the 
compression logic built into the `StreamFormatAdapter`. We can delegate the 
decompression work to the avro reader and focus on the uncompressed data. In 
the worst case (which does not exist), i.e. if the avro reader did not handle 
decompression, `StandardDeCompressors` would be the place to handle it 
ourselves. After all, `StandardDeCompressors` and all the related factories 
are the abstraction designed for that; they are not limited to the Apache 
Commons compressors and are extendable.
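
   As a small, self-contained illustration of that delegation (plain Avro API, 
unrelated to the PR code): the read loop below never mentions a codec, because 
`DataFileStream` detects it from the file header and decompresses each block 
internally before handing out records.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class CodecAgnosticAvroRead {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
                        + "[{\"name\":\"id\",\"type\":\"long\"}]}");

        // Write a deflate-compressed avro file into memory.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.setCodec(CodecFactory.deflateCodec(6));
            writer.create(schema, out);
            for (long i = 0; i < 3; i++) {
                GenericRecord rec = new GenericData.Record(schema);
                rec.put("id", i);
                writer.append(rec);
            }
        }

        // The read side never specifies the codec: DataFileStream decompresses
        // each block internally and yields plain records.
        try (DataFileStream<GenericRecord> reader =
                new DataFileStream<>(
                        new ByteArrayInputStream(out.toByteArray()),
                        new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                System.out.println(reader.next());
            }
        }
    }
}
```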
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
