a mistake,
childSources.get(sourceIndex).setStartTimestamp(switchedTimestamp);
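
To show the corrected line in context, here is a minimal, self-contained sketch of the handoff. It deliberately does not use the Flink API: EndTimestampAware, StartTimestampAware and the two Demo classes are hypothetical stand-ins for what SplitEnumerator and the new Source API would have to expose. Neither getEndTimestamp() nor setStartTimestamp() exists on those interfaces today, so treat this as an illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical: what the previous enumerator would need to expose. */
interface EndTimestampAware {
    long getEndTimestamp();
}

/** Hypothetical: what a child source would need to accept. */
interface StartTimestampAware {
    void setStartTimestamp(long timestamp);
}

/** Stand-in for a bounded enumerator (e.g. files) that knows its end timestamp. */
final class DemoBoundedEnumerator implements EndTimestampAware {
    private final long endTimestamp;

    DemoBoundedEnumerator(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    @Override
    public long getEndTimestamp() {
        return endTimestamp;
    }
}

/** Stand-in for a child source; -1 means "no start timestamp set". */
final class DemoChildSource implements StartTimestampAware {
    private long startTimestamp = -1L;

    @Override
    public void setStartTimestamp(long timestamp) {
        this.startTimestamp = timestamp;
    }

    long getStartTimestamp() {
        return startTimestamp;
    }
}

final class HandoffSketch {
    /** Mirrors the lambda body: configure the source at sourceIndex, then return it. */
    static DemoChildSource switchTo(
            List<DemoChildSource> childSources,
            int sourceIndex,
            EndTimestampAware previousEnumerator) {
        long switchedTimestamp = previousEnumerator.getEndTimestamp();
        // The correction: call the setter on one child source, not on the list.
        childSources.get(sourceIndex).setStartTimestamp(switchedTimestamp);
        return childSources.get(sourceIndex);
    }

    public static void main(String[] args) {
        List<DemoChildSource> childSources = new ArrayList<>();
        childSources.add(new DemoChildSource());
        childSources.add(new DemoChildSource());
        DemoChildSource next =
                switchTo(childSources, 1, new DemoBoundedEnumerator(1671437400000L));
        System.out.println(next.getStartTimestamp());
    }
}
```

Only the source being switched to receives the timestamp; the first source is left untouched, which matches the intent of the demo loop.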

Ran Tao <chucheng...@gmail.com> wrote on Mon, Dec 19, 2022 at 16:10:

> Hi John, thanks for your comments.
> About question 2: the "handoff" is used to switch to the next source
> seamlessly, but it is optional. Not every hybrid source job needs to use
> this mode.
>
> The hybrid source in SQL or the Table API needs to support two ways, like the
> DataStream API examples below. The first is a fixed position: the user
> specifies earliest, latest, specific-offsets, etc.
> The second is that the user specifies a timestamp and the second source
> uses it to consume the Kafka data (no need to specify earliest, latest or
> specific-offsets; Flink does the conversion).
>
>  * <p>A simple example with FileSource and KafkaSource with fixed Kafka start position:
>  * <pre>{@code
>  * FileSource<String> fileSource =
>  *   FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
>  * KafkaSource<String> kafkaSource =
>  *           KafkaSource.<String>builder()
>  *                   .setBootstrapServers("localhost:9092")
>  *                   .setGroupId("MyGroup")
>  *                   .setTopics(Arrays.asList("quickstart-events"))
>  *                   .setDeserializer(
>  *                           KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>  *                   .setStartingOffsets(OffsetsInitializer.earliest())
>  *                   .build();
>  * HybridSource<String> hybridSource =
>  *           HybridSource.builder(fileSource)
>  *                   .addSource(kafkaSource)
>  *                   .build();
>  * }</pre>
>  *
>  * <p>A more complex example with Kafka start position derived from previous source:
>  *
>  * <pre>{@code
>  * HybridSource<String> hybridSource =
>  *     HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
>  *         .addSource(
>  *             switchContext -> {
>  *               StaticFileSplitEnumerator previousEnumerator =
>  *                   switchContext.getPreviousEnumerator();
>  *               // how to get timestamp depends on specific enumerator
>  *               long timestamp = previousEnumerator.getEndTimestamp();
>  *               OffsetsInitializer offsets =
>  *                   OffsetsInitializer.timestamp(timestamp);
>  *               KafkaSource<String> kafkaSource =
>  *                   KafkaSource.<String>builder()
>  *                       .setBootstrapServers("localhost:9092")
>  *                       .setGroupId("MyGroup")
>  *                       .setTopics(Arrays.asList("quickstart-events"))
>  *                       .setDeserializer(
>  *                           KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>  *                       .setStartingOffsets(offsets)
>  *                       .build();
>  *               return kafkaSource;
>  *             },
>  *             Boundedness.CONTINUOUS_UNBOUNDED)
>  *         .build();
>  * }</pre>
>
> Currently Flink's SplitEnumerator interface does not expose
> getEndTimestamp(). I think that if we want to implement the "handoff" we
> need SplitEnumerator to expose this method.
> Then the question is: once we have the previous endTimestamp, how do we pass
> it on? We can't build a KafkaSource instance directly, because the hybrid
> source is a generic implementation.
> I think we need to add a method, for example startTimestamp(), to the new
> Source API. Then we can implement it like this:
>
> Switched-start-position demo:
>
> HybridSource.HybridSourceBuilder<RowData, SplitEnumerator> builder =
>         HybridSource.builder(childSources.get(0));
> for (int i = 1; i < childSources.size(); i++) {
>     final int sourceIndex = i;
>     Boundedness boundedness = childSources.get(sourceIndex).getBoundedness();
>     builder.addSource(
>             switchContext -> {
>                 SplitEnumerator previousEnumerator =
>                         switchContext.getPreviousEnumerator();
>                 // How to pass this to Kafka or another connector? We add a
>                 // method to the new Source API, like startTimestamp().
>                 long switchedTimestamp = previousEnumerator.getEndTimestamp();
>                 childSources.setStartTimestamp(switchedTimestamp);
>                 return childSources.get(sourceIndex);
>             },
>             boundedness);
> }
> hybridSource = builder.build();
>
> E.g. if Kafka is the last source, then Kafka uses this switchedTimestamp to
> initialize the OffsetsInitializer and consumes from this timestamp.
>
> The last question is whether this source supports chaining together more than
> two sources. Absolutely yes: we support more than two sources, like the
> DataStream API.
> I have added a DDL example to the FLIP.
>
>
> John Roesler <vvcep...@apache.org> wrote on Mon, Dec 19, 2022 at 11:14:
>
>> Hello all,
>>
>> Thanks for the FLIP, Ran!
>>
>> The HybridSource is a really cool feature, and I was glad to see a
>> proposal to expose it in the Table and SQL APIs.
>>
>> My main question is also about the switching control (question 2). It
>> seems like the existing Kafka connector has all the options you'd want to
>> define the switching point[1], and the issue is only how to specify a
>> "handoff" from one source to the next. It seems like you could propose to
>> add a reference to an extracted field or property of the first source to be
>> used in the second one.
>>
>> However, the more I think about it, the more I wonder whether a "handoff"
>> operation ought to be necessary. For example, the use case I have in mind
>> is to bootstrap the table using a snapshot of the data and then have it
>> seamlessly switch over to consuming all the records since that snapshot. In
>> order to support this use case with no loss or duplicates, timestamp isn't
>> sufficient; I'd need to know the exact vector of offsets represented in
>> that snapshot. Then again, if I control the snapshotting process, this
>> should be trivial to compute and store next to the snapshots.
>>
>> Further, when I register the table, I ought to know which exact snapshot
>> I'm selecting, and therefore can just populate the `specific-offsets` as
>> desired. Backing off to timestamp, if I again am naming a path to a
>> specific snapshot of the data, it seems like I have enough information
>> already to also specify the correct `timestamp` option.
>>
>> With this in mind, my question is whether it's necessary to specify some
>> kind of dynamic property, like the DataStream API does[2]. If a fixed
>> property is sufficient, it seems like the current proposal is actually
>> sufficient as well. I think I just don't see the use case for dynamic
>> configuration here.
>>
>> Side question, out of curiosity: would this source support chaining
>> together more than two sources? It seems like the proposed syntax would
>> allow it. It seems like some snapshot-rollup strategies could benefit from
>> it (eg if you want to combine your 2021 yearly rollup with your Jan-Nov
>> monthly rollups, then your first two weekly rollups from Dec, and finally
>> switch over to live data from Kafka or something).
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#dynamic-start-position-at-switch-time
>>
>> Thanks again,
>> -John
>>
>> On Fri, Dec 16, 2022, at 06:20, Ran Tao wrote:
>> > Hi Martijn, thanks for your comments.
>> >
>> > Using an identifier as the child source prefix may be a good alternative
>> > to an index. I will update the FLIP to illustrate how we can read the
>> > hybrid schema to generate the child schemas, for question 1.
>> >
>> > Question 2 is about the start position for the next Kafka source.
>> > Currently we cannot get the end timestamp of the first bounded source. In
>> > the DataStream API the end timestamp can be obtained from the previous
>> > enumerator. We need to offer end timestamp support for bounded sources
>> > (e.g. filesystem). If we can get the end timestamp, Kafka will start from
>> > the corresponding offset. I think we need an option here that lets the
>> > user start the next Kafka source either from the previous one
>> > automatically or from a custom start offset (via a WITH option in the SQL
>> > DDL). Not every second source needs to be bound to the previous one; for
>> > example, if the next source is also a file source, it does not need a
>> > start position.
>> >
>> > Question 3 is about the Table API, which I haven't added to the FLIP yet.
>> > I will try to fix some current issues, update the FLIP and add more
>> > details. Thanks for your comments.
>> >
>> >
>> > Martijn Visser <martijnvis...@apache.org> wrote on Fri, Dec 16, 2022 at 16:59:
>> >
>> >> Hi Ran,
>> >>
>> >> For completeness, this is a new thread for a discussion that was
>> >> previously started at
>> >> https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq.
>> >> I'm linking them because I think Timo's comments are relevant and should
>> >> be kept with this discussion thread.
>> >>
>> >> I agree with Timo's comments from there that having an index key isn't
>> >> the best option, I would rather have an identifier.
>> >>
>> >> I do wonder how this would work when you want to specify sources from a
>> >> catalog: could you elaborate on that?
>> >>
>> >> What I'm also missing in the FLIP is an example of how to specify the
>> >> starting offset from Kafka. In the DataStream API, there
>> >> is OffsetsInitializer.timestamp(switchTimestamp + 1) but how would you
>> >> specify that in the SQL landscape?
>> >>
>> >> Last but not least: your examples are all SQL only. How do you propose
>> >> that this works in the Table API?
>> >>
>> >> Best regards,
>> >>
>> >> Martijn
>> >>
>> >> On Thu, Dec 15, 2022 at 9:16 AM Ran Tao <chucheng...@gmail.com> wrote:
>> >>
>> >> > Fyi.
>> >> >
>> >> > This FLIP uses an index as the child source option prefix because we
>> >> > may use the same connector for several hybrid child sources.
>> >> > e.g.
>> >> >
>> >> > create table hybrid_source(
>> >> >  f0 varchar,
>> >> >  f1 varchar,
>> >> >  f2 bigint
>> >> > ) with(
>> >> >  'connector'='hybrid',
>> >> >  'sources'='filesystem,filesystem',
>> >> >  '0.path' = '/tmp/a.csv',
>> >> >  '0.format' = 'csv',
>> >> >  '1.path' = '/tmp/b.csv',
>> >> >  '1.format' = 'csv'
>> >> > );
>> >> >
>> >> > In this case, we must distinguish which filesystem connector each
>> >> > format and path option belongs to. But as Timo says, an index is not
>> >> > clear. He suggests another way, like this:
>> >> >
>> >> > CREATE TABLE hybrid_source WITH (
>> >> >    'sources'='historical;realtime',   -- Config option of type string list
>> >> >    'historical.connector' = 'filesystem',
>> >> >    'historical.path' = '/tmp/a.csv',
>> >> >    'historical.format' = 'csv',
>> >> >    'realtime.path' = '/tmp/b.csv',
>> >> >    'realtime.format' = 'csv'
>> >> > )
>> >> >
>> >> > The `sources` option holds user-defined names instead of concrete
>> >> > connector types. We use each user-defined name as a prefix, and use
>> >> > <prefix>.connector to look up the concrete connector implementation.
>> >> >
>> >>
>> >
>> >
>> > --
>> > Best Regards,
>> > Ran Tao
>> > https://github.com/chucheng92
>>
>
>
> --
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
>

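For the identifier-based prefixes quoted above ('historical.path', 'realtime.format'), the hybrid table factory would have to split the flat WITH options into one option map per child source. A minimal sketch of that grouping in plain Java follows. ChildOptionsSketch and splitByIdentifier are hypothetical names, and the ';' separator is taken from the quoted DDL; none of this is part of the FLIP's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ChildOptionsSketch {
    /**
     * Groups options such as 'historical.path' under the identifier
     * 'historical', stripping the prefix. Options without a known
     * identifier prefix (e.g. 'connector', 'sources') are ignored.
     */
    static Map<String, Map<String, String>> splitByIdentifier(
            Map<String, String> tableOptions) {
        String sources = tableOptions.get("sources");
        if (sources == null) {
            throw new IllegalArgumentException("missing 'sources' option");
        }
        // Keep the declared order, since it is the switching order.
        Map<String, Map<String, String>> perSource = new LinkedHashMap<>();
        for (String identifier : sources.split(";")) {
            perSource.put(identifier.trim(), new LinkedHashMap<>());
        }
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            String key = option.getKey();
            int dot = key.indexOf('.');
            if (dot < 0) {
                continue; // top-level option of the hybrid table itself
            }
            Map<String, String> child = perSource.get(key.substring(0, dot));
            if (child != null) {
                child.put(key.substring(dot + 1), option.getValue());
            }
        }
        return perSource;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("sources", "historical;realtime");
        options.put("historical.connector", "filesystem");
        options.put("historical.path", "/tmp/a.csv");
        options.put("historical.format", "csv");
        options.put("realtime.path", "/tmp/b.csv");
        options.put("realtime.format", "csv");
        System.out.println(splitByIdentifier(options));
    }
}
```

Each child map can then be handed to the connector named by its own 'connector' entry, which is what makes reusing the same connector type for several child sources unambiguous.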

-- 
Best Regards,
Ran Tao
https://github.com/chucheng92
