Denis created FLINK-30476:
-----------------------------
Summary: TrackingFsDataInputStream batch tracking issue
Key: FLINK-30476
URL: https://issues.apache.org/jira/browse/FLINK-30476
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.15.3, 1.15.2, 1.15.1
Reporter: Denis
{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream}} wraps the underlying InputStream to count the bytes consumed. {{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.Reader}} relies on this count to build batches of records:

{code:java}
// Keep reading records while the current batch still has byte budget left
while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
    result.add(next);
}
{code}

{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream#read(byte[], int, int)}} contains a bug that can produce batches of arbitrary size because the counter ({{remainingInBatch}}) underflows:

{code:java}
public int read(byte[] b, int off, int len) throws IOException {
    // Bug: charges the requested length, not the number of bytes actually read
    remainingInBatch -= len;
    return stream.read(b, off, len);
}
{code}

According to its javadoc, {{stream.read(b, off, len)}} may return fewer than {{len}} bytes:

{noformat}
Params:
b – the buffer into which the data is read.
off – the start offset in array b at which the data is written.
len – the maximum number of bytes to read.
Returns:
the total number of bytes read into the buffer, or -1 if there is no more data because the end of the stream has been reached.
{noformat}

However, the current implementation charges the counter with the number of bytes requested ({{len}}) rather than the number actually read. For example, the S3 Hadoop FS can return fewer than {{len}} bytes from {{stream.read(b, off, len)}}. This is expected behaviour, and readers handle it by looping until the buffer is full, e.g. {{org.apache.parquet.io.DelegatingSeekableInputStream#readFully(java.io.InputStream, byte[], int, int)}}.

As a result, reading a Parquet file can underflow the counter in {{TrackingFsDataInputStream#read(byte[], int, int)}}: the Parquet reader tries to read a whole (large) row group and may call {{read()}} several times, and each call subtracts the full length still requested rather than the bytes actually received. Once {{remainingInBatch}} underflows, the batch size is effectively unlimited, which can lead to an OOM.
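To make the failure mode concrete, here is a minimal self-contained sketch using only {{java.io}}, not Flink classes. {{ShortReadInputStream}}, {{BatchTrackingInputStream}} and the {{readFully}} helper are hypothetical stand-ins, and the sketch assumes the batch counter is an {{int}} and that the delegate returns at most 1 KiB per call. With a 2 MiB {{readFully}} and a 4 KiB batch budget, charging the requested {{len}} wraps the counter around to a large positive value, whereas charging the count returned by the delegate {{read}} closes the batch as expected.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class BatchTrackingDemo {

    /** Hypothetical delegate that returns at most maxChunk bytes per call, as some remote FS streams may. */
    static final class ShortReadInputStream extends FilterInputStream {
        private final int maxChunk;

        ShortReadInputStream(InputStream in, int maxChunk) {
            super(in);
            this.maxChunk = maxChunk;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // Deliberately cap the request to simulate a short read.
            return super.read(b, off, Math.min(len, maxChunk));
        }
    }

    /** Hypothetical stand-in for TrackingFsDataInputStream with an int byte budget. */
    static final class BatchTrackingInputStream extends FilterInputStream {
        private final int batchSize;
        private final boolean chargeRequested; // true reproduces the reported behaviour
        private int remainingInBatch;

        BatchTrackingInputStream(InputStream in, int batchSize, boolean chargeRequested) {
            super(in);
            this.batchSize = batchSize;
            this.chargeRequested = chargeRequested;
            this.remainingInBatch = batchSize;
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b != -1) {
                remainingInBatch--; // one byte actually consumed
            }
            return b;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (chargeRequested) {
                remainingInBatch -= len; // charges the request, even on a short read
                return super.read(b, off, len);
            }
            int n = super.read(b, off, len);
            if (n > 0) {
                remainingInBatch -= n; // charges only the bytes actually read
            }
            return n;
        }

        boolean hasRemainingInBatch() { return remainingInBatch > 0; }
        void nextBatch() { remainingInBatch = batchSize; }
        int remaining() { return remainingInBatch; }
    }

    /** Loops until len bytes arrive, in the spirit of Parquet's readFully. */
    static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
        while (len > 0) {
            int n = in.read(buf, off, len); // asks for everything still missing
            if (n < 0) {
                throw new EOFException();
            }
            off += n;
            len -= n;
        }
    }

    public static void main(String[] args) throws IOException {
        int rowGroup = 2 * 1024 * 1024; // 2 MiB read, 1 KiB per call, 4 KiB batch budget
        for (boolean buggy : new boolean[] {true, false}) {
            BatchTrackingInputStream s = new BatchTrackingInputStream(
                    new ShortReadInputStream(new ByteArrayInputStream(new byte[rowGroup]), 1024), 4096, buggy);
            readFully(s, new byte[rowGroup], 0, rowGroup);
            System.out.println((buggy ? "requested" : "actual") + ": remainingInBatch=" + s.remaining()
                    + ", hasRemainingInBatch=" + s.hasRemainingInBatch());
        }
        // prints:
        // requested: remainingInBatch=2146439168, hasRemainingInBatch=true
        // actual: remainingInBatch=-2093056, hasRemainingInBatch=false
    }
}
{code}

The 2048 calls in the requested-length variant subtract 1024 × (2048 + 2047 + ... + 1) = 2,148,532,224 bytes, which exceeds 2^31 and wraps the {{int}}. In the actual-count variant a single record can still overshoot the budget by its own size, which is consistent with the {{Reader}} loop checking {{hasRemainingInBatch()}} only before each record.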