Hi Jan,

Yes, my plan is to implement this feature in the FlinkRunner. I have one more
question: does the Flink runner support event time, or Beam custom watermarks?
Do I need to set AutoWatermarkInterval for stateful Beam Flink jobs, or can
Beam timers handle it without setting that parameter?

Thanks

On Tue, May 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Talat,
>
> your analysis is correct: aligning watermarks for jobs with high watermark
> skew across input partitions does result in faster checkpoints and smaller
> state. There are generally two places you can implement this: in user code
> (the source) or inside the runner. The user code can use some external
> synchronization (e.g. ZooKeeper) to keep track of the progress of all
> individual sources. Another option is to read the watermark from Flink's
> REST API (some inspiration here [1]).
>
> Another option would be to make use of [2] and implement this directly in
> the FlinkRunner. I'm not familiar with its possible limitations, as it was
> added to Flink quite recently (we would have to support it only when
> running on Flink 1.15+).
>
> If you would like to go for the second approach, I'd be happy to help with
> some guidance.
>
> Best,
>
>  Jan
>
> [1]
> https://github.com/O2-Czech-Republic/proxima-platform/blob/master/flink/utils/src/main/java/cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.java
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> On 5/23/23 01:05, Talat Uyarer via dev wrote:
>
> Maybe the user list does not have knowledge about this; that's why I am
> also resending it to the dev list. Sorry for cross-posting.
>
>
> Hi All,
>
> I have a streaming aggregation job that reads from Kafka and writes to
> some sinks.
>
> When I submit my job with unaligned checkpoints enabled, the Flink
> checkpoint size keeps increasing and the job does not emit any window
> results. If I use aligned checkpoints, the size is somewhat under control
> (still big), but checkpoint alignment takes a long time.
>
> I would like to implement something similar to [1]. I believe that if
> UnboundedSourceWrapper paused reading partitions whose watermark is too far
> ahead of the others, it would reduce the size of the checkpoint and I could
> use unaligned checkpointing. What do you think about this approach? Do you
> have another solution?
>
> One more question: while reading the code to implement the above idea, I
> saw this code [2]. Does the Flink runner have a similar implementation?
>
> Thanks
>
> [1] https://github.com/apache/flink/pull/11968
> [2]
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207
>
>
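A minimal, standalone sketch of the alignment rule discussed above, in the
spirit of FLIP-182: a partition stops reading while its watermark is more than
a maximum drift ahead of the slowest partition, and resumes once the others
catch up. WatermarkAligner, report(), partitionsToPause() and the partition
names are hypothetical illustrations, not Beam or Flink APIs. Wiring this into
UnboundedSourceWrapper, and sharing per-partition watermarks across parallel
readers (e.g. via ZooKeeper or Flink's REST API, as suggested in the thread),
is deliberately left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not part of Beam or Flink): pauses any partition whose
 * watermark is more than maxDriftMillis ahead of the minimum watermark.
 */
public class WatermarkAligner {
  private final long maxDriftMillis;
  // Latest reported watermark per partition, in epoch millis.
  private final Map<String, Long> watermarks = new HashMap<>();

  public WatermarkAligner(long maxDriftMillis) {
    if (maxDriftMillis < 0) {
      throw new IllegalArgumentException("maxDriftMillis must be >= 0");
    }
    this.maxDriftMillis = maxDriftMillis;
  }

  /** Records the current watermark of a partition. */
  public void report(String partition, long watermarkMillis) {
    watermarks.put(partition, watermarkMillis);
  }

  /** Global watermark: minimum over all partitions, Long.MIN_VALUE if none. */
  public long globalWatermark() {
    long min = Long.MAX_VALUE;
    for (long wm : watermarks.values()) {
      min = Math.min(min, wm);
    }
    return watermarks.isEmpty() ? Long.MIN_VALUE : min;
  }

  /** Partitions that should stop reading until slower partitions catch up. */
  public Set<String> partitionsToPause() {
    Set<String> paused = new HashSet<>();
    if (watermarks.isEmpty()) {
      return paused;
    }
    long global = globalWatermark();
    // Saturating add so a very large watermark cannot overflow the limit.
    long limit = global > Long.MAX_VALUE - maxDriftMillis
        ? Long.MAX_VALUE
        : global + maxDriftMillis;
    for (Map.Entry<String, Long> e : watermarks.entrySet()) {
      if (e.getValue() > limit) {
        paused.add(e.getKey());
      }
    }
    return paused;
  }

  public static void main(String[] args) {
    WatermarkAligner aligner = new WatermarkAligner(60_000L); // 1 minute drift
    aligner.report("topic-0", 1_000_000L);
    aligner.report("topic-1", 1_030_000L);
    aligner.report("topic-2", 1_200_000L);
    System.out.println(aligner.globalWatermark());   // prints 1000000
    System.out.println(aligner.partitionsToPause()); // prints [topic-2]
  }
}
```

Here topic-2 is 200 seconds ahead of the slowest partition, so it is paused;
topic-1 is within the 60-second drift and keeps reading. Keeping the decision
as a pure function of the reported watermarks makes it independent of where
those watermarks come from (a shared store, the REST API, or a runner-side
coordinator).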
