A mistake in the switched-start-position demo quoted below: the line should be

    childSources.get(sourceIndex).setStartTimestamp(switchedTimestamp);
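(To make the proposed setter concrete, here is a minimal sketch of the kind of hook being discussed. The interface and method names are made up for illustration; nothing like this exists in the current Source API:)

    /**
     * Hypothetical sketch only, not part of Flink's Source API.
     * A child source that can start from a switched timestamp would
     * implement this, so the hybrid table source can hand the previous
     * enumerator's end timestamp to any connector generically, without
     * knowing its concrete type (Kafka, Pulsar, ...).
     */
    public interface SupportsSwitchedStartTimestamp {

        /**
         * Applies the end timestamp of the previous source as this
         * source's start position. For Kafka this would translate to
         * OffsetsInitializer.timestamp(switchedTimestamp) internally.
         */
        void applySwitchedStartTimestamp(long switchedTimestamp);
    }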
On Mon, Dec 19, 2022 at 16:10, Ran Tao <chucheng...@gmail.com> wrote:

> Hi, John. Thanks for your comments.
> About question 2: the "handoff" is used for switching to the next source
> seamlessly, but it is optional. Not every hybrid source job needs to use
> this mode.
>
> The hybrid source SQL/table needs to support both modes of the DataStream
> API shown below. In the first, the user specifies a fixed position:
> earliest, latest, specific-offsets, etc.
> In the second, the user lets the second source consume the Kafka data from
> a switched timestamp (no need to specify earliest, latest or
> specific-offsets; Flink does this conversion).
>
> * <p>A simple example with FileSource and KafkaSource with a fixed Kafka
> * start position:
> *
> * <pre>{@code
> * FileSource<String> fileSource =
> *     FileSource.forRecordStreamFormat(
> *             new TextLineInputFormat(), Path.fromLocalFile(testDir))
> *         .build();
> * KafkaSource<String> kafkaSource =
> *     KafkaSource.<String>builder()
> *         .setBootstrapServers("localhost:9092")
> *         .setGroupId("MyGroup")
> *         .setTopics(Arrays.asList("quickstart-events"))
> *         .setDeserializer(
> *             KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
> *         .setStartingOffsets(OffsetsInitializer.earliest())
> *         .build();
> * HybridSource<String> hybridSource =
> *     HybridSource.builder(fileSource)
> *         .addSource(kafkaSource)
> *         .build();
> * }</pre>
> *
> * <p>A more complex example with the Kafka start position derived from the
> * previous source:
> *
> * <pre>{@code
> * HybridSource<String> hybridSource =
> *     HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
> *         .addSource(
> *             switchContext -> {
> *                 StaticFileSplitEnumerator previousEnumerator =
> *                     switchContext.getPreviousEnumerator();
> *                 // how to get the timestamp depends on the specific enumerator
> *                 long timestamp = previousEnumerator.getEndTimestamp();
> *                 OffsetsInitializer offsets =
> *                     OffsetsInitializer.timestamp(timestamp);
> *                 KafkaSource<String> kafkaSource =
> *                     KafkaSource.<String>builder()
> *                         .setBootstrapServers("localhost:9092")
> *                         .setGroupId("MyGroup")
> *                         .setTopics(Arrays.asList("quickstart-events"))
> *                         .setDeserializer(
> *                             KafkaRecordDeserializer.valueOnly(
> *                                 StringDeserializer.class))
> *                         .setStartingOffsets(offsets)
> *                         .build();
> *                 return kafkaSource;
> *             },
> *             Boundedness.CONTINUOUS_UNBOUNDED)
> *         .build();
> * }</pre>
>
> Currently the Flink SplitEnumerator interface does not expose
> getEndTimestamp(). I think if we want to implement the "handoff" mode, we
> need to let SplitEnumerator expose this method.
> The next question is: once we have the previous end timestamp, how do we
> set it back? We cannot build the KafkaSource instance ourselves, because
> the hybrid source is a generic implementation.
> I think we need to add a method, for example startTimestamp(), to the new
> Source API. Then we can implement this:
>
> Switched-start-position demo:
>
> HybridSource.HybridSourceBuilder<RowData, SplitEnumerator> builder =
>     HybridSource.builder(childSources.get(0));
> for (int i = 1; i < childSources.size(); i++) {
>     final int sourceIndex = i;
>     Boundedness boundedness =
>         childSources.get(sourceIndex).getBoundedness();
>     builder.addSource(
>         switchContext -> {
>             SplitEnumerator previousEnumerator =
>                 switchContext.getPreviousEnumerator();
>             // How do we pass the timestamp to Kafka or another connector?
>             // We add a method like startTimestamp() to the new Source API.
>             long switchedTimestamp =
>                 previousEnumerator.getEndTimestamp();
>             childSources.setStartTimestamp(switchedTimestamp);
>             return childSources.get(sourceIndex);
>         },
>         boundedness);
> }
> hybridSource = builder.build();
>
> E.g. if Kafka is the last source, then Kafka uses this switchedTimestamp
> to initialize the OffsetsInitializer and consumes from that timestamp.
>
> The last question was whether this source supports chaining together more
> than two sources: absolutely yes, we support more than two sources, just
> like the DataStream API.
> I have added a DDL example to the FLIP.
>
>
> On Mon, Dec 19, 2022 at 11:14, John Roesler <vvcep...@apache.org> wrote:
>
>> Hello all,
>>
>> Thanks for the FLIP, Ran!
>>
>> The HybridSource is a really cool feature, and I was glad to see a
>> proposal to expose it in the Table and SQL APIs.
>>
>> My main question is also about the switching control (question 2). It
>> seems like the existing Kafka connector has all the options you'd want
>> to define the switching point [1], and the issue is only how to specify
>> a "handoff" from one source to the next. It seems like you could propose
>> to add a reference to an extracted field or property of the first source
>> to be used in the second one.
>>
>> However, the more I think about it, the more I wonder whether a
>> "handoff" operation ought to be necessary. For example, the use case I
>> have in mind is to bootstrap the table using a snapshot of the data and
>> then have it seamlessly switch over to consuming all the records since
>> that snapshot. In order to support this use case with no loss or
>> duplicates, a timestamp isn't sufficient; I'd need to know the exact
>> vector of offsets represented in that snapshot. Then again, if I control
>> the snapshotting process, this should be trivial to compute and store
>> next to the snapshots.
>>
>> Further, when I register the table, I ought to know which exact snapshot
>> I'm selecting, and therefore can just populate the `specific-offsets` as
>> desired. Backing off to timestamp: if I again am naming a path to a
>> specific snapshot of the data, it seems like I have enough information
>> already to also specify the correct `timestamp` option.
>>
>> With this in mind, my question is whether it's necessary to specify some
>> kind of dynamic property, like the DataStream API does [2]. If a fixed
>> property is sufficient, it seems like the current proposal is actually
>> sufficient as well. I think I just don't see the use case for dynamic
>> configuration here.
>>
>> Side question, out of curiosity: would this source support chaining
>> together more than two sources? It seems like the proposed syntax would
>> allow it. It seems like some snapshot-rollup strategies could benefit
>> from it (e.g. if you want to combine your 2021 yearly rollup with your
>> Jan-Nov monthly rollups, then your first two weekly rollups from Dec,
>> and finally switch over to live data from Kafka or something).
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#dynamic-start-position-at-switch-time
>>
>> Thanks again,
>> -John
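(A side note on John's snapshot use case: in the DataStream API this can already be expressed with exact offsets rather than a timestamp, via OffsetsInitializer.offsets(). A minimal sketch; the topic, servers, and offset values are made up, and it assumes the snapshotting process stored the offset vector the snapshot was built from:)

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.common.TopicPartition;

    public class SnapshotBootstrapExample {

        public static KafkaSource<String> kafkaSourceAfterSnapshot() {
            // Offset vector stored next to the snapshot: for each partition,
            // the first offset NOT contained in the snapshot (last offset + 1).
            Map<TopicPartition, Long> startOffsets = new HashMap<>();
            startOffsets.put(new TopicPartition("quickstart-events", 0), 43L);
            startOffsets.put(new TopicPartition("quickstart-events", 1), 301L);

            return KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setGroupId("MyGroup")
                    .setTopics("quickstart-events")
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // resume exactly where the snapshot ends: no loss, no duplicates
                    .setStartingOffsets(OffsetsInitializer.offsets(startOffsets))
                    .build();
        }
    }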
>> On Fri, Dec 16, 2022, at 06:20, Ran Tao wrote:
>> > Hi, Martijn, thanks for your comments.
>> >
>> > Using an identifier as the child source prefix may be a better way
>> > than an index.
>> > I will update the FLIP to illustrate how we can read the hybrid schema
>> > to generate the child schemas, for question 1.
>> >
>> > Question 2 is about the start position of the next Kafka source.
>> > Currently we cannot get the end timestamp of the first bounded source.
>> > In the DataStream API the end timestamp can be obtained from the
>> > previous enumerator, so we need to add end-timestamp support to
>> > bounded sources (e.g. filesystem).
>> > If we can get the end timestamp, then Kafka can start from that
>> > offset. I think we need an option here that lets the user start the
>> > next Kafka source either from the previous one automatically or from a
>> > custom start offset (via a WITH option in the SQL DDL). Not every
>> > second source needs to bind to the previous one; for example, if the
>> > next source is also a file, it does not need a start position.
>> >
>> > Question 3 is about the Table API; I haven't added it to the FLIP yet.
>> > I will try to fix the current issues, update the FLIP, and add more
>> > details. Thanks for your comments.
>> >
>> >
>> > On Fri, Dec 16, 2022 at 16:59, Martijn Visser
>> > <martijnvis...@apache.org> wrote:
>> >
>> >> Hi Ran,
>> >>
>> >> For completeness, this is a new thread that was already previously
>> >> started at
>> >> https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq. I'm
>> >> linking them because I think Timo's comments are relevant to be kept
>> >> with this discussion thread.
>> >>
>> >> I agree with Timo's comments from there that having an index key
>> >> isn't the best option; I would rather have an identifier.
>> >>
>> >> I do wonder how this would work when you want to specify sources from
>> >> a catalog: could you elaborate on that?
>> >>
>> >> What I'm also missing in the FLIP is an example of how to specify the
>> >> starting offset from Kafka. In the DataStream API there is
>> >> OffsetsInitializer.timestamp(switchTimestamp + 1), but how would you
>> >> specify that in the SQL landscape?
>> >>
>> >> Last but not least: your examples are all SQL only. How do you
>> >> propose that this works in the Table API?
>> >>
>> >> Best regards,
>> >>
>> >> Martijn
>> >>
>> >> On Thu, Dec 15, 2022 at 9:16 AM Ran Tao <chucheng...@gmail.com> wrote:
>> >>
>> >> > Fyi.
>> >> >
>> >> > This FLIP uses the index as the child source option prefix because
>> >> > we may use the same connector for several hybrid child sources,
>> >> > e.g.:
>> >> >
>> >> > create table hybrid_source(
>> >> >     f0 varchar,
>> >> >     f1 varchar,
>> >> >     f2 bigint
>> >> > ) with(
>> >> >     'connector'='hybrid',
>> >> >     'sources'='filesystem,filesystem',
>> >> >     '0.path' = '/tmp/a.csv',
>> >> >     '0.format' = 'csv',
>> >> >     '1.path' = '/tmp/b.csv',
>> >> >     '1.format' = 'csv'
>> >> > );
>> >> >
>> >> > In this case, we must distinguish which filesystem connector each
>> >> > path and format option belongs to. But as Timo says, it's not
>> >> > clear. He suggests another way, like this:
>> >> >
>> >> > CREATE TABLE hybrid_source WITH (
>> >> >     'sources'='historical;realtime', -- Config option of type string list
>> >> >     'historical.connector' = 'filesystem',
>> >> >     'historical.path' = '/tmp/a.csv',
>> >> >     'historical.format' = 'csv',
>> >> >     'realtime.path' = '/tmp/b.csv',
>> >> >     'realtime.format' = 'csv'
>> >> > )
>> >> >
>> >> > The `sources` option holds user-defined names instead of concrete
>> >> > connector types. We then use each user-defined name as the option
>> >> > prefix, and the <prefix>.connector option selects the concrete
>> >> > connector implementation.
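(To make the prefix idea concrete: a rough sketch of how a hybrid factory might strip the per-source prefix before delegating to the concrete child connector factory. This is illustrative only, not the FLIP's actual implementation; the class and method names are made up:)

    import java.util.HashMap;
    import java.util.Map;

    public final class HybridOptionRouting {

        private HybridOptionRouting() {}

        /**
         * Extracts the options belonging to one named child source,
         * e.g. 'historical.path' becomes plain 'path', so the concrete
         * connector factory sees the option keys it normally expects.
         */
        public static Map<String, String> childOptions(
                Map<String, String> tableOptions, String sourceName) {
            String prefix = sourceName + ".";
            Map<String, String> child = new HashMap<>();
            for (Map.Entry<String, String> option : tableOptions.entrySet()) {
                if (option.getKey().startsWith(prefix)) {
                    child.put(
                            option.getKey().substring(prefix.length()),
                            option.getValue());
                }
            }
            return child;
        }
    }

With Timo's example above, childOptions(options, "historical") would yield {connector=filesystem, path=/tmp/a.csv, format=csv}, which can then be handed to the filesystem connector factory unchanged.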
--
Best Regards,
Ran Tao
https://github.com/chucheng92