Hi Jingsong,

Thanks for the input. Our current implementation is very similar to the way you posted.
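Roughly, it is the same skip-and-retry loop around `nextRecord` as in your snippet quoted below; a simplified sketch of that pattern (the format class, the `#`-comment convention, and all names here are only illustrative, not our actual code):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.io.DelimitedInputFormat;

// Illustrative only: a line-based format that returns null from readRecord()
// for records it cannot use, and skips those nulls in nextRecord() so that a
// dirty record is never mistaken for the end of input.
public class SkippingLineFormat extends DelimitedInputFormat<String> {

    @Override
    public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
        String line = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        // Treat '#' lines as dirty/comment records that should be dropped.
        return line.startsWith("#") ? null : line;
    }

    @Override
    public String nextRecord(String record) throws IOException {
        String result;
        do {
            // super.nextRecord() forwards whatever readRecord() produced,
            // so null here means "skip and try the next record".
            result = super.nextRecord(record);
        } while (result == null && !reachedEnd());
        return result;
    }
}

Keeping the null check next to reachedEnd() is what prevents a skipped record from looking like the end of input to the caller.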
Btw, we are doing the unification of streaming/batch work using our Flink SQL
engine, and we hope to contribute more in this area.

Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Jul 30, 2020 at 11:49 AM:

> Hi Benchao,
>
> I'm very glad that you are developing the batch ecosystem~
>
> More detail for #1 is in `CsvInputFormat`:
> - There are dirty records (comments) in `CsvInputFormat.readRecord`, so
>   unfortunately, it must return null.
> - If we go on like this, `nextRecord` will return null as in #2, so
>   `CsvInputFormat` overrides `nextRecord` like this:
>
> @Override
> public OUT nextRecord(OUT record) throws IOException {
>     OUT returnRecord = null;
>     do {
>         returnRecord = super.nextRecord(record);
>     } while (returnRecord == null && !reachedEnd());
>
>     return returnRecord;
> }
>
> Best,
> Jingsong
>
> On Thu, Jul 30, 2020 at 11:41 AM Benchao Li <libenc...@apache.org> wrote:
>
> > Thanks Jingsong for the response.
> >
> > If we plan to deprecate InputFormat in the near future, I think it's ok
> > to keep the current behavior as it is for now.
> >
> > I raised this discussion because I found that InputFormatSourceFunction
> > is not working as expected while I'm developing a batch table source
> > internally.
> > Our batch table source reads raw data from our internal storage, which
> > has the same interface as Kafka, so we can leverage all the formats from
> > streaming, e.g. json/pb. It's therefore common that we encounter dirty
> > data and want to skip it, like in DeserializationSchema.
> > That's why I prefer #2. Anyway, if we are going to deprecate InputFormat
> > in the near future, I think there is no need to change this in the
> > community.
> >
> > Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Jul 30, 2020 at
> > 10:30 AM:
> >
> > > Hi Benchao,
> > >
> > > My understanding is that #1 treats null as the end of input.
> > > This means we should try our best to avoid returning null before the
> > > end of input in the implementation of `InputFormat`. Even if we have
> > > to return null, we have to return it at the end of input.
> > > I think this is the safest implementation.
> > > It works for both #1 and #2.
> > > As for #3, I think we should avoid it. Actually, both
> > > `CRowValuesInputFormat` and `FileSystemLookupFunction` have
> > > corresponding implementations, and the specific `InputFormat`s can
> > > ensure they behave nicely with respect to `reachedEnd` and
> > > `nextRecord`. Of course, we can also make their invocations more
> > > robust.
> > >
> > > Actually, IIUC, InputFormat is a legacy interface; the new interface
> > > should be FLIP-27 [1]. And we are planning to implement FileSource on
> > > the new interfaces too.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, Jul 23, 2020 at 6:31 PM Benchao Li <libenc...@apache.org> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to discuss the semantics of returning null from
> > > > InputFormat#nextRecord.
> > > >
> > > > For now, there is no explicit javadoc about this, and there are
> > > > three ways to handle it in Flink:
> > > > 1. treat null as the end of input
> > > > 2. skip null
> > > > 3. assume that the value from InputFormat#nextRecord cannot be null
> > > >
> > > > I quickly searched the Flink codebase for these usages:
> > > > - org.apache.flink.api.common.operators.GenericDataSourceBase [2]
> > > > - org.apache.flink.api.java.io.CsvInputFormat [2]
> > > > - org.apache.flink.runtime.operators.DataSourceTask [2]
> > > > - org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator [2]
> > > > - org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction [1]
> > > > - org.apache.flink.table.sources.CsvTableSource [1]
> > > > - org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
> > > > - org.apache.flink.table.filesystem.FileSystemLookupFunction [3]
> > > >
> > > > I think we can align these behaviors; about the alignment, I
> > > > personally lean towards #2.
> > > >
> > > > A step further: when will InputFormat#nextRecord return null?
> > > > One scenario is that we encountered dirty data and want to skip it.
> > > > Actually, we faced the same problem in
> > > > org.apache.flink.api.common.serialization.DeserializationSchema
> > > > in the past, and in 1.11 we added a method `void deserialize(byte[]
> > > > message, Collector<T> out)`.
> > > > Its default behavior is to ignore the null return value.
> > > >
> > > > Then could we also add a method `void nextRecord(OT reuse,
> > > > Collector<OT> collector)` in InputFormat?
> > > > Adding this method would enable us to return multiple records in
> > > > one call, which is very flexible for implementing an InputFormat.
> > > >
> > > > WDYT?
> > > >
> > > > --
> > > > Best,
> > > > Benchao Li
> > >
> > > --
> > > Best, Jingsong Lee
> >
> > --
> > Best,
> > Benchao Li
>
> --
> Best, Jingsong Lee
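P.S. To make the collector-based idea from my original mail (quoted above) a bit more concrete, here is a rough sketch; it is purely illustrative and not a concrete API proposal (the interface name is made up, only org.apache.flink.util.Collector is real):

import java.io.IOException;

import org.apache.flink.util.Collector;

// Purely illustrative: a collector-based variant of nextRecord, analogous to
// DeserializationSchema#deserialize(byte[], Collector). A format could emit
// zero records for dirty input, or several records per call, and a default
// implementation could bridge to the existing single-record method.
public interface CollectingInputFormat<OT> {

    // Existing-style single-record method; null may mean "dirty record".
    OT nextRecord(OT reuse) throws IOException;

    // Collector-based variant: emitting nothing simply skips the record,
    // so null no longer needs to carry any special meaning.
    default void nextRecord(OT reuse, Collector<OT> out) throws IOException {
        OT record = nextRecord(reuse);
        if (record != null) {
            out.collect(record);
        }
    }
}

With something like this, a dirty record simply results in no call to out.collect(...), and returning multiple records per call also becomes natural.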
--
Best,
Benchao Li