Hi Benchao,

My understanding is that, under #1, null is treated as the end of input.
This means implementations of `InputFormat` should try their best to avoid
returning null before the end of input. If they have to return null at all,
they should only return it at the end of input.
I think this is the safest contract: callers that follow either #1 or #2
then see the same records.
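To make this concrete, here is a minimal sketch. It assumes a hypothetical, simplified `SimpleInputFormat` interface with only `reachedEnd()` and `nextRecord(reuse)`; the real `InputFormat` also has configure/open/close and throws `IOException`. `ListFormat` is a hypothetical format that only discovers the end by trying to read, so it returns null once, as its last call. The two reader loops (also hypothetical) follow #1 and #2, and both collect the same records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified stand-in for Flink's InputFormat (only the two methods discussed). */
interface SimpleInputFormat<T> {
    boolean reachedEnd();

    T nextRecord(T reuse);
}

/**
 * Hypothetical format that only discovers the end by trying to read:
 * it returns null exactly once, as its final call, then reports reachedEnd().
 */
final class ListFormat<T> implements SimpleInputFormat<T> {
    private final Iterator<T> it;
    private boolean end = false;

    ListFormat(List<T> records) {
        this.it = records.iterator();
    }

    @Override
    public boolean reachedEnd() {
        return end;
    }

    @Override
    public T nextRecord(T reuse) {
        if (it.hasNext()) {
            return it.next();
        }
        end = true; // null is only ever returned at the end of input
        return null;
    }
}

final class Readers {
    private Readers() {}

    /** Approach #1: treat null as the end of input. */
    static <T> List<T> readStopOnNull(SimpleInputFormat<T> format) {
        List<T> out = new ArrayList<>();
        T record = null;
        while (!format.reachedEnd()) {
            record = format.nextRecord(record);
            if (record == null) {
                break;
            }
            out.add(record);
        }
        return out;
    }

    /** Approach #2: skip null and keep reading until reachedEnd(). */
    static <T> List<T> readSkippingNull(SimpleInputFormat<T> format) {
        List<T> out = new ArrayList<>();
        T reuse = null;
        while (!format.reachedEnd()) {
            T record = format.nextRecord(reuse);
            if (record != null) {
                out.add(record);
                reuse = record;
            }
        }
        return out;
    }
}
```

If `ListFormat` instead returned null in the middle of its input, the #1 loop would stop early while the #2 loop would keep reading, which is the divergence we want to avoid.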
As for #3, I think we should avoid relying on it. Actually, both
`CRowValuesInputFormat` and `FileSystemLookupFunction` have corresponding
implementations, and those specific `InputFormat`s can ensure that
`reachedEnd` and `nextRecord` behave well. Of course, we can also make
these invocations more robust.
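One way to make such call sites more robust, sketched under assumptions: wrap the format in an `Iterator` that skips nulls, so code written under assumption #3 never sees one. `RecordSource`, `NonNullRecordIterator`, `ListSource` and `RobustReads` are hypothetical names for illustration, not Flink classes, and `RecordSource` is a simplified stand-in for `InputFormat`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical, simplified stand-in for the two InputFormat methods discussed. */
interface RecordSource<T> {
    boolean reachedEnd();

    T nextRecord(T reuse);
}

/** Hypothetical adapter: iterates a RecordSource and never yields null. */
final class NonNullRecordIterator<T> implements Iterator<T> {
    private final RecordSource<T> source;
    private T next; // look-ahead record; null means "not fetched yet"

    NonNullRecordIterator(RecordSource<T> source) {
        this.source = source;
    }

    @Override
    public boolean hasNext() {
        // Pull until we get a real record or the source is exhausted;
        // null results are dropped here instead of reaching the caller.
        while (next == null && !source.reachedEnd()) {
            next = source.nextRecord(null);
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = next;
        next = null;
        return result;
    }
}

/** Hypothetical source that may return null mid-stream (e.g. a skipped dirty row). */
final class ListSource<T> implements RecordSource<T> {
    private final List<T> values;
    private int pos = 0;

    @SafeVarargs
    ListSource(T... values) {
        this.values = Arrays.asList(values);
    }

    @Override
    public boolean reachedEnd() {
        return pos >= values.size();
    }

    @Override
    public T nextRecord(T reuse) {
        return values.get(pos++);
    }
}

final class RobustReads {
    private RobustReads() {}

    /** A caller that can safely assume non-null values (approach #3). */
    static <T> List<T> readAll(RecordSource<T> source) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = new NonNullRecordIterator<>(source);
        while (it.hasNext()) {
            out.add(it.next()); // never null here
        }
        return out;
    }
}
```

The adapter keeps the fix at the call site, so it also works for formats we don't control; the trade-off is that it silently hides nulls, which could mask bugs in formats that are not supposed to return them.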

Also, IIUC, `InputFormat` is a legacy interface, and the new interface
should be the one from FLIP-27 [1]. We are also planning to implement
FileSource on the new interfaces.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Jingsong

On Thu, Jul 23, 2020 at 6:31 PM Benchao Li <libenc...@apache.org> wrote:

> Hi all,
>
> I'd like to discuss the semantics of returning null from
> InputFormat#nextRecord.
>
> For now, there is no explicit Javadoc about this, and there are three ways
> this is handled in Flink:
> 1. treat null as the end of input
> 2. skip null
> 3. assume that the value returned by InputFormat#nextRecord cannot be null
>
> I quickly searched the Flink codebase for these usages (the number in
> brackets is the approach each class takes):
> - org.apache.flink.api.common.operators.GenericDataSourceBase [2]
> - org.apache.flink.api.java.io.CsvInputFormat [2]
> - org.apache.flink.runtime.operators.DataSourceTask [2]
> - org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator [2]
> - org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction [1]
> - org.apache.flink.table.sources.CsvTableSource [1]
> - org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
> - org.apache.flink.table.filesystem.FileSystemLookupFunction [3]
>
> I think we can align these behaviors. For the alignment, I personally lean
> toward #2.
>
> Going a step further: when would InputFormat#nextRecord return null?
> One scenario is that we encounter dirty data and want to skip it.
> We faced the same problem with
> org.apache.flink.api.common.serialization.DeserializationSchema
> in the past, and in 1.11 we added a method
> `void deserialize(byte[] message, Collector<T> out)`.
> Its default behavior is to ignore a null return value.
>
> Could we then also add a method
> `void nextRecord(OT reuse, Collector<OT> collector)` to InputFormat?
> This method would let an implementation return multiple records in one
> call, which makes implementing an InputFormat much more flexible.
>
> WDYT?
>
> --
>
> Best,
> Benchao Li
>
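Benchao's proposed overload could look roughly like the sketch below. It assumes a hypothetical, simplified `CollectorInputFormat` interface and a minimal hypothetical `Collector` with only `collect` (Flink's `org.apache.flink.util.Collector` also has `close()`). The default method mirrors the `DeserializationSchema` pattern from 1.11: it forwards non-null results and drops null. `IntPerLineFormat` relies on that default to skip dirty lines, while `IntsPerLineFormat` overrides the overload to emit several records per call. All class names are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical minimal collector; Flink's org.apache.flink.util.Collector also has close(). */
interface Collector<T> {
    void collect(T record);
}

/** Hypothetical, simplified InputFormat with the proposed Collector-based overload. */
interface CollectorInputFormat<OT> {
    boolean reachedEnd();

    OT nextRecord(OT reuse);

    /** Proposed overload; the default forwards non-null results and drops null. */
    default void nextRecord(OT reuse, Collector<OT> out) {
        OT record = nextRecord(reuse);
        if (record != null) {
            out.collect(record);
        }
    }
}

/** Hypothetical format: one integer per line; dirty lines yield null. */
final class IntPerLineFormat implements CollectorInputFormat<Integer> {
    private final Iterator<String> lines;

    IntPerLineFormat(List<String> lines) {
        this.lines = lines.iterator();
    }

    @Override
    public boolean reachedEnd() {
        return !lines.hasNext();
    }

    @Override
    public Integer nextRecord(Integer reuse) {
        return parseOrNull(lines.next());
    }

    static Integer parseOrNull(String s) {
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return null; // dirty data
        }
    }
}

/** Hypothetical format: a line like "1,2,3" expands into several records. */
final class IntsPerLineFormat implements CollectorInputFormat<Integer> {
    private final Iterator<String> lines;

    IntsPerLineFormat(List<String> lines) {
        this.lines = lines.iterator();
    }

    @Override
    public boolean reachedEnd() {
        return !lines.hasNext();
    }

    @Override
    public Integer nextRecord(Integer reuse) {
        // One call can produce several records, so this sketch only supports the overload.
        throw new UnsupportedOperationException("use nextRecord(reuse, out)");
    }

    @Override
    public void nextRecord(Integer reuse, Collector<Integer> out) {
        for (String token : lines.next().split(",")) {
            Integer value = IntPerLineFormat.parseOrNull(token);
            if (value != null) {
                out.collect(value); // zero, one or many records per call
            }
        }
    }
}

final class CollectorReads {
    private CollectorReads() {}

    /** Caller loop: no null checks, the collector only ever receives real records. */
    static <T> List<T> readAll(CollectorInputFormat<T> format) {
        List<T> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            format.nextRecord(null, out::add);
        }
        return out;
    }
}
```

The caller loop in `readAll` never has to check for null, which is the main appeal of the overload; the cost is that a format emitting several records per call has no sensible implementation of the old single-record method.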


-- 
Best, Jingsong Lee
