Hi Hongshun,

SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is already an interface, and we need to make it a PublicEvolving API as well.
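The following is a minimal sketch of this point. It uses hypothetical simplified stand-ins, not the real Flink classes:

- FetcherTaskSketch is modeled on the shape we understand SplitFetcherTask to have: run() reports whether the task is finished, and wakeUp() interrupts a blocking run.
- FetcherSketch plays the role of the SplitFetcher.
- CommitOffsetsTask is an illustrative connector-specific task.

Because enqueueTask() takes a task and returns void, the task type is exposed only as a parameter that developers implement.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in modeled on the shape of SplitFetcherTask:
// run() returns true once the task is finished; wakeUp() would interrupt a blocking run().
interface FetcherTaskSketch {
    boolean run();

    void wakeUp();
}

// Hypothetical, heavily simplified fetcher. Tasks run in FIFO order each time
// runOnce() is called; a real fetcher would do this on its own thread.
class FetcherSketch {
    private final Queue<FetcherTaskSketch> taskQueue = new ArrayDeque<>();

    // Like the method discussed above, enqueueing returns nothing:
    // the task type only appears as a parameter.
    void enqueueTask(FetcherTaskSketch task) {
        taskQueue.add(task);
    }

    // Runs the head task once; a task that is not finished goes to the back of the queue.
    void runOnce() {
        FetcherTaskSketch task = taskQueue.poll();
        if (task != null && !task.run()) {
            taskQueue.add(task);
        }
    }

    boolean hasPendingTasks() {
        return !taskQueue.isEmpty();
    }
}

// Hypothetical connector-specific task, e.g. committing offsets on the fetcher side.
class CommitOffsetsTask implements FetcherTaskSketch {
    private final List<String> committed;
    private final String offset;

    CommitOffsetsTask(List<String> committed, String offset) {
        this.committed = committed;
        this.offset = offset;
    }

    @Override
    public boolean run() {
        committed.add(offset); // record the "commit"
        return true; // finished after a single run
    }

    @Override
    public void wakeUp() {
        // Nothing blocks in this sketch, so there is nothing to interrupt.
    }
}

class CustomTaskDemo {
    public static void main(String[] args) {
        List<String> committed = new ArrayList<>();
        FetcherSketch fetcher = new FetcherSketch();
        fetcher.enqueueTask(new CommitOffsetsTask(committed, "partition-0@42"));
        fetcher.runOnce();
        System.out.println(committed); // prints [partition-0@42]
    }
}
```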

So overall, a source developer can potentially do a few things in the SplitFetcherManager:
1. Customize logic such as split-to-fetcher assignment, the threading model, etc.
2. Create their own SplitFetcherTask for the SplitFetcher / SplitReader to execute in a coordinated manner.
This should be powerful enough for the vast majority of source implementations, if not all.

> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as a constructor parameter, which is not allowed now.

Are you referring to FetchTask, which implements SplitFetcherTask? That class will remain internal.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi Jiangjie (Becket),
>
> Thank you for your advice. I have learned a lot.
>
> > If SplitFetcherManager becomes PublicEvolving, that also means SplitFetcher needs to be PublicEvolving, because it is returned by the protected method SplitFetcherManager.createSplitFetcher().
>
> I completely agree with you. However, if SplitFetcher becomes PublicEvolving, SplitFetcherTask also needs to be PublicEvolving because it is returned by the public method SplitFetcher#enqueueTask. Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as a constructor parameter, which is not allowed now. Therefore, I propose changing SplitFetcher to a public interface and moving its implementation details to an implementation class (e.g., SplitFetcherImpl or another suitable name). SplitFetcherImpl will be marked as Internal, will be managed by SplitFetcherManager, and will put data into the queue. Subclasses of SplitFetcherManager can then only use the SplitFetcher interface, which also ensures that current subclasses are not affected.
>
> > The current SplitFetcherManager basically looks up the SplitT from the fetcher with the split Id, and immediately passes the SplitT back to the fetcher, which is unnecessary.
>
> I inferred that this is because SplitReader#pauseOrResumeSplits requires SplitT instead of the split Id. Perhaps some external sources require more information to pause. However, SplitReader doesn't store all of its split data, while SplitFetcherManager does.
> CC, @Dawid Wysakowicz
>
> > If not, SplitFetcher.pause() and SplitFetcher.resume() can be removed. In fact, they seem no longer used anywhere.
>
> They do not seem to be used any more. CC, @Arvid Heise
>
> Thanks,
> Hongshun Wang
>
> On Fri, Nov 17, 2023 at 11:42 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Hongshun,
> >
> > Thanks for updating the FLIP. I think that makes sense. A few comments below:
> >
> > 1. If SplitFetcherManager becomes PublicEvolving, that also means SplitFetcher needs to be PublicEvolving, because it is returned by the protected method SplitFetcherManager.createSplitFetcher().
> >
> > 2. When checking the API of the classes to be marked as PublicEvolving, there might be a few method signatures worth some discussion.
> >
> > For SplitFetcherManager:
> > a) Currently, the removeSplits() method takes a list of SplitT. I am wondering if it should be a list of splitIds. SplitT actually contains two parts of information: the static split Id and some dynamically changing state of the split (e.g., the Kafka consumer offset). The source of truth for the dynamic state is SourceReaderBase. Currently we are passing in the full source split with the dynamic state for split removal, but it looks like only the split Id is needed for split removal.
> > Maybe this is intentional, as sometimes when a SplitReader removes a split, it also wants to know the dynamic state of the split. If so, we can keep it as is. But then the question is why SplitFetcherManager.pauseOrResumeSplits() only takes split ids instead of SplitT. Should we make them consistent?
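The ID-only option in point a) can be illustrated with a minimal sketch. It uses hypothetical classes (IdFetcher, IdBasedFetcherManager), not Flink's. The manager only maps split IDs to the fetcher that owns them. Each fetcher already holds its split objects and their dynamic state, so it resolves the IDs itself. Removal and pause/resume then take the same argument type, and the "look up SplitT, then pass it straight back" round trip disappears.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified fetcher that already holds the full split objects it reads.
class IdFetcher {
    final int fetcherId;
    final Map<String, String> assignedSplits = new HashMap<>(); // split ID -> dynamic state
    final Set<String> pausedSplits = new HashSet<>();

    IdFetcher(int fetcherId) {
        this.fetcherId = fetcherId;
    }

    void addSplit(String splitId, String state) {
        assignedSplits.put(splitId, state);
    }

    // The fetcher can resolve IDs to its own splits, so IDs are all it needs.
    void removeSplits(Collection<String> splitIds) {
        assignedSplits.keySet().removeAll(splitIds);
        pausedSplits.removeAll(splitIds);
    }

    void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        pausedSplits.addAll(toPause);
        pausedSplits.removeAll(toResume);
    }
}

// Hypothetical manager whose operations consistently take split IDs.
class IdBasedFetcherManager {
    private final Map<Integer, IdFetcher> fetchers = new HashMap<>();
    private final Map<String, Integer> splitToFetcher = new HashMap<>();

    void addSplit(int fetcherId, String splitId, String state) {
        fetchers.computeIfAbsent(fetcherId, IdFetcher::new).addSplit(splitId, state);
        splitToFetcher.put(splitId, fetcherId);
    }

    void removeSplits(Collection<String> splitIds) {
        for (Map.Entry<Integer, Set<String>> e : groupByFetcher(splitIds).entrySet()) {
            fetchers.get(e.getKey()).removeSplits(e.getValue());
        }
        splitToFetcher.keySet().removeAll(splitIds);
    }

    void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        Map<Integer, Set<String>> pause = groupByFetcher(toPause);
        Map<Integer, Set<String>> resume = groupByFetcher(toResume);
        Set<Integer> touched = new HashSet<>(pause.keySet());
        touched.addAll(resume.keySet());
        for (Integer fetcherId : touched) {
            fetchers.get(fetcherId).pauseOrResumeSplits(
                    pause.getOrDefault(fetcherId, Collections.emptySet()),
                    resume.getOrDefault(fetcherId, Collections.emptySet()));
        }
    }

    IdFetcher fetcher(int fetcherId) {
        return fetchers.get(fetcherId);
    }

    // Groups split IDs by owning fetcher; unknown IDs are ignored in this sketch.
    private Map<Integer, Set<String>> groupByFetcher(Collection<String> splitIds) {
        Map<Integer, Set<String>> grouped = new HashMap<>();
        for (String splitId : splitIds) {
            Integer fetcherId = splitToFetcher.get(splitId);
            if (fetcherId != null) {
                grouped.computeIfAbsent(fetcherId, k -> new HashSet<>()).add(splitId);
            }
        }
        return grouped;
    }
}

class IdRoutingDemo {
    public static void main(String[] args) {
        IdBasedFetcherManager manager = new IdBasedFetcherManager();
        manager.addSplit(0, "orders-0", "offset=10");
        manager.addSplit(1, "orders-1", "offset=20");
        manager.pauseOrResumeSplits(Arrays.asList("orders-1"), Collections.emptyList());
        System.out.println(manager.fetcher(1).pausedSplits); // prints [orders-1]
    }
}
```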
> >
> > For SplitFetcher:
> > a) The SplitFetcher.pauseOrResumeSplits() method takes collections of SplitT as arguments. We may want to adjust that according to what we do to the SplitFetcherManager. The current SplitFetcherManager basically looks up the SplitT from the fetcher with the split Id, and immediately passes the SplitT back to the fetcher, which is unnecessary.
> > b) After supporting split-level pause and resume, do we still need fetcher-level pause and resume? If not, SplitFetcher.pause() and SplitFetcher.resume() can be removed. In fact, they seem no longer used anywhere.
> >
> > Other than the above potential API adjustments before we mark the classes PublicEvolving, the API looks fine to me.
> >
> > I think it is good timing for deprecation now. We will mark the impacted constructors as deprecated in 1.19, and remove them in the 2.0 release.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Nov 16, 2023 at 8:26 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >
> > > Hi Devs,
> > >
> > > I have just modified the content of FLIP-389: Annotate SingleThreadFetcherManager as PublicEvolving[1].
> > >
> > > This FLIP now mainly does two things:
> > >
> > > 1. Annotate SingleThreadFetcherManager as PublicEvolving.
> > > 2. Remove all public constructors that use FutureCompletingBlockingQueue. This will mark many constructors as @Deprecated.
> > >
> > > This may affect many connectors, so I am looking forward to hearing from you.
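The deprecation path in item 2 can be sketched as follows. QueueOwningManager is a hypothetical class, a plain BlockingQueue stands in for FutureCompletingBlockingQueue, and the default capacity is an arbitrary assumption. The old constructor keeps working but is marked @Deprecated. The new one creates the queue internally, so callers no longer need to construct or share it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical manager; the BlockingQueue plays the role that
// FutureCompletingBlockingQueue plays in this thread.
class QueueOwningManager<E> {
    // Hypothetical default capacity for the internally created queue.
    private static final int DEFAULT_CAPACITY = 2;

    private final BlockingQueue<E> elementsQueue;

    /**
     * Old-style constructor: the caller creates the queue and must also hand the
     * same instance to the reader.
     *
     * @deprecated the queue should be an internal detail; use the no-argument
     *     constructor. (Per the thread: deprecate in 1.19, remove in 2.0.)
     */
    @Deprecated
    QueueOwningManager(BlockingQueue<E> elementsQueue) {
        this.elementsQueue = elementsQueue;
    }

    /** New-style constructor: the manager creates and owns the queue itself. */
    QueueOwningManager() {
        this.elementsQueue = new ArrayBlockingQueue<>(DEFAULT_CAPACITY);
    }

    // Minimal wrappers so that callers never need a reference to the queue.
    boolean offer(E element) {
        return elementsQueue.offer(element);
    }

    E poll() {
        return elementsQueue.poll();
    }
}

class DeprecationDemo {
    public static void main(String[] args) {
        QueueOwningManager<String> manager = new QueueOwningManager<>();
        manager.offer("batch-1");
        System.out.println(manager.poll()); // prints batch-1
    }
}
```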
> > >
> > > Best regards,
> > > Hongshun
> > >
> > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > >
> > > On Wed, Nov 15, 2023 at 7:57 AM Becket Qin <becket....@gmail.com> wrote:
> > >
> > > > Hi Hongshun,
> > > >
> > > > > However, it will be tricky because SplitFetcherManager includes <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue includes <T>. This means that SplitFetcherManager would have to be modified to <T, E, SplitT extends SourceSplit>, which would affect the compatibility of the SplitFetcherManager class. I'm afraid this change will influence other sources.
> > > >
> > > > Although the FutureCompletingBlockingQueue class itself has a type parameter <T>, in SourceReaderBase and SplitFetcherManager this <T> is actually RecordsWithSplitIds<E>. So it looks like we can just let SplitFetcherManager.poll() return a RecordsWithSplitIds<E>.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Tue, Nov 14, 2023 at 8:11 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > >
> > > > > Hi Becket,
> > > > >
> > > > > I agree with you and have tried to modify this FLIP[1] to include these changes:
> > > > >
> > > > > 1. Mark the constructor of SingleThreadMultiplexSourceReaderBase as @Deprecated.
> > > > > 2. Mark the constructor of SourceReaderBase as @Deprecated and provide a new constructor without FutureCompletingBlockingQueue.
> > > > > 3. Mark the constructors of SplitFetcherManager and SingleThreadFetcherManager as @Deprecated and provide new constructors without FutureCompletingBlockingQueue. Mark SplitFetcherManager and SingleThreadFetcherManager as @PublicEvolving.
> > > > > 4. SplitFetcherManager provides wrapper methods for FutureCompletingBlockingQueue to replace its usage in SourceReaderBase. Then we can use FutureCompletingBlockingQueue only in SplitFetcherManager.
> > > > >
> > > > > However, it will be tricky because SplitFetcherManager includes <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue includes <T>. This means that SplitFetcherManager would have to be modified to <T, E, SplitT extends SourceSplit>, which would affect the compatibility of the SplitFetcherManager class. I'm afraid this change will influence other sources.
> > > > >
> > > > > Looking forward to hearing from you.
> > > > >
> > > > > Best regards,
> > > > > Hongshun
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > > > >
> > > > > On Sat, Nov 11, 2023 at 10:55 AM Becket Qin <becket....@gmail.com> wrote:
> > > > >
> > > > > > Hi Hongshun and Martijn,
> > > > > >
> > > > > > Sorry for the late reply, as I was traveling and am still catching up on emails. Please allow me to provide more context.
> > > > > >
> > > > > > 1. The original design of SplitFetcherManager and its subclasses was to make them public to Source developers. The goal is to let us take care of the threading model, while Source developers can just focus on the SplitReader implementation. Therefore, I think making SplitFetcherManager / SingleThreadFetcherManager public aligns with the original design. That is also why these classes are exposed in the constructor of SourceReaderBase.
> > > > > >
> > > > > > 2. For FutureCompletingBlockingQueue, in hindsight, it might be better not to expose it to Source developers. They are unlikely to use it anywhere other than just constructing it. The reason that FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor is that both SplitFetcherManager and SourceReaderBase need it. One way to hide FutureCompletingBlockingQueue from the public API is to make SplitFetcherManager the only owner class of the queue, and expose some of its methods via SplitFetcherManager. This way, SourceReaderBase can invoke the methods via SplitFetcherManager. I believe this also makes the code slightly cleaner.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > > > >
> > > > > > > @Martijn, I agree with you.
> > > > > > >
> > > > > > > I also had two questions at the beginning:
> > > > > > >
> > > > > > > - Why is an Internal class exposed as a constructor parameter of a Public class?
> > > > > > > - Should these classes be exposed as public?
> > > > > > >
> > > > > > > For the first question, I noticed that before the original Jira[1], none of these classes had stability annotations, so it was not unusual that FutureCompletingBlockingQueue and SingleThreadFetcherManager were constructor parameters of SingleThreadMultiplexSourceReaderBase. However, that Jira marked FutureCompletingBlockingQueue and SingleThreadFetcherManager as Internal, while marking SingleThreadMultiplexSourceReaderBase as Public. That was a good choice, but it overlooked the fact that FutureCompletingBlockingQueue and SingleThreadFetcherManager had already been exposed by SingleThreadMultiplexSourceReaderBase. Thus, this problem occurred because we didn't clearly define the boundaries in the original design. We should pay more attention to this when creating new classes.
> > > > > > >
> > > > > > > For the second question, I think at least SplitFetcherManager should be Public. There are a few reasons:
> > > > > > >
> > > > > > > - Connector developers want to decide their own threading model. For example, they may want to decide whether to recycle fetchers when idle by overriding SplitFetcherManager#maybeShutdownFinishedFetchers. Sometimes developers want SplitFetcherManager to behave like a FixedThreadPool, because each time a thread is recycled and then recreated, the context resources need to be rebuilt. I ran into a related issue in Flink CDC[2].
> > > > > > > - KafkaSourceFetcherManager[3] also extends SingleThreadFetcherManager to commit offsets. But now that the Kafka source is no longer in the Flink repository, this is not allowed any more.
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > > > > > >
> > > > > > > [2] https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418
> > > > > > >
> > > > > > > [3] https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52
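The first reason above can be sketched with hypothetical simplified classes (PooledFetcher, RecyclingManager, FixedPoolManager). Only the method name maybeShutdownFinishedFetchers is taken from the thread; its signature and body here are illustrative assumptions. Under the default policy, idle fetchers are closed, so the next split creates a new fetcher and, in a real connector, rebuilds its context. The override keeps idle fetchers alive, roughly like a fixed thread pool.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified fetcher: it is idle when it has no splits assigned.
class PooledFetcher {
    int assignedSplits;
    boolean closed;

    boolean isIdle() {
        return assignedSplits == 0;
    }
}

// Hypothetical base manager. Its default policy closes idle fetchers, so the next
// split needs a brand-new fetcher (and, in a real connector, a rebuilt context
// such as a database connection).
class RecyclingManager {
    protected final List<PooledFetcher> fetchers = new ArrayList<>();
    int fetchersCreated;

    void addSplit() {
        PooledFetcher target;
        if (fetchers.isEmpty()) {
            target = new PooledFetcher();
            fetchers.add(target);
            fetchersCreated++;
        } else {
            target = fetchers.get(0); // single-fetcher sketch: reuse the live one
        }
        target.assignedSplits++;
    }

    void finishAllSplits() {
        for (PooledFetcher fetcher : fetchers) {
            fetcher.assignedSplits = 0;
        }
    }

    // Default policy: close and drop idle fetchers; returns true if none are left.
    boolean maybeShutdownFinishedFetchers() {
        Iterator<PooledFetcher> it = fetchers.iterator();
        while (it.hasNext()) {
            PooledFetcher fetcher = it.next();
            if (fetcher.isIdle()) {
                fetcher.closed = true;
                it.remove();
            }
        }
        return fetchers.isEmpty();
    }
}

// Hypothetical override: keep idle fetchers alive so they are reused.
class FixedPoolManager extends RecyclingManager {
    @Override
    boolean maybeShutdownFinishedFetchers() {
        return fetchers.isEmpty(); // never closes anything
    }
}

class RecyclingDemo {
    // Assign a split, finish it, run the shutdown policy, then assign another split.
    static int runCycle(RecyclingManager manager) {
        manager.addSplit();
        manager.finishAllSplits();
        manager.maybeShutdownFinishedFetchers();
        manager.addSplit();
        return manager.fetchersCreated;
    }

    public static void main(String[] args) {
        System.out.println(runCycle(new RecyclingManager())); // prints 2
        System.out.println(runCycle(new FixedPoolManager())); // prints 1
    }
}
```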
> > > > > > >
> > > > > > > Looking forward to hearing from you.
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Hongshun
> > > > > > >
> > > > > > > On Thu, Nov 9, 2023 at 11:46 PM Martijn Visser <martijnvis...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I'm looking at the original Jira that introduced these stability designations [1], and I'm just curious whether it was intended that these Internal classes would be used directly, or whether we just haven't created the right abstractions. The reason I'm asking is that moving something from Internal to a public designation is an easy fix, but I want to make sure that it's also the right fix. If we are missing good abstractions, then I would rather invest in those.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Martijn
> > > > > > > >
> > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > > > > > > >
> > > > > > > > On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks Hongshun for starting this discussion.
> > > > > > > > >
> > > > > > > > > +1 from my side.
> > > > > > > > >
> > > > > > > > > IIRC, @Jiangjie(Becket) also mentioned this in a FLINK-31324 comment[1].
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Leonard
> > > > > > > > >
> > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
> > > > > > > > >
> > > > > > > > > On Nov 8, 2023, at 5:42 PM, Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > I would like to start a discussion on FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving[1].
> > > > > > > > > >
> > > > > > > > > > Though SingleThreadFetcherManager is annotated as Internal, it actually acts as a public API to some degree and is widely used in many connector projects: flink-cdc-connector <https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93>, flink-connector-mongodb <https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58>, and so on.
> > > > > > > > > >
> > > > > > > > > > Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase (which is PublicEvolving) takes SingleThreadFetcherManager and FutureCompletingBlockingQueue as parameters. That means SingleThreadFetcherManager and FutureCompletingBlockingQueue have already been exposed to users for a long time and are widely used.
> > > > > > > > > >
> > > > > > > > > > Considering that all source implementations use them de facto, why not annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving, so that developers will modify them more carefully to avoid potential issues? As shown in FLINK-31324[2], FLINK-28853[3] changed the default constructor of SingleThreadFetcherManager, which had a wide impact. Eventually, the former constructor was added back and marked as Deprecated.
> > > > > > > > > >
> > > > > > > > > > In conclusion, the goal of this FLIP is to annotate SingleThreadFetcherManager (including its parent class) and FutureCompletingBlockingQueue as PublicEvolving.
> > > > > > > > > >
> > > > > > > > > > Looking forward to hearing from you.
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > > > > > > > > >
> > > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-31324
> > > > > > > > > >
> > > > > > > > > > [3] https://issues.apache.org/jira/browse/FLINK-28853
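The approach suggested in the Nov 11 and Nov 15 messages can be sketched as below: SplitFetcherManager becomes the only owner of the queue, and poll() returns RecordsWithSplitIds<E>. This is a minimal sketch under the following assumptions:

- RecordBatch<E> is a hypothetical stand-in for RecordsWithSplitIds<E>.
- SourceSplitSketch stands in for SourceSplit.
- A plain BlockingQueue replaces FutureCompletingBlockingQueue, and its availability future is omitted.
- OwningFetcherManager and ReaderSketch are illustrative names.

Since the queue element type is fixed to RecordBatch<E>, the manager keeps its <E, SplitT> type parameters and no extra <T> is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for SourceSplit.
interface SourceSplitSketch {
    String splitId();
}

// Hypothetical stand-in for RecordsWithSplitIds<E>: one batch of records from one split.
class RecordBatch<E> {
    final String splitId;
    final List<E> records;

    RecordBatch(String splitId, List<E> records) {
        this.splitId = splitId;
        this.records = records;
    }
}

// Hypothetical manager that is the only owner of the hand-over queue. The element
// type is fixed to RecordBatch<E>, so the class keeps its <E, SplitT> parameters.
class OwningFetcherManager<E, SplitT extends SourceSplitSketch> {
    private final BlockingQueue<RecordBatch<E>> elementsQueue = new ArrayBlockingQueue<>(16);

    // Called by fetchers to hand over a fetched batch; false if the queue is full.
    boolean enqueue(RecordBatch<E> batch) {
        return elementsQueue.offer(batch);
    }

    // Wrapper the reader uses instead of touching the queue; null if nothing is ready.
    RecordBatch<E> poll() {
        return elementsQueue.poll();
    }
}

// Hypothetical reader: it only depends on the manager, never on the queue.
class ReaderSketch<E, SplitT extends SourceSplitSketch> {
    private final OwningFetcherManager<E, SplitT> fetcherManager;

    ReaderSketch(OwningFetcherManager<E, SplitT> fetcherManager) {
        this.fetcherManager = fetcherManager;
    }

    // Drains every ready batch into output and returns the number of records emitted.
    int pollNext(List<E> output) {
        int emitted = 0;
        RecordBatch<E> batch;
        while ((batch = fetcherManager.poll()) != null) {
            output.addAll(batch.records);
            emitted += batch.records.size();
        }
        return emitted;
    }
}

class QueueOwnershipDemo {
    public static void main(String[] args) {
        OwningFetcherManager<String, SourceSplitSketch> manager = new OwningFetcherManager<>();
        manager.enqueue(new RecordBatch<>("split-0", Arrays.asList("a", "b")));
        manager.enqueue(new RecordBatch<>("split-1", Arrays.asList("c")));
        ReaderSketch<String, SourceSplitSketch> reader = new ReaderSketch<>(manager);
        List<String> output = new ArrayList<>();
        int emitted = reader.pollNext(output);
        System.out.println(emitted + " " + output); // prints 3 [a, b, c]
    }
}
```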