Hi,

As mentioned in my previous email, I have been working on a prototype for
the hybrid source.

You can find it at https://github.com/tweise/flink/pull/1

It contains:
* Switching through a configurable chain of sources
* Fixed or dynamic start positions
* Tests with MockSource and FileSource

The purpose of the above PR is to gather feedback and help drive consensus
on the FLIP. Two questions for discussion:

* How to support a dynamic start position within the source chain?

This is relevant in those (few?) cases where start positions are not known
upfront. You can find an example of what that might look like in the tests:

https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132

When switching, the enumerator of the previous source needs to supply
information about the consumed splits from which the start position of the
next source can be set. That could be something like the last processed
file, timestamp, etc. (Currently StaticFileSplitEnumerator doesn't track
finished splits.)

See the previous discussion regarding start/end position. The prototype
shows the use of checkpoint state with a converter function.
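
A minimal sketch of the converter idea, for illustration only. It does not
use the Flink API or the classes from the PR: FinishedSplit, FileEnumState
and KafkaEnumState are hypothetical stand-ins, and enumStateForTimestamp is
the kind of utility function suggested in the earlier discussion, not an
existing method. It also assumes the file enumerator state records finished
splits, which StaticFileSplitEnumerator currently does not.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Sketch only: every type here is a hypothetical stand-in, not a Flink class. */
final class StartPositionConverterSketch {

    /** Stand-in for a finished file split and the max record timestamp it held. */
    static final class FinishedSplit {
        final String path;
        final long maxTimestampMillis;

        FinishedSplit(String path, long maxTimestampMillis) {
            this.path = path;
            this.maxTimestampMillis = maxTimestampMillis;
        }
    }

    /** Stand-in for checkpointed file enumerator state (assumed to track finished splits). */
    static final class FileEnumState {
        final List<FinishedSplit> finishedSplits;

        FileEnumState(List<FinishedSplit> finishedSplits) {
            this.finishedSplits = finishedSplits;
        }
    }

    /** Stand-in for the enumerator state the next source would be restored from. */
    static final class KafkaEnumState {
        final long startTimestampMillis;

        KafkaEnumState(long startTimestampMillis) {
            this.startTimestampMillis = startTimestampMillis;
        }
    }

    /** Hypothetical utility: construct the next source's state from a position. */
    static KafkaEnumState enumStateForTimestamp(long timestampMillis) {
        return new KafkaEnumState(timestampMillis);
    }

    /** User-provided converter: maps the previous enumerator state to the next one. */
    static final Function<FileEnumState, KafkaEnumState> CONVERTER = fileState -> {
        long lastProcessed = fileState.finishedSplits.stream()
                .mapToLong(split -> split.maxTimestampMillis)
                .max()
                .orElseThrow(() -> new IllegalStateException(
                        "no finished splits to derive a start position from"));
        return enumStateForTimestamp(lastProcessed);
    };

    public static void main(String[] args) {
        FileEnumState fileState = new FileEnumState(Arrays.asList(
                new FinishedSplit("part-0", 1_000L),
                new FinishedSplit("part-1", 2_500L)));
        KafkaEnumState next = CONVERTER.apply(fileState);
        System.out.println("next source starts at " + next.startTimestampMillis);
    }
}
```

The converter is a plain function between two state types, so it needs no
new interface on the enumerators themselves; whether the real enumerator
states expose enough information for this is exactly the open point.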

* Should readers be deployed dynamically?

The prototype assumes a static source chain that is fixed at job submission
time. Conceivably there could be use cases that require more flexibility,
such as switching one KafkaSource for another. A step in that direction
would be to deploy the actual readers dynamically, at the time of switching
sources.
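
One way to picture the difference, as a rough sketch under stated
assumptions rather than anything from the PR: the chain could hold
factories instead of source instances, so that a source (and with it its
readers) is only created when the switch happens. SourceStub and
DeferredSourceChainSketch are hypothetical names, and the end position is
simplified to a single timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Sketch only: a chain of source factories that are resolved at switch time. */
final class DeferredSourceChainSketch {

    /** Hypothetical stand-in for a source; it only carries a description. */
    static final class SourceStub {
        final String description;

        SourceStub(String description) {
            this.description = description;
        }
    }

    /** Each entry builds its source from the end position of its predecessor. */
    private final List<Function<Long, SourceStub>> factories = new ArrayList<>();
    private int next = 0;

    DeferredSourceChainSketch add(Function<Long, SourceStub> factory) {
        factories.add(factory);
        return this;
    }

    boolean hasNext() {
        return next < factories.size();
    }

    /** Called when switching: only now is the next source instantiated. */
    SourceStub switchToNext(long previousEndPosition) {
        if (!hasNext()) {
            throw new IllegalStateException("end of source chain");
        }
        return factories.get(next++).apply(previousEndPosition);
    }

    public static void main(String[] args) {
        DeferredSourceChainSketch chain = new DeferredSourceChainSketch()
                .add(ignored -> new SourceStub("files"))
                .add(ts -> new SourceStub("kafka starting at " + ts));
        System.out.println(chain.switchToNext(0L).description);
        System.out.println(chain.switchToNext(2_500L).description);
    }
}
```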

Looking forward to feedback and suggestions for next steps!

Thomas

On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <t...@apache.org> wrote:

> Hi Nicholas,
>
> Thanks for the reply. I have implemented a small PoC. It switches a
> configurable sequence of sources with predefined bounds. I'm using the
> unmodified MockSource for illustration. It does not require a "Switchable"
> interface. I looked at the code you shared and the delegation and signaling
> work quite similarly. That's a good validation.
>
> Hi Kezhu,
>
> Thanks for bringing up the more detailed discussion regarding the start/end
> position. I think in most cases the start and end positions will be known
> when the job is submitted. If we take a File -> Kafka source chain as an
> example, there would most likely be a timestamp at which we want to
> transition from files to reading from Kafka. So we would either set the
> start position for Kafka based on that timestamp or provide the offsets
> directly. (Note that I'm skipping a few related nuances here. In order to
> achieve an exact switch without duplication or gap, we may also need some
> overlap and filtering, but that's a separate issue.)
>
> The point is that the start positions can be configured by the user; there
> is no need to transfer any information from one source to another as part
> of switching.
>
> It gets more complicated if we want to achieve a dynamic switch where the
> transition timestamp isn't known when the job starts. For example, consider
> a bootstrap scenario where the time taken to process historic data exceeds
> the Kafka retention. Here, we would need to dynamically resolve the Kafka
> start position based on where the file readers left off, when the switching
> occurs. The file source enumerator would determine at runtime when it is
> done handing splits to its readers, maybe when the max file timestamp
> reaches (processing time - X). This information needs to be transferred to
> the Kafka source.
>
> The timestamp would need to be derived from the file enumerator state,
> either by looking at the last splits or explicitly. The natural way to do
> that is to introspect the enumerator state which gets checkpointed. Any
> other form of "end position" via a special interface would need to be
> derived in the same manner.
>
> The converter that will be provided by the user would look at the file
> enumerator state, derive the timestamp and then supply the "start position"
> to the Kafka source. The Kafka source was created when the job started. It
> needs to be augmented with the new start position. That can be achieved via
> a special enumerator interface like SwitchableSplitEnumerator#setStartState
> or by using restoreEnumerator with the checkpoint state constructed by the
> converter function. I'm leaning towards the latter as long as there is a
> convenient way to construct the state from a position (like
> enumStateForTimestamp). The converter would map one enum state to another
> and can be made very simple by providing a few utility functions instead of
> mandating a new interface that enumerators need to implement to become
> switchable.
>
> Again, a converter is only required when sources need to be switched based
> on positions not known at graph construction time.
>
> I'm planning to add such deferred switching to the PoC for illustration
> and will share the experiment when that's done.
>
> Cheers,
> Thomas
>
>
> On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programg...@163.com> wrote:
>
>> Hi Kezhu,
>>
>> Thanks for your detailed points on the Hybrid Source. I respond to each
>> of them below:
>>
>> 1. Would it be possible to use the Hybrid Source to switch/chain multiple
>> homogeneous sources?
>>
>> "HybridSource" supports switching/chaining multiple homogeneous sources,
>> each of which has its respective implementation of "SwitchableSource" and
>> "SwitchableSplitEnumerator". "HybridSource" places no restriction on
>> whether the sources it consists of are homogeneous. From the user's
>> perspective, the user only adds each "SwitchableSource" to the
>> "HybridSource" and leaves the smooth migration to "HybridSource".
>>
>> 2. Is "setStartState" actually a reposition operation for the next source
>> to start at job runtime?
>>
>> IMO, "setStartState" is used to determine the initial position of the new
>> source for a smooth migration; it is not a reposition operation. More
>> importantly, the "State" mentioned here refers to the start and end
>> positions of reading a source.
>>
>> 3. Should this conversion be an implementation detail of the next source,
>> not a converter function?
>>
>> The state conversion is of course an implementation detail and is
>> included in the switching mechanism, which should provide users with an
>> interface for the conversion; that interface is the converter function.
>> What's more, once users have implemented a "SwitchableSource" and added
>> it to the Hybrid Source, they don't need to implement another
>> "SwitchableSource" for a different conversion. From the user's
>> perspective, users can define different converter functions and create
>> the "SwitchableSource" to add to the "HybridSource"; there is no need to
>> implement a Source for each converter function.
>>
>> 4. No configurable start-position. In this situation, is the combination
>> of the above three joints a no-op, and is "HybridSource" a chain of
>> sources with pre-configured start positions?
>>
>> Indeed there is no configurable start-position, and such a configuration
>> could be included in the feature. Users could use the
>> "SwitchableSplitEnumerator#setStartState" interface or configuration
>> parameters to configure the start-position.
>>
>> 5. I wonder whether an end-position is a must and how it could be useful
>> for end users in a generic-enough source?
>>
>> The "getEndState" interface is used for the smooth migration scenario and
>> can return null if it is not needed. In the Hybrid Source mechanism, this
>> interface is required for switching between the constituent sources;
>> otherwise there is no way to get the end-position of the upstream source.
>> In summary, Hybrid Source needs to be able to set the start position and
>> get the end position of each Source; otherwise there is no point in
>> building a Hybrid Source.
>>
>> 6. Is it possible for the converter function to do blocking operations?
>> How to respond to a checkpoint request when switching split enumerators
>> across sources? Does the end-position or start-position need to be stored
>> in checkpoint state or not?
>>
>> The converter function simply converts the state of the upstream source
>> to the state of the downstream source; it performs no blocking
>> operations. The way to respond to a checkpoint request when switching
>> split enumerators across sources is to send the corresponding
>> "SourceEvent" to the coordinator. The end-position and start-position
>> don't need to be stored in checkpoint state; the source only implements
>> the "getEndState" interface for the end-position.
>>
>> Best,
>> Nicholas Jiang
>>
>
