Hi everyone,

Thanks for all the comments. If there are no further comments, we plan to
start the voting thread on 1/24.

Cheers,
Dong

On Fri, Jan 14, 2022 at 8:26 AM Dong Lin <lindon...@gmail.com> wrote:

> Hi Qingsheng,
>
> Thank you for extending this FLIP to support this feature with Table/SQL.
> I also prefer the 2nd option over the 1st option. This is because the 1st
> option requires the user to additionally specify an identifier in addition
> to defining the RecordEvaluator themselves. Note that this is unlike
> InputFormat, where most users will use off-the-shelf de-serializers in
> Flink instead of implementing de-serializers by themselves.
>
> Could you help update FLIP-208 with the proposed changes as well as the
> explanation of the rejected alternative?
>
> Thanks,
> Dong
>
> On Thu, Jan 13, 2022 at 5:18 PM Qingsheng Ren <renqs...@gmail.com> wrote:
>
>> Thanks Dong for the explanation!
>>
>> I agree with Dong’s idea of keeping the APIs for setting configurations
>> consistent, so it’s acceptable to me to pass the record evaluator from
>> XXXSourceBuilder and embed it into SourceReaderBase. Also, considering
>> that only the Kafka source currently respects
>> DeserializationSchema#isEndOfStream, it’s OK to implement just the Kafka
>> and Pulsar connectors for now.
>>
>> Another thing I’d like to mention is using this feature in the Table/SQL
>> API. Currently I have two kinds of implementations in mind:
>>
>> 1. Similar to format / deserializer, we introduce a factory for
>> RecordEvaluator, and users need to specify the factory identifier in the
>> table options:
>>
>> CREATE TABLE `kafka` (…) WITH (
>>   `connector` = `kafka`,
>>   `record.evaluator` = `my-evaluator-factory-identifier`
>> )
>>
>> 2. Directly use the full class path in the table options:
>>
>> CREATE TABLE `kafka` (…) WITH (
>>   `connector` = `kafka`,
>>   `record.evaluator.class` = `com.mycompany.evaluator.MyEvaluator`
>> )
>>
>> Personally I prefer the second one, because it’s easier for users to
>> implement their own RecordEvaluators.
>>
>> What do you think?
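(For illustration, a minimal sketch of what a user class for option 2 above
could look like. This is not Flink code: the `RecordEvaluator` interface is
redeclared locally so the snippet is self-contained, and the class name is
hypothetical; in the actual proposal the interface would ship with Flink.)

```java
// Sketch only: this local interface mirrors the single-method
// RecordEvaluator discussed in FLIP-208; the real one would live in Flink.
interface RecordEvaluator<T> {
    boolean isEndOfStream(T record);
}

// Hypothetical user class that `record.evaluator.class` =
// `com.mycompany.evaluator.MyEvaluator` would name. Since the class is
// loaded by name, it needs a no-argument constructor.
class MyEvaluator implements RecordEvaluator<String> {
    @Override
    public boolean isEndOfStream(String record) {
        // Stop reading the split once an "EOF" payload is observed.
        return "EOF".equals(record);
    }
}
```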
>> > On Jan 13, 2022, at 11:39 AM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > Hi Fabian,
>> >
>> > Thank you for the explanation.
>> >
>> > The current approach needs to add new constructors for SourceReaderBase
>> > and SingleThreadMultiplexSourceReaderBase. This proposed change has now
>> > been included in the Public Interfaces section of the FLIP.
>> >
>> > And yes, I also agree it would be preferable if developers did not have
>> > to change their SourceReaders to implement this new logic. The
>> > suggestion mentioned by Qingsheng in this thread could achieve this
>> > goal. Qingsheng's idea is to let the user specify the eofRecordEvaluator
>> > via StreamExecutionEnvironment::fromSource(...).withEofRecordEvaluator(...)
>> > and pass this evaluator through DataStreamSource to SourceOperator.
>> > SourceOperator::emitNext(...) could then use this evaluator as
>> > appropriate.
>> >
>> > For now I have chosen not to use this approach, because it requires
>> > users to pass some source configuration via
>> > StreamExecutionEnvironment::fromSource(...) and other source
>> > configuration via e.g. KafkaSourceBuilder(...). This might create a
>> > sense of inconsistency/confusion. Given that connector users far
>> > outnumber connector developers, I believe it is probably better to
>> > optimize the user experience in this case.
>> >
>> > The description of this alternative approach and its pros/cons has been
>> > included in the FLIP.
>> >
>> > And yep, I think I understand your suggestion. Indeed those connector
>> > configs (e.g. de-serializer, boundedness, eofRecordEvaluator) can be
>> > passed from XXXSourceBuilder to their shared infra (e.g.
>> > SourceReaderBase). Both solutions work. Given that the existing configs
>> > (e.g. deserializer) are already passed to SourceReaderBase via a
>> > constructor parameter, I guess it is simpler to follow the existing
>> > pattern for now.
>> >
>> > Regards,
>> > Dong
>> >
>> > On Wed, Jan 12, 2022 at 11:17 PM Fabian Paul <fp...@apache.org> wrote:
>> >
>> >> Hi Dong,
>> >>
>> >> I think I am beginning to understand your idea. Since SourceReaderBase
>> >> is marked as PublicEvolving, can you also update the FLIP with the
>> >> changes you want to make to it? Ideally, connector developers should
>> >> not have to change their SourceReaders to implement this new logic.
>> >>
>> >> My idea was to introduce a second source interface that extends the
>> >> existing interface and offers only the method getRecordEvaluator().
>> >> The record evaluator is still passed, as you have described, through
>> >> the builder, and is ultimately held by the source object. This way the
>> >> source framework can automatically use the evaluator without connector
>> >> developers having to implement the complicated stopping logic or
>> >> change their SourceReaders.
>> >>
>> >> Best,
>> >> Fabian
>> >>
>> >> On Wed, Jan 12, 2022 at 2:22 AM Dong Lin <lindon...@gmail.com> wrote:
>> >>>
>> >>> Hi Fabian,
>> >>>
>> >>> Thanks for the comments. Please see my reply inline.
>> >>>
>> >>> On Tue, Jan 11, 2022 at 11:46 PM Fabian Paul <fp...@apache.org> wrote:
>> >>>
>> >>>> Hi Dong,
>> >>>>
>> >>>> I wouldn't change the org.apache.flink.api.connector.source.Source
>> >>>> interface, because it either breaks existing sources or we introduce
>> >>>> it as some kind of optional. I deem both options as not great. My
>> >>>> idea is to introduce a new interface that extends Source. This way,
>> >>>> users who want to develop a source that stops with the record
>> >>>> evaluator can implement the new interface. It also has the nice
>> >>>> benefit that we can give this new type of source a lower stability
>> >>>> guarantee than Public to allow some changes.
>> >>>>
>> >>>
>> >>> Currently the eofRecordEvaluator can be passed from
>> >>> KafkaSourceBuilder/PulsarSourceBuilder to
>> >>> SingleThreadMultiplexSourceReaderBase and SourceReaderBase. This
>> >>> approach also allows developers who want to develop a source that
>> >>> stops with the record evaluator to implement the new feature. Adding
>> >>> a new interface could increase the complexity of our interfaces and
>> >>> infrastructure. I am not sure it has added benefits compared to the
>> >>> existing proposal. Could you explain more?
>> >>>
>> >>> I am also not very sure what "new type of source with a lower
>> >>> stability guarantee" you are referring to. Could you explain more?
>> >>> It looks like a new feature not mentioned in the FLIP. If the changes
>> >>> proposed in this FLIP also support the feature you have in mind,
>> >>> could we discuss it in a separate FLIP?
>> >>>
>> >>>> In the SourceOperatorFactory we can then access the record evaluator
>> >>>> from the respective sources and pass it to the source operator.
>> >>>>
>> >>>> Hopefully, this makes sense. So far I did not find information about
>> >>>> the actual stopping logic in the FLIP; maybe you had something
>> >>>> different in mind.
>> >>>
>> >>> By "actual stopping logic", do you mean an example implementation of
>> >>> the RecordEvaluator? I think the use-case is described in the
>> >>> motivation section, which is about a pipeline processing stock
>> >>> transaction data.
>> >>>
>> >>> We can support this use-case with this FLIP by implementing a
>> >>> RecordEvaluator that stops reading data from a split when there is a
>> >>> message that says "EOF". Users can trigger this feature by sending
>> >>> messages with "EOF" in the payload to all partitions of the source
>> >>> Kafka topic.
>> >>>
>> >>> Does this make sense?
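(For illustration, the stopping logic described above can be simulated
without Flink. The sketch below redeclares a local `RecordEvaluator`
stand-in, models a split as a plain list, and stops emitting once the
evaluator fires; whether the triggering "EOF" record itself is emitted is
an implementation choice, and this sketch drops it.)

```java
import java.util.ArrayList;
import java.util.List;

class SplitStopSimulation {
    // Local stand-in for the RecordEvaluator discussed in FLIP-208.
    interface RecordEvaluator<T> {
        boolean isEndOfStream(T record);
    }

    // Emits records from a "split" until the evaluator signals
    // end-of-stream; the triggering record is not emitted in this sketch.
    static <T> List<T> readSplit(List<T> split, RecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (T record : split) {
            if (evaluator.isEndOfStream(record)) {
                break; // stop reading this split
            }
            emitted.add(record);
        }
        return emitted;
    }
}
```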
>> >>>
>> >>>>
>> >>>> Best,
>> >>>> Fabian
>> >>>>
>> >>>> On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com> wrote:
>> >>>>>
>> >>>>> Hi Fabian,
>> >>>>>
>> >>>>> Thanks for the comments!
>> >>>>>
>> >>>>> By "add a source mixin interface", are you suggesting updating the
>> >>>>> org.apache.flink.api.connector.source.Source interface to add the
>> >>>>> API "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to
>> >>>>> add more public API and thus more complexity than the solution in
>> >>>>> the FLIP. Could you help explain more about the benefits of doing
>> >>>>> this?
>> >>>>>
>> >>>>> Regarding the 2nd question, I think this FLIP does not change
>> >>>>> whether sources are treated as bounded or unbounded. For example,
>> >>>>> the KafkaSource's boundedness will continue to be determined with
>> >>>>> the APIs KafkaSourceBuilder::setBounded(..) and
>> >>>>> KafkaSourceBuilder::setUnbounded(..). Does this answer your
>> >>>>> question?
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Dong
>> >>>>>
>> >>>>> On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org> wrote:
>> >>>>>
>> >>>>>> Hi Dong,
>> >>>>>>
>> >>>>>> Thank you for updating the FLIP and making it applicable to all
>> >>>>>> sources. I am a bit unsure about the implementation part. I would
>> >>>>>> propose to add a source mixin interface that implements
>> >>>>>> `getRecordEvaluator`, and sources that want to allow dynamic
>> >>>>>> stopping implement that interface.
>> >>>>>>
>> >>>>>> Another question I had was how we treat sources using the record
>> >>>>>> evaluator: as bounded or unbounded?
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Fabian
>> >>>>>>
>> >>>>>> On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>> Hi Martijn and Qingsheng,
>> >>>>>>>
>> >>>>>>> The FLIP has been updated to extend the dynamic EOF support to
>> >>>>>>> the PulsarSource. I have not extended this feature to other
>> >>>>>>> sources yet, since I am not sure it is a requirement to ensure
>> >>>>>>> feature consistency across different sources. Could you take
>> >>>>>>> another look?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Dong
>> >>>>>>>
>> >>>>>>> On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Martijn,
>> >>>>>>>>
>> >>>>>>>> Thanks for the comments! In general I agree we should avoid
>> >>>>>>>> feature sparsity.
>> >>>>>>>>
>> >>>>>>>> In this particular case, connectors are a bit different from
>> >>>>>>>> most other features in Flink. AFAIK, we plan to move connectors
>> >>>>>>>> (including Kafka and Pulsar) out of the Flink project in the
>> >>>>>>>> future, which means that the development of these connectors
>> >>>>>>>> will be mostly de-centralized (outside of Flink) and up to their
>> >>>>>>>> respective maintainers. While I agree that we should provide
>> >>>>>>>> API/infrastructure in Flink (as this FLIP does) to support
>> >>>>>>>> feature consistency across connectors, I am not sure we should
>> >>>>>>>> own the responsibility to actually update all connectors to
>> >>>>>>>> achieve feature consistency, given that we don't plan to do it
>> >>>>>>>> in Flink anyway due to its heavy burden.
>> >>>>>>>>
>> >>>>>>>> With that being said, I am happy to follow the community
>> >>>>>>>> guideline if we decide that a connector-related FLIP should
>> >>>>>>>> update every connector's API to ensure feature consistency (to a
>> >>>>>>>> reasonable extent). For example, in this particular case, it
>> >>>>>>>> looks like the EOF-detection feature can be applied to every
>> >>>>>>>> connector (including bounded sources). Is it still sufficient to
>> >>>>>>>> just update Kafka, Pulsar and Kinesis?
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>> Dong
>> >>>>>>>>
>> >>>>>>>> On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <mart...@ververica.com> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Dong,
>> >>>>>>>>>
>> >>>>>>>>> Thanks for writing the FLIP. It focuses only on the
>> >>>>>>>>> KafkaSource, but I would expect that if such functionality is
>> >>>>>>>>> desired, it should be made available for all unbounded sources
>> >>>>>>>>> (for example, Pulsar and Kinesis). If it's only available for
>> >>>>>>>>> Kafka, I see it as increasing feature sparsity while we
>> >>>>>>>>> actually want to decrease it. What do you think?
>> >>>>>>>>>
>> >>>>>>>>> Best regards,
>> >>>>>>>>>
>> >>>>>>>>> Martijn
>> >>>>>>>>>
>> >>>>>>>>> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi all,
>> >>>>>>>>>>
>> >>>>>>>>>> We created FLIP-208: Update KafkaSource to detect EOF based
>> >>>>>>>>>> on de-serialized records. Please find the FLIP wiki at
>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
>> >>>>>>>>>>
>> >>>>>>>>>> This FLIP aims to address the use-case where users need to
>> >>>>>>>>>> stop a Flink job gracefully based on the content of
>> >>>>>>>>>> de-serialized records observed in the KafkaSource. This
>> >>>>>>>>>> feature is needed by users who currently depend on
>> >>>>>>>>>> KafkaDeserializationSchema::isEndOfStream() to migrate their
>> >>>>>>>>>> Flink jobs from FlinkKafkaConsumer to KafkaSource.
>> >>>>>>>>>>
>> >>>>>>>>>> Could you help review this FLIP when you get time? Your
>> >>>>>>>>>> comments are appreciated!
>> >>>>>>>>>>
>> >>>>>>>>>> Cheers,
>> >>>>>>>>>> Dong