Hi Qingsheng,

Thank you for extending this FLIP to support this feature in Table/SQL. I
also prefer the 2nd option over the 1st. The 1st option requires users to
specify an identifier in addition to defining the RecordEvaluator
themselves. Note that this is unlike InputFormat, where most users will
use the off-the-shelf de-serializers in Flink instead of implementing
de-serializers themselves.
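
For illustration only, here is a minimal sketch of how the 2nd option could
resolve a class name from the table options into an evaluator instance. The
RecordEvaluator interface below is a local stand-in whose method name is an
assumption, and EofEvaluator, load(...) and the class names are hypothetical;
none of this is taken from the FLIP or from existing Flink code.

```java
import java.io.Serializable;

public class EvaluatorLoadingSketch {

    /** Local stand-in for the RecordEvaluator discussed here; the method name is an assumption. */
    public interface RecordEvaluator<T> extends Serializable {
        boolean isEndOfStream(T record);
    }

    /** Hypothetical user implementation: signal end of stream when the payload is "EOF". */
    public static class EofEvaluator implements RecordEvaluator<String> {
        @Override
        public boolean isEndOfStream(String record) {
            return "EOF".equals(record);
        }
    }

    /**
     * Hypothetical helper: instantiates the class named by an option such as
     * 'record.evaluator.class' through its public no-argument constructor.
     */
    @SuppressWarnings("unchecked")
    public static <T> RecordEvaluator<T> load(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            if (!RecordEvaluator.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        className + " does not implement RecordEvaluator");
            }
            return (RecordEvaluator<T>) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        RecordEvaluator<String> evaluator =
                load(EofEvaluator.class.getName(), EvaluatorLoadingSketch.class.getClassLoader());
        System.out.println(evaluator.isEndOfStream("trade-1")); // false
        System.out.println(evaluator.isEndOfStream("EOF"));     // true
    }
}
```

With this shape the user only writes the evaluator class and names it in the
table options; no separate factory or identifier needs to be registered.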

Could you help update FLIP-208 with the proposed changes as well as the
explanation of the rejected alternative?

Thanks,
Dong



On Thu, Jan 13, 2022 at 5:18 PM Qingsheng Ren <renqs...@gmail.com> wrote:

> Thanks Dong for the explanation!
>
> I agree with Dong’s idea of keeping the APIs for setting configurations
> consistent, so I think it’s acceptable to pass the record evaluator from
> XXXSourceBuilder and embed it into SourceReaderBase. Also, given that only
> the Kafka source currently respects DeserializationSchema#isEndOfStream,
> it’s OK to implement this for just the Kafka and Pulsar connectors for
> now.
>
> Another thing I’d like to mention is using this feature in the Table/SQL
> API. Currently I have two possible implementations in mind:
>
> 1. Similar to format / deserializer, we introduce a factory for
> RecordEvaluator, and users need to specify the factory identifier in table
> options:
>
> CREATE TABLE `kafka` (…) WITH (
>     'connector' = 'kafka',
>     'record.evaluator' = 'my-evaluator-factory-identifier'
> )
>
> 2. Directly use the fully qualified class name in table options:
>
> CREATE TABLE `kafka` (…) WITH (
>     'connector' = 'kafka',
>     'record.evaluator.class' = 'com.mycompany.evaluator.MyEvaluator'
> )
>
> Personally I prefer the second one, because it’s easier for users to
> implement their own RecordEvaluators.
>
> What do you think?
>
>
> > On Jan 13, 2022, at 11:39 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hi Fabian,
> >
> > Thank you for the explanation.
> >
> > The current approach needs to add new constructors for SourceReaderBase
> > and SingleThreadMultiplexSourceReaderBase. This proposed change has now
> > been included in the Public Interfaces section in the FLIP.
> >
> > And yes, I also agree it would be preferable if developers did not have to
> > change their SourceReaders to implement this new logic. The suggestion
> > mentioned by Qingsheng in this thread could achieve this goal. Qingsheng's
> > idea is to let users specify eofRecordEvaluator via
> > StreamExecutionEnvironment::fromSource(...).withEofRecordEvaluator(...)
> > and pass this evaluator through DataStreamSource to SourceOperator.
> > SourceOperator::emitNext(...) could then use this evaluator as appropriate.
> >
> > For now I have chosen not to use this approach because it requires
> > users to pass some source configuration via
> > StreamExecutionEnvironment::fromSource(...) and other source
> > configuration via e.g. KafkaSourceBuilder(...). This might create a sense
> > of inconsistency/confusion. Given that connector users far outnumber
> > connector developers, I believe it is probably better to optimize the
> > user experience in this case.
> >
> > The description of this alternative approach and its pros/cons has been
> > included in the FLIP.
> >
> > And yep, I think I understand your suggestion. Indeed those connector
> > configs (e.g. de-serializer, boundedness, eofRecordEvaluator) can be
> > passed from XXXSourceBuilder to their shared infra (e.g. SourceReaderBase).
> > Both solutions work. Given that the existing configs (e.g. serializer) are
> > already passed to SourceReaderBase via the constructor parameter, I guess
> > it is simpler to follow the existing pattern for now.
> >
> > Regards,
> > Dong
> >
> >
> > On Wed, Jan 12, 2022 at 11:17 PM Fabian Paul <fp...@apache.org> wrote:
> >
> >> Hi Dong,
> >>
> >> I think I am beginning to understand your idea. Since SourceReaderBase
> >> is marked as PublicEvolving can you also update the FLIP with the
> >> changes you want to make to it? Ideally, connector developers do not
> >> have to change their SourceReaders to implement this new logic.
> >>
> >> My idea was to introduce a second source interface that extends the
> >> existing interface and offers only the method getRecordEvaluator().
> >> The record evaluator is still passed as you have described through the
> >> builder and at the end held by the source object. This way the source
> >> framework can automatically use the evaluator without the need that
> >> connector developers have to implement the complicated stopping logic
> >> or change their SourceReaders.
> >>
> >> Best,
> >> Fabian
> >>
> >>
> >> On Wed, Jan 12, 2022 at 2:22 AM Dong Lin <lindon...@gmail.com> wrote:
> >>>
> >>> Hi Fabian,
> >>>
> >>> Thanks for the comments. Please see my reply inline.
> >>>
> >>> On Tue, Jan 11, 2022 at 11:46 PM Fabian Paul <fp...@apache.org> wrote:
> >>>
> >>>> Hi Dong,
> >>>>
> >>>> I wouldn't change the org.apache.flink.api.connector.source.Source
> >>>> interface because it either breaks existing sinks or we introduce it
> >>>> as some kind of optional. I deem both options as not great. My idea is
> >>>> to introduce a new interface that extends the Source. This way users
> >>>> who want to develop a source that stops with the record evaluator can
> >>>> implement the new interface. It also has the nice benefit that we can
> >>>> give this new type of source a lower stability guarantee than Public
> >>>> to allow some changes.
> >>>>
> >>>
> >>> Currently the eofRecordEvaluator can be passed from
> >>> KafkaSourceBuilder/PulsarSourceBuilder
> >>> to SingleThreadMultiplexSourceReaderBase and SourceReaderBase. This
> >>> approach also allows developers who want to develop a source that stops
> >>> with the record evaluator to implement the new feature. Adding a new
> >>> interface could increase the complexity in our interface and
> >>> infrastructure. I am not sure if it has added benefits compared to the
> >>> existing proposal. Could you explain more?
> >>>
> >>> I am not very sure what "new type of source a lower stability guarantee"
> >>> you are referring to. Could you explain more? It looks like a new feature
> >>> not mentioned in the FLIP. If the changes proposed in this FLIP also
> >>> support the feature you have in mind, could we discuss this in a separate
> >>> FLIP?
> >>>
> >>>> In the SourceOperatorFactory we can then access the record evaluator
> >>>> from the respective sources and pass it to the source operator.
> >>>>
> >>>> Hopefully, this makes sense. So far I did not find information about
> >>>> the actual stopping logic in the FLIP maybe you had something
> >>>> different in mind.
> >>>>
> >>>
> >>> By "actual stopping logic", do you mean an example implementation of the
> >>> RecordEvaluator? I think the use-case is described in the motivation
> >>> section, which is about a pipeline processing stock transaction data.
> >>>
> >>> We can support this use-case with this FLIP by implementing a
> >>> RecordEvaluator that stops reading data from a split when there is a
> >>> message that says "EOF". Users can trigger this feature by sending messages
> >>> with "EOF" in the payload to all partitions of the source Kafka topic.
> >>>
> >>> Does this make sense?
> >>>
> >>>
> >>>>
> >>>> Best,
> >>>> Fabian
> >>>>
> >>>> On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Fabian,
> >>>>>
> >>>>> Thanks for the comments!
> >>>>>
> >>>>> By "add a source mixin interface", are you suggesting to update
> >>>>> the org.apache.flink.api.connector.source.Source interface to add the API
> >>>>> "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to add more
> >>>>> public API and thus more complexity than the solution in the FLIP. Could
> >>>>> you help explain more about the benefits of doing this?
> >>>>>
> >>>>> Regarding the 2nd question, I think this FLIP does not change whether
> >>>>> sources are treated as bounded or unbounded. For example, the KafkaSource's
> >>>>> boundedness will continue to be determined with the API
> >>>>> KafkaSourceBuilder::setBounded(..) and
> >>>>> KafkaSourceBuilder::setUnbounded(..). Does this answer your question?
> >>>>>
> >>>>> Thanks,
> >>>>> Dong
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Dong,
> >>>>>>
> >>>>>> Thank you for updating the FLIP and making it applicable for all
> >>>>>> sources. I am a bit unsure about the implementation part. I would
> >>>>>> propose to add a source mixin interface that implements
> >>>>>> `getRecordEvaluator` and sources that want to allow dynamically
> >>>>>> stopping implement that interface.
> >>>>>>
> >>>>>> Another question I had was how do we treat sources using the record
> >>>>>> evaluator as bounded or unbounded?
> >>>>>>
> >>>>>> Best,
> >>>>>> Fabian
> >>>>>>
> >>>>>> On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Martijn and Qingsheng,
> >>>>>>>
> >>>>>>> The FLIP has been updated to extend the dynamic EOF support for the
> >>>>>>> PulsarSource. I have not extended this feature to other sources yet since I
> >>>>>>> am not sure it is a requirement to ensure feature consistency across
> >>>>>>> different sources. Could you take another look?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Dong
> >>>>>>>
> >>>>>>> On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Martijn,
> >>>>>>>>
> >>>>>>>> Thanks for the comments! In general I agree we should avoid feature
> >>>>>>>> sparsity.
> >>>>>>>>
> >>>>>>>> In this particular case, connectors are a bit different than most other
> >>>>>>>> features in Flink. AFAIK, we plan to move connectors (including Kafka and
> >>>>>>>> Pulsar) out of the Flink project in the future, which means that the
> >>>>>>>> development of these connectors will be mostly de-centralized (outside of
> >>>>>>>> Flink) and be up to their respective maintainers. While I agree that we
> >>>>>>>> should provide API/infrastructure in Flink (as this FLIP does) to support
> >>>>>>>> feature consistency across connectors, I am not sure we should own the
> >>>>>>>> responsibility to actually update all connectors to achieve feature
> >>>>>>>> consistency, given that we don't plan to do it in Flink anyway due to its
> >>>>>>>> heavy burden.
> >>>>>>>>
> >>>>>>>> With that being said, I am happy to follow the community guideline if we
> >>>>>>>> decide that connector-related FLIPs should update every connector's API to
> >>>>>>>> ensure feature consistency (to a reasonable extent). For example, in this
> >>>>>>>> particular case, it looks like the EOF-detection feature can be applied to
> >>>>>>>> every connector (including bounded sources). Is it still sufficient to just
> >>>>>>>> update Kafka, Pulsar and Kinesis?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Dong
> >>>>>>>>
> >>>>>>>> On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <mart...@ververica.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Dong,
> >>>>>>>>>
> >>>>>>>>> Thanks for writing the FLIP. It focusses only on the KafkaSource, but I
> >>>>>>>>> would expect that if such a functionality is desired, it should be made
> >>>>>>>>> available for all unbounded sources (for example, Pulsar and Kinesis). If
> >>>>>>>>> it's only available for Kafka, I see it as if we're increasing feature
> >>>>>>>>> sparsity while we actually want to decrease that. What do you think?
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>>
> >>>>>>>>> Martijn
> >>>>>>>>>
> >>>>>>>>> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> We created FLIP-208: Update KafkaSource to detect EOF based on
> >>>>>>>>>> de-serialized records. Please find the FLIP wiki here:
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
> >>>>>>>>>>
> >>>>>>>>>> This FLIP aims to address the use-case where users need to stop a Flink job
> >>>>>>>>>> gracefully based on the content of de-serialized records observed in the
> >>>>>>>>>> KafkaSource. This feature is needed by users who currently depend on
> >>>>>>>>>> KafkaDeserializationSchema::isEndOfStream() to migrate their Flink job from
> >>>>>>>>>> FlinkKafkaConsumer to KafkaSource.
> >>>>>>>>>>
> >>>>>>>>>> Could you help review this FLIP when you get time? Your comments are
> >>>>>>>>>> appreciated!
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Dong
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
>
>
