On Tue, Dec 21, 2021 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
> Your high level layout make sense. However, I think there are a few
> problems doing it with Flink:
>
> (1) How to encode the watermark? It could break downstream consumers
> that don't know what to do with them (eg, crash on deserialization)?
> There is no guarantee that only a downstream Flink job consumes the data
> (nor that the downstream Flink was upgraded to understand those input
> watermarks).

What I'm doing in my experiments right now is that I have created a
WatermarkSerde interface that is able to serialize a Watermark into my own
format. This WatermarkSerde would then be implemented by the application
builder and injected into both the Sink and Source implementations of
whatever transport I like to use (Kafka, ...). The WatermarkSerde would
also need something to determine if the provided element is a watermark or
a normal element.

This allows me to use any format I like in my application and to make the
watermark a valid yet slightly strange record in my stream. So the
receiving end can simply deserialize the record and then determine if it
is a watermark or not. As a consequence, all downstream implementations
can simply be instructed on how to tell whether an element is a real event
or a Watermark. A default/reference implementation that can be used where
a textual format is expected (like JSON or XML) is also an option.

Do note that at the application level I consider the presence of these
special elements to be a part of the definition of the streaming interface
of this (set of) applications. Regardless of the "where and how" of the
implementation: the streaming interface now "contains watermarks" in a
defined way.
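Roughly, the interface I am experimenting with currently looks like this
(the names and signatures are just my working sketch, nothing is final):

    /**
     * Sketch only: serializes watermarks into, and recognizes them in, the
     * record format of the application. Implemented by the application
     * builder and plugged into both the Sink and the Source.
     */
    public interface WatermarkSerde<T> {
        /** Turn a watermark timestamp into a "valid yet slightly strange" record. */
        T serializeWatermark(long watermarkTimestampMs);

        /** Is the provided element a watermark or a normal element? */
        boolean isWatermark(T element);

        /** Extract the timestamp from an element for which isWatermark() returned true. */
        long watermarkTimestamp(T element);
    }

In a real setup the watermark record would probably also need to carry
which application/producer task it came from (needed for the bookkeeping I
describe below), but I left that out of this sketch.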
> Using a control message, the downstream KafkaConsumer would
> "filter" control messages / watermarks automatically, and user's would
> opt-in explicitly thus providing a safe and backward compatible upgrade
> path.

Yes, that would work.

> (2) About all the metadata: it will be hard for Flink to track all those
> things, but it would be simpler to push it into the storage layer IMHO.
> For example, if Flink does dynamically scaling, it adds a new producer
> to the group and Kafka takes care of the rest. Thus, Flink only needs to
> provide the actual watermark timestamp.

Yes, but there is still a need for an administration that tracks all
producers and their watermarks. As far as I can tell right now the
complexity will be roughly the same in all directions.

> On particular problem is error handling scenario: what happens if a
> producer fails and one downstream watermarks is missing?

In my mind the watermarks between all producer tasks are not synchronized,
as they are initially created in an independent way on different machines.
So the sketch I have in mind right now to handle this would be to:
- Keep a list of all "last seen watermark timestamps" per application, per
  original taskId, per input partition (this last one plays a role if you
  read from multiple partitions with a single instance).
- When a new watermark arrives:
  - persist it if it is later than what we have so far;
  - only if it is newer than ALL the others we have, output a watermark
    into the application stream.
- Only output a watermark if we have a valid 'last watermark' for all
  producing tasks, and
- take into account how many producer tasks there are: only output if we
  have at least one for each of them.
- Essentially do a reset if the producer parallelism changes and change
  the expectation of how many parallel tasks we need to hear from.
  - This is a tricky one, as we will get a mix of old and new watermarks
    during this transition.

(A very rough code sketch of this bookkeeping is in the P.S. at the bottom
of this mail.)

This would be done in all receiving Source instances separately, as they
are generally on different machines. So in this failure scenario all
Sources would wait with their watermarks until they have received the
watermarks from the recovered instance of this producer.

> What happens if
> Flink thinks a worker is dead and replaces it with a different one, but
> the worker is actually a zombie and might still write watermarks into
> the topic?

Yes, that would be a serious problem for all implementation directions. I
don't know yet how to solve that; Flink cannot fence off the zombie.

> Pushing it into Kafka
> allows to handle those cases much easier IMHO.

Good point.

> (3) About ordering: Kafka provides strict ordering guarantees per
> partitions. Thus, there is no problem here. Of course, if there are
> multiple producers writing into the same partition, you get interleaved
> writes (that's why you need to know how many producer you got, to be
> able to reason about it downstream).

That is why my sketch keeps watermarks for each task of all producing
applications and essentially determines a new watermark from the total
set.

The big problem there (also for the Kafka route) would be handling
producers leaving. In my sketch "new producers" are easy to handle: just
add the new expectation to wait for. Leaving producers are not easy to
handle, as I see no way to determine the distinction between
- "leaving, continue without it" and
- "down, wait for it to come back a little while later".

Perhaps for this an explicit config is needed per source id (pattern?):
- important producer: expire after 24 hours,
- yet another IoT sensor: expire after 1 minute,
- and a default.

> Hope this helps.

Yes it does.

--
Best regards / Met vriendelijke groeten,

Niels Basjes
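P.S. To make the bookkeeping from the error handling part a bit more
concrete, below is roughly the shape of the per-application state I'm
playing with in each receiving Source instance (in reality keyed per input
partition as well). All names are made up for this mail, I read "newer
than ALL others" as "the minimum over all tasks advanced", and the
parallelism-change transition, zombies and leaving producers are
deliberately ignored here:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.OptionalLong;

    /**
     * Sketch only: tracks the last watermark seen per producing task of ONE
     * upstream application and only emits a merged watermark once ALL
     * expected producer tasks have reported one.
     */
    public class PerApplicationWatermarkState {
        private final Map<Integer, Long> lastWatermarkPerProducerTask = new HashMap<>();
        private int expectedProducerTasks = -1;          // -1 = not yet known
        private long lastEmittedWatermark = Long.MIN_VALUE;

        /** Called for every element the WatermarkSerde identified as a watermark. */
        public OptionalLong onWatermark(int producerTaskId,
                                        int numberOfProducerTasks,
                                        long watermarkMs) {
            // Producer parallelism changed: essentially do a reset and start
            // over with the new expectation of how many tasks to hear from.
            if (numberOfProducerTasks != expectedProducerTasks) {
                lastWatermarkPerProducerTask.clear();
                expectedProducerTasks = numberOfProducerTasks;
            }

            // Persist it, but only if it is later than what we have so far.
            lastWatermarkPerProducerTask.merge(producerTaskId, watermarkMs, Math::max);

            // Only output if we have a valid 'last watermark' for ALL producing tasks.
            if (lastWatermarkPerProducerTask.size() < expectedProducerTasks) {
                return OptionalLong.empty();
            }

            // The merged watermark is the minimum over all tasks; only emit when it advances.
            long merged = lastWatermarkPerProducerTask.values().stream()
                    .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
            if (merged <= lastEmittedWatermark) {
                return OptionalLong.empty();
            }
            lastEmittedWatermark = merged;
            return OptionalLong.of(merged);
        }
    }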