Your high level layout make sense. However, I think there are a few
problems doing it with Flink:
(1) How to encode the watermark? It could break downstream consumers
that don't know what to do with them (eg, crash on deserialization)?
There is no guarantee that only a downstream Flink job consumes the data
(nor that the downstream Flink was upgraded to understand those input
watermarks).
Using a control message, the downstream KafkaConsumer would
"filter" control messages / watermarks automatically, and user's would
opt-in explicitly thus providing a safe and backward compatible upgrade
path.
(2) About all the metadata: it will be hard for Flink to track all those
things, but it would be simpler to push it into the storage layer IMHO.
For example, if Flink does dynamically scaling, it adds a new producer
to the group and Kafka takes care of the rest. Thus, Flink only needs to
provide the actual watermark timestamp.
On particular problem is error handling scenario: what happens if a
producer fails and one downstream watermarks is missing? What happens if
Flink thinks a worker is dead and replaces it with a different one, but
the worker is actually a zombie and might still write watermarks into
the topic? Flink cannot fence off the zombie. -- Pushing it into Kafka
allows to handle those cases much easier IMHO.
(3) About ordering: Kafka provides strict ordering guarantees per
partitions. Thus, there is no problem here. Of course, if there are
multiple producers writing into the same partition, you get interleaved
writes (that's why you need to know how many producer you got, to be
able to reason about it downstream).
Hope this helps.
-Matthias
On 12/21/21 5:13 AM, Niels Basjes wrote:
Hi,
Like I said I've only just started thinking about how this can be
implemented (I'm currently still lacking a lot of knowledge).
So at this point I do not yet see why solving this in the transport (like
Kafka) is easier than solving it in the processing engine (like Flink).
In the normal scenarios we have today all watermarks are (re)created in the
processing engine so instinctively I would expect that to be the
"right place".
Also as far as I can see right now in order to make this happen the
watermarks must all be annotated with things like the applicationId (to
handle multiple producers), the timestamp (duh), the taskId and the total
number of tasks in the producing system: So the producers or the broker
must attach this information to the watermarks.
It should also be able to handle dynamic scaling of producing applications
and handling the entering and leaving of producers into a topic is also a
thing to consider.
[Reading this back; is this the reason for it to be easier in the
transport?]
I do realize that even if this is implemented in the processing engine some
constraints may be needed to allow this to work: Like having some kind of
ordering guarantees per partition in a topic.
Do you guys know of any article/blog/paper/mail discussion/... that
describes/discusses this?
Niels
On Mon, Dec 20, 2021 at 4:35 PM Matthias J. Sax <mj...@mailbox.org.invalid>
wrote:
I think this problem should be tackled inside Kafka, not Flink.
Kafka already has internal control messages to write transaction
markers. Those could be extended to carry watermark information. It
would be best to generalize those as "user control messages" and
watermarks could just be one application.
In addition, we might need something link a "producer group" to track
how many producers are writing into a partition: this would allow to
inform downstream consumer how many different watermarks they need to
track.
It's not an easy problem to solve, but without integrating with the
storage layer, but trying to solve it at the processing layer, it's even
harder.
-Matthias
On 12/20/21 01:57, Niels Basjes wrote:
I'm reading the Pulsar PIP and noticed another thing to take into
account:
multiple applications (with each a different parallelism) that all write
into the same topic.
On Mon, 20 Dec 2021, 10:45 Niels Basjes, <ni...@basjes.nl> wrote:
Hi Till,
This morning I also realized what you call an 'effective watermark' is
indeed what is needed.
I'm going to read up on what Pulsar has planned.
What I realized is that the consuming application must be aware of the
parallelism of the producing application, which is independent of the
partitions in the intermediate transport.
Assume I produce in parallel 2 and have 5 kafka partition which I then
read in parallel 3; then in the consuming (parallel 3) application I
must
wait for watermarks from each original input before I can continue:
which
is 2
Also we must assume that those watermarks are created at different
timestamps.
So my current assessment is that the watermark records must include at
least the timestamp, the number of the thread for this watermark and the
total number of threads .
Niels
On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann <trohrm...@apache.org>
wrote:
Hi Niels,
if you have multiple inputs going into a single Kafka partition then
you
have to calculate the effective watermark by looking at the min
watermark
from all inputs. You could insert a Flink operator that takes care of
it
and then writes to a set of partitions in 1:n relationship.
Alternatively,
you could take a look at Pulsar that wants to support this
functionality
out of the box [1].
[1] https://github.com/apache/pulsar/issues/12267
Cheers,
Till
On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes <ni...@basjes.nl> wrote:
Hi,
About a year ago I spoke at the Flink Forward conference (
https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling
development
problems regarding streaming applications and handling the lack of
events
in a stream.
Something I spoke about towards the end of this talk was the idea to
ship
the watermarks of a Flink topology into the intermediate transport
between
applications so you wouldn't need to recreate them.
At that time it was just an idea, today I'm actually trying to build
that
and see if this idea is actually possible.
So the class of applications I work on usually do a keyBy on something
like
a SessionId, SensorId or IP address.
In low traffic scenarios this means that in Kafka some partitions are
completely idle which makes Windows/GroupBy type operations impossible
(in
my talk I explain it a lot better).
I have a test setup right now to play around with this and I'm running
into
a bit of a conceptual hurdle for which I'm looking for help.
My goal is to ship the watermarks from within a topology into Kafka
and
then let a follow up application extract those watermarks again and
simply
continue.
The new SinkWriter interface has a void writeWatermark(Watermark
watermark) method
that seems intended for this kind of thing.
The basic operations like writing a watermark into Kafka, reading it
again
and then recreating the watermark again works in my test setup (very
messy
code but it works).
My hurdle has to do with the combination of
- different parallelism numbers between Flink and Kafka (how do I
ship 2
watermarks into 3 partitions)
- the fact that if you do a keyBy (both in Flink and Kafka) there is a
likely mismatch between the Flink 'partition' and the Kafka
`partition`.
- processing speed differences between various threads (like session
"A"
needs more CPU cycles/time/processing than session "B") will lead to
skewing of the progression between them.
- watermarks in separate threads in a single Flink topology are not
synchronized (they cannot and should not be).
Has anyone any pointers on possible ways to handle this?
Right now my only idea is to ship the watermark into all partitions
(as
they do not have a key!) and let the consuming application determine
the
"real watermark" based on the mix of watermarks coming in from the
upstream
threads.
All suggestions and ideas are appreciated.
--
Best regards / Met vriendelijke groeten,
Niels Basjes
--
Best regards / Met vriendelijke groeten,
Niels Basjes