Hi Stefan,

Thanks for all the comments! They are really helpful, and I have updated the
FLIP based on them. Please see my replies inline.

On Mon, Jul 10, 2023 at 10:23 PM Stefan Richter
<srich...@confluent.io.invalid> wrote:

> Hi,
>
> After reading through the discussion, I think the FLIP should provide
> additional details and explanations about the exact semantics of the
> end-to-end latency configuration and how it interacts with all other
> configurations around latency, such as checkpointing. In this context, I
> have a few more questions:
>

Good point. I agree the FLIP should provide the additional details you
suggested. I have updated the FLIP with a "High-level Overview" section.
Could you check whether that section answers your questions?


>
> 1. If the idea of the configuration is to enable operators to apply some
> batching as optimization within the bounds of the
> configured latency, how would we use that information to globally optimize
> the operators in a job? Today, no operator knows about the assumed latency
> of its downstream operators. So if the goal is to provide a target latency
> for the whole pipeline, how do we plan to split the “budget” among all
> operators and how can operators locally decide how much latency is ok to
> introduce?


We will not split the "budget" among all operators.
If execution.end-to-end-latency is configured to be larger than 0, each
operator can *optionally* buffer records until its flush() method is invoked
by the Flink runtime, which should happen roughly once every configured
interval. So we expect each operator to be able to buffer records for up to
the configured interval, as sketched below.

Also note that execution.end-to-end-latency is a loose contract: we do not
guarantee that the actual end-to-end latency will always stay within this
bound.


>

> Or did I misunderstand and the configuration is per operator and adds up
> for the whole pipeline? How do window operators fit
> into this model, in particular if the window duration is longer than the
> configured end-to-end latency? Would they just forward and ignore flush
> events? But arguably this would lead far away from any end-to-end time
> guarantees.
>

Good question! This case is not covered by the current FLIP, and it should
be explicitly explained.

After thinking about this, I think operators that currently buffer records
(e.g. the window operator or a two-phase commit sink) should do the following:
- When flush() is invoked, record the fact that flush() has been invoked
(together with the last ignored flushEventId) without actually emitting
records.
- When the operator is able to emit records (e.g. when a checkpoint is
triggered), it should check whether flush() was invoked while it was
buffering those records. If yes, it should additionally emit a FlushEvent
(using the last ignored flushEventId) right after it emits the buffered
records, as sketched below.
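
To make this concrete, here is a hypothetical sketch of that behavior (field
and method names, including the emitFlushEvent helper, are illustrative
rather than the exact proposed API):

private final List<StreamRecord<String>> buffer = new ArrayList<>();
private long lastIgnoredFlushEventId = -1;

public void processFlushEvent(FlushEvent event) {
    // Remember that a flush was requested, but keep holding the records.
    lastIgnoredFlushEventId = event.getFlushEventId();
}

// Called when the operator is finally able to emit, e.g. on checkpoint.
private void emitBufferedRecords() {
    for (StreamRecord<String> record : buffer) {
        output.collect(record);
    }
    buffer.clear();
    if (lastIgnoredFlushEventId >= 0) {
        // Emit the deferred FlushEvent right after the buffered records,
        // reusing the last ignored flushEventId, so downstream operators
        // still observe the flush.
        emitFlushEvent(new FlushEvent(lastIgnoredFlushEventId)); // assumed helper
        lastIgnoredFlushEventId = -1;
    }
}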

I have updated the public API and the proposed changes section so that we
can enforce the behavior described above. Can you check if it answers your
question?


> 2. How does this idea interact with checkpointing. I know this question
> was asked before and the answer was that this should be independent, but I
> don’t see how it can be independent for exactly-once where we should only
> be able to produce outputs on checkpoints. Would end-to-end latency config
> be rather useless if the checkpoint interval is greater than the end-to-end
> latency config? Or are you planning to adjust the checkpointing interval
> w.r.t. the latency config?
>

The following is mentioned as part of execution.end-to-end-latency's
description: "It's important to note that the actual latency can exceed
this configured value due to factors such as backlog, per-record processing
delays, or operators that hold records until the next checkpoint".

Briefly speaking, execution.end-to-end-latency does not provide a hard
guarantee since we are not able to guarantee it anyway. If the configs
conflict, such as when execution.end-to-end-latency <
execution.checkpointing.interval and there is a two-phase commit sink, the
semantics of the existing configuration should be enforced with higher
priority than execution.end-to-end-latency.

I updated the FLIP to include the following statement: "This config is
compatible with all existing configurations/operators in the sense that the
semantics of all existing configuration/operator will be preserved and
enforced with higher priority than execution.end-to-end-latency".

And yes, the end-to-end latency config might be useless if the checkpoint
interval is greater than the end-to-end latency config AND there is a
two-phase commit sink. But note that it might still be useful if operators
upstream of the two-phase commit sink are the performance bottleneck, since
their throughput can increase with extra buffering time.

This FLIP will not adjust the checkpointing interval based on the latency
config. Users are free to adjust the configs by themselves as appropriate.
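
For example, assuming the proposed key name, a user could tune both values
explicitly (a sketch, not a recommendation; execution.checkpointing.interval
is Flink's existing key):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// With a two-phase commit sink, outputs are only committed on checkpoints,
// so the 60 s checkpoint interval dominates the effective end-to-end
// latency; upstream operators can still use the 10 s budget to batch
// their processing.
conf.setString("execution.end-to-end-latency", "10 s");
conf.setString("execution.checkpointing.interval", "60 s");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);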


>
> 3. Why do we need record attributes AND flush events? Couldn't the flush
> events be tagged with either backlog = true or false?
>

Let's say we have a Flink job that reads data from Kafka starting from two
days ago. When the Kafka watermark lag is more than 1 minute, the user has
absolutely no requirement on processing latency (i.e. operators can buffer
records arbitrarily long). When the Kafka watermark lag is less than 1
minute, the user needs the processing latency to be less than 10 seconds.

In order to address this use-case, the user needs to specify the watermark
lag threshold (i.e. 1 minute) so that the Kafka Source will tell downstream
operators IsProcessingBacklog = true when watermarkLag > 1 minute. And the
user needs to set execution.end-to-end-latency to 10 seconds.

Suppose we only have FlushEvent. Then the Kafka Source should emit
FlushEvent(backlog=true) at the beginning and emit FlushEvent(backlog=false)
once every 10 seconds when watermarkLag < 1 minute. And we would need to
pass the value of backlog as a parameter to flush(...).

It is indeed possible to implement this FLIP using the above design (i.e.
without RecordAttributes). But I am not sure the resulting API is intuitive
for operator developers. For example, an operator does not need to emit any
records when flush(...) is invoked with backlog=true, making it a bit weird
to call this method flush(...).

IMO, IsBacklog and FlushEvent are conceptually two different things.
IsBacklog tells operators an attribute of the incoming records, whereas
FlushEvent is used to coordinate the flush operation across the topology.
Therefore it seems cleaner to separate these two classes.
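
To illustrate the separation, here is a rough sketch of the two classes (the
shapes are illustrative, not the exact proposed API):

// Describes an attribute of all records that follow it on a channel.
public class RecordAttributes {
    private final boolean isBacklog;

    public RecordAttributes(boolean isBacklog) {
        this.isBacklog = isBacklog;
    }

    public boolean isBacklog() {
        return isBacklog;
    }
}

// Coordinates a flush operation across the topology; carries an id so that
// a deferred flush can be re-emitted later (see the window-operator case
// above).
public class FlushEvent {
    private final long flushEventId;

    public FlushEvent(long flushEventId) {
        this.flushEventId = flushEventId;
    }

    public long getFlushEventId() {
        return flushEventId;
    }
}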

What do you think?


>
> 4. What happens with events that were submitted under backlog = false and
> caught in an aggregation when the regime changes to backlog = true or
> would we require a flush before we can make a change? The point of my question
> is to ask whether any events could become “trapped” when an operator moves
> from streaming to buffering mode.
>

Conceptually, isBacklog is an inherent attribute of all records coming after
a given RecordAttributes event; it will not be retroactively changed by a
RecordAttributes event coming after those records.

An operator should take care to enforce the processing latency requirement
for the records it received while isBacklog=false. Therefore, in the case
described above, the operator should flush/emit its buffered records (inside
`#processRecordAttributes`) when the backlog status of the input switches
from false to true, so that we do not trap records that should be processed
with low latency. A sketch of this is shown below.
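
Here is a hypothetical sketch of this rule, reusing the emitBufferedRecords()
helper from the earlier sketch (the method name follows the FLIP's proposed
#processRecordAttributes):

private boolean inputIsBacklog = false;

public void processRecordAttributes(RecordAttributes attributes) {
    if (!inputIsBacklog && attributes.isBacklog()) {
        // The input switches from streaming (low latency) to backlog
        // (buffering) mode. Emit everything buffered so far so that
        // records received under isBacklog=false are not trapped.
        emitBufferedRecords();
    }
    inputIsBacklog = attributes.isBacklog();
}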

I have updated the FLIP with the above explanation for
`#processRecordAttributes`. Would this answer your question?


> 5. Small nitpick on your definition of latency in the FLIP: not all events
> contribute to output that reaches a sink, for example a flatMap could
> simply filter out some events. Meaning, there is at least a big loophole in
> the current definition of end-to-end latency as the maximum of this time,
> because it would be infinite for such events.
>

Good catch! You are right. I have updated the definition of the processing
latency to fix this loophole. Could you check whether it is well-defined now?

Thanks,
Dong

> Thanks,
> Stefan
>
