Hello,

We are creating two data streams in our Flink application and converting
each of them into a Table. The first data stream has a watermark delay of
24 hours, while the second stream has a watermark delay of 60 minutes. Both
use a BoundedOutOfOrderness watermark strategy, which assigns watermarks
based on an event_time field present in the records themselves.

For example,

DataStream<Row> fileStream = env.fromSource(
                    fileSource,
                    getWatermarkStrategy(86400000), // custom function, watermark delay of 24 hours in ms
                    "fileSource");
Table firstTable = tableEnv.fromDataStream(fileStream, apiExpressions);
tableEnv.createTemporaryView("fileTable", firstTable);

DataStream<Row> kafkaStream = env.fromSource(
                    kafkaSource,
                    getWatermarkStrategy(3600000), // custom function, watermark delay of 60 minutes in ms
                    "kafkaSource");
Table secondTable = tableEnv.fromDataStream(kafkaStream, apiExpressions);
tableEnv.createTemporaryView("kafkaTable", secondTable);
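
To make the two delays concrete, here is a minimal plain-Java sketch (no
Flink dependency) of the arithmetic as we understand it. It assumes that
getWatermarkStrategy wraps Flink's bounded-out-of-orderness strategy, which
emits a watermark of (max event time seen - delay - 1 ms), and that an
operator with two inputs advances its event-time clock on the minimum of the
two input watermarks. The class and method names are hypothetical and only
for illustration.

```java
import java.time.Duration;

public class WatermarkSketch {

    // Watermark produced by a bounded-out-of-orderness strategy:
    // the largest event time seen so far, minus the allowed delay, minus 1 ms.
    static long boundedOutOfOrdernessWatermark(long maxEventTimeMs, long delayMs) {
        return maxEventTimeMs - delayMs - 1;
    }

    // Assumption: a two-input operator (such as a join) follows the
    // slower of its two input watermarks.
    static long combinedWatermark(long firstWatermarkMs, long secondWatermarkMs) {
        return Math.min(firstWatermarkMs, secondWatermarkMs);
    }

    public static void main(String[] args) {
        long fileDelayMs = Duration.ofHours(24).toMillis();    // 86400000
        long kafkaDelayMs = Duration.ofMinutes(60).toMillis(); // 3600000

        // Hypothetical case: both streams have seen the same max event time.
        long maxEventTimeMs = 100_000_000L;

        long fileWatermark = boundedOutOfOrdernessWatermark(maxEventTimeMs, fileDelayMs);
        long kafkaWatermark = boundedOutOfOrdernessWatermark(maxEventTimeMs, kafkaDelayMs);

        System.out.println("file watermark:     " + fileWatermark);
        System.out.println("kafka watermark:    " + kafkaWatermark);
        System.out.println("combined watermark: " + combinedWatermark(fileWatermark, kafkaWatermark));
    }
}
```

If this understanding is right, the file stream's watermark trails the Kafka
stream's by 23 hours whenever both have seen the same maximum event time.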

Now we want to write a continuous SQL query
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/dynamic_tables/#continuous-queries>
that joins firstTable and secondTable (registered above as fileTable and
kafkaTable) over a tumbling window of 60 minutes:

"SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS event_time, " +
                    "MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time)) " +
                    "FROM fileTable, kafkaTable " +
                    "where fileTable.id = kafkaTable.id " +
                    "group by TUMBLE(fileTable.rowtime, INTERVAL '60' MINUTE)"

What we want to know is whether join or aggregation queries will work
correctly between the two tables. Will the contents of kafkaTable be purged
immediately after 60 minutes, so that a join/aggregation might not give
correct results?
Will there be data loss if tables with different watermark delays are
joined?

-- 
*Regards,*
*Meghajit*
