Hi all, I have updated FLIP-278[1]. I think all the problems and comments have
been addressed.

1. Option prefix: we now use identifiers.
2. Table API implementation and demo.
3. Switched dynamic position (the hybrid source uses it to switch
automatically from the previous source to the next one).
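
To make item 1 concrete, here is a minimal sketch of how identifier-prefixed
options could be grouped per child source. It is not the code from the draft
PR: the class name ChildOptionSplitter and the method split are hypothetical,
and accepting both ';' and ',' as separators in 'sources' is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Flink or the draft PR.
final class ChildOptionSplitter {

    private ChildOptionSplitter() {}

    /**
     * Groups 'identifier.key' options under each identifier listed in 'sources'.
     * The identifier prefix is stripped from the keys of each child map.
     */
    static Map<String, Map<String, String>> split(Map<String, String> options) {
        String sources = options.get("sources");
        if (sources == null || sources.trim().isEmpty()) {
            throw new IllegalArgumentException("'sources' option is required");
        }
        // LinkedHashMap keeps the child sources in their declared order.
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        // Assumption: identifiers are separated by ';' or ','.
        for (String raw : sources.split("[;,]")) {
            String identifier = raw.trim();
            if (identifier.isEmpty()) {
                continue;
            }
            if (result.containsKey(identifier)) {
                throw new IllegalArgumentException("Duplicate identifier: " + identifier);
            }
            String prefix = identifier + ".";
            Map<String, String> child = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : options.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    child.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            result.put(identifier, child);
        }
        return result;
    }
}
```

With 'sources' = 'historical;realtime', the option 'historical.path' ends up as
'path' in the map for 'historical', so each child connector only sees its own
options.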

More details can be found in the draft PR[2]; it works well.
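
For item 3, the handoff at switch time can be sketched roughly as follows.
Everything here is an assumption for illustration: EndTimestampProvider and
StartTimestampAware are hypothetical interfaces standing in for the methods
that would have to be exposed on the enumerator and on the source. They are not
part of the current Flink API.

```java
// Hypothetical sketch of the switch-time handoff; none of these types exist in Flink.
final class SwitchHandoff {

    private SwitchHandoff() {}

    /** Hypothetical: implemented by enumerators that know where their data ends. */
    interface EndTimestampProvider {
        long getEndTimestamp();
    }

    /** Hypothetical: implemented by sources that can start reading from a timestamp. */
    interface StartTimestampAware {
        void setStartTimestamp(long timestamp);
    }

    /**
     * Passes the end timestamp of the previous enumerator to the next source.
     * Returns false when either side does not support the handoff, in which case
     * the next source keeps its own fixed start position.
     */
    static boolean handOff(Object previousEnumerator, Object nextSource) {
        if (previousEnumerator instanceof EndTimestampProvider
                && nextSource instanceof StartTimestampAware) {
            long switchedTimestamp =
                    ((EndTimestampProvider) previousEnumerator).getEndTimestamp();
            ((StartTimestampAware) nextSource).setStartTimestamp(switchedTimestamp);
            return true;
        }
        return false;
    }
}
```

The boolean result reflects that the handoff is optional: a child source that
does not accept a start timestamp (for example another file source) is simply
left untouched.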


> A correction to the demo below: the call should be
> childSources.get(sourceIndex).setStartTimestamp(switchedTimestamp);
>> Hi John, thanks for your comments.
>> About question 2: the "handoff" is used to switch to the next source
>> seamlessly, but it is optional. Not every hybrid source job needs to use
>> this mode.
>> The hybrid source in SQL or the Table API needs to support two ways, like
>> the DataStream API examples below. The first is a fixed position: the user
>> can specify earliest, latest, specific-offsets, etc.
>> The second is that the user can instead specify a timestamp, and the
>> second source uses that timestamp to consume the Kafka data (no need to
>> specify earliest, latest or specific-offsets; Flink does the conversion).
>>  * <p>A simple example with FileSource and KafkaSource with fixed Kafka
>>  * start position:
>>  * <pre>{@code
>>  * FileSource<String> fileSource =
>>  *   FileSource.forRecordStreamFormat(new TextLineInputFormat(),
>>  *       Path.fromLocalFile(testDir)).build();
>>  * KafkaSource<String> kafkaSource =
>>  *           KafkaSource.<String>builder()
>>  *                   .setBootstrapServers("localhost:9092")
>>  *                   .setGroupId("MyGroup")
>>  *                   .setTopics(Arrays.asList("quickstart-events"))
>>  *                   .setDeserializer(
>>  *                       KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>  *                   .setStartingOffsets(OffsetsInitializer.earliest())
>>  *                   .build();
>>  * HybridSource<String> hybridSource =
>>  *           HybridSource.builder(fileSource)
>>  *                   .addSource(kafkaSource)
>>  *                   .build();
>>  * }</pre>
>>  *
>>  * <p>A more complex example with Kafka start position derived from
>>  * previous source:
>>  *
>>  * <pre>{@code
>>  * HybridSource<String> hybridSource =
>>  *     HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
>>  *         .addSource(
>>  *             switchContext -> {
>>  *               StaticFileSplitEnumerator previousEnumerator =
>>  *                   switchContext.getPreviousEnumerator();
>>  *               // how to get timestamp depends on specific enumerator
>>  *               long timestamp = previousEnumerator.getEndTimestamp();
>>  *               OffsetsInitializer offsets =
>>  *                   OffsetsInitializer.timestamp(timestamp);
>>  *               KafkaSource<String> kafkaSource =
>>  *                   KafkaSource.<String>builder()
>>  *                       .setBootstrapServers("localhost:9092")
>>  *                       .setGroupId("MyGroup")
>>  *                       .setTopics(Arrays.asList("quickstart-events"))
>>  *                       .setDeserializer(
>>  *                           KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>  *                       .setStartingOffsets(offsets)
>>  *                       .build();
>>  *               return kafkaSource;
>>  *             },
>>  *             Boundedness.CONTINUOUS_UNBOUNDED)
>>  *         .build();
>>  * }</pre>
>> Currently the Flink SplitEnumerator interface does not expose
>> getEndTimestamp(). I think if we want to implement the "handoff" way we
>> need to let SplitEnumerator expose this method.
>> Then the question is: once we have the previous endTimestamp, how do we
>> pass it on to the next source? We can't build a KafkaSource instance
>> directly because the hybrid source is a generic implementation.
>> I think we need to add a method, for example startTimestamp(), to the new
>> Source API; then we can implement this.
>> Switched-start-position demo:
>> HybridSource.HybridSourceBuilder<RowData, SplitEnumerator> builder =
>>         HybridSource.builder(childSources.get(0));
>> for (int i = 1; i < childSources.size(); i++) {
>>     final int sourceIndex = i;
>>     Boundedness boundedness = childSources.get(sourceIndex).getBoundedness();
>>     builder.addSource(
>>             switchContext -> {
>>                 SplitEnumerator previousEnumerator =
>>                         switchContext.getPreviousEnumerator();
>>                 // how to pass to kafka or other connector? We add a method
>>                 // in the new source api like startTimestamp();
>>                 long switchedTimestamp = previousEnumerator.getEndTimestamp();
>>                 childSources.setStartTimestamp(switchedTimestamp);
>>                 return childSources.get(sourceIndex);
>>             },
>>             boundedness);
>> }
>> hybridSource = builder.build();
>> E.g. if Kafka is the last source, Kafka uses this switchedTimestamp to
>> initialize the OffsetsInitializer and consumes from this timestamp.
>> The last question was whether this source supports chaining together more
>> than two sources. Absolutely yes: we support more than two sources, like
>> the DataStream API.
>> I have added a DDL example to the FLIP.
>>> Hello all,
>>> Thanks for the FLIP, Ran!
>>> The HybridSource is a really cool feature, and I was glad to see a
>>> proposal to expose it in the Table and SQL APIs.
>>> My main question is also about the switching control (question 2). It
>>> seems like the existing Kafka connector has all the options you'd want to
>>> define the switching point[1], and the issue is only how to specify a
>>> "handoff" from one source to the next. It seems like you could propose to
>>> add a reference to an extracted field or property of the first source to be
>>> used in the second one.
>>> However, the more I think about it, the more I wonder whether a
>>> "handoff" operation ought to be necessary. For example, the use case I have
>>> in mind is to bootstrap the table using a snapshot of the data and then
>>> have it seamlessly switch over to consuming all the records since that
>>> snapshot. In order to support this use case with no loss or duplicates,
>>> timestamp isn't sufficient; I'd need to know the exact vector of offsets
>>> represented in that snapshot. Then again, if I control the snapshotting
>>> process, this should be trivial to compute and store next to the snapshots.
>>> Further, when I register the table, I ought to know which exact snapshot
>>> I'm selecting, and therefore can just populate the `specific-offsets` as
>>> desired. Backing off to timestamp, if I again am naming a path to a
>>> specific snapshot of the data, it seems like I have enough information
>>> already to also specify the correct `timestamp` option.
>>> With this in mind, my question is whether it's necessary to specify some
>>> kind of dynamic property, like the DataStream API does[2]. If a fixed
>>> property is sufficient, it seems like the current proposal is actually
>>> sufficient as well. I think I just don't see the use case for dynamic
>>> configuration here.
>>> Side question, out of curiosity: would this source support chaining
>>> together more than two sources? It seems like the proposed syntax would
>>> allow it. It seems like some snapshot-rollup strategies could benefit from
>>> it (eg if you want to combine your 2021 yearly rollup with your Jan-Nov
>>> monthly rollups, then your first two weekly rollups from Dec, and finally
>>> switch over to live data from Kafka or something).
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#dynamic-start-position-at-switch-time
>>> Thanks again,
>>> -John
>>> > Hi Martijn, thanks for your comments.
>>> >
>>> > Using an identifier as the child source prefix may be a better way than
>>> > an index. I will update the FLIP to illustrate how we can read from the
>>> > hybrid schema to generate the child schemas, for question 1.
>>> >
>>> > Question 2 is about the start position for the next Kafka source.
>>> > Currently we cannot get the end timestamp of the first bounded source.
>>> > In the DataStream API the end timestamp can be obtained from the
>>> > previous enumerator. We need to add end timestamp support to bounded
>>> > sources (e.g. filesystem). If we can get the end timestamp, then Kafka
>>> > will start from the corresponding offset. I think we need an option
>>> > here that allows the user to start the next Kafka source either from
>>> > the previous one automatically or from a user-defined start offset (via
>>> > the WITH options in the SQL DDL). Not every subsequent source needs to
>>> > be bound to the previous one; for example, if the next source is also a
>>> > file, it does not need a start position.
>>> >
>>> > Question 3 is about the Table API, which I haven't added to the FLIP
>>> > yet. I will try to fix some current issues, update the FLIP and add
>>> > more details. Thanks for your comments.
>>> >
>>> >
>>> >
>>> >> Hi Ran,
>>> >>
>>> >> For completeness, this is a new thread that was already previously
>>> >> started at
>>> >> https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq. I'm
>>> >> linking them because I think Timo's comments are relevant to be kept
>>> >> with this discussion thread.
>>> >>
>>> >> I agree with Timo's comments from there that having an index key isn't
>>> >> the best option, I would rather have an identifier.
>>> >>
>>> >> I do wonder how this would work when you want to specify sources from
>>> >> a catalog: could you elaborate on that?
>>> >>
>>> >> What I'm also missing in the FLIP is an example of how to specify the
>>> >> starting offset from Kafka. In the DataStream API, there
>>> >> is OffsetsInitializer.timestamp(switchTimestamp + 1) but how would you
>>> >> specify that in the SQL landscape?
>>> >>
>>> >> Last but not least: your examples are all SQL only. How do you propose
>>> >> that this works in the Table API?
>>> >>
>>> >> Best regards,
>>> >>
>>> >> Martijn
>>> >>
>>> wrote:
>>> >>
>>> >> > FYI.
>>> >> >
>>> >> > This FLIP uses an index as the child source option prefix because we
>>> >> > may use the same connector for several hybrid child sources,
>>> >> > e.g.
>>> >> >
>>> >> > create table hybrid_source(
>>> >> >  f0 varchar,
>>> >> >  f1 varchar,
>>> >> >  f2 bigint
>>> >> > ) with(
>>> >> >  'connector'='hybrid',
>>> >> >  'sources'='filesystem,filesystem',
>>> >> >  '0.path' = '/tmp/a.csv',
>>> >> >  '0.format' = 'csv',
>>> >> >  '1.path' = '/tmp/b.csv',
>>> >> >  '1.format' = 'csv'
>>> >> > );
>>> >> >
>>> >> > In this case, we must distinguish which filesystem connector each
>>> >> > format and path option belongs to. But as Timo says, it's not clear.
>>> >> > He suggests another way, like this:
>>> >> >
>>> >> > CREATE TABLE hybrid_source WITH (
>>> >> >    'sources'='historical;realtime',   -- Config option of type string list
>>> >> >    'historical.connector' = 'filesystem',
>>> >> >    'historical.path' = '/tmp/a.csv',
>>> >> >    'historical.format' = 'csv',
>>> >> >    'realtime.path' = '/tmp/b.csv',
>>> >> >    'realtime.format' = 'csv'
>>> >> > )
>>> >> >
>>> >> > The `sources` option holds user-defined names instead of concrete
>>> >> > connector types. We use each user-defined name as a prefix, and use
>>> >> > prefix.connector to look up the concrete connector implementation.
>>> >> >
>>> >>
>>> >
>>> >
