Oh, sorry, the example above is wrong. The column name should come first. So the full example should be:
create table t (
  user_id string,
  action string,
  ts string,
  new_ts AS TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
  watermark for new_ts as new_ts - interval '5' second
) with (
  ...
)

Docs:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html

Best,
Jark

On Wed, 11 Nov 2020 at 11:11, Fanbin Bu <fanbin...@coinbase.com> wrote:

> Jark,
>
> Thanks for the quick response.
> I tried to_timestamp(ts, ...), but got the following error:
>
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "(" at line
>
> looks like it complains about the second `(` in
> create table t (... to_timestamp(...)...)
>
>
>
> On Tue, Nov 10, 2020 at 6:47 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> The example you gave is correct:
>>
>> create table t (
>>   user_id string,
>>   action string,
>>   ts string,
>>   transform_ts_format(ts) as new_ts,
>>   watermark for new_ts as new_ts - interval '5' second
>> ) with (
>>   ...
>> )
>>
>> You can use "TO_TIMESTAMP" built-in function instead of the UDF, e.g.
>> TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z''') as new_ts
>>
>>
>> Q1: how does watermark know new_ts is a valid timestamp?
>> > the framework will validate the return type of the computed column
>> expression.
>> Currently, it must be a type of TIMESTAMP(3).
>>
>> Q2: is it possible to reuse ts without introducing a new column?
>> > Currently, it is not supported. This requires to support "TIMESTAMP
>> WITH LOCAL TIME ZONE" as rowtime attribute first.
>>
>> Best,
>> Jark
>>
>> On Wed, 11 Nov 2020 at 10:33, Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> In the `computed column` section of [1], i saw some related doc:
>>>
>>> ```
>>> On the other hand, computed column can be used to derive event time
>>> column because an event time column
>>> may need to be derived from existing fields, e.g. the original field is
>>> not TIMESTAMP(3) type or is nested in a JSON string.
>>> ```
>>> could you please provide a concrete example for this?
>>> Thanks
>>> Fanbin
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>>>
>>> On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu <fanbin...@coinbase.com>
>>> wrote:
>>>
>>>> i also tried:
>>>> ts TIMESTAMP WITH LOCAL TIME ZONE
>>>>
>>>> but it failed with
>>>> Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
>>>> 'TIMESTAMP(6) WITH LOCAL TIME ZONE'.
>>>>
>>>> On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu <fanbin...@coinbase.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have source json data like:
>>>>> {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action":
>>>>> "click"}
>>>>> ...
>>>>>
>>>>> my sql is:
>>>>> create table t (
>>>>>   user_id string,
>>>>>   action string,
>>>>>   ts timestamp,
>>>>>   watermark for ts as ts - interval '5' second
>>>>> ) with (
>>>>>   'connector' = 'kafka',
>>>>>   'topic' = 'test',
>>>>>   'json.timestamp-format.standard' = 'ISO-8601'
>>>>>   ...
>>>>> )
>>>>>
>>>>> this does not work since ISO-8601 does not expect `Z` at the end of
>>>>> the timestamp.
>>>>> It only works for "2020-11-09T20:26:10.368". However, I'm not able to
>>>>> change the format.
>>>>>
>>>>> I checked a few threads and somebody suggested to use udf for
>>>>> unsupported timestamp format. what would the create table statement look
>>>>> like? I also need watermark working.
>>>>>
>>>>> I'm thinking about this:
>>>>> create table t (
>>>>>   user_id string,
>>>>>   action string,
>>>>>   ts string,
>>>>>   transform_ts_format(ts) as new_ts,
>>>>>   watermark for new_ts as new_ts - interval '5' second
>>>>> ) with (
>>>>>   ...
>>>>>
>>>>> q:
>>>>> 1. how does watermark know new_ts is a valid timestamp?
>>>>> 2. is it possible to reuse ts without introducing a new column?
>>>>>
>>>>> Thanks,
>>>>> Fanbin
>>>>>
>>>>>
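
A note on the format string passed to TO_TIMESTAMP in this thread: the doubled single quotes are SQL string escaping, so the pattern itself is yyyy-MM-dd'T'HH:mm:ss.SSS'Z'. The sample record in the original question carries six fractional digits (.368123), while SSS describes three. The sketch below checks the pattern with plain java.time, not with Flink. Whether TO_TIMESTAMP is equally strict is an assumption that this thread does not confirm, so it is worth verifying on your Flink version. The class and method names are hypothetical, chosen only for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper for illustration; uses java.time only, not Flink.
class TimestampPatternCheck {
    // The TO_TIMESTAMP pattern from the thread, with SQL's doubled quotes unescaped.
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    // Parses ts, or throws DateTimeParseException if it does not match FORMAT.
    static LocalDateTime parse(String ts) {
        return LocalDateTime.parse(ts, FORMAT);
    }

    // Returns true if ts matches FORMAT exactly.
    static boolean matches(String ts) {
        try {
            parse(ts);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Millisecond precision, as the pattern describes.
        System.out.println(matches("2020-11-09T20:26:10.368Z"));
        // Microsecond precision, as in the sample record from the question.
        System.out.println(matches("2020-11-09T20:26:10.368123Z"));
    }
}
```

With java.time, the millisecond input matches and the microsecond input does not, because SSS consumes exactly three fraction digits before the literal Z is expected. If Flink behaves the same way, a pattern with six fraction digits (SSSSSS) or a small UDF that truncates the string first would be the options to try.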