Hi Talat,
your analysis is correct: aligning watermarks for jobs with high
watermark skew across input partitions does result in faster checkpoints
and reduces the size of state. There are generally two places where you
can implement this: in user code (the source) or inside the runner. In
user code, the source can use some external synchronization (e.g.
ZooKeeper) to keep track of the progress of all individual sources.
Alternatively, it can read the watermark from Flink's REST API (some
inspiration here [1]).
The other option would be to make use of [2] and implement this directly
in FlinkRunner. I'm not familiar with its possible limitations, as this
was added to Flink quite recently (we would have to support it only when
running on Flink 1.15+).
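To make the user-code variant a bit more concrete, here is a minimal sketch of the pause decision only, assuming every source reader reports its per-partition watermark to some shared tracker. The class name, the method names and the maxDrift parameter are all hypothetical and not part of Beam or Flink. In a real job the shared state would have to live in an external store such as ZooKeeper rather than in a local map.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper for illustration; not part of Beam or Flink. */
final class WatermarkAlignmentSketch {
  // Latest watermark (epoch millis) reported by each partition.
  // Assumption: a local map stands in for an external store.
  private final Map<String, Long> watermarks = new ConcurrentHashMap<>();
  private final long maxDriftMillis;

  WatermarkAlignmentSketch(Duration maxDrift) {
    this.maxDriftMillis = maxDrift.toMillis();
  }

  /** Records a partition's watermark; watermarks never move backwards. */
  public void report(String partition, long watermarkMillis) {
    watermarks.merge(partition, watermarkMillis, Math::max);
  }

  /** The global watermark is the minimum over all reporting partitions. */
  public long globalWatermark() {
    return watermarks.values().stream()
        .mapToLong(Long::longValue)
        .min()
        .orElse(Long.MIN_VALUE);
  }

  /** True if the partition is further ahead of the slowest one than allowed. */
  public boolean shouldPause(String partition) {
    Long own = watermarks.get(partition);
    if (own == null) {
      return false; // nothing reported yet, so there is nothing to hold back
    }
    return own - globalWatermark() > maxDriftMillis;
  }
}
```

A reader would call report(...) after each batch and skip polling a partition for as long as shouldPause(...) returns true, so the partitions that are ahead stop buffering data until the slow ones catch up.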
If you would like to go for the second approach (inside the runner),
I'd be happy to help with some guidance.
Best,
Jan
[1]
https://github.com/O2-Czech-Republic/proxima-platform/blob/master/flink/utils/src/main/java/cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.java
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
On 5/23/23 01:05, Talat Uyarer via dev wrote:
Maybe nobody on the user list knows about this, so I am also resending
it to the dev list. Sorry for cross-posting.
Hi All,
I have a stream aggregation job which reads from Kafka and writes to
some sinks.
When I submit my job, the Flink checkpoint size keeps increasing if I
use unaligned checkpoint settings, and the job does not emit any window
results. If I use aligned checkpoints, the size is somewhat under
control (still big), but checkpoint alignment takes a long time.
I would like to implement something similar to [1]. I believe that if
UnboundedSourceWrapper pauses reading from partitions whose watermark is
ahead, it will reduce the size of the checkpoint and I can use unaligned
checkpointing. What do you think about this approach? Do you have
another solution?
One more question: I was reading the code to implement the above idea
and saw this code [2]. Does Flink Runner have a similar implementation?
Thanks
[1] https://github.com/apache/flink/pull/11968
[2]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207