Here is a summary of where we are at with the PR:

* Added capability to construct sources at switch time through a factory
  interface. This can support all previously discussed scenarios. The
  simple case (sources with fixed start position) is still simple, but
  for scenarios that require deferred instantiation, sources can now be
  created through their respective builders at switch time with access
  to the previous enumerator. This is a modification of option 3
  described previously.
* There is now unit test coverage for reader and enumerator.
* Ideas such as a universal interface for exchange of start positions
  can be added on top of the current implementation. However, I would
  like to keep that as an exercise for the future and keep the scope of
  this initial work contained.
* The FLIP page will be updated to reflect the changes made since it was
  originally created. Nicholas volunteered to take this up and also send
  a VOTE thread.

Thanks all and especially Arvid for taking the time to review and discuss.

Thomas

On Tue, Jun 15, 2021 at 11:01 AM Thomas Weise <t...@apache.org> wrote:
>
> Hi Arvid,
>
> Thanks for your reply -->
>
> On Mon, Jun 14, 2021 at 2:55 PM Arvid Heise <ar...@ververica.com> wrote:
> >
> > Hi Thomas,
> >
> > Thanks for bringing this up. I think this is a tough nut to crack :/.
> > Imho 1 and 3 or 1+3 can work but it is ofc a pity if the source
> > implementor is not aware of HybridSource. I'm also worried that we may
> > not have a universal interface to specify start offset/time.
> > I guess it also would be much easier if we had an abstract base source
> > class where we could implement some basic support.
> >
> > When I initially looked at the issue I was thinking that sources should
> > always be immutable (we have some bad experiences with mutable
> > life-cycles in operator implementations) and the only modifiable thing
> > should be the builder. That would mean that a HybridSource actually
> > just gets a list of source builders and creates the sources when needed
> > with the correct start/end offset. However, we neither have base
> > builders (something that I'd like to change) nor are any of the
> > builders serializable. We could convert sources back to builders,
> > update the start offset, and convert to sources again but this also
> > seems overly complicated. So I'm assuming that we should go with
> > modifiable sources as also expressed in the FLIP draft.
>
> The need to set a start position at runtime indicates that sources
> should not be immutable. I think it would be better to have a setter
> on the source that clearly describes the mutation.
>
> Regarding deferred construction of the sources (supplier pattern):
> This is actually a very interesting idea that would also help in
> situations where the exact sequence of sources isn't known upfront.
> However, Source is also the factory for split and enumerator
> checkpoint serializers. If we were to instantiate the source at switch
> time, we would also need to distribute the serializers at switch time.
> This would lead to even more complexity and move us further away from
> the original goal of having a relatively simple implementation for the
> basic scenarios.
>
> > If we could assume that we are always switching by time, we could also
> > change Source(Enumerator)#start to take the start time as a parameter.
> > Can we deduce the end time by the record timestamp? But I guess that
> > has all been discussed already, so sorry if I derail the discussion.
>
> This actually hasn't been discussed. The original proposal left the
> type of the start position open, which also makes it less attractive
> (user still has to supply a converter).
>
> For initial internal usage of the hybrid source, we are planning to
> use a timestamp. But there may be use cases where the start position
> could be encoded in other ways, such as based on Kafka offsets.
>
> > I'm also leaning towards extending the Source interface to include
> > these methods (with defaults) to make it harder for implementers to
> > miss.
>
> It would be possible to introduce an optional interface as a follow-up
> task. It can be implemented as the default of option 3.
>
> >
> > On Fri, Jun 11, 2021 at 7:02 PM Thomas Weise <t...@apache.org> wrote:
> > >
> > > Thanks for the suggestions and feedback on the PR.
> > >
> > > A variation of hybrid source that can switch back and forth was
> > > brought up before and it is something that will be eventually
> > > required. It was also suggested by Stephan that in the future there
> > > may be more than one implementation of hybrid source for different
> > > requirements.
> > >
> > > I want to bring back the topic of how enumerator end state can be
> > > converted into start position from the PR [1]. We started in the FLIP
> > > page with "switchable" interfaces, the prototype had checkpoint
> > > conversion and now the PR has a function that allows to augment the
> > > source. Each of these has pros and cons but we will need to converge.
> > >
> > > 1. Switchable interfaces
> > > * unified solution
> > > * requires sources to implement a special interface to participate in
> > >   HybridSource, even when no dynamic conversion is needed
> > >
> > > 2. Checkpoint state
> > > * unified solution
> > > * no interface changes
> > > * requires implementation change to existing enumerators to include
> > >   end state (like a timestamp) into their checkpoint state
> > > * existing sources work as is for fixed start position
> > >
> > > 3. Source modification at switch time to set start position
> > > * can be solved per source, least restrictive
> > > * no interface changes
> > > * requires enumerator to expose end state (as a getter) and source to
> > >   be either mutable or source to be copied and augmented with the
> > >   start position.
> > > * existing sources work as is for fixed start position
> > >
> > > I think more eyes might help to finalize the approach.
> > >
> > > [1] https://github.com/apache/flink/pull/15924#discussion_r649929865
> > >
> > > On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <stevenz...@gmail.com> wrote:
> > > >
> > > > > hybrid sounds to me more like the source would constantly switch
> > > > > back and forth
> > > >
> > > > Initially, the focus of hybrid source is more like a sequenced chain.
> > > >
> > > > But in the future it would be cool if hybrid sources could
> > > > intelligently switch back and forth between a historical data source
> > > > (like Iceberg) and a live data source (like Kafka). E.g.,
> > > > - if the Flink job is lagging behind Kafka retention, automatically
> > > >   switch to Iceberg source
> > > > - once job caught up, switch back to Kafka source
> > > >
> > > > That can simplify operational aspects of manually switching.
> > > >
> > > >
> > > > On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <ar...@apache.org> wrote:
> > > > >
> > > > > Sorry for joining the party so late, but it's such an interesting
> > > > > FLIP with a huge impact that I wanted to add my 2 cents. [1]
> > > > > I'm mirroring some basic questions from the PR review to this
> > > > > thread because it's about the name:
> > > > >
> > > > > We could rename the thing to ConcatenatedSource(s), SourceSequence,
> > > > > or similar.
> > > > > Hybrid has the connotation of 2 for me (maybe because I'm a
> > > > > non-native) and does not carry the concatenation concept as well
> > > > > (hybrid sounds to me more like the source would constantly switch
> > > > > back and forth).
> > > > >
> > > > > Could we take a few minutes to think if this is the most intuitive
> > > > > name for new users? I'm especially hoping that natives might give
> > > > > some ideas (or declare that Hybrid is perfect).
> > > > >
> > > > > [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
> > > > >
> > > > > On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <stevenz...@gmail.com> wrote:
> > > > > >
> > > > > > > Converter function relies on the specific enumerator
> > > > > > > capabilities to set the new start position (e.g.
> > > > > > > fileSourceEnumerator.getEndTimestamp() and
> > > > > > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> > > > > >
> > > > > > I guess the premise is that a converter is for a specific tuple of
> > > > > > (upstream source, downstream source).
> > > > > > We don't have to define generic EndStateT and
> > > > > > SwitchableEnumerator interfaces. That should work.
> > > > > >
> > > > > > The benefit of defining EndStateT and SwitchableEnumerator
> > > > > > interfaces is probably promoting uniformity across sources that
> > > > > > support hybrid/switchable source.
> > > > > >
> > > > > > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <t...@apache.org> wrote:
> > > > > > >
> > > > > > > Hi Steven,
> > > > > > >
> > > > > > > Thank you for the thorough review of the PR and for bringing
> > > > > > > this back to the mailing list.
> > > > > > >
> > > > > > > All,
> > > > > > >
> > > > > > > I updated the FLIP-150 page to highlight aspects in which the
> > > > > > > PR deviates from the original proposal [1]. The goal would be
> > > > > > > to update the FLIP soon and bring it to a vote, as previously
> > > > > > > suggested offline by Nicholas.
> > > > > > >
> > > > > > > A few minor issues in the PR are outstanding and I'm working on
> > > > > > > test coverage for the recovery behavior, which should be
> > > > > > > completed soon.
> > > > > > >
> > > > > > > The dynamic position transfer needs to be concluded before we
> > > > > > > can move forward however.
> > > > > > >
> > > > > > > There have been various ideas, including the special
> > > > > > > "SwitchableEnumerator" interface, using enumerator checkpoint
> > > > > > > state or an enumerator interface extension to extract the end
> > > > > > > state.
> > > > > > >
> > > > > > > One goal in the FLIP is to "Reuse the existing Source connectors
> > > > > > > built with FLIP-27 without any change." and I think it is
> > > > > > > important to honor that goal given that fixed start positions
> > > > > > > do not require interface changes.
> > > > > > >
> > > > > > > Based on the feedback the following might be a good solution
> > > > > > > for runtime position transfer:
> > > > > > >
> > > > > > > * User supplies the optional converter function (not applicable
> > > > > > >   for fixed positions).
> > > > > > > * Instead of relying on the enumerator checkpoint state [2], the
> > > > > > >   converter function will be supplied with the current and next
> > > > > > >   enumerator (source.createEnumerator).
> > > > > > > * Converter function relies on the specific enumerator
> > > > > > >   capabilities to set the new start position (e.g.
> > > > > > >   fileSourceEnumerator.getEndTimestamp() and
> > > > > > >   kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> > > > > > > * HybridSourceSplitEnumerator starts new underlying enumerator
> > > > > > >
> > > > > > > With this approach, there is no need to augment FLIP-27
> > > > > > > interfaces and custom source capabilities are easier to
> > > > > > > integrate. Removing the mandate to rely on enumerator checkpoint
> > > > > > > state also avoids potential upgrade/compatibility issues.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Thomas
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > > > > > > [2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <stevenz...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Discussed the PR with Thomas offline. Thomas, please correct
> > > > > > > > me if I missed anything.
> > > > > > > >
> > > > > > > > Right now, the PR differs from the FLIP-150 doc regarding the
> > > > > > > > converter.
> > > > > > > > * Current PR uses the enumerator checkpoint state type as the
> > > > > > > >   input for the converter
> > > > > > > > * FLIP-150 defines a new EndStateT interface.
> > > > > > > > It seems that the FLIP-150 approach of EndStateT is more
> > > > > > > > flexible, as the transition EndStateT doesn't have to be
> > > > > > > > included in the upstream source checkpoint state.
> > > > > > > >
> > > > > > > > Let's look at two use cases:
> > > > > > > > 1) static cutover time at 5 pm. File source reads all data
> > > > > > > > between 9 am and 5 pm, then Kafka source starts with initial
> > > > > > > > position of 5 pm. In this case, there is no need for converter
> > > > > > > > or EndStateT since the starting time for Kafka source is known
> > > > > > > > and fixed.
> > > > > > > > 2) dynamic cutover time at 1 hour before now. This is useful
> > > > > > > > when the bootstrap of historic data takes a long time (like
> > > > > > > > days or weeks) and we don't know the exact time of cutover
> > > > > > > > when a job is launched. Instead, we are instructing the file
> > > > > > > > source to stop when it gets close to live data. In this case,
> > > > > > > > hybrid source construction will specify a relative time
> > > > > > > > (now - 1 hour), the EndStateT (of file source) will be
> > > > > > > > resolved to an absolute time for cutover. We probably don't
> > > > > > > > need to include EndStateT (end timestamp) as the file source
> > > > > > > > checkpoint state. Hence, the separate EndStateT is probably
> > > > > > > > more desirable.
> > > > > > > >
> > > > > > > > We also discussed the converter for the Kafka source. Kafka
> > > > > > > > source supports different OffsetsInitializer impls (including
> > > > > > > > TimestampOffsetsInitializer).
> > > > > > > > To support the dynamic cutover time (use case #2 above), we
> > > > > > > > can plug in a SupplierTimestampOffsetInitializer, where the
> > > > > > > > starting timestamp is not set during source/job construction.
> > > > > > > > Rather it is a supplier model where the starting timestamp
> > > > > > > > value is set to the resolved absolute timestamp during switch.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Steven
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <t...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > Hi Nicholas,
> > > > > > > > >
> > > > > > > > > Thanks for taking a look at the PR!
> > > > > > > > >
> > > > > > > > > 1. Regarding switching mechanism:
> > > > > > > > >
> > > > > > > > > There has been previous discussion in this thread regarding
> > > > > > > > > the pros and cons of how the switching can be exposed to the
> > > > > > > > > user.
> > > > > > > > >
> > > > > > > > > With fixed start positions, no special switching interface
> > > > > > > > > to transfer information between enumerators is required.
> > > > > > > > > Sources are configured as they would be when used standalone
> > > > > > > > > and just plugged into HybridSource. I expect that to be a
> > > > > > > > > common use case. You can find an example for this in the
> > > > > > > > > ITCase:
> > > > > > > > >
> > > > > > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > > > > > > > >
> > > > > > > > > For dynamic start position, the checkpoint state is used to
> > > > > > > > > transfer information from old to new enumerator.
> > > > > > > > > An example for that can be found here:
> > > > > > > > >
> > > > > > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > > > > > > > >
> > > > > > > > > That may look verbose, but the code to convert from one
> > > > > > > > > state to another can be factored out into a utility and the
> > > > > > > > > function becomes a one-liner.
> > > > > > > > >
> > > > > > > > > For common sources like files and Kafka we can potentially
> > > > > > > > > (later) implement the conversion logic as part of the
> > > > > > > > > respective connector's checkpoint and split classes.
> > > > > > > > >
> > > > > > > > > I hope that with the PR up for review, we can soon reach a
> > > > > > > > > conclusion on how we want to expose this to the user.
> > > > > > > > >
> > > > > > > > > Following is an example for Files -> Files -> Kafka that I'm
> > > > > > > > > using for e2e testing. It exercises both ways of setting the
> > > > > > > > > start position.
> > > > > > > > >
> > > > > > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > 2. Regarding the events used to implement the actual switch
> > > > > > > > > between enumerator and readers: I updated the PR with
> > > > > > > > > javadoc to clarify the intent. Please let me know if that
> > > > > > > > > helps or let's continue to discuss those details on the PR?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Thomas
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programg...@163.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Thomas,
> > > > > > > > > >
> > > > > > > > > > Sorry for the late reply on your POC.
> > > > > > > > > > I have reviewed the base abstract implementation of your
> > > > > > > > > > pull request:
> > > > > > > > > > https://github.com/apache/flink/pull/15924. IMO, for the
> > > > > > > > > > switching mechanism, this level of abstraction is not
> > > > > > > > > > concise enough, which doesn't make connector contribution
> > > > > > > > > > easier. In theory, it is necessary to introduce a set of
> > > > > > > > > > interfaces to support the switching mechanism. The
> > > > > > > > > > SwitchableSource and SwitchableSplitEnumerator interfaces
> > > > > > > > > > are needed for connector extensibility.
> > > > > > > > > > In other words, the whole switching process of the
> > > > > > > > > > above-mentioned PR is different from that described in
> > > > > > > > > > FLIP-150. In the above implementation, the source reading
> > > > > > > > > > switch is executed after receiving the SwitchSourceEvent,
> > > > > > > > > > which could be before the SourceReaderFinishEvent is
> > > > > > > > > > sent. This timeline of source reading switching could be
> > > > > > > > > > discussed here.
> > > > > > > > > > @Stephan @Becket, if you are available, please help to
> > > > > > > > > > review the abstract implementation, and compare with the
> > > > > > > > > > interfaces mentioned in FLIP-150.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Nicholas Jiang
> >
> > --
> > Arvid Heise | Senior Java Developer
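
Sketch (not taken from the PR): the summary at the top of this thread describes creating sources at switch time through a factory that has access to the previous enumerator. The following is a minimal, self-contained illustration of that idea only. SimpleSource, Enumerator, SourceFactory and buildChain are hypothetical stand-ins invented for this sketch; they are not Flink classes, and the real interfaces in the PR may look different.

```java
import java.util.ArrayList;
import java.util.List;

public class SwitchTimeFactorySketch {

    /** Hypothetical stand-in for a source; it only records its start position. */
    static final class SimpleSource {
        final String name;
        final long startTimestamp;

        SimpleSource(String name, long startTimestamp) {
            this.name = name;
            this.startTimestamp = startTimestamp;
        }
    }

    /** Hypothetical enumerator that exposes its end state through a getter. */
    interface Enumerator {
        long getEndTimestamp();
    }

    /** Hypothetical factory: builds the next source when the switch happens. */
    interface SourceFactory {
        // 'previous' is null for the first source in the chain.
        SimpleSource create(Enumerator previous);
    }

    /**
     * Walks the chain and creates each source lazily. For illustration, every
     * source is assumed to read exactly 'readSpanMillis' of data, so its
     * enumerator ends at startTimestamp + readSpanMillis.
     */
    static List<SimpleSource> buildChain(List<SourceFactory> factories, long readSpanMillis) {
        List<SimpleSource> created = new ArrayList<>();
        Enumerator previous = null;
        for (SourceFactory factory : factories) {
            SimpleSource source = factory.create(previous);
            created.add(source);
            final long end = source.startTimestamp + readSpanMillis;
            previous = () -> end; // simulated end state of the finished source
        }
        return created;
    }

    public static void main(String[] args) {
        List<SourceFactory> factories = new ArrayList<>();
        // Fixed start position: the factory ignores the previous enumerator.
        factories.add(prev -> new SimpleSource("files", 0L));
        // Dynamic start position: derived from the previous enumerator's end state.
        factories.add(prev -> new SimpleSource("kafka", prev.getEndTimestamp()));

        for (SimpleSource s : buildChain(factories, 1000L)) {
            System.out.println(s.name + " starts at " + s.startTimestamp);
        }
    }
}
```

The fixed-position case stays a one-liner because the factory can simply ignore its argument, while the dynamic case reads the end state from the previous enumerator. This mirrors the "simple case is still simple" point in the summary, under the assumptions stated above.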