Thanks David!

On Fri, Mar 12, 2021, 01:54 David Anderson <dander...@apache.org> wrote:

> WatermarkStrategy.withIdleness works by marking idle streams as idle, so
> that downstream operators will ignore those streams and allow the
> watermarks to progress based only on the advancement of the watermarks of
> the still active streams. As you suspected, this mechanism does not provide
> for the watermark to be advanced in situations where all of the streams are
> idle.
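To make the all-idle case concrete, here is a minimal plain-Java model of how an operator with several inputs could combine watermarks while ignoring idle inputs. It has no Flink dependency. The class and method names are hypothetical, and this is not Flink's actual implementation. It only illustrates the rule described above:

- The combined watermark is the minimum over the active inputs.
- When every input is idle, there is nothing to take the minimum over, so the watermark stays where it was.

A source reading a single Kafka partition hits this case once that partition goes idle.

```java
import java.util.Arrays;

/**
 * Hypothetical model (not Flink code) of an operator that combines the
 * watermarks of several inputs while ignoring inputs marked idle.
 */
final class IdleAwareWatermarkCombiner {
    private final long[] watermarks; // latest watermark seen per input
    private final boolean[] idle;    // idleness flag per input
    private long combined = Long.MIN_VALUE;

    IdleAwareWatermarkCombiner(int inputs) {
        watermarks = new long[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[inputs];
    }

    /** A new watermark on an input also marks that input active again. */
    void onWatermark(int input, long watermark) {
        idle[input] = false;
        watermarks[input] = Math.max(watermarks[input], watermark);
        recompute();
    }

    /** Stands in for the idle status withIdleness signals after its timeout. */
    void markIdle(int input) {
        idle[input] = true;
        recompute();
    }

    long currentWatermark() {
        return combined;
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Only active inputs count. If all inputs are idle there is no
        // minimum to take, so the combined watermark does not move.
        if (anyActive && min > combined) {
            combined = min;
        }
    }
}
```

For example, with two inputs at 100 and 50, the combined watermark is 50. Marking the second input idle lets it jump to 100. Marking the first input idle as well leaves it at 100 indefinitely.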
>
> If your goal is to ensure that all of the events are processed and all
> event-time timers are fired (and all event-time windows are closed) before
> a job ends, Flink already includes a mechanism for this purpose. If you are
> using a bounded source, then when that source reaches the end of its input,
> a final Watermark of value Watermark.MAX_WATERMARK will be automatically
> emitted. The --drain option, as in
>
> ./bin/flink stop --drain <job-id>
>
> also has this effect [1].
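A final MAX_WATERMARK flushes everything for a simple reason:

- An event-time timer fires once the watermark reaches its timestamp.
- Watermark.MAX_WATERMARK carries Long.MAX_VALUE, which every timer timestamp satisfies.

The following is a minimal plain-Java model of that rule. EventTimeTimerModel is a hypothetical name, not Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of event-time timers; not Flink's TimerService. */
final class EventTimeTimerModel {
    // Flink's Watermark.MAX_WATERMARK carries the timestamp Long.MAX_VALUE.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Pending timers, ordered by timestamp.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Fires (removes and returns) every timer whose timestamp is <= the watermark. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    int pendingTimers() {
        return timers.size();
    }
}
```

With timers at 1000, 5000 and 60000, a watermark of 2000 fires only the first timer. The final MAX_WATERMARK then drains the rest. In the same way, pending timers and event-time windows are flushed when a bounded source finishes or a job is stopped with --drain.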
>
> With a Kafka source, you can arrange for this to happen by having your
> Kafka deserializer return true from its isEndOfStream() method. Or you
> could use the new KafkaSource connector included in Flink 1.12 with
> its setBounded option.
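The isEndOfStream() route usually means putting an end marker into the test data. The plain-Java model below shows the idea. It uses three pieces:

- RecordSchema, a hypothetical interface shaped like a deserializer with deserialize() and isEndOfStream().
- SentinelStringSchema, which treats the string "END" as the marker.
- BoundedReader, a read loop that stops at the marker without emitting it.

The interface, the marker value, and the choice not to emit the marker are assumptions of this sketch. In a real job you would instead override isEndOfStream() on the deserialization schema passed to the Kafka consumer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a deserializer with an end-of-stream check. */
interface RecordSchema<T> {
    T deserialize(byte[] bytes);

    boolean isEndOfStream(T nextElement);
}

/** Treats the UTF-8 string "END" (an assumed marker) as end of input. */
final class SentinelStringSchema implements RecordSchema<String> {
    static final String END_MARKER = "END";

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return END_MARKER.equals(nextElement);
    }
}

final class BoundedReader {
    /** Deserializes records until the schema reports end of stream. */
    static <T> List<T> readUntilEnd(List<byte[]> records, RecordSchema<T> schema) {
        List<T> out = new ArrayList<>();
        for (byte[] record : records) {
            T value = schema.deserialize(record);
            if (schema.isEndOfStream(value)) {
                // A real source would stop here; per the reply above, a
                // finished bounded source then emits a final MAX_WATERMARK.
                break;
            }
            out.add(value);
        }
        return out;
    }
}
```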
>
> On the other hand, if you really did need to advance the watermark despite
> a (possibly temporary) total lack of events, you could implement a
> watermark strategy that artificially advances the watermark based on the
> passage of processing time. You'll find an example in [2], though it hasn't
> been updated to use the new watermark strategy interface.
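Below is a minimal plain-Java sketch of one such policy. It follows the shape of the new interface (an onEvent callback plus a periodic emit) but has no Flink dependency, so it runs on its own.

It is not a port of the extractor in [2]. The class name and the policy are assumptions: once no event has arrived for idleTimeoutMs of processing time, the watermark is pushed forward by the processing time that has passed beyond that timeout. To use the idea in a job, you would implement Flink's WatermarkGenerator and pass it via WatermarkStrategy.forGenerator.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: bounded-out-of-orderness watermarks that keep
 * advancing with processing time once the input has been idle for a while.
 * Mirrors the onEvent/onPeriodicEmit shape of Flink's WatermarkGenerator
 * without depending on Flink.
 */
final class ProcessingTimeTrailingGenerator {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier processingClock; // current processing time in ms

    private long maxEventTimestamp = Long.MIN_VALUE;
    private long lastEventProcessingTime;
    private long lastEmitted = Long.MIN_VALUE;

    ProcessingTimeTrailingGenerator(long maxOutOfOrdernessMs, long idleTimeoutMs,
                                    LongSupplier processingClock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.processingClock = processingClock;
    }

    /** Records the event's timestamp and when (in processing time) it arrived. */
    void onEvent(long eventTimestamp) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
        lastEventProcessingTime = processingClock.getAsLong();
    }

    /** Called periodically; emits a watermark only if it moves forward. */
    void onPeriodicEmit(LongConsumer output) {
        if (maxEventTimestamp == Long.MIN_VALUE) {
            return; // no events yet, nothing to base a watermark on
        }
        long watermark = maxEventTimestamp - maxOutOfOrdernessMs;
        long idleFor = processingClock.getAsLong() - lastEventProcessingTime;
        if (idleFor > idleTimeoutMs) {
            // Input is idle: advance event time by the extra processing time.
            watermark += idleFor - idleTimeoutMs;
        }
        if (watermark > lastEmitted) {
            lastEmitted = watermark;
            output.accept(watermark);
        }
    }
}
```

Example: use a 1 s bound and a 5 s idle timeout. An event at timestamp 10000 gives a watermark of 9000. After 8 s of processing time without events, the watermark reaches 12000, so pending timers keep firing even though nothing arrives.

The trade-off is that real events arriving after such an artificial advance may already be late.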
>
> Regards,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
> [2]
> https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
>
> On Fri, Mar 12, 2021 at 9:47 AM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it
>> broken?  None of my timers trigger when I'd expect idleness to take over.
>>
>> On Tue, Mar 2, 2021 at 11:15 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> For local development and tests, I want to flush the events in my system
>>> to make sure I'm processing everything. My watermark does not progress
>>> far enough to finish processing all of the data.
>>>
>>> What's the best practice for local development or tests?
>>>
>>> If I use source idleness with a single Kafka partition, it appears
>>> broken. I'm guessing there is logic to prevent removing an idle partition
>>> if it's the only partition. Is there a version of this I can enable for
>>> local development that supports a single partition?
>>>
>>> I found this tech talk. Are there other talks to watch?
>>> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>>>
>>> Do I need to write my own watermark generator?  Or change my test data
>>> to have a way of generating watermarks?
>>>
>>> I've tried a few variants of the following source code.  The watermark
>>> doesn't progress in the operator right after creating the source.
>>>
>>> SingleOutputStreamOperator<T> viewInput = env.addSource(...)
>>>         .uid("source-view")
>>>         .assignTimestampsAndWatermarks(
>>>                 WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>>>                         .withIdleness(Duration.ofMinutes(1)));
>>>
>>
