Hi Dong,

Thanks for your reply!
Best regards,
Jing

On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for the comments. Please see my reply inline.
>
> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid> wrote:
>
> > Hi Dong,
> >
> > Thanks for the clarification. Now it is clear to me. I have some
> > additional noob questions wrt the internal sorter.
> >
> > 1. When should the setter be called to set internalSorterSupported to
> > true?
>
> Developers of the operator class (i.e. classes which implement
> `StreamOperator`) should override the `#getOperatorAttributes()` API to
> set internalSorterSupported to true if they decide to sort records
> internally in the operator.
>
> > 2.
> > *"For those operators whose throughput can be considerably improved with
> > an internal sorter, update it to take advantage of the internal sorter
> > when its input has isBacklog=true.*
> > *Typically, operators that involve aggregation operations (e.g. join,
> > cogroup, aggregate) on keyed inputs can benefit from using an internal
> > sorter."*
> >
> > *"The operator that performs the CoGroup operation will instantiate two
> > internal sorters to sort records from its two inputs separately. Then it
> > can pull the sorted records from these two sorters. This can be done
> > without wrapping input records with TaggedUnion<...>. In comparison, the
> > existing DataStream#coGroup needs to wrap input records with
> > TaggedUnion<...> before sorting them using one external sorter, which
> > introduces higher overhead."*
> >
> > According to the performance test, it seems that the internal sorter has
> > better performance than the external sorter. Is it possible to make
> > those operators that can benefit from it use the internal sorter by
> > default?
>
> Yes, it is possible. After this FLIP is done, users can use
> DataStream#coGroup with EndOfStreamWindows as the window assigner to
> co-group two streams effectively in batch mode.
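[Editor's sketch] Dong's answer above — that the developer of an operator overrides `#getOperatorAttributes()` to declare internal sorting — can be illustrated with minimal stand-in types. The names follow the FLIP-327 discussion, but this is not the real Flink interface, and `MyInternallySortingOperator` is a hypothetical operator:

```java
// Minimal stand-ins for the OperatorAttributes API proposed in FLIP-327.
// This is an illustration only, not the actual Flink API.
final class OperatorAttributes {
    private final boolean internalSorterSupported;

    OperatorAttributes(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
    }

    boolean isInternalSorterSupported() {
        return internalSorterSupported;
    }
}

interface StreamOperator {
    // Operators that do not sort internally keep the default (false).
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes(false);
    }
}

// A hypothetical operator that sorts records internally while its input has
// isBacklog=true, and advertises that by overriding getOperatorAttributes().
final class MyInternallySortingOperator implements StreamOperator {
    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes(true);
    }
}

public class OperatorAttributesDemo {
    public static void main(String[] args) {
        StreamOperator op = new MyInternallySortingOperator();
        System.out.println("internalSorterSupported = "
                + op.getOperatorAttributes().isInternalSorterSupported());
    }
}
```

The runtime can then inspect the attribute without the operator needing any other signal.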
> An operator that uses an internal sorter will be used to perform the
> co-group operation. There is no need for users of the DataStream API to
> explicitly know about or set the internal sorter in any way.
>
> In the future, we plan to incrementally optimize other aggregation
> operations (e.g. aggregate) on the DataStream API when EndOfStreamWindows
> is used as the window assigner.
>
> Best,
> Dong
>
> > Best regards,
> > Jing
> >
> > On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thank you for the comments! Please see my reply inline.
> > >
> > > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the proposal! The FLIP is already in good shape. I have
> > > > some NIT questions.
> > > >
> > > > 1. It is a little bit weird to write the hint right after the
> > > > motivation that some features have been moved to FLIP-331, because
> > > > at that point readers don't know the context about which features
> > > > are meant. I would suggest moving the note to the beginning of the
> > > > "Public interfaces" section.
> > >
> > > Given that the reviewer who commented on this email thread before I
> > > refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is
> > > simpler to just remove any mention of FLIP-331. I have updated the
> > > FLIP accordingly.
> > >
> > > > 2. It is also a little bit weird to describe all behaviour changes
> > > > at first but then only focus on one single feature, i.e. how to
> > > > implement internalSorterSupported. TBH, I was lost while I was
> > > > reading the Public interfaces. Maybe change the FLIP title? Another
> > > > option could be to write a short summary of all features and point
> > > > out that this FLIP will only focus on the internalSorterSupported
> > > > feature. Others could be found in FLIP-331. WDYT?
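[Editor's sketch] The co-group design quoted earlier in the thread — one internal sorter per input, so records never need a TaggedUnion<...> wrapper to share a single sorter — can be illustrated with plain collections. Everything here is a self-contained sketch: records are modeled as {key, value} String pairs, and the merge loop only mimics how an operator might pull sorted records from two sorters:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TwoSorterCoGroup {

    // Co-group two inputs of {key, value} pairs. Each input is sorted by its
    // own "internal sorter" (here: a plain list sort), then the operator
    // pulls sorted records from both sides and emits one group per key.
    static List<String> coGroup(List<String[]> left, List<String[]> right) {
        List<String[]> l = new ArrayList<>(left);
        List<String[]> r = new ArrayList<>(right);
        Comparator<String[]> byKey = Comparator.comparing((String[] rec) -> rec[0]);
        l.sort(byKey); // internal sorter for input 1
        r.sort(byKey); // internal sorter for input 2

        List<String> groups = new ArrayList<>();
        int i = 0, j = 0;
        while (i < l.size() || j < r.size()) {
            // Pick the smaller next key across the two sorted inputs.
            String key;
            if (j >= r.size()
                    || (i < l.size() && l.get(i)[0].compareTo(r.get(j)[0]) <= 0)) {
                key = l.get(i)[0];
            } else {
                key = r.get(j)[0];
            }
            StringBuilder group = new StringBuilder(key + ":");
            while (i < l.size() && l.get(i)[0].equals(key)) {
                group.append(" L=").append(l.get(i++)[1]);
            }
            while (j < r.size() && r.get(j)[0].equals(key)) {
                group.append(" R=").append(r.get(j++)[1]);
            }
            groups.add(group.toString());
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(coGroup(
                Arrays.asList(new String[] {"b", "1"}, new String[] {"a", "2"}),
                Arrays.asList(new String[] {"a", "x"}, new String[] {"c", "y"})));
    }
}
```

Because each side has its own sorter, no union wrapper type is needed at any point.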
> > > Conceptually, the purpose of this FLIP is to allow a stream mode job
> > > to run parts of the topology in batch mode so that it can apply
> > > optimizations/computations that cannot be used together with
> > > checkpointing (and are thus not usable in stream mode). Although the
> > > internal sorter is the only optimization immediately supported in
> > > this FLIP, the FLIP lays the foundation for supporting other
> > > optimizations in the future, such as using GPUs to process a bounded
> > > stream of records.
> > >
> > > Therefore, I find it better to keep the current title rather than
> > > limiting the scope to the internal sorter. What do you think?
> > >
> > > > 3. There should be a typo at 4) Checkpoint and failover strategy ->
> > > > Mixed mode ->
> > > >
> > > >    - If any task fails when isBacklog=false true, this task is
> > > >    restarted to re-process its input from the beginning.
> > >
> > > Thank you for catching this issue. It is fixed now.
> > >
> > > Best,
> > > Dong
> > >
> > > > Best regards
> > > > Jing
> > > >
> > > > On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thanks for your comments! Please see my reply inline.
> > > > >
> > > > > On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski
> > > > > <piotr.nowoj...@gmail.com> wrote:
> > > > >
> > > > > > Hi Dong,
> > > > > >
> > > > > > I have a couple of questions.
> > > > > >
> > > > > > Could you explain why those properties
> > > > > >
> > > > > > @Nullable private Boolean isOutputOnEOF = null;
> > > > > > @Nullable private Boolean isOutputOnCheckpoint = null;
> > > > > > @Nullable private Boolean isInternalSorterSupported = null;
> > > > > >
> > > > > > must be `@Nullable`, instead of having the default value set to
> > > > > > `false`?
> > > > > By initializing these private variables in
> > > > > OperatorAttributesBuilder as null, we can implement
> > > > > `OperatorAttributesBuilder#build()` in such a way that it prints
> > > > > DEBUG level logging to say "isOutputOnCheckpoint is not explicitly
> > > > > set". This can help users/SREs debug performance issues (or the
> > > > > lack of an expected optimization) caused by operators not
> > > > > explicitly setting the right operator attribute.
> > > > >
> > > > > For example, we might want a job to always use the longer
> > > > > checkpointing interval (i.e.
> > > > > execution.checkpointing.interval-during-backlog) if all running
> > > > > operators have isOutputOnCheckpoint==false, and use the short
> > > > > checkpointing interval otherwise. If a user has explicitly
> > > > > configured execution.checkpointing.interval-during-backlog but the
> > > > > two-phase commit sink library has not been upgraded to set
> > > > > isOutputOnCheckpoint=true, then the job will end up using the long
> > > > > checkpointing interval, and it will be useful to be able to figure
> > > > > out what is going wrong in this case by checking the log.
> > > > >
> > > > > Note that the default value of these fields of the
> > > > > OperatorAttributes instance built by OperatorAttributesBuilder
> > > > > will still be false. The following is mentioned in the Java doc of
> > > > > `OperatorAttributesBuilder#build()`:
> > > > >
> > > > > /**
> > > > >  * If any operator attribute is null, we will log it at DEBUG
> > > > >  * level and use the following default values.
> > > > >  * - isOutputOnEOF defaults to false
> > > > >  * - isOutputOnCheckpoint defaults to false
> > > > >  * - isInternalSorterSupported defaults to false
> > > > >  */
> > > > >
> > > > > > Second question, have you thought about cases where someone is
> > > > > > either bootstrapping from a streaming source like Kafka
> > > > > > or simply trying to catch up after a long period of downtime in
> > > > > > a purely streaming job? Generally speaking, cases where the user
> > > > > > doesn't care about latency in the catch-up phase, regardless of
> > > > > > whether the source is bounded or unbounded, but wants to process
> > > > > > the data as fast as possible, and then switch dynamically to
> > > > > > real-time processing?
> > > > >
> > > > > Yes, I have thought about this. We should allow such a job to
> > > > > effectively run in batch mode when it is in the catch-up phase.
> > > > > FLIP-327 is actually an important step toward addressing this
> > > > > use-case.
> > > > >
> > > > > In order to address the above use-case, all we need is a way for
> > > > > the source operator (e.g. Kafka) to tell the Flink runtime (via
> > > > > IsProcessingBacklog) whether it is in the catch-up phase.
> > > > >
> > > > > Since every Kafka message has an event timestamp, we can allow
> > > > > users to specify a job-level config such as
> > > > > backlog-watermark-lag-threshold, and consider a Kafka source to
> > > > > have IsProcessingBacklog=true if system_time - watermark >
> > > > > backlog-watermark-lag-threshold. This effectively allows us to
> > > > > determine whether Kafka is in the catch-up phase.
> > > > >
> > > > > Once we have this capability (I plan to work on this in FLIP-328),
> > > > > we can directly use the features proposed in FLIP-325 and FLIP-327
> > > > > to optimize the above use-case.
> > > > >
> > > > > What do you think?
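[Editor's sketch] The `@Nullable` rationale discussed above can be sketched as follows. This is a stand-in, not the real `OperatorAttributesBuilder`: the internals are invented to match the described semantics, and java.util.logging's FINE level substitutes for the DEBUG logging mentioned in the thread. The point is that `build()` can distinguish "explicitly set to false" from "never set", log the latter, and still default every unset attribute to false:

```java
import java.util.logging.Logger;

public class OperatorAttributesBuilder {
    private static final Logger LOG =
            Logger.getLogger(OperatorAttributesBuilder.class.getName());

    // Boxed Booleans (i.e. @Nullable) so that "unset" is representable.
    private Boolean isOutputOnEOF = null;
    private Boolean isOutputOnCheckpoint = null;
    private Boolean isInternalSorterSupported = null;

    public OperatorAttributesBuilder setOutputOnEOF(boolean v) {
        this.isOutputOnEOF = v;
        return this;
    }

    public OperatorAttributesBuilder setOutputOnCheckpoint(boolean v) {
        this.isOutputOnCheckpoint = v;
        return this;
    }

    public OperatorAttributesBuilder setInternalSorterSupported(boolean v) {
        this.isInternalSorterSupported = v;
        return this;
    }

    private static boolean resolve(Boolean value, String name) {
        if (value == null) {
            // The debugging aid discussed above: record that the attribute
            // was never explicitly set, then fall back to false.
            LOG.fine(name + " is not explicitly set; defaulting to false");
            return false;
        }
        return value;
    }

    public Attributes build() {
        return new Attributes(
                resolve(isOutputOnEOF, "isOutputOnEOF"),
                resolve(isOutputOnCheckpoint, "isOutputOnCheckpoint"),
                resolve(isInternalSorterSupported, "isInternalSorterSupported"));
    }

    /** Immutable result; every attribute is a plain boolean after build(). */
    public static final class Attributes {
        public final boolean outputOnEOF;
        public final boolean outputOnCheckpoint;
        public final boolean internalSorterSupported;

        Attributes(boolean outputOnEOF, boolean outputOnCheckpoint,
                   boolean internalSorterSupported) {
            this.outputOnEOF = outputOnEOF;
            this.outputOnCheckpoint = outputOnCheckpoint;
            this.internalSorterSupported = internalSorterSupported;
        }
    }
}
```

With primitive `boolean` fields defaulting to `false`, the "was it ever set?" information would be lost before `build()` runs.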
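[Editor's sketch] The backlog condition Dong describes for FLIP-328 amounts to a single comparison of the watermark lag against the configured threshold. The class and method names below are hypothetical; only the inequality itself comes from the email:

```java
public class BacklogStatus {

    // A source reports IsProcessingBacklog=true while its watermark lags
    // behind system time by more than backlog-watermark-lag-threshold:
    //   system_time - watermark > backlog-watermark-lag-threshold
    static boolean isProcessingBacklog(
            long systemTimeMs, long watermarkMs, long lagThresholdMs) {
        return systemTimeMs - watermarkMs > lagThresholdMs;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        // Watermark 60s behind with a 30s threshold: still catching up.
        System.out.println(isProcessingBacklog(now, now - 60_000L, 30_000L));
        // Watermark 5s behind with a 30s threshold: caught up, so the job
        // can switch to real-time (stream mode) processing.
        System.out.println(isProcessingBacklog(now, now - 5_000L, 30_000L));
    }
}
```

Once the lag drops below the threshold, the source flips IsProcessingBacklog to false and the optimizations from FLIP-325/FLIP-327 stop applying.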
> > > > > Best,
> > > > > Dong
> > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > > On Sun, Jul 2, 2023 at 4:15 PM Dong Lin <lindon...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I am opening this thread to discuss FLIP-327: Support
> > > > > > > stream-batch unified operator to improve job throughput when
> > > > > > > processing backlog data. The design doc can be found at
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > > > > > >
> > > > > > > This FLIP enables a Flink job to initially operate in batch
> > > > > > > mode, achieving high throughput while processing records that
> > > > > > > do not require low processing latency. Subsequently, the job
> > > > > > > can seamlessly transition to stream mode for processing
> > > > > > > real-time records with low latency. Importantly, the same
> > > > > > > state can be utilized before and after this mode switch,
> > > > > > > making it particularly valuable when users wish to bootstrap
> > > > > > > the job's state using historical data.
> > > > > > >
> > > > > > > We would greatly appreciate any comments or feedback you may
> > > > > > > have on this proposal.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Dong