Hi Fabian,

Thanks for the comments. Please see my reply inline.

On Tue, Jan 11, 2022 at 11:46 PM Fabian Paul <fp...@apache.org> wrote:

> Hi Dong,
>
> I wouldn't change the org.apache.flink.api.connector.source.Source
> interface because it either breaks existing sources or we introduce it
> as some kind of optional method. I deem both options as not great. My idea is
> to introduce a new interface that extends the Source. This way users
> who want to develop a source that stops with the record evaluator can
> implement the new interface. It also has the nice benefit that we can
> give this new type of source a lower stability guarantee than Public
> to allow some changes.
>

Currently the eofRecordEvaluator can be passed from
KafkaSourceBuilder/PulsarSourceBuilder
to SingleThreadMultiplexSourceReaderBase and SourceReaderBase. This
approach also allows developers who want to build a source that stops
based on the record evaluator to implement the new feature. Adding a new
interface could increase the complexity of our interfaces and
infrastructure. I am not sure it has added benefits compared to the
existing proposal. Could you explain more?
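
To make the flow concrete, here is a rough sketch of how I imagine a user
would wire this up via the builder (setEofRecordEvaluator is the method
name proposed in the FLIP; the exact signature here is illustrative, not
final):

    KafkaSource<String> source =
        KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("transactions")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // Proposed in the FLIP: the builder forwards this evaluator to
            // SourceReaderBase, which stops reading a split once the
            // evaluator returns true for a de-serialized record.
            .setEofRecordEvaluator(record -> record.contains("EOF"))
            .build();

The builder simply forwards the evaluator to the reader base classes, so
the Source interface itself does not need to change.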

I am not sure which "new type of source with a lower stability guarantee"
you are referring to. Could you explain more? It sounds like a new feature
that is not mentioned in the FLIP. If the changes proposed in this FLIP also
support the feature you have in mind, could we discuss it in a separate
FLIP?

> In the SourceOperatorFactory we can then access the record evaluator
> from the respective sources and pass it to the source operator.
>
> Hopefully, this makes sense. So far I did not find information about
> the actual stopping logic in the FLIP; maybe you had something
> different in mind.
>

By "actual stopping logic", do you mean an example implementation of the
RecordEvalutor? I think the use-case is described in the motivation
section, which is about a pipeline processing stock transaction data.

We can support this use-case with this FLIP by implementing a
RecordEvaluator that stops reading from a split when it sees a record whose
payload says "EOF". Users can trigger this feature by sending a message
with "EOF" in the payload to every partition of the source Kafka topic.

Does this make sense?


>
> Best,
> Fabian
>
> On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hi Fabian,
> >
> > Thanks for the comments!
> >
> > By "add a source mixin interface", are you suggesting to update
> > the org.apache.flink.api.connector.source.Source interface to add the API
> > "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to add more
> > public API and thus more complexity than the solution in the FLIP. Could
> > you help explain more about the benefits of doing this?
> >
> > Regarding the 2nd question, I think this FLIP does not change whether
> > sources are treated as bounded or unbounded. For example, the
> KafkaSource's
> > boundedness will continue to be determined with the API
> > KafkaSourceBuilder::setBounded(..) and
> > KafkaSourceBuilder::setUnbounded(..). Does this answer your question?
> >
> > Thanks,
> > Dong
> >
> > On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org> wrote:
> >
> > > Hi Dong,
> > >
> > > Thank you for updating the FLIP and making it applicable for all
> > > sources. I am a bit unsure about the implementation part. I would
> > > propose to add a source mixin interface that implements
> > > `getRecordEvaluator` and sources that want to allow dynamically
> > > stopping implement that interface.
> > >
> > > Another question I had was how do we treat sources using the record
> > > evaluator as bounded or unbounded?
> > >
> > > Best,
> > > Fabian
> > >
> > > On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > Hi Martijn and Qingsheng,
> > > >
> > > > The FLIP has been updated to extend the dynamic EOF support for the
> > > > PulsarSource. I have not extended this feature to other sources yet
> > > since I
> > > > am not sure it is a requirement to ensure feature consistency across
> > > > different sources. Could you take another look?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hi Martijn,
> > > > >
> > > > > Thanks for the comments! In general I agree we should avoid feature
> > > > > sparsity.
> > > > >
> > > > > In this particular case, connectors are a bit different than most
> other
> > > > > features in Flink. AFAIK, we plan to move connectors (including
> Kafka
> > > and
> > > > > Pulsar) out of the Flink project in the future, which means that
> the
> > > > > development of these connectors will be mostly de-centralized
> (outside
> > > of
> > > > > Flink) and be up to their respective maintainers. While I agree
> that we
> > > > > should provide API/infrastructure in Flink (as this FLIP does) to
> > > support
> > > > > feature consistency across connectors, I am not sure we should own
> the
> > > > > responsibility to actually update all connectors to achieve feature
> > > > > consistency, given that we don't plan to do it in Flink anyway due
> to
> > > its
> > > > > heavy burden.
> > > > >
> > > > > With that being said, I am happy to follow the community guideline
> if
> > > we
> > > > > decide that connector-related FLIP should update every connector's
> API
> > > to
> > > > > ensure feature consistency (to a reasonable extent). For example,
> in
> > > this
> > > > > particular case, it looks like the EOF-detection feature can be
> > > applied to
> > > > > every connector (including bounded sources). Is it still
> sufficient to
> > > just
> > > > > update Kafka, Pulsar and Kinesis?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <
> mart...@ververica.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Dong,
> > > > >>
> > > > >> Thanks for writing the FLIP. It focusses only on the KafkaSource,
> but
> > > I
> > > > >> would expect that if such a functionality is desired, it should be
> > > made
> > > > >> available for all unbounded sources (for example, Pulsar and
> > > Kinesis). If
> > > > >> it's only available for Kafka, I see it as if we're increasing
> feature
> > > > >> sparsity while we actually want to decrease that. What do you
> think?
> > > > >>
> > > > >> Best regards,
> > > > >>
> > > > >> Martijn
> > > > >>
> > > > >> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com>
> wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > >> > We created FLIP-208: Update KafkaSource to detect EOF based on
> > > > >> > de-serialized records. Please find the FLIP wiki at the link
> > > > >> >
> > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
> > > > >> > .
> > > > >> >
> > > > >> > This FLIP aims to address the use-case where users need to stop
> a
> > > Flink
> > > > >> job
> > > > >> > gracefully based on the content of de-serialized records
> observed
> > > in the
> > > > >> > KafkaSource. This feature is needed by users who currently
> depend on
> > > > >> > KafkaDeserializationSchema::isEndOfStream() to migrate their
> Flink
> > > job
> > > > >> from
> > > > >> > FlinkKafkaConsumer to KafkaSource.
> > > > >> >
> > > > >> > Could you help review this FLIP when you get time? Your
> comments are
> > > > >> > appreciated!
> > > > >> >
> > > > >> > Cheers,
> > > > >> > Dong
> > > > >> >
> > > > >>
> > > > >
> > >
>
