Denis created FLINK-30476:
-----------------------------

             Summary: TrackingFsDataInputStream batch tracking issue
                 Key: FLINK-30476
                 URL: https://issues.apache.org/jira/browse/FLINK-30476
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.15.3, 1.15.2, 1.15.1
            Reporter: Denis


{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream}} wraps the underlying input stream to count the bytes consumed. {{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.Reader}} relies on this counter to assemble batches of data:
{code:java}
            while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
                result.add(next);
            }
{code}
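The batch check itself is presumably a simple counter comparison. A sketch of the assumed shape (not the exact Flink source; field and method names taken from the class under discussion):
{code:java}
// Assumed shape of the tracking wrapper's batch accounting.
private final int batchSize;   // configured batch size in bytes
private int remainingInBatch;  // decremented as bytes are consumed

boolean hasRemainingInBatch() {
    return remainingInBatch > 0; // batch ends once the counter is used up
}
{code}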
{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream#read(byte[], int, int)}} contains a bug that can lead to arbitrarily sized batches due to underflow of the counter ({{remainingInBatch}}).
{code:java}
        public int read(byte[] b, int off, int len) throws IOException {
            // Bug: subtracts the requested length even though the underlying
            // stream may return fewer bytes than len (or -1 at end of stream).
            remainingInBatch -= len;
            return stream.read(b, off, len);
        }
{code}
Each {{stream.read()}} call may return fewer than {{len}} bytes, according to the javadoc:
{code:java}
Params:
    b   – the buffer into which the data is read.
    off – the start offset in array b at which the data is written.
    len – the maximum number of bytes to read.
Returns:
    the total number of bytes read into the buffer, or -1 if there is no more
    data because the end of the stream has been reached.
{code}
But the current implementation accounts only for the bytes that were requested ({{len}}), not the bytes actually read.
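A minimal sketch of a fix (assuming the surrounding class fields shown above) would subtract only the bytes actually returned by the underlying stream:
{code:java}
@Override
public int read(byte[] b, int off, int len) throws IOException {
    int bytesRead = stream.read(b, off, len);
    if (bytesRead > 0) {
        // Account for the bytes actually read, not the bytes requested.
        remainingInBatch -= bytesRead;
    }
    return bytesRead;
}
{code}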

For example, the S3 Hadoop FS can return fewer than {{len}} bytes from {{stream.read(b, off, len)}}. This is expected, and readers handle it; see {{org.apache.parquet.io.DelegatingSeekableInputStream#readFully(java.io.InputStream, byte[], int, int)}}, sketched below.
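That reader-side handling is roughly the classic read-fully loop. A sketch of the idiom (not the exact Parquet source):
{code:java}
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

static void readFully(InputStream stream, byte[] bytes, int start, int len)
        throws IOException {
    int offset = start;
    int remaining = len;
    while (remaining > 0) {
        // Each call may return fewer bytes than requested; keep reading
        // until the requested range is filled.
        int bytesRead = stream.read(bytes, offset, remaining);
        if (bytesRead < 0) {
            throw new EOFException("Reached the end of stream with "
                    + remaining + " bytes left to read");
        }
        offset += bytesRead;
        remaining -= bytesRead;
    }
}
{code}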

As a result, reading a Parquet file may underflow the counter in {{TrackingFsDataInputStream#read(byte[], int, int)}}: the Parquet reader tries to read a whole row group (which can be large) and may call {{read()}} multiple times, over-subtracting on every call. Once the signed counter wraps past its minimum value it can land on a large positive number, so the batch size becomes effectively unlimited, which may lead to an OOM.
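To illustrate the wrap-around with a deliberately simplified, standalone example (it assumes {{remainingInBatch}} is a signed {{int}} and uses a constant, very large requested length):
{code:java}
public class UnderflowDemo {
    public static void main(String[] args) {
        int remainingInBatch = 1 << 20;            // 1 MiB batch budget
        int requestedLen = Integer.MAX_VALUE / 2;  // huge requested length

        // Three reads where the stream returns far fewer bytes than
        // requested: the buggy wrapper still subtracts the full length.
        for (int i = 0; i < 3; i++) {
            remainingInBatch -= requestedLen;
        }

        // The counter wrapped past Integer.MIN_VALUE back to a positive
        // value, so hasRemainingInBatch() would keep returning true.
        System.out.println(remainingInBatch);      // 1074790403
        System.out.println(remainingInBatch > 0);  // true
    }
}
{code}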


