Yes, FlinkRunner supports Beam's event-time semantics without any additional configuration options.

 Jan

On 5/23/23 09:52, Talat Uyarer via dev wrote:
Hi Jan,

Yes, my plan is to implement this feature in FlinkRunner. I have one more question: does FlinkRunner support event time or Beam's custom watermarks? Do I need to set AutoWatermarkInterval for stateful Beam jobs on Flink, or can Beam timers handle it without that parameter?

Thanks

On Tue, May 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Talat,

    your analysis is correct: aligning watermarks for jobs with high
    watermark skew across input partitions really does result in faster
    checkpoints and smaller state. There are generally two places
    you can implement this: in user code (the source) or inside the
    runner. User code can use some external synchronization (e.g.
    ZooKeeper) to keep track of the progress of all individual
    sources. Alternatively, it can read the watermark from Flink's
    REST API (some inspiration here [1]).
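
To make the user-code option concrete, here is a minimal sketch of the bookkeeping involved. It is not taken from the linked tracker or from Beam: the class and method names are hypothetical, and an in-memory map stands in for ZooKeeper or the REST API. Each source reports its own watermark, and the global watermark is the minimum over all reports.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for an external tracker (ZooKeeper, REST API).
final class GlobalWatermarkSketch {
    // Latest watermark reported by each source, in epoch milliseconds (assumed unit).
    private final Map<String, Long> watermarks = new ConcurrentHashMap<>();

    // Record a source's watermark; a value lower than the stored one is ignored,
    // because watermarks are expected to be monotonic.
    void report(String sourceId, long watermarkMillis) {
        watermarks.merge(sourceId, watermarkMillis, Long::max);
    }

    // The global watermark is the slowest source's watermark.
    // Returns Long.MIN_VALUE while no source has reported yet.
    long globalWatermark() {
        return watermarks.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        GlobalWatermarkSketch tracker = new GlobalWatermarkSketch();
        tracker.report("partition-0", 1_000L);
        tracker.report("partition-1", 5_000L);
        System.out.println("global watermark: " + tracker.globalWatermark());
    }
}
```

A real implementation would also need to expire sources that stop reporting, otherwise one idle partition holds the global watermark back indefinitely.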

    The other option would be to make use of [2] and implement this
    directly in FlinkRunner. I'm not familiar with its possible
    limitations; the feature was added to Flink quite recently (we
    would have to support it only when running on Flink 1.15+).

    If you would like to go for the second approach, I'd be happy to
    help with some guidance.

    Best,

     Jan

    [1] https://github.com/O2-Czech-Republic/proxima-platform/blob/master/flink/utils/src/main/java/cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.java
    [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

    On 5/23/23 01:05, Talat Uyarer via dev wrote:
    Maybe nobody on the user list is familiar with this, so I am
    resending it to the dev list as well. Sorry for cross-posting.


    Hi All,

    I have a stream aggregation job which reads from Kafka and writes
    to some sinks.

    When I submit my job, the Flink checkpoint size keeps increasing
    if I use unaligned checkpoints, and the job does not emit any
    window results. If I use aligned checkpoints, the size is somewhat
    under control (still big), but checkpoint alignment takes a long
    time.

    I would like to implement something similar to [1]. I believe that
    if UnboundedSourceWrapper paused reading from partitions whose
    watermarks are far ahead of the others, it would reduce the size
    of the checkpoint and I could use unaligned checkpointing. What do
    you think about this approach? Do you have another solution?
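
The pausing decision described above can be sketched as follows. This is only an illustration of the idea under stated assumptions, not how UnboundedSourceWrapper behaves today: the class name, method name, and the maxDriftMillis parameter are all hypothetical. A partition is paused when its watermark is more than the allowed drift ahead of the slowest partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; names and the drift parameter are assumptions for illustration.
final class PartitionPauseSketch {
    // Returns the ids of partitions whose watermark exceeds the minimum watermark
    // by more than maxDriftMillis, sorted ascending. Watermarks are assumed to be
    // epoch milliseconds.
    static List<Integer> partitionsToPause(Map<Integer, Long> watermarks, long maxDriftMillis) {
        List<Integer> paused = new ArrayList<>();
        if (watermarks.isEmpty()) {
            return paused;
        }
        long min = Collections.min(watermarks.values());
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            if (e.getValue() - min > maxDriftMillis) {
                paused.add(e.getKey());
            }
        }
        Collections.sort(paused);
        return paused;
    }

    public static void main(String[] args) {
        Map<Integer, Long> watermarks = new HashMap<>();
        watermarks.put(0, 1_000L);
        watermarks.put(1, 1_500L);
        watermarks.put(2, 9_000L);
        System.out.println("pause: " + partitionsToPause(watermarks, 1_000L));
    }
}
```

Paused partitions would be re-evaluated on every watermark update, so a partition resumes as soon as the slowest one catches up to within the drift.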

    One more question: while reading the code to implement the above
    idea, I saw this code [2]. Does FlinkRunner have a similar
    implementation?

    Thanks

    [1] https://github.com/apache/flink/pull/11968
    [2] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207
