Hello,

We are creating two data streams in our Flink application, and each of them is then converted into a Table. The first data stream has a watermark delay of 24 hours, while the second stream has a watermark delay of 60 minutes. Both use a BoundedOutOfOrderness watermark strategy, which assigns watermarks from an event_time field present within the records themselves.
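For context, the arithmetic behind the bounded-out-of-orderness strategy is simply that the watermark trails the maximum event timestamp seen so far by the configured delay (Flink's built-in implementation additionally subtracts 1 ms). A minimal standalone sketch of this, hypothetical and outside of Flink:

```java
// Sketch (not Flink code) of bounded-out-of-orderness watermark arithmetic:
// the watermark trails the maximum event timestamp seen so far by the
// configured delay, minus a 1 ms safety margin.
final class BoundedOutOfOrdernessSketch {
    private final long delayMs;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    // Track the highest event timestamp observed on the stream.
    void onEvent(long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
    }

    // Watermark = max timestamp seen - delay - 1 ms.
    long currentWatermark() {
        return maxTimestamp - delayMs - 1;
    }
}
```

So for the 60-minute stream, an event with timestamp t advances the watermark to t - 3600000 - 1 ms, and late events never move the watermark backwards.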
For example:

    DataStream<Row> fileStream = env.fromSource(
        fileSource,
        getWatermarkStrategy(86400000), // custom function, watermark delay of 24 hours in ms
        "fileSource");
    Table firstTable = tableEnv.fromDataStream(fileStream, apiExpressions);
    tableEnv.createTemporaryView("fileTable", firstTable);

    DataStream<Row> kafkaStream = env.fromSource(
        kafkaSource,
        getWatermarkStrategy(3600000), // custom function, watermark delay of 60 minutes in ms
        "kafkaSource");
    Table secondTable = tableEnv.fromDataStream(kafkaStream, apiExpressions);
    tableEnv.createTemporaryView("kafkaTable", secondTable);

Now we want to write a continuous SQL query <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/dynamic_tables/#continuous-queries> that joins fileTable and kafkaTable with a tumbling window of 60 minutes:

    "SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS event_time, " +
    "  MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time)) " +
    "FROM fileTable, kafkaTable " +
    "WHERE fileTable.id = kafkaTable.id " +
    "GROUP BY TUMBLE(fileTable.rowtime, INTERVAL '60' MINUTE)"

What we want to know is:
1. Will join or aggregation queries work correctly between the two tables?
2. Is it the case that the contents of kafkaTable will be purged immediately after 60 minutes, so that a join/aggregation might not give correct results?
3. Will there be data loss if tables with different watermark delays are joined?

--
*Regards,*
*Meghajit*