Thanks all for this discussion. Looks like there are lots of ideas and folks that are eager to do things, so let's see how we can get this moving.
My take on this is the following:

There will probably not be one Hybrid source, but possibly multiple ones, because of different strategies/requirements.
  - One may be very simple, with switching points known up-front. Would be good to have this in a very simple implementation.
  - There may be one where the switch is dynamic and the readers need to report back where they left off.
  - There may be one that switches back and forth multiple times during a job, for example Kafka going to DFS whenever it falls behind retention, in order to catch up again.

This also seems hard to "design on paper"; I expect there are nuances in a production setup that affect some details of the design. So I'd feel most comfortable adding a variant of the hybrid source to Flink that has already been used in a real use case (not necessarily in production, but maybe in a testing/staging environment, so that it is known to meet all requirements).

What do you think about the following approach?
  - If there is a tested PoC, let's try to get it contributed to Flink without trying to make it much more general.
  - When we see similar but slightly different requirements for another hybrid source, then let's try to evolve the contributed one.
  - If we see new requirements that are so different that they don't fit well with the existing hybrid source, then let us look at building a second hybrid source for those requirements.

We need to make connector contributions easier in general, and I think it is not a bad thing to end up with different approaches and see how these play out against each other when being used by users. For example switching with known boundaries, dynamic switching, back-and-forth switching, etc.
(I know some committers are planning to do some work on making connector contributions easier, with standardized testing frameworks, decoupled CI, etc.)
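
To make the first variant (switching points known up-front) a bit more concrete, here is a minimal sketch in plain Java. It is not Flink API and not taken from any PoC: the class FixedSwitchSketch, the method readWithFixedSwitch, and the use of bare timestamps as records are all assumptions made for illustration. The historic side keeps only records before the switch timestamp and the live side keeps only records at or after it, so overlapping inputs produce neither a gap nor duplicates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
final class FixedSwitchSketch {

    // Reads the historic input first, then the live input.
    // The switch timestamp is fixed before reading starts.
    static List<String> readWithFixedSwitch(
            List<Long> historicTimestamps, List<Long> liveTimestamps, long switchTimestamp) {
        List<String> out = new ArrayList<>();
        for (long ts : historicTimestamps) {
            // Historic side: keep only records strictly before the switch point.
            if (ts < switchTimestamp) {
                out.add("historic:" + ts);
            }
        }
        for (long ts : liveTimestamps) {
            // Live side: keep only records at or after the switch point.
            if (ts >= switchTimestamp) {
                out.add("live:" + ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The two inputs overlap on timestamps 3 and 4.
        List<String> result =
                readWithFixedSwitch(List.of(1L, 2L, 3L, 4L), List.of(3L, 4L, 5L), 4L);
        // Prints [historic:1, historic:2, historic:3, live:4, live:5]
        System.out.println(result);
    }
}
```

Nothing needs to be passed from the first source to the second here, which is what keeps this variant simple; the dynamic variants differ exactly in that the switch timestamp would have to be derived at runtime.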

Best,
Stephan

On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <t...@apache.org> wrote:

> Hi,
>
> As mentioned in my previous email, I had been working on a prototype for the hybrid source.
>
> You can find it at https://github.com/tweise/flink/pull/1
>
> It contains:
> * Switching with configurable chain of sources
> * Fixed or dynamic start positions
> * Test with MockSource and FileSource
>
> The purpose of the above PR is to gather feedback and help drive consensus on the FLIP.
>
> * How to support a dynamic start position within the source chain?
>
> Relevant in those (few?) cases where start positions are not known upfront. You can find an example of what that might look like in the tests:
>
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
>
> When switching, the enumerator of the previous source needs to supply information about consumed splits that allows to set the start position for the next source. That could be something like the last processed file, timestamp, etc. (Currently StaticFileSplitEnumerator doesn't track finished splits.)
>
> See previous discussion regarding start/end position. The prototype shows the use of checkpoint state with converter function.
>
> * Should readers be deployed dynamically?
>
> The prototype assumes a static source chain that is fixed at job submission time. Conceivably there could be use cases that require more flexibility. Such as switching one KafkaSource for another. A step in that direction would be to deploy the actual readers dynamically, at the time of switching source.
>
> Looking forward to feedback and suggestions for next steps!
>
> Thomas
>
> On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <t...@apache.org> wrote:
>
> > Hi Nicholas,
> >
> > Thanks for the reply. I had implemented a small PoC.
> > It switches a configurable sequence of sources with predefined bounds. I'm using the unmodified MockSource for illustration. It does not require a "Switchable" interface. I looked at the code you shared and the delegation and signaling works quite similar. That's a good validation.
> >
> > Hi Kezhu,
> >
> > Thanks for bringing the more detailed discussion regarding the start/end position. I think in most cases the start and end positions will be known when the job is submitted. If we take a File -> Kafka source chain as example, there would most likely be a timestamp at which we want to transition from files to reading from Kafka. So we would either set the start position for Kafka based on that timestamp or provide the offsets directly. (Note that I'm skipping a few related nuances here. In order to achieve an exact switch without duplication or gap, we may also need some overlap and filtering, but that's a separate issue.)
> >
> > The point is that the start positions can be configured by the user, there is no need to transfer any information from one source to another as part of switching.
> >
> > It gets more complicated if we want to achieve a dynamic switch where the transition timestamp isn't known when the job starts. For example, consider a bootstrap scenario where the time taken to process historic data exceeds the Kafka retention. Here, we would need to dynamically resolve the Kafka start position based on where the file readers left off, when the switching occurs. The file source enumerator would determine at runtime when it is done handing splits to its readers, maybe when the max file timestamp reaches (processing time - X). This information needs to be transferred to the Kafka source.
> >
> > The timestamp would need to be derived from the file enumerator state, either by looking at the last splits or explicitly.
> > The natural way to do that is to introspect the enumerator state which gets checkpointed. Any other form of "end position" via a special interface would need to be derived in the same manner.
> >
> > The converter that will be provided by the user would look at the file enumerator state, derive the timestamp and then supply the "start position" to the Kafka source. The Kafka source was created when the job started. It needs to be augmented with the new start position. That can be achieved via a special enumerator interface like SwitchableSplitEnumerator#setStartState or by using restoreEnumerator with the checkpoint state constructed by the converter function. I'm leaning towards the latter as long as there is a convenient way to construct the state from a position (like enumStateForTimestamp). The converter would map one enum state to another and can be made very simple by providing a few utility functions instead of mandating a new interface that enumerators need to implement to become switchable.
> >
> > Again, a converter is only required when sources need to be switched based on positions not known at graph construction time.
> >
> > I'm planning to add such deferred switching to the PoC for illustration and will share the experiment when that's done.
> >
> > Cheers,
> > Thomas
> >
> > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programg...@163.com> wrote:
> >
> >> Hi Kezhu,
> >>
> >> Thanks for your detailed points for the Hybrid Source. I follow your opinions and make a corresponding explanation as follows:
> >>
> >> 1. Would the Hybrid Source be possible to use this feature to switch/chain multiple homogeneous sources?
> >>
> >> "HybridSource" supports to switch/chain multiple homogeneous sources, which have the respective implementation for "SwitchableSource" and "SwitchableSplitEnumerator".
> >> "HybridSource" doesn't limit whether the Sources consisted is homogeneous. From the user's perspective, User only adds the "SwitchableSource" into "HybridSource" and leaves the smooth migration operation to "HybridSource".
> >>
> >> 2. "setStartState" is actually a reposition operation for next source to start in job runtime?
> >>
> >> IMO, "setStartState" is used to determine the initial position of the new source for smooth migration, not reposition operation. More importantly, the "State" mentioned here refers to the start and end positions of reading source.
> >>
> >> 3. This conversion should be implementation detail of next source, not converter function in my opinion?
> >>
> >> The state conversion is of course an implementation detail and included in the switching mechanism, that should provide users with the conversion interface for conversion, which is defined in converter function. What's more, when users has already implemented "SwitchableSource" and added to the Hybrid Source, the users don't need to implement the "SwitchableSource" for the different conversion. From the user's perspective, users could define the different converter functions and create the "SwitchableSource" for the addition of "HybridSource", no need to implement a Source for the converter function.
> >>
> >> 4. No configurable start-position. In this situation combination of above three joints is a nop, and "HybridSource" is a chain of start-position pre-configured sources?
> >>
> >> Indeed there is no configurable start-position, and this configuration could be involved in the feature. Users could use "SwitchableSplitEnumerator#setStartState" interface or the configuration parameters to configure start-position.
> >>
> >> 5. I am wonder whether end-position is a must and how it could be useful for end users in a generic-enough source?
> >>
> >> "getEndState" interface is used for the smooth migration scenario, which could return null value if it is not needed. In the Hybrid Source mechanism, this interface is required for the switching between the sources consisted, otherwise there is no any way to get end-position of upstream source. In summary, Hybrid Source needs to be able to set the start position and get the end position of each Source, otherwise there is no use to build Hybrid Source.
> >>
> >> 6. Is it possible for converter function to do blocking operations? How to respond to checkpoint request when switching split enumerators cross sources? Does end-position or start-position need to be stored in checkpoint state or not?
> >>
> >> The converter function only simply converts the state of upstream source to the state of downstream source, not blocking operations. The way to respond the checkpoint request when switching split enumerators cross sources is send the corresponding "SourceEvent" to coordination. The end-position or start-position don't need to be stored in checkpoint state, only implements the "getEndState" interface for end-position.
> >>
> >> Best,
> >> Nicholas Jiang
> >>
> >> --
> >> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/