I am also in favor of having the committer operator publish the Flink watermark as snapshot metadata in the Flink sink. `IcebergFilesCommitter` can override `public void processWatermark(Watermark mark)` to intercept the latest watermark value.
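To make the idea concrete, here is a minimal sketch of that pattern: the committer records the latest watermark seen in `processWatermark` and exposes it as a commit summary entry. The `Watermark` stub and the `flink.watermark` summary key are assumptions for illustration; a real implementation would extend Flink's `AbstractStreamOperator` and attach the value via Iceberg's commit API.

```java
import java.util.HashMap;
import java.util.Map;

class WatermarkCommitterSketch {
    // Minimal stand-in for org.apache.flink.streaming.api.watermark.Watermark.
    static class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    private long latestWatermark = Long.MIN_VALUE;

    // Flink invokes processWatermark for every watermark passing through the
    // operator; recording the max seen is enough for the commit metadata.
    public void processWatermark(Watermark mark) {
        latestWatermark = Math.max(latestWatermark, mark.getTimestamp());
    }

    // At commit time the recorded value would be written into the snapshot
    // summary (shown as a plain map here; the key name is an assumption).
    public Map<String, String> snapshotSummary() {
        Map<String, String> summary = new HashMap<>();
        summary.put("flink.watermark", Long.toString(latestWatermark));
        return summary;
    }
}
```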
There was also a recent discussion (FLIP-167) in the Flink community [1] on this topic. It doesn't affect the Flink Iceberg sink at the moment, as it addresses two scenarios that don't apply to FlinkSink yet:
- the old `SinkFunction` API. FlinkSink doesn't use the old `SinkFunction` API.
- the new FLIP-143 unified sink API. FlinkSink hasn't moved to the new sink API yet.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

On Sun, Aug 22, 2021 at 11:42 AM Ryan Blue <b...@tabular.io> wrote:

> I can comment on how Netflix did this. We added a watermark for arrival
> time to each commit, produced from the lowest processing time across
> partitions of the incoming data. That was added to the snapshot metadata
> for each commit, along with relevant context like the processing region,
> because we had multiple incoming jobs. Then we had a metadata service that
> aggregated the min across all processing regions to produce a watermark for
> the table that could be queried and triggered from.
>
> I'm not entirely sure how this was implemented in Flink, but it worked
> well and I'd support adding the snapshot watermark to the current Flink
> sink to enable this pattern.
>
> On Tue, Aug 17, 2021 at 7:06 PM Peidian Li <lipeid...@gmail.com> wrote:
>
>> +1, we have the same needs and hope for a solution to this problem. Thanks.
>
> --
> Ryan Blue
> Tabular