Hey Prakhar,

Sorry for taking so long to reply. One possible strategy is to advance the watermark after not receiving new messages for T milliseconds. For this to work, you must be fairly confident that no message will arrive delayed by more than T milliseconds. To this end I've written my own version of the AscendingTimestampExtractor that advances the watermark after not receiving a message for T milliseconds. Note that T shouldn't be so small that the watermark advances artificially before the first message arrives.
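A minimal sketch of that idea in plain Java. This is not the code attached to this email. It has no Flink dependency so the logic can be checked on its own. `IdleAdvancingWatermark`, `onEvent` and `currentWatermark` are hypothetical names. In Flink 1.6 the same logic would sit behind an `AssignerWithPeriodicWatermarks`, whose `getCurrentWatermark()` is polled periodically. The sketch assumes ascending event timestamps and event time roughly tracking the wall clock, so `now - T` is treated as a safe watermark once the stream has been quiet for T ms. That assumption does not hold when replaying old Kafka offsets, where event timestamps lag far behind the wall clock.

```java
/**
 * Hypothetical sketch: an ascending-timestamp watermark that, after
 * idleTimeoutMs of wall-clock time without events, jumps to (now - idleTimeoutMs).
 * Assumes event time roughly tracks wall-clock time.
 */
class IdleAdvancingWatermark {
    private final long idleTimeoutMs;              // T in the text
    private long maxTimestamp = Long.MIN_VALUE;    // highest event timestamp seen so far
    private long lastActivityMs;                   // wall-clock time of the last event (or of creation)
    private long lastEmitted = Long.MIN_VALUE;     // watermarks must never move backwards

    IdleAdvancingWatermark(long idleTimeoutMs, long createdAtMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastActivityMs = createdAtMs;
    }

    /** Per event; plays the role of extractTimestamp in an ascending extractor. */
    long onEvent(long eventTimestamp, long nowMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastActivityMs = nowMs;
        return eventTimestamp;
    }

    /** Polled periodically; plays the role of getCurrentWatermark. */
    long currentWatermark(long nowMs) {
        long candidate;
        if (nowMs - lastActivityMs >= idleTimeoutMs) {
            // Idle for at least T: assume nothing will arrive delayed by more than T.
            candidate = nowMs - idleTimeoutMs;
        } else if (maxTimestamp == Long.MIN_VALUE) {
            // No events yet and not idle long enough to advance.
            candidate = Long.MIN_VALUE;
        } else {
            // Same convention as AscendingTimestampExtractor: last timestamp minus one.
            candidate = maxTimestamp - 1;
        }
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }

    public static void main(String[] args) {
        IdleAdvancingWatermark wm = new IdleAdvancingWatermark(1000, 0);
        System.out.println(wm.currentWatermark(500));  // Long.MIN_VALUE
        wm.onEvent(100, 600);
        System.out.println(wm.currentWatermark(700));  // 99
        System.out.println(wm.currentWatermark(1700)); // 700 (idle for 1100 ms)
        wm.onEvent(650, 1800);
        System.out.println(wm.currentWatermark(1900)); // 700 (never moves backwards)
    }
}
```

With T = 1000 and creation at time 0, a fresh tracker would also report 500 at time 1500 without ever having seen an event. This is the early-advance risk described above.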
Personally, I'm surprised a class like this doesn't already exist in Flink; maybe someone knows of a good reason for that.

Disclaimer: I'm not running this code in a production environment, only in a personal project, as I'm trialling Flink myself. I do believe it works, though. Code attached as image.

Flink itself recommends marking a source as idle when it isn't receiving messages, but I haven't seen any code examples of that, so I went for the approach I've described.

eduardo

On Sun, 25 Aug 2019, 18:44 Prakhar Mathur, <prakha...@go-jek.com> wrote:
> Hi,
>
> Thanks for the response. Can you point me to some examples of such a
> strategy?
>
> On Sat, Aug 24, 2019, 06:01 Eduardo Winpenny Tejedor <
> eduardo.winpe...@gmail.com> wrote:
>
>> Hi Prakhar,
>>
>> Everything is probably working as expected: if a partition does not
>> receive any messages, then the watermark of the operator does not advance
>> (as it is the minimum across all partitions).
>>
>> You'll need to define a strategy for the watermark to advance even when
>> no messages are received for a particular partition.
>>
>> Regards,
>> Eduardo
>>
>>
>> On Fri, 23 Aug 2019, 10:35 Prakhar Mathur, <prakha...@go-jek.com> wrote:
>>
>>> Hi,
>>>
>>> We are using Flink v1.6. We are facing data loss issues while consuming
>>> data from older offsets in Kafka with windowing. We are exploring a per-
>>> partition watermarking strategy, but we noticed that when we consume
>>> from multiple topics and any of the partitions is not receiving
>>> data, it just blocks everything. Is there a known solution for this?
>>>
>>
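The point that the operator's watermark is the minimum across all partitions, and how marking a source idle changes that, can be illustrated with a small dependency-free Java model. `CombinedWatermark` and its methods are hypothetical. This is a simplified model of the behaviour described in this thread, not Flink's internal implementation. In real Flink code a source function can signal idleness through `SourceFunction.SourceContext#markAsTemporarilyIdle()`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model: the combined watermark is the minimum over all
 * non-idle partitions, so one silent partition holds everything back
 * unless it is marked idle.
 */
class CombinedWatermark {
    private final Map<Integer, Long> watermarks = new HashMap<>(); // latest watermark per partition
    private final Map<Integer, Boolean> idle = new HashMap<>();    // idle partitions are skipped
    private long combined = Long.MIN_VALUE;                        // kept monotonic

    void register(int partition) {
        watermarks.put(partition, Long.MIN_VALUE);
        idle.put(partition, false);
    }

    void update(int partition, long watermark) {
        watermarks.merge(partition, watermark, Long::max); // per-partition watermark never regresses
        idle.put(partition, false);                         // new data makes the partition active again
    }

    void markIdle(int partition) {
        idle.put(partition, true);
    }

    long current() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            if (!idle.get(e.getKey())) {
                min = Math.min(min, e.getValue());
                anyActive = true;
            }
        }
        // If every partition is idle, keep the previous combined watermark.
        if (anyActive) {
            combined = Math.max(combined, min);
        }
        return combined;
    }

    public static void main(String[] args) {
        CombinedWatermark op = new CombinedWatermark();
        op.register(0);
        op.register(1);
        op.update(0, 5_000);
        System.out.println(op.current()); // Long.MIN_VALUE: partition 1 is silent
        op.markIdle(1);
        System.out.println(op.current()); // 5000
    }
}
```

When the idle partition starts sending data again with an older watermark, it rejoins the minimum, but the combined watermark does not move backwards.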