I'm doing a POC on moving an existing DataStream API job to the Table API / SQL to make it more accessible to some of my teammates.
However, I'm at a loss on how to handle watermarking the way the DataStream API job does. The existing job reads a CDC stream and writes out 3 SQL tables:

```
CDC ----> table1 \
    |---> table2  ---- pk/watermark stream -> recovery table
    |---> table3 /
```

All 3 tables are written in an `AsyncFunction`, which emits each table's primary keys together with the CDC watermark. That stream is then written to a fourth table for recovery and tracking purposes. (If the job is stopped/started or crashes, we currently don't rely on Flink state; we rely on the recovery table to restart where processing left off.)

Is there a way to do something similar in the SQL API, where I can store the LEAST watermark of all 3 table writes in a 4th table? I can't see how to do it short of writing a custom sink. (I'm currently using the JDBC connector sink.)

```java
var statements = tEnv.createStatementSet();
... insert table1 select ... from cdc;
... insert table2 select ... from cdc;
... insert table3 select ... from cdc;
statements.attachAsDataStream();
```

Or is there a way to do something similar within the Table API? Or should I use a completely different approach? What I'm after is the CDC watermark once the inserts into all 3 tables have completed. (The CDC source is a custom table source.)

Any ideas? Thanks!
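For reference, the recovery bookkeeping described above (persist the LEAST CDC watermark confirmed by all three table writes) can be sketched in plain Java, independent of Flink. This is a minimal sketch, not Flink API: the class `RecoveryWatermarkTracker` and its methods `ack` and `safeRestartPoint` are hypothetical names, and it assumes the CDC watermark is a `long` that only moves forward per table. It does not show how the value reaches the recovery table.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical helper (not part of Flink): tracks the latest CDC watermark each
// table write has confirmed and exposes the LEAST of them as the restart point.
class RecoveryWatermarkTracker {
    private final Set<String> tables;
    // Latest confirmed CDC watermark per table; a table is absent until its first write.
    private final Map<String, Long> lastAcked = new LinkedHashMap<>();

    RecoveryWatermarkTracker(String... tableNames) {
        this.tables = new LinkedHashSet<>(Arrays.asList(tableNames));
    }

    /** Record that {@code table} has durably written everything up to {@code watermark}. */
    synchronized void ack(String table, long watermark) {
        if (!tables.contains(table)) {
            throw new IllegalArgumentException("unknown table: " + table);
        }
        // Keep the highest value seen so an out-of-order ack cannot move progress backwards.
        lastAcked.merge(table, watermark, Long::max);
    }

    /** LEAST watermark across all tables, or empty while some table has not written yet. */
    synchronized OptionalLong safeRestartPoint() {
        if (lastAcked.size() < tables.size()) {
            return OptionalLong.empty();
        }
        return lastAcked.values().stream().mapToLong(Long::longValue).min();
    }

    public static void main(String[] args) {
        RecoveryWatermarkTracker tracker =
                new RecoveryWatermarkTracker("table1", "table2", "table3");
        tracker.ack("table1", 120L);
        tracker.ack("table2", 100L);
        System.out.println(tracker.safeRestartPoint()); // OptionalLong.empty
        tracker.ack("table3", 110L);
        System.out.println(tracker.safeRestartPoint()); // OptionalLong[100]
    }
}
```

Returning empty until every table has confirmed at least one write avoids recording a restart point that is ahead of a table that has not written anything yet; the minimum is the only value from which all three tables can safely resume.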