Hi Stefan,

Thanks for all the comments! They are really helpful, and I have updated the FLIP based on them. Please see my replies inline.
On Mon, Jul 10, 2023 at 10:23 PM Stefan Richter <srich...@confluent.io.invalid> wrote:

> Hi,
>
> After reading through the discussion, I think the FLIP should provide
> additional details and explanations about the exact semantics of the
> end-to-end latency configuration and how it interacts with all other
> configurations around latency, such as checkpointing. In this context, I
> have a few more questions:

Good point. I agree the FLIP should provide additional details as you suggested. I have updated the FLIP with a "High-level Overview" section. Can you check whether that section answers your questions?

> 1. If the idea of the configuration is to enable operators to apply some
> batching as optimization within the bounds of the configured latency, how
> would we use that information to globally optimize the operators in a job?
> Today, no operator knows about the assumed latency of its downstream
> operators. So if the goal is to provide a target latency for the whole
> pipeline, how do we plan to split the “budget” among all operators and how
> can operators locally decide how much latency is ok to introduce?

We will not split the "budget" among all operators. Suppose execution.end-to-end-latency is configured to be larger than 0. Then each operator can *optionally* buffer records until its flush() method is invoked by the Flink runtime, which should happen roughly once every configured interval. So we expect each operator to be able to buffer records for up to the configured interval.

Also note that execution.end-to-end-latency is a loose contract, and we do not guarantee that the actual end-to-end latency will always stay within this bound.

> Or did I misunderstand and the configuration is per operator and adds up
> for the whole pipeline? How do window operators fit into this model, in
> particular if the window duration is longer than the configured end-to-end
> latency? Would they just forward and ignore flush events? But arguably
> this would lead far away from any end-to-end time guarantees.

Good question! This case is not covered by the current FLIP and should be explicitly explained. After thinking about this, I believe operators that currently buffer records (e.g. the window operator, a two-phase commit sink) should do the following (see the sketch after this list):

- When flush() is invoked, record the fact that flush() has been invoked (together with the last ignored flushEventId), without actually emitting records.
- When the operator becomes able to emit records (e.g. a checkpoint is triggered), it should check whether flush() was invoked while it was buffering those records. If yes, it should additionally emit a FlushEvent (using the last ignored flushEventId) right after it emits the buffered records.

I have updated the public API and the "Proposed Changes" section so that we can enforce the behavior described above. Can you check whether it answers your question?
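To make the intended behavior concrete, here is a minimal Java sketch of how such a buffering operator could track ignored flushes. The names (BufferingOperatorSketch, emitBufferedRecords, Collector#emitFlushEvent, the long flushEventId) are made up for illustration and are not the FLIP's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of an operator (e.g. a window operator or a two-phase
// commit sink) that must hold records until some later point in time.
public class BufferingOperatorSketch<T> {

    private final List<T> buffer = new ArrayList<>();

    // Whether flush() was invoked while records were being buffered, and
    // the id of the last FlushEvent that was ignored.
    private boolean flushRequestedWhileBuffering = false;
    private long lastIgnoredFlushEventId = -1;

    public void processElement(T record) {
        buffer.add(record);
    }

    // Invoked by the runtime roughly once every execution.end-to-end-latency.
    public void flush(long flushEventId) {
        // This operator cannot emit yet, so it only records that a flush
        // was requested instead of emitting the buffered records.
        flushRequestedWhileBuffering = true;
        lastIgnoredFlushEventId = flushEventId;
    }

    // Invoked when the operator is finally able to emit, e.g. on checkpoint.
    public void emitBufferedRecords(Collector<T> out) {
        for (T record : buffer) {
            out.collect(record);
        }
        buffer.clear();
        if (flushRequestedWhileBuffering) {
            // Propagate the flush downstream using the last ignored id, so
            // that downstream operators also flush what they have buffered.
            out.emitFlushEvent(lastIgnoredFlushEventId);
            flushRequestedWhileBuffering = false;
        }
    }

    // Minimal stand-in for the runtime's output interface; hypothetical.
    public interface Collector<T> {
        void collect(T record);
        void emitFlushEvent(long flushEventId);
    }
}
```

The key point of the sketch is that a flush is deferred rather than dropped, so downstream operators still observe every FlushEvent eventually.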
> 2. How does this idea interact with checkpointing? I know this question
> was asked before and the answer was that this should be independent, but I
> don't see how it can be independent for exactly-once, where we should only
> be able to produce outputs on checkpoints. Would the end-to-end latency
> config be rather useless if the checkpoint interval is greater than the
> end-to-end latency config? Or are you planning to adjust the checkpointing
> interval w.r.t. the latency config?

The following is mentioned as part of execution.end-to-end-latency's description: "It's important to note that the actual latency can exceed this configured value due to factors such as backlog, per-record processing delays, or operators that hold records until the next checkpoint". Briefly speaking, execution.end-to-end-latency does not provide a hard guarantee, since we are not able to guarantee it anyway.

If the configs conflict, such as when execution.end-to-end-latency < execution.checkpointing.interval and there is a two-phase commit sink, the semantics of the existing configuration are enforced with higher priority than execution.end-to-end-latency. I updated the FLIP to include the following statement: "This config is compatible with all existing configurations/operators in the sense that the semantics of all existing configurations/operators will be preserved and enforced with higher priority than execution.end-to-end-latency".

And yes, the end-to-end latency config might be useless if the checkpoint interval is greater than the end-to-end latency config AND there is a two-phase commit sink. But note that it might still be useful if the operators before the two-phase commit sink are the performance bottleneck, since their throughput can increase with extra buffer time.

This FLIP will not adjust the checkpointing interval based on the latency config. Users are free to adjust the configs by themselves as appropriate.

> 3. Why do we need record attributes AND flush events? Couldn't the flush
> events be tagged with either backlog = true or false?

Let's say we have a Flink job that reads data from Kafka starting from two days ago. When the Kafka watermark lag is more than 1 minute, the user has absolutely no requirement for processing latency (i.e. operators can buffer records arbitrarily long). When the Kafka watermark lag is less than 1 minute, the user needs the processing latency to be less than 10 seconds. To address this use case, the user specifies the watermark lag threshold (i.e. 1 minute), so that the Kafka source will tell downstream operators IsProcessingBacklog = true when watermarkLag > 1 minute, and sets execution.end-to-end-latency to 10 seconds.

Suppose we only had FlushEvent. Then the Kafka source would emit FlushEvent(backlog=true) at the beginning and emit FlushEvent(backlog=false) once every 10 seconds when watermarkLag < 1 minute, and we would need to pass the value of backlog as a parameter to flush(...). It is indeed possible to implement this FLIP with the above design (i.e. without RecordAttributes). But I am not sure the resulting API is intuitive for the operator developer. For example, an operator does not need to emit any records when flush(...) is invoked with backlog=true, which makes it a bit weird to call this method flush(...).

IMO, isBacklog and FlushEvent are conceptually two different things. isBacklog describes an attribute of the incoming records, whereas FlushEvent is used to coordinate the flush operation across the topology. Therefore it seems cleaner to separate these two classes (see the sketch below). What do you think?
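To illustrate the separation, here is a hypothetical, simplified sketch. The types and signatures below are illustrative placeholders, not the proposed public API: RecordAttributes only updates the operator's knowledge about its input, while FlushEvent is the signal that actually triggers emission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the two event types.
class RecordAttributes {
    final boolean isBacklog;          // describes the records that follow
    RecordAttributes(boolean isBacklog) { this.isBacklog = isBacklog; }
}

class FlushEvent {
    final long flushEventId;          // coordinates a flush across the topology
    FlushEvent(long flushEventId) { this.flushEventId = flushEventId; }
}

class OperatorSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private boolean isBacklog;

    void processElement(T record) {
        buffer.add(record);           // buffer records for better throughput
    }

    void processRecordAttributes(RecordAttributes attrs) {
        // Only updates the operator's knowledge of the incoming records;
        // in the simplest case, nothing needs to be emitted here.
        this.isBacklog = attrs.isBacklog;
    }

    void flush(FlushEvent event, Consumer<T> out) {
        // Coordination signal: emit everything buffered so far.
        buffer.forEach(out);
        buffer.clear();
    }
}
```

Keeping the two classes separate means an operator can react to backlog information and to flush coordination independently, instead of overloading one event with both meanings.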
> 4. What happens with events that were submitted under backlog = false and
> caught in an aggregation when the regime changes to backlog = true, or
> would we require a flush before we can make the change? The point of my
> question is to ask whether any events could become "trapped" when an
> operator moves from streaming to buffering mode.

Conceptually, isBacklog is an inherent attribute of all records coming after a given RecordAttributes event, and it is not retroactively changed by a later RecordAttributes. An operator should take care to enforce the processing-latency requirement for the records it received while isBacklog=false. Therefore, in the case described above, the operator should flush/emit its buffered records (inside `#processRecordAttributes`) when the backlog status of the input switches from false to true, so that we do not trap records that should be processed with low latency.

I have updated the FLIP with the above explanation for `#processRecordAttributes`. Would this answer your question?

> 5. Small nitpick on your definition of latency in the FLIP: not all events
> contribute to output that reaches a sink; for example, a flatMap could
> simply filter out some events. Meaning, there is at least a big loophole
> in the current definition of end-to-end latency as the maximum of this
> time, because it would be infinite for such events.

Good catch! You are right. I have updated the definition of the processing latency to fix this loophole. Can you see if it is well-defined now?

Thanks,
Dong

> Thanks,
> Stefan