Hi, chen.
You could try the following in the sink function's invoke method:
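@Override
public void invoke(RowData row, Context context) throws Exception {
    // Current processing time, in epoch milliseconds.
    long processingTime = context.currentProcessingTime();
    // Current event-time watermark, in epoch milliseconds.
    long watermark = context.currentWatermark();
    ...
}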
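
Besides the two calls above, SinkFunction.Context also has timestamp(), which returns the record's event timestamp as a Long, or null if the record has none, so it is worth guarding against null. Below is a rough sketch of that handling. To keep it compilable without Flink on the classpath, it uses a stand-in interface (TimeContext) that only mirrors those three methods; TimeContext, describe and fixed are names made up for illustration, not Flink API.

```java
// Sketch only: TimeContext is a hypothetical stand-in that mirrors the three
// time-related methods of Flink's SinkFunction.Context.
final class SinkTimeSketch {

    interface TimeContext {
        long currentProcessingTime(); // epoch milliseconds
        long currentWatermark();      // epoch milliseconds
        Long timestamp();             // null when the record carries no timestamp
    }

    /** Summarizes the time information a sink could read for one record. */
    static String describe(TimeContext ctx) {
        Long eventTime = ctx.timestamp();
        long watermark = ctx.currentWatermark();
        if (eventTime == null) {
            // No event timestamp attached to this record.
            return "no event time, watermark=" + watermark;
        }
        // A record counts as late once the watermark has reached its timestamp.
        boolean late = eventTime <= watermark;
        return "eventTime=" + eventTime + ", watermark=" + watermark + ", late=" + late;
    }

    /** Builds a context with fixed values, for trying the sketch out. */
    static TimeContext fixed(long processingTime, long watermark, Long timestamp) {
        return new TimeContext() {
            @Override public long currentProcessingTime() { return processingTime; }
            @Override public long currentWatermark() { return watermark; }
            @Override public Long timestamp() { return timestamp; }
        };
    }

    public static void main(String[] args) {
        System.out.println(describe(fixed(5_000L, 3_000L, 2_500L)));
        System.out.println(describe(fixed(5_000L, 3_000L, null)));
    }
}
```

In a real sink, the body of describe would sit inside invoke and read from the Context argument directly. Whether timestamp() is non-null for a given SQL job is something to verify by running it.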
--
Best!
Xuyang
On 2024-02-20 19:38:44, "Feng Jin" <[email protected]> wrote:
>As I understand it, these should not be read from rowData; you can get the watermark and eventTime from the Context.
>
>Best,
>Feng
>
>On Tue, Feb 20, 2024 at 4:35 PM casel.chen <[email protected]> wrote:
>
>> In Flink SQL, how can a custom sink connector get the event time and watermark defined on the source table?
>>
>>
>> public class XxxSinkFunction extends RichSinkFunction<RowData> implements
>>         CheckpointedFunction, CheckpointListener {
>>
>>     @Override
>>     public synchronized void invoke(RowData rowData, Context context)
>>             throws IOException {
>>         // How can I get the event time and watermark values from rowData here?
>>     }
>> }
>>
>>
>> For example, the source table is defined as follows:
>>
>>
>> CREATE TEMPORARY TABLE source_table(
>> username varchar,
>> click_url varchar,
>> eventtime varchar,
>>
>> ts AS TO_TIMESTAMP(eventtime),
>> WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define the watermark on the rowtime column.
>> ) with (
>> 'connector'='kafka',
>> ...
>>
>> );
>>
>>
>> CREATE TEMPORARY TABLE sink_table(
>> username varchar,
>> click_url varchar,
>> eventtime varchar
>> ) with (
>> 'connector'='xxx',
>> ...
>> );
>> insert into sink_table select username,click_url,eventtime from
>> source_table;