Hi Flink community,
*Here is the context:*
Ideally, I would like to write the following query, but it won't work,
since WATERMARK can only be defined in a table DDL:
INSERT into tableC
select tableA.field1,
SUM(1) as `count`,
time_ltz AS getEventTimeInNS(tableA.`timestamp`, tableB.`timestamp`),
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECOND
from tableA join tableB
on tableA.joinCol = tableB.joinCol
group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
(Note: getEventTimeInNS is a UDF that calculates the event time from
tableA.timestamp and tableB.timestamp.)
So I have to define an intermediary table that stores the join results and
declares the event time and watermark in its DDL, and then perform the
tumbling window aggregation on that intermediary table:
CREATE TABLE IntermediaryTable (
field1,
field2,
`eventTimestampInNanoseconds` BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'IntermediaryTable',
'properties.bootstrap.servers' = 'xxxxxx',
'properties.group.id' = 'contextevent-streaming-sql',
'format' = 'avro'
);
INSERT INTO IntermediaryTable
select tableA.field1,
tableB.field2,
getEventTimeInNS(tableA.`timestamp`, tableB.`timestamp`)
from tableA join tableB
on tableA.joinCol = tableB.joinCol;
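To make the time semantics of IntermediaryTable concrete, here is a minimal plain-JDK sketch (no Flink dependency; the class and method names are made up for illustration). It mirrors what the DDL does: nanoseconds are divided down to epoch milliseconds, as TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3) does, and the watermark trails the largest event time seen so far by 20 seconds. This is a simplified model; by default Flink emits watermarks periodically rather than after every row.

```java
import java.time.Instant;

/**
 * Plain-JDK sketch (hypothetical names, no Flink dependency) of the arithmetic
 * behind IntermediaryTable: the computed column time_ltz and the
 * 20-second bounded-out-of-orderness watermark.
 */
public class EventTimeSketch {
    static final long NANOS_PER_MILLI = 1_000_000L;
    static final long WATERMARK_DELAY_MS = 20_000L; // INTERVAL '20' SECOND

    // Mirrors eventTimestampInNanoseconds / 1000000 fed into TO_TIMESTAMP_LTZ(..., 3):
    // integer division drops the sub-millisecond digits.
    static long toEpochMillis(long eventTimestampInNanoseconds) {
        return eventTimestampInNanoseconds / NANOS_PER_MILLI;
    }

    // Simplified watermark: 20 s behind the largest event time seen so far,
    // and it never moves backwards.
    static long advanceWatermark(long currentWatermark, long eventTimeMillis) {
        return Math.max(currentWatermark, eventTimeMillis - WATERMARK_DELAY_MS);
    }

    public static void main(String[] args) {
        long[] eventNanos = {
            1_700_000_000_123_456_789L,
            1_700_000_005_000_000_000L,
            1_700_000_002_000_000_000L // out of order: does not move the watermark back
        };
        long watermark = Long.MIN_VALUE;
        for (long ns : eventNanos) {
            long ms = toEpochMillis(ns);
            watermark = advanceWatermark(watermark, ms);
            System.out.println(Instant.ofEpochMilli(ms) + " watermark=" + Instant.ofEpochMilli(watermark));
        }
    }
}
```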
Then I can perform the tumbling window aggregation on IntermediaryTable:
INSERT INTO countTable
(select event.field1,
SUM(1) as `count`
from IntermediaryTable event
GROUP BY
TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
event.field1
);
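Similarly, here is a minimal plain-JDK sketch (hypothetical names, no Flink dependency) of how TUMBLE(event.time_ltz, INTERVAL '30' SECOND) buckets rows. Each row lands in the 30-second window [start, start + 30s) that contains its event time, and a window can emit its count once the watermark reaches its end. The firing rule is modelled on the DataStream API's EventTimeTrigger (watermark >= end - 1); for brevity the sketch counts per window only and ignores the field1 grouping key.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-JDK sketch (hypothetical names, no Flink dependency) of how a
 * 30-second tumbling event-time window groups rows and when it may fire.
 */
public class TumbleSketch {
    static final long WINDOW_SIZE_MS = 30_000L; // INTERVAL '30' SECOND

    // Start of the tumbling window containing the timestamp (offset 0);
    // floorMod keeps this correct for negative timestamps too.
    static long windowStart(long eventTimeMillis) {
        return eventTimeMillis - Math.floorMod(eventTimeMillis, WINDOW_SIZE_MS);
    }

    static long windowEnd(long eventTimeMillis) {
        return windowStart(eventTimeMillis) + WINDOW_SIZE_MS;
    }

    // Window [start, end) fires once the watermark reaches end - 1,
    // i.e. no more on-time rows for it are expected.
    static boolean canFire(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1_000L, 29_999L, 30_000L, 61_500L};
        Map<Long, Integer> counts = new TreeMap<>(); // window start -> SUM(1)
        for (long t : eventTimes) {
            counts.merge(windowStart(t), 1, Integer::sum);
        }
        counts.forEach((start, count) ->
            System.out.println("[" + start + ", " + (start + WINDOW_SIZE_MS) + ") count=" + count));
    }
}
```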
This is not convenient, because IntermediaryTable writes to an extra Kafka
topic that is used only by the tumbling window aggregation. When I try to
group the two INSERT INTO statements within "BEGIN STATEMENT SET; END;",
the job fails, complaining that the topic does not exist. So I either have
to create this Kafka topic beforehand, or run a separate job to INSERT INTO
IntermediaryTable.
In the Java DataStream API, this is easy to do within the Flink topology
itself, without having to create a separate Kafka topic:
final DataStream<xxx> joinedStream =
StreamA.join(StreamB)
.where(xxxx)
.equalTo(xxxx)
.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
.apply(aggregation);
*Question:*
Does the Flink community have any suggestions on how to do this in Flink SQL
in a friendly way? Would it be a good idea for Flink SQL to support defining
event time and watermarks on the fly, without a table DDL? I would love to
hear any suggestions. Thanks a lot in advance.
--
Best Wishes & Regards
Shawn Xiangcao Liu