Thanks for the clarification, Thomas. Yes, that makes sense to me.

Cheers,

Jiangjie (Becket) Qin

On Mon, Apr 26, 2021 at 1:03 AM Thomas Weise <t...@apache.org> wrote:

> Hi Becket,
>
> I agree and am not planning to hard wire a specific combination of
> sources (like S3 + Kafka). That also wouldn't help for the use case I
> want to address, because there are customized connectors that we need
> to be able to plug in.
>
> Rather, the suggested simplification concerns the flexibility of the
> switching mechanism.
>
> The prototype already supports fixed start positions and checkpoint
> conversion for any combination of sources; no need to undo that.
>
> But for testing/example purposes, we will need to settle on a specific
> combination.
>
> Thomas
>
> On Sat, Apr 24, 2021 at 8:20 PM Becket Qin <becket....@gmail.com> wrote:
> >
> > Sorry for the late reply. Starting from a specific connector sounds
> > reasonable to me.
> >
> > That said, I would suggest keeping the possibility of future
> > generalization open as much as possible. We have already seen several
> > source combinations from different users: HDFS + Kafka, S3 + Kafka,
> > S3 + SQL Binlog, etc. So it would be good if we can reuse some base
> > abstraction in the future instead of having to write each combination
> > from scratch.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Apr 17, 2021 at 7:34 PM Stephan Ewen <se...@apache.org> wrote:
> >
> > > Thanks, Thomas!
> > >
> > > @Becket and @Nicholas - would you be ok with that approach?
> > >
> > >
> > > On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise <t...@apache.org> wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > Thanks for the feedback!
> > > >
> > > > I agree with the approach of starting with a simple implementation
> > > > that can address a well understood, significant portion of use cases.
> > > >
> > > > I'm planning to continue work on the prototype that I had shared.
> > > > There is production level usage waiting for it fairly soon. I expect
> > > > to open a PR in the coming weeks.
> > > >
> > > > Thomas
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > > On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <se...@apache.org> wrote:
> > > > >
> > > > > > Thanks all for this discussion. Looks like there are lots of ideas and
> > > > > > folks that are eager to do things, so let's see how we can get this moving.
> > > > >
> > > > > My take on this is the following:
> > > > >
> > > > > > There will probably not be one Hybrid source, but possibly multiple ones,
> > > > > > because of different strategies/requirements.
> > > > > >     - One may be very simple, with switching points known up-front. Would
> > > > > >       be good to have this in a very simple implementation.
> > > > > >     - There may be one where the switch is dynamic and the readers need to
> > > > > >       report back where they left off.
> > > > > >     - There may be one that switches back and forth multiple times during a
> > > > > >       job, for example Kafka going to DFS whenever it falls behind retention,
> > > > > >       in order to catch up again.
> > > > >
> > > > > > This also seems hard to "design on paper"; I expect there are nuances in a
> > > > > > production setup that affect some details of the design. So I'd feel most
> > > > > > comfortable in adding a variant of the hybrid source to Flink that has been
> > > > > > used already in a real use case (not necessarily in production, but maybe
> > > > > > in a testing/staging environment, so it seems to meet all requirements).
> > > > >
> > > > >
> > > > > > What do you think about the following approach?
> > > > > >   - If there is a tested PoC, let's try to get it contributed to Flink
> > > > > >     without trying to make it much more general.
> > > > > >   - When we see similar but a bit different requirements for another hybrid
> > > > > >     source, then let's try to evolve the contributed one.
> > > > > >   - If we see new requirements that are so different that they don't fit
> > > > > >     well with the existing hybrid source, then let us look at building a
> > > > > >     second hybrid source for those requirements.
> > > > >
> > > > > > We need to make connector contributions in general easier, and I think
> > > > > > it is not a bad thing to end up with different approaches and see how these
> > > > > > play out against each other when being used by users. For example switching
> > > > > > with known boundaries, dynamic switching, back-and-forth-switching, etc.
> > > > > > (I know some committers are planning to do some work on making
> > > > > > connector contributions easier, with standardized testing frameworks,
> > > > > > decoupled CI, etc.)
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <t...@apache.org> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > > As mentioned in my previous email, I had been working on a prototype for
> > > > > > > the hybrid source.
> > > > > >
> > > > > > You can find it at https://github.com/tweise/flink/pull/1
> > > > > >
> > > > > > It contains:
> > > > > > * Switching with configurable chain of sources
> > > > > > * Fixed or dynamic start positions
> > > > > > * Test with MockSource and FileSource
> > > > > >
> > > > > > > The purpose of the above PR is to gather feedback and help drive consensus
> > > > > > > on the FLIP.
> > > > > >
> > > > > > > * How to support a dynamic start position within the source chain?
> > > > > > >
> > > > > > > Relevant in those (few?) cases where start positions are not known upfront.
> > > > > > > You can find an example of what that might look like in the tests:
> > > > > > >
> > > > > > > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> > > > > > >
> > > > > > > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> > > > > > > When switching, the enumerator of the previous source needs to
> > > > > > > supply information about consumed splits that allows setting the start
> > > > > > > position for the next source. That could be something like the last
> > > > > > > processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
> > > > > > > doesn't track finished splits.)
> > > > > > >
> > > > > > > See previous discussion regarding start/end position. The prototype shows
> > > > > > > the use of checkpoint state with a converter function.
> > > > > >
> > > > > > * Should readers be deployed dynamically?
> > > > > >
> > > > > > > The prototype assumes a static source chain that is fixed at job submission
> > > > > > > time. Conceivably there could be use cases that require more flexibility,
> > > > > > > such as switching one KafkaSource for another. A step in that direction
> > > > > > > would be to deploy the actual readers dynamically, at the time of switching
> > > > > > > source.
> > > > > >
> > > > > > Looking forward to feedback and suggestions for next steps!
> > > > > >
> > > > > > Thomas
> > > > > >
> > > > > > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <t...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Nicholas,
> > > > > > >
> > > > > > > > Thanks for the reply. I had implemented a small PoC. It switches a
> > > > > > > > configurable sequence of sources with predefined bounds. I'm using the
> > > > > > > > unmodified MockSource for illustration. It does not require a "Switchable"
> > > > > > > > interface. I looked at the code you shared and the delegation and signaling
> > > > > > > > work quite similarly. That's a good validation.
> > > > > > >
> > > > > > > Hi Kezhu,
> > > > > > >
> > > > > > > > Thanks for bringing up the more detailed discussion regarding the start/end
> > > > > > > > position. I think in most cases the start and end positions will be known
> > > > > > > > when the job is submitted. If we take a File -> Kafka source chain as an
> > > > > > > > example, there would most likely be a timestamp at which we want to
> > > > > > > > transition from files to reading from Kafka. So we would either set the
> > > > > > > > start position for Kafka based on that timestamp or provide the offsets
> > > > > > > > directly. (Note that I'm skipping a few related nuances here. In order to
> > > > > > > > achieve an exact switch without duplication or gap, we may also need some
> > > > > > > > overlap and filtering, but that's a separate issue.)
> > > > > > >
> > > > > > > > The point is that the start positions can be configured by the user;
> > > > > > > > there is no need to transfer any information from one source to another
> > > > > > > > as part of switching.
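
A minimal sketch of the fixed-position case described above, assuming a single user-supplied switch timestamp for a File -> Kafka chain. The types `ConfiguredSource` and `FixedSwitchChain` are hypothetical and exist only for illustration; they are not part of Flink or of the prototype. The sketch ignores the overlap and filtering nuance mentioned above.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Hypothetical description of one source in the chain. The start and end
// positions are plain configuration, fixed when the job graph is built.
final class ConfiguredSource {
    final String name;
    final Instant startInclusive; // null means "from the beginning"
    final Instant endExclusive;   // null means "unbounded"

    ConfiguredSource(String name, Instant startInclusive, Instant endExclusive) {
        this.name = name;
        this.startInclusive = startInclusive;
        this.endExclusive = endExclusive;
    }

    // True if a record with this timestamp falls into this source's range.
    boolean covers(Instant timestamp) {
        boolean afterStart = startInclusive == null || !timestamp.isBefore(startInclusive);
        boolean beforeEnd = endExclusive == null || timestamp.isBefore(endExclusive);
        return afterStart && beforeEnd;
    }
}

final class FixedSwitchChain {
    // Builds a File -> Kafka chain that switches at a user-supplied timestamp.
    // Nothing is handed from the first source to the second at runtime.
    static List<ConfiguredSource> fileThenKafka(Instant switchTimestamp) {
        return Arrays.asList(
                new ConfiguredSource("files", null, switchTimestamp),
                new ConfiguredSource("kafka", switchTimestamp, null));
    }

    // Returns the name of the first source whose range covers the timestamp.
    static String sourceFor(List<ConfiguredSource> chain, Instant timestamp) {
        for (ConfiguredSource source : chain) {
            if (source.covers(timestamp)) {
                return source.name;
            }
        }
        throw new IllegalArgumentException("No source covers " + timestamp);
    }

    public static void main(String[] args) {
        Instant switchAt = Instant.parse("2021-03-01T00:00:00Z");
        List<ConfiguredSource> chain = fileThenKafka(switchAt);
        System.out.println(sourceFor(chain, switchAt.minusSeconds(1)));
        System.out.println(sourceFor(chain, switchAt));
    }
}
```

Because the file range ends exactly where the Kafka range starts, every timestamp belongs to exactly one source in this simplified model.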
> > > > > > >
> > > > > > > > It gets more complicated if we want to achieve a dynamic switch where the
> > > > > > > > transition timestamp isn't known when the job starts. For example, consider
> > > > > > > > a bootstrap scenario where the time taken to process historic data exceeds
> > > > > > > > the Kafka retention. Here, we would need to dynamically resolve the Kafka
> > > > > > > > start position based on where the file readers left off, when the switching
> > > > > > > > occurs. The file source enumerator would determine at runtime when it is
> > > > > > > > done handing splits to its readers, maybe when the max file timestamp
> > > > > > > > reaches (processing time - X). This information needs to be transferred to
> > > > > > > > the Kafka source.
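
One possible reading of the "(processing time - X)" condition above, as a small sketch. The class and method names are hypothetical, and whether the comparison should be inclusive is an assumption made here, not something the thread specifies.

```java
import java.time.Duration;
import java.time.Instant;

final class SwitchCondition {
    // True once the newest file timestamp has reached (processingTime - x),
    // i.e. the file source has caught up to within x of the current time.
    // Treating "reaches" as an inclusive comparison is an assumption.
    static boolean filesCaughtUp(Instant maxFileTimestamp, Instant processingTime, Duration x) {
        Instant threshold = processingTime.minus(x);
        return !maxFileTimestamp.isBefore(threshold);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-03-14T12:00:00Z");
        Duration x = Duration.ofHours(1);
        System.out.println(filesCaughtUp(Instant.parse("2021-03-14T10:00:00Z"), now, x));
        System.out.println(filesCaughtUp(Instant.parse("2021-03-14T11:30:00Z"), now, x));
    }
}
```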
> > > > > > >
> > > > > > > > The timestamp would need to be derived from the file enumerator state,
> > > > > > > > either by looking at the last splits or explicitly. The natural way to do
> > > > > > > > that is to introspect the enumerator state which gets checkpointed. Any
> > > > > > > > other form of "end position" via a special interface would need to be
> > > > > > > > derived in the same manner.
> > > > > > >
> > > > > > > > The converter that will be provided by the user would look at the file
> > > > > > > > enumerator state, derive the timestamp and then supply the "start position"
> > > > > > > > to the Kafka source. The Kafka source was created when the job started. It
> > > > > > > > needs to be augmented with the new start position. That can be achieved via
> > > > > > > > a special enumerator interface like SwitchableSplitEnumerator#setStartState
> > > > > > > > or by using restoreEnumerator with the checkpoint state constructed by the
> > > > > > > > converter function. I'm leaning towards the latter as long as there is a
> > > > > > > > convenient way to construct the state from a position (like
> > > > > > > > enumStateForTimestamp). The converter would map one enum state to another
> > > > > > > > and can be made very simple by providing a few utility functions instead of
> > > > > > > > mandating a new interface that enumerators need to implement to become
> > > > > > > > switchable.
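
A rough sketch of such a converter, under stated assumptions: `FileEnumState` and `KafkaEnumState` are hypothetical stand-ins for the checkpointed enumerator states, and the signature of `enumStateForTimestamp` is guessed from its name in the text above. None of these are Flink classes. The sketch also assumes the file enumerator state records a max timestamp per finished split, and it leaves out the overlap and filtering needed for an exact switch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical checkpointed state of the file enumerator: the max record
// timestamp (epoch millis) of each split that has been fully processed.
final class FileEnumState {
    final List<Long> finishedSplitMaxTimestamps;

    FileEnumState(List<Long> finishedSplitMaxTimestamps) {
        this.finishedSplitMaxTimestamps = finishedSplitMaxTimestamps;
    }
}

// Hypothetical checkpointed state of the Kafka enumerator.
final class KafkaEnumState {
    final long startTimestampMillis;

    private KafkaEnumState(long startTimestampMillis) {
        this.startTimestampMillis = startTimestampMillis;
    }

    // Utility that constructs an enumerator state from a position, so the
    // user-provided converter does not need to know the state's internals.
    static KafkaEnumState enumStateForTimestamp(long timestampMillis) {
        return new KafkaEnumState(timestampMillis);
    }
}

final class StateConverters {
    // Maps one enumerator state to another: derive the timestamp from the
    // file state, then build the Kafka state that would be restored from.
    static Function<FileEnumState, KafkaEnumState> fileToKafka() {
        return fileState -> {
            long maxTimestamp = fileState.finishedSplitMaxTimestamps.stream()
                    .mapToLong(Long::longValue)
                    .max()
                    .orElseThrow(() -> new IllegalStateException(
                            "No finished splits to derive a start position from"));
            return KafkaEnumState.enumStateForTimestamp(maxTimestamp);
        };
    }

    public static void main(String[] args) {
        FileEnumState fileState = new FileEnumState(Arrays.asList(100L, 300L, 200L));
        KafkaEnumState kafkaState = fileToKafka().apply(fileState);
        System.out.println(kafkaState.startTimestampMillis);
    }
}
```

The resulting state object is what would then be passed to the next source as if it were restoring from a checkpoint, so the Kafka enumerator itself needs no new interface.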
> > > > > > >
> > > > > > > > Again, a converter is only required when sources need to be switched based
> > > > > > > > on positions not known at graph construction time.
> > > > > > > >
> > > > > > > > I'm planning to add such deferred switching to the PoC for illustration
> > > > > > > > and will share the experiment when that's done.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Thomas
> > > > > > >
> > > > > > >
> > > > > > > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programg...@163.com> wrote:
> > > > > > >
> > > > > > >> Hi Kezhu,
> > > > > > >>
> > > > > > > >> Thanks for your detailed points on the Hybrid Source. I have gone through
> > > > > > > >> them and respond to each as follows:
> > > > > > >>
> > > > > > > >> 1. Would it be possible to use the Hybrid Source to switch/chain
> > > > > > > >> multiple homogeneous sources?
> > > > > > > >>
> > > > > > > >> "HybridSource" supports switching/chaining multiple homogeneous sources
> > > > > > > >> that have their respective implementations of "SwitchableSource" and
> > > > > > > >> "SwitchableSplitEnumerator". "HybridSource" doesn't restrict whether the
> > > > > > > >> sources it consists of are homogeneous. From the user's perspective, the
> > > > > > > >> user only adds each "SwitchableSource" to the "HybridSource" and leaves
> > > > > > > >> the smooth migration to "HybridSource".
> > > > > > >>
> > > > > > >> 2."setStartState" is actually a reposition operation for next
> > > > source to
> > > > > > >> start in job runtime?
> > > > > > >>
> > > > > > >> IMO, "setStartState" is used to determine the initial
> position of
> > > > the
> > > > > > new
> > > > > > >> source for smooth migration, not reposition operation. More
> > > > importantly,
> > > > > > >> the
> > > > > > >> "State" mentioned here refers to the start and end positions
> of
> > > > reading
> > > > > > >> source.
> > > > > > >>
> > > > > > > >> 3. Shouldn't this conversion be an implementation detail of the next
> > > > > > > >> source rather than a converter function?
> > > > > > > >>
> > > > > > > >> The state conversion is of course an implementation detail and part of
> > > > > > > >> the switching mechanism, which should provide users with a conversion
> > > > > > > >> interface, defined as the converter function. What's more, when users
> > > > > > > >> have already implemented a "SwitchableSource" and added it to the Hybrid
> > > > > > > >> Source, they don't need to implement another "SwitchableSource" for a
> > > > > > > >> different conversion. From the user's perspective, users can define
> > > > > > > >> different converter functions and create the "SwitchableSource" to add
> > > > > > > >> to the "HybridSource"; there is no need to implement a Source for each
> > > > > > > >> converter function.
> > > > > > >>
> > > > > > > >> 4. No configurable start-position. In this situation, the combination of
> > > > > > > >> the above three points is a no-op, and "HybridSource" is a chain of
> > > > > > > >> sources with pre-configured start-positions?
> > > > > > > >>
> > > > > > > >> Indeed there is no configurable start-position, and this configuration
> > > > > > > >> could be included in the feature. Users could use the
> > > > > > > >> "SwitchableSplitEnumerator#setStartState" interface or configuration
> > > > > > > >> parameters to configure the start-position.
> > > > > > >>
> > > > > > > >> 5. I wonder whether end-position is a must and how it could be useful
> > > > > > > >> for end users in a generic-enough source?
> > > > > > > >>
> > > > > > > >> The "getEndState" interface is used for the smooth migration scenario
> > > > > > > >> and can return null if it is not needed. In the Hybrid Source mechanism,
> > > > > > > >> this interface is required for switching between the constituent
> > > > > > > >> sources; otherwise there is no way to get the end-position of the
> > > > > > > >> upstream source. In summary, Hybrid Source needs to be able to set the
> > > > > > > >> start position and get the end position of each Source; otherwise there
> > > > > > > >> is no point in building a Hybrid Source.
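
To make the "set start position / get end position" contract above concrete, here is a small sketch. Only the method names `setStartState` and `getEndState` come from the discussion; the interface name `SwitchableEnumeratorSketch`, its type parameters, and the two toy implementations are assumptions for illustration and do not reproduce the proposed "SwitchableSplitEnumerator".

```java
import java.util.function.Function;

// Hypothetical shape of a switchable enumerator: it accepts a start state
// and exposes an end state. Type parameters are an assumption of this sketch.
interface SwitchableEnumeratorSketch<StartT, EndT> {
    void setStartState(StartT startState);

    // May return null when the source has no meaningful end position.
    EndT getEndState();
}

// Toy upstream source: its end state is the last processed timestamp.
final class FileLikeEnumerator implements SwitchableEnumeratorSketch<Void, Long> {
    private final long lastProcessedTimestamp;

    FileLikeEnumerator(long lastProcessedTimestamp) {
        this.lastProcessedTimestamp = lastProcessedTimestamp;
    }

    @Override
    public void setStartState(Void unused) {
        // The first source in the chain starts from its own configuration.
    }

    @Override
    public Long getEndState() {
        return lastProcessedTimestamp;
    }
}

// Toy downstream source: its start state is a timestamp to read from.
final class KafkaLikeEnumerator implements SwitchableEnumeratorSketch<Long, Void> {
    private Long startTimestamp;

    @Override
    public void setStartState(Long startState) {
        this.startTimestamp = startState;
    }

    @Override
    public Void getEndState() {
        return null; // unbounded source, no end position
    }

    Long startTimestamp() {
        return startTimestamp;
    }
}

final class SwitchingSketch {
    // On switch: read the upstream end state, convert it, set it downstream.
    static <EndT, StartT> void switchOver(
            SwitchableEnumeratorSketch<?, EndT> previous,
            Function<EndT, StartT> converter,
            SwitchableEnumeratorSketch<StartT, ?> next) {
        next.setStartState(converter.apply(previous.getEndState()));
    }

    public static void main(String[] args) {
        FileLikeEnumerator files = new FileLikeEnumerator(1_615_000_000_000L);
        KafkaLikeEnumerator kafka = new KafkaLikeEnumerator();
        switchOver(files, endTimestamp -> endTimestamp, kafka);
        System.out.println(kafka.startTimestamp());
    }
}
```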
> > > > > > >>
> > > > > > > >> 6. Is it possible for the converter function to do blocking operations?
> > > > > > > >> How to respond to a checkpoint request when switching split enumerators
> > > > > > > >> across sources? Does end-position or start-position need to be stored in
> > > > > > > >> checkpoint state or not?
> > > > > > > >>
> > > > > > > >> The converter function simply converts the state of the upstream source
> > > > > > > >> to the state of the downstream source; it does not perform blocking
> > > > > > > >> operations. The way to respond to a checkpoint request when switching
> > > > > > > >> split enumerators across sources is to send the corresponding
> > > > > > > >> "SourceEvent" to the coordinator. The end-position and start-position
> > > > > > > >> don't need to be stored in checkpoint state; the source only needs to
> > > > > > > >> implement the "getEndState" interface for the end-position.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Nicholas Jiang
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > > >> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > >
>
