I can comment on how Netflix did this. We added an arrival-time watermark to each commit, computed as the lowest processing time across the partitions of the incoming data. That watermark was stored in the snapshot metadata for each commit, along with relevant context such as the processing region, because we had multiple incoming jobs. A metadata service then took the minimum across all processing regions to produce a table-level watermark that could be queried and used as a trigger.
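To make the aggregation concrete, here is a rough sketch of the two steps described above. It is not the actual Netflix code: every name in it (CommitSummary, commit_watermark, table_watermark) is hypothetical. It also assumes that a region's watermark only moves forward, and that the table has no watermark until every expected region has committed at least once.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CommitSummary:
    """Hypothetical stand-in for the watermark fields kept in snapshot metadata."""
    region: str
    watermark_ms: int


def commit_watermark(region, partition_times_ms):
    """Watermark for one commit: the lowest processing time across its partitions."""
    return CommitSummary(region, min(partition_times_ms.values()))


def table_watermark(commits, expected_regions):
    """Table watermark: the minimum of each region's latest watermark.

    Assumption: a region's watermark never moves backwards, so the highest
    value seen for a region is treated as its current watermark.
    """
    latest = {}
    for commit in commits:
        previous = latest.get(commit.region, commit.watermark_ms)
        latest[commit.region] = max(previous, commit.watermark_ms)
    # Assumption: no table watermark until every expected region has reported.
    if any(region not in latest for region in expected_regions):
        return None
    return min(latest[region] for region in expected_regions)


commits = [
    commit_watermark("us-east", {"p0": 1000, "p1": 1200}),
    commit_watermark("eu-west", {"p0": 900}),
    commit_watermark("us-east", {"p0": 1500, "p1": 1400}),
]
print(table_watermark(commits, ["us-east", "eu-west"]))
```

In this example the slower region (eu-west) holds the table watermark back, which is the point of taking the min: a consumer triggering on the table watermark only sees times that every region has already passed.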
I'm not entirely sure how this was implemented in Flink, but it worked well, and I'd support adding the snapshot watermark to the current Flink sink to enable this pattern.

On Tue, Aug 17, 2021 at 7:06 PM Peidian Li <lipeid...@gmail.com> wrote:

> +1, we have the same needs and hope for a solution to this problem. Thanks.

--
Ryan Blue
Tabular