I'm reading the Pulsar PIP and noticed another thing to take into account: multiple applications (each with a different parallelism) that all write into the same topic.
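To make that concrete, this is roughly the marker record I currently have in mind (just a sketch, the names are made up and nothing below is an existing Flink or Kafka class): every watermark written into the topic would carry which application produced it, which subtask of that application it came from, how many subtasks that application has in total, and the timestamp itself, so a consumer can tell the different writers apart even when they have different parallelisms.

    // Sketch only: a marker record that each producing application would
    // broadcast into the topic alongside the normal data records.
    public final class WatermarkMarker {
        private final String applicationId;  // which producing application
        private final int subtaskIndex;      // which subtask/thread of that application
        private final int totalSubtasks;     // parallelism of that application
        private final long watermarkMillis;  // the watermark timestamp itself

        public WatermarkMarker(String applicationId, int subtaskIndex,
                               int totalSubtasks, long watermarkMillis) {
            this.applicationId = applicationId;
            this.subtaskIndex = subtaskIndex;
            this.totalSubtasks = totalSubtasks;
            this.watermarkMillis = watermarkMillis;
        }

        public String getApplicationId() { return applicationId; }
        public int getSubtaskIndex()     { return subtaskIndex; }
        public int getTotalSubtasks()    { return totalSubtasks; }
        public long getWatermarkMillis() { return watermarkMillis; }
    }
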
On Mon, 20 Dec 2021, 10:45 Niels Basjes, <ni...@basjes.nl> wrote:

> Hi Till,
>
> This morning I also realized that what you call an 'effective watermark'
> is indeed what is needed.
> I'm going to read up on what Pulsar has planned.
>
> What I realized is that the consuming application must be aware of the
> parallelism of the producing application, which is independent of the
> number of partitions in the intermediate transport.
>
> Assume I produce in parallel 2 and have 5 Kafka partitions which I then
> read in parallel 3; then in the consuming (parallel 3) application I must
> wait for watermarks from each original input before I can continue: which
> is 2.
> Also, we must assume that those watermarks are created at different
> timestamps.
> So my current assessment is that the watermark records must include at
> least the timestamp, the number of the thread for this watermark and the
> total number of threads.
>
> Niels
>
>
> On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Niels,
>>
>> If you have multiple inputs going into a single Kafka partition then you
>> have to calculate the effective watermark by looking at the min watermark
>> from all inputs. You could insert a Flink operator that takes care of it
>> and then writes to a set of partitions in a 1:n relationship.
>> Alternatively, you could take a look at Pulsar, which wants to support
>> this functionality out of the box [1].
>>
>> [1] https://github.com/apache/pulsar/issues/12267
>>
>> Cheers,
>> Till
>>
>> On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes <ni...@basjes.nl> wrote:
>>
>> > Hi,
>> >
>> > About a year ago I spoke at the Flink Forward conference (
>> > https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling
>> > development problems regarding streaming applications and handling the
>> > lack of events in a stream.
>> > Something I spoke about towards the end of this talk was the idea to
>> > ship the watermarks of a Flink topology into the intermediate transport
>> > between applications so you wouldn't need to recreate them.
>> >
>> > At that time it was just an idea; today I'm actually trying to build
>> > that and see if this idea is actually possible.
>> >
>> > So the class of applications I work on usually does a keyBy on
>> > something like a SessionId, SensorId or IP address.
>> > In low-traffic scenarios this means that in Kafka some partitions are
>> > completely idle, which makes Windows/GroupBy type operations impossible
>> > (in my talk I explain it a lot better).
>> >
>> > I have a test setup right now to play around with this and I'm running
>> > into a bit of a conceptual hurdle for which I'm looking for help.
>> >
>> > My goal is to ship the watermarks from within a topology into Kafka and
>> > then let a follow-up application extract those watermarks again and
>> > simply continue.
>> > The new SinkWriter interface has a void writeWatermark(Watermark
>> > watermark) method that seems intended for this kind of thing.
>> > The basic operations like writing a watermark into Kafka, reading it
>> > again and then recreating the watermark work in my test setup (very
>> > messy code, but it works).
>> >
>> > My hurdle has to do with the combination of:
>> > - different parallelism numbers between Flink and Kafka (how do I ship
>> > 2 watermarks into 3 partitions?)
>> > - the fact that if you do a keyBy (both in Flink and Kafka) there is a
>> > likely mismatch between the Flink 'partition' and the Kafka 'partition'.
>> > - processing speed differences between various threads (like session "A"
>> > needs more CPU cycles/time/processing than session "B") will lead to
>> > skewing of the progression between them.
>> > - watermarks in separate threads in a single Flink topology are not
>> > synchronized (they cannot and should not be).
>> >
>> > Does anyone have any pointers on possible ways to handle this?
>> >
>> > Right now my only idea is to ship the watermark into all partitions (as
>> > they do not have a key!) and let the consuming application determine the
>> > "real watermark" based on the mix of watermarks coming in from the
>> > upstream threads.
>> >
>> > All suggestions and ideas are appreciated.
>> >
>> > --
>> > Best regards / Met vriendelijke groeten,
>> >
>> > Niels Basjes
>> >
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
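
PS: Just to illustrate what I mean above by shipping the watermark into all partitions (since it has no key, every consuming subtask has to see it): a rough sketch using a plain Kafka producer, with made-up names and a naive serialization, not how the real KafkaSink wires things up internally and not my actual (much messier) test code:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch only: a watermark marker is sent explicitly to every partition
    // of the topic, because it has no key to be partitioned by.
    public class WatermarkBroadcaster {

        private final KafkaProducer<byte[], byte[]> producer;
        private final String topic;

        public WatermarkBroadcaster(KafkaProducer<byte[], byte[]> producer, String topic) {
            this.producer = producer;
            this.topic = topic;
        }

        public void broadcast(String applicationId, int subtaskIndex,
                              int totalSubtasks, long watermarkMillis) {
            // naive encoding, just for the sketch
            byte[] value = (applicationId + "," + subtaskIndex + ","
                    + totalSubtasks + "," + watermarkMillis)
                    .getBytes(StandardCharsets.UTF_8);

            int numPartitions = producer.partitionsFor(topic).size();
            for (int partition = 0; partition < numPartitions; partition++) {
                // key is null: the marker is addressed to each partition explicitly
                producer.send(new ProducerRecord<>(topic, partition, null, value));
            }
        }
    }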
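
On the consuming side, determining the "real watermark" from the mix of incoming markers would then boil down to what Till described: keep the latest watermark per producing subtask and take the minimum, once every subtask of every known producing application has been heard from. A sketch of that bookkeeping (again made-up names, not an existing Flink class):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: tracks the last watermark seen from every producer subtask
    // of every producing application and derives the effective watermark as
    // the minimum over all of them.
    public class EffectiveWatermarkTracker {
        // key = applicationId + "/" + subtaskIndex, value = last watermark millis
        private final Map<String, Long> latestPerSubtask = new HashMap<>();
        // key = applicationId, value = total number of subtasks of that application
        private final Map<String, Integer> expectedSubtasks = new HashMap<>();

        /**
         * Record a marker and return the new effective watermark,
         * or null while not all known producer subtasks have reported yet.
         */
        public Long update(String applicationId, int subtaskIndex,
                           int totalSubtasks, long watermarkMillis) {
            expectedSubtasks.put(applicationId, totalSubtasks);
            latestPerSubtask.merge(applicationId + "/" + subtaskIndex, watermarkMillis, Math::max);

            int expected = expectedSubtasks.values().stream().mapToInt(Integer::intValue).sum();
            if (latestPerSubtask.size() < expected) {
                return null; // at least one producer subtask has not sent a watermark yet
            }
            return latestPerSubtask.values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .getAsLong();
        }
    }

Obviously this stalls as soon as one producer subtask stops emitting watermarks, so it is only a starting point for the discussion, not a solution to the idleness problem itself.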