I can describe an approach that has worked well for me. We have a Flink
workflow that calculates reports over many days. It is currently set up to
recompute the last 10 days or so by recovering this "deep history" from our
databases, and it then switches over to the live flow to process all
subsequent update events. I wrote this before the days of HybridSource, so it
is literally a JDBC data source that queries state for the last 10 days, and
that stream is merged with a "live" stream from a DB poller or Kafka stream.

In answer to your question, during recovery I have all state for the old
business days sent with a timestamp of that business date, e.g. new
DateTime(2023, 1, 15, 0, 0, 0, UTC).getMillis() for any data associated with
15th Jan 2023. Once the data source has emitted all the state for that
date, it emits a watermark with exactly the same timestamp, which tells
downstream operators that all the data has been sent for that date. It then
moves on to the next date and emits that state.
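
A minimal sketch of that emission order, written in plain Java rather than
against the Flink source API. The class and method names are hypothetical,
java.time stands in for the Joda-style DateTime call above, and the output is
just a list of strings so the ordering is easy to see.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RecoveryEmissionSketch {

    /** Epoch millis of midnight UTC on the given business date. */
    public static long businessDateMillis(LocalDate date) {
        return date.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    /**
     * Walks the dates in ascending order. For each date it emits every record
     * stamped with that date's timestamp, then one watermark carrying the
     * same timestamp to signal that the date is complete.
     */
    public static List<String> emissionOrder(TreeMap<LocalDate, List<String>> stateByDate) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<LocalDate, List<String>> entry : stateByDate.entrySet()) {
            long ts = businessDateMillis(entry.getKey());
            for (String record : entry.getValue()) {
                out.add("record " + record + " @ " + ts);
            }
            out.add("watermark @ " + ts);
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<LocalDate, List<String>> state = new TreeMap<>();
        state.put(LocalDate.of(2023, 1, 15), List.of("a", "b"));
        state.put(LocalDate.of(2023, 1, 16), List.of("c"));
        emissionOrder(state).forEach(System.out::println);
    }
}
```

In a real job the two out.add calls would presumably become the source's
"collect with timestamp" and "emit watermark" calls.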

When my system starts up it records the current datetime and treats all data
retrieved before that timestamp as recovered state, and all data received
from the live pollers/Kafka as falling after that cut-off point. The live
sources emit objects timestamped with the current time and periodically emit a
watermark to make forward progress. I'm simplifying here but you get the point.

This pattern is useful for me because my keyed process functions are able to
register timers to process all the data for a historic date at once. A timer
won't need to fire on each message received or try to compute with missing
data; instead it runs once all the data has been received for a date from all
the sources. (The timer is only triggered when the watermark is reached, and
that requires all sources to have reached at least that point in the
recovery.) Once we have reached the startup datetime watermark, the system
seamlessly flips into live processing mode. The watermarks still trigger my
timers, but now we are processing the last ~1 minute of batched data.
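
The parenthetical above relies on the operator's event-time clock being the
minimum of the watermarks received on its inputs. The following is a small
plain-Java model of that rule, not Flink code; the class and its methods are
hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class MinWatermarkTimerSketch {
    private final long[] inputWatermarks;                 // latest watermark per input
    private final TreeSet<Long> timers = new TreeSet<>(); // pending event-time timers
    private final List<Long> fired = new ArrayList<>();   // timers fired so far, in order

    public MinWatermarkTimerSketch(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    public void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Records a watermark from one input and fires every timer now covered. */
    public void onWatermark(int input, long watermark) {
        // Watermarks never move backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        // The operator's clock is held back by its slowest input.
        long combined = Arrays.stream(inputWatermarks).min().getAsLong();
        while (!timers.isEmpty() && timers.first() <= combined) {
            fired.add(timers.pollFirst());
        }
    }

    public List<Long> firedTimers() {
        return fired;
    }
}
```

With two inputs and a timer registered for a business date, the timer stays
pending after the first input's watermark reaches that date and fires only
once the second input's watermark reaches it too.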

So logically, a timestamp or watermark in my system always represents a
forward-moving moment in time - it just means a historic date for data during
recovery from the databases and a current timestamp when the system is
processing live data.

Hope that gives you some ideas and helps.

James.
________________________________
From: Gen Luo <luogen...@gmail.com>
Sent: 02 February 2023 09:52
To: Jan Lukavský <je...@seznam.cz>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Non-temporal watermarks

Hi,

This is an interesting topic. I suppose the watermark is defined in terms of
event time because it is mainly used, or designed, for event time processing.
Flink provides the event time processing mechanism because it's widely needed.
Every event has its event time and we usually need to group or order by the
event time. On the other hand, this also means that we can process events from
different sources together, as the event time is naturally of the same scale.

However, just as you say, technically speaking the event timestamp can be
replaced with any other meaningful number (or even a comparable), and the
(event time) watermark should change accordingly. If we guarantee that this
field and its watermark are of the same scale across all sources, we can
process the data/events from those sources together, just as with event time.
As event time processing and the event time timer service don't rely on the
actual time point or duration, I suppose this can be implemented by defining
it as the event time, if it contains only positive numbers.
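
As a rough illustration of treating a non-temporal number as event time, here
is a plain-Java sketch of a bounded-out-of-orderness watermark over a "version"
field. It is modelled on the usual rule (watermark = highest value seen minus
the allowed disorder minus one), but the class, its methods and the version
field are assumptions made for this example and not part of any Flink API.

```java
public class VersionWatermarkSketch {
    private final long maxOutOfOrderness; // allowed disorder, in version units
    private long maxVersionSeen;

    public VersionWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so the initial watermark is Long.MIN_VALUE without underflow.
        this.maxVersionSeen = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    /** Tracks the highest version observed so far. */
    public void onEvent(long version) {
        maxVersionSeen = Math.max(maxVersionSeen, version);
    }

    /** No event with a version at or below this value is expected any more. */
    public long currentWatermark() {
        return maxVersionSeen - maxOutOfOrderness - 1;
    }

    /** An event is late if the watermark has already passed its version. */
    public boolean isLate(long version) {
        return version <= currentWatermark();
    }
}
```

Nothing in the arithmetic depends on the values being milliseconds; only the
ordering and the subtraction matter, which is the point made above.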


On Thu, Feb 2, 2023 at 5:18 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

I will not speak about details related to Flink specifically. The
concept of watermarks is more abstract, so I'll leave implementation
details aside.

Speaking generally, yes, there is a set of requirements that must be met
in order to be able to build a system that uses watermarks.

The primary question is: what are watermarks used for? The answer is that we
need watermarks to be able to define a partially stable order of
_events_. An event is an immutable piece of data that can be _observed_
(i.e. processed) with various consumer-dependent delays (two consumers
of the event can see the event at different processing times), or a
specific (local) timestamp. Generally, an event tells us that something,
somewhere, happened at a given local timestamp.

Watermarks create markers in processing time of each observer, so that
the observer is able to tell if two events (e.g. event "close
time-window T1" and "new data with timestamp T2 arrived") can be ordered
(that is being able to tell which one is - globally! - preceding the other).

Having said that - there is a general algebra for "timestamps" - and
therefore watermarks. A timestamp can be any object that defines the
following operations:

  - a less-than relation <, i.e. t1 < t2: bool, this relation needs to
be antisymmetric, so t1 < t2 implies not t2 < t1

  - a function min_timestamp_following(timestamp t1, duration):
timestamp t2, that returns the minimal timestamp, for which t1 +
duration < t2 (this function is actually a definition of duration)

These two conditions allow one to construct a working stream processing
system, which means there should be no problem using different
"timestamps", provided we know how to construct the above.

Using "a different number" for timestamps and watermarks seems valid in
this sense, provided you are fine with the implicit definition of
duration, which is currently defined as simply t2 - t1.
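
The two operations listed above can be written down as a small Java
interface. This is only a sketch: the interface, its method names and the
VERSIONS instance are hypothetical, and the instance assumes integer
"timestamps" whose duration is the simple difference t2 - t1.

```java
public class TimestampAlgebraSketch {

    /** The two operations required of a timestamp type T with duration type D. */
    public interface TimestampAlgebra<T, D> {
        /** Strict order: lessThan(t1, t2) must imply !lessThan(t2, t1). */
        boolean lessThan(T t1, T t2);

        /** Smallest t2 for which t1 + duration < t2 holds. */
        T minTimestampFollowing(T t1, D duration);
    }

    /** Integer versions; a duration is simply a count of versions. */
    public static final TimestampAlgebra<Long, Long> VERSIONS =
            new TimestampAlgebra<Long, Long>() {
                @Override
                public boolean lessThan(Long t1, Long t2) {
                    return t1 < t2;
                }

                @Override
                public Long minTimestampFollowing(Long t1, Long duration) {
                    // On integers the smallest value above t1 + duration is one
                    // more; addExact throws instead of silently overflowing.
                    return Math.addExact(Math.addExact(t1, duration), 1L);
                }
            };

    /** Checks the required property of the order on one pair of timestamps. */
    public static <T, D> boolean orderHoldsOn(TimestampAlgebra<T, D> algebra, T a, T b) {
        return !(algebra.lessThan(a, b) && algebra.lessThan(b, a));
    }
}
```

For example, VERSIONS.minTimestampFollowing(10L, 5L) yields 16, the first
version strictly after 10 + 5.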

I tried to explain why it is not good to expect that two events can be
globally ordered, and what the actual role of watermarks is in this, in a
Twitter thread [1], if anyone is interested.

Best,

  Jan

[1]
https://twitter.com/janl_apache/status/1478757956263071745?s=20&t=cMXfPHS8EjPrbF8jys43BQ

On 2/2/23 00:18, Yaroslav Tkachenko wrote:
> Hey everyone,
>
> I'm wondering if anyone has done any experiments trying to use
> non-temporal watermarks? For example, a dataset may contain some kind
> of virtual timestamp / version field that behaves just like a regular
> timestamp (monotonically increasing, etc.), but has a different scale
> / range.
>
> As far as I can see Flink assumes that the values used for event times
> and watermark generation are actually timestamps and the Table API
> requires you to define watermarks on TIMESTAMP columns.
>
> Practically speaking timestamp is just a number, so if I have a
> "timeline" that consists of 1000 monotonically increasing integers,
> for example, the concepts like late-arriving
> data, bounded-out-of-orderness, etc. still work.
>
> Thanks for sharing any thoughts you might have on this topic!
