Hi everyone,

Thank you all for your valuable input. If there are no further questions or
concerns regarding FLIP-308[1], I would like to start voting on Monday,
June 19th.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel


Best,

Feng

On Mon, Jun 12, 2023 at 10:57 AM Feng Jin <jinfeng1...@gmail.com> wrote:

> Thanks Benchao and Leonard.
>
> 'Implicit type conversion' makes sense to me. I will emphasize the
> implicit type conversion in the document.
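>
> For illustration, a rough sketch of what the implicit conversion means for
> the time travel clause (the table name and the session time zone below are
> just examples, not text from the FLIP):
>
>   -- the literal has TIMESTAMP (no time zone) type; the planner implicitly
>   -- casts it to TIMESTAMP_LTZ using the session time zone and hands the
>   -- resulting epoch milliseconds to the catalog
>   SET 'table.local-time-zone' = 'Asia/Shanghai';
>   SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00';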
>
>
> Best,
> Feng
>
> On Sat, Jun 10, 2023 at 10:11 AM Benchao Li <libenc...@apache.org> wrote:
>
>> Thanks Leonard for the input, the 'Implicit type conversion' approach sounds
>> good to me.
>> I also agree that this should be done in the planner instead of the connector;
>> it'll be a lot easier for connector development.
>>
>> Leonard Xu <xbjt...@gmail.com> wrote on Fri, Jun 9, 2023, at 20:11:
>>
>> > About the semantics consideration, I have some new input after rethinking.
>> >
>> > 1. We can support both TIMESTAMP and TIMESTAMP_LTZ expressions following
>> > the syntax `SELECT [column_name(s)] FROM [table_name] FOR SYSTEM_TIME AS
>> > OF `
>> >
>> > 2. For the TIMESTAMP_LTZ type, passing a long instant value to CatalogTable
>> > is pretty intuitive; for the TIMESTAMP type, it will be implicitly cast to
>> > TIMESTAMP_LTZ by the planner using the session timezone and then passed to
>> > CatalogTable. This case can be considered as a function AsOfSnapshot(Table t,
>> > TIMESTAMP_LTZ arg): the arg can be passed with TIMESTAMP_LTZ type, but since
>> > our framework supports implicit type conversion, users can also pass the arg
>> > with TIMESTAMP type. As a hint, Spark[1] does the implicit type conversion too.
>> >
>> > 3. I also considered handing over the implicit type conversion to the
>> > connector instead of the planner, e.g. passing a TIMESTAMP literal and
>> > letting the connector use the session timezone to perform the type
>> > conversion, but this is more complicated than the planner handling above,
>> > and it's not friendly to connector developers.
>> >
>> > 4. The last point: TIMESTAMP_LTZ '1970-01-01 00:00:04.001' should be an
>> > invalid expression, as you cannot define an instant point (i.e. TIMESTAMP_LTZ
>> > semantics in SQL) from a timestamp literal without a timezone. You can use an
>> > explicit type conversion like `cast(ts_ntz as TIMESTAMP_LTZ)` after
>> > `FOR SYSTEM_TIME AS OF ` if you want to use a TIMESTAMP
>> > type/expression/literal without a timezone.
>> >
>> > 5. One last point: the TIMESTAMP_LTZ type of Flink SQL supports DST
>> > time[2] well, which will help users avoid many corner cases.
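>> >
>> > A quick sketch of points 2 and 4, only for illustration (`t` and `ts_ntz`
>> > are placeholders for a table and a TIMESTAMP-without-timezone expression):
>> >
>> > -- point 2: a TIMESTAMP literal is accepted and implicitly cast to
>> > --          TIMESTAMP_LTZ by the planner using the session time zone
>> > SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '1970-01-01 00:00:04.001';
>> >
>> > -- point 4: an explicit conversion for a timestamp-without-timezone expression
>> > SELECT * FROM t FOR SYSTEM_TIME AS OF CAST(ts_ntz AS TIMESTAMP_LTZ(3));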
>> >
>> >
>> > Best,
>> > Leonard
>> >
>> > [1]
>> >
>> https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56
>> > [2]
>> >
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/timezone/#daylight-saving-time-support
>> >
>> >
>> >
>> >
>> > > On Jun 9, 2023, at 1:13 PM, Benchao Li <libenc...@apache.org> wrote:
>> > >
>> > > As you can see, you must use `UNIX_TIMESTAMP` to do this work; that's
>> > > where the time zone conversion happens.
>> > >
>> > > What I'm talking about is casting timestamp/timestamp_ltz to long
>> > > directly; that's why the semantics are tricky when you cast a timestamp
>> > > to long using a time zone.
>> > >
>> > > For other systems, such as SQL Server[1], they actually use a string
>> > > instead of a timestamp literal, e.g. `FOR SYSTEM_TIME AS OF '2021-01-01
>> > > 00:00:00.0000000'`. I'm not sure whether they implicitly convert the
>> > > string to TIMESTAMP_LTZ, or whether they just have a different definition
>> > > of the syntax.
>> > >
>> > > But for us, we are definitely using a TIMESTAMP/TIMESTAMP_LTZ literal
>> > > here; that's why it is special, and we must highlight the behavior that
>> > > we are converting a timestamp-without-time-zone literal to long using the
>> > > session time zone.
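>> > >
>> > > Just to make the contrast concrete (`t` is a made-up table name): SQL
>> > > Server accepts a plain string after the clause, while the proposal here
>> > > uses a timestamp literal, roughly:
>> > >
>> > > -- SQL Server style: a string literal
>> > > SELECT * FROM t FOR SYSTEM_TIME AS OF '2021-01-01 00:00:00.0000000';
>> > > -- the proposal here: a TIMESTAMP literal, converted with the session time zone
>> > > SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2021-01-01 00:00:00';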
>> > >
>> > > [1]
>> > >
>> >
>> https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16
>> > >
>> > > Feng Jin <jinfeng1...@gmail.com> wrote on Thu, Jun 8, 2023, at 11:35:
>> > >
>> > >> Hi all,
>> > >>
>> > >> thanks for your input
>> > >>
>> > >>
>> > >> @Benchao
>> > >>
>> > >>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
>> > >> WITHOUT TIME ZONE", converting it to unix timestamp would use UTC
>> > timezone,
>> > >> which is not usually expected by users.
>> > >>
>> > >> It was indeed the case before Flink 1.13, but now my understanding is
>> > that
>> > >> there have been some slight changes in the definition of TIMESTAMP.
>> > >>
>> > >> TIMESTAMP is currently used to specify the year, month, day, hour, minute
>> > >> and second. We recommend that users use *UNIX_TIMESTAMP(CAST(timestamp_col
>> > >> AS STRING))* to convert between *TIMESTAMP values* and *long values*. The
>> > >> *UNIX_TIMESTAMP* function will use the *LOCAL TIME ZONE*. Therefore,
>> > >> converting either TIMESTAMP or TIMESTAMP_LTZ to long values will involve
>> > >> the *LOCAL TIME ZONE*.
>> > >>
>> > >>
>> > >> Here is a test:
>> > >>
>> > >> Flink SQL> SET 'table.local-time-zone' = 'UTC';
>> > >> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01
>> 00:00:00' as
>> > >> STRING)) as `timestamp`;
>> > >> ---------------
>> > >> timestamp
>> > >> --------------
>> > >> 0
>> > >>
>> > >> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
>> > >> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01
>> 00:00:00' as
>> > >> STRING)) as `timestamp`;
>> > >> ---------------
>> > >> timestamp
>> > >> --------------
>> > >> -28800
>> > >>
>> > >> Therefore, the current conversion method exposed to users also uses the
>> > >> LOCAL TIME ZONE.
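>> > >>
>> > >> By analogy, here is a sketch of the proposed behavior (not output from a
>> > >> real run): the same literal in the time travel clause would resolve to
>> > >> different instants depending on the session time zone:
>> > >>
>> > >> Flink SQL> SET 'table.local-time-zone' = 'UTC';
>> > >> Flink SQL> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP
>> > >> '1970-01-01 00:00:00';
>> > >> -- the catalog would receive getTable(tablePath, 0)
>> > >>
>> > >> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
>> > >> Flink SQL> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP
>> > >> '1970-01-01 00:00:00';
>> > >> -- the catalog would receive getTable(tablePath, -28800000)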
>> > >>
>> > >>
>> > >> @yuxia
>> > >>
>> > >> Thank you very much for providing the list of behaviors of TIMESTAMP
>> in
>> > >> other systems.
>> > >>
>> > >>> I think we can align with them to avoid the inconsistency with other
>> > >>> engines and provide convenience for the external connectors while
>> > >>> integrating Flink's time travel API.
>> > >>
>> > >> +1 for this.
>> > >>
>> > >>> Regarding the inconsistency, I think we can consider time travel as a
>> > >>> special case, and we do need to highlight this in the FLIP.
>> > >>> As for "violate the restriction outlined in FLINK-21978[1]", since we cast
>> > >>> the timestamp to epochMillis only for internal use, and won't expose it to
>> > >>> users, I don't think it will violate the restriction.
>> > >>> Btw, please add a brief description to explain the meaning of the parameter
>> > >>> `timestamp` in the method `CatalogBaseTable getTable(ObjectPath tablePath,
>> > >>> long timestamp)`. Maybe something like "timestamp of the table snapshot,
>> > >>> which is milliseconds since 1970-01-01 00:00:00 UTC".
>> > >>
>> > >> Thank you for the suggestions regarding the document. I will add them to
>> > >> the FLIP.
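>> > >>
>> > >> As a rough illustration of what the timestamp-aware getTable could return
>> > >> (the columns and the option key below are made up, not part of the FLIP):
>> > >> the catalog resolves the epoch-millis argument to a snapshot and returns
>> > >> the table definition as of that point in time, conceptually equivalent to:
>> > >>
>> > >> CREATE TABLE paimon_tb (
>> > >>   -- schema as it was at the requested point in time
>> > >>   id BIGINT,
>> > >>   name STRING
>> > >> ) WITH (
>> > >>   'connector' = 'paimon',
>> > >>   'scan.snapshot-id' = '123'  -- snapshot resolved from the timestamp
>> > >> );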
>> > >>
>> > >>
>> > >> Best,
>> > >> Feng
>> > >>
>> > >>
>> > >> On Wed, Jun 7, 2023 at 12:18 PM Benchao Li <libenc...@apache.org>
>> > wrote:
>> > >>
>> > >>> I also share the concern about the timezone problem.
>> > >>>
>> > >>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
>> > >> WITHOUT
>> > >>> TIME ZONE", converting it to unix timestamp would use UTC timezone,
>> > which
>> > >>> is not usually expected by users.
>> > >>>
>> > >>> If we want to stay consistent with the standard, we probably should use
>> > >>> "TIMESTAMP WITH LOCAL ZONE '2023-04-27 00:00:00'", whose type is "TIMESTAMP
>> > >>> WITH LOCAL TIME ZONE", and converting it to a unix timestamp will consider
>> > >>> the session timezone, which is the expected result. But it's inconvenient
>> > >>> for users.
>> > >>>
>> > >>> Taking this as a special case and converting "TIMESTAMP '2023-04-27
>> > >>> 00:00:00'" to a unix timestamp with the session timezone will be convenient
>> > >>> for users, but will break the standard. I'm +0.5 for this choice.
>> > >>>
>> > >>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 7, 2023, at 12:06:
>> > >>>
>> > >>>> Hi, Feng Jin.
>> > >>>> I think the concern of Leonard may be the inconsistency of the behavior
>> > >>>> of TIMESTAMP '2023-04-27 00:00:00' between time travel and other SQL
>> > >>>> statements.
>> > >>>>
>> > >>>> For the normal SQL:
>> > >>>> `SELECT TIMESTAMP '2023-04-27 00:00:00'`, we won't consider the timezone.
>> > >>>> But for the time travel SQL:
>> > >>>> `SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27
>> > >>>> 00:00:00'`, we will consider the timezone and convert it to a UTC
>> > >>>> timestamp.
>> > >>>>
>> > >>>> The concern is valid. But for time travel, most engines, e.g. Spark[1],
>> > >>>> Hive[2], and Trino[3], also do the time conversion considering the session
>> > >>>> time zone. I think we can align with them to avoid the inconsistency with
>> > >>>> other engines and provide convenience for the external connectors while
>> > >>>> integrating Flink's time travel API.
>> > >>>>
>> > >>>> Regarding the inconsistency, I think we can consider time travel as a
>> > >>>> special case, and we do need to highlight this in the FLIP.
>> > >>>> As for "violate the restriction outlined in FLINK-21978[1]", since we cast
>> > >>>> the timestamp to epochMillis only for internal use, and won't expose it to
>> > >>>> users, I don't think it will violate the restriction.
>> > >>>> Btw, please add a brief description to explain the meaning of the parameter
>> > >>>> `timestamp` in the method `CatalogBaseTable getTable(ObjectPath tablePath,
>> > >>>> long timestamp)`. Maybe something like "timestamp of the table snapshot,
>> > >>>> which is milliseconds since 1970-01-01 00:00:00 UTC".
>> > >>>>
>> > >>>> [1]
>> > >>>>
>> > >>>
>> > >>
>> >
>> https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56
>> > >>>> [2]
>> > >>>>
>> > >>>
>> > >>
>> >
>> https://github.com/apache/hive/blob/f5e69dc38d7ff26c70be19adc9d1a3ae90dc4cf2/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java#L989
>> > >>>> [3]
>> > >>>>
>> > >>>
>> > >>
>> >
>> https://github.com/trinodb/trino/blob/2433d9e60f1abb0d85c32374c1758525560e1a86/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java#L443
>> > >>>>
>> > >>>>
>> > >>>> Best regards,
>> > >>>> Yuxia
>> > >>>>
>> > >>>> ----- Original Message -----
>> > >>>> From: "Feng Jin" <jinfeng1...@gmail.com>
>> > >>>> To: "dev" <dev@flink.apache.org>
>> > >>>> Sent: Tuesday, June 6, 2023, 10:15:47 PM
>> > >>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>> > >>>>
>> > >>>> Hi everyone
>> > >>>>
>> > >>>> Thanks everyone for your input.
>> > >>>>
>> > >>>>
>> > >>>> @Yun
>> > >>>>
>> > >>>>> I think you could add descriptions of how to align backfill time
>> > >>> travel
>> > >>>> with querying the latest data. And I think you should also update
>> the
>> > >>>> "Discussion thread" in the original FLIP.
>> > >>>>
>> > >>>> Thank you for the suggestion, I will update it in the document.
>> > >>>>
>> > >>>>> I have a question about getting the table schema from the catalog.
>> > >> I'm
>> > >>>> not sure whether the Catalog#getTable(tablePath, timestamp) will be
>> > >>> called
>> > >>>> only once.
>> > >>>>
>> > >>>> I understand that in a query, the schema of the table is determined
>> > >>> before
>> > >>>> execution. The schema used will be based on the latest schema
>> within
>> > >> the
>> > >>>> TimeTravel period.
>> > >>>>
>> > >>>> In addition, due to current syntax limitations, we are unable to
>> > >> support
>> > >>>> the use of BETWEEN AND.
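>> > >>>>
>> > >>>> In other words (just an illustration of the limitation), only a single
>> > >>>> point in time can be specified for now:
>> > >>>>
>> > >>>> -- not supported by the current syntax:
>> > >>>> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP
>> > >>>>   BETWEEN '2023-05-29 00:00:00' AND '2023-06-05 00:00:00';
>> > >>>>
>> > >>>> -- supported: a single point in time
>> > >>>> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-06-04 00:00:00';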
>> > >>>>
>> > >>>>
>> > >>>> @Jing
>> > >>>>
>> > >>>>> Would you like to update your thoughts described in your previous
>> > >>> email
>> > >>>> about why SupportsTimeTravel has been rejected into the FLIP?
>> > >>>>
>> > >>>> Sure,  I updated the doc.
>> > >>>>
>> > >>>>
>> > >>>>>   Since we always directly add overload methods into Catalog
>> > >> according
>> > >>>> to new requirements, which makes the interface bloated
>> > >>>>
>> > >>>> Your concern is valid. If we need to support the long type version
>> in
>> > >> the
>> > >>>> future, we may have to add another method "getTable(ObjectPath,
>> long
>> > >>>> version)". However, I understand that
>> > >>>> "Catalog.getTable(tablePath).on(timeStamp)" may not meet the
>> > >>> requirements.
>> > >>>> The timestamp is for Catalog's use, and Catalog obtains the
>> > >> corresponding
>> > >>>> schema based on this time.
>> > >>>>
>> > >>>>
>> > >>>> @liu @Regards
>> > >>>>
>> > >>>> I am very sorry for the unclear description in the document. I have
>> > >>> updated
>> > >>>> relevant descriptions regarding why it needs to be implemented in
>> > >>> Catalog.
>> > >>>>
>> > >>>> Time travel not only requires obtaining the data at the corresponding
>> > >>>> time point, but also the corresponding schema at that time point.
>> > >>>>
>> > >>>>
>> > >>>> @Shammon
>> > >>>>
>> > >>>>> Flink or connector such as  iceberg/paimon can create sources from
>> > >> the
>> > >>>> `CatalogBaseTable` directly without the need to get the snapshot ID
>> > >> from
>> > >>>> `CatalogTable.getSnapshot()`.  What do you think of it?
>> > >>>>
>> > >>>> You are right, we don't need the getSnapshot interface for
>> > >> PaimonCatalog
>> > >>> or
>> > >>>> IcebergCatalog tables, but we may need it for temporary tables.
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> Best,
>> > >>>> Feng
>> > >>>>
>> > >>>>
>> > >>>> On Tue, Jun 6, 2023 at 9:32 PM Feng Jin <jinfeng1...@gmail.com>
>> > wrote:
>> > >>>>
>> > >>>>> Sorry I replied to the wrong mail. Please ignore the last email.
>> > >>>>>
>> > >>>>>
>> > >>>>> Hi Leonard
>> > >>>>>
>> > >>>>>> 1. Unification SQL
>> > >>>>>
>> > >>>>> I agree that it is crucial for us to support both batch and streaming
>> > >>>>> processing, and the current design allows for supporting both. I'll
>> > >>>>> update the FLIP later.
>> > >>>>>
>> > >>>>>
>> > >>>>>> 2.Semantics
>> > >>>>>
>> > >>>>> In my opinion, it would be feasible to perform the conversion based on
>> > >>>>> the current session time zone, regardless of whether it is TIMESTAMP or
>> > >>>>> TIMESTAMP_LTZ.
>> > >>>>>
>> > >>>>> However, this may indeed violate the restriction outlined in
>> > >>>>> FLINK-21978[1]  as Benchao mentioned, and I am uncertain as to
>> > >> whether
>> > >>> it
>> > >>>>> is reasonable.
>> > >>>>>
>> > >>>>>
>> > >>>>>>  3.  Some external systems may use timestamp value to mark a
>> > >>> version,
>> > >>>>> but others may use version number、file position、log offset.
>> > >>>>>
>> > >>>>> It is true that most systems support time-related operations, and
>> I
>> > >>>>> believe that the current design is compatible with most systems.
>> > >>> However,
>> > >>>>> if we want to support long data type, it may require Calcite to
>> > >> support
>> > >>>> the
>> > >>>>> VERSION AS OF syntax. I understand that this is something that we
>> may
>> > >>>> need
>> > >>>>> to consider in the future.
>> > >>>>>
>> > >>>>>
>> > >>>>> Best,
>> > >>>>> Feng
>> > >>>>>
>> > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978
>> > >>>>>
>> > >>>>> On Tue, Jun 6, 2023 at 8:28 PM Leonard Xu <xbjt...@gmail.com>
>> wrote:
>> > >>>>>
>> > >>>>>> Hi, Feng
>> > >>>>>>
>> > >>>>>> Thanks for driving this FLIP, a very impressive feature that users
>> > >>>>>> want. I have some quick questions here.
>> > >>>>>>
>> > >>>>>> 1. Unification SQL:
>> > >>>>>>        The snapshot concept exists both in batch mode and streaming
>> > >>>>>> mode; could we consider a unified proposal? I think users won't want
>> > >>>>>> another SQL syntax named time travel for streaming mode.
>> > >>>>>>
>> > >>>>>> 2. Semantics:
>> > >>>>>>        Flink supports TIMESTAMP and TIMESTAMP_LTZ types. To get a long
>> > >>>>>> timestamp value (getTable(ObjectPath tablePath, long timestamp)) we need
>> > >>>>>> two pieces of information, i.e. a TIMESTAMP value and the current session
>> > >>>>>> timezone. How do we deal with the value in the currently proposed SQL
>> > >>>>>> syntax?
>> > >>>>>>
>> > >>>>>> 3. Is it enough to use a single timestamp to track a snapshot (version)
>> > >>>>>> of an external table? Some external systems may use a timestamp value to
>> > >>>>>> mark a version, but others may use a version number, file position, or
>> > >>>>>> log offset.
>> > >>>>>>
>> > >>>>>> Best,
>> > >>>>>> Leonard
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>> On Jun 5, 2023, at 3:28 PM, Yun Tang <myas...@live.com> wrote:
>> > >>>>>>>
>> > >>>>>>> Hi Feng,
>> > >>>>>>>
>> > >>>>>>> I think this FLIP would provide one important feature to unify
>> the
>> > >>>>>> stream-SQL and batch-SQL when we backfill the historical data in
>> > >> batch
>> > >>>> mode.
>> > >>>>>>>
>> > >>>>>>> For the "Syntax" session, I think you could add descriptions of
>> > >> how
>> > >>> to
>> > >>>>>> align backfill time travel with querying the latest data. And I
>> > >> think
>> > >>>> you
>> > >>>>>> should also update the "Discussion thread" in the original FLIP.
>> > >>>>>>>
>> > >>>>>>> Moreover, I have a question about getting the table schema from the
>> > >>>>>>> catalog. I'm not sure whether Catalog#getTable(tablePath, timestamp)
>> > >>>>>>> will be called only once. If we have a backfill query between 2023-05-29
>> > >>>>>>> and 2023-06-04 in the past week, and the table schema changed on
>> > >>>>>>> 2023-06-01, will the query below detect the schema changes while
>> > >>>>>>> backfilling the whole week?
>> > >>>>>>>
>> > >>>>>>> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP BETWEEN
>> > >>>>>> '2023-05-29 00:00:00' AND '2023-06-05 00:00:00'
>> > >>>>>>>
>> > >>>>>>> Best
>> > >>>>>>> Yun Tang
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>> ________________________________
>> > >>>>>>> From: Shammon FY <zjur...@gmail.com>
>> > >>>>>>> Sent: Thursday, June 1, 2023 17:57
>> > >>>>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>> > >>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch
>> Mode
>> > >>>>>>>
>> > >>>>>>> Hi Feng,
>> > >>>>>>>
>> > >>>>>>> I have one minor comment about the public interface
>> > >> `Optional<Long>
>> > >>>>>>> getSnapshot()` in the `CatalogTable`.
>> > >>>>>>>
>> > >>>>>>> As we can get tables from the new method
>> > >>> `Catalog.getTable(ObjectPath
>> > >>>>>>> tablePath, long timestamp)`, I think the returned
>> > >> `CatalogBaseTable`
>> > >>>>>> will
>> > >>>>>>> have the information of timestamp. Flink or connector such as
>> > >>>>>>> iceberg/paimon can create sources from the `CatalogBaseTable`
>> > >>> directly
>> > >>>>>>> without the need to get the snapshot ID from
>> > >>>>>> `CatalogTable.getSnapshot()`.
>> > >>>>>>> What do you think of it?
>> > >>>>>>>
>> > >>>>>>> Best,
>> > >>>>>>> Shammon FY
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>> On Thu, Jun 1, 2023 at 7:22 AM Jing Ge
>> <j...@ververica.com.invalid
>> > >>>
>> > >>>>>> wrote:
>> > >>>>>>>
>> > >>>>>>>> Hi Feng,
>> > >>>>>>>>
>> > >>>>>>>> Thanks for the proposal! Very interesting feature. Would you
>> like
>> > >>> to
>> > >>>>>> update
>> > >>>>>>>> your thoughts described in your previous email about why
>> > >>>>>> SupportsTimeTravel
>> > >>>>>>>> has been rejected into the FLIP? This will help readers
>> > >> understand
>> > >>>> the
>> > >>>>>>>> context (in the future).
>> > >>>>>>>>
>> > >>>>>>>> We always directly add overloaded methods to the Catalog according to
>> > >>>>>>>> new requirements, which makes the interface bloated. Just out of
>> > >>>>>>>> curiosity, does it make sense to introduce some DSL design? For example,
>> > >>>>>>>> Catalog.getTable(tablePath).on(timeStamp),
>> > >>>>>>>> Catalog.getTable(tablePath).current() for the most current version, and
>> > >>>>>>>> more room for further extensions like timestamp ranges, etc. I haven't
>> > >>>>>>>> read all the source code yet and I'm not sure if it is possible. But a
>> > >>>>>>>> design like this would keep the Catalog API lean and the API/DSL would
>> > >>>>>>>> be self-described and easier to use.
>> > >>>>>>>>
>> > >>>>>>>> Best regards,
>> > >>>>>>>> Jing
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <
>> > >>>>>>>> krzysiek.chmielew...@gmail.com> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> Ok, after a second thought I'm retracting my previous statement about
>> > >>>>>>>>> the Catalog changes you proposed.
>> > >>>>>>>>> I actually do see a benefit for the Delta connector with this change
>> > >>>>>>>>> and see why this could be coupled with the Catalog.
>> > >>>>>>>>>
>> > >>>>>>>>> The Delta Connector SQL support also ships a Delta Catalog
>> > >>>>>>>>> implementation for Flink.
>> > >>>>>>>>> For the Delta Catalog, table schema information is fetched from the
>> > >>>>>>>>> underlying _delta_log and not stored in the metastore. For time travel
>> > >>>>>>>>> we actually had a problem: if we wanted to time travel back to some old
>> > >>>>>>>>> version where the schema was slightly different, we would have a
>> > >>>>>>>>> conflict, since the Catalog would return the current schema and not how
>> > >>>>>>>>> it was for version X.
>> > >>>>>>>>>
>> > >>>>>>>>> With your change, our Delta Catalog can actually fetch the schema for
>> > >>>>>>>>> version X and send it to the DeltaTableFactory. Currently, the Catalog
>> > >>>>>>>>> can fetch only the current version. What we would also need, however,
>> > >>>>>>>>> is the version (number/timestamp) for this table passed to the
>> > >>>>>>>>> DynamicTableFactory so we could properly set up the Delta Standalone
>> > >>>>>>>>> library.
>> > >>>>>>>>>
>> > >>>>>>>>> Regards,
>> > >>>>>>>>> Krzysztof
>> > >>>>>>>>>
>> > >>>>>>>>> On Wed, May 31, 2023 at 10:37, Krzysztof Chmielewski <
>> > >>>>>>>>> krzysiek.chmielew...@gmail.com> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> Hi,
>> > >>>>>>>>>> happy to see such a feature.
>> > >>>>>>>>>> Small note from my end regarding Catalog changes.
>> > >>>>>>>>>>
>> > >>>>>>>>>> TL;DR
>> > >>>>>>>>>> I don't think it is necessary to delegate this feature to the catalog.
>> > >>>>>>>>>> I think that since "time travel" is a per-job/query property, it should
>> > >>>>>>>>>> not be coupled with the Catalog or the table definition. In my opinion
>> > >>>>>>>>>> this is something that only the DynamicTableFactory has to know about.
>> > >>>>>>>>>> I would rather see this feature as it is - a SQL syntax enhancement -
>> > >>>>>>>>>> but delegate it clearly to the DynamicTableFactory.
>> > >>>>>>>>>>
>> > >>>>>>>>>> I've implemented the time travel feature for the Delta Connector [1]
>> > >>>>>>>>>> using the current Flink API.
>> > >>>>>>>>>> Docs are pending code review, but you can find them here [2], and
>> > >>>>>>>>>> examples are available here [3].
>> > >>>>>>>>>>
>> > >>>>>>>>>> The time travel feature that I've implemented is based on Flink query
>> > >>>>>>>>>> hints:
>> > >>>>>>>>>> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
>> > >>>>>>>>>>
>> > >>>>>>>>>> The " versionAsOf" (we also have 'timestampAsOf') parameter
>> is
>> > >>>>>> handled
>> > >>>>>>>>> not
>> > >>>>>>>>>> by Catalog but by DyntamicTableFactory implementation for
>> Delta
>> > >>>>>>>>> connector.
>> > >>>>>>>>>> The value of this property is passed to Delta standalone lib
>> > >> API
>> > >>>> that
>> > >>>>>>>>>> returns table view for given version.
>> > >>>>>>>>>>
>> > >>>>>>>>>> I'm not sure how/if the proposed change could benefit the Delta
>> > >>>>>>>>>> connector's implementation of this feature.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Thanks,
>> > >>>>>>>>>> Krzysztof
>> > >>>>>>>>>>
>> > >>>>>>>>>> [1]
>> > >>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>
>> > >>>>
>> > >>>
>> > >>
>> >
>> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
>> > >>>>>>>>>> [2]
>> > >>>>>>
>> https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
>> > >>>>>>>>>> [3]
>> > >>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>
>> > >>>>
>> > >>>
>> > >>
>> >
>> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Wed, May 31, 2023 at 06:03, liu ron <ron9....@gmail.com> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Hi, Feng
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Thanks for driving this FLIP. Time travel is very useful for
>> > >>>>>>>>>>> integrating Flink with data lake systems. I have one question: why is
>> > >>>>>>>>>>> the implementation of time travel delegated to the Catalog? Assuming
>> > >>>>>>>>>>> that we use Flink to query a Hudi table with the time travel syntax,
>> > >>>>>>>>>>> but we don't use the HudiCatalog and instead register the Hudi table
>> > >>>>>>>>>>> in the InMemoryCatalog, can we support time travel for the Hudi table
>> > >>>>>>>>>>> in this case?
>> > >>>>>>>>>>> In contrast, I think time travel should be bound to the connector
>> > >>>>>>>>>>> instead of the Catalog, so the rejected alternative should be
>> > >>>>>>>>>>> considered.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Best,
>> > >>>>>>>>>>> Ron
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Tue, May 30, 2023, at 09:40:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> Hi, Feng.
>> > >>>>>>>>>>>> Notice this FLIP only supports batch mode for time travel. Would it
>> > >>>>>>>>>>>> also make sense to support stream mode to read a snapshot of the
>> > >>>>>>>>>>>> table as a bounded stream?
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Best regards,
>> > >>>>>>>>>>>> Yuxia
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> ----- Original Message -----
>> > >>>>>>>>>>>> From: "Benchao Li" <libenc...@apache.org>
>> > >>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
>> > >>>>>>>>>>>> Sent: Monday, May 29, 2023, 6:04:53 PM
>> > >>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> # Can Calcite support this syntax ` VERSION AS OF`  ?
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> This also depends on whether this is defined in the standard or in
>> > >>>>>>>>>>>> any known databases that have implemented it. If not, it would be
>> > >>>>>>>>>>>> hard to push it to Calcite.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> # getTable(ObjectPath object, long timestamp)
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Then we again come to the problem of "casting between timestamp and
>> > >>>>>>>>>>>> numeric", which has been disabled in FLINK-21978[1]. If you're gonna
>> > >>>>>>>>>>>> use this, then we need to clarify that problem first.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Feng Jin <jinfeng1...@gmail.com> wrote on Mon, May 29, 2023, at 15:57:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> hi, thanks for your reply.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> @Benchao
>> > >>>>>>>>>>>>>> did you consider the pushdown abilities compatible
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> In the current design, the implementation of TimeTravel is
>> > >>>>>>>> delegated
>> > >>>>>>>>>>> to
>> > >>>>>>>>>>>>> Catalog. We have added a function called
>> getTable(ObjectPath
>> > >>>>>>>>>>> tablePath,
>> > >>>>>>>>>>>>> long timestamp) to obtain the corresponding
>> CatalogBaseTable
>> > >>> at
>> > >>>> a
>> > >>>>>>>>>>>> specific
>> > >>>>>>>>>>>>> time.  Therefore, I think it will not have any impact on
>> the
>> > >>>>>>>>> original
>> > >>>>>>>>>>>>> pushdown abilities.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> I see there is a rejected  design for adding
>> > >>>>>>>> SupportsTimeTravel,
>> > >>>>>>>>>>> but
>> > >>>>>>>>>>>> I
>> > >>>>>>>>>>>>> didn't see the alternative in  the FLIP doc
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Sorry, the document description is not very clear. Regarding whether
>> > >>>>>>>>>>>>> to support SupportsTimeTravel, I have discussed it with yuxia. Since
>> > >>>>>>>>>>>>> we have already passed the corresponding time in getTable(ObjectPath,
>> > >>>>>>>>>>>>> long timestamp) of the Catalog, SupportsTimeTravel may not be
>> > >>>>>>>>>>>>> necessary.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> In getTable(ObjectPath object, long timestamp), we can
>> > >> obtain
>> > >>>> the
>> > >>>>>>>>>>> schema
>> > >>>>>>>>>>>> of
>> > >>>>>>>>>>>>> the corresponding time point and put the SNAPSHOT that
>> needs
>> > >>> to
>> > >>>> be
>> > >>>>>>>>>>>> consumed
>> > >>>>>>>>>>>>> into options.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> @Shammon
>> > >>>>>>>>>>>>>> Could we support this in Flink too?
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I personally think it's possible, but it is limited by Calcite's
>> > >>>>>>>>>>>>> syntax restrictions; I believe we should first support this syntax in
>> > >>>>>>>>>>>>> Calcite. Currently, I think it may not be easy to support this syntax
>> > >>>>>>>>>>>>> in Flink's parser. @Benchao, what do you think? Can Calcite support
>> > >>>>>>>>>>>>> the `VERSION AS OF` syntax?
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Best,
>> > >>>>>>>>>>>>> Feng.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> On Fri, May 26, 2023 at 2:55 PM Shammon FY <
>> > >> zjur...@gmail.com
>> > >>>>
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Thanks Feng, the feature of time travel sounds great!
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> In addition to SYSTEM_TIME, lake houses such as paimon
>> and
>> > >>>>>>>> iceberg
>> > >>>>>>>>>>>>> support
>> > >>>>>>>>>>>>>> snapshot or version. For example, users can query
>> snapshot
>> > >> 1
>> > >>>> for
>> > >>>>>>>>>>> paimon
>> > >>>>>>>>>>>>> by
>> > >>>>>>>>>>>>>> the following statement
>> > >>>>>>>>>>>>>> SELECT * FROM t VERSION AS OF 1
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Could we support this in Flink too?
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Best,
>> > >>>>>>>>>>>>>> Shammon FY
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> On Fri, May 26, 2023 at 1:20 PM Benchao Li <
>> > >>>>>>>> libenc...@apache.org>
>> > >>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Regarding the implementation, did you consider compatibility with
>> > >>>>>>>>>>>>>>> the pushdown abilities, e.g., projection pushdown, filter pushdown,
>> > >>>>>>>>>>>>>>> partition pushdown? Since `Snapshot` is not handled much in existing
>> > >>>>>>>>>>>>>>> rules, I have a concern about this. Of course, it depends on your
>> > >>>>>>>>>>>>>>> implementation details; what is important is that we'd better add
>> > >>>>>>>>>>>>>>> some cross tests for these.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Regarding the interface exposed to the connector, I see there is a
>> > >>>>>>>>>>>>>>> rejected design for adding SupportsTimeTravel, but I didn't see the
>> > >>>>>>>>>>>>>>> alternative in the FLIP doc. IMO, this is an important thing we need
>> > >>>>>>>>>>>>>>> to clarify because we need to know whether the connector supports
>> > >>>>>>>>>>>>>>> this, and what column/metadata corresponds to 'system_time'.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Feng Jin <jinfeng1...@gmail.com> wrote on Thu, May 25, 2023, at 22:50:
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> Thanks for your reply
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> @Timo @BenChao @yuxia
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> Sorry for the mistake. Currently, Calcite only supports the
>> > >>>>>>>>>>>>>>>> `FOR SYSTEM_TIME AS OF ` syntax, so we can only support
>> > >>>>>>>>>>>>>>>> `FOR SYSTEM_TIME AS OF`. I've updated the syntax part of the FLIP.
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> @Timo
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> We will convert it to TIMESTAMP_LTZ?
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> Yes, I think we need to convert TIMESTAMP to
>> > >> TIMESTAMP_LTZ
>> > >>>>>>>> and
>> > >>>>>>>>>>> then
>> > >>>>>>>>>>>>>>> convert
>> > >>>>>>>>>>>>>>>> it into a long value.
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> How do we want to query the most recent version of a
>> > >> table
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> I think we can use `AS OF CURRENT_TIMESTAMP`, but it does cause
>> > >>>>>>>>>>>>>>>> inconsistency with the real-time concept.
>> > >>>>>>>>>>>>>>>> However, from my personal understanding, the scope of `AS OF
>> > >>>>>>>>>>>>>>>> CURRENT_TIMESTAMP` is the table itself, not the table record. So I
>> > >>>>>>>>>>>>>>>> think using CURRENT_TIMESTAMP should also be reasonable.
>> > >>>>>>>>>>>>>>>> Additionally, if no version is specified, the latest version should
>> > >>>>>>>>>>>>>>>> be used by default.
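>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> For example (just a sketch of the intended behavior):
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> -- query the most recent version explicitly
>> > >>>>>>>>>>>>>>>> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP;
>> > >>>>>>>>>>>>>>>> -- omitting the clause reads the latest version by default
>> > >>>>>>>>>>>>>>>> SELECT * FROM paimon_tb;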
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> Best,
>> > >>>>>>>>>>>>>>>> Feng
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> On Thu, May 25, 2023 at 7:47 PM yuxia <
>> > >>>>>>>>>>> luoyu...@alumni.sjtu.edu.cn
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Thanks Feng for bringing this up. It'll be great to
>> > >>>>>>>>> introduce
>> > >>>>>>>>>>>> time
>> > >>>>>>>>>>>>>>> travel
>> > >>>>>>>>>>>>>>>>> to Flink to have a better integration with external
>> data
>> > >>>>>>>>>>> soruces.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> I also share same concern about the syntax.
>> > >>>>>>>>>>>>>>>>> I see in the part of `Whether to support other syntax
>> > >>>>>>>>>>>>>> implementations`
>> > >>>>>>>>>>>>>>> in
>> > >>>>>>>>>>>>>>>>> this FLIP, seems the syntax in Calcite should be `FOR
>> > >>>>>>>>>>> SYSTEM_TIME
>> > >>>>>>>>>>>>> AS
>> > >>>>>>>>>>>>>>> OF`,
>> > >>>>>>>>>>>>>>>>> right?
>> > >>>>>>>>>>>>>>>>> But the the syntax part in this FLIP, it seems to be
>> `AS
>> > >>>>>>>> OF
>> > >>>>>>>>>>>>>> TIMESTAMP`
>> > >>>>>>>>>>>>>>>>> instead of  `FOR SYSTEM_TIME AS OF`. Is it just a
>> > >> mistake
>> > >>>>>>>> or
>> > >>>>>>>>>>> by
>> > >>>>>>>>>>>>>> design?
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Best regards,
>> > >>>>>>>>>>>>>>>>> Yuxia
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> ----- Original Message -----
>> > >>>>>>>>>>>>>>>>> From: "Benchao Li" <libenc...@apache.org>
>> > >>>>>>>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
>> > >>>>>>>>>>>>>>>>> Sent: Thursday, May 25, 2023, 7:27:17 PM
>> > >>>>>>>>>>>>>>>>> 主题: Re: [DISCUSS] FLIP-308: Support Time Travel In
>> Batch
>> > >>>>>>>>> Mode
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Thanks Feng, it's exciting to have this ability.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Regarding the syntax section, are you proposing `AS OF` instead of
>> > >>>>>>>>>>>>>>>>> `FOR SYSTEM_TIME AS OF` to do this? I know `FOR SYSTEM_TIME AS OF`
>> > >>>>>>>>>>>>>>>>> is in the SQL standard and has been supported by some database
>> > >>>>>>>>>>>>>>>>> vendors such as SQL Server. About `AS OF`, is it in the standard,
>> > >>>>>>>>>>>>>>>>> or does any database vendor support it? If yes, I think it's worth
>> > >>>>>>>>>>>>>>>>> adding this support to Calcite, and I would give a hand on the
>> > >>>>>>>>>>>>>>>>> Calcite side. Otherwise, I think we'd better use `FOR SYSTEM_TIME
>> > >>>>>>>>>>>>>>>>> AS OF`.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Timo Walther <twal...@apache.org> wrote on Thu, May 25, 2023, at 19:02:
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> Also: How do we want to query the most recent version
>> > >>>>>>>> of a
>> > >>>>>>>>>>>> table?
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> `AS OF CURRENT_TIMESTAMP` would be ideal, but
>> according
>> > >>>>>>>> to
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>>>> docs
>> > >>>>>>>>>>>>>>>> both
>> > >>>>>>>>>>>>>>>>>> the type is TIMESTAMP_LTZ and what is even more
>> > >>>>>>>> concerning
>> > >>>>>>>>>>> is
>> > >>>>>>>>>>>> the
>> > >>>>>>>>>>>>>> it
>> > >>>>>>>>>>>>>>>>>> actually is evalated row-based:
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> Returns the current SQL timestamp in the local time
>> > >>>>>>>>> zone,
>> > >>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>> return
>> > >>>>>>>>>>>>>>>>>> type is TIMESTAMP_LTZ(3). It is evaluated for each
>> > >>>>>>>> record
>> > >>>>>>>>> in
>> > >>>>>>>>>>>>>>> streaming
>> > >>>>>>>>>>>>>>>>>> mode. But in batch mode, it is evaluated once as the
>> > >>>>>>>> query
>> > >>>>>>>>>>>> starts
>> > >>>>>>>>>>>>>> and
>> > >>>>>>>>>>>>>>>>>> uses the same result for every row.
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> This could make it difficult to explain in a join
>> > >>>>>>>> scenario
>> > >>>>>>>>>>> of
>> > >>>>>>>>>>>>>>> multiple
>> > >>>>>>>>>>>>>>>>>> snapshotted tables.
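>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> For instance (a sketch, with made-up table names):
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> SELECT *
>> > >>>>>>>>>>>>>>>>>> FROM t1 FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP AS a
>> > >>>>>>>>>>>>>>>>>> JOIN t2 FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP AS b ON a.id = b.id;
>> > >>>>>>>>>>>>>>>>>> -- in batch mode CURRENT_TIMESTAMP is evaluated once, so both tables
>> > >>>>>>>>>>>>>>>>>> -- would be snapshotted at the same instant; per-record evaluation
>> > >>>>>>>>>>>>>>>>>> -- would not give that guarantee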
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> Regards,
>> > >>>>>>>>>>>>>>>>>> Timo
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> On 25.05.23 12:29, Timo Walther wrote:
>> > >>>>>>>>>>>>>>>>>>> Hi Feng,
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> thanks for proposing this FLIP. It makes a lot of
>> > >>>>>>>> sense
>> > >>>>>>>>> to
>> > >>>>>>>>>>>>>> finally
>> > >>>>>>>>>>>>>>>>>>> support querying tables at a specific point in time
>> or
>> > >>>>>>>>>>>>> hopefully
>> > >>>>>>>>>>>>>>> also
>> > >>>>>>>>>>>>>>>>>>> ranges soon. Following time-versioned tables.
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> Here is some feedback from my side:
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> 1. Syntax
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> Can you elaborate a bit on the Calcite restrictions?
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> Does Calcite currently support `AS OF` syntax for
>> this
>> > >>>>>>>>> but
>> > >>>>>>>>>>>> not
>> > >>>>>>>>>>>>>> `FOR
>> > >>>>>>>>>>>>>>>>>>> SYSTEM_TIME AS OF`?
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> It would be great to support `AS OF` also for
>> > >>>>>>>>>>> time-versioned
>> > >>>>>>>>>>>>>> joins
>> > >>>>>>>>>>>>>>>> and
>> > >>>>>>>>>>>>>>>>>>> have a unified and short syntax.
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> Once a fix is merged in Calcite for this, we can
>> make
>> > >>>>>>>>> this
>> > >>>>>>>>>>>>>>> available
>> > >>>>>>>>>>>>>>>> in
>> > >>>>>>>>>>>>>>>>>>> Flink earlier by copying the corresponding classes
>> > >>>>>>>> until
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>>>> next
>> > >>>>>>>>>>>>>>>>>>> Calcite upgrade is performed.
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> 2. Semantics
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> How do we interpret the timestamp? In Flink we have
>> 2
>> > >>>>>>>>>>>> timestamp
>> > >>>>>>>>>>>>>>> types
>> > >>>>>>>>>>>>>>>>>>> (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS
>> OF
>> > >>>>>>>>>>>> TIMESTAMP
>> > >>>>>>>>>>>>>>>>>>> '2023-04-27 00:00:00', in which timezone will the
>> > >>>>>>>>>>> timestamp
>> > >>>>>>>>>>>> be?
>> > >>>>>>>>>>>>>> We
>> > >>>>>>>>>>>>>>>> will
>> > >>>>>>>>>>>>>>>>>>> convert it to TIMESTAMP_LTZ?
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> We definitely need to clarify this because the past has
>> has
>> > >>>>>>>>>>> shown
>> > >>>>>>>>>>>>> that
>> > >>>>>>>>>>>>>>>>>>> daylight saving times make our lives hard.
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> Thanks,
>> > >>>>>>>>>>>>>>>>>>> Timo
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>> On 25.05.23 10:57, Feng Jin wrote:
>> > >>>>>>>>>>>>>>>>>>>> Hi, everyone.
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>> I’d like to start a discussion about FLIP-308:
>> > >>>>>>>> Support
>> > >>>>>>>>>>> Time
>> > >>>>>>>>>>>>>> Travel
>> > >>>>>>>>>>>>>>>> In
>> > >>>>>>>>>>>>>>>>>>>> Batch
>> > >>>>>>>>>>>>>>>>>>>> Mode [1]
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>> Time travel is a SQL syntax used to query
>> historical
>> > >>>>>>>>>>>> versions
>> > >>>>>>>>>>>>> of
>> > >>>>>>>>>>>>>>>> data.
>> > >>>>>>>>>>>>>>>>>> It
>> > >>>>>>>>>>>>>>>>>>>> allows users to specify a point in time and
>> retrieve
>> > >>>>>>>>> the
>> > >>>>>>>>>>>> data
>> > >>>>>>>>>>>>>> and
>> > >>>>>>>>>>>>>>>>>>>> schema of
>> > >>>>>>>>>>>>>>>>>>>> a table as it appeared at that time. With time
>> > >>>>>>>> travel,
>> > >>>>>>>>>>> users
>> > >>>>>>>>>>>>> can
>> > >>>>>>>>>>>>>>>>> easily
>> > >>>>>>>>>>>>>>>>>>>> analyze and compare historical versions of data.
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>> With the widespread use of data lake systems such
>> as
>> > >>>>>>>>>>> Paimon,
>> > >>>>>>>>>>>>>>>> Iceberg,
>> > >>>>>>>>>>>>>>>>>> and
>> > >>>>>>>>>>>>>>>>>>>> Hudi, time travel can provide more convenience for
>> > >>>>>>>>> users'
>> > >>>>>>>>>>>> data
>> > >>>>>>>>>>>>>>>>> analysis.
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>> Looking forward to your opinions, any suggestions
>> are
>> > >>>>>>>>>>>>> welcomed.
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>> 1.
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>
>> > >>>>
>> > >>>
>> > >>
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>> Best.
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>> Feng
>> > >>>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> --
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Best,
>> > >>>>>>>>>>>>>>>>> Benchao Li
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> --
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Best,
>> > >>>>>>>>>>>>>>> Benchao Li
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> --
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Best,
>> > >>>>>>>>>>>> Benchao Li
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>
>> > >>>
>> > >>>
>> > >>> --
>> > >>>
>> > >>> Best,
>> > >>> Benchao Li
>> > >>>
>> > >>
>> > >
>> > >
>> > > --
>> > >
>> > > Best,
>> > > Benchao Li
>> >
>> >
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
