Hi Benchao,

My understanding is that #1 treats null as the end of input. This means an `InputFormat` implementation should try its best not to return null before the end of input; if it has to return null at all, it should do so only at the end of input. I think this is the safest implementation, because it works with callers that follow either #1 or #2.

As for #3, I think we should avoid it. Actually, both `CRowValuesInputFormat` and `FileSystemLookupFunction` have corresponding implementations, and those specific `InputFormat`s can ensure that `reachedEnd` and `nextRecord` behave well. Of course, we can also make their invocations more robust.
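To make the "return null only at the end of input" idea concrete, here is a minimal sketch. It does not use the real Flink classes: `SimpleFormat`, `SkippingFormat` and `readAll` are hypothetical names invented for illustration, and the stand-in interface omits the `IOException` that the real `InputFormat#nextRecord` declares. The format skips dirty records internally, so a caller that treats null as the end of input (#1) still sees every valid record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class NullAtEndDemo {

    /** Hypothetical stand-in for the two InputFormat methods discussed in this thread. */
    interface SimpleFormat<T> {
        boolean reachedEnd();

        T nextRecord(T reuse); // may return null
    }

    /** Skips dirty (null) records internally and returns null only once the input is exhausted. */
    static class SkippingFormat implements SimpleFormat<String> {
        private final Iterator<String> raw;
        private boolean end;

        SkippingFormat(List<String> rawRecords) {
            this.raw = rawRecords.iterator();
        }

        @Override
        public boolean reachedEnd() {
            return end;
        }

        @Override
        public String nextRecord(String reuse) {
            while (raw.hasNext()) {
                String record = raw.next();
                if (record != null) {
                    return record; // a valid record
                }
                // a dirty record: skip it instead of returning null mid-stream
            }
            end = true;
            return null; // null is only ever returned at the end of input
        }
    }

    /** A caller following semantics #1: a null record ends the input. */
    static <T> List<T> readAll(SimpleFormat<T> format) {
        List<T> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            T record = format.nextRecord(null);
            if (record == null) {
                break;
            }
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = readAll(new SkippingFormat(Arrays.asList("a", null, "b", null)));
        System.out.println(out); // prints [a, b]
    }
}
```

With a format written this way, a caller that skips null (#2) would produce the same result, which is why this looks like the safest contract for implementations.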
Actually, IIUC, `InputFormat` is a legacy interface; the new interface should be FLIP-27 [1]. And we are planning to implement FileSource on the new interfaces too.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Jingsong

On Thu, Jul 23, 2020 at 6:31 PM Benchao Li <libenc...@apache.org> wrote:

> Hi all,
>
> I'd like to discuss the semantics of returning null from
> InputFormat#nextRecord.
>
> For now, there is no explicit Javadoc about this, and there are three ways
> to handle it in Flink:
> 1. treat null as the end of input
> 2. skip null
> 3. assume that the value from InputFormat#nextRecord cannot be null
>
> I quickly searched the Flink codebase for these usages:
> - org.apache.flink.api.common.operators.GenericDataSourceBase [2]
> - org.apache.flink.api.java.io.CsvInputFormat [2]
> - org.apache.flink.runtime.operators.DataSourceTask [2]
> - org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator [2]
> - org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction [1]
> - org.apache.flink.table.sources.CsvTableSource [1]
> - org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
> - org.apache.flink.table.filesystem.FileSystemLookupFunction [3]
>
> I think we can align these behaviors. As for the alignment, I personally
> lean towards #2.
>
> A step further: when will InputFormat#nextRecord return null?
> One scenario is that we encounter dirty data and want to skip it.
> Actually, we faced the same problem in
> org.apache.flink.api.common.serialization.DeserializationSchema
> in the past, and in 1.11 we added a method
> `void deserialize(byte[] message, Collector<T> out)`.
> Its default behavior is to ignore a null return value.
>
> Then could we also add a method
> `void nextRecord(OT reuse, Collector<OT> collector)`
> to InputFormat?
> Adding this method would enable us to return multiple records in one call,
> which is very flexible for implementing an InputFormat.
>
> WDYT?
>
> --
>
> Best,
> Benchao Li
>

--
Best, Jingsong Lee