Thanks for the clarification, Thomas. Yes, that makes sense to me. Cheers,
Jiangjie (Becket) Qin

On Mon, Apr 26, 2021 at 1:03 AM Thomas Weise <t...@apache.org> wrote:

> Hi Becket,
>
> I agree and am not planning to hard wire a specific combination of sources (like S3 + Kafka). That also wouldn't help for the use case I want to address, because there are customized connectors that we need to be able to plug in.
>
> Rather, the suggested simplification would be for the flexibility of the switching mechanism.
>
> The prototype already supports fixed start positions and checkpoint conversion for any combination of sources; no need to undo that.
>
> But for testing/example purposes, we will need to settle on a specific combination.
>
> Thomas
>
> On Sat, Apr 24, 2021 at 8:20 PM Becket Qin <becket....@gmail.com> wrote:
> >
> > Sorry for the late reply. Starting from a specific connector sounds reasonable to me.
> >
> > That said, I would suggest keeping the possibility of future generalization as much as possible. We have already seen some variation of source combinations from different users: HDFS + Kafka, S3 + Kafka, S3 + SQL Binlog, etc. So it would be good if we can reuse some base abstraction in the future instead of having to write each combination from scratch.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Apr 17, 2021 at 7:34 PM Stephan Ewen <se...@apache.org> wrote:
> >
> > > Thanks, Thomas!
> > >
> > > @Becket and @Nicholas - would you be ok with that approach?
> > >
> > > On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise <t...@apache.org> wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > Thanks for the feedback!
> > > >
> > > > I agree with the approach of starting with a simple implementation that can address a well understood, significant portion of use cases.
> > > >
> > > > I'm planning to continue work on the prototype that I had shared. There is production level usage waiting for it fairly soon. I expect to open a PR in the coming weeks.
> > > >
> > > > Thomas
> > > >
> > > > On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <se...@apache.org> wrote:
> > > > >
> > > > > Thanks all for this discussion. Looks like there are lots of ideas and folks that are eager to do things, so let's see how we can get this moving.
> > > > >
> > > > > My take on this is the following:
> > > > >
> > > > > There will probably not be one Hybrid source, but possibly multiple ones, because of different strategies/requirements.
> > > > >   - One may be very simple, with switching points known up-front. Would be good to have this in a very simple implementation.
> > > > >   - There may be one where the switch is dynamic and the readers need to report back where they left off.
> > > > >   - There may be one that switches back and forth multiple times during a job, for example Kafka going to DFS whenever it falls behind retention, in order to catch up again.
> > > > >
> > > > > This also seems hard to "design on paper"; I expect there are nuances in a production setup that affect some details of the design. So I'd feel most comfortable in adding a variant of the hybrid source to Flink that has been used already in a real use case (not necessarily in production, but maybe in a testing/staging environment, so it seems to meet all requirements).
> > > > >
> > > > > What do you think about the following approach?
> > > > >   - If there is a tested PoC, let's try to get it contributed to Flink without trying to make it much more general.
> > > > >   - When we see similar but a bit different requirements for another hybrid source, then let's try to evolve the contributed one.
> > > > >   - If we see new requirements that are so different that they don't fit well with the existing hybrid source, then let us look at building a second hybrid source for those requirements.
> > > > >
> > > > > We need to make connector contributions in general easier, and I think it is not a bad thing to end up with different approaches and see how these play out against each other when being used by users. For example switching with known boundaries, dynamic switching, back-and-forth-switching, etc.
> > > > > (I know some committers are planning to do some work on making connector contributions easier, with standardized testing frameworks, decoupled CI, etc.)
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <t...@apache.org> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > As mentioned in my previous email, I had been working on a prototype for the hybrid source.
> > > > > >
> > > > > > You can find it at https://github.com/tweise/flink/pull/1
> > > > > >
> > > > > > It contains:
> > > > > > * Switching with configurable chain of sources
> > > > > > * Fixed or dynamic start positions
> > > > > > * Test with MockSource and FileSource
> > > > > >
> > > > > > The purpose of the above PR is to gather feedback and help drive consensus on the FLIP.
> > > > > >
> > > > > > * How to support a dynamic start position within the source chain?
> > > > > >
> > > > > > Relevant in those (few?) cases where start positions are not known upfront.
> > > > > > You can find an example of what that might look like in the tests:
> > > > > >
> > > > > > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> > > > > >
> > > > > > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> > > > > >
> > > > > > When switching, the enumerator of the previous source needs to supply information about consumed splits that allows setting the start position for the next source. That could be something like the last processed file, timestamp, etc. (Currently StaticFileSplitEnumerator doesn't track finished splits.)
> > > > > >
> > > > > > See previous discussion regarding start/end position. The prototype shows the use of checkpoint state with a converter function.
> > > > > >
> > > > > > * Should readers be deployed dynamically?
> > > > > >
> > > > > > The prototype assumes a static source chain that is fixed at job submission time. Conceivably there could be use cases that require more flexibility, such as switching one KafkaSource for another. A step in that direction would be to deploy the actual readers dynamically, at the time of switching source.
> > > > > >
> > > > > > Looking forward to feedback and suggestions for next steps!
> > > > > >
> > > > > > Thomas
> > > > > >
> > > > > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <t...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Nicholas,
> > > > > > >
> > > > > > > Thanks for the reply. I had implemented a small PoC. It switches a configurable sequence of sources with predefined bounds. I'm using the unmodified MockSource for illustration.
> > > > > > > It does not require a "Switchable" interface. I looked at the code you shared and the delegation and signaling work quite similarly. That's a good validation.
> > > > > > >
> > > > > > > Hi Kezhu,
> > > > > > >
> > > > > > > Thanks for bringing the more detailed discussion regarding the start/end position. I think in most cases the start and end positions will be known when the job is submitted. If we take a File -> Kafka source chain as an example, there would most likely be a timestamp at which we want to transition from files to reading from Kafka. So we would either set the start position for Kafka based on that timestamp or provide the offsets directly. (Note that I'm skipping a few related nuances here. In order to achieve an exact switch without duplication or gap, we may also need some overlap and filtering, but that's a separate issue.)
> > > > > > >
> > > > > > > The point is that the start positions can be configured by the user; there is no need to transfer any information from one source to another as part of switching.
> > > > > > >
> > > > > > > It gets more complicated if we want to achieve a dynamic switch where the transition timestamp isn't known when the job starts. For example, consider a bootstrap scenario where the time taken to process historic data exceeds the Kafka retention. Here, we would need to dynamically resolve the Kafka start position based on where the file readers left off, when the switching occurs.
> > > > > > > The file source enumerator would determine at runtime when it is done handing splits to its readers, maybe when the max file timestamp reaches (processing time - X). This information needs to be transferred to the Kafka source.
> > > > > > >
> > > > > > > The timestamp would need to be derived from the file enumerator state, either by looking at the last splits or explicitly. The natural way to do that is to introspect the enumerator state which gets checkpointed. Any other form of "end position" via a special interface would need to be derived in the same manner.
> > > > > > >
> > > > > > > The converter that will be provided by the user would look at the file enumerator state, derive the timestamp and then supply the "start position" to the Kafka source. The Kafka source was created when the job started. It needs to be augmented with the new start position. That can be achieved via a special enumerator interface like SwitchableSplitEnumerator#setStartState or by using restoreEnumerator with the checkpoint state constructed by the converter function. I'm leaning towards the latter as long as there is a convenient way to construct the state from a position (like enumStateForTimestamp). The converter would map one enum state to another and can be made very simple by providing a few utility functions instead of mandating a new interface that enumerators need to implement to become switchable.
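A minimal Java sketch of the converter idea in the paragraph above, for illustration only. It does not use Flink's API: FileSplit, FileEnumState, KafkaEnumState and convert are hypothetical stand-ins, and enumStateForTimestamp is modeled on the utility named above with an assumed signature. It also assumes that the file enumerator's checkpointed state lists the splits it handed out, each carrying a max record timestamp.

```java
import java.util.Arrays;
import java.util.List;

/** Illustration only; these are hypothetical types, not Flink classes. */
final class ConverterSketch {

    /** Stand-in for a file split together with the max timestamp it contains. */
    static final class FileSplit {
        final String path;
        final long maxTimestampMillis;

        FileSplit(String path, long maxTimestampMillis) {
            this.path = path;
            this.maxTimestampMillis = maxTimestampMillis;
        }
    }

    /** Stand-in for the checkpointed state of the file enumerator. */
    static final class FileEnumState {
        final List<FileSplit> assignedSplits;

        FileEnumState(List<FileSplit> assignedSplits) {
            this.assignedSplits = assignedSplits;
        }
    }

    /** Stand-in for a state the Kafka enumerator could be restored from. */
    static final class KafkaEnumState {
        final long startTimestampMillis;

        KafkaEnumState(long startTimestampMillis) {
            this.startTimestampMillis = startTimestampMillis;
        }
    }

    /** Assumed utility: build the next source's state from a position. */
    static KafkaEnumState enumStateForTimestamp(long timestampMillis) {
        return new KafkaEnumState(timestampMillis);
    }

    /** The converter: derive the switch timestamp from the file enumerator state. */
    static KafkaEnumState convert(FileEnumState fileState) {
        long switchTimestamp = fileState.assignedSplits.stream()
                .mapToLong(split -> split.maxTimestampMillis)
                .max()
                .orElseThrow(() -> new IllegalStateException("file enumerator assigned no splits"));
        return enumStateForTimestamp(switchTimestamp);
    }

    public static void main(String[] args) {
        FileEnumState finished = new FileEnumState(Arrays.asList(
                new FileSplit("part-0", 1_000L),
                new FileSplit("part-1", 2_500L)));
        System.out.println(convert(finished).startTimestampMillis); // prints 2500
    }
}
```

In this sketch the converter is a plain function from one enumerator state to another, so neither enumerator has to implement an extra interface; whether the real enumerator state exposes enough information for this is exactly the open question raised above.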
> > > > > > >
> > > > > > > Again, a converter is only required when sources need to be switched based on positions not known at graph construction time.
> > > > > > >
> > > > > > > I'm planning to add such deferred switching to the PoC for illustration and will share the experiment when that's done.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Thomas
> > > > > > >
> > > > > > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programg...@163.com> wrote:
> > > > > > >
> > > > > > >> Hi Kezhu,
> > > > > > >>
> > > > > > >> Thanks for your detailed points on the Hybrid Source. I have gone through them and reply to each as follows:
> > > > > > >>
> > > > > > >> 1. Would it be possible to use the Hybrid Source to switch/chain multiple homogeneous sources?
> > > > > > >>
> > > > > > >> "HybridSource" supports switching/chaining multiple homogeneous sources, each of which has its own implementation of "SwitchableSource" and "SwitchableSplitEnumerator". "HybridSource" doesn't restrict whether the sources it consists of are homogeneous. From the user's perspective, the user only adds the "SwitchableSource" to "HybridSource" and leaves the smooth migration to "HybridSource".
> > > > > > >>
> > > > > > >> 2. "setStartState" is actually a reposition operation for the next source to start at job runtime?
> > > > > > >>
> > > > > > >> IMO, "setStartState" is used to determine the initial position of the new source for smooth migration; it is not a reposition operation. More importantly, the "State" mentioned here refers to the start and end positions for reading a source.
> > > > > > >>
> > > > > > >> 3. This conversion should be an implementation detail of the next source, not a converter function, in my opinion?
> > > > > > >>
> > > > > > >> The state conversion is of course an implementation detail and is included in the switching mechanism, which should provide users with an interface for the conversion; that interface is defined by the converter function. What's more, when users have already implemented "SwitchableSource" and added it to the Hybrid Source, they don't need to implement "SwitchableSource" again for a different conversion. From the user's perspective, users can define different converter functions and create the "SwitchableSource" to add to "HybridSource", with no need to implement a Source for each converter function.
> > > > > > >>
> > > > > > >> 4. No configurable start-position. In this situation the combination of the above three joints is a nop, and "HybridSource" is a chain of start-position pre-configured sources?
> > > > > > >>
> > > > > > >> Indeed there is no configurable start-position, and this configuration could be included in the feature. Users could use the "SwitchableSplitEnumerator#setStartState" interface or the configuration parameters to configure the start-position.
> > > > > > >>
> > > > > > >> 5. I wonder whether end-position is a must and how it could be useful for end users in a generic-enough source?
> > > > > > >>
> > > > > > >> The "getEndState" interface is used for the smooth migration scenario, and could return a null value if it is not needed. In the Hybrid Source mechanism, this interface is required for switching between the constituent sources; otherwise there is no way to get the end-position of the upstream source. In summary, Hybrid Source needs to be able to set the start position and get the end position of each Source; otherwise there is no use in building a Hybrid Source.
> > > > > > >>
> > > > > > >> 6. Is it possible for the converter function to do blocking operations? How to respond to a checkpoint request when switching split enumerators across sources? Does the end-position or start-position need to be stored in checkpoint state or not?
> > > > > > >>
> > > > > > >> The converter function simply converts the state of the upstream source to the state of the downstream source; it performs no blocking operations. The way to respond to a checkpoint request when switching split enumerators across sources is to send the corresponding "SourceEvent" to coordination. The end-position and start-position don't need to be stored in checkpoint state; only the "getEndState" interface needs to be implemented for the end-position.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Nicholas Jiang
> > > > > > >>
> > > > > > >> --
> > > > > > >> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
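
For comparison with the converter-only approach, the getEndState/setStartState hand-off described in points 2 and 5 above could look roughly like the following Java sketch. It is written under assumptions: the SwitchableEnumerator interface, the two enumerator classes and switchOver are hypothetical and only borrow the method names used in this thread; the proposal's actual signatures may differ, and no Flink classes are used.

```java
import java.util.function.Function;

/** Illustration only; these are hypothetical types, not Flink API. */
final class SwitchableSketch {

    /** Hypothetical enumerator contract using the method names from the thread. */
    interface SwitchableEnumerator<StartT, EndT> {
        void setStartState(StartT startState);

        EndT getEndState(); // may return null when no hand-off is needed
    }

    /** Upstream stand-in: reports the timestamp of the last file it handed out. */
    static final class FileEnumerator implements SwitchableEnumerator<Void, Long> {
        private final Long lastFileTimestampMillis;

        FileEnumerator(Long lastFileTimestampMillis) {
            this.lastFileTimestampMillis = lastFileTimestampMillis;
        }

        @Override
        public void setStartState(Void unused) {
            // First source in the chain: nothing to receive.
        }

        @Override
        public Long getEndState() {
            return lastFileTimestampMillis;
        }
    }

    /** Downstream stand-in: starts reading from a timestamp if one was handed over. */
    static final class KafkaEnumerator implements SwitchableEnumerator<Long, Void> {
        private Long startTimestampMillis; // null: keep the pre-configured start position

        @Override
        public void setStartState(Long startState) {
            this.startTimestampMillis = startState;
        }

        @Override
        public Void getEndState() {
            return null; // last source in the chain
        }

        Long startTimestampMillis() {
            return startTimestampMillis;
        }
    }

    /** Hand-off at the switch: end state -> converter -> start state. */
    static <A, B> void switchOver(SwitchableEnumerator<?, A> previous,
                                  Function<? super A, ? extends B> converter,
                                  SwitchableEnumerator<B, ?> next) {
        A endState = previous.getEndState();
        if (endState != null) {
            next.setStartState(converter.apply(endState));
        }
    }

    public static void main(String[] args) {
        FileEnumerator files = new FileEnumerator(2_500L);
        KafkaEnumerator kafka = new KafkaEnumerator();
        switchOver(files, ts -> ts, kafka);
        System.out.println(kafka.startTimestampMillis()); // prints 2500
    }
}
```

The trade-off discussed in the thread shows up directly: here every enumerator in the chain has to implement the extra interface, whereas a converter over checkpoint state leaves existing enumerators unchanged.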