I am also in favor of publishing the Flink watermark as snapshot metadata
from the Flink sink's committer operator. `IcebergFilesCommitter` can
override `public void processWatermark(Watermark mark)` to intercept the
latest watermark value.
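To make the idea concrete, here is a minimal, self-contained sketch (not the actual Flink/Iceberg API): it mimics how a committer operator could record the latest watermark and publish it as snapshot summary metadata at commit time. The class and method names below (`Watermark`, `processWatermark`, `buildSnapshotSummary`, the `"flink.watermark"` key) only echo the real names for illustration; the real implementation would override `processWatermark` on the operator and set the property via the snapshot update API.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Flink's Watermark: just a timestamp.
class Watermark {
  final long timestamp;
  Watermark(long timestamp) { this.timestamp = timestamp; }
}

// Sketch of a committer that tracks the latest watermark it has seen
// and exposes it as snapshot summary metadata when committing.
class FilesCommitterSketch {
  // Latest watermark observed; Long.MIN_VALUE until one arrives.
  private long currentWatermark = Long.MIN_VALUE;

  // Analogue of overriding processWatermark(Watermark) to intercept the value.
  void processWatermark(Watermark mark) {
    currentWatermark = Math.max(currentWatermark, mark.timestamp);
  }

  // At commit time, attach the watermark as a snapshot summary property
  // (hypothetical key name; the real code would set it on the snapshot update).
  Map<String, String> buildSnapshotSummary() {
    Map<String, String> summary = new HashMap<>();
    if (currentWatermark != Long.MIN_VALUE) {
      summary.put("flink.watermark", Long.toString(currentWatermark));
    }
    return summary;
  }
}
```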

There was also a recent discussion (FLIP-167) in the Flink community [1] on
this topic. It doesn't affect the Flink Iceberg sink at the moment, as it
addresses two scenarios that don't apply to FlinkSink yet:
- the old `SinkFunction` API. FlinkSink doesn't use the old `SinkFunction`
API.
- the new FLIP-143 unified Sink API. FlinkSink hasn't moved to the new sink
API yet.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

On Sun, Aug 22, 2021 at 11:42 AM Ryan Blue <b...@tabular.io> wrote:

> I can comment on how Netflix did this. We added a watermark for arrival
> time to each commit, produced from the lowest processing time across
> partitions of the incoming data. That was added to the snapshot metadata
> for each commit, along with relevant context like the processing region
> because we had multiple incoming jobs. Then we had a metadata service that
> aggregated the min across all processing regions to produce a watermark for
> the table that could be queried and triggered from.
>
> I'm not entirely sure how this was implemented in Flink, but it worked
> well and I'd support adding the snapshot watermark to the current Flink
> sink to enable this pattern.
>
> On Tue, Aug 17, 2021 at 7:06 PM Peidian Li <lipeid...@gmail.com> wrote:
>
>>
>> +1, we have the same need and hope for a solution to this problem. Thanks.
>
>
>
> --
> Ryan Blue
> Tabular
>
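The aggregation Ryan describes, taking the min watermark across all processing regions to derive a table-level watermark, can be sketched as follows. This is a hypothetical illustration; the class and method names (`TableWatermarkAggregator`, `report`, `tableWatermark`) are invented, not part of any described service.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the metadata service: each processing region
// reports the watermark from its latest commit, and the table watermark
// is the minimum across regions.
class TableWatermarkAggregator {
  private final Map<String, Long> regionWatermarks = new HashMap<>();

  // Record the latest watermark committed by a processing region.
  void report(String region, long watermark) {
    regionWatermarks.merge(region, watermark, Math::max);
  }

  // The table watermark is the min across all regions: data at or before
  // this timestamp has arrived from every region, so consumers can safely
  // trigger on it.
  long tableWatermark() {
    return regionWatermarks.values().stream()
        .mapToLong(Long::longValue)
        .min()
        .orElse(Long.MIN_VALUE);
  }
}
```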
