Hi all,

I'd like to discuss the semantics of returning null from
InputFormat#nextRecord.

Currently the Javadoc says nothing explicit about this, and Flink handles
it in three different ways:
1. treat null as the end of input
2. skip null
3. assume that the value from InputFormat#nextRecord cannot be null
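
To make the difference concrete, here is a minimal sketch of the three
behaviors. SimpleFormat, ListFormat and NullHandling are hypothetical
stand-ins written for this mail, not Flink classes, and a null element in the
toy input models a record the format could not produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the relevant part of InputFormat (not Flink's interface).
interface SimpleFormat<T> {
    boolean reachedEnd();
    T nextRecord(T reuse);
}

// Toy format that yields the given values in order; a null element models a dirty record.
final class ListFormat implements SimpleFormat<String> {
    private final Iterator<String> it;

    ListFormat(String... values) {
        this.it = Arrays.asList(values).iterator();
    }

    public boolean reachedEnd() {
        return !it.hasNext();
    }

    public String nextRecord(String reuse) {
        return it.next();
    }
}

final class NullHandling {
    // 1. Treat null as the end of input: stop reading at the first null.
    static List<String> stopOnNull(SimpleFormat<String> format) {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            String record = format.nextRecord(null);
            if (record == null) {
                break;
            }
            out.add(record);
        }
        return out;
    }

    // 2. Skip null: drop the record and keep reading.
    static List<String> skipNull(SimpleFormat<String> format) {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            String record = format.nextRecord(null);
            if (record != null) {
                out.add(record);
            }
        }
        return out;
    }

    // 3. Assume non-null: use the record directly, which fails on null.
    static List<Integer> assumeNonNull(SimpleFormat<String> format) {
        List<Integer> lengths = new ArrayList<>();
        while (!format.reachedEnd()) {
            lengths.add(format.nextRecord(null).length()); // NullPointerException on null
        }
        return lengths;
    }
}
```

With the input ("a", null, "b"), the first variant silently drops "b", the
second keeps both valid records, and the third throws.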

I quickly searched the Flink codebase for usages; the number in brackets
is the handling from the list above:
- org.apache.flink.api.common.operators.GenericDataSourceBase [2]
- org.apache.flink.api.java.io.CsvInputFormat [2]
- org.apache.flink.runtime.operators.DataSourceTask [2]
- org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator [2]
- org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction [1]
- org.apache.flink.table.sources.CsvTableSource [1]
- org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
- org.apache.flink.table.filesystem.FileSystemLookupFunction [3]

I think we should align these behaviors. As for which one to pick, I
personally lean towards #2 (skip null).

Going a step further: when would InputFormat#nextRecord return null?
One scenario is that we encounter dirty data and want to skip it.
We faced the same problem in
org.apache.flink.api.common.serialization.DeserializationSchema
in the past, and in 1.11 we added a method `void deserialize(byte[]
message, Collector<T> out)`.
Its default behavior is to ignore a null return value.
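
For reference, a simplified sketch of that pattern follows. SketchCollector
and SketchSchema are hypothetical mirrors of Collector and
DeserializationSchema written for this mail, and IntSchema is a toy schema
that returns null for input it cannot parse; the real interfaces have more
methods.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal mirror of Flink's Collector.
interface SketchCollector<T> {
    void collect(T record);
}

// Hypothetical simplified mirror of DeserializationSchema.
interface SketchSchema<T> {
    T deserialize(byte[] message) throws IOException;

    // Collector-based variant: a null from the single-record method is ignored.
    default void deserialize(byte[] message, SketchCollector<T> out) throws IOException {
        T value = deserialize(message);
        if (value != null) {
            out.collect(value);
        }
    }
}

// Toy schema: parses an integer and returns null for dirty data.
final class IntSchema implements SketchSchema<Integer> {
    public Integer deserialize(byte[] message) {
        try {
            return Integer.valueOf(new String(message, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }
}

final class SchemaDemo {
    // Feeds every message through the collector-based method and gathers the output.
    static List<Integer> run(String... messages) {
        SketchSchema<Integer> schema = new IntSchema();
        List<Integer> out = new ArrayList<>();
        try {
            for (String message : messages) {
                schema.deserialize(message.getBytes(StandardCharsets.UTF_8), out::add);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

Callers only ever use the collector-based method, so a dirty message simply
produces no output instead of a null that every caller has to interpret.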

Could we then also add a method `void nextRecord(OT reuse, Collector<OT>
collector)`
to InputFormat?
This method would also let an InputFormat emit multiple records in one
call, which makes implementing an InputFormat much more flexible.
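
A rough sketch of what this could look like is below. Only the method
signature comes from this proposal; the default body, the RecordCollector and
CollectingFormat interfaces, and the SplittingFormat example are assumptions
for illustration, not existing Flink code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal mirror of Flink's Collector.
interface RecordCollector<T> {
    void collect(T record);
}

// Hypothetical sketch of InputFormat reduced to the methods discussed here.
interface CollectingFormat<OT> {
    boolean reachedEnd() throws IOException;

    OT nextRecord(OT reuse) throws IOException;

    // Proposed addition; this default body (skip null) is an assumption.
    default void nextRecord(OT reuse, RecordCollector<OT> collector) throws IOException {
        OT record = nextRecord(reuse);
        if (record != null) {
            collector.collect(record);
        }
    }
}

// Toy format: each input line may produce zero, one, or several records.
final class SplittingFormat implements CollectingFormat<String> {
    private final Iterator<String> lines;

    SplittingFormat(String... lines) {
        this.lines = Arrays.asList(lines).iterator();
    }

    public boolean reachedEnd() {
        return !lines.hasNext();
    }

    public String nextRecord(String reuse) {
        throw new UnsupportedOperationException("use the collector-based variant");
    }

    @Override
    public void nextRecord(String reuse, RecordCollector<String> collector) {
        for (String token : lines.next().split(",")) {
            if (!token.isEmpty()) {
                collector.collect(token); // emit every non-empty token of the line
            }
        }
    }
}

final class FormatDemo {
    // Drains a format through the collector-based method only.
    static List<String> readAll(CollectingFormat<String> format) {
        List<String> out = new ArrayList<>();
        try {
            while (!format.reachedEnd()) {
                format.nextRecord(null, out::add);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

Existing formats would keep working through the default method, while a
format like SplittingFormat can skip a line or emit several records from it
without returning null at all.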

WDYT?

-- 

Best,
Benchao Li
