Hi Stephan,

Thanks for the feedback!

I agree with the approach of starting with a simple implementation
that can address a well-understood, significant portion of use cases.

I'm planning to continue work on the prototype that I had shared.
There is production-level usage waiting for it fairly soon. I expect
to open a PR in the coming weeks.

Thomas





On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <se...@apache.org> wrote:
>
> Thanks all for this discussion. Looks like there are lots of ideas and
> folks that are eager to do things, so let's see how we can get this moving.
>
> My take on this is the following:
>
> There will probably not be one Hybrid source, but possibly multiple ones,
> because of different strategies/requirements.
>     - One may be very simple, with switching points known up-front. Would
> be good to have this in a very simple implementation.
>     - There may be one where the switch is dynamic and the readers need to
> report back where they left off.
>     - There may be one that switches back and forth multiple times during a
> job, for example Kafka going to DFS whenever it falls behind retention, in
> order to catch up again.
>
> This also seems hard to "design on paper"; I expect there are nuances in a
> production setup that affect some details of the design. So I'd feel most
> comfortable adding a variant of the hybrid source to Flink that has already
> been used in a real use case (not necessarily in production, but maybe in a
> testing/staging environment), so that we can be confident it meets real
> requirements.
>
>
> What do you think about the following approach?
>   - If there is a tested PoC, let's try to get it contributed to Flink
> without trying to make it much more general.
>   - When we see similar but slightly different requirements for another
> hybrid source, let's try to evolve the contributed one.
>   - If we see new requirements that are so different that they don't fit
> well with the existing hybrid source, then let us look at building a second
> hybrid source for those requirements.
>
> We need to make connector contributions easier in general, and I think it
> is not a bad thing to end up with different approaches and see how they
> play out against each other in real use: for example, switching with known
> boundaries, dynamic switching, back-and-forth switching, etc.
> (I know some committers are planning to do some work on making
> connector contributions easier, with standardized testing frameworks,
> decoupled CI, etc.)
>
> Best,
> Stephan
>
>
> On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <t...@apache.org> wrote:
>
> > Hi,
> >
> > As mentioned in my previous email, I had been working on a prototype for
> > the hybrid source.
> >
> > You can find it at https://github.com/tweise/flink/pull/1
> >
> > It contains:
> > * Switching with configurable chain of sources
> > * Fixed or dynamic start positions
> > * Test with MockSource and FileSource
> >
> > The purpose of the above PR is to gather feedback and help drive consensus
> > on the FLIP.
> >
> > * How to support a dynamic start position within the source chain?
> >
> > Relevant in those (few?) cases where start positions are not known upfront.
> > You can find an example of what that might look like in the tests:
> >
> >
> > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> >
> > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> >
> > When switching, the enumerator of the previous source needs to supply
> > information about consumed splits that allows setting the start position
> > for the next source. That could be something like the last processed file,
> > a timestamp, etc. (Currently StaticFileSplitEnumerator doesn't track
> > finished splits.)
> >
> > See the previous discussion regarding start/end position. The prototype
> > shows the use of checkpoint state with a converter function.
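> >
> > For illustration, below is roughly the kind of information the previous
> > enumerator would have to track so that such a converter can derive the
> > next start position. The class and field names are made up for this email;
> > this is not the existing FileSource enumerator state (which, as noted,
> > doesn't track finished splits today).
> >
> > import java.io.Serializable;
> > import java.util.ArrayList;
> > import java.util.List;
> >
> > // Sketch only: enumerator checkpoint state that remembers enough about
> > // consumed splits to derive a start position for the next source.
> > public class FileEnumeratorEndState implements Serializable {
> >     private final List<String> finishedFiles = new ArrayList<>();
> >     private long maxFileTimestamp;
> >
> >     public void onSplitFinished(String path, long fileTimestamp) {
> >         finishedFiles.add(path);
> >         maxFileTimestamp = Math.max(maxFileTimestamp, fileTimestamp);
> >     }
> >
> >     // The "end position" from which the next source's start is derived.
> >     public long getMaxFileTimestamp() {
> >         return maxFileTimestamp;
> >     }
> > }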
> >
> > * Should readers be deployed dynamically?
> >
> > The prototype assumes a static source chain that is fixed at job submission
> > time. Conceivably there could be use cases that require more flexibility,
> > such as swapping one KafkaSource for another. A step in that direction
> > would be to deploy the actual readers dynamically, at the time the source
> > switch occurs.
> >
> > Looking forward to feedback and suggestions for next steps!
> >
> > Thomas
> >
> > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi Nicholas,
> > >
> > > Thanks for the reply. I had implemented a small PoC. It switches a
> > > configurable sequence of sources with predefined bounds. I'm using the
> > > unmodified MockSource for illustration. It does not require a "Switchable"
> > > interface. I looked at the code you shared, and the delegation and
> > > signaling work quite similarly. That's a good validation.
> > >
> > > Hi Kezhu,
> > >
> > > Thanks for bringing up the more detailed discussion regarding the start/end
> > > position. I think in most cases the start and end positions will be known
> > > when the job is submitted. If we take a File -> Kafka source chain as an
> > > example, there would most likely be a timestamp at which we want to
> > > transition from files to reading from Kafka. So we would either set the
> > > start position for Kafka based on that timestamp or provide the offsets
> > > directly. (Note that I'm skipping a few related nuances here. In order to
> > > achieve an exact switch without duplication or gaps, we may also need some
> > > overlap and filtering, but that's a separate issue.)
> > >
> > > The point is that the start positions can be configured by the user; there
> > > is no need to transfer any information from one source to another as part
> > > of switching.
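> > >
> > > As a concrete sketch of that fixed-position case: the FileSource and
> > > KafkaSource builders below are the existing APIs, while the chaining call
> > > at the end is hypothetical (commented out), since that is exactly the API
> > > under discussion. Paths, topic, brokers and the timestamp are placeholders.
> > >
> > > import org.apache.flink.api.common.serialization.SimpleStringSchema;
> > > import org.apache.flink.connector.file.src.FileSource;
> > > import org.apache.flink.connector.file.src.reader.TextLineFormat;
> > > import org.apache.flink.connector.kafka.source.KafkaSource;
> > > import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> > > import org.apache.flink.core.fs.Path;
> > >
> > > public class FixedSwitchExample {
> > >     public static void main(String[] args) {
> > >         // The switch timestamp is known at job submission time (placeholder).
> > >         long switchTimestamp = 1_615_000_000_000L;
> > >
> > >         FileSource<String> fileSource = FileSource
> > >             .forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/archive/"))
> > >             .build();
> > >
> > >         KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
> > >             .setBootstrapServers("broker:9092")
> > >             .setTopics("events")
> > >             // Start position configured up front, no hand-over at switch time.
> > >             .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp))
> > >             .setValueOnlyDeserializer(new SimpleStringSchema())
> > >             .build();
> > >
> > >         // Hypothetical chaining call from the prototype; the name is not final:
> > >         // HybridSource<String> hybrid = HybridSource.chain(fileSource, kafkaSource);
> > >     }
> > > }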
> > >
> > > It gets more complicated if we want to achieve a dynamic switch where the
> > > transition timestamp isn't known when the job starts. For example, consider
> > > a bootstrap scenario where the time taken to process the historical data
> > > exceeds the Kafka retention. Here, we would need to dynamically resolve the
> > > Kafka start position based on where the file readers left off when the
> > > switch occurs. The file source enumerator would determine at runtime when
> > > it is done handing splits to its readers, maybe when the max file timestamp
> > > reaches (processing time - X). This information needs to be transferred to
> > > the Kafka source.
> > >
> > > The timestamp would need to be derived from the file enumerator state,
> > > either by looking at the last splits or explicitly. The natural way to do
> > > that is to introspect the enumerator state, which gets checkpointed. Any
> > > other form of "end position" via a special interface would need to be
> > > derived in the same manner.
> > >
> > > The converter that will be provided by the user would look at the file
> > > enumerator state, derive the timestamp, and then supply the "start
> > > position" to the Kafka source. The Kafka source was created when the job
> > > started; it needs to be augmented with the new start position. That can be
> > > achieved via a special enumerator interface like
> > > SwitchableSplitEnumerator#setStartState or by using restoreEnumerator with
> > > the checkpoint state constructed by the converter function. I'm leaning
> > > towards the latter, as long as there is a convenient way to construct the
> > > state from a position (like enumStateForTimestamp). The converter would map
> > > one enum state to another and can be made very simple by providing a few
> > > utility functions instead of mandating a new interface that enumerators
> > > need to implement to become switchable.
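> > >
> > > A rough sketch of what such a converter could look like; the state type
> > > and its field are made up for illustration, and the Kafka
> > > OffsetsInitializer merely stands in for whatever "start position" (or
> > > enumerator state) the prototype ends up passing to the next source:
> > >
> > > import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> > >
> > > // Sketch only: map the checkpointed end state of the file enumerator to a
> > > // start position for the Kafka source at switch time.
> > > public class FileToKafkaStartConverter {
> > >
> > >     // Stand-in for the file enumerator's checkpointed state.
> > >     public static class FileEnumEndState {
> > >         public long maxFileTimestamp;
> > >     }
> > >
> > >     public OffsetsInitializer toKafkaStartPosition(FileEnumEndState endState) {
> > >         // Resume Kafka from the max timestamp the file source reached.
> > >         return OffsetsInitializer.timestamp(endState.maxFileTimestamp);
> > >     }
> > > }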
> > >
> > > Again, a converter is only required when sources need to be switched based
> > > on positions not known at graph construction time.
> > >
> > > I'm planning to add such deferred switching to the PoC for illustration
> > > and will share the experiment when that's done.
> > >
> > > Cheers,
> > > Thomas
> > >
> > >
> > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programg...@163.com> wrote:
> > >
> > >> Hi Kezhu,
> > >>
> > >> Thanks for your detailed points about the Hybrid Source. I'll go through
> > >> them and explain each one below:
> > >>
> > >> 1. Would it be possible to switch/chain multiple homogeneous sources with
> > >> this feature?
> > >>
> > >> "HybridSource" supports switching/chaining multiple homogeneous sources,
> > >> each of which provides its own implementation of "SwitchableSource" and
> > >> "SwitchableSplitEnumerator". "HybridSource" does not restrict whether the
> > >> constituent sources are homogeneous. From the user's perspective, the user
> > >> only adds the "SwitchableSource" instances to the "HybridSource" and
> > >> leaves the smooth migration to the "HybridSource".
> > >>
> > >> 2. Is "setStartState" actually a reposition operation for the next source
> > >> to start from at runtime?
> > >>
> > >> IMO, "setStartState" is used to determine the initial position of the new
> > >> source for a smooth migration; it is not a reposition operation. More
> > >> importantly, the "State" mentioned here refers to the start and end
> > >> positions for reading a source.
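> > >>
> > >> For clarity, here is a minimal sketch of the proposed interfaces. The
> > >> method names come from this discussion; the generics and exact signatures
> > >> are only illustrative, not a final design.
> > >>
> > >> import org.apache.flink.api.connector.source.Source;
> > >> import org.apache.flink.api.connector.source.SourceSplit;
> > >> import org.apache.flink.api.connector.source.SplitEnumerator;
> > >>
> > >> // Sketch only: an enumerator that can accept a start state derived from
> > >> // the previous source and expose its own end state for the next one.
> > >> public interface SwitchableSplitEnumerator<SplitT extends SourceSplit, CheckpointT, StartT, EndT>
> > >>         extends SplitEnumerator<SplitT, CheckpointT> {
> > >>
> > >>     // Set the initial position of this source when the switch happens.
> > >>     void setStartState(StartT startState);
> > >>
> > >>     // Position this source ended at; may return null if not needed.
> > >>     EndT getEndState();
> > >> }
> > >>
> > >> // Sketch only: marker for sources that can be chained in a HybridSource.
> > >> interface SwitchableSource<T, SplitT extends SourceSplit, EnumChkT>
> > >>         extends Source<T, SplitT, EnumChkT> {
> > >> }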
> > >>
> > >> 3. Shouldn't this conversion be an implementation detail of the next
> > >> source, rather than a converter function?
> > >>
> > >> The state conversion is of course an implementation detail and part of the
> > >> switching mechanism, but the mechanism should provide users with an
> > >> interface for that conversion, and that is what the converter function
> > >> defines. What's more, once users have implemented a "SwitchableSource" and
> > >> added it to the Hybrid Source, they don't need to implement another
> > >> "SwitchableSource" for each different conversion. From the user's
> > >> perspective, users can define different converter functions and create the
> > >> "SwitchableSource" to add to the "HybridSource"; there is no need to
> > >> implement a new Source per converter function.
> > >>
> > >> 4. With no configurable start position, the combination of the above three
> > >> points is a no-op, and "HybridSource" is just a chain of sources with
> > >> pre-configured start positions?
> > >>
> > >> Indeed, there is currently no configurable start position, but such
> > >> configuration could be added to this feature. Users could use the
> > >> "SwitchableSplitEnumerator#setStartState" interface or configuration
> > >> parameters to configure the start position.
> > >>
> > >> 5. I wonder whether an end position is a must and how it could be useful
> > >> for end users in a generic-enough source?
> > >>
> > >> The "getEndState" interface is used for the smooth migration scenario; it
> > >> can return null if it is not needed. In the Hybrid Source mechanism, this
> > >> interface is required for switching between the constituent sources,
> > >> because otherwise there is no way to obtain the end position of the
> > >> upstream source. In summary, the Hybrid Source needs to be able to set the
> > >> start position and get the end position of each source; otherwise there is
> > >> little point in building a Hybrid Source.
> > >>
> > >> 6. Is it possible for the converter function to do blocking operations?
> > >> How should a checkpoint request be handled while switching split
> > >> enumerators across sources? Does the end position or start position need
> > >> to be stored in checkpoint state?
> > >>
> > >> The converter function simply converts the state of the upstream source
> > >> into the state of the downstream source; it does not perform blocking
> > >> operations. A checkpoint request that arrives while switching split
> > >> enumerators across sources is handled by sending the corresponding
> > >> "SourceEvent" to the coordinator. The end position and start position do
> > >> not need to be stored in checkpoint state; only the "getEndState"
> > >> interface needs to be implemented for the end position.
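> > >>
> > >> As a sketch of that signaling (SourceEvent is the existing marker
> > >> interface of the new source API; the event class and the fields it
> > >> carries are illustrative only, not part of the proposal text):
> > >>
> > >> import org.apache.flink.api.connector.source.SourceEvent;
> > >>
> > >> // Illustrative only: event exchanged with the enumerator/coordinator when
> > >> // the switch happens, carrying the end position of the previous source.
> > >> public class SwitchSourceEvent implements SourceEvent {
> > >>
> > >>     private final int nextSourceIndex;  // which source in the chain is next
> > >>     private final long endTimestamp;    // end position of the previous source
> > >>
> > >>     public SwitchSourceEvent(int nextSourceIndex, long endTimestamp) {
> > >>         this.nextSourceIndex = nextSourceIndex;
> > >>         this.endTimestamp = endTimestamp;
> > >>     }
> > >>
> > >>     public int getNextSourceIndex() {
> > >>         return nextSourceIndex;
> > >>     }
> > >>
> > >>     public long getEndTimestamp() {
> > >>         return endTimestamp;
> > >>     }
> > >> }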
> > >>
> > >> Best,
> > >> Nicholas Jiang
> > >>
> > >>
> > >
> >
