Hi Niels,

If you have multiple inputs going into a single Kafka partition, then you have to calculate the effective watermark by taking the minimum watermark across all inputs. You could insert a Flink operator that takes care of this and then writes to a set of partitions in a 1:n relationship. Alternatively, you could take a look at Pulsar, which aims to support this functionality out of the box [1].
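The min-of-all-inputs rule can be sketched in plain Java. This is only an illustration of the bookkeeping such an operator would do; the class and method names are hypothetical and not part of any Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Tracks the last watermark seen from each upstream input and computes the
// effective watermark as the minimum across all of them. Until every input
// has reported at least once, no effective watermark can be derived.
public class EffectiveWatermarkTracker {
    private final Map<String, Long> lastWatermarkPerInput = new HashMap<>();
    private final int expectedInputs;

    public EffectiveWatermarkTracker(int expectedInputs) {
        this.expectedInputs = expectedInputs;
    }

    /**
     * Records a watermark from one input and returns the current effective
     * watermark, or Long.MIN_VALUE while some input has not reported yet.
     */
    public long onWatermark(String inputId, long watermark) {
        // Watermarks are monotone per input, so keep only the maximum seen.
        lastWatermarkPerInput.merge(inputId, watermark, Math::max);
        if (lastWatermarkPerInput.size() < expectedInputs) {
            return Long.MIN_VALUE; // not all inputs have reported yet
        }
        return lastWatermarkPerInput.values().stream()
                .mapToLong(Long::longValue).min().getAsLong();
    }
}
```

Note that the effective watermark only advances once the slowest input advances, which is exactly why a slow or idle input holds back everything downstream of that partition.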
[1] https://github.com/apache/pulsar/issues/12267

Cheers,
Till

On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
>
> About a year ago I spoke at the Flink Forward conference (
> https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development
> problems regarding streaming applications and handling the lack of events
> in a stream.
> Something I spoke about towards the end of this talk was the idea to ship
> the watermarks of a Flink topology into the intermediate transport between
> applications so you wouldn't need to recreate them.
>
> At that time it was just an idea, today I'm actually trying to build that
> and see if this idea is actually possible.
>
> So the class of applications I work on usually do a keyBy on something like
> a SessionId, SensorId or IP address.
> In low traffic scenarios this means that in Kafka some partitions are
> completely idle which makes Windows/GroupBy type operations impossible (in
> my talk I explain it a lot better).
>
> I have a test setup right now to play around with this and I'm running into
> a bit of a conceptual hurdle for which I'm looking for help.
>
> My goal is to ship the watermarks from within a topology into Kafka and
> then let a follow up application extract those watermarks again and simply
> continue.
> The new SinkWriter interface has a void writeWatermark(Watermark watermark)
> method that seems intended for this kind of thing.
> The basic operations like writing a watermark into Kafka, reading it again
> and then recreating the watermark again works in my test setup (very messy
> code but it works).
>
> My hurdle has to do with the combination of
> - different parallelism numbers between Flink and Kafka (how do I ship 2
> watermarks into 3 partitions)
> - the fact that if you do a keyBy (both in Flink and Kafka) there is a
> likely mismatch between the Flink 'partition' and the Kafka `partition`.
> - processing speed differences between various threads (like session "A"
> needs more CPU cycles/time/processing than session "B") will lead to
> skewing of the progression between them.
> - watermarks in separate threads in a single Flink topology are not
> synchronized (they cannot and should not be).
>
> Has anyone any pointers on possible ways to handle this?
>
> Right now my only idea is to ship the watermark into all partitions (as
> they do not have a key!) and let the consuming application determine the
> "real watermark" based on the mix of watermarks coming in from the upstream
> threads.
>
> All suggestions and ideas are appreciated.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
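Niels's broadcast idea from the quoted message can also be simulated in plain Java: every upstream writer broadcasts its watermark (tagged with its own writer id, since watermarks carry no key) into every partition, and each consuming partition derives the "real" watermark as the minimum over the latest value seen per writer. All names here are illustrative; this is neither Flink nor Kafka API code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates broadcasting per-writer watermarks into all partitions, and a
// consumer that reconstructs the effective watermark per partition.
public class BroadcastWatermarkDemo {
    static final int PARTITIONS = 3;
    // partition index -> (writerId -> last watermark from that writer)
    static final List<Map<Integer, Long>> perPartition = new ArrayList<>();
    static {
        for (int p = 0; p < PARTITIONS; p++) perPartition.add(new HashMap<>());
    }

    /** Writer side: broadcast this writer's watermark into all partitions. */
    static void broadcast(int writerId, long watermark) {
        for (Map<Integer, Long> partition : perPartition) {
            // Watermarks are monotone per writer, so keep the maximum.
            partition.merge(writerId, watermark, Math::max);
        }
    }

    /**
     * Consumer side: effective watermark of one partition, given the number
     * of upstream writers; Long.MIN_VALUE until every writer has reported.
     */
    static long effectiveWatermark(int partition, int writerCount) {
        Map<Integer, Long> seen = perPartition.get(partition);
        if (seen.size() < writerCount) return Long.MIN_VALUE;
        return Collections.min(seen.values());
    }
}
```

The consumer needs to know (or discover) how many upstream writers exist; until all of them have reported at least once, no safe effective watermark exists, and a skewed slow writer holds the effective watermark back in every partition, which matches the skew concern raised in the message above.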