Hi Nicholas,

Thanks for the reply. I have implemented a small PoC. It switches through a configurable sequence of sources with predefined bounds. I'm using the unmodified MockSource for illustration. It does not require a "Switchable" interface. I looked at the code you shared, and the delegation and signaling work quite similarly. That's a good validation.
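To illustrate the basic idea outside of Flink, here is a minimal single-threaded sketch of switching through a configured sequence of bounded sources. A wrapper delegates to one reader at a time and moves to the next reader when the current one signals the end of its input. All types here (BoundedReader, RangeReader, SwitchingReader) are hypothetical stand-ins. They are not Flink's Source/SplitEnumerator API, MockSource, or the PoC code itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical stand-in for a bounded source reader (not a Flink API). */
interface BoundedReader {
    /** Returns the next record, or empty once the reader has reached its end bound. */
    Optional<Long> pollNext();
}

/** Hypothetical reader over positions [start, end); the bounds are fixed when the sequence is defined. */
final class RangeReader implements BoundedReader {
    private long next;
    private final long end;

    RangeReader(long start, long end) {
        this.next = start;
        this.end = end;
    }

    @Override
    public Optional<Long> pollNext() {
        return next < end ? Optional.of(next++) : Optional.empty();
    }
}

/** Hypothetical switching wrapper: delegates to one reader and switches when it signals end of input. */
final class SwitchingReader implements BoundedReader {
    private final Iterator<Supplier<BoundedReader>> remaining;
    private BoundedReader current;

    SwitchingReader(List<Supplier<BoundedReader>> sequence) {
        // Defensive copy so later changes to the caller's list do not affect the switching order.
        this.remaining = new ArrayList<>(sequence).iterator();
        this.current = remaining.hasNext() ? remaining.next().get() : null;
    }

    @Override
    public Optional<Long> pollNext() {
        while (current != null) {
            Optional<Long> record = current.pollNext();
            if (record.isPresent()) {
                return record;
            }
            // The current source is exhausted: create the next one in the sequence, if any.
            current = remaining.hasNext() ? remaining.next().get() : null;
        }
        return Optional.empty();
    }
}
```

Because each source's bounds are fixed when the sequence is defined, nothing has to be passed from one source to the next at switch time.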
Hi Kezhu,

Thanks for bringing up the more detailed discussion regarding the start/end position. I think in most cases the start and end positions will be known when the job is submitted. If we take a File -> Kafka source chain as an example, there would most likely be a timestamp at which we want to transition from reading files to reading from Kafka. So we would either set the start position for Kafka based on that timestamp or provide the offsets directly. (Note that I'm skipping a few related nuances here. In order to achieve an exact switch without duplication or gaps, we may also need some overlap and filtering, but that's a separate issue.) The point is that the start positions can be configured by the user; there is no need to transfer any information from one source to another as part of switching.

It gets more complicated if we want to achieve a dynamic switch, where the transition timestamp isn't known when the job starts. For example, consider a bootstrap scenario where the time taken to process historic data exceeds the Kafka retention. Here, we would need to resolve the Kafka start position dynamically, when the switch occurs, based on where the file readers left off. The file source enumerator would determine at runtime when it is done handing splits to its readers, for example when the max file timestamp reaches (processing time - X). This information needs to be transferred to the Kafka source. The timestamp would need to be derived from the file enumerator state, either by looking at the last assigned splits or by recording it explicitly. The natural way to do that is to introspect the enumerator state, which gets checkpointed. Any other form of "end position" exposed via a special interface would need to be derived in the same manner. The converter provided by the user would look at the file enumerator state, derive the timestamp, and then supply the "start position" to the Kafka source. The Kafka source, though, was created when the job started.
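A minimal sketch of such a converter, assuming the file enumerator state exposes the max timestamp of each split it has already assigned: the transition timestamp is derived from the last assigned splits and turned into a Kafka start state, and the "(processing time - X)" check becomes a simple lag comparison. FileSplitInfo, FileEnumState, KafkaEnumState and FileToKafkaConverter are hypothetical names for illustration, not the actual FileSource or KafkaSource classes, and the overlap/filtering needed for an exact switch is left out.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical types for illustration only; not Flink's FileSource/KafkaSource API.

/** Hypothetical record of a file split that was already handed to a reader. */
final class FileSplitInfo {
    final String path;
    final long maxTimestampMillis;

    FileSplitInfo(String path, long maxTimestampMillis) {
        this.path = path;
        this.maxTimestampMillis = maxTimestampMillis;
    }
}

/** Hypothetical checkpointed state of the file enumerator. */
final class FileEnumState {
    final List<FileSplitInfo> assignedSplits;

    FileEnumState(List<FileSplitInfo> assignedSplits) {
        this.assignedSplits = assignedSplits;
    }
}

/** Hypothetical Kafka enumerator state that only records where reading should start. */
final class KafkaEnumState {
    final long startTimestampMillis;

    KafkaEnumState(long startTimestampMillis) {
        this.startTimestampMillis = startTimestampMillis;
    }
}

final class FileToKafkaConverter {
    /** Derives the transition timestamp from the newest split the file enumerator has assigned. */
    static long transitionTimestamp(FileEnumState state) {
        return state.assignedSplits.stream()
                .mapToLong(s -> s.maxTimestampMillis)
                .max()
                .orElseThrow(() -> new IllegalStateException("no splits assigned yet"));
    }

    /** The file side is considered done once its newest file reaches (now - lagMillis), i.e. X = lagMillis. */
    static boolean fileSourceDone(FileEnumState state, long nowMillis, long lagMillis) {
        return transitionTimestamp(state) >= nowMillis - lagMillis;
    }

    /** User-supplied converter: maps the final file enumerator state to a Kafka start state. */
    static final Function<FileEnumState, KafkaEnumState> CONVERTER =
            fileState -> new KafkaEnumState(transitionTimestamp(fileState));
}
```

In this sketch the converter is a plain function from one enumerator state to another, so the enumerator types carry no switching-specific methods. How that state gets into the already-created Kafka source is the remaining step.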
The Kafka source needs to be augmented with the new start position. That can be achieved via a special enumerator interface like SwitchableSplitEnumerator#setStartState, or by using restoreEnumerator with the checkpoint state constructed by the converter function. I'm leaning towards the latter, as long as there is a convenient way to construct the state from a position (like enumStateForTimestamp). The converter would map one enum state to another and can be made very simple by providing a few utility functions, instead of mandating a new interface that enumerators need to implement to become switchable.

Again, a converter is only required when sources need to be switched based on positions not known at graph construction time. I'm planning to add such deferred switching to the PoC for illustration and will share the experiment when that's done.

Cheers,
Thomas

On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programg...@163.com> wrote:

> Hi Kezhu,
>
> Thanks for your detailed points on the Hybrid Source. I follow your opinions and give a corresponding explanation for each:
>
> 1. Would it be possible to use this feature of the Hybrid Source to switch/chain multiple homogeneous sources?
>
> "HybridSource" supports switching/chaining multiple homogeneous sources, each of which has its own implementation of "SwitchableSource" and "SwitchableSplitEnumerator". "HybridSource" doesn't restrict whether its constituent sources are homogeneous. From the user's perspective, the user only adds each "SwitchableSource" to the "HybridSource" and leaves the smooth migration to "HybridSource".
>
> 2. Is "setStartState" actually a reposition operation for the next source to start at job runtime?
>
> IMO, "setStartState" is used to determine the initial position of the new source for smooth migration; it is not a reposition operation. More importantly, the "State" mentioned here refers to the start and end positions for reading the source.
>
> 3. In my opinion, this conversion should be an implementation detail of the next source, not a converter function?
>
> The state conversion is of course an implementation detail and is part of the switching mechanism, which should provide users with an interface for the conversion, defined as the converter function. What's more, when users have already implemented a "SwitchableSource" and added it to the Hybrid Source, they don't need to implement another "SwitchableSource" for a different conversion. From the user's perspective, users can define different converter functions and create the "SwitchableSource" to add to the "HybridSource"; there is no need to implement a Source for each converter function.
>
> 4. If there is no configurable start position, the combination of the above three joints is a no-op, and "HybridSource" is just a chain of sources with pre-configured start positions?
>
> Indeed, there is no configurable start position, and this configuration could be included in the feature. Users could use the "SwitchableSplitEnumerator#setStartState" interface or configuration parameters to configure the start position.
>
> 5. I wonder whether the end position is a must, and how it could be useful for end users in a generic-enough source?
>
> The "getEndState" interface is used for the smooth migration scenario and could return null if it is not needed. In the Hybrid Source mechanism, this interface is required for switching between the constituent sources; otherwise there is no way to get the end position of the upstream source. In summary, the Hybrid Source needs to be able to set the start position and get the end position of each Source; otherwise there is no point in building a Hybrid Source.
>
> 6. Is it possible for the converter function to do blocking operations? How should the checkpoint request be answered when switching split enumerators across sources? Does the end position or start position need to be stored in checkpoint state or not?
>
> The converter function only converts the state of the upstream source to the state of the downstream source; it does not perform blocking operations. The way to respond to the checkpoint request when switching split enumerators across sources is to send the corresponding "SourceEvent" for coordination. The end position and start position don't need to be stored in checkpoint state; the source only implements the "getEndState" interface for the end position.
>
> Best,
> Nicholas Jiang
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/