Hi Martijn,

Thanks for your comments.

Question 1: using an identifier instead of an index as the child source option prefix looks like a good approach. I will update the FLIP to illustrate how the child schemas can be generated from the hybrid schema.
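As a rough sketch of the identifier-based prefix (nothing final here): a file source followed by a Kafka source might be declared as below. The 'hybrid' connector and the 'sources' option are only what the FLIP proposes, and forwarding prefixed options to each child source is an assumption. The Kafka and filesystem options themselves are the existing SQL connector options with the identifier prepended; the topic, broker address and timestamp are placeholders.

```sql
-- Hypothetical DDL: the 'hybrid' connector is a proposal and does not exist yet.
CREATE TABLE hybrid_source (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 BIGINT
) WITH (
  'connector' = 'hybrid',
  -- user-defined identifiers, in switching order
  'sources' = 'historical;realtime',
  -- first (bounded) child: options of the existing filesystem connector
  'historical.connector' = 'filesystem',
  'historical.path' = '/tmp/a.csv',
  'historical.format' = 'csv',
  -- second child: options of the existing Kafka connector (placeholder values)
  'realtime.connector' = 'kafka',
  'realtime.topic' = 'my_topic',
  'realtime.properties.bootstrap.servers' = 'localhost:9092',
  'realtime.format' = 'csv',
  -- user-defined start position, assumed to be passed through unchanged
  'realtime.scan.startup.mode' = 'timestamp',
  'realtime.scan.startup.timestamp-millis' = '1671177600000'
);
```

With identifiers, the same connector type can appear more than once without ambiguity, because every child option is addressed as <identifier>.<option>.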
Question 2 is about the start position for the next Kafka source. Currently we cannot get the end timestamp of the first bounded source. In the DataStream API, the end timestamp can be obtained from the previous enumerator, so we need to add end timestamp support for bounded sources (e.g. filesystem). If we can get the end timestamp, Kafka can start from the corresponding position. I think we need an option here that lets the user either start the next Kafka source from the end of the previous one automatically, or start it from a user-defined offset (via a WITH option in the SQL DDL). Not every following source needs to be bound to the previous one; for example, if the next source is also a file, it does not need a start position.

Question 3 is about the Table API, which I haven't added to the FLIP yet.

I will try to fix the current issues, update the FLIP and add more details. Thanks again for your comments.

Martijn Visser <martijnvis...@apache.org> wrote on Fri, Dec 16, 2022 at 16:59:

> Hi Ran,
>
> For completeness, this is a new thread that was already previously started
> at https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq. I'm
> linking them because I think Timo's comments are relevant to be kept with
> this discussion thread.
>
> I agree with Timo's comments from there that having an index key isn't the
> best option, I would rather have an identifier.
>
> I do wonder how this would work when you want to specify sources from a
> catalog: could you elaborate on that?
>
> What I'm also missing in the FLIP is an example of how to specify the
> starting offset from Kafka. In the DataStream API, there
> is OffsetsInitializer.timestamp(switchTimestamp + 1) but how would you
> specify that in the SQL landscape?
>
> Last but not least: your examples are all SQL only. How do you propose that
> this works in the Table API?
>
> Best regards,
>
> Martijn
>
> On Thu, Dec 15, 2022 at 9:16 AM Ran Tao <chucheng...@gmail.com> wrote:
>
> > Fyi.
> >
> > This FLIP uses the index as the child source option prefix because we may
> > use the same connector as hybrid child sources.
> > e.g.
> >
> > create table hybrid_source(
> >  f0 varchar,
> >  f1 varchar,
> >  f2 bigint
> > ) with(
> >  'connector'='hybrid',
> >  'sources'='filesystem,filesystem',
> >  '0.path' = '/tmp/a.csv',
> >  '0.format' = 'csv',
> >  '1.path' = '/tmp/b.csv',
> >  '1.format' = 'csv'
> > );
> >
> > In this case, we must distinguish which filesystem connector the format
> > and path options belong to. But as Timo says, it's not clear. He suggests
> > another way like this:
> >
> > CREATE TABLE hybrid_source WITH (
> >  'sources'='historical;realtime', -- Config option of type string list
> >  'historical.connector' = 'filesystem',
> >  'historical.path' = '/tmp/a.csv',
> >  'historical.format' = 'csv',
> >  'realtime.path' = '/tmp/b.csv',
> >  'realtime.format' = 'csv'
> > )
> >
> > The `sources` option holds user-defined names instead of the concrete
> > connector types. We use each user-defined name as a prefix, and use
> > prefix.connector to look up the concrete connector implementation.
> >
>

--
Best Regards,
Ran Tao
https://github.com/chucheng92