Hi Jingsong,

Thanks for the input. Our current implementation is very like the way you
posted.

Btw, we are doing the unification of streaming/batch work using our Flink
SQL engine,
hope to contribute more in this realm.

Jingsong Li <jingsongl...@gmail.com> 于2020年7月30日周四 上午11:49写道:

> Hi Benchao,
>
> I'm very glad that you are developing the ecology of batch~
>
> More detail for #1 is in `CsvInputFormat`:
> - There are dirty records (Comments) in `CsvInputFormat.readRecord`, so
> unfortunately, it must return null.
> - So go on like this, will return null in `nextRecord` like #2, so
> `CsvInputFormat` overwrite `nextRecord` like this:
>
> @Override
> public OUT nextRecord(OUT record) throws IOException {
>    OUT returnRecord = null;
>    do {
>       returnRecord = super.nextRecord(record);
>    } while (returnRecord == null && !reachedEnd());
>
>    return returnRecord;
> }
>
>
> Best,
> Jingsong
>
> On Thu, Jul 30, 2020 at 11:41 AM Benchao Li <libenc...@apache.org> wrote:
>
> > Thanks Jingsong for the response.
> >
> > If we plan to deprecate InputFormat in the near future, I think it's ok
> to
> > keep current behavior
> > as it is for now.
> >
> > I raised this discussion because I found that InputFormatSourceFunction
> is
> > not working as expected
> > while I'm developing a batch table source internally.
> > Our batch table source reads raw data from our internal storage, which
> has
> > the same interface as Kafka,
> > then we can leverage all the formats in streaming, e.g. json/pb. So it's
> > common that we encounters dirty
> > data and want to skip it like in DeserializationSchema.
> > That's why I prefer #2. Anyway, If we are gonna to deprecate InputFormat
> in
> > the near future, I think there
> > is no need to change this in the community.
> >
> >
> > Jingsong Li <jingsongl...@gmail.com> 于2020年7月30日周四 上午10:30写道:
> >
> > > Hi Benchao,
> > >
> > > My understanding is that #1 treats null as the end of input.
> > > This means we should try our best to avoid returning null before the
> end
> > of
> > > input in the implementation of `InputFormat`. Even if we have to return
> > > null, we have to return it at the end of input.
> > > I think this is the most safe implementation.
> > > For #1 and #2, it works.
> > > For #3, I think we should avoid #3, actually, both
> > `CRowValuesInputFormat`
> > > and `FileSystemLookupFunction` have corresponding implementations, the
> > > specific `InputFormat`s can ensure they have nice behavior about
> > > `reachedEnd` and `nextRecord`. Of course, we can also make their
> > > invocations more robust.
> > >
> > > Actually, IIUC, the InputFormat is a legacy interface, the new
> interface
> > > should be FLIP-27.[1] And we are planning implementing FileSource on
> the
> > > new interfaces too.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, Jul 23, 2020 at 6:31 PM Benchao Li <libenc...@apache.org>
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to discuss about the semantic of returning null from
> > > > InputFormat#nextRecord.
> > > >
> > > > For now, there is no explicit java doc about this. And there are
> three
> > > ways
> > > > to handle
> > > > this in Flink:
> > > > 1. treat null as the end of input
> > > > 2. skip null
> > > > 3. assumes that the value from InputFormat#nextRecord cannot be null
> > > >
> > > > I quickly searched in Flink codebase about these usage:
> > > > - org.apache.flink.api.common.operators.GenericDataSourceBase [2]
> > > > - org.apache.flink.api.java.io.CsvInputFormat [2]
> > > > - org.apache.flink.runtime.operators.DataSourceTask [2]
> > > > -
> > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator
> > > > [2]
> > > > -
> > >
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction
> > > > [1]
> > > > - org.apache.flink.table.sources.CsvTableSource [1]
> > > > - org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
> > > > - org.apache.flink.table.filesystem.FileSystemLookupFunction [3]
> > > >
> > > > I think we can align these behavior. about the alignment, I
> personally
> > > lean
> > > > to #2
> > > >
> > > > A step further, when will InputFormat#nextRecord returns null?
> > > > One scenario is that we encountered dirty data, and want to skip it.
> > > > Actually we face the same problem in
> > > > org.apache.flink.api.common.serialization.DeserializationSchema
> > > > in the past, and in 1.11 we added a method `void deserialize(byte[]
> > > > message, Collector<T> out)`.
> > > > It's default behavior is to ignore the null return value.
> > > >
> > > > Then could we also add a method `void nextRecord(OT reuse,
> > Collector<OT>
> > > > collector)`
> > > > in InputFormat?
> > > > Adding this method will enable us to return multi records in one
> call,
> > > > which is very flexible for implementing an InputFormat.
> > > >
> > > > WDHY?
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
> Best, Jingsong Lee
>


-- 

Best,
Benchao Li

Reply via email to