Hi Dong,

Thanks for updating the FLIP and including Pulsar. I was indeed suggesting
that we should have a generic interface that allows connector maintainers
to implement this capability if they think it should be supported.

Could you see a feature like this also being useful for a connector like
the FileSystem source?

Best regards,

Martijn

On Tue, 11 Jan 2022 at 16:47, Fabian Paul <fp...@apache.org> wrote:

> Hi Dong,
>
> I wouldn't change the org.apache.flink.api.connector.source.Source
> interface, because that would either break existing sources or introduce
> the method as some kind of optional; I deem both options as not great. My
> idea is to introduce a new interface that extends Source. This way, users
> who want to develop a source that stops based on the record evaluator can
> implement the new interface. It also has the nice benefit that we can
> give this new type of source a lower stability guarantee than Public
> to allow some changes.
> In the SourceOperatorFactory we can then access the record evaluator
> from the respective sources and pass it to the source operator.
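
A rough sketch of the interface Fabian describes might look like the
following. All names here are hypothetical, and `Source` is a stand-in for
org.apache.flink.api.connector.source.Source, which carries more type
parameters than shown:

```java
// Stand-in for org.apache.flink.api.connector.source.Source; the real
// interface also has type parameters for splits and enumerator state.
interface Source<T> {}

// Hypothetical evaluator: decides, per de-serialized record, whether the
// source should stop emitting (the exact Flink signature may differ).
@FunctionalInterface
interface RecordEvaluator<T> {
    boolean isEndOfStream(T record);
}

// The proposed mixin: sources that support dynamic stopping implement
// this in addition to Source, so the SourceOperatorFactory can detect it
// with an instanceof check and pass the evaluator to the source operator.
interface StoppableSource<T> extends Source<T> {
    RecordEvaluator<T> getRecordEvaluator();
}
```

Because the mixin only adds one method, existing sources are untouched and
the new interface can start out with a weaker stability guarantee.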
>
> Hopefully, this makes sense. So far I have not found information about
> the actual stopping logic in the FLIP; maybe you had something different
> in mind.
>
> Best,
> Fabian
>
> On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hi Fabian,
> >
> > Thanks for the comments!
> >
> > By "add a source mixin interface", are you suggesting to update
> > the org.apache.flink.api.connector.source.Source interface to add the API
> > "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to add more
> > public API and thus more complexity than the solution in the FLIP. Could
> > you help explain more about the benefits of doing this?
> >
> > Regarding the 2nd question, I think this FLIP does not change whether
> > sources are treated as bounded or unbounded. For example, the
> > KafkaSource's boundedness will continue to be determined by the APIs
> > KafkaSourceBuilder::setBounded(..) and
> > KafkaSourceBuilder::setUnbounded(..). Does this answer your question?
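
As a toy illustration of the contract Dong refers to (this is not the real
KafkaSourceBuilder; the actual methods take OffsetsInitializer arguments),
boundedness is an explicit builder-level choice that a record evaluator
would not change:

```java
// Toy model of the boundedness contract. The enum values mirror Flink's
// org.apache.flink.api.connector.source.Boundedness; the builder is a
// simplified stand-in, not the real KafkaSourceBuilder.
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

class ToySourceBuilder {
    // Kafka-style sources default to running continuously.
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;

    // Stop at some predefined offsets: the source is bounded.
    ToySourceBuilder setBounded() {
        this.boundedness = Boundedness.BOUNDED;
        return this;
    }

    // Run continuously: the source is unbounded.
    ToySourceBuilder setUnbounded() {
        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        return this;
    }

    Boundedness getBoundedness() {
        return boundedness;
    }
}
```

In this model a dynamic EOF condition ends the job early, but it never
flips the declared boundedness of the source.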
> >
> > Thanks,
> > Dong
> >
> > On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org> wrote:
> >
> > > Hi Dong,
> > >
> > > Thank you for updating the FLIP and making it applicable to all
> > > sources. I am a bit unsure about the implementation part. I would
> > > propose to add a source mixin interface that declares
> > > `getRecordEvaluator`, and sources that want to allow dynamically
> > > stopping implement that interface.
> > >
> > > Another question I had: do we treat sources using the record
> > > evaluator as bounded or unbounded?
> > >
> > > Best,
> > > Fabian
> > >
> > > On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > Hi Martijn and Qingsheng,
> > > >
> > > > The FLIP has been updated to extend the dynamic EOF support to the
> > > > PulsarSource. I have not extended this feature to other sources yet,
> > > > since I am not sure it is a requirement to ensure feature consistency
> > > > across different sources. Could you take another look?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Martijn,
> > > > >
> > > > > Thanks for the comments! In general I agree we should avoid feature
> > > > > sparsity.
> > > > >
> > > > > In this particular case, connectors are a bit different than most
> > > > > other features in Flink. AFAIK, we plan to move connectors (including
> > > > > Kafka and Pulsar) out of the Flink project in the future, which means
> > > > > that the development of these connectors will be mostly de-centralized
> > > > > (outside of Flink) and be up to their respective maintainers. While I
> > > > > agree that we should provide API/infrastructure in Flink (as this FLIP
> > > > > does) to support feature consistency across connectors, I am not sure
> > > > > we should own the responsibility to actually update all connectors to
> > > > > achieve feature consistency, given that we don't plan to do it in
> > > > > Flink anyway due to its heavy burden.
> > > > >
> > > > > With that being said, I am happy to follow the community guideline if
> > > > > we decide that connector-related FLIPs should update every connector's
> > > > > API to ensure feature consistency (to a reasonable extent). For
> > > > > example, in this particular case, it looks like the EOF-detection
> > > > > feature can be applied to every connector (including bounded sources).
> > > > > Is it still sufficient to just update Kafka, Pulsar and Kinesis?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <mart...@ververica.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Dong,
> > > > >>
> > > > >> Thanks for writing the FLIP. It focuses only on the KafkaSource, but
> > > > >> I would expect that if such functionality is desired, it should be
> > > > >> made available for all unbounded sources (for example, Pulsar and
> > > > >> Kinesis). If it's only available for Kafka, I see it as if we're
> > > > >> increasing feature sparsity while we actually want to decrease that.
> > > > >> What do you think?
> > > > >>
> > > > >> Best regards,
> > > > >>
> > > > >> Martijn
> > > > >>
> > > > >> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com> wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > >> > We created FLIP-208: Update KafkaSource to detect EOF based on
> > > > >> > de-serialized records. Please find the FLIP wiki at the link
> > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
> > > > >> > .
> > > > >> >
> > > > >> > This FLIP aims to address the use-case where users need to stop a
> > > > >> > Flink job gracefully based on the content of de-serialized records
> > > > >> > observed in the KafkaSource. This feature is needed by users who
> > > > >> > currently depend on KafkaDeserializationSchema::isEndOfStream() to
> > > > >> > migrate their Flink job from FlinkKafkaConsumer to KafkaSource.
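
A sketch of the intended stopping behavior (simplified, hypothetical types;
only the RecordEvaluator name comes from the FLIP): the evaluator inspects
each de-serialized record, and consumption stops gracefully once it signals
end-of-stream, mirroring the role KafkaDeserializationSchema::isEndOfStream()
plays in FlinkKafkaConsumer.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the EOF-detection behavior this FLIP proposes; everything
// except the RecordEvaluator name is a stand-in for the real source
// machinery inside the source operator.
class EofDetectionSketch {
    @FunctionalInterface
    interface RecordEvaluator<T> {
        boolean isEndOfStream(T record);
    }

    // Consume records until the evaluator flags end-of-stream. In this
    // sketch the EOF record itself is not emitted downstream.
    static <T> List<T> consumeUntilEof(Iterable<T> records,
                                       RecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (T record : records) {
            if (evaluator.isEndOfStream(record)) {
                break; // stop the source gracefully
            }
            emitted.add(record);
        }
        return emitted;
    }
}
```

The key difference from a bounded source is that the stopping condition
depends on record content, not on offsets known at job submission time.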
> > > > >> >
> > > > >> > Could you help review this FLIP when you get time? Your comments
> > > > >> > are appreciated!
> > > > >> >
> > > > >> > Cheers,
> > > > >> > Dong
> > > > >> >
> > > > >>
> > > > >
> > >
>
