I just realized I did not mention a key concept in my idea: all watermarks are sent into all Kafka partitions, so every consumer gets all watermarks from all upstream producing tasks.
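
In code that boils down to something like the sketch below. This is purely illustrative (how the watermark bytes themselves are encoded is a separate topic, see the WatermarkSerde discussion further down):

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.PartitionInfo;

  public class WatermarkBroadcaster {
      // Send the same watermark record into EVERY partition of the
      // topic, so that each consumer, whatever partitions it is
      // assigned, sees the watermarks of ALL upstream producing tasks.
      public static void broadcastWatermark(KafkaProducer<byte[], byte[]> producer,
                                            String topic,
                                            byte[] serializedWatermark) {
          for (PartitionInfo partition : producer.partitionsFor(topic)) {
              producer.send(new ProducerRecord<>(topic,
                                                 partition.partition(),
                                                 null,
                                                 serializedWatermark));
          }
      }
  }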
On Wed, Dec 22, 2021 at 11:33 AM Niels Basjes <ni...@basjes.nl> wrote:

> Perhaps for this an explicit config is needed per source id (pattern?):
> - Important producer: expire after 24 hours.
> - Yet another IoT sensor: expire after 1 minute.
> - and a default.
>
> Do note that this actually does something I'm not a fan of: it introduces
> a "processing time" conclusion (the timeout of the watermarks) into an
> "event time" stream. I consider mixing these two something to be avoided
> in general.
>
> For the situation where a producer is "signing off forever" there is the
> option of it sending out a final closing watermark indicating MAX_LONG.
> But for the "the sensor died" scenario I do not yet see a better way to
> do this.
>
> Niels
>
> On Wed, Dec 22, 2021 at 11:22 AM Niels Basjes <ni...@basjes.nl> wrote:
>
>> On Tue, Dec 21, 2021 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Your high level layout makes sense. However, I think there are a few
>>> problems doing it with Flink:
>>>
>>> (1) How to encode the watermark? It could break downstream consumers
>>> that don't know what to do with them (e.g., crash on deserialization).
>>> There is no guarantee that only a downstream Flink job consumes the
>>> data (nor that the downstream Flink was upgraded to understand those
>>> input watermarks).
>>
>> What I'm doing in my experiments right now is that I have created a
>> WatermarkSerde interface that is able to serialize a Watermark into my
>> own format. This WatermarkSerde would then be implemented by the
>> application builder and injected into both the Sink and Source
>> implementation of whatever transport I like to use (Kafka, ...). The
>> WatermarkSerde would also need something to determine whether a provided
>> element is a watermark or a normal element. (I have put a rough sketch
>> of this interface at the bottom of this mail.)
>>
>> This allows me to use any format I like in my application and to make
>> the watermark a valid yet slightly strange record in my stream. So at
>> the receiving end the record can simply be deserialized and then checked
>> to see whether it is a watermark or not. As a consequence, all
>> downstream implementations can simply be instructed on how to tell
>> whether an element is really an event or a watermark.
>>
>> A default/reference implementation that can be used where a textual
>> format is expected (like JSON or XML) is also an option.
>>
>> Do note that at the application level I consider the presence of these
>> special elements to be part of the definition of the streaming interface
>> of this (set of) applications. Regardless of the "where and how" of the
>> implementation: the streaming interface now "contains watermarks" in a
>> defined way.
>>
>>> Using a control message, the downstream KafkaConsumer would "filter"
>>> control messages / watermarks automatically, and users would opt in
>>> explicitly, thus providing a safe and backward compatible upgrade path.
>>
>> Yes, that would work.
>>
>>> (2) About all the metadata: it will be hard for Flink to track all
>>> those things, but it would be simpler to push it into the storage layer
>>> IMHO. For example, if Flink does dynamic scaling, it adds a new
>>> producer to the group and Kafka takes care of the rest. Thus, Flink
>>> only needs to provide the actual watermark timestamp.
>>
>> Yes, but there is still a need for an administration that tracks all
>> producers and their watermarks. As far as I can tell right now, the
>> complexity will be roughly the same in all directions.
>>
>>> One particular problem is the error handling scenario: what happens if
>>> a producer fails and one downstream watermark is missing?
>>
>> In my mind the watermarks of the various producer tasks are not
>> synchronized, as they are initially created independently on different
>> machines. So the sketch I have in mind right now to handle this (rough
>> code follows below) would:
>> - keep a list of all "last seen watermark timestamps" per application,
>>   per original taskId, per input partition (the last one plays a role if
>>   you read from multiple partitions with a single instance);
>> - when a new watermark arrives:
>>   - persist it if it is later than what we have so far;
>>   - output a watermark into the application stream only if it is newer
>>     than ALL the others we have;
>> - only output a watermark if we have a valid "last watermark" for all
>>   producing tasks, taking into account how many producer tasks there
>>   are: only output if we have at least one from each of them;
>> - essentially do a reset if the producer parallelism changes, and change
>>   the expectation of how many parallel tasks we need. This is a tricky
>>   one, as we will get a mix of old and new watermarks during this
>>   transition.
>>
>> This would be done in all receiving Source instances separately, as they
>> are generally on different machines. So in this failure scenario all
>> Sources would hold back their watermarks until they have received the
>> watermarks from the recovered instance of this producer.
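>>
>> In (very rough) Java that bookkeeping could look something like the
>> sketch below. All names here are invented by me and nothing of this
>> exists anywhere yet; the key would really be (application, taskId,
>> input partition), a single String stands in for that:
>>
>>   import java.util.HashMap;
>>   import java.util.Map;
>>   import java.util.OptionalLong;
>>
>>   public class WatermarkTracker {
>>       private final Map<String, Long> lastSeenPerProducerTask = new HashMap<>();
>>       private int expectedProducerTasks;
>>       private long lastEmitted = Long.MIN_VALUE;
>>
>>       public WatermarkTracker(int expectedProducerTasks) {
>>           this.expectedProducerTasks = expectedProducerTasks;
>>       }
>>
>>       // Called for every watermark read from the input; returns the
>>       // watermark to forward into the application stream, if any.
>>       public OptionalLong onWatermark(String producerTaskId, long timestamp) {
>>           // Persist it only if it is later than what we have so far.
>>           lastSeenPerProducerTask.merge(producerTaskId, timestamp, Math::max);
>>
>>           // Only output once we have a valid 'last watermark' for ALL
>>           // producer tasks we expect.
>>           if (lastSeenPerProducerTask.isEmpty()
>>                   || lastSeenPerProducerTask.size() < expectedProducerTasks) {
>>               return OptionalLong.empty();
>>           }
>>
>>           // The combined watermark is the minimum over all tasks; it
>>           // only advances once the slowest task has moved past it.
>>           long combined = lastSeenPerProducerTask.values().stream()
>>                   .mapToLong(Long::longValue).min().getAsLong();
>>           if (combined <= lastEmitted) {
>>               return OptionalLong.empty();
>>           }
>>           lastEmitted = combined;
>>           return OptionalLong.of(combined);
>>       }
>>
>>       // On a producer parallelism change: essentially reset and change
>>       // the expectation of how many tasks to wait for. This is tricky,
>>       // because we will see a mix of old and new watermarks for a while.
>>       public void onProducerParallelismChange(int newExpectedProducerTasks) {
>>           this.expectedProducerTasks = newExpectedProducerTasks;
>>           this.lastSeenPerProducerTask.clear();
>>       }
>>   }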
>>
>>> What happens if Flink thinks a worker is dead and replaces it with a
>>> different one, but the worker is actually a zombie and might still
>>> write watermarks into the topic?
>>
>> Yes, that would be a serious problem for all implementation directions.
>> I don't know yet.
>>
>>> Flink cannot fence off the zombie. -- Pushing it into Kafka allows
>>> handling those cases much more easily IMHO.
>>
>> Good point.
>>
>>> (3) About ordering: Kafka provides strict ordering guarantees per
>>> partition. Thus, there is no problem here. Of course, if there are
>>> multiple producers writing into the same partition, you get interleaved
>>> writes (that's why you need to know how many producers you have, to be
>>> able to reason about it downstream).
>>
>> That is why my sketch keeps watermarks for each task of all producing
>> applications and essentially determines a new watermark from the total
>> set.
>>
>> The big problem there (also for the Kafka route) would be handling
>> producers that leave. In my sketch "new producers" is easy to handle:
>> just add the new expectation to wait for. A leaving producer is not easy
>> to handle, as I see no way to distinguish between "leaving, continue
>> without it" and "down, wait for it to come back a little while later".
>>
>> Perhaps for this an explicit config is needed per source id (pattern?):
>> - Important producer: expire after 24 hours.
>> - Yet another IoT sensor: expire after 1 minute.
>> - and a default.
>>
>>> Hope this helps.
>>
>> Yes it does.
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes

--
Best regards / Met vriendelijke groeten,

Niels Basjes
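
PS: The rough sketch of the WatermarkSerde interface I promised above.
All names are mine and purely illustrative; nothing like this exists in
Flink today:

  // Implemented by the application builder and injected into both the
  // Sink and the Source of whatever transport is used (Kafka, ...).
  public interface WatermarkSerde<T> {

      // Encode a watermark as a valid (yet slightly strange) record in
      // the application's own format.
      T serialize(long watermarkTimestamp);

      // Determine whether a deserialized record is a watermark or a
      // normal element.
      boolean isWatermark(T record);

      // Extract the timestamp from a record for which isWatermark()
      // returned true.
      long watermarkTimestamp(T record);
  }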