Hi Leonard,

Thanks for your reply.

> 1.a How to construct a CatalogDescriptor?

I think it would be helpful to add a method for constructing a CatalogDescriptor, as you mentioned in 1.c. I will update the documentation later.

> 1.b How to visit the fields? Could we use Configuration instead of Map<String, String>?

I believe that the Map<String, String> options are only intended for creating a catalog, not for accessing internal parameters. Since all of the relevant parameters for CREATE CATALOG are also stored in the Map<String, String> options, my understanding is that using Map<String, String> options should suffice.

Here is the implementation that executes the CREATE CATALOG statement:

```java
private TableResultInternal createCatalog(CreateCatalogOperation operation) {
    String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
    try {
        String catalogName = operation.getCatalogName();
        // The options of CREATE CATALOG arrive as a plain string map.
        Map<String, String> properties = operation.getProperties();

        // The catalog is created directly from that map and then registered.
        Catalog catalog =
                FactoryUtil.createCatalog(
                        catalogName,
                        properties,
                        tableConfig,
                        resourceManager.getUserClassLoader());
        catalogManager.registerCatalog(catalogName, catalog);

        return TableResultImpl.TABLE_RESULT_OK;
    } catch (CatalogException e) {
        throw new ValidationException(exMsg, e);
    }
}
```

> 2. Do we have a plan to offer a default CatalogStore if the user didn’t configure one?

Yes, the in-memory CatalogStore will be used as the default CatalogStore if the user has not configured one.

Best,
Feng

