OK, on second thought I'm retracting my previous statement about the
Catalog changes you proposed.
I actually do see a benefit for the Delta connector with this change, and I
see why it could be coupled with the Catalog.

Delta Connector SQL support also ships a Delta Catalog implementation for
Flink.
For the Delta Catalog, table schema information is fetched from the
underlying _delta_log and is not stored in the metastore. For time travel
we actually had a problem: if we wanted to travel back to some old version
where the schema was slightly different, we would have a conflict, since
the Catalog would return the current schema and not the schema as it was
for version X.

With your change, our Delta Catalog can actually fetch the schema for
version X and send it to DeltaTableFactory. Currently, the Catalog can
fetch only the current version. What we would also need, however, is the
version (number/timestamp) for this table passed to DynamicTableFactory, so
that we can properly set up the Delta standalone library.
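
To make the idea concrete, here is a rough, self-contained sketch. It does
not use any Flink or Delta classes: VersionedCatalogSketch, ResolvedTable,
commit and both getTable methods are hypothetical names invented for this
illustration, and the maps merely stand in for what would be read from
_delta_log. The only name taken from the connector is the 'versionAsOf'
option. The point is just that a timestamp-aware lookup can return the
schema of the matching version and also hand the resolved version to the
table factory through the options.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; none of these types are Flink or Delta APIs. */
public class VersionedCatalogSketch {

    /** Schema (column names) plus the options that would reach the table factory. */
    static final class ResolvedTable {
        final List<String> columns;
        final Map<String, String> options;

        ResolvedTable(List<String> columns, Map<String, String> options) {
            this.columns = columns;
            this.options = options;
        }
    }

    // Stand-in for the table log: commit time (epoch millis) -> version number.
    // Assumes commit times grow together with version numbers.
    private final TreeMap<Long, Long> versionByCommitTime = new TreeMap<>();
    // Schema recorded for each version.
    private final Map<Long, List<String>> schemaByVersion = new HashMap<>();

    void commit(long version, long commitTimeMillis, List<String> columns) {
        versionByCommitTime.put(commitTimeMillis, version);
        schemaByVersion.put(version, columns);
    }

    /** Lookup without time travel: only the latest schema is visible. */
    ResolvedTable getTable() {
        if (versionByCommitTime.isEmpty()) {
            throw new IllegalStateException("Table has no versions");
        }
        long latest = versionByCommitTime.lastEntry().getValue();
        return new ResolvedTable(schemaByVersion.get(latest), Map.of());
    }

    /** Time-travel lookup: the newest version committed at or before the timestamp. */
    ResolvedTable getTable(long timestampMillis) {
        Map.Entry<Long, Long> entry = versionByCommitTime.floorEntry(timestampMillis);
        if (entry == null) {
            throw new IllegalArgumentException("No version at or before " + timestampMillis);
        }
        long version = entry.getValue();
        // Pass the resolved version on, so the factory can open the same snapshot.
        return new ResolvedTable(
                schemaByVersion.get(version),
                Map.of("versionAsOf", Long.toString(version)));
    }

    public static void main(String[] args) {
        VersionedCatalogSketch catalog = new VersionedCatalogSketch();
        catalog.commit(0L, 1_000L, List.of("id"));
        catalog.commit(1L, 2_000L, List.of("id", "name"));

        ResolvedTable old = catalog.getTable(1_500L);
        System.out.println(old.columns + " " + old.options);
        System.out.println(catalog.getTable().columns);
    }
}
```

In this sketch the old schema and the version travel together, which is
the part we are missing today when only the current schema can be fetched.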

Regards,
Krzysztof

On Wed, 31 May 2023 at 10:37, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> happy to see such a feature.
> Small note from my end regarding Catalog changes.
>
> TL;DR
> I don't think it is necessary to delegate this feature to the catalog. I
> think that since "timetravel" is a per-job/query property, it should not
> be coupled with the Catalog or the table definition. In my opinion this is
> something that only DynamicTableFactory has to know about. I would rather
> see this feature as it is, a SQL syntax enhancement, but delegated clearly
> to DynamicTableFactory.
>
> I've implemented a timetravel feature for the Delta Connector [1] using
> the current Flink API.
> Docs are pending code review, but you can find them here [2], and examples
> are available here [3].
>
> The timetravel feature that I've implemented is based on Flink query
> hints:
> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
>
> The 'versionAsOf' parameter (we also have 'timestampAsOf') is handled not
> by the Catalog but by the DynamicTableFactory implementation for the Delta
> connector. The value of this property is passed to the Delta standalone
> lib API, which returns a table view for the given version.
>
> I'm not sure how/if the proposed change could benefit the Delta connector
> implementation of this feature.
>
> Thanks,
> Krzysztof
>
> [1]
> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
> [2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
> [3]
> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
>
> On Wed, 31 May 2023 at 06:03, liu ron <ron9....@gmail.com> wrote:
>
>> Hi, Feng
>>
>> Thanks for driving this FLIP. Time travel is very useful for integrating
>> Flink with data lake systems. I have one question: why is the
>> implementation of time travel delegated to the Catalog? Assume that we use
>> Flink to query a Hudi table with the time travel syntax, but we don't use
>> the HudiCatalog; instead, we register the Hudi table in InMemoryCatalog.
>> Can we support time travel for the Hudi table in this case?
>> In contrast, I think time travel should be bound to the connector instead
>> of the Catalog, so the rejected alternative should be considered.
>>
>> Best,
>> Ron
>>
>> On Tue, May 30, 2023 at 09:40, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>
>> > Hi, Feng.
>> > I notice this FLIP only supports batch mode for time travel. Would it
>> > also make sense to support stream mode, to read a snapshot of the table
>> > as a bounded stream?
>> >
>> > Best regards,
>> > Yuxia
>> >
>> > ----- Original Message -----
>> > From: "Benchao Li" <libenc...@apache.org>
>> > To: "dev" <dev@flink.apache.org>
>> > Sent: Monday, May 29, 2023 6:04:53 PM
>> > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>> >
>> > # Can Calcite support this syntax ` VERSION AS OF`  ?
>> >
>> > This also depends on whether this is defined in the standard or
>> > implemented by any known database. If not, it would be hard to push it
>> > to Calcite.
>> >
>> > # getTable(ObjectPath object, long timestamp)
>> >
>> > Then we again come to the problem of "casting between timestamp and
>> > numeric", which has been disabled in FLINK-21978[1]. If you're gonna use
>> > this, then we need to clarify that problem first.
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-21978
>> >
>> >
>> > On Mon, May 29, 2023 at 15:57, Feng Jin <jinfeng1...@gmail.com> wrote:
>> >
>> > > hi, thanks for your reply.
>> > >
>> > > @Benchao
>> > > > did you consider the pushdown abilities compatible
>> > >
>> > > In the current design, the implementation of TimeTravel is delegated
>> > > to the Catalog. We have added a function called getTable(ObjectPath
>> > > tablePath, long timestamp) to obtain the corresponding
>> > > CatalogBaseTable at a specific time. Therefore, I think it will not
>> > > have any impact on the original pushdown abilities.
>> > >
>> > >
>> > > > I see there is a rejected design for adding SupportsTimeTravel,
>> > > > but I didn't see the alternative in the FLIP doc
>> > >
>> > > Sorry, the document description is not very clear. Regarding whether
>> > > to support SupportsTimeTravel, I have discussed it with yuxia. Since
>> > > we already pass the corresponding time in getTable(ObjectPath, long
>> > > timestamp) of the Catalog, SupportsTimeTravel may not be necessary.
>> > >
>> > > In getTable(ObjectPath object, long timestamp), we can obtain the
>> > > schema at the corresponding point in time and put the SNAPSHOT that
>> > > needs to be consumed into the options.
>> > >
>> > >
>> > > @Shammon
>> > > > Could we support this in Flink too?
>> > >
>> > > I personally think it's possible, but it is limited by Calcite's
>> > > syntax restrictions. I believe we should first support this syntax in
>> > > Calcite. Currently, I think it may not be easy to support this syntax
>> > > in Flink's parser. @Benchao, what do you think? Can Calcite support
>> > > the `VERSION AS OF` syntax?
>> > >
>> > >
>> > > Best,
>> > > Feng.
>> > >
>> > >
>> > > On Fri, May 26, 2023 at 2:55 PM Shammon FY <zjur...@gmail.com> wrote:
>> > >
>> > > > Thanks Feng, the feature of time travel sounds great!
>> > > >
>> > > > In addition to SYSTEM_TIME, lakehouses such as Paimon and Iceberg
>> > > > support snapshots or versions. For example, users can query
>> > > > snapshot 1 in Paimon with the following statement:
>> > > > SELECT * FROM t VERSION AS OF 1
>> > > >
>> > > > Could we support this in Flink too?
>> > > >
>> > > > Best,
>> > > > Shammon FY
>> > > >
>> > > > On Fri, May 26, 2023 at 1:20 PM Benchao Li <libenc...@apache.org>
>> > > > wrote:
>> > > >
>> > > > > Regarding the implementation, did you consider compatibility with
>> > > > > the pushdown abilities, e.g., projection pushdown, filter
>> > > > > pushdown, and partition pushdown? Since `Snapshot` is not handled
>> > > > > much in the existing rules, I have a concern about this. Of
>> > > > > course, it depends on your implementation details; what is
>> > > > > important is that we'd better add some cross tests for these.
>> > > > >
>> > > > > Regarding the interface exposed to the Connector, I see there is
>> > > > > a rejected design for adding SupportsTimeTravel, but I didn't see
>> > > > > the alternative in the FLIP doc. IMO, this is an important thing
>> > > > > we need to clarify, because we need to know whether the Connector
>> > > > > supports this, and what column/metadata corresponds to
>> > > > > 'system_time'.
>> > > > >
>> > > > > On Thu, May 25, 2023 at 22:50, Feng Jin <jinfeng1...@gmail.com> wrote:
>> > > > >
>> > > > > > Thanks for your reply
>> > > > > >
>> > > > > > @Timo @BenChao @yuxia
>> > > > > >
>> > > > > > Sorry for the mistake. Currently, Calcite only supports the
>> > > > > > `FOR SYSTEM_TIME AS OF` syntax, so we can only support
>> > > > > > `FOR SYSTEM_TIME AS OF`. I've updated the syntax part of the
>> > > > > > FLIP.
>> > > > > >
>> > > > > >
>> > > > > > @Timo
>> > > > > >
>> > > > > > > We will convert it to TIMESTAMP_LTZ?
>> > > > > >
>> > > > > > Yes, I think we need to convert TIMESTAMP to TIMESTAMP_LTZ and
>> > > > > > then convert it into a long value.
>> > > > > >
>> > > > > > > How do we want to query the most recent version of a table
>> > > > > >
>> > > > > > I think we can use `AS OF CURRENT_TIMESTAMP`, but it does cause
>> > > > > > inconsistency with the real-time concept.
>> > > > > > However, from my personal understanding, the scope of `AS OF
>> > > > > > CURRENT_TIMESTAMP` is the table itself, not the table record.
>> > > > > > So I think using CURRENT_TIMESTAMP should also be reasonable?
>> > > > > > Additionally, if no version is specified, the latest version
>> > > > > > should be used by default.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > Best,
>> > > > > > Feng
>> > > > > >
>> > > > > >
>> > > > > > On Thu, May 25, 2023 at 7:47 PM yuxia
>> > > > > > <luoyu...@alumni.sjtu.edu.cn> wrote:
>> > > > > >
>> > > > > > > Thanks Feng for bringing this up. It'll be great to introduce
>> > > > > > > time travel to Flink to have a better integration with
>> > > > > > > external data sources.
>> > > > > > >
>> > > > > > > I also share the same concern about the syntax.
>> > > > > > > In the `Whether to support other syntax implementations` part
>> > > > > > > of this FLIP, it seems the syntax in Calcite should be `FOR
>> > > > > > > SYSTEM_TIME AS OF`, right?
>> > > > > > > But in the syntax part of this FLIP, it seems to be `AS OF
>> > > > > > > TIMESTAMP` instead of `FOR SYSTEM_TIME AS OF`. Is it just a
>> > > > > > > mistake or by design?
>> > > > > > >
>> > > > > > > Best regards,
>> > > > > > > Yuxia
>> > > > > > >
>> > > > > > > ----- Original Message -----
>> > > > > > > From: "Benchao Li" <libenc...@apache.org>
>> > > > > > > To: "dev" <dev@flink.apache.org>
>> > > > > > > Sent: Thursday, May 25, 2023 7:27:17 PM
>> > > > > > > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>> > > > > > >
>> > > > > > > Thanks Feng, it's exciting to have this ability.
>> > > > > > >
>> > > > > > > Regarding the syntax section, are you proposing `AS OF`
>> > > > > > > instead of `FOR SYSTEM_TIME AS OF` to do this? I know `FOR
>> > > > > > > SYSTEM_TIME AS OF` is in the SQL standard and has been
>> > > > > > > supported by some database vendors such as SQL Server. About
>> > > > > > > `AS OF`: is it in the standard, or does any database vendor
>> > > > > > > support it? If yes, I think it's worth adding this support to
>> > > > > > > Calcite, and I would give a hand on the Calcite side.
>> > > > > > > Otherwise, I think we'd better use `FOR SYSTEM_TIME AS OF`.
>> > > > > > >
>> > > > > > > On Thu, May 25, 2023 at 19:02, Timo Walther <twal...@apache.org> wrote:
>> > > > > > >
>> > > > > > > > Also: How do we want to query the most recent version of a
>> > > > > > > > table?
>> > > > > > > >
>> > > > > > > > `AS OF CURRENT_TIMESTAMP` would be ideal, but according to
>> > > > > > > > the docs the type is TIMESTAMP_LTZ and, what is even more
>> > > > > > > > concerning, it is actually evaluated row-based:
>> > > > > > > >
>> > > > > > > >  > Returns the current SQL timestamp in the local time
>> > > > > > > >  > zone, the return type is TIMESTAMP_LTZ(3). It is
>> > > > > > > >  > evaluated for each record in streaming mode. But in
>> > > > > > > >  > batch mode, it is evaluated once as the query starts
>> > > > > > > >  > and uses the same result for every row.
>> > > > > > > >
>> > > > > > > > This could make it difficult to explain in a join scenario
>> > > > > > > > of multiple snapshotted tables.
>> > > > > > > >
>> > > > > > > > Regards,
>> > > > > > > > Timo
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On 25.05.23 12:29, Timo Walther wrote:
>> > > > > > > > > Hi Feng,
>> > > > > > > > >
>> > > > > > > > > thanks for proposing this FLIP. It makes a lot of sense to
>> > > > > > > > > finally support querying tables at a specific point in
>> > > > > > > > > time, or hopefully also ranges soon, following
>> > > > > > > > > time-versioned tables.
>> > > > > > > > >
>> > > > > > > > > Here is some feedback from my side:
>> > > > > > > > >
>> > > > > > > > > 1. Syntax
>> > > > > > > > >
>> > > > > > > > > Can you elaborate a bit on the Calcite restrictions?
>> > > > > > > > >
>> > > > > > > > > Does Calcite currently support `AS OF` syntax for this but
>> > > > > > > > > not `FOR SYSTEM_TIME AS OF`?
>> > > > > > > > >
>> > > > > > > > > It would be great to support `AS OF` also for
>> > > > > > > > > time-versioned joins and have a unified and short syntax.
>> > > > > > > > >
>> > > > > > > > > Once a fix is merged in Calcite for this, we can make this
>> > > > > > > > > available in Flink earlier by copying the corresponding
>> > > > > > > > > classes until the next Calcite upgrade is performed.
>> > > > > > > > >
>> > > > > > > > > 2. Semantics
>> > > > > > > > >
>> > > > > > > > > How do we interpret the timestamp? In Flink we have 2
>> > > > > > > > > timestamp types (TIMESTAMP and TIMESTAMP_LTZ). If users
>> > > > > > > > > specify AS OF TIMESTAMP '2023-04-27 00:00:00', in which
>> > > > > > > > > timezone will the timestamp be? Will we convert it to
>> > > > > > > > > TIMESTAMP_LTZ?
>> > > > > > > > >
>> > > > > > > > > We definitely need to clarify this because the past has
>> > > > > > > > > shown that daylight saving times make our lives hard.
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > > Timo
>> > > > > > > > >
>> > > > > > > > > On 25.05.23 10:57, Feng Jin wrote:
>> > > > > > > > >> Hi, everyone.
>> > > > > > > > >>
>> > > > > > > > >> I'd like to start a discussion about FLIP-308: Support
>> > > > > > > > >> Time Travel In Batch Mode [1]
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >> Time travel is a SQL syntax used to query historical
>> > > > > > > > >> versions of data. It allows users to specify a point in
>> > > > > > > > >> time and retrieve the data and schema of a table as it
>> > > > > > > > >> appeared at that time. With time travel, users can easily
>> > > > > > > > >> analyze and compare historical versions of data.
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >> With the widespread use of data lake systems such as
>> > > > > > > > >> Paimon, Iceberg, and Hudi, time travel can provide more
>> > > > > > > > >> convenience for users' data analysis.
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >> Looking forward to your opinions; any suggestions are
>> > > > > > > > >> welcome.
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >> [1]
>> > > > > > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >> Best.
>> > > > > > > > >>
>> > > > > > > > >> Feng
>> > > > > > > > >>
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Benchao Li
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > >
>> > > > > Best,
>> > > > > Benchao Li
>> > > > >
>> > > >
>> > >
>> >
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>>
>
