Hi,

The watermark of the join operator is the minimum of the watermarks of its input streams.
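To make that rule concrete, here is a rough plain-Java toy model of a two-input operator. It is only a sketch: the class `JoinWatermarkSketch` and its methods are made up for illustration and are not Flink APIs, and it assumes each input reports its watermark as a `long` timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a two-input operator's watermark; hypothetical, not a Flink class. */
public class JoinWatermarkSketch {
    // Latest watermark seen on each input; Long.MIN_VALUE means "none yet".
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;
    // Watermark most recently emitted downstream.
    private long combinedWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    public void onLeftWatermark(long timestamp) {
        leftWatermark = Math.max(leftWatermark, timestamp);
        advance();
    }

    public void onRightWatermark(long timestamp) {
        rightWatermark = Math.max(rightWatermark, timestamp);
        advance();
    }

    // The operator can only promise event-time progress up to the slower input.
    private void advance() {
        long candidate = Math.min(leftWatermark, rightWatermark);
        if (candidate > combinedWatermark) {
            combinedWatermark = candidate;
            emitted.add(candidate);
        }
    }

    public long currentWatermark() {
        return combinedWatermark;
    }

    public List<Long> emittedWatermarks() {
        return emitted;
    }

    public static void main(String[] args) {
        JoinWatermarkSketch join = new JoinWatermarkSketch();
        join.onLeftWatermark(1_000L);  // right has reported nothing yet: no output
        join.onRightWatermark(400L);   // min(1000, 400)  = 400
        join.onRightWatermark(2_500L); // min(1000, 2500) = 1000
        join.onLeftWatermark(3_000L);  // min(3000, 2500) = 2500
        System.out.println(join.emittedWatermarks()); // prints [400, 1000, 2500]
    }
}
```

In this model the downstream watermark only moves when the slower input moves, so a window after the join fires according to whichever input lags behind.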
```
JoinOperator.watermark = min(left.watermark, right.watermark);
```

I think that is enough for most cases. Could you share more details about the logic in the UDF getEventTimeInNS?

I think a better solution than the intermediate table would be to define the watermark on the VIEW, but Flink doesn't support that now.

Best,
Shengkai

liuxiangcao <[email protected]> wrote on Saturday, April 16, 2022 at 03:07:

> Hi Flink community,
>
> *Here is the context:*
> Theoretically, I would like to write following query but it won't work
> since we can only define the WATERMARK in a table DDL:
>
> INSERT into tableC
> select tableA.field1,
>   SUM(1) as `count`,
>   time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
> from tableA join tableB
> on tableA.joinCol = tableB.joinCol
> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
> (note: getEventTimeInNS is a UDF that calculates event time using
> tableA.timestamp and tableB.timestamp)
>
>
> so I have to define an intermediary table to store the results from
> joining, and defining event time and watermark in the table DDL, then
> perform tumbling windowing on the intermediary table:
>
> CREATE TABLE IntermediaryTable (
>   field1,
>   `eventTimestampInNanoseconds` BIGINT,
>   time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
>   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'IntermediaryTable',
>   'properties.bootstrap.servers' = 'xxxxxx',
>   'properties.group.id' = 'contextevent-streaming-sql',
>   'format' = 'avro'
> );
>
> INSERT INTO IntermediaryTable
> select tableA.field1,
>   tableB.field2,
>   getEventTimeInNS(tableA.timestamp, tableB.timestamp)
> from tableA join tableB
> on tableA.joinCol = tableB.joinCol;
>
> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>
> INSERT INTO countTable
> (select event.field1,
>   SUM(1) as `count`
> from IntermediaryTable event
> GROUP BY
>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>   event.field1
> );
>
>
> This is not convenient because the IntermediaryTable writes to another
> kafka topic that is only used by the tumbling window aggregation. When I
> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
> END;", it will fail complaining the topic does not exist. I either have to
> first create this kafka topic beforehand, or run a separate job to INSERT
> INTO IntermediaryTable.
>
> In Java DataStream API, you can easily do so within flink topology without
> having to create a separate kafka topic:
>
> final DataStream<xxx> joinedStream =
>     StreamA.join(StreamB)
>         .where(xxxx)
>         .equalTo(xxxx)
>         .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>         .apply(aggregation);
>
>
> *Question:*
> Does the Flink community have any suggestions on how to do this in
> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support
> defining eventtime and watermark on the fly without a table ddl? Would love
> to hear any suggestions. Thanks a lot in advance.
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
