> That is a fair point, but I don't think we can guarantee that we have a timestamp embedded in the record. (Or is there some stable kafka metadata we could use here, I'm not that familiar with what kafka guarantees). We could require it to be opt-in given the caveats.

Kafka (and Kinesis) offer an ingestion timestamp, which can also be set by users (at least in the Kafka case, I don't know the details of Kinesis). This is what KinesisIO actually uses as event timestamp - the "approximate arrival timestamp".

The tricky bit is how the user specifies the watermark, unless they can
guarantee the custom timestamps are monotonically ordered (at least within a partition).

If assigned by the broker, log append time is monotonic per partition (unless brokers in a cluster have clocks seriously out of sync, in which case a leader re-election could cause time to go backwards). I went through the code in KafkaIO; it also has a fair amount of deprecated code with "use withLogAppendTime instead" comments (the associated issue even has only three digits [1] :)). I would tend to remove processing-time timestamping and watermarking altogether, but let users specify it manually, provided they know the consequences.
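For illustration, the "bounded skew" case above could be tracked roughly like this. This is a self-contained sketch, not KafkaIO's actual TimestampPolicy API; the class and method names are made up:

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical sketch: per-partition watermark tracking for
 * user-assigned timestamps with a bounded out-of-order skew.
 * Illustrative only; not the KafkaIO TimestampPolicy interface.
 */
public class BoundedSkewWatermark {
    private final Duration maxSkew;
    private Instant maxTimestampSeen = Instant.MIN;

    public BoundedSkewWatermark(Duration maxSkew) {
        this.maxSkew = maxSkew;
    }

    /** Observe a custom event timestamp read from a record. */
    public void observe(Instant eventTimestamp) {
        if (eventTimestamp.isAfter(maxTimestampSeen)) {
            maxTimestampSeen = eventTimestamp;
        }
    }

    /** The watermark lags the max seen timestamp by the assumed skew bound. */
    public Instant currentWatermark() {
        if (maxTimestampSeen.equals(Instant.MIN)) {
            return Instant.MIN; // no records observed yet
        }
        return maxTimestampSeen.minus(maxSkew);
    }
}
```

If the user cannot state such a skew bound, there is no principled watermark to derive, which is exactly the tricky bit.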

Jan

[1] https://issues.apache.org/jira/browse/BEAM-591

On 10/31/23 21:36, Robert Bradshaw via dev wrote:
On Tue, Oct 31, 2023 at 10:28 AM Jan Lukavský <je...@seznam.cz> wrote:
On 10/31/23 17:44, Robert Bradshaw via dev wrote:
There are really two cases that make sense:

(1) We read the event timestamps from the kafka records themselves and
have some external knowledge that guarantees (or at least provides a
very good heuristic) about what the timestamps of unread messages
could be in the future to set the watermark. This could possibly
involve knowing that the timestamps in a partition are monotonically
increasing, or somehow have bounded skew.
+1
(2) We use processing time as both the watermark and for setting the
event timestamp on produced messages. From this point on we can safely
reason about the event time.
This is where I have some doubts. We can reason about event time, but it
is not stable upon Pipeline restarts (if there is any downstream
processing that depends on event time and is not shielded by
@RequiresStableInput, it might give different results on restarts).
That is a fair point, but I don't think we can guarantee that we have
a timestamp embedded in the record. (Or is there some stable kafka
metadata we could use here, I'm not that familiar with what kafka
guarantees). We could require it to be opt-in given the caveats.

Is
there any specific reason not to use option 1)? Do we have to provide the
alternative 2), given that users can implement it themselves (we would
need to allow users to specify a custom timestamp function, but that
should be done in all cases)?
The tricky bit is how the user specifies the watermark, unless they
can guarantee the custom timestamps are monotonically ordered (at
least within a partition).

The current state seems a bit broken if I understand correctly.
+1
On Tue, Oct 31, 2023 at 1:16 AM Jan Lukavský <je...@seznam.cz> wrote:
I think that instead of deprecating and creating a new version, we could leverage 
the proposed update compatibility flag for this [1]. I still have some doubts 
whether processing-time watermarking (and event-time assignment) makes sense. Do 
we have a valid use-case for it? This is actually the removed 
SYNCHRONIZED_PROCESSING_TIME time domain, which is problematic - restarts of 
Pipelines cause timestamps to change and hence make *every* DoFn potentially 
non-deterministic, which would be an unexpected side-effect. This makes me wonder 
whether we should remove this policy altogether (deprecate it, or use the update 
compatibility flag, so that the policy throws an exception in the new version).

The crucial point would be to find a use-case where it is actually helpful to 
use such policy.
Any ideas?

   Jan

[1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2

On 10/27/23 18:33, Alexey Romanenko wrote:

Ahh, ok, I see.

Yes, it looks like a bug. So, I'd propose to deprecate the old "processing 
time" watermark policy, which we can remove later, and create a new, fixed one.

PS: It's recommended to use "org.apache.beam.sdk.io.aws2.kinesis.KinesisIO" 
instead of the deprecated "org.apache.beam.sdk.io.kinesis.KinesisIO" one.

—
Alexey

On 27 Oct 2023, at 17:42, Jan Lukavský <je...@seznam.cz> wrote:

No, I'm referring to this [1] policy, which has unexpected (and hardly avoidable 
on the user-code side) data loss issues. The problem is that assigning 
timestamps to elements and assigning watermarks are completely decoupled and unrelated, 
which I'd say is a bug.

   Jan

[1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--

On 10/27/23 16:51, Alexey Romanenko wrote:

Why not just create a custom watermark policy for that? Or do you mean to make 
it the default policy?

—
Alexey

On 27 Oct 2023, at 10:25, Jan Lukavský <je...@seznam.cz> wrote:


Hi,

when discussing [1] we found out that the issue is actually caused by 
processing-time watermarks in KinesisIO. Enabling this watermark outputs 
watermarks based on the current processing time, _but event timestamps are derived 
from the ingestion timestamp_. This can cause unbounded lateness when processing 
backlog. I think this setup is error-prone and will likely cause data loss due 
to dropped elements. This can be solved in two ways:

   a) deprecate processing time watermarks, or

   b) modify KinesisIO's watermark policy so that it assigns event timestamps 
as well (the processing-time watermark policy would have to derive event 
timestamps from processing time).
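For illustration, option b) amounts to making the policy take one and the same processing-time instant for both the element timestamp and the watermark, so an element can never fall behind the watermark it was emitted under. This is a hypothetical, self-contained sketch; the names are made up and this is not the actual KinesisIO WatermarkPolicy class:

```java
import java.time.Instant;

/**
 * Illustrative sketch of option b): a processing-time policy that
 * assigns element timestamps and advances the watermark from the
 * same clock reading, so the two can never drift apart.
 * Hypothetical names; not the actual KinesisIO WatermarkPolicy.
 */
public class ProcessingTimePolicy {
    private Instant lastWatermark = Instant.EPOCH;

    /** Event timestamp for a record: the processing-time "now". */
    public Instant timestampFor(Object record) {
        Instant now = Instant.now();
        // Keep the watermark monotone; it tracks the timestamps we assign.
        if (now.isAfter(lastWatermark)) {
            lastWatermark = now;
        }
        return now;
    }

    /** Watermark: the latest timestamp this policy has assigned. */
    public Instant currentWatermark() {
        return lastWatermark;
    }
}
```

The coupling of the two methods is the whole point; the current behavior, where timestamps come from ingestion time while the watermark comes from processing time, is what produces the unbounded lateness described above.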

I'd prefer option b), but it might be a breaking change. Moreover, I'm not sure 
I understand the purpose of the processing-time watermark policy; it might have been 
essentially ill-defined from the beginning, so it might really be better to 
remove it completely. There is also a related issue [2].

Any thoughts on this?

   Jan

[1] https://github.com/apache/beam/issues/25975

[2] https://github.com/apache/beam/issues/28760

