I just realized I did not mention a key concept in my idea:
All watermarks are sent into all Kafka partitions so every consumer gets
all watermarks from all upstream producing tasks.
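
To make that concrete, here is a minimal sketch of the fan-out, not real connector code. `PartitionedTopic`, `InMemoryTopic`, `WatermarkBroadcaster` and the `E|` / `W|` record prefixes are all made up for illustration; the in-memory topic only stands in for a Kafka topic so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a partitioned transport such as a Kafka topic. */
interface PartitionedTopic {
    int partitionCount();
    void send(int partition, String record);
}

/** In-memory implementation, only here to make the sketch runnable. */
class InMemoryTopic implements PartitionedTopic {
    final List<List<String>> partitions = new ArrayList<>();

    InMemoryTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    @Override
    public int partitionCount() {
        return partitions.size();
    }

    @Override
    public void send(int partition, String record) {
        partitions.get(partition).add(record);
    }
}

/** One instance per producing task (assumed naming, e.g. "task-0"). */
class WatermarkBroadcaster {
    private final PartitionedTopic topic;
    private final String producerTaskId;

    WatermarkBroadcaster(PartitionedTopic topic, String producerTaskId) {
        this.topic = topic;
        this.producerTaskId = producerTaskId;
    }

    /** A normal element goes to exactly one partition, chosen by key hash. */
    void sendElement(String key, String value) {
        int partition = Math.floorMod(key.hashCode(), topic.partitionCount());
        topic.send(partition, "E|" + value);
    }

    /** A watermark is copied into every partition, tagged with its origin. */
    void sendWatermark(long timestamp) {
        for (int p = 0; p < topic.partitionCount(); p++) {
            topic.send(p, "W|" + producerTaskId + "|" + timestamp);
        }
    }
}
```

With the real Kafka client the same loop would presumably be built on `partitionsFor(topic)` and the `ProducerRecord` constructor that takes an explicit partition number.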

On Wed, Dec 22, 2021 at 11:33 AM Niels Basjes <ni...@basjes.nl> wrote:

> > Perhaps for this an explicit config is needed per source id (pattern?):
> > - Important producer: expire after 24 hours.
> > - Yet another IOT sensor: expire after 1 minute.
> > - and a default.
>
> Do note that this actually does something I'm not a fan of: it introduces a
> "Processing time" conclusion (the timeout of the watermarks) into an "Event
> time" stream.
> In general I consider mixing these two something to be avoided.
>
> For the situation where a producer is "signing off forever" there is the
> option of it sending out a final closing watermark indicating "MAX_LONG".
> But for the "the sensor died" scenario I do not yet see a better way to do
> this.
>
> Niels
>
>
>
> On Wed, Dec 22, 2021 at 11:22 AM Niels Basjes <ni...@basjes.nl> wrote:
>
>>
>> On Tue, Dec 21, 2021 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Your high level layout makes sense. However, I think there are a few
>>> problems doing it with Flink:
>>>
>>> (1) How to encode the watermark? It could break downstream consumers
>>> that don't know what to do with them (eg, crash on deserialization)?
>>> There is no guarantee that only a downstream Flink job consumes the data
>>> (nor that the downstream Flink was upgraded to understand those input
>>> watermarks).
>>>
>>
>> In my experiments right now I have created a WatermarkSerde interface
>> that can serialize a Watermark into my own format.
>> This WatermarkSerde would then be implemented by the application builder
>> and injected into both the Sink and Source implementations of whatever
>> transport I like to use (Kafka, ...).
>> The WatermarkSerde would also need a way to determine whether a given
>> element is a watermark or a normal element.
>>
>> This allows me to use any format I like in my application and to make the
>> watermark a valid yet slightly strange record in my stream.
>> So at the receiving end the record can simply be deserialized and then
>> checked to see whether it is a watermark or not.
>>
>> As a consequence all downstream implementations can simply be told how
>> to see whether a record is really an event or a Watermark.
>>
>> A default/reference implementation that can be used where a textual
>> format is expected (like json or xml) is also an option.
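
A rough sketch of what such an interface and a textual reference implementation could look like. The method names, the `__watermark` field name and the class `JsonLikeWatermarkSerde` are assumptions made for this example, and the watermark is reduced to its plain `long` timestamp so the code has no Flink dependency.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical shape of the WatermarkSerde idea.
 * T is the wire format of the stream (String in the example below).
 */
interface WatermarkSerde<T> {
    /** Encode a watermark timestamp as a valid record of the stream format. */
    T serialize(long watermarkTimestamp);

    /** Tell watermark records apart from normal elements. */
    boolean isWatermark(T record);

    /** Decode the timestamp; only valid if isWatermark(record) is true. */
    long deserialize(T record);
}

/** Example implementation for streams that carry JSON text. */
class JsonLikeWatermarkSerde implements WatermarkSerde<String> {
    // Assumed marker format: {"__watermark":<timestamp>}
    private static final Pattern FORMAT =
            Pattern.compile("\\{\"__watermark\":(-?\\d+)\\}");

    @Override
    public String serialize(long watermarkTimestamp) {
        return "{\"__watermark\":" + watermarkTimestamp + "}";
    }

    @Override
    public boolean isWatermark(String record) {
        return record != null && FORMAT.matcher(record).matches();
    }

    @Override
    public long deserialize(String record) {
        Matcher m = record == null ? null : FORMAT.matcher(record);
        if (m == null || !m.matches()) {
            throw new IllegalArgumentException("Not a watermark record: " + record);
        }
        return Long.parseLong(m.group(1));
    }
}
```

A consumer that does not know about watermarks still sees well-formed JSON here, which is the point of making the watermark a valid yet slightly strange record.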
>>
>> Do note that at the application level I consider the presence of these
>> special elements to be a part of the definition of the streaming interface
>> of this (set of) applications.
>> Regardless of the "where and how" of the implementation: the streaming
>> interface now "contains watermarks" in a defined way.
>>
>>> Using a control message, the downstream KafkaConsumer would
>>> "filter" control messages / watermarks automatically, and users would
>>> opt-in explicitly thus providing a safe and backward compatible upgrade
>>> path.
>>>
>>
>> Yes, that would work.
>>
>>> (2) About all the metadata: it will be hard for Flink to track all those
>>> things, but it would be simpler to push it into the storage layer IMHO.
>>> For example, if Flink does dynamic scaling, it adds a new producer
>>> to the group and Kafka takes care of the rest. Thus, Flink only needs to
>>> provide the actual watermark timestamp.
>>>
>>
>> Yes, but there is still a need for some bookkeeping that tracks all
>> producers and their watermarks.
>> As far as I can tell right now the complexity will be roughly the same
>> whichever direction is chosen.
>>
>>
>>> One particular problem is the error handling scenario: what happens if a
>>> producer fails and one downstream watermark is missing?
>>
>>
>> In my mind the watermarks between all producer tasks are not synchronized
>> as they are initially created in an independent way on different machines.
>> So the sketch I have in mind right now to handle this would
>> - keep a list of all "last seen watermark timestamps" per application per
>> original taskId per input partition (this last one plays a role if you read
>> from multiple partitions with a single instance).
>> - when a new watermark arrives,
>>    - persist it if it is later than what we have so far.
>>    - only if it is newer than ALL others we have, output a watermark into
>> the application stream.
>>    - only output a watermark if we have a valid 'last watermark' for
>> all producing tasks.
>> - take into account how many producer tasks there are: only output if
>> we have at least one watermark for each of them.
>> - essentially do a reset if the producer parallelism changes and
>> change the expectation of how many parallel tasks we need.
>>    - This is a tricky one as we will get a mix of old and new watermarks
>> during this transition.
>>
>> This would be done in all receiving Source instances separately as they
>> are generally on different machines.
>>
>> So in this failure scenario all Sources would hold back their watermarks
>> until they have received the watermarks from the recovered instance of this
>> producer.
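
The bookkeeping described above could be sketched roughly as follows. This is one reading of the bullets, not a tested design: it emits the minimum of the last seen watermarks across all expected producers (the usual way to combine watermarks), and only once every producer has reported. The class name, the string key such as "app/task-0/p0" and the method names are invented for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** One instance per receiving Source task. */
class UpstreamWatermarkTracker {
    // Last seen watermark per "application/taskId/partition" key (assumed key format).
    private final Map<String, Long> lastSeen = new HashMap<>();
    private int expectedProducers;
    private long lastEmitted = Long.MIN_VALUE;

    UpstreamWatermarkTracker(int expectedProducers) {
        this.expectedProducers = expectedProducers;
    }

    /**
     * Record an incoming watermark and return the watermark to emit into the
     * application stream, if any.
     */
    OptionalLong onWatermark(String producerKey, long timestamp) {
        // Persist only if later than what we have so far for this producer.
        lastSeen.merge(producerKey, timestamp, Math::max);

        // Wait until every expected producer has reported at least once.
        if (lastSeen.size() < expectedProducers) {
            return OptionalLong.empty();
        }

        // The combined watermark is the slowest producer's watermark.
        long min = Long.MAX_VALUE;
        for (long ts : lastSeen.values()) {
            min = Math.min(min, ts);
        }

        // Never emit a watermark that does not advance.
        if (min <= lastEmitted) {
            return OptionalLong.empty();
        }
        lastEmitted = min;
        return OptionalLong.of(min);
    }

    /** Reset expectations when the producer parallelism changes. */
    void onParallelismChange(int newExpectedProducers) {
        lastSeen.clear();
        expectedProducers = newExpectedProducers;
        // lastEmitted is kept so the output never moves backwards.
    }
}
```

In the failure scenario the failed producer's entry simply stops advancing, so the minimum (and with it the emitted watermark) stalls until the recovered instance reports again. The mix of old and new watermarks during a parallelism change is not handled here.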
>>
>>> What happens if
>>> Flink thinks a worker is dead and replaces it with a different one, but
>>> the worker is actually a zombie and might still write watermarks into
>>> the topic?
>>
>>
>> Yes, that would be a serious problem for all implementation directions.
>> I don't have an answer for that yet.
>>
>>> Flink cannot fence off the zombie. -- Pushing it into Kafka
>>> makes it much easier to handle those cases IMHO.
>>>
>>
>> Good point.
>>
>>
>>> (3) About ordering: Kafka provides strict ordering guarantees per
>>> partition. Thus, there is no problem here. Of course, if there are
>>> multiple producers writing into the same partition, you get interleaved
>>> writes (that's why you need to know how many producers you have, to be
>>> able to reason about it downstream).
>>>
>>
>> That is why my sketch keeps watermarks for each task for all producing
>> applications and essentially determines a new watermark from the total set.
>>
>> The big problem there (also for the Kafka route) would be handling
>> producers leaving.
>> In my sketch "new producers" are easy to handle; just add the new
>> expectation to wait for.
>>
>> Leaving producers are not easy to handle as I see no way to
>> distinguish between
>> - "leaving, continue without it"
>> and
>> - "down, wait for it to be back a little while later".
>>
>> Perhaps for this an explicit config is needed per source id (pattern?):
>> - Important producer: expire after 24 hours.
>> - Yet another IOT sensor: expire after 1 minute.
>> - and a default.
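
Such a config could look roughly like the sketch below. Everything in it is hypothetical: the class name, the use of regular expressions as the "pattern", and first-match-wins ordering. Note that the expiry check compares wall clock timestamps, so it is a processing time decision.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Per source id expiry rules with a fallback default. */
class ProducerExpiryConfig {
    /** One rule: source ids matching the pattern expire after the duration. */
    private static final class Rule {
        final Pattern sourceIdPattern;
        final Duration expiry;

        Rule(Pattern sourceIdPattern, Duration expiry) {
            this.sourceIdPattern = sourceIdPattern;
            this.expiry = expiry;
        }
    }

    private final List<Rule> rules = new ArrayList<>();
    private final Duration defaultExpiry;

    ProducerExpiryConfig(Duration defaultExpiry) {
        this.defaultExpiry = defaultExpiry;
    }

    /** Add a rule; rules are evaluated in insertion order. */
    ProducerExpiryConfig rule(String sourceIdRegex, Duration expiry) {
        rules.add(new Rule(Pattern.compile(sourceIdRegex), expiry));
        return this;
    }

    /** First matching rule wins, otherwise the default applies. */
    Duration expiryFor(String sourceId) {
        for (Rule r : rules) {
            if (r.sourceIdPattern.matcher(sourceId).matches()) {
                return r.expiry;
            }
        }
        return defaultExpiry;
    }

    /** True if the producer has been silent for longer than its expiry. */
    boolean isExpired(String sourceId, long lastSeenMillis, long nowMillis) {
        return nowMillis - lastSeenMillis > expiryFor(sourceId).toMillis();
    }
}
```

The two examples above would then be rules like `rule("important-.*", Duration.ofHours(24))` and `rule("iot-sensor-.*", Duration.ofMinutes(1))`, where the id patterns are made up for the example.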
>>
>>> Hope this helps.
>>>
>>
>> Yes it does.
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
