Hi Martijn, thanks for your comments.

Using an identifier as the child source prefix, instead of an index, looks
like a good approach. For question 1, I will update the FLIP to illustrate
how the child schemas can be generated from the hybrid schema.
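
To make the identifier-as-prefix idea concrete, here is a rough sketch in
Python (not the FLIP implementation) of how the hybrid table options could be
split into one option map per child source. The function name
`split_child_options` is hypothetical, and I assume the `sources` list is
separated by `;` as in Timo's example.

```python
def split_child_options(options):
    """Group hybrid table options by the identifiers listed in 'sources'."""
    # Assumption: identifiers are separated by ';' (as in Timo's example).
    identifiers = [s.strip() for s in options["sources"].split(";") if s.strip()]
    children = {name: {} for name in identifiers}
    for key, value in options.items():
        # Split on the first '.' only, so nested child keys stay intact.
        prefix, sep, child_key = key.partition(".")
        if sep and prefix in children:
            children[prefix][child_key] = value
    return children


hybrid_options = {
    "connector": "hybrid",
    "sources": "historical;realtime",
    "historical.connector": "filesystem",
    "historical.path": "/tmp/a.csv",
    "historical.format": "csv",
    "realtime.connector": "filesystem",
    "realtime.path": "/tmp/b.csv",
    "realtime.format": "csv",
}
child_options = split_child_options(hybrid_options)
```

Because each child carries its own `connector` key, the same connector type
can appear more than once without ambiguity.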

Question 2 is about the start position for the next Kafka source. Currently
we cannot get the end timestamp of the first bounded source. In the
DataStream API, the end timestamp can be obtained from the previous
enumerator, so we need to offer end timestamp support for bounded sources
(e.g. filesystem). If we can get the end timestamp, Kafka can start from that
point. I think we need an option here that lets the user start the next Kafka
source either automatically from where the previous one ended, or from a
custom start offset (via a WITH option in the SQL DDL). Not every subsequent
source needs to be bound to the previous one; for example, if the next source
is also a file source, it does not need a start position.
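
A rough sketch of the decision I have in mind, written in Python for brevity.
`resolve_start_options` and the idea of handing it the previous source's end
timestamp are assumptions, not existing Flink APIs; `scan.startup.mode` and
`scan.startup.timestamp-millis` are the existing Kafka SQL connector options.
The `+ 1` mirrors OffsetsInitializer.timestamp(switchTimestamp + 1) from the
DataStream API.

```python
def resolve_start_options(child_options, previous_end_timestamp_ms=None):
    """Return a copy of the child options with a start position filled in."""
    resolved = dict(child_options)
    # Sources such as filesystem need no start position; leave them unchanged.
    if resolved.get("connector") != "kafka":
        return resolved
    # A start offset set by the user in the DDL takes precedence.
    if "scan.startup.mode" in resolved:
        return resolved
    # Otherwise continue right after the previous bounded source, if its
    # end timestamp is known (hypothetical: not available in SQL today).
    if previous_end_timestamp_ms is not None:
        resolved["scan.startup.mode"] = "timestamp"
        resolved["scan.startup.timestamp-millis"] = str(previous_end_timestamp_ms + 1)
    return resolved


end_ts = 1671180000000  # example end timestamp of the bounded source, in ms
auto = resolve_start_options({"connector": "kafka", "topic": "events"}, end_ts)
custom = resolve_start_options(
    {"connector": "kafka", "topic": "events", "scan.startup.mode": "earliest-offset"},
    end_ts,
)
file_source = resolve_start_options({"connector": "filesystem", "path": "/tmp/b.csv"}, end_ts)
```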

Question 3 is about the Table API, which I haven't added to the FLIP yet. I
will try to fix some of the current issues and then update the FLIP with more
details. Thanks again for your comments.


Martijn Visser <martijnvis...@apache.org> wrote on Fri, Dec 16, 2022 at 16:59:

> Hi Ran,
>
> For completeness, this is a new thread that was already previously started
> at https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq. I'm
> linking them because I think Timo's comments are relevant to be kept with
> this discussion thread.
>
> I agree with Timo's comments from there that having an index key isn't the
> best option, I would rather have an identifier.
>
> I do wonder how this would work when you want to specify sources from a
> catalog: could you elaborate on that?
>
> What I'm also missing in the FLIP is an example of how to specify the
> starting offset from Kafka. In the DataStream API, there
> is OffsetsInitializer.timestamp(switchTimestamp + 1) but how would you
> specify that in the SQL landscape?
>
> Last but not least: your examples are all SQL only. How do you propose that
> this works in the Table API?
>
> Best regards,
>
> Martijn
>
> On Thu, Dec 15, 2022 at 9:16 AM Ran Tao <chucheng...@gmail.com> wrote:
>
> > Fyi.
> >
> > This FLIP uses an index as the child source option prefix because we may
> > use the same connector for several hybrid child sources.
> > e.g.
> >
> > create table hybrid_source(
> >  f0 varchar,
> >  f1 varchar,
> >  f2 bigint
> > ) with(
> >  'connector'='hybrid',
> >  'sources'='filesystem,filesystem',
> >  '0.path' = '/tmp/a.csv',
> >  '0.format' = 'csv',
> >  '1.path' = '/tmp/b.csv',
> >  '1.format' = 'csv'
> > );
> >
> > In this case, we must distinguish which filesystem connector each format
> > and path option belongs to. But as Timo says, this is not clear. He
> > suggests another way, like this:
> >
> > CREATE TABLE hybrid_source WITH (
> >    'sources'='historical;realtime',   -- Config option of type string list
> >    'historical.connector' = 'filesystem',
> >    'historical.path' = '/tmp/a.csv',
> >    'historical.format' = 'csv',
> >    'realtime.path' = '/tmp/b.csv',
> >    'realtime.format' = 'csv'
> > )
> >
> > The `sources` option holds user-defined names instead of the concrete
> > connector types. We use each user-defined name as a prefix, and use
> > <prefix>.connector to look up the concrete connector implementation.
> >
>


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92
