> Perhaps for this an explicit config is needed per source id (pattern?):
> - Important producer: expire after 24 hours.
> - Yet another IoT sensor: expire after 1 minute.
> - and a default.
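
To make that concrete, such a config could look roughly like this (just a
sketch, all names are made up):

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Sketch: per-source-id watermark expiry; first matching pattern wins,
// with a fallback default.
public class WatermarkExpiryConfig {
    private final Map<Pattern, Duration> expiryByPattern = new LinkedHashMap<>();
    private final Duration defaultExpiry;

    public WatermarkExpiryConfig(Duration defaultExpiry) {
        this.defaultExpiry = defaultExpiry;
    }

    public WatermarkExpiryConfig expireAfter(String sourceIdPattern, Duration expiry) {
        expiryByPattern.put(Pattern.compile(sourceIdPattern), expiry);
        return this;
    }

    public Duration expiryFor(String sourceId) {
        for (Map.Entry<Pattern, Duration> entry : expiryByPattern.entrySet()) {
            if (entry.getKey().matcher(sourceId).matches()) {
                return entry.getValue();
            }
        }
        return defaultExpiry;
    }
}

// Usage:
// new WatermarkExpiryConfig(Duration.ofHours(1))
//     .expireAfter("important-producer", Duration.ofHours(24))
//     .expireAfter("iot-sensor-.*", Duration.ofMinutes(1));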

Do note that this actually does something I'm not a fan of: it introduces a
"Processing time" decision (the timeout of the watermarks) into an "Event
time" stream.
In general I consider mixing these two something to be avoided.

For the situation where a producer is "signing off forever" there is the
option of it sending out a final closing watermark indicating "MAX_LONG".
But for the "the sensor died" scenario I do not yet see a better way than
such a timeout.
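
In Flink terms, emitting that final closing watermark could look roughly
like this (a sketch, using the org.apache.flink.api.common.eventtime
classes):

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class ClosingWatermark {
    // Long.MAX_VALUE signals: no events with a later timestamp will
    // ever follow from this producer.
    public static void emit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(Long.MAX_VALUE));
    }
}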

Niels



On Wed, Dec 22, 2021 at 11:22 AM Niels Basjes <ni...@basjes.nl> wrote:

>
> On Tue, Dec 21, 2021 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Your high-level layout makes sense. However, I think there are a few
>> problems doing it with Flink:
>>
>> (1) How to encode the watermark? It could break downstream consumers
>> that don't know what to do with them (e.g., crash on deserialization).
>> There is no guarantee that only a downstream Flink job consumes the data
>> (nor that the downstream Flink job was upgraded to understand those input
>> watermarks).
>>
>
> What I'm doing in my experiments right now is this: I have created a
> WatermarkSerde interface that is able to serialize a Watermark into my own
> format.
> This WatermarkSerde would then be implemented by the application builder
> and injected into both the Sink and Source implementations of whatever
> transport I like to use (Kafka, ...).
> The WatermarkSerde would also need something to determine whether a
> provided element is a watermark or a normal element.
>
> This allows me to use any format I like in my application and to make the
> watermark a valid yet slightly strange record in my stream.
> So the receiving end can simply deserialize the record and then
> determine whether it is a watermark or not.
>
> As a consequence, all downstream implementations can simply be instructed
> on how to tell whether an element is a real event or a Watermark.
>
> A default/reference implementation that can be used where a textual format
> is expected (like JSON or XML) is also an option.
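>
> As a sketch, the interface I'm currently experimenting with looks roughly
> like this (the names are mine, nothing here is final):
>
> public interface WatermarkSerde<T> {
>
>     // Serialize a watermark timestamp into a record that is valid in
>     // the application's own stream format.
>     T serialize(long watermarkTimestamp);
>
>     // Determine whether a deserialized element is one of these special
>     // watermark records or a normal event.
>     boolean isWatermark(T element);
>
>     // Extract the timestamp from an element for which isWatermark(...)
>     // returned true.
>     long watermarkTimestamp(T element);
> }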
>
> Do note that at the application level I consider the presence of these
> special elements to be a part of the definition of the streaming interface
> of this (set of) applications.
> Regardless of the "where and how" of the implementation: the streaming
> interface now "contains watermarks" in a defined way.
>
>> Using a control message, the downstream KafkaConsumer would
>> "filter" control messages / watermarks automatically, and users would
>> opt in explicitly, thus providing a safe and backward-compatible upgrade
>> path.
>>
>
> Yes, that would work.
>
> (2) About all the metadata: it will be hard for Flink to track all those
>> things, but it would be simpler to push it into the storage layer IMHO.
>> For example, if Flink does dynamic scaling, it adds a new producer
>> to the group and Kafka takes care of the rest. Thus, Flink only needs to
>> provide the actual watermark timestamp.
>>
>
> Yes, but there is still a need for some bookkeeping that tracks all
> producers and their watermarks.
> As far as I can tell right now the complexity will be roughly the same in
> all directions.
>
>
>> One particular problem is the error handling scenario: what happens if a
>> producer fails and one downstream watermark is missing?
>
>
> In my mind the watermarks between all producer tasks are not synchronized,
> as they are initially created independently on different machines.
> So the sketch I have in mind right now to handle this (a rough code version
> follows below) would be:
> - Keep a list of all "last seen watermark timestamps" per application, per
> original taskId, per input partition (this last one plays a role if you read
> from multiple partitions with a single instance).
> - When a new watermark arrives:
>    - Persist it if it is later than what we have so far.
>    - Only output a watermark into the application stream if it is newer
> than ALL the others we have.
>    - Only output a watermark at all if we have a valid 'last watermark' for
> all producing tasks.
> - Take into account how many producer tasks there are: only output if we
> have at least one watermark for each of them.
> - Essentially do a reset if the producer parallelism changes, and adjust
> the expectation of how many parallel producers we need.
>    - This is a tricky one, as we will get a mix of old and new watermarks
> during this transition.
>
> This would be done in each receiving Source instance separately, as they
> are generally on different machines.
>
> So in this failure scenario all Sources would hold back their watermarks
> until they have received new watermarks from the recovered instance of this
> producer.
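>
> A rough code version of this bookkeeping (all names invented by me, and
> ignoring the parallelism-change transition) could look like this:
>
> import java.util.HashMap;
> import java.util.Map;
> import java.util.OptionalLong;
>
> // Sketch: tracks the last watermark per producing task and emits the
> // combined (minimum) watermark once every expected task has reported.
> public class WatermarkMerger {
>     // Key: application + taskId + input partition, flattened to a String.
>     private final Map<String, Long> lastSeen = new HashMap<>();
>     private int expectedProducerTasks;
>     private long lastEmitted = Long.MIN_VALUE;
>
>     public WatermarkMerger(int expectedProducerTasks) {
>         this.expectedProducerTasks = expectedProducerTasks;
>     }
>
>     // Called for every watermark record extracted from the stream.
>     // Returns the combined watermark only when it advances.
>     public OptionalLong onWatermark(String producerKey, long timestamp) {
>         // Persist it only if it is later than what we saw so far.
>         lastSeen.merge(producerKey, timestamp, Math::max);
>
>         // Only output once we have a valid 'last watermark' for ALL tasks.
>         if (lastSeen.isEmpty() || lastSeen.size() < expectedProducerTasks) {
>             return OptionalLong.empty();
>         }
>         long combined = lastSeen.values().stream()
>                 .mapToLong(Long::longValue).min().getAsLong();
>         if (combined > lastEmitted) {
>             lastEmitted = combined;
>             return OptionalLong.of(combined);
>         }
>         return OptionalLong.empty();
>     }
>
>     // Essentially a reset when the producer parallelism changes.
>     public void onParallelismChange(int newExpectedProducerTasks) {
>         this.expectedProducerTasks = newExpectedProducerTasks;
>         this.lastSeen.clear();
>     }
> }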
>
>> What happens if
>> Flink thinks a worker is dead and replaces it with a different one, but
>> the worker is actually a zombie and might still write watermarks into
>> the topic?
>
>
> Yes, that would be a serious problem for all implementation directions.
> I don't have an answer for that yet.
>
>> Flink cannot fence off the zombie. -- Pushing it into Kafka
>> allows handling those cases much more easily IMHO.
>>
>
> Good point.
>
>
>> (3) About ordering: Kafka provides strict ordering guarantees per
>> partition. Thus, there is no problem here. Of course, if there are
>> multiple producers writing into the same partition, you get interleaved
>> writes (that's why you need to know how many producers you have, to be
>> able to reason about it downstream).
>>
>
> That is why my sketch keeps the watermarks of each task of all producing
> applications and essentially derives a new watermark from the total set.
>
> The big problem there (also for the Kafka route) would be handling
> producers leaving.
> In my sketch "new producers" are easy to handle: just add the new
> expectation to wait for.
>
> Leaving producers are not easy to handle, as I see no way to distinguish
> between
> - "leaving, continue without it"
> and
> - "down, wait for it to be back a little while later".
>
> Perhaps for this an explicit config is needed per source id (pattern?):
> - Important producer: expire after 24 hours.
> - Yet another IoT sensor: expire after 1 minute.
> - and a default.
>
>> Hope this helps.
>>
>
> Yes, it does.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
