Sorry, the example in my previous message was wrong. In a computed column
definition the column name comes first, followed by AS and the expression.
So the full example should be:

create table t (
  user_id string,
  action string,
  ts string,
  new_ts AS TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
  watermark for new_ts as new_ts - interval '5' second
) with (
 ...
)
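
For illustration only, here is a minimal sketch of how the resulting
event-time column might be used downstream. It assumes the table t defined
above. The one-minute window size and the output aliases (window_start,
action_count) are made up for this example and are not from the original
question.

```sql
-- Count actions per user in 1-minute event-time (tumbling) windows.
-- new_ts is the computed column declared as the rowtime attribute of t.
-- Window size and aliases are illustrative assumptions.
SELECT
  user_id,
  TUMBLE_START(new_ts, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS action_count
FROM t
GROUP BY
  user_id,
  TUMBLE(new_ts, INTERVAL '1' MINUTE);
```

Since the watermark trails new_ts by 5 seconds, a window should be emitted
roughly once a record arrives whose timestamp is 5 seconds past the window
end.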

Docs:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html

Best,
Jark

On Wed, 11 Nov 2020 at 11:11, Fanbin Bu <fanbin...@coinbase.com> wrote:

> Jark,
>
> Thanks for the quick response.
> I tried to_timestamp(ts, ...), but got the following error:
>
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "(" at line
>
> looks like it complains about the second `(` in
> create table t (... to_timestamp(...)...)
>
>
>
> On Tue, Nov 10, 2020 at 6:47 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> The example you gave is correct:
>>
>> create table t (
>>   user_id string,
>>   action string,
>>   ts string,
>>   transform_ts_format(ts) as new_ts,
>>   watermark for new_ts as new_ts - interval '5' second
>> ) with (
>>  ...
>> )
>>
>> You can use "TO_TIMESTAMP" built-in function instead of the UDF, e.g.
>> TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z''') as new_ts
>>
>>
>> Q1: how does watermark know new_ts is a valid timestamp?
>> > The framework validates the return type of the computed column
>> expression.
>>    Currently, it must be of type TIMESTAMP(3).
>>
>> Q2: is it possible to reuse ts without introducing a new column?
>> > Currently, it is not supported. This requires supporting "TIMESTAMP
>> WITH LOCAL TIME ZONE" as a rowtime attribute first.
>>
>> Best,
>> Jark
>>
>> On Wed, 11 Nov 2020 at 10:33, Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> In the `computed column` section of [1], i saw some related doc:
>>>
>>> ```
>>> On the other hand, computed column can be used to derive event time
>>> column because an event time column
>>> may need to be derived from existing fields, e.g. the original field is
>>> not TIMESTAMP(3) type or is nested in a JSON string.
>>> ```
>>> could you please provide a concrete example for this?
>>> Thanks
>>> Fanbin
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>>>
>>> On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu <fanbin...@coinbase.com>
>>> wrote:
>>>
>>>> i also tried:
>>>> ts TIMESTAMP WITH LOCAL TIME ZONE
>>>>
>>>> but it failed with
>>>> Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
>>>> 'TIMESTAMP(6) WITH LOCAL TIME ZONE'.
>>>>
>>>> On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu <fanbin...@coinbase.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have source json data like:
>>>>> {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action":
>>>>> "click"}
>>>>> ...
>>>>>
>>>>> my sql is:
>>>>> create table t (
>>>>> user_id string,
>>>>> action string,
>>>>> ts timestamp,
>>>>> watermark for ts as ts - interval '5' second
>>>>> ) with (
>>>>> 'connector' = 'kafka',
>>>>> 'topic' = 'test',
>>>>> 'json.timestamp-format.standard' = 'ISO-8601'
>>>>> ...
>>>>> )
>>>>>
>>>>> this does not work since ISO-8601 does not expect `Z` at the end of
>>>>> the timestamp.
>>>>> It only works for "2020-11-09T20:26:10.368". However, I'm not able to
>>>>> change the format.
>>>>>
>>>>> I checked a few threads and somebody suggested to use udf for
>>>>> unsupported timestamp format. what would the create table statement look
>>>>> like? I also need watermark working.
>>>>>
>>>>> I'm thinking about this:
>>>>> create table t (
>>>>> user_id string,
>>>>> action string,
>>>>> ts string,
>>>>> transform_ts_format(ts) as new_ts,
>>>>> watermark for new_ts as new_ts - interval '5' second
>>>>> ) with (
>>>>> ...
>>>>>
>>>>> q:
>>>>> 1. how does watermark know new_ts is a valid timestamp?
>>>>> 2. is it possible to reuse ts without introducing a new column?
>>>>>
>>>>> Thanks,
>>>>> Fanbin
>>>>>
>>>>>
