Hi all,

I'd like to discuss the semantics of returning null from InputFormat#nextRecord.
For now, there is no explicit Javadoc about this, and there are three ways it is handled in Flink:

1. treat null as the end of input
2. skip null
3. assume that the value from InputFormat#nextRecord cannot be null

I quickly searched the Flink codebase for these usages (the number in brackets is the approach each one takes):

- org.apache.flink.api.common.operators.GenericDataSourceBase [2]
- org.apache.flink.api.java.io.CsvInputFormat [2]
- org.apache.flink.runtime.operators.DataSourceTask [2]
- org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator [2]
- org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction [1]
- org.apache.flink.table.sources.CsvTableSource [1]
- org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
- org.apache.flink.table.filesystem.FileSystemLookupFunction [3]

I think we can align these behaviors. As for which one to align on, I personally lean towards #2.

A step further: when will InputFormat#nextRecord return null? One scenario is that we encounter dirty data and want to skip it. We actually faced the same problem in org.apache.flink.api.common.serialization.DeserializationSchema in the past, and in 1.11 we added a method `void deserialize(byte[] message, Collector<T> out)`. Its default behavior is to ignore a null return value.

Could we then also add a method `void nextRecord(OT reuse, Collector<OT> collector)` to InputFormat? Adding this method would also let us return multiple records in one call, which is very flexible for implementing an InputFormat.

WDYT?

--

Best,
Benchao Li
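
P.S. To make the last proposal a bit more concrete, here is a rough sketch of how a default `nextRecord(reuse, collector)` could behave. It deliberately does not use Flink's real classes: `Collector`, `RecordReader`, `IntLines` and `readAll` are hypothetical stand-ins reduced to what matters here, and the default body is only my assumption of how approach #2 (skip null) would look, mirroring what DeserializationSchema does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class NextRecordSketch {

    /** Hypothetical stand-in for Flink's Collector, reduced to a single method. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical stand-in for InputFormat, reduced to the methods discussed here. */
    interface RecordReader<OT> {
        boolean reachedEnd() throws IOException;

        /** Existing style: one record per call; null is what needs defined semantics. */
        OT nextRecord(OT reuse) throws IOException;

        /**
         * Proposed style (assumed default): forward the record only if it is
         * non-null, so a null return means "skip", not "end of input".
         */
        default void nextRecord(OT reuse, Collector<OT> collector) throws IOException {
            OT record = nextRecord(reuse);
            if (record != null) {
                collector.collect(record);
            }
        }
    }

    /** Toy format: parses one integer per line and returns null for dirty lines. */
    static final class IntLines implements RecordReader<Integer> {
        private final Iterator<String> lines;

        IntLines(List<String> lines) {
            this.lines = lines.iterator();
        }

        @Override
        public boolean reachedEnd() {
            return !lines.hasNext();
        }

        @Override
        public Integer nextRecord(Integer reuse) {
            String line = lines.next();
            try {
                return Integer.valueOf(line.trim());
            } catch (NumberFormatException e) {
                return null; // dirty data: signal "skip this record"
            }
        }
    }

    /** Caller side: the loop ends on reachedEnd() only, never on a null record. */
    static <OT> List<OT> readAll(RecordReader<OT> format) {
        List<OT> out = new ArrayList<>();
        try {
            while (!format.reachedEnd()) {
                format.nextRecord(null, out::add);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

With this shape, existing formats would keep working unchanged through the default method, while a format that wants to emit zero or several records per call could override the two-argument method and call `collect` as often as it needs.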