Hi Dong,

I wouldn't change the org.apache.flink.api.connector.source.Source interface, because that would either break existing sources or force us to introduce the method as some kind of optional; I deem both options not great. My idea is to introduce a new interface that extends Source. This way, users who want to develop a source that stops with the record evaluator can implement the new interface. It also has the nice benefit that we can give this new type of source a lower stability guarantee than Public, to allow some changes. In the SourceOperatorFactory we can then access the record evaluator from the respective sources and pass it to the source operator.
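To make the idea concrete, here is a rough, self-contained sketch. All names (RecordEvaluator, SourceWithRecordEvaluator, readUntilEof) are placeholders I made up for illustration, not the FLIP's final API, and the Source interface below is a minimal stand-in rather than Flink's real one:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecordEvaluatorSketch {

    /** Decides, per de-serialized record, whether the stream should end. */
    @FunctionalInterface
    interface RecordEvaluator<T> {
        boolean isEndOfStream(T record);
    }

    /** Stand-in for org.apache.flink.api.connector.source.Source. */
    interface Source<T> {}

    /**
     * The proposed mixin: a source that supports dynamic stopping extends
     * Source and additionally exposes its record evaluator. Existing
     * sources are untouched, so nothing breaks for them.
     */
    @FunctionalInterface
    interface SourceWithRecordEvaluator<T> extends Source<T> {
        RecordEvaluator<T> getRecordEvaluator();
    }

    /**
     * Mimics what SourceOperatorFactory could do: check for the mixin
     * interface and wire the evaluator into the reading loop.
     */
    static <T> List<T> readUntilEof(Source<T> source, List<T> records) {
        RecordEvaluator<T> evaluator =
                source instanceof SourceWithRecordEvaluator
                        ? ((SourceWithRecordEvaluator<T>) source).getRecordEvaluator()
                        : r -> false; // plain sources never stop early
        List<T> emitted = new ArrayList<>();
        for (T record : records) {
            if (evaluator.isEndOfStream(record)) {
                break; // the EOF-triggering record itself is not emitted
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "EOF", "c");

        SourceWithRecordEvaluator<String> stopping = () -> "EOF"::equals;
        Source<String> plain = new Source<String>() {};

        System.out.println(readUntilEof(stopping, input)); // [a, b]
        System.out.println(readUntilEof(plain, input));    // [a, b, EOF, c]
    }
}
```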
Hopefully, this makes sense. So far I did not find information about the actual stopping logic in the FLIP; maybe you had something different in mind.

Best,
Fabian

On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com> wrote:
>
> Hi Fabian,
>
> Thanks for the comments!
>
> By "add a source mixin interface", are you suggesting to update
> the org.apache.flink.api.connector.source.Source interface to add the API
> "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to add more
> public API and thus more complexity than the solution in the FLIP. Could
> you help explain more about the benefits of doing this?
>
> Regarding the 2nd question, I think this FLIP does not change whether
> sources are treated as bounded or unbounded. For example, the KafkaSource's
> boundedness will continue to be determined with the API
> KafkaSourceBuilder::setBounded(..) and
> KafkaSourceBuilder::setUnbounded(..). Does this answer your question?
>
> Thanks,
> Dong
>
> On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org> wrote:
>
> > Hi Dong,
> >
> > Thank you for updating the FLIP and making it applicable for all
> > sources. I am a bit unsure about the implementation part. I would
> > propose to add a source mixin interface that implements
> > `getRecordEvaluator`, and sources that want to allow dynamic
> > stopping implement that interface.
> >
> > Another question I had was how do we treat sources using the record
> > evaluator as bounded or unbounded?
> >
> > Best,
> > Fabian
> >
> > On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > Hi Martijn and Qingsheng,
> > >
> > > The FLIP has been updated to extend the dynamic EOF support for the
> > > PulsarSource. I have not extended this feature to other sources yet,
> > > since I am not sure it is a requirement to ensure feature consistency
> > > across different sources. Could you take another look?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi Martijn,
> > > >
> > > > Thanks for the comments! In general I agree we should avoid feature
> > > > sparsity.
> > > >
> > > > In this particular case, connectors are a bit different than most
> > > > other features in Flink. AFAIK, we plan to move connectors
> > > > (including Kafka and Pulsar) out of the Flink project in the future,
> > > > which means that the development of these connectors will be mostly
> > > > de-centralized (outside of Flink) and be up to their respective
> > > > maintainers. While I agree that we should provide API/infrastructure
> > > > in Flink (as this FLIP does) to support feature consistency across
> > > > connectors, I am not sure we should own the responsibility to
> > > > actually update all connectors to achieve feature consistency, given
> > > > that we don't plan to do it in Flink anyway due to its heavy burden.
> > > >
> > > > With that being said, I am happy to follow the community guideline
> > > > if we decide that a connector-related FLIP should update every
> > > > connector's API to ensure feature consistency (to a reasonable
> > > > extent). For example, in this particular case, it looks like the
> > > > EOF-detection feature can be applied to every connector (including
> > > > bounded sources). Is it still sufficient to just update Kafka,
> > > > Pulsar and Kinesis?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <mart...@ververica.com>
> > > > wrote:
> > > >
> > > >> Hi Dong,
> > > >>
> > > >> Thanks for writing the FLIP. It focuses only on the KafkaSource,
> > > >> but I would expect that if such a functionality is desired, it
> > > >> should be made available for all unbounded sources (for example,
> > > >> Pulsar and Kinesis). If it's only available for Kafka, I see it as
> > > >> if we're increasing feature sparsity while we actually want to
> > > >> decrease that. What do you think?
> > > >>
> > > >> Best regards,
> > > >>
> > > >> Martijn
> > > >>
> > > >> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com> wrote:
> > > >>
> > > >> > Hi all,
> > > >> >
> > > >> > We created FLIP-208: Update KafkaSource to detect EOF based on
> > > >> > de-serialized records. Please find the FLIP wiki in the link
> > > >> >
> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
> > > >> >
> > > >> > This FLIP aims to address the use-case where users need to stop
> > > >> > a Flink job gracefully based on the content of de-serialized
> > > >> > records observed in the KafkaSource. This feature is needed by
> > > >> > users who currently depend on
> > > >> > KafkaDeserializationSchema::isEndOfStream() to migrate their
> > > >> > Flink job from FlinkKafkaConsumer to KafkaSource.
> > > >> >
> > > >> > Could you help review this FLIP when you get time? Your comments
> > > >> > are appreciated!
> > > >> >
> > > >> > Cheers,
> > > >> > Dong