On Tue, Jun 6, 2023 at 8:28 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Feng
>
> Thanks for driving this FLIP, very impressive feature that users want, I’ve some quick questions here.
>
> 1. Unification SQL:
> The snapshot concept exists both in Batch mode and Streaming mode, could we consider a unified proposal? I think users won’t want another SQL syntax named Time travel for Streaming mode.
>
> 2. Semantics:
> Flink supports TIMESTAMP and TIMESTAMP_LTZ types. To get a long timestamp value (getTable(ObjectPath tablePath, long timestamp)) we need two pieces of information, i.e. a TIMESTAMP value and the current session time zone. How do we deal with the value in the currently proposed SQL syntax?
>
> 3. Is it enough to use a single timestamp to track a snapshot (version) of an external table? Some external systems may use a timestamp value to mark a version, but others may use a version number, file position, or log offset.
>
> Best,
> Leonard
>
> > On Jun 5, 2023, at 3:28 PM, Yun Tang <myas...@live.com> wrote:
> >
> > Hi Feng,
> >
> > I think this FLIP would provide one important feature to unify the stream-SQL and batch-SQL when we backfill the historical data in batch mode.
> >
> > For the "Syntax" section, I think you could add descriptions of how to align backfill time travel with querying the latest data. And I think you should also update the "Discussion thread" in the original FLIP.
> >
> > Moreover, I have a question about getting the table schema from the catalog. I'm not sure whether Catalog#getTable(tablePath, timestamp) will be called only once. If we have a backfill query between 2023-05-29 and 2023-06-04 in the past week, and the table schema changed on 2023-06-01, will the query below detect the schema changes while backfilling the whole week?
> >
> > SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP BETWEEN '2023-05-29 00:00:00' AND '2023-06-05 00:00:00'
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Shammon FY <zjur...@gmail.com>
> > Sent: Thursday, June 1, 2023 17:57
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> >
> > Hi Feng,
> >
> > I have one minor comment about the public interface `Optional<Long> getSnapshot()` in the `CatalogTable`.
> >
> > As we can get tables from the new method `Catalog.getTable(ObjectPath tablePath, long timestamp)`, I think the returned `CatalogBaseTable` will have the timestamp information. Flink or a connector such as Iceberg/Paimon can create sources from the `CatalogBaseTable` directly without the need to get the snapshot ID from `CatalogTable.getSnapshot()`. What do you think of it?
> >
> > Best,
> > Shammon FY
> >
> > On Thu, Jun 1, 2023 at 7:22 AM Jing Ge <j...@ververica.com.invalid> wrote:
> >
> >> Hi Feng,
> >>
> >> Thanks for the proposal! Very interesting feature. Would you like to add the thoughts from your previous email about why SupportsTimeTravel has been rejected to the FLIP? This will help readers understand the context (in the future).
> >>
> >> We always add overloaded methods directly to Catalog according to new requirements, which makes the interface bloated. Just out of curiosity, does it make sense to introduce some DSL design? Like Catalog.getTable(tablePath).on(timeStamp), Catalog.getTable(tablePath).current() for the most current version, and more room for further extension like timestamp range, etc. I haven't read all the source code yet and I'm not sure if it is possible. But a design like this will keep the Catalog API lean and the API/DSL will be self-describing and easier to use.
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> >>
> >>> Ok, on second thought I'm retracting my previous statement about the Catalog changes you proposed.
> >>> I do see a benefit for the Delta connector with this change, and I see why this could be coupled with Catalog.
> >>>
> >>> The Delta connector's SQL support also ships a Delta Catalog implementation for Flink.
> >>> For Delta Catalog, table schema information is fetched from the underlying _delta_log and not stored in the metastore. For time travel we actually had a problem: if we wanted to time travel back to some old version where the schema was slightly different, we would have a conflict, since Catalog would return the current schema and not the schema as of version X.
> >>>
> >>> With your change, our Delta Catalog can actually fetch the schema for version X and send it to DeltaTableFactory. Currently, Catalog can fetch only the current version. What we would also need, however, is the version (number/timestamp) for this table passed to DynamicTableFactory so we could properly set up the Delta standalone library.
> >>>
> >>> Regards,
> >>> Krzysztof
> >>>
> >>> On Wed, May 31, 2023 at 10:37, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> >>>
> >>>> Hi,
> >>>> happy to see such a feature.
> >>>> Small note from my end regarding Catalog changes.
> >>>>
> >>>> TL;DR
> >>>> I don't think it is necessary to delegate this feature to the catalog. I think that since "timetravel" is a per job/query property, it should not be coupled with the Catalog or table definition. In my opinion this is something that only DynamicTableFactory has to know about. I would rather see this feature as it is - a SQL syntax enhancement - but delegated clearly to DynamicTableFactory.
> >>>>
> >>>> I've implemented the timetravel feature for the Delta Connector [1] using the current Flink API.
> >>>> Docs are pending code review, but you can find them here [2], and examples are available here [3].
> >>>>
> >>>> The timetravel feature that I've implemented is based on Flink query hints.
> >>>> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
> >>>>
> >>>> The "versionAsOf" (we also have 'timestampAsOf') parameter is handled not by Catalog but by the DynamicTableFactory implementation for the Delta connector. The value of this property is passed to the Delta standalone lib API, which returns a table view for the given version.
> >>>>
> >>>> I'm not sure how/if the proposed change could benefit the Delta connector implementation for this feature.
> >>>>
> >>>> Thanks,
> >>>> Krzysztof
> >>>>
> >>>> [1] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
> >>>> [2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
> >>>> [3] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
> >>>>
> >>>> On Wed, May 31, 2023 at 06:03, liu ron <ron9....@gmail.com> wrote:
> >>>>
> >>>>> Hi, Feng
> >>>>>
> >>>>> Thanks for driving this FLIP. Time travel is very useful for integrating Flink with data lake systems. I have one question: why is the implementation of TimeTravel delegated to Catalog? Assuming that we use Flink to query a Hudi table with the time travel syntax, but we don't use the HudiCatalog and instead register the Hudi table in InMemoryCatalog, can we support time travel for the Hudi table in this case?
> >>>>> In contrast, I think time travel should bind to the connector instead of Catalog, so the rejected alternative should be considered.
> >>>>>
> >>>>> Best,
> >>>>> Ron
> >>>>>
> >>>>> On Tue, May 30, 2023 at 09:40, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> >>>>>
> >>>>>> Hi, Feng.
> >>>>>> I notice this FLIP only supports batch mode for time travel. Would it also make sense to support stream mode to read a snapshot of the table as a bounded stream?
> >>>>>>
> >>>>>> Best regards,
> >>>>>> Yuxia
> >>>>>>
> >>>>>> ----- Original Message -----
> >>>>>> From: "Benchao Li" <libenc...@apache.org>
> >>>>>> To: "dev" <dev@flink.apache.org>
> >>>>>> Sent: Monday, May 29, 2023 6:04:53 PM
> >>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> >>>>>>
> >>>>>> # Can Calcite support this syntax `VERSION AS OF`?
> >>>>>>
> >>>>>> This also depends on whether this is defined in the standard or implemented by any known database. If not, it would be hard to push it to Calcite.
> >>>>>>
> >>>>>> # getTable(ObjectPath object, long timestamp)
> >>>>>>
> >>>>>> Then we again come to the problem of "casting between timestamp and numeric", which has been disabled in FLINK-21978 [1]. If you're going to use this, then we need to clarify that problem first.
> >>>>>>
> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978
> >>>>>>
> >>>>>> On Mon, May 29, 2023 at 15:57, Feng Jin <jinfeng1...@gmail.com> wrote:
> >>>>>>
> >>>>>>> hi, thanks for your reply.
> >>>>>>>
> >>>>>>> @Benchao
> >>>>>>>> did you consider the pushdown abilities compatible
> >>>>>>>
> >>>>>>> In the current design, the implementation of TimeTravel is delegated to Catalog. We have added a function called getTable(ObjectPath tablePath, long timestamp) to obtain the corresponding CatalogBaseTable at a specific time. Therefore, I think it will not have any impact on the original pushdown abilities.
> >>>>>>>
> >>>>>>>> I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc
> >>>>>>>
> >>>>>>> Sorry, the document description is not very clear. Regarding whether to support SupportsTimeTravel, I have discussed it with yuxia.
> >>>>>>> Since we have already passed the corresponding time in getTable(ObjectPath, long timestamp) of Catalog, SupportsTimeTravel may not be necessary.
> >>>>>>>
> >>>>>>> In getTable(ObjectPath object, long timestamp), we can obtain the schema of the corresponding time point and put the SNAPSHOT that needs to be consumed into options.
> >>>>>>>
> >>>>>>> @Shammon
> >>>>>>>> Could we support this in Flink too?
> >>>>>>>
> >>>>>>> I personally think it's possible, but limited by Calcite's syntax restrictions. I believe we should first support this syntax in Calcite. Currently, I think it may not be easy to support this syntax in Flink's parser. @Benchao, what do you think? Can Calcite support this syntax `VERSION AS OF`?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Feng.
> >>>>>>>
> >>>>>>> On Fri, May 26, 2023 at 2:55 PM Shammon FY <zjur...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Thanks Feng, the feature of time travel sounds great!
> >>>>>>>>
> >>>>>>>> In addition to SYSTEM_TIME, lake houses such as Paimon and Iceberg support snapshot or version. For example, users can query snapshot 1 for Paimon with the following statement:
> >>>>>>>> SELECT * FROM t VERSION AS OF 1
> >>>>>>>>
> >>>>>>>> Could we support this in Flink too?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Shammon FY
> >>>>>>>>
> >>>>>>>> On Fri, May 26, 2023 at 1:20 PM Benchao Li <libenc...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Regarding the implementation, did you consider compatibility with the pushdown abilities, e.g., projection pushdown, filter pushdown, partition pushdown? Since `Snapshot` is not handled much in existing rules, I have a concern about this.
> >>>>>>>>> Of course, it depends on your implementation details; what is important is that we'd better add some cross tests for these.
> >>>>>>>>>
> >>>>>>>>> Regarding the interface exposed to Connector, I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc. IMO, this is an important thing we need to clarify because we need to know whether the Connector supports this, and what column/metadata corresponds to 'system_time'.
> >>>>>>>>>
> >>>>>>>>> On Thu, May 25, 2023 at 22:50, Feng Jin <jinfeng1...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for your reply
> >>>>>>>>>>
> >>>>>>>>>> @Timo @BenChao @yuxia
> >>>>>>>>>>
> >>>>>>>>>> Sorry for the mistake. Currently, Calcite only supports the `FOR SYSTEM_TIME AS OF` syntax, so we can only support `FOR SYSTEM_TIME AS OF`. I've updated the syntax part of the FLIP.
> >>>>>>>>>>
> >>>>>>>>>> @Timo
> >>>>>>>>>>
> >>>>>>>>>>> We will convert it to TIMESTAMP_LTZ?
> >>>>>>>>>>
> >>>>>>>>>> Yes, I think we need to convert TIMESTAMP to TIMESTAMP_LTZ and then convert it into a long value.
> >>>>>>>>>>
> >>>>>>>>>>> How do we want to query the most recent version of a table
> >>>>>>>>>>
> >>>>>>>>>> I think we can use `AS OF CURRENT_TIMESTAMP`, but it does cause inconsistency with the real-time concept.
> >>>>>>>>>> However, from my personal understanding, the scope of `AS OF CURRENT_TIMESTAMP` is the table itself, not the table record. So I think using CURRENT_TIMESTAMP should also be reasonable?
> >>>>>>>>>> Additionally, if no version is specified, the latest version should be used by default.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Feng
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 25, 2023 at 7:47 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Feng for bringing this up. It'll be great to introduce time travel to Flink to have a better integration with external data sources.
> >>>>>>>>>>>
> >>>>>>>>>>> I also share the same concern about the syntax.
> >>>>>>>>>>> In the `Whether to support other syntax implementations` part of this FLIP, it seems the syntax in Calcite should be `FOR SYSTEM_TIME AS OF`, right?
> >>>>>>>>>>> But in the syntax part of this FLIP, it seems to be `AS OF TIMESTAMP` instead of `FOR SYSTEM_TIME AS OF`. Is it just a mistake or by design?
> >>>>>>>>>>>
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Yuxia
> >>>>>>>>>>>
> >>>>>>>>>>> ----- Original Message -----
> >>>>>>>>>>> From: "Benchao Li" <libenc...@apache.org>
> >>>>>>>>>>> To: "dev" <dev@flink.apache.org>
> >>>>>>>>>>> Sent: Thursday, May 25, 2023 7:27:17 PM
> >>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks Feng, it's exciting to have this ability.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding the syntax section, are you proposing `AS OF` instead of `FOR SYSTEM_TIME AS OF` to do this? I know `FOR SYSTEM_TIME AS OF` is in the SQL standard and has been supported by some database vendors such as SQL Server.
> >>>>>>>>>>> About `AS OF`, is it in the standard, or does any database vendor support it? If yes, I think it's worth adding this support to Calcite, and I would give a hand on the Calcite side. Otherwise, I think we'd better use `FOR SYSTEM_TIME AS OF`.
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, May 25, 2023 at 19:02, Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Also: How do we want to query the most recent version of a table?
> >>>>>>>>>>>>
> >>>>>>>>>>>> `AS OF CURRENT_TIMESTAMP` would be ideal, but according to the docs the type is TIMESTAMP_LTZ and, what is even more concerning, it is actually evaluated row-based:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This could make it difficult to explain in a join scenario of multiple snapshotted tables.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 25.05.23 12:29, Timo Walther wrote:
> >>>>>>>>>>>>> Hi Feng,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> thanks for proposing this FLIP. It makes a lot of sense to finally support querying tables at a specific point in time or hopefully also ranges soon. Following time-versioned tables.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Here is some feedback from my side:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. Syntax
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Can you elaborate a bit on the Calcite restrictions?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Does Calcite currently support `AS OF` syntax for this but not `FOR SYSTEM_TIME AS OF`?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It would be great to support `AS OF` also for time-versioned joins and have a unified and short syntax.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Once a fix is merged in Calcite for this, we can make this available in Flink earlier by copying the corresponding classes until the next Calcite upgrade is performed.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. Semantics
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> How do we interpret the timestamp? In Flink we have 2 timestamp types (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP '2023-04-27 00:00:00', in which time zone will the timestamp be? We will convert it to TIMESTAMP_LTZ?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We definitely need to clarify this because the past has shown that daylight saving times make our lives hard.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 25.05.23 10:57, Feng Jin wrote:
> >>>>>>>>>>>>>> Hi, everyone.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I’d like to start a discussion about FLIP-308: Support Time Travel In Batch Mode [1]
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Time travel is a SQL syntax used to query historical versions of data.
> >>>>>>>>>>>>>> It allows users to specify a point in time and retrieve the data and schema of a table as it appeared at that time. With time travel, users can easily analyze and compare historical versions of data.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> With the widespread use of data lake systems such as Paimon, Iceberg, and Hudi, time travel can provide more convenience for users' data analysis.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Looking forward to your opinions; any suggestions are welcome.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Feng
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Benchao Li
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Benchao Li
> >>>>>>
> >>>>>> --
> >>>>>>
> >>>>>> Best,
> >>>>>> Benchao Li