JFYI, in case other users find this in the future.

ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor has a small
issue if it is adapted to the new watermark API and events can share the
same timestamp. I moved the watermark logic into onPeriodicEmit. In our
case, we have a lot of events with the same timestamp. If the code is still
processing events with that timestamp when onPeriodicEmit runs, it concludes
that we ran out of events (even though we've processed a bunch of them) and
returns a bad watermark. We modified our copy of this code to keep track of
how many events have been emitted. Since we're only using this for local
development, that's fine.
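
A minimal sketch of the counting fix described above, assuming the watermark is recomputed in a periodic callback. It has no Flink dependency, and the class ProcessingTimeTrailingWatermarkLogic and its parameters are hypothetical. In a real job, its onEvent(...) and onPeriodicEmit(...) would presumably be called from the methods of the same names on a custom org.apache.flink.api.common.eventtime.WatermarkGenerator (passing System.currentTimeMillis() as nowMs), with the returned value wrapped in a Watermark and passed to WatermarkOutput#emitWatermark. The point it illustrates: activity is detected by counting events since the last emit, so a burst of events that all share one timestamp still counts as activity rather than idleness.

```java
/**
 * Hypothetical sketch (no Flink dependency) of a processing-time-trailing watermark
 * that counts events between periodic emits, so events sharing one timestamp are
 * still treated as activity rather than idleness.
 */
final class ProcessingTimeTrailingWatermarkLogic {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;

    private long maxTimestamp = Long.MIN_VALUE;  // highest event timestamp seen so far
    private long lastEmitted = Long.MIN_VALUE;   // watermarks must never go backwards
    private long eventsSinceLastEmit = 0;        // the counter that fixes the same-timestamp case
    private long lastActiveProcTimeMs = 0;       // processing time of the last emit that saw events

    ProcessingTimeTrailingWatermarkLogic(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Cheap per-event bookkeeping; no clock access is needed here. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        eventsSinceLastEmit++;
    }

    /** Returns the watermark to emit at processing time nowMs. */
    long onPeriodicEmit(long nowMs) {
        long candidate = lastEmitted;
        if (eventsSinceLastEmit > 0) {
            // Events arrived since the last emit, even if they all had the same timestamp.
            eventsSinceLastEmit = 0;
            lastActiveProcTimeMs = nowMs;
            candidate = maxTimestamp - maxOutOfOrdernessMs - 1;
        } else if (maxTimestamp != Long.MIN_VALUE
                && nowMs - lastActiveProcTimeMs > idleTimeoutMs) {
            // Truly idle: let the watermark trail processing time past the idle timeout.
            long idleFor = nowMs - lastActiveProcTimeMs - idleTimeoutMs;
            candidate = maxTimestamp - maxOutOfOrdernessMs - 1 + idleFor;
        }
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

The "- 1" mirrors Flink's built-in bounded-out-of-orderness generator, which emits maxTimestamp - outOfOrderness - 1. Counting in onEvent keeps the per-event path cheap; the clock is only read once per periodic emit.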


On Fri, Mar 12, 2021 at 1:55 AM Dan Hill <quietgol...@gmail.com> wrote:

> Thanks David!
>
> On Fri, Mar 12, 2021, 01:54 David Anderson <dander...@apache.org> wrote:
>
>> WatermarkStrategy.withIdleness works by marking idle streams as idle, so
>> that downstream operators will ignore those streams and allow the
>> watermarks to progress based only on the advancement of the watermarks of
>> the still active streams. As you suspected, this mechanism does not provide
>> for the watermark to be advanced in situations where all of the streams are
>> idle.
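
As a rough illustration of the point above, here is a simplified model (not Flink's actual implementation) of how an operator with several inputs might combine their watermarks: idle inputs are skipped, and if every input is idle there is nothing left to take the minimum over, so the combined watermark stays where it is. The class and method names are hypothetical.

```java
import java.util.OptionalLong;

/** Simplified, hypothetical model of combining input watermarks while ignoring idle inputs. */
final class CombinedWatermarkModel {
    private CombinedWatermarkModel() {}

    /**
     * Returns the minimum watermark over the active (non-idle) inputs,
     * or empty if all inputs are idle, meaning the combined watermark does not advance.
     */
    static OptionalLong combine(long[] inputWatermarks, boolean[] inputIdle) {
        if (inputWatermarks.length != inputIdle.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!inputIdle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

With a single Kafka partition feeding the job, marking that one input idle lands in the all-idle case, so nothing pushes the watermark forward.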
>>
>> If your goal is to ensure that all of the events are processed and all
>> event-time timers are fired (and all event-time windows are closed) before
>> a job ends, Flink already includes a mechanism for this purpose. If you are
>> using a bounded source, then when that source reaches the end of its input,
>> a final Watermark of value Watermark.MAX_WATERMARK will be automatically
>> emitted. The --drain option, as in
>>
>> ./bin/flink stop --drain <job-id>
>>
>> also has this effect [1].
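
To see why a final watermark of Long.MAX_VALUE (the timestamp carried by Watermark.MAX_WATERMARK) flushes everything, consider this simplified, hypothetical model of an event-time timer queue: a timer fires once the watermark reaches its timestamp, so advancing to Long.MAX_VALUE fires every remaining timer, including the ones that close windows. EventTimeTimerModel is an illustrative name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of event-time timers: a timer fires once the watermark reaches its timestamp. */
final class EventTimeTimerModel {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns the timers that fire, in timestamp order. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);  // watermarks only move forward
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

Registering timers at 5, 20 and 10 and advancing to 10 fires 5 and 10; only the final advance to Long.MAX_VALUE fires the timer at 20.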
>>
>> With a Kafka source, you can arrange for this to happen by having your
>> Kafka deserializer return true from its isEndOfStream() method. Or you
>> could use the new KafkaSource connector included in Flink 1.12 with
>> its setBounded option.
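
One hypothetical way to use isEndOfStream() in tests is to have the test write a sentinel record (here the made-up value "__END__") at the end of its topic. The Flink-free model below only illustrates the control flow under that assumption: reading stops at the first element for which the end-of-stream check returns true, and the end of input is followed by a final watermark of Long.MAX_VALUE. The class, the sentinel, and the loop are illustrative, not Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of a bounded read driven by an end-of-stream check. */
final class EndOfStreamModel {
    static final String END_MARKER = "__END__";  // made-up sentinel written by the test

    private EndOfStreamModel() {}

    /** Plays the role of a deserializer's isEndOfStream(): true means stop reading. */
    static boolean isEndOfStream(String value) {
        return END_MARKER.equals(value);
    }

    /** Emits records up to (not including) the sentinel, then a final Long.MAX_VALUE watermark. */
    static List<String> read(List<String> topic) {
        List<String> out = new ArrayList<>();
        for (String value : topic) {
            if (isEndOfStream(value)) {
                break;  // the source finishes here
            }
            out.add("record:" + value);
        }
        out.add("watermark:" + Long.MAX_VALUE);  // end of input: all event-time timers can fire
        return out;
    }
}
```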
>>
>> On the other hand, if you really did need to advance the watermark
>> despite a (possibly temporary) total lack of events, you could implement a
>> watermark strategy that artificially advances the watermark based on the
>> passage of processing time. You'll find an example in [2], though it hasn't
>> been updated to use the new watermark strategy interface.
>>
>> Regards,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
>> [2]
>> https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
>>
>> On Fri, Mar 12, 2021 at 9:47 AM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> I haven't been able to get WatermarkStrategy.withIdleness to work.  Is
>>> it broken?  None of my timers trigger when I'd expect idleness to take over.
>>>
>>> On Tue, Mar 2, 2021 at 11:15 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Hi.
>>>>
>>>> For local development and tests, I want to flush the events in my
>>>> system to make sure I'm processing everything.  My watermark does not
>>>> advance far enough to process all of the data.
>>>>
>>>> What's the best practice for local development or tests?
>>>>
>>>> If I use idleness with a single Kafka partition, it appears broken.  I'm
>>>> guessing there is logic to prevent removing an idle partition if it's the
>>>> only partition.  Is there a version of this I can enable for local
>>>> development that supports one partition?
>>>>
>>>> I've seen this tech talk.  Are there other talks to watch?
>>>> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>>>>
>>>> Do I need to write my own watermark generator?  Or change my test data
>>>> to have a way of generating watermarks?
>>>>
>>>> I've tried a few variants of the following source code.  The watermark
>>>> doesn't progress in the operator right after creating the source.
>>>>
>>>> SingleOutputStreamOperator<T> viewInput = env.addSource(...)
>>>>         .uid("source-view")
>>>>         .assignTimestampsAndWatermarks(
>>>>                 WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>>>>                         .withIdleness(Duration.ofMinutes(1)));
>>>>
>>>
