Hi everyone,

Thanks for all the comments. If there are no further comments, we plan to
start the voting thread on 1/24.

Cheers,
Dong

On Fri, Jan 14, 2022 at 8:26 AM Dong Lin <lindon...@gmail.com> wrote:

> Hi Qingsheng,
>
> Thank you for extending this FLIP to support this feature with Table/SQL.
> I also prefer the 2nd option over the 1st option. This is because the 1st
> option requires the user to additionally specify an identifier in addition
> to defining the RecordEvaluator themselves. Note that this is unlike
> InputFormat, where most users will use off-the-shelf de-serializers in
> Flink instead of implementing de-serializers by themselves.
>
> Could you help update FLIP-208 with the proposed changes as well as the
> explanation of the rejected alternative?
>
> Thanks,
> Dong
>
>
>
> On Thu, Jan 13, 2022 at 5:18 PM Qingsheng Ren <renqs...@gmail.com> wrote:
>
>> Thanks Dong for the explanation!
>>
>> I agree with Dong’s idea of keeping the consistency of APIs for setting
>> configurations, so I think it’s acceptable for me to pass the record
>> evaluator from XXXSourceBuilder and embed it into SourceReaderBase. Also
>> considering current usage of the DeserializationSchema#isEndOfStream that
>> only Kafka source respect this interface, it’s OK to implement just Kafka
>> and Pulsar connectors for now.
>>
>> Another thing I’d like to mention is about using this feature in
>> Table/SQL API. Currently I have two kinds of implementations in my mind:
>>
>> 1. Similar to format / deserializer, we introduce a factory for
>> RecordEvaluator, and users need to specify the factory identifier in table
>> options:
>>
>> CREATE TABLE `kafka` (…) WITH (
>>     `connector` = `kafka`,
>>     `record.evaluator` =  `my-evaluator-factory-identifier`
>> )
>>
>> 2. Directly use full class path in table options:
>>
>> CREATE TABLE `kafka` (…) WITH (
>>     `connector` = `kafka`,
>>     `record.evaluator.class` =  `com.mycompany.evaluator.MyEvaluator`
>> )
>>
>> Personally I prefer the second one, because it’s easier for users to
>> implement their own RecordEvaluators.
>>
>> What do you think?
>>
>>
>> > On Jan 13, 2022, at 11:39 AM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > Hi Fabian,
>> >
>> > Thank you for the explanation.
>> >
>> > The current approach needs to add new constructors for SourceReaderBase
>> > and SingleThreadMultiplexSourceReaderBase. This proposed change has now
>> > been included in the Public Interfaces section in the FLIP.
>> >
>> > And yes, I also agree it would be preferred if developers do not have to
>> > change their SourceReaders to implement this new logic. The suggestion
>> > mentioned by Qingshen in this thread could achieve this goal. Qingshen's
>> > idea is to let user specify eofRecordEvaluator via
>> > StreamExecutionEnvironment::fromSource(...).withEofRecordEvaluator(...)
>> and
>> > pass this evaluator through DataStreamSource to SourceOperator. And
>> > SourceOperator::emitNext(...) could use this evaluator as appropriate.
>> >
>> > For now I have chosen not to use this approach because this approach
>> > requires users to pass some source configuration via
>> > StreamExecutionEnvironment::fromSource(...) and some other source
>> > configuration via e.g. KafkaSourceBuilder(...). This might create a
>> sense
>> > of inconsistency/confusion. Given that the number of connector users are
>> > much more than the number of connector developers, I believe it is
>> probably
>> > better to optimize the user experience in this case.
>> >
>> > The description of this alternative approach and its pros/cons has been
>> > included in the FLIP.
>> >
>> > And yep, I think I understand your suggestion. Indeed those connector
>> > configs (e.g. de-serializer, boundedness, eofRecordEvaluator) can be
>> passed
>> > from XXXSourceBuilder to their shared infra (e.g. SourceReaderBase).
>> Both
>> > solutions work. Given that the existing configs (e.g. serializer) are
>> > already passed to SourceReaderBase via the constructor parameter, I
>> guess
>> > it is simpler to follow the existing pattern for now.
>> >
>> > Regards,
>> > Dong
>> >
>> >
>> > On Wed, Jan 12, 2022 at 11:17 PM Fabian Paul <fp...@apache.org> wrote:
>> >
>> >> Hi Dong,
>> >>
>> >> I think I am beginning to understand your idea. Since SourceReaderBase
>> >> is marked as PublicEvolving can you also update the FLIP with the
>> >> changes you want to make to it? Ideally, connector developers do not
>> >> have to change their SourceReaders to implement this new logic.
>> >>
>> >> My idea was to introduce a second source interface that extends the
>> >> existing interface and offers only the method getRecordEvaluator().
>> >> The record evaluator is still passed as you have described through the
>> >> builder and at the end held by the source object. This way the source
>> >> framework can automatically use the evaluator without the need that
>> >> connector developers have to implement the complicated stopping logic
>> >> or change their SourceReaders.
>> >>
>> >> Best,
>> >> Fabian
>> >>
>> >>
>> >> On Wed, Jan 12, 2022 at 2:22 AM Dong Lin <lindon...@gmail.com> wrote:
>> >>>
>> >>> Hi Fabian,
>> >>>
>> >>> Thanks for the comments. Please see my reply inline.
>> >>>
>> >>> On Tue, Jan 11, 2022 at 11:46 PM Fabian Paul <fp...@apache.org>
>> wrote:
>> >>>
>> >>>> Hi Dong,
>> >>>>
>> >>>> I wouldn't change the org.apache.flink.api.connector.source.Source
>> >>>> interface because it either breaks existing sinks or we introduce it
>> >>>> as some kind of optional. I deem both options as not great. My idea
>> is
>> >>>> to introduce a new interface that extends the Source. This way users
>> >>>> who want to develop a source that stops with the record evaluator can
>> >>>> implement the new interface. It also has the nice benefit that we can
>> >>>> give this new type of source a lower stability guarantee than Public
>> >>>> to allow some changes.
>> >>>>
>> >>>
>> >>> Currently the eofRecodEvaluator can be passed from
>> >>> KafkaSourceBuilder/PulsarSourceBuilder
>> >>> to SingleThreadMultiplexSourceReaderBase and SourceReaderBase. This
>> >>> approach also allows developers who want to develop a source that
>> stops
>> >>> with the record evaluator to implement the new feature. Adding a new
>> >>> interface could increase the complexity in our interface and
>> >>> infrastructure. I am not sure if it has added benefits compared to the
>> >>> existing proposal. Could you explain more?
>> >>>
>> >>> I am not very sure what "new type of source a lower stability
>> guarantee"
>> >>> you are referring to. Could you explain more? It looks like a new
>> feature
>> >>> not mentioned in the FLIP. If the changes proposed in this FLIP also
>> >>> support the feature you have in mind, could we discuss this in a
>> separate
>> >>> FLIP?
>> >>>
>> >>> In the SourceOperatorFactory we can then access the record evaluator
>> >>>> from the respective sources and pass it to the source operator.
>> >>>>
>> >>>> Hopefully, this makes sense. So far I did not find information about
>> >>>> the actual stopping logic in the FLIP maybe you had something
>> >>>> different in mind.
>> >>>>
>> >>>
>> >>> By "actual stopping logic", do you mean an example implementation of
>> the
>> >>> RecordEvalutor? I think the use-case is described in the motivation
>> >>> section, which is about a pipeline processing stock transaction data.
>> >>>
>> >>> We can support this use-case with this FLIP, by implementing this
>> >>> RecordEvaluator that stops reading data from a split when there is a
>> >>> message that says "EOF". Users can trigger this feature by sending
>> >> messages
>> >>> with "EOF" in the payload to all partitions of the source Kafka topic.
>> >>>
>> >>> Does this make sense?
>> >>>
>> >>>
>> >>>>
>> >>>> Best,
>> >>>> Fabian
>> >>>>
>> >>>> On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>> Hi Fabian,
>> >>>>>
>> >>>>> Thanks for the comments!
>> >>>>>
>> >>>>> By "add a source mixin interface", are you suggesting to update
>> >>>>> the org.apache.flink.api.connector.source.Source interface to add
>> >> the API
>> >>>>> "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to add
>> >> more
>> >>>>> public API and thus more complexity than the solution in the FLIP.
>> >> Could
>> >>>>> you help explain more about the benefits of doing this?
>> >>>>>
>> >>>>> Regarding the 2nd question, I think this FLIP does not change
>> whether
>> >>>>> sources are treated as bounded or unbounded. For example, the
>> >>>> KafkaSource's
>> >>>>> boundedness will continue to be determined with the API
>> >>>>> KafkaSourceBuilder::setBounded(..) and
>> >>>>> KafkaSourceBuilder::setUnbounded(..). Does this answer your
>> question?
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Dong
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org>
>> >> wrote:
>> >>>>>
>> >>>>>> Hi Dong,
>> >>>>>>
>> >>>>>> Thank you for updating the FLIP and making it applicable for all
>> >>>>>> sources. I am a bit unsure about the implementation part. I would
>> >>>>>> propose to add a source mixin interface that implements
>> >>>>>> `getRecordEvaluator` and sources that want to allow dynamically
>> >>>>>> stopping implement that interface.
>> >>>>>>
>> >>>>>> Another question I had was how do we treat sources using the record
>> >>>>>> evaluator as bounded or unbounded?
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Fabian
>> >>>>>>
>> >>>>>> On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com>
>> >> wrote:
>> >>>>>>>
>> >>>>>>> Hi Martijn and Qingsheng,
>> >>>>>>>
>> >>>>>>> The FLIP has been updated to extend the dynamic EOF support for
>> >> the
>> >>>>>>> PulsarSource. I have not extended this feature to other sources
>> >> yet
>> >>>>>> since I
>> >>>>>>> am not sure it is a requirement to ensure feature consistency
>> >> across
>> >>>>>>> different sources. Could you take another look?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Dong
>> >>>>>>>
>> >>>>>>> On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com>
>> >>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Martijn,
>> >>>>>>>>
>> >>>>>>>> Thanks for the comments! In general I agree we should avoid
>> >> feature
>> >>>>>>>> sparsity.
>> >>>>>>>>
>> >>>>>>>> In this particular case, connectors are a bit different than
>> >> most
>> >>>> other
>> >>>>>>>> features in Flink. AFAIK, we plan to move connectors (including
>> >>>> Kafka
>> >>>>>> and
>> >>>>>>>> Pulsar) out of the Flink project in the future, which means
>> >> that
>> >>>> the
>> >>>>>>>> development of these connectors will be mostly de-centralized
>> >>>> (outside
>> >>>>>> of
>> >>>>>>>> Flink) and be up to their respective maintainers. While I agree
>> >>>> that we
>> >>>>>>>> should provide API/infrastructure in Flink (as this FLIP does)
>> >> to
>> >>>>>> support
>> >>>>>>>> feature consistency across connectors, I am not sure we should
>> >> own
>> >>>> the
>> >>>>>>>> responsibility to actually update all connectors to achieve
>> >> feature
>> >>>>>>>> consistency, given that we don't plan to do it in Flink anyway
>> >> due
>> >>>> to
>> >>>>>> its
>> >>>>>>>> heavy burden.
>> >>>>>>>>
>> >>>>>>>> With that being said, I am happy to follow the community
>> >> guideline
>> >>>> if
>> >>>>>> we
>> >>>>>>>> decide that connector-related FLIP should update every
>> >> connector's
>> >>>> API
>> >>>>>> to
>> >>>>>>>> ensure feature consistency (to a reasonable extent). For
>> >> example,
>> >>>> in
>> >>>>>> this
>> >>>>>>>> particular case, it looks like the EOF-detection feature can be
>> >>>>>> applied to
>> >>>>>>>> every connector (including bounded sources). Is it still
>> >>>> sufficient to
>> >>>>>> just
>> >>>>>>>> update Kafka, Pulsar and Kinesis?
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>> Dong
>> >>>>>>>>
>> >>>>>>>> On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <
>> >>>> mart...@ververica.com>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Dong,
>> >>>>>>>>>
>> >>>>>>>>> Thanks for writing the FLIP. It focusses only on the
>> >> KafkaSource,
>> >>>> but
>> >>>>>> I
>> >>>>>>>>> would expect that if such a functionality is desired, it
>> >> should be
>> >>>>>> made
>> >>>>>>>>> available for all unbounded sources (for example, Pulsar and
>> >>>>>> Kinesis). If
>> >>>>>>>>> it's only available for Kafka, I see it as if we're increasing
>> >>>> feature
>> >>>>>>>>> sparsity while we actually want to decrease that. What do you
>> >>>> think?
>> >>>>>>>>>
>> >>>>>>>>> Best regards,
>> >>>>>>>>>
>> >>>>>>>>> Martijn
>> >>>>>>>>>
>> >>>>>>>>> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com>
>> >>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi all,
>> >>>>>>>>>>
>> >>>>>>>>>> We created FLIP-208: Update KafkaSource to detect EOF based
>> >> on
>> >>>>>>>>>> de-serialized records. Please find the KIP wiki in the link
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>
>> >>>>
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
>> >>>>>>>>>> .
>> >>>>>>>>>>
>> >>>>>>>>>> This FLIP aims to address the use-case where users need to
>> >> stop
>> >>>> a
>> >>>>>> Flink
>> >>>>>>>>> job
>> >>>>>>>>>> gracefully based on the content of de-serialized records
>> >>>> observed
>> >>>>>> in the
>> >>>>>>>>>> KafkaSource. This feature is needed by users who currently
>> >>>> depend on
>> >>>>>>>>>> KafkaDeserializationSchema::isEndOfStream() to migrate their
>> >>>> Flink
>> >>>>>> job
>> >>>>>>>>> from
>> >>>>>>>>>> FlinkKafkaConsumer to KafkaSource.
>> >>>>>>>>>>
>> >>>>>>>>>> Could you help review this FLIP when you get time? Your
>> >>>> comments are
>> >>>>>>>>>> appreciated!
>> >>>>>>>>>>
>> >>>>>>>>>> Cheers,
>> >>>>>>>>>> Dong
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>
>> >>
>>
>>

Reply via email to