Hi all, I would like to share some updates on FLIP-327. Dong and I have had a series of discussions and have made several refinements to the FLIP.
The major change to the FLIP is to allow the input of one-input operators to be automatically sorted during backlog processing. When combined with the state backend optimization introduced in FLIP-325 [1], all the keyed single-input operators can achieve performance similar to batch mode during backlog processing without any code change to the operator. We also implemented a POC[2] and conducted a benchmark[3] using the KeyedStream#reduce operation. The benchmark results demonstrate the performance gains that this FLIP can offer. I am looking forward to any comments or feedback you may have on this FLIP. Best, Xuannan [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access [2] https://github.com/Sxnan/flink/tree/FLIP-327-demo [3] https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java > On Aug 18, 2023, at 21:28, Dong Lin <lindon...@gmail.com> wrote: > > Hi Piotr, > > Thanks for the explanation. > > To recap our offline discussion, there is a concern regarding the > capability to dynamically switch between stream and batch modes. This > concern is around unforeseen behaviors such as bugs or performance > regressions, which we might not be aware of yet. The reason for this > concern is that this feature has a fundamental impact on the Flink > runtime's behavior. > > Due to the above concern, I agree it is reasonable to annotate related APIs > as experimental. This step would provide us with the flexibility to modify > these APIs if issues arise in the future. This annotation also serves as a > note to users that this functionality might not perform as well as expected. 
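[Editor's note] To make the summary above concrete: sorting the keyed input during backlog processing lets a keyed aggregation such as KeyedStream#reduce touch its (slow) state backend once per key instead of once per record. The following is a toy Python sketch of that idea; it is an illustration only, not Flink code, and the function names are invented here.

```python
# Toy illustration (not Flink code) of why sorting keyed input during
# backlog processing helps a keyed reduce: once records arrive grouped
# by key, the operator can keep a single running accumulator in a local
# variable and write to the (slow) state backend only once per key,
# instead of performing a state read + write per record.

def keyed_reduce_unsorted(records, reduce_fn, state):
    """Stream-mode style: one state read and one state write per record."""
    for key, value in records:
        acc = state.get(key)
        state[key] = value if acc is None else reduce_fn(acc, value)
    return state

def keyed_reduce_sorted(records, reduce_fn, state):
    """Backlog/batch style: sort by key first, then one state write per key.
    Assumes an initially empty state, as during a bounded backlog phase."""
    current_key, acc = None, None
    for key, value in sorted(records, key=lambda r: r[0]):
        if key != current_key:
            if current_key is not None:
                state[current_key] = acc  # flush the finished key
            current_key, acc = key, value
        else:
            acc = reduce_fn(acc, value)
    if current_key is not None:
        state[current_key] = acc
    return state
```

Both variants produce the same result for an associative reduce function; the sorted variant trades a one-off sort for far fewer state-backend accesses, which is the effect the benchmark above measures.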
> > Though I believe that we can ensure the reliability of this feature through > good design and code reviews, comprehensive unit tests, and thorough > integration testing, I agree that it is reasonable to be extra cautious in > this case. Also, it should be OK to delay making these APIs as > non-experimental by 1-2 releases. > > I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in these docs > as experimental. Please let me know if you think any other API should also > be marked as experimental. > > Thanks! > Dong > > On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski <piotr.nowoj...@gmail.com> > wrote: > >> Hi Dong, >> >> Operators API is unfortunately also our public facing API and I mean the >> APIs that we will add there should also be marked `@Experimental` IMO. >> >> The config options should also be marked as experimental (both >> annotated @Experimental and noted the same thing in the docs, >> if @Experimental annotation is not automatically mentioned in the docs). >> >>> Alternatively, how about we add a doc for >> checkpointing.interval-during-backlog explaining its impact/concern as >> discussed above? >> >> We should do this independently from marking the APIs/config options as >> `@Experimental` >> >> Best, >> Piotrek >> >> pt., 11 sie 2023 o 14:55 Dong Lin <lindon...@gmail.com> napisał(a): >> >>> Hi Piotr, >>> >>> Thanks for the reply! >>> >>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com >>> >>> wrote: >>> >>>> Hi, >>>> >>>> Sorry for the long delay in responding! >>>> >>>>> Given that it is an optional feature that can be >>>>> turned off by users, it might be OK to just let users try it out and >> we >>>> can >>>>> fix performance issues once we detect any of them. What do you think? >>>> >>>> I think it's fine. It would be best to mark this feature as >> experimental, >>>> and >>>> we say that the config keys or the default values might change in the >>>> future. 
>>>> >>> >>> In general I agree we can mark APIs that determine "whether to enable >>> dynamic switching between stream/batch mode" as experimental. >>> >>> However, I am not sure we have such an API yet. The APIs added in this >> FLIP >>> are intended to be used by operator developers rather than end users. End >>> users can enable this capability by setting >>> execution.checkpointing.interval-during-backlog = Long.MAX and use a >>> source which might implicitly set the backlog status (e.g. HybridSource). So >>> execution.checkpointing.interval-during-backlog is the only user-facing >>> API that can always control whether this feature can be used. >>> >>> However, execution.checkpointing.interval-during-backlog itself is not >> tied >>> to FLIP-327. >>> >>> Do you mean we should set checkpointing.interval-during-backlog as >>> experimental? Alternatively, how about we add a doc for >>> checkpointing.interval-during-backlog explaining its impact/concern as >>> discussed above? >>> >>> Best, >>> Dong >>> >>> >>>>> Maybe we can revisit the need for such a config when we >>> introduce/discuss >>>>> the capability to switch backlog from false to true in the future. >> What >>>> do >>>>> you think? >>>> >>>> Sure, we can do that. >>>> >>>> Best, >>>> Piotrek >>>> >>>> niedz., 23 lip 2023 o 14:32 Dong Lin <lindon...@gmail.com> napisał(a): >>>> >>>>> Hi Piotr, >>>>> >>>>> Thanks a lot for the explanation. Please see my reply inline. >>>>> >>>>> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski < >>>> piotr.nowoj...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Dong, >>>>>> >>>>>> Thanks a lot for the answers. I can now only briefly answer your >> last >>>>>> email. >>>>>> >>>>>>> It is possible that spilling to disks might cause larger >> overhead. >>>> IMO >>>>> it >>>>>>> is an orthogonal issue already existing in Flink. This is >> because a >>>>> Flink >>>>>>> job running in batch mode might also have lower throughput than in >>>>> stream >>>>>>> mode for the same reason. 
>>>>>> >>>>>> Yes, I know, but the thing that worries me is that previously only >> a >>>> user >>>>>> alone >>>>>> could decide whether to use batch mode or streaming, and in >> practice >>>> one >>>>>> user would rarely (if ever) use both for the same >> problem/job/query. >>> If >>>>> his >>>>>> intention was to eventually process live data, he was using >> streaming >>>>> even >>>>>> if there was a large backlog at the start (apart from some very few >>>>>> power >>>>>> users). >>>>>> >>>>>> With this change, we want to introduce a mode that would be >> switching >>>>> back >>>>>> and forth between streaming and "batch in streaming" automatically. >>> So >>>> a >>>>>> potential performance regression would be much more visible and >>> painful >>>>>> at the same time. If a batch query runs slower than it could, it's >> kind >>>> of >>>>>> fine as >>>>>> it will end at some point. If a streaming query, during large back >>>> pressure >>>>>> from maybe a >>>>>> temporary load spike, switches to batch processing, that's a bigger >>>> deal. >>>>>> Especially if the batch processing mode will not actually be able to >> even >>>>>> handle >>>>>> the normal load after the load spike. In that case, the job could >>>> never >>>>>> recover >>>>>> from the backpressure/backlog mode. >>>>>> >>>>> >>>>> I understand you are concerned with the risk of performance >> regression >>>>> introduced due to switching to batch mode. >>>>> >>>>> After thinking about this more, I think this existing proposal meets >>> the >>>>> minimum requirement of "not introducing regression for existing >> jobs". >>>> The >>>>> reason is that even if batch mode can be slower than stream mode for >>> some >>>>> operators in some cases, this is an optional feature that will only >> be >>>>> enabled if a user explicitly overrides the newly introduced config to >>>>> non-default values. Existing jobs that simply upgrade their Flink >>> library >>>>> version will not suffer any performance regression. 
>>>>> >>>>> More specifically, in order to switch to batch mode, users will need >> to >>>>> explicitly set execution.checkpointing.interval-during-backlog to 0. >>> And >>>>> users can always explicitly update >>>>> execution.checkpointing.interval-during-backlog to turn off the batch >>>> mode >>>>> if that incurs any performance issue. >>>>> >>>>> As far as I can tell, for all practical workloads we see in >> production >>>>> jobs, batch mode is always faster (w.r.t. throughput) than stream >> mode >>>> when >>>>> there is a high backlog of incoming records. Though it is still >>>>> theoretically possible, it should be very rare (if any) for batch >> mode >>> to >>>>> be slower in practice. Given that it is an optional feature that can >> be >>>>> turned off by users, it might be OK to just let users try it out and >> we >>>> can >>>>> fix performance issues once we detect any of them. What do you think? >>>>> >>>>> >>>>>> >>>>>>> execution.backlog.use-full-batch-mode-on-start (default false) >>>>>> >>>>>> ops sorry, it was supposed to be sth like: >>>>>> >>>>>> execution.backlog.use-batch-mode-only-on-start (default false) >>>>>> >>>>>> That option would disallow switching from streaming to batch. Batch >>>> mode >>>>>> would be allowed only to get rid of the initial, present on >> start-up >>>>>> backlog. >>>>>> >>>>>> Would allow us to safely experiment with switching from streaming >> to >>>>> batch >>>>>> and I would be actually more fine in enabling "using batch mode on >>>> start" >>>>>> by default, until we gain confidence and feedback that switching >>> back & >>>>>> forth >>>>>> is working as expected. >>>>>> >>>>> >>>>> Now I understand what you are suggesting. I agree that it is >> necessary >>>> for >>>>> users to be able to disallow switching from streaming to batch. >>>>> >>>>> I am not sure it is necessary to introduce an extra config just for >>> this >>>>> purpose. 
The reason is that we don't have any strategy that switches >>>>> backlog status from false to true yet. And when we have such a strategy >>>> (e.g. >>>>> FLIP-328) in the future, it is very likely that we will introduce >> extra >>>>> config(s) for users to explicitly turn on such a feature. That means >>> users >>>>> should be able to turn off this feature even if we don't have >> something >>>>> like execution.backlog.use-batch-mode-only-on-start. >>>>> >>>>> Maybe we can revisit the need for such a config when we >>> introduce/discuss >>>>> the capability to switch backlog from false to true in the future. >> What >>>> do >>>>> you think? >>>>> >>>>> >>>>>> >>>>>>>> Or we could limit the scope of this FLIP to only support >> starting >>>> with >>>>>>>> batch mode and switching only once to >>>>>>>> streaming, and design a follow up with switching back and forth? >>>>>>> >>>>>>> Sure, that sounds good to me. I am happy to split this FLIP into >>> two >>>>>> FLIPs >>>>>>> so that we can make incremental progress. >>>>>> >>>>>> Great, let's do that. In a follow up FLIP we could restart the >>>> discussion >>>>>> about >>>>>> switching back and forth. >>>>>> >>>>> >>>>> Cool, I added the following statement to the motivation section. >>>>> >>>>> "NOTE: this FLIP focuses only on the capability to switch from batch >> to >>>>> stream mode. If there is any extra API needed to support switching >> from >>>>> stream to batch mode, we will discuss them in a follow-up FLIP." >>>>> >>>>> I am looking forward to reading your follow-up thoughts! >>>>> >>>>> Best, >>>>> Dong >>>>> >>>>> >>>>>> Piotrek >>>>>> >>>>>> czw., 20 lip 2023 o 16:57 Dong Lin <lindon...@gmail.com> >> napisał(a): >>>>>> >>>>>>> Hi Piotr, >>>>>>> >>>>>>> Thank you for the very detailed comments! Please see my reply >>> inline. 
>>>>>>> >>>>>>> On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski < >>>>>> piotr.nowoj...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Dong, >>>>>>>> >>>>>>>> I have a couple of follow up questions about switching back and >>>> forth >>>>>>>> between streaming and batching mode. >>>>>>>> Especially around shuffle/watermark strategy, and keyed state >>>>> backend. >>>>>>>> >>>>>>>> First of all, it might not always be beneficial to switch into >>> the >>>>>> batch >>>>>>>> modes: >>>>>>>> - Shuffle strategy >>>>>>>> - Is sorting going to be purely in-memory? If not, >> obviously >>>>>> spilling >>>>>>>> to disks might cause larger overheads >>>>>>>> compared to not sorting the records. >>>>>>>> >>>>>>> >>>>>>> Sorting might require spilling data to disk depending on the >> input >>>>> size. >>>>>>> The behavior of sorting w.r.t. memory/disk is expected to be >>> exactly >>>>> the >>>>>>> same as the behavior of input sorting automatically performed by >>>> Flink >>>>>>> runtime in batch mode for keyed inputs. >>>>>>> >>>>>>> More specifically, ExternalSorter >>>>>>> < >>>>>>> >>>>>> >>>>> >>>> >>> >> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java >>>>>>>> >>>>>>> is >>>>>>> currently used to sort keyed inputs in batch mode. It is >>>> automatically >>>>>> used >>>>>>> by Flink runtime in OneInputStreamTask (here >>>>>>> < >>>>>>> >>>>>> >>>>> >>>> >>> >> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114 >>>>>>>> ) >>>>>>> and in MultiInputSortingDataInput (here >>>>>>> < >>>>>>> >>>>>> >>>>> >>>> >>> >> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188 >>>>>>>> ). >>>>>>> We plan to re-use the same code/mechanism to do sorting. 
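[Editor's note] For readers unfamiliar with the `ExternalSorter` referenced above: it implements the classic external merge-sort pattern — accumulate bounded in-memory runs, spill each sorted run, then k-way merge the runs. Below is a minimal Python sketch of that pattern; it is an illustration only, not Flink's actual implementation, and the budget parameter is invented here.

```python
import heapq
import pickle
import tempfile

def external_sort(records, key, max_in_memory=1000):
    """Minimal external merge sort: accumulate bounded in-memory runs,
    spill each sorted run to a temp file, then k-way merge all runs."""
    runs, buf = [], []

    def spill(chunk):
        # Sort the chunk in memory and write it out as one sorted run.
        f = tempfile.TemporaryFile()
        for rec in sorted(chunk, key=key):
            pickle.dump(rec, f)
        f.seek(0)
        runs.append(f)

    for rec in records:
        buf.append(rec)
        if len(buf) >= max_in_memory:  # memory budget exceeded: spill
            spill(buf)
            buf = []
    if buf:
        spill(buf)

    def read_run(f):
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

    # heapq.merge performs the lazy k-way merge over the sorted runs.
    yield from heapq.merge(*(read_run(f) for f in runs), key=key)
```

The real `ExternalSorter` adds managed-memory accounting, serialization, and asynchronous I/O, but the run-then-merge structure is the same.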
>>>>>>> >>>>>>> It is possible that spilling to disks might cause larger >> overhead. >>>> IMO >>>>> it >>>>>>> is an orthogonal issue already existing in Flink. This is >> because a >>>>> Flink >>>>>>> job running in batch mode might also have lower throughput than in >>>>> stream >>>>>>> mode for the same reason. However, even though it is possible >> in >>>>>> theory, >>>>>>> I expect that in practice the throughput of using sorting + >>>>>>> BatchExecutionKeyedStateBackend should be much higher than using >>>> other >>>>>>> keyed state backends when the amount of data is large. As a matter >>> of >>>>>> fact, >>>>>>> we have not heard complaints about such performance regression >>> issues >>>>> in >>>>>>> batch mode. >>>>>>> >>>>>>> The primary goal of this FLIP is to allow the operator to run at >>> the >>>>> same >>>>>>> throughput (in stream mode when there is backlog) as it can >>> currently >>>>> do >>>>>> in >>>>>>> batch mode. And this goal is not affected by the disk overhead >>> issue >>>>>>> mentioned above. >>>>>>> >>>>>>> I am thinking maybe we can treat it as an orthogonal performance >>>>>>> optimization problem instead of solving this problem in this >> FLIP? >>>>>>> >>>>>>> - If it will be at least partially in-memory, does Flink have >>>> some >>>>>>>> mechanism to reserve optional memory that >>>>>>>> can be revoked if a new operator starts up? Can this >> memory >>>> be >>>>>>>> redistributed? Ideally we should use as >>>>>>>> much as possible of the available memory to avoid >> spilling >>>>> costs, >>>>>>> but >>>>>>>> also being able to revoke that memory >>>>>>>> >>>>>>> >>>>>>> This FLIP does not support dynamically revoking/redistributing >>>> managed >>>>>>> memory used by the ExternalSorter. 
>>>>>>> >>>>>>> For operators with isInternalSorterSupported = true, we will >>> allocate >>>>> to >>>>>>> this operator execution.sorted-inputs.memory >>>>>>> < >>>>>>> >>>>>> >>>>> >>>> >>> >> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144 >>>>>>>> >>>>>>> amount of managed memory. This is the same as how Flink allocates >>>>> managed >>>>>>> memory to an operator when this operator has keyed inputs in >> batch >>>>> mode. >>>>>>> >>>>>>> Note that this FLIP intends to support operators to sort inputs >>>>> whenever >>>>>>> there is backlog. And there is currently no way for an operator >> to >>>> know >>>>>> in >>>>>>> advance whether there will be no backlog after a given time. So >> it >>>>> seems >>>>>>> simpler to just keep managed memory for such an operator >> throughout >>>> the >>>>>>> lifecycle of this operator, for now. >>>>>>> >>>>>>> Besides, it seems that the lack of ability to dynamically >>>>>>> revoke/redistribute un-used managed memory is an existing issue >> in >>>>> Flink. >>>>>>> For example, we might have two operators sharing the same slot >> and >>>>> these >>>>>>> two operators both use managed memory (e.g. to sort inputs). >> There >>> is >>>>>>> currently no way for one operator to re-use the memory not used >> by >>>> the >>>>>>> other operator. >>>>>>> >>>>>>> Therefore, I think we can treat this as an orthogonal performance >>>>>>> optimization problem which can be addressed separately. What do >> you >>>>>> think? >>>>>>> >>>>>>> >>>>>>>> - Sometimes sorting, even if we have memory to do that, >> might >>>> be >>>>> an >>>>>>>> unnecessary overhead. >>>>>>>> - Watermarks >>>>>>>> - Is holding back watermarks always good? If we have tons >> of >>>> data >>>>>>>> buffered/sorted and waiting to be processed >>>>>>>> with multiple windows per key and many different keys. 
>>> When >>>> we >>>>>>>> switch back to `isBacklog=false` we >>>>>>>> first process all of that data before processing >>> watermarks, >>>>> for >>>>>>>> operators that are not using sorted input the >>>>>>>> state size can explode significantly causing lots of >>>> problems. >>>>>>> Even >>>>>>>> for those that can use sorting, switching to >>>>>>>> sorting or BatchExecutionKeyedStateBackend is not >> always a >>>>> good >>>>>>>> idea, but keeping RocksDB also can be >>>>>>>> risky. >>>>>>>> >>>>>>> >>>>>>> With the current FLIP, the proposal is to use a sorter only when >>> the >>>>>> inputs >>>>>>> have keys. According to this practice, operators which are not >>> using >>>>>>> sorting should have un-keyed inputs. I believe such an operator >>> will >>>>> not >>>>>>> even use a keyed state backend. Maybe I missed some use-case. Can >>> you >>>>>>> provide a use-case where we will have an operator with un-keyed >>>> inputs >>>>>>> whose state size can explode due to us holding back watermarks? >>>>>>> >>>>>>> For operators with keyed inputs that use sorting, I suppose it is >>>>>> possible >>>>>>> that sorting + BatchExecutionKeyedStateBackend can be worse than >>>> using >>>>>>> RocksDB. But I believe this is very rare (if possible at all) in >>> almost >>>>>>> any practical usage of Flink. >>>>>>> >>>>>>> Taking one step back, if this indeed causes a regression for a real >>>>> use-case, >>>>>>> users can set execution.checkpointing.interval-during-backlog to >>>>> anything >>>>>>> other than 0 so that this FLIP will not use >>>>>>> sorter + BatchExecutionKeyedStateBackend even when there is >>>>> backlog. >>>>>>> >>>>>>> I would hope we can find a way to automatically determine whether >>>> using >>>>>>> sorting + BatchExecutionKeyedStateBackend can be better or worse >>> than >>>>>> using >>>>>>> RocksDB alone. But I could not find a good and reliable way to do >>>> this. 
>>>>>>> Maybe we can update Flink to do this when we find a good way to >> do >>>> this >>>>>> in >>>>>>> the future? >>>>>>> >>>>>>> >>>>>>> >>>>>>>> - Keyed state backend >>>>>>>> - I think you haven't described what happens during >> switching >>>>> from >>>>>>>> streaming to backlog processing. >>>>>>>> >>>>>>> >>>>>>> Good point. This indeed needs to be described. I added a TODO in >>> the >>>>>>> "Behavior changes ..." section to describe what happens when >>>> isBacklog >>>>>>> switches from false to true, for all >>>> watermark/checkpoint/statebackend >>>>>> etc. >>>>>>> >>>>>>> Let me explain this for the state backend here for now. I will >>> update >>>>>> FLIP >>>>>>> later. >>>>>>> >>>>>>> When isBacklog switches from false to true, an operator with keyed >>>> inputs >>>>>> can >>>>>>> optionally (as determined by its implementation) start to use an >>>> internal >>>>>>> sorter to sort inputs by key, without processing inputs or >> updating >>>>>>> the state backend, until it receives end-of-inputs or isBacklog is >>>> switched >>>>> to >>>>>>> false again. >>>>>>> >>>>>>> >>>>>>> >>>>>>>> - Switch can be an unnecessary overhead. >>>>>>> >>>>>>> >>>>>>> I agree it can cause unnecessary overhead, particularly when >>>> isBacklog >>>>>>> switches back and forth frequently. Whether or not this is >>>> unnecessary >>>>>>> likely depends on the duration/throughput of the backlog phase as >>>> well >>>>> as >>>>>>> the specific computation logic of the operator. I am not sure >> there >>>> is >>>>> a >>>>>>> good way for Flink to determine in advance whether switching is >>>>>>> unnecessary. >>>>>>> >>>>>>> Note that for the existing use-case where we expect to change >>>> isBacklog >>>>>> to >>>>>>> true (e.g. MySQL CDC snapshot phase, Kafka source watermark lag >>> being >>>>> too >>>>>>> high), we don't expect the backlog status to switch back and forth >>>>> frequently. 
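[Editor's note] The switching behavior described above — buffer and sort while isBacklog=true, process records immediately otherwise — can be modeled as a small state machine. The Python sketch below is a toy model of the described protocol only; the class and method names are invented here and are not part of the FLIP's API.

```python
class SortingOperator:
    """Toy model (not the FLIP's actual API) of an operator that buffers
    and sorts keyed input while isBacklog=true, and processes records
    immediately while isBacklog=false."""

    def __init__(self, process_fn):
        self.process_fn = process_fn  # invoked once per record, in order
        self.backlog = False
        self.buffer = []

    def on_backlog_event(self, is_backlog):
        if self.backlog and not is_backlog:
            # Backlog ended: drain the buffered records in key order, so
            # each key's state would be touched contiguously.
            for key, value in sorted(self.buffer, key=lambda r: r[0]):
                self.process_fn(key, value)
            self.buffer = []
        self.backlog = is_backlog

    def on_record(self, key, value):
        if self.backlog:
            self.buffer.append((key, value))  # defer processing and state access
        else:
            self.process_fn(key, value)
```

In the FLIP itself the buffering/sorting is done by the internal sorter (spilling to disk as needed) rather than an in-memory list, and draining is also triggered by end-of-inputs; the toy model only shows the control flow of the switch.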
>>>>>>> And users can disable this switch by setting >>>>>>> execution.checkpointing.interval-during-backlog to anything other >>>> than >>>>> 0. >>>>>>> >>>>>>> Therefore, I am wondering if we can also view this as a >> performance >>>>>>> optimization opportunity for extra use-cases in the future, >> rather >>>>> than a >>>>>>> blocking issue of this FLIP for the MVP use-case (e.g. snapshot >>> phase >>>>> for >>>>>>> any CDC source, Kafka watermark lag). >>>>>>> >>>>>>> >>>>>>>> At the same time, in your current proposal, for >>>>>>>> `execution.checkpointing.interval-during-backlog > 0` we won't >>>>>>>> switch to "batch" mode at all. That's a bit of a shame, I don't >>>>>> understand >>>>>>>> why those two things should be coupled >>>>>>>> together? >>>>>>>> >>>>>>> >>>>>>> We can in general classify optimizations as those that are >>> compatible >>>>>> with >>>>>>> checkpointing, and those that are not compatible with >>> checkpointing. >>>>> For >>>>>>> example, input sorting is currently not compatible with >>>> checkpointing. >>>>>> And >>>>>>> buffering input records to reduce state backend overhead (and >>>> probably >>>>>>> columnar processing for mini-batch in the future) is compatible >>> with >>>>>>> checkpointing. >>>>>>> >>>>>>> The primary goal of FLIP-327 is to support optimizations not >> compatible >>>> with >>>>>>> checkpointing. If >> execution.checkpointing.interval-during-backlog > >>>> 0, >>>>>>> which means that the user intends to still do checkpointing even when >>>> there >>>>>> is >>>>>>> backlog, then we will not be able to support such optimizations. >>>>>>> >>>>>>> For optimizations that are compatible with checkpointing, we can >> do >>>>> this >>>>>>> even when the operator does not run in "batch mode". There are >>> extra >>>>>>> problems to solve in order to achieve this optimization, such as >>>>>> supporting >>>>>>> unaligned checkpointing without prolonging its sync phase. I plan >>> to >>>>>>> explain how this can be done in FLIP-325. 
>>>>>>> >>>>>>> >>>>>>>> All in all, shouldn't we aim for some more clever process of >>>>> switching >>>>>>> back >>>>>>>> and forth between streaming/batch modes >>>>>>>> for watermark strategy/state backend/sorting based on some >>> metrics? >>>>>>> Trying >>>>>>>> to either predict if switching might help, >>>>>>>> or trying to estimate if the last switch was beneficial? Maybe >>>>>> something >>>>>>>> along the lines: >>>>>>>> - sort only in memory and during sorting count the number of >>>> distinct >>>>>>> keys >>>>>>>> (NDK) >>>>>>>> - maybe allow for spilling if so far in memory we have NDK >> * >>> 5 >>>>> = >>>>>>>> #records >>>>>>>> - do not allow to buffer records above a certain threshold, as >>>>>> otherwise >>>>>>>> checkpointing can explode >>>>>>>> - switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 >>> = >>>>>> #records >>>>>>>> - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records >>>>>>>> >>>>>>>> Or even maybe for starters something even simpler and then test >>> out >>>>>>>> something more fancy as a follow up? >>>>>>>> >>>>>>> >>>>>>> I agree it is worth investigating these ideas to further optimize >>> the >>>>>>> performance during backlog. >>>>>>> >>>>>>> I just think these can be done independently after this FLIP. The >>>> focus >>>>>> of >>>>>>> this FLIP is to re-use in stream mode the same optimization which >>> we >>>>>>> already use in batch mode, rather than inventing or improving the >>>>>>> performance of these existing optimizations. >>>>>>> >>>>>>> Given that there are already a lot of new mechanism/features to >>>> discuss >>>>>> and >>>>>>> address in this FLIP, I am hoping we can limit the scope of this >>> FLIP >>>>> to >>>>>>> re-use the existing optimization, and do these extra optimization >>>>>>> opportunities as future work. >>>>>>> >>>>>>> What do you think? 
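[Editor's note] For reference, Piotr's NDK-based heuristic sketched above could be expressed roughly as follows. This is a speculative Python illustration: the thresholds are the ones floated in the email, and reading conditions like "NDK * 2 = #records" as ratio cutoffs is an interpretation, not an implemented Flink policy.

```python
def choose_backlog_strategy(records):
    """Illustrative heuristic (an interpretation of the thresholds
    suggested in the discussion, not an implemented Flink policy):
    pick a processing strategy from the number of distinct keys (NDK)
    relative to the number of buffered records."""
    n = len(records)
    ndk = len({key for key, _ in records})
    if ndk * 1.5 >= n:
        return "no-sort"             # few repeats per key: sorting won't pay off
    if ndk * 2 <= n:
        return "sort+batch-backend"  # many records per key: batch style wins
    return "sort-in-memory-only"     # borderline: sort, but keep the backend
```

A real implementation would compute NDK incrementally while filling the in-memory sort buffer (or from an exponential moving average of past phases, as Piotr suggests) rather than from a materialized list.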
>>>>>>> >>>>>>> >>>>>>>> >>>>>>>> At the same time, >>>> `execution.checkpointing.interval-during-backlog=0` >>>>>>> seems >>>>>>>> a weird setting to me, that I would >>>>>>>> not feel safe recommending to anyone. If processing of a >> backlog >>>>> takes >>>>>> a >>>>>>>> long time, a job might stop making >>>>>>>> any progress due to some random failures. Especially dangerous >>> if a >>>>> job >>>>>>> >>>>>>> switches from streaming mode back to >>>>>>>> backlog processing for some reason, as that could happen >>> months >>>>>> after >>>>>>>> someone started a job with this >>>>>>>> strange setting. So should we even have it? I would simply >>> disallow >>>>>> it. I >>>>>>>> >>>>>>> >>>>>>> Good point. I do agree we need further work to improve the >>>> failover >>>>>>> performance in case any task fails. >>>>>>> >>>>>>> As of the current FLIP, if any task fails during backlog and >>>>>>> execution.checkpointing.interval-during-backlog = 0, we will need >>> to >>>>>>> restart all operators to the last checkpointed state and continue >>>>>>> processing backlog. And this can be a lot of rollback since there >>> is >>>> no >>>>>>> checkpoint during backlog. And this can also be worse than batch >>>> since >>>>>> this >>>>>>> FLIP currently does not support exporting/saving records to local >>>> disk >>>>>> (or >>>>>>> shuffle service) so that a failed task can re-consume the records >>>> from >>>>>> the >>>>>>> upstream task (or shuffle service) in the same way that Flink >>>>> fails over >>>>>> a >>>>>>> task in batch mode. >>>>>>> >>>>>>> I think we can extend this FLIP to solve this problem so that it >>> can >>>>> have >>>>>>> at least the same behavior/performance as a batch-mode job. The >> idea >>> is >>>>> to >>>>>>> also follow what batch mode does. For example, we can trigger a >>>>>> checkpoint >>>>>>> when isBacklog switches to true, and every operator should buffer >>> its >>>>>>> output in the TM local disk (or remote shuffle service). 
>> Therefore, >>>>>> after a >>>>>>> task fails, it can restart from the last checkpoint and >> re-consume >>>> data >>>>>>> buffered in the upstream task. >>>>>>> >>>>>>> I will update FLIP as described above. Would this address your >>>> concern? >>>>>>> >>>>>>> >>>>>>> >>>>>>>> could see a power setting like: >>>>>>>> `execution.backlog.use-full-batch-mode-on-start >> (default >>>>>> false)` >>>>>>>> >>>>>>> >>>>>>> I am not sure I fully understand this config or its motivation. >> Can >>>> you >>>>>>> help explain the exact semantics of this config? >>>>>>> >>>>>>> >>>>>>>> that would override any heuristic of switching to backlog if >>>> someone >>>>> is >>>>>>>> submitting a new job that starts with >>>>>>>> `isBacklog=true`. >>>>>>>> >>>>>>>> Or we could limit the scope of this FLIP to only support >> starting >>>>> with >>>>>>>> batch mode and switching only once to >>>>>>>> streaming, and design a follow up with switching back and >> forth? >>>>>>>> >>>>>>> >>>>>>> Sure, that sounds good to me. I am happy to split this FLIP into >>> two >>>>>> FLIPs >>>>>>> so that we can make incremental progress. >>>>>>> >>>>>>> Best, >>>>>>> Dong >>>>>>> >>>>>>> >>>>>>>> I'm looking forwards to hearing/reading out your thoughts. >>>>>>>> >>>>>>>> Best, >>>>>>>> Piotrek >>>>>>>> >>>>>>>> >>>>>>>> śr., 12 lip 2023 o 12:38 Jing Ge <j...@ververica.com.invalid> >>>>>>> napisał(a): >>>>>>>> >>>>>>>>> Hi Dong, >>>>>>>>> >>>>>>>>> Thanks for your reply! >>>>>>>>> >>>>>>>>> Best regards, >>>>>>>>> Jing >>>>>>>>> >>>>>>>>> On Wed, Jul 12, 2023 at 3:25 AM Dong Lin < >> lindon...@gmail.com> >>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Jing, >>>>>>>>>> >>>>>>>>>> Thanks for the comments. Please see my reply inline. >>>>>>>>>> >>>>>>>>>> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge >>>>> <j...@ververica.com.invalid >>>>>>> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Dong, >>>>>>>>>>> >>>>>>>>>>> Thanks for the clarification. Now it is clear for me. 
I >> got >>>>>>>> additional >>>>>>>>>> noob >>>>>>>>>>> questions wrt the internal sorter. >>>>>>>>>>> >>>>>>>>>>> 1. when to call setter to set the internalSorterSupported >>> to >>>> be >>>>>>> true? >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> The developer of the operator class (i.e. those classes which >>>>>> implement >>>>>>>>>> `StreamOperator`) should override the >>>> `#getOperatorAttributes()` >>>>>> API >>>>>>> to >>>>>>>>> set >>>>>>>>>> internalSorterSupported to true, if he/she decides to sort >>>>> records >>>>>>>>>> internally in the operator. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> 2 >>>>>>>>>>> *"For those operators whose throughput can be >> considerably >>>>>> improved >>>>>>>>> with >>>>>>>>>> an >>>>>>>>>>> internal sorter, update it to take advantage of the >>> internal >>>>>> sorter >>>>>>>>> when >>>>>>>>>>> its input has isBacklog=true.* >>>>>>>>>>> *Typically, operators that involve aggregation operation >>>> (e.g. >>>>>>> join, >>>>>>>>>>> cogroup, aggregate) on keyed inputs can benefit from >> using >>> an >>>>>>>> internal >>>>>>>>>>> sorter."* >>>>>>>>>>> >>>>>>>>>>> *"The operator that performs CoGroup operation will >>>> instantiate >>>>>> two >>>>>>>>>>> internal sorters to sort records from its two inputs >>>>> separately. >>>>>>> Then >>>>>>>>> it >>>>>>>>>>> can pull the sorted records from these two sorters. This >>> can >>>> be >>>>>>> done >>>>>>>>>>> without wrapping input records with TaggedUnion<...>. In >>>>>>> comparison, >>>>>>>>> the >>>>>>>>>>> existing DataStream#coGroup needs to wrap input records >>> with >>>>>>>>>>> TaggedUnion<...> before sorting them using one external >>>> sorter, >>>>>>> which >>>>>>>>>>> introduces higher overhead."* >>>>>>>>>>> >>>>>>>>>>> According to the performance test, it seems that the internal >>>>> sorter >>>>>>> has >>>>>>>>>> better >>>>>>>>>>> performance than the external sorter. Is it possible to make >>>> those >>>>>>>>> operators >>>>>>>>>>> that can benefit from it use the internal sorter by default? 
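[Editor's note] The quoted FLIP text above describes sorting the CoGroup operator's two inputs independently and then pulling from both sorters in lockstep, with no TaggedUnion wrapping. The following toy Python sketch illustrates that merge pattern only; it is not Flink code, and the function name is invented here.

```python
def co_group_sorted(left, right, co_group_fn, key=lambda r: r[0]):
    """Toy co-group (not Flink code): sort each input independently, then
    walk both sorted streams in lockstep, invoking co_group_fn once per
    key with that key's records from each side. No union-type wrapping
    of the two inputs is needed."""
    left, right = sorted(left, key=key), sorted(right, key=key)
    i = j = 0
    out = []
    while i < len(left) or j < len(right):
        # Pick the smallest key still pending on either side.
        k = min(x for x in (key(left[i]) if i < len(left) else None,
                            key(right[j]) if j < len(right) else None)
                if x is not None)
        ls, rs = [], []
        while i < len(left) and key(left[i]) == k:
            ls.append(left[i]); i += 1
        while j < len(right) and key(right[j]) == k:
            rs.append(right[j]); j += 1
        out.append(co_group_fn(k, ls, rs))
    return out
```

In the actual operator the two `sorted(...)` calls would be two internal (external-merge) sorters and the output would be emitted incrementally, but the per-key lockstep walk is the same.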
>>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Yes, it is possible. After this FLIP is done, users can use >>>>>>>>>> DataStream#coGroup with EndOfStreamWindows as the window >>>> assigner >>>>>> to >>>>>>>>>> co-group two streams in an effectively batch manner. An >>>> operator >>>>>>> that >>>>>>>>> uses >>>>>>>>>> an internal sorter will be used to perform the co-group >>>>> operation. >>>>>>>> There >>>>>>>>> is >>>>>>>>>> no need for users of the DataStream API to explicitly know >> or >>>> set >>>>>> the >>>>>>>>>> internal sorter in any way. >>>>>>>>>> >>>>>>>>>> In the future, we plan to incrementally optimize other >>>>> aggregation >>>>>>>>>> operations (e.g. aggregate) on the DataStream API when >>>>>>>> EndOfStreamWindows >>>>>>>>> is >>>>>>>>>> used as the window assigner. >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Dong >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Best regards, >>>>>>>>>>> Jing >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Tue, Jul 11, 2023 at 2:58 PM Dong Lin < >>>> lindon...@gmail.com> >>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Jing, >>>>>>>>>>>> >>>>>>>>>>>> Thank you for the comments! Please see my reply inline. >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Jul 11, 2023 at 5:41 AM Jing Ge >>>>>>> <j...@ververica.com.invalid >>>>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Dong, >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the proposal! The FLIP is already in good >>>>> shape. I >>>>>>> got >>>>>>>>>> some >>>>>>>>>>>> NIT >>>>>>>>>>>>> questions. >>>>>>>>>>>>> >>>>>>>>>>>>> 1. It is a little bit weird to write the hint right >>> after >>>>> the >>>>>>>>>>> motivation >>>>>>>>>>>>> that some features have been moved to FLIP-331, >> because >>>> at >>>>>> that >>>>>>>>> time, >>>>>>>>>>>>> readers don't know the context about what features >> it >>>>>>> means. >>>>>>>> I >>>>>>>>>>> would >>>>>>>>>>>>> suggest moving the note to the beginning of "Public >>>>>> interfaces" >>>>>>>>>>> sections. 
>>>>>>>>>>>>
>>>>>>>>>>>> Given that the reviewer who commented on this email thread before I
>>>>>>>>>>>> refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is
>>>>>>>>>>>> simpler to just remove any mention of FLIP-331. I have updated the
>>>>>>>>>>>> FLIP accordingly.
>>>>>>>>>>>>
>>>>>>>>>>>>> 2. It is also a little bit weird to describe all behaviour changes at
>>>>>>>>>>>>> first but only focus on one single feature, i.e. how to implement
>>>>>>>>>>>>> internalSorterSupported. TBH, I was lost while I was reading the
>>>>>>>>>>>>> Public interfaces. Maybe change the FLIP title? Another option could
>>>>>>>>>>>>> be to write a short summary of all features and point out that this
>>>>>>>>>>>>> FLIP will only focus on the internalSorterSupported feature. Others
>>>>>>>>>>>>> could be found in FLIP-331. WDYT?
>>>>>>>>>>>>
>>>>>>>>>>>> Conceptually, the purpose of this FLIP is to allow a stream mode job
>>>>>>>>>>>> to run parts of the topology in batch mode so that it can apply
>>>>>>>>>>>> optimizations/computations that cannot be used together with
>>>>>>>>>>>> checkpointing (and thus are not usable in stream mode). Although the
>>>>>>>>>>>> internal sorter is the only optimization immediately supported in this
>>>>>>>>>>>> FLIP, this FLIP lays the foundation to support other optimizations in
>>>>>>>>>>>> the future, such as using GPUs to process a bounded stream of records.
>>>>>>>>>>>>
>>>>>>>>>>>> Therefore, I find it better to keep the current title rather than
>>>>>>>>>>>> limiting the scope to the internal sorter.
>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>
>>>>>>>>>>>>> 3. There should be a typo at 4) Checkpoint and failover strategy ->
>>>>>>>>>>>>> Mixed mode ->
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - If any task fails when isBacklog=false true, this task is
>>>>>>>>>>>>>    restarted to re-process its input from the beginning.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for catching this issue. It is fixed now.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Dong
>>>>>>>>>>>>
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Jing
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Piotr,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your comments! Please see my reply inline.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski
>>>>>>>>>>>>>> <piotr.nowoj...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a couple of questions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could you explain why those properties
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Nullable private Boolean isOutputOnEOF = null;
>>>>>>>>>>>>>>>     @Nullable private Boolean isOutputOnCheckpoint = null;
>>>>>>>>>>>>>>>     @Nullable private Boolean isInternalSorterSupported = null;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> must be `@Nullable`, instead of having the default value set to
>>>>>>>>>>>>>>> `false`?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> By initializing these private variables in OperatorAttributesBuilder
>>>>>>>>>>>>>> as null, we can implement `OperatorAttributesBuilder#build()` in
>>>>>>>>>>>>>> such a way that it can print DEBUG level logging to say
>>>>>>>>>>>>>> "isOutputOnCheckpoint is not explicitly set". This can help
>>>>>>>>>>>>>> users/SREs debug performance issues (or the lack of an expected
>>>>>>>>>>>>>> optimization) caused by operators not explicitly setting the right
>>>>>>>>>>>>>> operator attribute.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, we might want a job to always use the longer
>>>>>>>>>>>>>> checkpointing interval (i.e.
>>>>>>>>>>>>>> execution.checkpointing.interval-during-backlog) if all running
>>>>>>>>>>>>>> operators have isOutputOnCheckpoint==false, and use the short
>>>>>>>>>>>>>> checkpointing interval otherwise. If a user has explicitly
>>>>>>>>>>>>>> configured execution.checkpointing.interval-during-backlog but the
>>>>>>>>>>>>>> two-phase commit sink library has not been upgraded to set
>>>>>>>>>>>>>> isOutputOnCheckpoint=true, then the job will end up using the long
>>>>>>>>>>>>>> checkpointing interval, and it will be useful to figure out what is
>>>>>>>>>>>>>> going wrong in this case by checking the log.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note that the default value of these fields in the
>>>>>>>>>>>>>> OperatorAttributes instance built by OperatorAttributesBuilder will
>>>>>>>>>>>>>> still be false.
>>>>>>>>>>>>>> The following is mentioned in the Java doc of
>>>>>>>>>>>>>> `OperatorAttributesBuilder#build()`:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     /**
>>>>>>>>>>>>>>      * If any operator attribute is null, we will log it at DEBUG
>>>>>>>>>>>>>>      * level and use the following default values.
>>>>>>>>>>>>>>      * - isOutputOnEOF defaults to false
>>>>>>>>>>>>>>      * - isOutputOnCheckpoint defaults to false
>>>>>>>>>>>>>>      * - isInternalSorterSupported defaults to false
>>>>>>>>>>>>>>      */
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Second question, have you thought about cases where someone is
>>>>>>>>>>>>>>> either bootstrapping from a streaming source like Kafka, or simply
>>>>>>>>>>>>>>> trying to catch up after a long period of downtime in a purely
>>>>>>>>>>>>>>> streaming job? Generally speaking, cases where the user doesn't
>>>>>>>>>>>>>>> care about latency in the catch-up phase, regardless of whether the
>>>>>>>>>>>>>>> source is bounded or unbounded, but wants to process the data as
>>>>>>>>>>>>>>> fast as possible, and then switch dynamically to real-time
>>>>>>>>>>>>>>> processing?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, I have thought about this. We should allow this job to
>>>>>>>>>>>>>> effectively run in batch mode when the job is in the catch-up phase.
>>>>>>>>>>>>>> FLIP-327 is actually an important step toward addressing this
>>>>>>>>>>>>>> use-case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In order to address the above use-case, all we need is a way for the
>>>>>>>>>>>>>> source operator (e.g. Kafka) to tell the Flink runtime (via
>>>>>>>>>>>>>> IsProcessingBacklog) whether it is in the catch-up phase.
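[Editor's note: to make the nullable-attribute defaulting that Dong describes above concrete, here is a minimal self-contained sketch. The class, field, and method names follow the FLIP, but everything else — the stand-in types, the logging via System.out — is an illustrative mock, not the actual Flink implementation.]

```java
// Mock of the proposed OperatorAttributesBuilder: nullable fields let build()
// detect attributes that were never explicitly set, log them at DEBUG level
// (modeled here with a println), and fall back to false.
public class OperatorAttributesDemo {

    static final class OperatorAttributes {
        final boolean isOutputOnEOF;
        final boolean isOutputOnCheckpoint;
        final boolean isInternalSorterSupported;

        OperatorAttributes(boolean eof, boolean checkpoint, boolean sorter) {
            this.isOutputOnEOF = eof;
            this.isOutputOnCheckpoint = checkpoint;
            this.isInternalSorterSupported = sorter;
        }
    }

    static final class OperatorAttributesBuilder {
        private Boolean isOutputOnEOF = null;             // @Nullable in the FLIP
        private Boolean isOutputOnCheckpoint = null;      // @Nullable in the FLIP
        private Boolean isInternalSorterSupported = null; // @Nullable in the FLIP

        OperatorAttributesBuilder setInternalSorterSupported(boolean v) {
            this.isInternalSorterSupported = v;
            return this;
        }

        OperatorAttributes build() {
            // A null field means "not explicitly set": log it and default to false.
            return new OperatorAttributes(
                    defaultToFalse(isOutputOnEOF, "isOutputOnEOF"),
                    defaultToFalse(isOutputOnCheckpoint, "isOutputOnCheckpoint"),
                    defaultToFalse(isInternalSorterSupported, "isInternalSorterSupported"));
        }

        private static boolean defaultToFalse(Boolean value, String name) {
            if (value == null) {
                System.out.println("DEBUG: " + name + " is not explicitly set");
                return false;
            }
            return value;
        }
    }

    public static void main(String[] args) {
        OperatorAttributes attrs =
                new OperatorAttributesBuilder().setInternalSorterSupported(true).build();
        // The two unset attributes are logged and default to false; the
        // explicitly set one is kept.
        System.out.println(attrs.isOutputOnEOF);             // false
        System.out.println(attrs.isInternalSorterSupported); // true
    }
}
```

This is the design rationale in Dong's reply: a plain `boolean` default of `false` would be indistinguishable from a deliberate `false`, so the builder could not warn about attributes an operator author forgot to set.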
>>>>>>>>>>>>>> Since every Kafka message has an event timestamp, we can allow
>>>>>>>>>>>>>> users to specify a job-level config such as
>>>>>>>>>>>>>> backlog-watermark-lag-threshold, and consider a Kafka source to have
>>>>>>>>>>>>>> IsProcessingBacklog=true if system_time - watermark >
>>>>>>>>>>>>>> backlog-watermark-lag-threshold. This effectively allows us to
>>>>>>>>>>>>>> determine whether Kafka is in the catch-up phase.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Once we have this capability (I plan to work on this in FLIP-328),
>>>>>>>>>>>>>> we can directly use the features proposed in FLIP-325 and FLIP-327
>>>>>>>>>>>>>> to optimize the above use-case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jul 2, 2023 at 4:15 PM Dong Lin <lindon...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-327: Support stream-batch
>>>>>>>>>>>>>>>> unified operator to improve job throughput when processing backlog
>>>>>>>>>>>>>>>> data. The design doc can be found at
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>> This FLIP enables a Flink job to initially operate in batch mode,
>>>>>>>>>>>>>>>> achieving high throughput while processing records that do not
>>>>>>>>>>>>>>>> require low processing latency. Subsequently, the job can
>>>>>>>>>>>>>>>> seamlessly transition to stream mode for processing real-time
>>>>>>>>>>>>>>>> records with low latency. Importantly, the same state can be
>>>>>>>>>>>>>>>> utilized before and after this mode switch, making it particularly
>>>>>>>>>>>>>>>> valuable when users wish to bootstrap the job's state using
>>>>>>>>>>>>>>>> historical data.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We would greatly appreciate any comments or feedback you may have
>>>>>>>>>>>>>>>> on this proposal.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Dong
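
[Editor's note: the backlog detection rule Dong sketches earlier in the thread (consider a source to be processing backlog when system_time - watermark > backlog-watermark-lag-threshold) can be illustrated with a small standalone check. The config name follows the discussion, the threshold value is hypothetical, and this is not Flink API — just the arithmetic of the proposed rule.]

```java
// Standalone sketch of the proposed backlog-watermark-lag-threshold rule:
// a source is considered to be processing backlog when its watermark lags
// the current system time by more than a configured threshold.
public class BacklogDetectionDemo {

    static boolean isProcessingBacklog(
            long systemTimeMs, long watermarkMs, long backlogWatermarkLagThresholdMs) {
        // system_time - watermark > backlog-watermark-lag-threshold
        return systemTimeMs - watermarkMs > backlogWatermarkLagThresholdMs;
    }

    public static void main(String[] args) {
        long thresholdMs = 5 * 60 * 1000; // hypothetical threshold: 5 minutes
        long now = 1_000_000_000L;

        // Watermark lags by 10 minutes: the source is still catching up.
        System.out.println(isProcessingBacklog(now, now - 10 * 60 * 1000, thresholdMs)); // true
        // Watermark lags by 1 second: the source has caught up to real time.
        System.out.println(isProcessingBacklog(now, now - 1000, thresholdMs)); // false
    }
}
```

Once a source reports IsProcessingBacklog=true via such a rule, the runtime can apply the batch-mode optimizations from FLIP-325/FLIP-327 and switch back to low-latency stream processing when the lag falls below the threshold.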