Thanks David!

On Fri, Mar 12, 2021, 01:54 David Anderson <dander...@apache.org> wrote:
> WatermarkStrategy.withIdleness works by marking idle streams as idle, so
> that downstream operators will ignore those streams and allow the
> watermarks to progress based only on the advancement of the watermarks of
> the still active streams. As you suspected, this mechanism does not provide
> for the watermark to be advanced in situations where all of the streams are
> idle.
>
> If your goal is to ensure that all of the events are processed and all
> event-time timers are fired (and all event-time windows are closed) before
> a job ends, Flink already includes a mechanism for this purpose. If you are
> using a bounded source, then when that source reaches the end of its input,
> a final Watermark of value Watermark.MAX_WATERMARK will be automatically
> emitted. The --drain option, as in
>
> ./bin/flink stop --drain <job-id>
>
> also has this effect [1].
>
> With a Kafka source, you can arrange for this to happen by having your
> Kafka deserializer return true from its isEndOfStream() method. Or you
> could use the new KafkaSource connector included in Flink 1.12 with
> its setBounded option.
>
> On the other hand, if you really did need to advance the watermark despite
> a (possibly temporary) total lack of events, you could implement a
> watermark strategy that artificially advances the watermark based on the
> passage of processing time. You'll find an example in [2], though it hasn't
> been updated to use the new watermark strategy interface.
>
> Regards,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
> [2]
> https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
>
> On Fri, Mar 12, 2021 at 9:47 AM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I haven't been able to get WatermarkStrategy.withIdleness to work. Is it
>> broken? None of my timers trigger when I'd expect idleness to take over.
>>
>> On Tue, Mar 2, 2021 at 11:15 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> For local development and tests, I want to flush the events in my system
>>> to make sure I'm processing everything. My watermark does not progress
>>> far enough to finish processing all of the data.
>>>
>>> What's the best practice for local development or tests?
>>>
>>> If I use idleness with a single Kafka partition, it appears broken. I'm
>>> guessing there is logic to prevent removing an idle partition if it's the
>>> only partition. Is there a version of this I can enable for local
>>> development that supports 1 partition?
>>>
>>> I found this tech talk. Are there other talks to watch?
>>> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>>>
>>> Do I need to write my own watermark generator? Or change my test data
>>> to have a way of generating watermarks?
>>>
>>> I've tried a few variants of the following source code. The watermark
>>> doesn't progress in the operator right after creating the source.
>>>
>>> SingleOutputStreamOperator<T> viewInput = env.addSource(...)
>>>     .uid("source-view")
>>>     .assignTimestampsAndWatermarks(
>>>         WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>>>             .withIdleness(Duration.ofMinutes(1)));
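David's last suggestion covers the case withIdleness does not: every input is silent, so nothing pushes the watermark forward. A watermark generator that keeps advancing with processing time during such a silence could look roughly like the sketch below. It is an independent sketch, not the class linked in [2] and not a Flink API: the class name `ProcessingTimeTrailingWatermarks` and its parameters are hypothetical. The method names mirror Flink's `WatermarkGenerator` (`onEvent`, `onPeriodicEmit`), but `onPeriodicEmit` returns the watermark instead of calling `WatermarkOutput.emitWatermark`, and the processing-time clock is injected, so the logic runs without Flink on the classpath. While events flow, it behaves like bounded out-of-orderness (max timestamp minus the bound minus 1 ms). Once the input has been silent longer than `idleTimeoutMs`, it assumes event time keeps pace with processing time.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical watermark generator (a sketch, not a Flink class): bounded
 * out-of-orderness while events arrive, then the watermark trails processing
 * time once the input has been silent for longer than idleTimeoutMs.
 */
class ProcessingTimeTrailingWatermarks {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock; // processing time in ms; injected so tests can control it

    private long maxTimestamp = Long.MIN_VALUE; // highest event timestamp seen so far
    private long lastEventProcessingTime;       // processing time when the last event arrived
    private long lastEmitted = Long.MIN_VALUE;  // watermarks must never move backwards

    ProcessingTimeTrailingWatermarks(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
        this.lastEventProcessingTime = clock.getAsLong();
    }

    /** Mirrors WatermarkGenerator.onEvent: remember the max timestamp and when it arrived. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcessingTime = clock.getAsLong();
    }

    /** Mirrors WatermarkGenerator.onPeriodicEmit, but returns the watermark instead of emitting it. */
    long onPeriodicEmit() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastEmitted; // no events yet, so there is no event time to trail
        }
        // Bounded out-of-orderness: max timestamp - bound - 1 ms.
        long watermark = maxTimestamp - maxOutOfOrdernessMs - 1;
        long idleFor = clock.getAsLong() - lastEventProcessingTime;
        if (idleFor > idleTimeoutMs) {
            // Assume event time kept pace with processing time after the timeout expired.
            watermark += idleFor - idleTimeoutMs;
        }
        lastEmitted = Math.max(lastEmitted, watermark);
        return lastEmitted;
    }
}
```

In a real job, one would move this logic into a class implementing `WatermarkGenerator<T>`, pass `System::currentTimeMillis` as the clock, and register it with `WatermarkStrategy.forGenerator(...)` plus `withTimestampAssigner(...)`. The trade-off is that an event arriving after a long silence, with a timestamp behind the artificially advanced watermark, will be treated as late. That makes this approach better suited to local runs and tests than to jobs where late data matters.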