> Perhaps for this an explicit config is needed per source id (pattern?):
> - Important producer: expire after 24 hours.
> - Yet another IOT sensor: expire after 1 minute.
> - and a default.
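To make that a bit more concrete: such a configuration could look roughly
like the sketch below. This is only an illustration of the idea; the class
name, key patterns and durations are made up and nothing like this exists
today.

    import java.time.Duration;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical per-source expiry configuration: the first pattern that
    // matches the source id wins, otherwise the default applies.
    public class WatermarkExpiryConfig {
        private final Map<String, Duration> expiryPerSourcePattern = new LinkedHashMap<>();
        private final Duration defaultExpiry;

        public WatermarkExpiryConfig(Duration defaultExpiry) {
            this.defaultExpiry = defaultExpiry;
        }

        public WatermarkExpiryConfig expireAfter(String sourceIdPattern, Duration expiry) {
            expiryPerSourcePattern.put(sourceIdPattern, expiry);
            return this;
        }

        public Duration expiryFor(String sourceId) {
            return expiryPerSourcePattern.entrySet().stream()
                .filter(e -> sourceId.matches(e.getKey()))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElse(defaultExpiry);
        }
    }

    // Usage, matching the examples quoted above (names are illustrative):
    //   new WatermarkExpiryConfig(Duration.ofHours(1))              // the default
    //       .expireAfter("important-producer-.*", Duration.ofHours(24))
    //       .expireAfter("iot-sensor-.*",         Duration.ofMinutes(1));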
Do note that this actually does something I'm not a fan of: it introduces a
"Processing time" conclusion (the timeout of the watermarks) into an
"Event time" stream. I consider mixing these two something to be avoided in
general.

For the situation where a producer is "signing off forever" there is the
option of it sending out a final closing watermark indicating "MAX_LONG".
But for the "the sensor died" scenario I do not yet see a better way to do
this.

Niels

On Wed, Dec 22, 2021 at 11:22 AM Niels Basjes <ni...@basjes.nl> wrote:

>
> On Tue, Dec 21, 2021 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Your high-level layout makes sense. However, I think there are a few
>> problems doing it with Flink:
>>
>> (1) How to encode the watermark? It could break downstream consumers
>> that don't know what to do with them (e.g., crash on deserialization)?
>> There is no guarantee that only a downstream Flink job consumes the data
>> (nor that the downstream Flink was upgraded to understand those input
>> watermarks).
>>
>
> What I'm doing in my experiments right now is that I have created a
> WatermarkSerde interface that is able to serialize a Watermark into my own
> format.
> This WatermarkSerde would then be implemented by the application builder
> and injected into both the Sink and Source implementations of whatever
> transport I like to use (Kafka, ...).
> The WatermarkSerde would also need something to determine whether the
> provided element is a watermark or a normal element.
>
> This allows me to use any format I like in my application and to make the
> watermark a valid yet slightly strange record in my stream.
> So the receiving end can simply deserialize the record and then determine
> whether it is a watermark or not.
>
> As a consequence all downstream implementations can simply be instructed
> on how to tell whether an element is really an event or a watermark.
>
> A default/reference implementation that can be used where a textual format
> is expected (like JSON or XML) is also an option.
>
> Do note that at the application level I consider the presence of these
> special elements to be part of the definition of the streaming interface
> of this (set of) applications.
> Regardless of the "where and how" of the implementation: the streaming
> interface now "contains watermarks" in a defined way.

[A rough sketch of such a WatermarkSerde interface follows a bit further
down in this mail.]

>
> Using a control message, the downstream KafkaConsumer would
>> "filter" control messages / watermarks automatically, and users would
>> opt in explicitly, thus providing a safe and backward compatible upgrade
>> path.
>>
>
> Yes, that would work.
>
> (2) About all the metadata: it will be hard for Flink to track all those
>> things, but it would be simpler to push it into the storage layer IMHO.
>> For example, if Flink does dynamic scaling, it adds a new producer
>> to the group and Kafka takes care of the rest. Thus, Flink only needs to
>> provide the actual watermark timestamp.
>>
>
> Yes, but there is still a need for an administration that tracks all
> producers and their watermarks.
> As far as I can tell right now the complexity will be roughly the same in
> all directions.
>
>
>> One particular problem is the error handling scenario: what happens if a
>> producer fails and one downstream watermark is missing?
>
>
> In my mind the watermarks between all producer tasks are not synchronized,
> as they are initially created independently on different machines.
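[Interjecting into my own quoted mail here: below is a minimal sketch of
what the WatermarkSerde interface mentioned above could look like. All
names and signatures are purely illustrative assumptions on my side; this
is not an existing Flink or Kafka API.]

    import java.util.Optional;

    // Illustrative sketch of a "WatermarkSerde": the application builder
    // implements this and injects it into both the Sink and the Source of
    // the transport (Kafka, ...), so that a watermark can travel through
    // the stream as a valid, yet slightly strange, record in the
    // application's own format.
    public interface WatermarkSerde<T> {

        // Encode a watermark (timestamp in epoch millis) as a regular
        // record of the application's own type/format.
        T serialize(long watermarkTimestampMillis);

        // Decide whether a deserialized element is one of these special
        // watermark records or a normal event.
        boolean isWatermark(T element);

        // Extract the watermark timestamp; empty if the element is a
        // normal event.
        Optional<Long> watermarkTimestampOf(T element);
    }

[A default/reference implementation for textual formats like JSON or XML
would then simply be one concrete implementation of this interface that
reserves a recognizable marker field for the watermark records.]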
> So the sketch I have in mind right now to handle this would be to:
> - Keep a list of all "last seen watermark timestamps" per application, per
>   original taskId, per input partition (this last one plays a role if you
>   read from multiple partitions with a single instance).
> - When a new watermark arrives:
>   - persist it if it is later than what we have so far;
>   - only if it is newer than ALL others we have, output a watermark into
>     the application stream.
> - Only output a watermark if we have a valid 'last watermark' for all
>   producing tasks, and
> - take into account how many producer tasks there are: only output if we
>   have at least one watermark for each of them.
> - Essentially do a reset if the producer parallelism changes, and change
>   the expectation of how many parallel instances we need.
>   - This is a tricky one, as we will get a mix of old and new watermarks
>     during this transition.

[A rough code sketch of this bookkeeping is included at the bottom of this
mail.]

>
> This would be done in all receiving Source instances separately, as they
> are generally on different machines.
>
> So in this failure scenario all Sources would hold back their watermarks
> until they have received the watermarks from the recovered instance of
> this producer.
>
> What happens if
>> Flink thinks a worker is dead and replaces it with a different one, but
>> the worker is actually a zombie and might still write watermarks into
>> the topic?
>
>
> Yes, that would be a serious problem for all implementation directions.
> I don't know yet.
>
> Flink cannot fence off the zombie. -- Pushing it into Kafka
>> allows handling those cases much more easily IMHO.
>>
>
> Good point.
>
>
>> (3) About ordering: Kafka provides strict ordering guarantees per
>> partition. Thus, there is no problem here. Of course, if there are
>> multiple producers writing into the same partition, you get interleaved
>> writes (that's why you need to know how many producers you have, to be
>> able to reason about it downstream).
>>
>
> That is why my sketch keeps watermarks for each task of all producing
> applications and essentially determines a new watermark from the total set.
>
> The big problem there (also for the Kafka route) would be handling
> producers leaving.
> In my sketch "new producers" are easy to handle: just add a new
> expectation to wait for.
>
> Leaving producers are not easy to handle, as I see no way to make the
> distinction between
> - "leaving, continue without it"
> and
> - "down, wait for it to be back a little while later".
>
> Perhaps for this an explicit config is needed per source id (pattern?):
> - Important producer: expire after 24 hours.
> - Yet another IOT sensor: expire after 1 minute.
> - and a default.
>
> Hope this helps.
>>
>
> Yes it does.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>

--
Best regards / Met vriendelijke groeten,

Niels Basjes
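PS: To make the watermark bookkeeping sketch in the quoted mail above a bit
more concrete, here is a rough, untested outline of what the tracking in a
receiving Source instance could do. All names are made up for illustration,
and I read the "newer than ALL others" rule as: emit the minimum over all
known producers whenever that minimum advances.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.OptionalLong;

    // Sketch of the bookkeeping described above: track the last seen
    // watermark per producing task (per application, per taskId, per input
    // partition) and only emit a watermark into the application stream once
    // every expected producer task has reported one, and only when the
    // minimum over all of them advances.
    public class ProducerWatermarkTracker {

        // Key identifying one producing task, e.g. "appId/taskId/partition".
        private final Map<String, Long> lastSeenWatermarkPerProducer = new HashMap<>();
        private int expectedProducerCount;
        private long lastEmittedWatermark = Long.MIN_VALUE;

        public ProducerWatermarkTracker(int expectedProducerCount) {
            this.expectedProducerCount = expectedProducerCount;
        }

        // Called when the producer parallelism changes: essentially a reset,
        // because during the transition we will see a mix of old and new tasks.
        public void resetExpectedProducers(int newExpectedProducerCount) {
            this.expectedProducerCount = newExpectedProducerCount;
            this.lastSeenWatermarkPerProducer.clear();
        }

        // Record an incoming watermark; returns a watermark to emit into the
        // application stream, or empty if we cannot emit (yet).
        public OptionalLong onWatermark(String producerKey, long watermarkTimestamp) {
            // Persist it only if it is later than what we have so far.
            lastSeenWatermarkPerProducer.merge(producerKey, watermarkTimestamp, Long::max);

            // Only output if we have a valid 'last watermark' for all producing tasks.
            if (lastSeenWatermarkPerProducer.size() < expectedProducerCount) {
                return OptionalLong.empty();
            }

            // The watermark we can safely emit is the minimum over all
            // producers; only emit when it actually advances.
            long minOverAllProducers = lastSeenWatermarkPerProducer.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
            if (minOverAllProducers > lastEmittedWatermark) {
                lastEmittedWatermark = minOverAllProducers;
                return OptionalLong.of(minOverAllProducers);
            }
            return OptionalLong.empty();
        }
    }

The per-source expiry configuration from the top of this mail would then be
the thing that eventually removes entries for producers that have left.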