Hi Jing,

Thank you for the comments! Please see my reply inline.

On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid> wrote:

> Hi Dong,
>
> Thanks for the proposal! The FLIP is already in good shape. I got some NIT
> questions.
>
> 1. It is a little bit weird to write the hint right after the motivation
> that some features have been moved to FLIP-331, because at that time,
> readers don't know the context about what features does it mean. I would
> suggest moving the note to the beginning of "Public interfaces" sections.
>

Given that the reviewer who commented on this email thread before I
refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is simpler to
just remove any mention of FLIP-331. I have updated the FLIP accordingly.

> 2. It is also a little bit weird to describe all behaviour changes at first
> but only focus on one single feature, i.e. how to implement
> internalSorterSupported. TBH, I was lost while I was reading the Public
> interfaces. Maybe change the FLIP title? Another option could be to write a
> short summary of all features and point out that this FLIP will only focus
> on the internalSorterSupported feature. Others could be found in FLIP-331.
> WDYT?
>

Conceptually, the purpose of this FLIP is to allow a stream mode job to run
parts of the topology in batch mode so that it can apply
optimizations/computations that cannot be used together with checkpointing
(and thus are not usable in stream mode). Although the internal sorter is the
only optimization immediately supported in this FLIP, this FLIP lays the
foundation to support other optimizations in the future, such as using GPU
to process a bounded stream of records. Therefore, I find it better to keep
the current title rather than limiting the scope to the internal sorter. What
do you think?

> 3. There should be a typo at 4) Checkpoint and failover strategy -> Mixed
> mode ->
>
> - If any task fails when isBacklog=false true, this task is restarted to
> re-process its input from the beginning.
>
>

Thank you for catching this issue.
It is fixed now.

Best,
Dong

>
>
> Best regards
> Jing
>
>
> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > Thanks for your comments! Please see my reply inline.
> >
> > On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > I have a couple of questions.
> > >
> > > Could you explain why those properties
> > >
> > > @Nullable private Boolean isOutputOnEOF = null;
> > > @Nullable private Boolean isOutputOnCheckpoint = null;
> > > @Nullable private Boolean isInternalSorterSupported = null;
> > >
> > > must be `@Nullable`, instead of having the default value set to `false`?
> > >
> >
> > By initializing these private variables in OperatorAttributesBuilder as
> > null, we can implement `OperatorAttributesBuilder#build()` in such a way
> > that it can print DEBUG level logging to say "isOutputOnCheckpoint is not
> > explicitly set". This can help user/SRE debug performance issues (or lack
> > of the expected optimization) due to operators not explicitly setting the
> > right operator attribute.
> >
> > For example, we might want a job to always use the longer checkpointing
> > interval (i.e. execution.checkpointing.interval-during-backlog) if all
> > running operators have isOutputOnCheckpoint==false, and use the short
> > checkpointing interval otherwise. If a user has explicitly configured the
> > execution.checkpointing.interval-during-backlog but the two-phase commit
> > sink library has not been upgraded to set isOutputOnCheckpoint=true, then
> > the job will end up using the long checkpointing interval, and it will be
> > useful to figure out what is going wrong in this case by checking the log.
> >
> > Note that the default value of these fields of the OperatorAttributes
> > instance built by OperatorAttributesBuilder will still be false.
> > The following is mentioned in the Java doc of
> > `OperatorAttributesBuilder#build()`:
> >
> > /**
> >  * If any operator attribute is null, we will log it at DEBUG level and
> >  * use the following default values.
> >  * - isOutputOnEOF defaults to false
> >  * - isOutputOnCheckpoint defaults to false
> >  * - isInternalSorterSupported defaults to false
> >  */
> >
> >
> > >
> > > Second question, have you thought about cases where someone is
> > > either bootstrapping from a streaming source like Kafka
> > > or simply trying to catch up after a long period of downtime in a purely
> > > streaming job? Generally speaking a cases where
> > > user doesn't care about latency in the catch up phase, regardless if the
> > > source is bounded or unbounded, but wants to process
> > > the data as fast as possible, and then switch dynamically to real time
> > > processing?
> > >
> >
> > Yes, I have thought about this. We should allow this job to effectively run
> > in batch mode when the job is in the catch-up phase. FLIP-327 is actually
> > an important step toward addressing this use-case.
> >
> > In order to address the above use-case, all we need is a way for source
> > operator (e.g. Kafka) to tell Flink runtime (via IsProcessingBacklog)
> > whether it is in the catch-up phase.
> >
> > Since every Kafka message has event-timestamp, we can allow users to
> > specify a job-level config such as backlog-watermark-lag-threshold, and
> > consider a Kafka Source to have IsProcessingBacklog=true if system_time -
> > watermark > backlog-watermark-lag-threshold. This effectively allows us to
> > determine whether Kafka is in the catch up phase.
> >
> > Once we have this capability (I plan to work on this in FLIP-328), we can
> > directly use the features proposed in FLIP-325 and FLIP-327 to optimize the
> > above use-case.
> >
> > What do you think?
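
To make the lag rule above concrete, here is a minimal sketch in Java. It is only an illustration under assumptions: the class name `BacklogDetector`, its method signature, and the use of epoch milliseconds are hypothetical and are not part of Flink or of any FLIP. The threshold corresponds to the example config name backlog-watermark-lag-threshold mentioned above.

```java
// Hypothetical helper, NOT a Flink API: it only illustrates the rule
// "IsProcessingBacklog=true if system_time - watermark > threshold".
final class BacklogDetector {
    // Assumed to hold the value of the proposed job-level config
    // backlog-watermark-lag-threshold, expressed in milliseconds.
    private final long backlogWatermarkLagThresholdMs;

    BacklogDetector(long backlogWatermarkLagThresholdMs) {
        if (backlogWatermarkLagThresholdMs < 0) {
            throw new IllegalArgumentException("threshold must be non-negative");
        }
        this.backlogWatermarkLagThresholdMs = backlogWatermarkLagThresholdMs;
    }

    /**
     * Returns true when the watermark lags behind the system time by more
     * than the threshold. Both arguments are assumed to be epoch
     * milliseconds, and a watermark is assumed to have been emitted already.
     */
    boolean isProcessingBacklog(long systemTimeMs, long watermarkMs) {
        return systemTimeMs - watermarkMs > backlogWatermarkLagThresholdMs;
    }
}
```

With a threshold of 60 seconds, a source whose watermark is 100 seconds behind the system time would be treated as processing backlog, while a lag of 10 seconds would not. The comparison is strict, matching the `>` in the rule above.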
> >
> > Best,
> > Dong
> >
> >
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Sun, Jul 2, 2023 at 4:15 PM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I am opening this thread to discuss FLIP-327: Support stream-batch
> > > > unified operator to improve job throughput when processing backlog
> > > > data. The design doc can be found at
> > > >
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > > > .
> > > >
> > > > This FLIP enables a Flink job to initially operate in batch mode,
> > > > achieving high throughput while processing records that do not require
> > > > low processing latency. Subsequently, the job can seamlessly transition
> > > > to stream mode for processing real-time records with low latency.
> > > > Importantly, the same state can be utilized before and after this mode
> > > > switch, making it particularly valuable when users wish to bootstrap
> > > > the job's state using historical data.
> > > >
> > > > We would greatly appreciate any comments or feedback you may have on
> > > > this proposal.
> > > >
> > > > Cheers,
> > > > Dong
> > > >
> > >
> >
>
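
P.S. The nullable-field design discussed in the reply to Piotr can be sketched roughly as follows. This is a simplified illustration, not the actual Flink code: the class names carry a `Sketch` suffix to mark them as hypothetical, the setter names are assumptions, and `java.util.logging` at FINE level stands in for DEBUG logging so that the example has no external dependencies.

```java
// Hypothetical, simplified stand-in for the built attributes object.
final class OperatorAttributesSketch {
    final boolean outputOnEOF;
    final boolean outputOnCheckpoint;
    final boolean internalSorterSupported;

    OperatorAttributesSketch(boolean eof, boolean checkpoint, boolean sorter) {
        this.outputOnEOF = eof;
        this.outputOnCheckpoint = checkpoint;
        this.internalSorterSupported = sorter;
    }
}

// Hypothetical builder: null means "not explicitly set by the operator".
final class OperatorAttributesBuilderSketch {
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger("OperatorAttributesBuilderSketch");

    private Boolean isOutputOnEOF = null;
    private Boolean isOutputOnCheckpoint = null;
    private Boolean isInternalSorterSupported = null;

    OperatorAttributesBuilderSketch setOutputOnEOF(boolean value) {
        this.isOutputOnEOF = value;
        return this;
    }

    OperatorAttributesBuilderSketch setOutputOnCheckpoint(boolean value) {
        this.isOutputOnCheckpoint = value;
        return this;
    }

    OperatorAttributesBuilderSketch setInternalSorterSupported(boolean value) {
        this.isInternalSorterSupported = value;
        return this;
    }

    // Unset attributes are logged and then fall back to false, so the built
    // object never exposes null to its callers.
    OperatorAttributesSketch build() {
        return new OperatorAttributesSketch(
                resolve("isOutputOnEOF", isOutputOnEOF),
                resolve("isOutputOnCheckpoint", isOutputOnCheckpoint),
                resolve("isInternalSorterSupported", isInternalSorterSupported));
    }

    private static boolean resolve(String name, Boolean value) {
        if (value == null) {
            // FINE is used here as the java.util.logging analogue of DEBUG.
            LOG.log(java.util.logging.Level.FINE,
                    "{0} is not explicitly set, defaulting to false", name);
            return false;
        }
        return value;
    }
}
```

The point of keeping the builder fields nullable is only to distinguish "explicitly set to false" from "never set", so that the latter can be logged; the built attributes still default to false in both cases.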