Hi Benchao,

I'm very glad that you are developing the ecology of batch~

More detail for #1 is in `CsvInputFormat`:
- There are dirty records (Comments) in `CsvInputFormat.readRecord`, so
unfortunately, it must return null.
- So go on like this, will return null in `nextRecord` like #2, so
`CsvInputFormat` overwrite `nextRecord` like this:

@Override
public OUT nextRecord(OUT record) throws IOException {
   OUT returnRecord = null;
   do {
      returnRecord = super.nextRecord(record);
   } while (returnRecord == null && !reachedEnd());

   return returnRecord;
}


Best,
Jingsong

On Thu, Jul 30, 2020 at 11:41 AM Benchao Li <libenc...@apache.org> wrote:

> Thanks Jingsong for the response.
>
> If we plan to deprecate InputFormat in the near future, I think it's ok to
> keep current behavior
> as it is for now.
>
> I raised this discussion because I found that InputFormatSourceFunction is
> not working as expected
> while I'm developing a batch table source internally.
> Our batch table source reads raw data from our internal storage, which has
> the same interface as Kafka,
> then we can leverage all the formats in streaming, e.g. json/pb. So it's
> common that we encounters dirty
> data and want to skip it like in DeserializationSchema.
> That's why I prefer #2. Anyway, If we are gonna to deprecate InputFormat in
> the near future, I think there
> is no need to change this in the community.
>
>
> Jingsong Li <jingsongl...@gmail.com> 于2020年7月30日周四 上午10:30写道:
>
> > Hi Benchao,
> >
> > My understanding is that #1 treats null as the end of input.
> > This means we should try our best to avoid returning null before the end
> of
> > input in the implementation of `InputFormat`. Even if we have to return
> > null, we have to return it at the end of input.
> > I think this is the most safe implementation.
> > For #1 and #2, it works.
> > For #3, I think we should avoid #3, actually, both
> `CRowValuesInputFormat`
> > and `FileSystemLookupFunction` have corresponding implementations, the
> > specific `InputFormat`s can ensure they have nice behavior about
> > `reachedEnd` and `nextRecord`. Of course, we can also make their
> > invocations more robust.
> >
> > Actually, IIUC, the InputFormat is a legacy interface, the new interface
> > should be FLIP-27.[1] And we are planning implementing FileSource on the
> > new interfaces too.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > Best,
> > Jingsong
> >
> > On Thu, Jul 23, 2020 at 6:31 PM Benchao Li <libenc...@apache.org> wrote:
> >
> > > Hi all,
> > >
> > > I'd like to discuss about the semantic of returning null from
> > > InputFormat#nextRecord.
> > >
> > > For now, there is no explicit java doc about this. And there are three
> > ways
> > > to handle
> > > this in Flink:
> > > 1. treat null as the end of input
> > > 2. skip null
> > > 3. assumes that the value from InputFormat#nextRecord cannot be null
> > >
> > > I quickly searched in Flink codebase about these usage:
> > > - org.apache.flink.api.common.operators.GenericDataSourceBase [2]
> > > - org.apache.flink.api.java.io.CsvInputFormat [2]
> > > - org.apache.flink.runtime.operators.DataSourceTask [2]
> > > -
> > >
> > >
> >
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator
> > > [2]
> > > -
> > org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction
> > > [1]
> > > - org.apache.flink.table.sources.CsvTableSource [1]
> > > - org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
> > > - org.apache.flink.table.filesystem.FileSystemLookupFunction [3]
> > >
> > > I think we can align these behavior. about the alignment, I personally
> > lean
> > > to #2
> > >
> > > A step further, when will InputFormat#nextRecord returns null?
> > > One scenario is that we encountered dirty data, and want to skip it.
> > > Actually we face the same problem in
> > > org.apache.flink.api.common.serialization.DeserializationSchema
> > > in the past, and in 1.11 we added a method `void deserialize(byte[]
> > > message, Collector<T> out)`.
> > > It's default behavior is to ignore the null return value.
> > >
> > > Then could we also add a method `void nextRecord(OT reuse,
> Collector<OT>
> > > collector)`
> > > in InputFormat?
> > > Adding this method will enable us to return multi records in one call,
> > > which is very flexible for implementing an InputFormat.
> > >
> > > WDHY?
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 
Best, Jingsong Lee

Reply via email to