Hi Talat,

Your analysis is correct: aligning watermarks for jobs with high watermark skew across input partitions does result in faster checkpoints and smaller state. There are generally two places you can implement this: in user code (the source) or inside the runner. The user code can use some external synchronization (e.g. ZooKeeper) to keep track of the progress of all individual sources. Alternatively, the source can read the watermark from Flink's REST API (some inspiration here [1]).
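To make the user-code approach a bit more concrete, here is a minimal sketch of the bookkeeping such a tracker would do, assuming each source instance periodically reports its local watermark. `GlobalWatermarkTracker`, `report`, `shouldPause` and the drift parameter are hypothetical names, not Beam or Flink APIs; in a real job the map would live in ZooKeeper (or be derived from Flink's REST API) rather than in process memory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory stand-in for an external watermark tracker
 * (e.g. backed by ZooKeeper or Flink's REST API). Not a Beam or Flink API.
 */
final class GlobalWatermarkTracker {
  // Latest watermark (epoch millis) reported by each source instance.
  private final Map<String, Long> watermarks = new ConcurrentHashMap<>();
  private final int expectedSources;

  GlobalWatermarkTracker(int expectedSources) {
    this.expectedSources = expectedSources;
  }

  /** Called periodically by each source with its current local watermark. */
  void report(String sourceId, long watermarkMillis) {
    watermarks.put(sourceId, watermarkMillis);
  }

  /** Minimum watermark across all sources, or Long.MIN_VALUE until every source has reported. */
  long globalWatermark() {
    if (watermarks.size() < expectedSources) {
      return Long.MIN_VALUE;
    }
    return watermarks.values().stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
  }

  /** A source should stop reading while it is more than maxDriftMillis ahead of the global watermark. */
  boolean shouldPause(String sourceId, long maxDriftMillis) {
    long global = globalWatermark();
    Long local = watermarks.get(sourceId);
    if (local == null || global == Long.MIN_VALUE) {
      return false; // not enough information yet, so keep reading
    }
    return local > global + maxDriftMillis;
  }

  public static void main(String[] args) {
    GlobalWatermarkTracker tracker = new GlobalWatermarkTracker(3);
    tracker.report("source-0", 10_000L);
    tracker.report("source-1", 12_000L);
    tracker.report("source-2", 70_000L); // far ahead of the others
    System.out.println(tracker.globalWatermark()); // 10000
    System.out.println(tracker.shouldPause("source-2", 30_000L)); // true
    System.out.println(tracker.shouldPause("source-1", 30_000L)); // false
  }
}
```

The slowest source is never paused, so the global watermark can always advance; the paused sources simply resume once it catches up to within the allowed drift.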

Another option would be to make use of [2] and implement this directly in the FlinkRunner. I'm not familiar with its possible limitations, as it was added to Flink quite recently (we would have to support it only when running on Flink 1.15+).

If you would like to go for the second approach, I'd be happy to help with some guidance.

Best,

 Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform/blob/master/flink/utils/src/main/java/cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.java
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

On 5/23/23 01:05, Talat Uyarer via dev wrote:
Maybe the user list does not have knowledge about this, so I am also resending this to the dev list. Sorry for cross-posting.


Hi All,

I have a stream aggregation job which reads from Kafka and writes to some sinks.

When I submit my job with unaligned checkpoints enabled, the Flink checkpoint size keeps increasing and the job does not emit any window results. If I use aligned checkpoints, the size is somewhat under control (still big), but checkpoint alignment takes a long time.

I would like to implement something similar to [1]. I believe that if UnboundedSourceWrapper paused reading partitions whose watermarks are ahead of the others, it would reduce the size of the checkpoint and I could use unaligned checkpointing. What do you think about this approach? Do you have another solution?
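The per-partition idea can be sketched independently of Beam's actual reader code. The following is a minimal illustration, assuming a reader that knows the current watermark of each partition it owns; `PartitionAligner`, `updateWatermark` and `readablePartitions` are hypothetical names and not part of UnboundedSourceWrapper or KafkaIO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper illustrating per-partition watermark alignment inside a
 * single reader: partitions too far ahead are skipped until the slowest one
 * catches up. Not part of Beam's UnboundedSourceWrapper.
 */
final class PartitionAligner {
  // Current watermark (epoch millis) per partition; TreeMap keeps a stable order.
  private final Map<Integer, Long> partitionWatermarks = new TreeMap<>();
  private final long maxDriftMillis;

  PartitionAligner(long maxDriftMillis) {
    this.maxDriftMillis = maxDriftMillis;
  }

  void updateWatermark(int partition, long watermarkMillis) {
    partitionWatermarks.put(partition, watermarkMillis);
  }

  /** Partitions that may be read now; all others are paused. */
  List<Integer> readablePartitions() {
    List<Integer> readable = new ArrayList<>();
    if (partitionWatermarks.isEmpty()) {
      return readable;
    }
    long minWatermark =
        partitionWatermarks.values().stream().mapToLong(Long::longValue).min().getAsLong();
    for (Map.Entry<Integer, Long> e : partitionWatermarks.entrySet()) {
      if (e.getValue() <= minWatermark + maxDriftMillis) {
        readable.add(e.getKey());
      }
    }
    return readable;
  }

  public static void main(String[] args) {
    PartitionAligner aligner = new PartitionAligner(60_000L);
    aligner.updateWatermark(0, 1_000_000L);
    aligner.updateWatermark(1, 1_030_000L);
    aligner.updateWatermark(2, 1_500_000L); // skewed partition, far ahead
    System.out.println(aligner.readablePartitions()); // [0, 1]
  }
}
```

Because the slowest partition always stays readable, the reader cannot deadlock. An idle partition with no new data would, however, hold the minimum back indefinitely, so a real implementation would need some notion of idleness.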

One more question: while reading the code to implement the above idea, I came across this code [2]. Does the Flink runner have a similar implementation?

Thanks

[1] https://github.com/apache/flink/pull/11968
[2] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207
