Hi Fabian,

Thanks for the comments. Please see my reply inline.
On Tue, Jan 11, 2022 at 11:46 PM Fabian Paul <fp...@apache.org> wrote:

> Hi Dong,
>
> I wouldn't change the org.apache.flink.api.connector.source.Source
> interface because it either breaks existing sinks or we introduce it
> as some kind of optional. I deem both options as not great. My idea is
> to introduce a new interface that extends the Source. This way users
> who want to develop a source that stops with the record evaluator can
> implement the new interface. It also has the nice benefit that we can
> give this new type of source a lower stability guarantee than Public
> to allow some changes.

Currently the eofRecordEvaluator can be passed from
KafkaSourceBuilder/PulsarSourceBuilder to
SingleThreadMultiplexSourceReaderBase and SourceReaderBase. This approach
also allows developers who want to develop a source that stops with the
record evaluator to implement the new feature. Adding a new interface could
increase the complexity of our interfaces and infrastructure, and I am not
sure it has added benefits compared to the existing proposal. Could you
explain more?

I am also not sure which "new type of source with a lower stability
guarantee" you are referring to; it looks like a new feature not mentioned
in the FLIP. Could you explain more? If the changes proposed in this FLIP
also support the feature you have in mind, could we discuss it in a
separate FLIP?

> In the SourceOperatorFactory we can then access the record evaluator
> from the respective sources and pass it to the source operator.
>
> Hopefully, this makes sense. So far I did not find information about
> the actual stopping logic in the FLIP maybe you had something
> different in mind.

By "actual stopping logic", do you mean an example implementation of the
RecordEvaluator? I think the use-case is described in the motivation
section, which is about a pipeline processing stock transaction data. We
can support this use-case with this FLIP by implementing a RecordEvaluator
that stops reading data from a split once it sees a message that says
"EOF". Users can trigger this feature by sending messages with "EOF" in
the payload to all partitions of the source Kafka topic. I have included a
rough sketch below to show what I have in mind. Does this make sense?
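To make the stopping logic concrete, here is a minimal sketch, assuming the
evaluator keeps the shape discussed in this thread. The package of
RecordEvaluator, the method name isEndOfStream, and the builder setter
setEofRecordEvaluator below are illustrative assumptions on my side; the
final names and locations are whatever the FLIP ends up specifying.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
// Assumed location of the new interface; the final package is up to the FLIP.
import org.apache.flink.connector.base.source.reader.RecordEvaluator;

public class EofEvaluatorExample {

    /** Stops reading from a split once an "EOF" control message is observed. */
    public static class EofStringEvaluator implements RecordEvaluator<String> {
        @Override
        public boolean isEndOfStream(String record) {
            return "EOF".equals(record);
        }
    }

    public static void main(String[] args) {
        // Hypothetical wiring: the builder forwards the evaluator down to
        // SingleThreadMultiplexSourceReaderBase / SourceReaderBase.
        KafkaSource<String> source =
            KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("transactions")
                .setGroupId("eof-demo")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setEofRecordEvaluator(new EofStringEvaluator()) // setter name assumed
                .build();
        // The source is then used with env.fromSource(...) as usual; the job
        // finishes once every split has observed an "EOF" record.
    }
}

The key point is that the evaluator operates on the de-serialized record,
so the stopping condition can look at the payload, which is exactly what
the stock-transaction use-case in the motivation section needs.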
>
> Best,
> Fabian
>
> On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hi Fabian,
> >
> > Thanks for the comments!
> >
> > By "add a source mixin interface", are you suggesting to update
> > the org.apache.flink.api.connector.source.Source interface to add the API
> > "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to add more
> > public API and thus more complexity than the solution in the FLIP. Could
> > you help explain more about the benefits of doing this?
> >
> > Regarding the 2nd question, I think this FLIP does not change whether
> > sources are treated as bounded or unbounded. For example, the KafkaSource's
> > boundedness will continue to be determined with the API
> > KafkaSourceBuilder::setBounded(..) and KafkaSourceBuilder::setUnbounded(..).
> > Does this answer your question?
> >
> > Thanks,
> > Dong
> >
> > On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org> wrote:
> > >
> > > Hi Dong,
> > >
> > > Thank you for updating the FLIP and making it applicable for all
> > > sources. I am a bit unsure about the implementation part. I would
> > > propose to add a source mixin interface that implements
> > > `getRecordEvaluator` and sources that want to allow dynamically
> > > stopping implement that interface.
> > >
> > > Another question I had was how do we treat sources using the record
> > > evaluator as bounded or unbounded?
> > >
> > > Best,
> > > Fabian
> > >
> > > On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > Hi Martijn and Qingsheng,
> > > >
> > > > The FLIP has been updated to extend the dynamic EOF support for the
> > > > PulsarSource. I have not extended this feature to other sources yet
> > > > since I am not sure it is a requirement to ensure feature consistency
> > > > across different sources. Could you take another look?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > Hi Martijn,
> > > > >
> > > > > Thanks for the comments! In general I agree we should avoid feature
> > > > > sparsity.
> > > > >
> > > > > In this particular case, connectors are a bit different than most other
> > > > > features in Flink. AFAIK, we plan to move connectors (including Kafka
> > > > > and Pulsar) out of the Flink project in the future, which means that the
> > > > > development of these connectors will be mostly de-centralized (outside
> > > > > of Flink) and be up to their respective maintainers. While I agree that
> > > > > we should provide API/infrastructure in Flink (as this FLIP does) to
> > > > > support feature consistency across connectors, I am not sure we should
> > > > > own the responsibility to actually update all connectors to achieve
> > > > > feature consistency, given that we don't plan to do it in Flink anyway
> > > > > due to its heavy burden.
> > > > >
> > > > > With that being said, I am happy to follow the community guideline if we
> > > > > decide that connector-related FLIP should update every connector's API
> > > > > to ensure feature consistency (to a reasonable extent). For example, in
> > > > > this particular case, it looks like the EOF-detection feature can be
> > > > > applied to every connector (including bounded sources). Is it still
> > > > > sufficient to just update Kafka, Pulsar and Kinesis?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <mart...@ververica.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Dong,
> > > > >>
> > > > >> Thanks for writing the FLIP. It focusses only on the KafkaSource, but I
> > > > >> would expect that if such a functionality is desired, it should be made
> > > > >> available for all unbounded sources (for example, Pulsar and Kinesis).
> > > > >> If it's only available for Kafka, I see it as if we're increasing
> > > > >> feature sparsity while we actually want to decrease that. What do you
> > > > >> think?
> > > > >>
> > > > >> Best regards,
> > > > >>
> > > > >> Martijn
> > > > >>
> > > > >> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com> wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > >> > We created FLIP-208: Update KafkaSource to detect EOF based on
> > > > >> > de-serialized records. Please find the KIP wiki in the link
> > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
> > > > >> > .
> > > > >> >
> > > > >> > This FLIP aims to address the use-case where users need to stop a
> > > > >> > Flink job gracefully based on the content of de-serialized records
> > > > >> > observed in the KafkaSource. This feature is needed by users who
> > > > >> > currently depend on KafkaDeserializationSchema::isEndOfStream() to
> > > > >> > migrate their Flink job from FlinkKafkaConsumer to KafkaSource.
> > > > >> >
> > > > >> > Could you help review this FLIP when you get time? Your comments are
> > > > >> > appreciated!
> > > > >> >
> > > > >> > Cheers,
> > > > >> > Dong