OK, after second thought, I'm retracting my previous statement about the Catalog changes you proposed. I actually do see a benefit for the Delta connector with this change, and I see why this could be coupled with the Catalog.
The Delta connector's SQL support also ships a Delta Catalog implementation for Flink. For the Delta Catalog, table schema information is fetched from the underlying _delta_log and is not stored in the metastore. For time travel we actually had a problem: if we wanted to time travel back to an old version where the schema was slightly different, we would have a conflict, since the Catalog would return the current schema and not the schema as it was for version X. With your change, our Delta Catalog can fetch the schema for version X and send it to DeltaTableFactory. Currently, the Catalog can fetch only the current version. What we would also need, however, is the version (number/timestamp) for this table passed to DynamicTableFactory, so we could properly configure the Delta standalone library.

Regards,
Krzysztof

On Wed, 31 May 2023 at 10:37, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> Hi,
> happy to see such a feature.
> A small note from my end regarding the Catalog changes.
>
> TL;DR
> I don't think it is necessary to delegate this feature to the catalog. Since "timetravel" is a per-job/query property, I think it should not be coupled with the Catalog or the table definition. In my opinion, this is something that only DynamicTableFactory has to know about. I would rather see this feature as it is, an SQL syntax enhancement, but delegated clearly to DynamicTableFactory.
>
> I've implemented the timetravel feature for the Delta Connector [1] using the current Flink API.
> Docs are pending code review, but you can find them here [2], and examples are available here [3].
>
> The timetravel feature that I've implemented is based on Flink query hints:
> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
>
> The "versionAsOf" parameter (we also have 'timestampAsOf') is handled not by the Catalog but by the DynamicTableFactory implementation for the Delta connector. The value of this property is passed to the Delta standalone lib API, which returns the table view for the given version.
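The hint-based approach described in the quoted message keeps time travel entirely inside the connector factory. Below is a minimal sketch of that option handling, assuming the hint options have already been merged into the table options the factory receives (which, as far as we understand, is how Flink applies `OPTIONS` hints). The class `TimeTravelOptions`, its `ReadPosition` type and the rule that the two options are mutually exclusive are hypothetical and not taken from the Delta connector:

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper (not the Delta connector's actual code): turns the
// 'versionAsOf' / 'timestampAsOf' options that arrive via an
// /*+ OPTIONS(...) */ hint into a read position for the source.
final class TimeTravelOptions {

    static final String VERSION_AS_OF = "versionAsOf";
    static final String TIMESTAMP_AS_OF = "timestampAsOf";

    /** Where to read from; if neither field is set, read the latest version. */
    static final class ReadPosition {
        final OptionalLong version;
        final String timestamp; // raw string, parsing left to the table format; null if absent

        ReadPosition(OptionalLong version, String timestamp) {
            this.version = version;
            this.timestamp = timestamp;
        }

        boolean isLatest() {
            return !version.isPresent() && timestamp == null;
        }
    }

    static ReadPosition parse(Map<String, String> tableOptions) {
        String version = tableOptions.get(VERSION_AS_OF);
        String timestamp = tableOptions.get(TIMESTAMP_AS_OF);
        if (version != null && timestamp != null) {
            // Assumption: the two options are mutually exclusive.
            throw new IllegalArgumentException(
                    "Set at most one of '" + VERSION_AS_OF + "' and '" + TIMESTAMP_AS_OF + "'");
        }
        if (version != null) {
            long v = Long.parseLong(version.trim());
            if (v < 0) {
                throw new IllegalArgumentException("Version must be non-negative: " + v);
            }
            return new ReadPosition(OptionalLong.of(v), null);
        }
        // Either only a timestamp was given, or nothing was (latest version).
        return new ReadPosition(OptionalLong.empty(), timestamp);
    }
}
```

The timestamp is deliberately kept as a raw string here, because the accepted formats are specific to each table format library.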
>
> I'm not sure how/if the proposed change could benefit the Delta connector implementation for this feature.
>
> Thanks,
> Krzysztof
>
> [1] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
> [2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
> [3] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
>
> On Wed, 31 May 2023 at 06:03, liu ron <ron9....@gmail.com> wrote:
> > Hi, Feng
> >
> > Thanks for driving this FLIP. Time travel is very useful for integrating Flink with data lake systems. I have one question: why is the implementation of TimeTravel delegated to the Catalog? Assuming that we use Flink to query a Hudi table with the time travel syntax, but instead of using the HudiCatalog we register the Hudi table in the InMemoryCatalog, can we support time travel for the Hudi table in this case?
> > In contrast, I think time travel should be bound to the connector instead of the Catalog, so the rejected alternative should be considered.
> >
> > Best,
> > Ron
> >
> > On Tue, 30 May 2023 at 09:40, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > > Hi, Feng.
> > > I notice this FLIP only supports batch mode for time travel. Would it also make sense to support stream mode, reading a snapshot of the table as a bounded stream?
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Benchao Li" <libenc...@apache.org>
> > > To: "dev" <dev@flink.apache.org>
> > > Sent: Monday, 29 May 2023 6:04:53 PM
> > > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> > >
> > > # Can Calcite support this syntax `VERSION AS OF`?
> > >
> > > This also depends on whether this is defined in the standard or in any known databases that have implemented it. If not, it would be hard to push it to Calcite.
> > >
> > > # getTable(ObjectPath object, long timestamp)
> > >
> > > Then we again come to the problem of "casting between timestamp and numeric", which has been disabled in FLINK-21978 [1]. If you're going to use this, then we need to clarify that problem first.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-21978
> > >
> > > On Mon, 29 May 2023 at 15:57, Feng Jin <jinfeng1...@gmail.com> wrote:
> > > > Hi, thanks for your replies.
> > > >
> > > > @Benchao
> > > > > did you consider compatibility with the pushdown abilities
> > > >
> > > > In the current design, the implementation of TimeTravel is delegated to the Catalog. We have added a function called getTable(ObjectPath tablePath, long timestamp) to obtain the corresponding CatalogBaseTable at a specific time. Therefore, I think it will not have any impact on the original pushdown abilities.
> > > >
> > > > > I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc
> > > >
> > > > Sorry, the document description is not very clear. Regarding whether to support SupportsTimeTravel, I have discussed it with yuxia. Since we already pass the corresponding time in getTable(ObjectPath, long timestamp) of the Catalog, SupportsTimeTravel may not be necessary.
> > > >
> > > > In getTable(ObjectPath object, long timestamp), we can obtain the schema at the corresponding point in time and put the SNAPSHOT that needs to be consumed into the options.
> > > >
> > > > @Shammon
> > > > > Could we support this in Flink too?
> > > >
> > > > I personally think it's possible, but it is limited by Calcite's syntax restrictions. I believe we should first support this syntax in Calcite. Currently, I think it may not be easy to support this syntax in Flink's parser. @Benchao, what do you think? Can Calcite support the syntax `VERSION AS OF`?
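Feng's description above (resolve the table at a point in time, then put the snapshot to consume into the options) would also cover Krzysztof's need to get both the historical schema and the version to the factory. Below is a self-contained model of that lookup, assuming a per-table history keyed by commit time in epoch milliseconds. `VersionedCatalogSketch`, `Snapshot`, `ResolvedTable` and the `snapshot-id` option key are hypothetical stand-ins, not Flink's `Catalog`, `ObjectPath` or `CatalogBaseTable`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the proposed getTable(ObjectPath, long timestamp):
// table paths are plain strings and a schema is just a list of column names.
final class VersionedCatalogSketch {

    static final class Snapshot {
        final long id;
        final List<String> columns; // schema as of this snapshot

        Snapshot(long id, List<String> columns) {
            this.id = id;
            this.columns = columns;
        }
    }

    static final class ResolvedTable {
        final List<String> columns;
        final Map<String, String> options;

        ResolvedTable(List<String> columns, Map<String, String> options) {
            this.columns = columns;
            this.options = options;
        }
    }

    // table path -> (commit time in epoch millis -> snapshot committed then)
    private final Map<String, NavigableMap<Long, Snapshot>> history = new HashMap<>();

    void commit(String path, long commitTimeMillis, Snapshot snapshot) {
        history.computeIfAbsent(path, p -> new TreeMap<>()).put(commitTimeMillis, snapshot);
    }

    ResolvedTable getTable(String path, long timestampMillis) {
        NavigableMap<Long, Snapshot> snapshots = history.get(path);
        if (snapshots == null) {
            throw new IllegalArgumentException("Unknown table: " + path);
        }
        // The table "as of" a time is the latest snapshot committed at or before it.
        Map.Entry<Long, Snapshot> entry = snapshots.floorEntry(timestampMillis);
        if (entry == null) {
            throw new IllegalArgumentException(
                    "No snapshot of " + path + " at or before " + timestampMillis);
        }
        Snapshot snapshot = entry.getValue();
        Map<String, String> options = new HashMap<>();
        // Hypothetical key: tells the source which snapshot to read.
        options.put("snapshot-id", Long.toString(snapshot.id));
        return new ResolvedTable(snapshot.columns, options);
    }
}
```

Returning the historical schema and the snapshot id together means the factory does not have to look up the version separately; how the `long` value is derived from a TIMESTAMP literal is a separate question raised elsewhere in the thread.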
> > > >
> > > > Best,
> > > > Feng.
> > > >
> > > > On Fri, May 26, 2023 at 2:55 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > > Thanks Feng, the time travel feature sounds great!
> > > > >
> > > > > In addition to SYSTEM_TIME, lakehouses such as Paimon and Iceberg support snapshots or versions. For example, users can query snapshot 1 of a Paimon table with the following statement:
> > > > > SELECT * FROM t VERSION AS OF 1
> > > > >
> > > > > Could we support this in Flink too?
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > > On Fri, May 26, 2023 at 1:20 PM Benchao Li <libenc...@apache.org> wrote:
> > > > > > Regarding the implementation, did you consider compatibility with the pushdown abilities, e.g., projection pushdown, filter pushdown, and partition pushdown? Since `Snapshot` is not handled much in existing rules, I have a concern about this. Of course, it depends on your implementation details; what is important is that we should add some cross tests for these.
> > > > > >
> > > > > > Regarding the interface exposed to connectors, I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc. IMO, this is an important thing we need to clarify, because we need to know whether the connector supports this, and which column/metadata corresponds to 'system_time'.
> > > > > >
> > > > > > On Thu, 25 May 2023 at 22:50, Feng Jin <jinfeng1...@gmail.com> wrote:
> > > > > > > Thanks for your replies.
> > > > > > >
> > > > > > > @Timo @BenChao @yuxia
> > > > > > >
> > > > > > > Sorry for the mistake. Currently, Calcite only supports the `FOR SYSTEM_TIME AS OF` syntax, so we can only support `FOR SYSTEM_TIME AS OF`. I've updated the syntax part of the FLIP.
> > > > > > >
> > > > > > > @Timo
> > > > > > >
> > > > > > > > We will convert it to TIMESTAMP_LTZ?
> > > > > > >
> > > > > > > Yes, I think we need to convert TIMESTAMP to TIMESTAMP_LTZ and then convert it into a long value.
> > > > > > >
> > > > > > > > How do we want to query the most recent version of a table
> > > > > > >
> > > > > > > I think we can use `AS OF CURRENT_TIMESTAMP`, but it does cause inconsistency with the real-time concept.
> > > > > > > However, from my personal understanding, the scope of `AS OF CURRENT_TIMESTAMP` is the table itself, not the table record. So I think using CURRENT_TIMESTAMP should also be reasonable?
> > > > > > > Additionally, if no version is specified, the latest version should be used by default.
> > > > > > >
> > > > > > > Best,
> > > > > > > Feng
> > > > > > >
> > > > > > > On Thu, May 25, 2023 at 7:47 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > > > > > > > Thanks Feng for bringing this up. It'll be great to introduce time travel to Flink to have better integration with external data sources.
> > > > > > > >
> > > > > > > > I also share the same concern about the syntax.
> > > > > > > > I see in the `Whether to support other syntax implementations` part of this FLIP that the syntax in Calcite seems to be `FOR SYSTEM_TIME AS OF`, right?
> > > > > > > > But in the syntax part of this FLIP, it seems to be `AS OF TIMESTAMP` instead of `FOR SYSTEM_TIME AS OF`. Is it just a mistake or by design?
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Yuxia
> > > > > > > >
> > > > > > > > ----- Original Message -----
> > > > > > > > From: "Benchao Li" <libenc...@apache.org>
> > > > > > > > To: "dev" <dev@flink.apache.org>
> > > > > > > > Sent: Thursday, 25 May 2023 7:27:17 PM
> > > > > > > > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> > > > > > > >
> > > > > > > > Thanks Feng, it's exciting to have this ability.
> > > > > > > >
> > > > > > > > Regarding the syntax section, are you proposing `AS OF` instead of `FOR SYSTEM_TIME AS OF` to do this? I know `FOR SYSTEM_TIME AS OF` is in the SQL standard and has been supported by some database vendors such as SQL Server. About `AS OF`: is it in the standard, or does any database vendor support it? If yes, I think it's worth adding this support to Calcite, and I would give a hand on the Calcite side. Otherwise, I think we'd better use `FOR SYSTEM_TIME AS OF`.
> > > > > > > >
> > > > > > > > On Thu, 25 May 2023 at 19:02, Timo Walther <twal...@apache.org> wrote:
> > > > > > > > > Also: How do we want to query the most recent version of a table?
> > > > > > > > >
> > > > > > > > > `AS OF CURRENT_TIMESTAMP` would be ideal, but according to the docs the type is TIMESTAMP_LTZ and, what is even more concerning, it is actually evaluated row-based:
> > > > > > > > >
> > > > > > > > > > Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
> > > > > > > > >
> > > > > > > > > This could make it difficult to explain in a join scenario of multiple snapshotted tables.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Timo
> > > > > > > > >
> > > > > > > > > On 25.05.23 12:29, Timo Walther wrote:
> > > > > > > > > > Hi Feng,
> > > > > > > > > >
> > > > > > > > > > thanks for proposing this FLIP. It makes a lot of sense to finally support querying tables at a specific point in time, or hopefully also ranges soon, following time-versioned tables.
> > > > > > > > > >
> > > > > > > > > > Here is some feedback from my side:
> > > > > > > > > >
> > > > > > > > > > 1. Syntax
> > > > > > > > > >
> > > > > > > > > > Can you elaborate a bit on the Calcite restrictions?
> > > > > > > > > >
> > > > > > > > > > Does Calcite currently support `AS OF` syntax for this but not `FOR SYSTEM_TIME AS OF`?
> > > > > > > > > >
> > > > > > > > > > It would be great to support `AS OF` also for time-versioned joins and have a unified and short syntax.
> > > > > > > > > >
> > > > > > > > > > Once a fix is merged in Calcite for this, we can make it available in Flink earlier by copying the corresponding classes until the next Calcite upgrade is performed.
> > > > > > > > > >
> > > > > > > > > > 2. Semantics
> > > > > > > > > >
> > > > > > > > > > How do we interpret the timestamp? In Flink we have 2 timestamp types (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP '2023-04-27 00:00:00', in which time zone will the timestamp be? We will convert it to TIMESTAMP_LTZ?
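The timestamp question above, together with Feng's answer in the thread (convert TIMESTAMP to TIMESTAMP_LTZ, then into a long value), can be made concrete with `java.time`. This is a sketch of one possible semantics only, assuming the zone-less literal is interpreted in the session time zone and that wall-clock times made nonexistent or ambiguous by daylight saving time are rejected rather than silently resolved; `AsOfTimestamp` is a hypothetical name:

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.List;

// Sketch of one possible semantics: interpret the zone-less TIMESTAMP literal
// in a session time zone and reject wall-clock times that DST makes
// nonexistent or ambiguous, instead of silently picking an instant.
final class AsOfTimestamp {

    static long toEpochMillis(LocalDateTime literal, ZoneId sessionZone) {
        List<ZoneOffset> offsets = sessionZone.getRules().getValidOffsets(literal);
        if (offsets.isEmpty()) {
            // DST gap: clocks jumped forward, so this local time never happened.
            throw new IllegalArgumentException(literal + " does not exist in " + sessionZone);
        }
        if (offsets.size() > 1) {
            // DST overlap: clocks went back, so this local time happened twice.
            throw new IllegalArgumentException(literal + " is ambiguous in " + sessionZone);
        }
        return literal.toInstant(offsets.get(0)).toEpochMilli();
    }
}
```

For comparison, `LocalDateTime.atZone(zone)` resolves these cases silently: a time in a gap is shifted later by the length of the gap, and a time in an overlap takes the earlier offset. Whether Flink should do that or fail is the kind of choice the daylight-saving concern is about.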
> > > > > > > > > >
> > > > > > > > > > We definitely need to clarify this because the past has shown that daylight saving time makes our lives hard.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Timo
> > > > > > > > > >
> > > > > > > > > > On 25.05.23 10:57, Feng Jin wrote:
> > > > > > > > > > > Hi, everyone.
> > > > > > > > > > >
> > > > > > > > > > > I'd like to start a discussion about FLIP-308: Support Time Travel In Batch Mode [1].
> > > > > > > > > > >
> > > > > > > > > > > Time travel is an SQL syntax used to query historical versions of data. It allows users to specify a point in time and retrieve the data and schema of a table as it appeared at that time. With time travel, users can easily analyze and compare historical versions of data.
> > > > > > > > > > >
> > > > > > > > > > > With the widespread use of data lake systems such as Paimon, Iceberg, and Hudi, time travel can provide more convenience for users' data analysis.
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your opinions; any suggestions are welcome.
> > > > > > > > > > >
> > > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Feng
> > > > > > > >
> > > > > > > > --
> > > > > > > > Best,
> > > > > > > > Benchao Li
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Benchao Li
> > >
> > > --
> > > Best,
> > > Benchao Li