Hi Kurt,

Thanks for the valuable feedback. I think the suggestions you and Jincheng provided are definitely the best execution plan:
- Start with the sliding window optimization by exhausting the current public API. There are some components we can leverage or directly reuse from Blink's window operator [1] implementation.
- Backward compatibility is definitely an issue, as any state change will probably require a non-trivial state upgrade. I can definitely follow up with you on this.

At the same time, I think it is also a good idea to summarize all the use cases that have been discussed so far. This can be a valuable reference for answering the questions "Do the improvements cover most use cases?" and, where they don't, "Can introducing some new API meet the requirements?"

I will try to convert the current parent JIRA [2] into one that does not include any public API alteration. As you mentioned, this is a much more practical way in terms of execution.

Many thanks for the suggestions and guidance!

--
Rong

[1] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
[2] https://issues.apache.org/jira/browse/FLINK-11454

On Mon, Feb 25, 2019 at 3:44 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi Rong,
>
> Thanks for the detailed summary! It indeed involves lots of problems and unanswered questions, which I think are not practical to solve in one shot. From my point of view, the performance issue with the sliding window is the root one, and probably the one users are most likely to run into. Thus I think the first question to answer is:
> "What kind of improvements can we make in the sliding window scenario?"
>
> First, we should try to figure out how much can be improved without changing or adding any API. This can reuse the POC you did and the work Blink has already done; we can share some ideas and maybe some implementation details. This alone already involves a lot of effort, even if it is the only thing we do.
> It may require some refactoring of the current window operator, and we should keep it compatible with the old version.
>
> After this, we can release it in the next version and gather some user feedback. Then we can further answer the questions: "Do the improvements cover most use cases? Are there any critical ones that are impossible to handle with the current window operator?" At that point, we can open the discussion about introducing some new API to meet the requirements.
>
> Deciding to add new APIs will introduce more work than improving the window operator internally, and you have covered a lot of that in your proposal. Actually, the approaches you proposed look good to me; taking it step by step is the more practical way.
>
> Best,
> Kurt
>
>
> On Fri, Feb 22, 2019 at 2:58 AM Rong Rong <walter...@gmail.com> wrote:
>
> > Hi All,
> >
> > Thanks @Jincheng, @Kurt, @Jark and @Fabian for sharing feedback on the window optimization design doc and in the discussion JIRAs. This is very valuable feedback and we will try to incorporate it in the next step.
> >
> > Several revisions have been made to the current design doc, and several POCs have been developed since we initially shared the document. Thus, some of the following might already have been addressed elsewhere. However, I would still like to summarize these points since they were raised in various contexts.
> >
> > 1) Scope of the window optimization using slicing.
> >
> > The original scope of the doc was to address the problem of element-to-window duplication when a sliding window covers a wide range with narrow slides (with or without efficient partial aggregations). However, as we developed the 2 POCs [1, 2], we found that there is really no one-size-fits-all approach to how this can be optimized (the same observation Kurt and Jark made). Thus the scope was expanded further:
> >
> > 1a). Directly improving WindowOperator ([1, 3])
> >
> > In the design doc, the PartialWindowFunction was designed as a cue for WindowOperator to choose how to optimize the sliding window. This did not work well, because how efficiently the window operator processes messages depends on: 1. the pattern of the window; 2. the pattern of the incoming traffic; and 3. the complexity of the window process function.
> >
> > Having users specify the complexity of each API (e.g. accumulate, merge, retract) can help the system choose the optimization path; however, at this stage it is hard to utilize this information.
> >
> > 1b). Explicitly allow slicing and merging steps ([2]), or more generic window chaining:
> >
> > With the second POC, it is the users' responsibility to create efficient window operator(s), since users can now control the state and the processing individually. This opens up more possibilities, e.g. unaligned / multi-timeout session windows and cascaded windows.
> >
> > However, with this API change, users are also given much more flexibility.
> >
> > In conclusion to (1), I think one clear piece of feedback is that this effort goes beyond just improving sliding window performance. If we think expanding the window API is the right way, there should be a proper API change discussion: as Kurt mentioned, if any of these cases can be covered with the existing API, we should always exhaust that first. (Further discussion in (2).)
> >
> >
> > 2). The motivation for changing the public DataStream APIs related to window-based processing.
> >
> > As Jincheng and Kurt mentioned, utilizing the current windowed stream API can achieve some of the results. For example, using a tumbling window followed by a sliding window can reduce the amount of duplication, since each tumbling window result is passed to the sliding window as one element.
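The tumble-then-slide idea above can be illustrated outside Flink. The following is a minimal sketch in plain Java, not Flink code: it assumes a single key, event-time timestamps in milliseconds, windows aligned to 0, a SUM aggregate, and a window size that is a multiple of the slide; the class and method names are hypothetical. The naive path adds every element to each of the size/slide windows containing it, while the cascaded path first sums elements into tumbling panes of length `slide` and then adds each pane result, as one element, to the sliding windows covering it.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (plain Java, no Flink APIs): a sliding-window SUM for one key,
// computed directly and as "tumbling window of length `slide`, then sliding window".
// Assumes event-time timestamps in ms, windows aligned to 0, and size % slide == 0.
public class TumbleThenSlide {

    // Naive path: every element is added to each of the size/slide windows containing it.
    public static Map<Long, Long> naive(long[] ts, long[] vals, long size, long slide) {
        Map<Long, Long> windowSums = new TreeMap<>(); // window start -> sum
        for (int i = 0; i < ts.length; i++) {
            long lastStart = ts[i] - Math.floorMod(ts[i], slide); // latest window start <= ts
            for (long start = lastStart; start > ts[i] - size; start -= slide) {
                windowSums.merge(start, vals[i], Long::sum);
            }
        }
        return windowSums;
    }

    // Stage 1: tumbling panes of length `slide`; each element updates exactly one pane.
    public static Map<Long, Long> tumble(long[] ts, long[] vals, long slide) {
        Map<Long, Long> paneSums = new TreeMap<>(); // pane start -> partial sum
        for (int i = 0; i < ts.length; i++) {
            long paneStart = ts[i] - Math.floorMod(ts[i], slide);
            paneSums.merge(paneStart, vals[i], Long::sum);
        }
        return paneSums;
    }

    // Stage 2: each pane result is one element, added to the size/slide windows covering it.
    public static Map<Long, Long> slideOverPanes(Map<Long, Long> paneSums, long size, long slide) {
        Map<Long, Long> windowSums = new TreeMap<>();
        for (Map.Entry<Long, Long> pane : paneSums.entrySet()) {
            long paneStart = pane.getKey();
            for (long start = paneStart; start > paneStart - size; start -= slide) {
                windowSums.merge(start, pane.getValue(), Long::sum);
            }
        }
        return windowSums;
    }

    public static void main(String[] args) {
        long[] ts = {0, 500, 1_200, 1_900, 2_500, 3_100};
        long[] vals = {1, 2, 3, 4, 5, 6};
        Map<Long, Long> direct = naive(ts, vals, 2_000, 1_000);
        Map<Long, Long> cascaded = slideOverPanes(tumble(ts, vals, 1_000), 2_000, 1_000);
        System.out.println(direct);                  // {-1000=3, 0=10, 1000=12, 2000=11, 3000=6}
        System.out.println(direct.equals(cascaded)); // true
    }
}
```

Both paths produce the same window sums; the saving is that duplication now happens once per pane rather than once per element.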
> >
> > However, when we experimented, I observed several problems:
> >
> > 2a). Rehashing with an extra keyBy
> >
> > This limitation requires a separate keyBy after the tumbling result, which creates unnecessary complexity. One workaround is to use "reinterpretAsKeyedStream" as Fabian mentioned; however, much investigation remains to ensure it works well (e.g. how the keyed state store works, since the key is not set as part of the namespace within WindowOperator).
> >
> > 2b). Handling iterable elements
> >
> > Another problem is that if the tumbling window produces no efficient partial result, its output element (an iterable) will still be duplicated into each of the downstream sliding window states.
> >
> > However, this is actually a tricky problem by itself; see (2d) for further discussion of this piece.
> >
> > 2c). Additional support for some of the more complex use cases
> >
> > As described in (1b), with explicit control of the window state and the window function, users can control how the windowing optimization is done. One could argue that this type of optimization can also be done using a ProcessFunction or other rich functions.
> >
> > However, if we look at how the WindowOperator is currently structured, it also presents explicit aggregation + windowing APIs [4] for the WindowedStream.
> >
> > Another point is opening up multi-slide windowing and multi-timeout windows, as described in the presentation [5] at Flink Forward.
> >
> > I would also like to point out that the way the current POC [2] changes the public API is just a proof of concept that the added flexibility opens up more complex use cases. It is by no means the final design, and I agree that some changes may be more aggressive than necessary, since it is meant as a POC.
> >
> > 2d). The support for a same-window operator (similar to the Table/SQL OVER aggregate)
> >
> > As discussed with Jincheng in a separate thread, supporting a same-window operator, similar to how the OVER aggregate was achieved in the Table/SQL API, is definitely useful in some of the window use cases (see 2b). For example, a process function without efficient partial aggregation results on a sliding window can be evaluated very efficiently, since both add and retract operations on an iterable list are cheap.
> >
> > There are some fundamental differences between the current window operator and the same-window operator concept; for example, the trigger mechanism is entirely different. It will also be a challenge to incorporate the OVER aggregate of the Table API, since there are many table-specific components in the bounded/unbounded over aggregates. However, I see these as implementation details (further discussion in (3a)).
> >
> > In conclusion to (2), I think the open question remains whether to expand the API if we introduce slicing: if not, whether we can exhaust all the use cases with the current API; if so, what the extra API will look like, especially the same-window operation in the DataStream API. (Utilizing whatever we already have in Table/SQL is one solution.)
> >
> >
> > 3). Incorporating existing POCs and implementations (e.g. Blink's WindowOperator & SQL/Table API)
> >
> > There are already some existing implementations that resemble the idea of slicing, for example Blink's WindowOperator [3]. My thinking is that once we agree on the API and the public-facing changes, we should reuse as much of the current implementations as possible, since Blink has been running in production for years (please correct me if I am wrong @Jark/Jincheng/Kurt :-D ).
> >
> >
> > Regarding the implementation details and some existing approaches:
> >
> > 3a). Over aggregate vs. window operator.
> >
> > The over aggregate has already been implemented in the Table/SQL API. Unlike window operators, the over aggregate uses a process function and a general process operator approach.
> >
> > Looking further into how the current window operator works compared with the over aggregate, there are many *common implementations*: for example, the window state (especially the list-based approach), the triggering mechanism / timer service (especially on rowtime), and the "evicting policies".
> >
> > I was wondering if it is possible to consider supporting, on the operator level: 1. the same-window operator concept as a generic case of the over aggregate, and 2. a more common window operator base that generalizes the state, trigger and eviction context.
> >
> > 3b). The support for partial process functions: retracting vs. merging.
> >
> > Supporting retract operations is definitely a plus in some use cases, for example as Fabian suggested in the JIRA discussion [6]. This already exists in the Table/SQL API aggregate functions.
> >
> > 3c). States and chaining strategy
> >
> > With the chaining strategy [7] of the operators, I was wondering whether there are performance considerations in implementing the sliding window as one operator versus multiple chained operators. One question I think is worth investigating is the most efficient way of storing the slicing information, as Jark suggested in the JIRA discussion [8].
> >
> >
> > Although I tried to capture as many of the discussions here as possible, I can hardly imagine that we have addressed all of these questions. So please share your comments and feedback; they are highly appreciated! I can definitely volunteer to start a design/discussion doc for this effort if everyone thinks it is suitable to move forward.
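Point (3b) above contrasts retracting with merging. The following is a minimal sketch in plain Java (hypothetical names, no Flink APIs) of the two ways a sliding SUM can be evaluated from per-slice partial results, assuming k = size / slide slices per window. Merging re-combines all k slices for every window; retracting keeps one running accumulator, adds the slice entering the window and subtracts the slice leaving it.

```java
import java.util.Arrays;

// Hypothetical sketch (plain Java, no Flink APIs): evaluating consecutive sliding windows
// from per-slice partial SUMs, either by merging all covered slices for every window or by
// keeping one accumulator that adds the entering slice and retracts the leaving one.
// Retraction assumes an invertible aggregate (SUM, COUNT); it does not work for MAX or MIN.
public class MergeVsRetract {

    // Merge: every window re-combines the k = size / slide slice partials it covers, O(k) each.
    public static long[] byMerge(long[] slices, int k) {
        long[] out = new long[Math.max(0, slices.length - k + 1)];
        for (int w = 0; w < out.length; w++) {
            long acc = 0;
            for (int i = w; i < w + k; i++) {
                acc += slices[i];
            }
            out[w] = acc;
        }
        return out;
    }

    // Retract: one running accumulator, O(1) per window once the first window is full.
    public static long[] byRetract(long[] slices, int k) {
        long[] out = new long[Math.max(0, slices.length - k + 1)];
        long acc = 0;
        for (int i = 0; i < slices.length; i++) {
            acc += slices[i];          // accumulate the slice entering the window
            if (i >= k) {
                acc -= slices[i - k];  // retract the slice that just left the window
            }
            if (i >= k - 1) {
                out[i - k + 1] = acc;  // window covering slices i-k+1 .. i
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] slices = {3, 7, 5, 6, 2}; // per-slice partial sums for one key
        int k = 2;                        // slices per window
        System.out.println(Arrays.toString(byMerge(slices, k)));   // [10, 12, 11, 8]
        System.out.println(Arrays.toString(byRetract(slices, k))); // [10, 12, 11, 8]
    }
}
```

Merging costs O(k) per window firing, while retraction costs O(1) but only applies to invertible aggregates, which is one reason the best path depends on the window function, as noted in (1a).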
> >
> >
> > Thanks,
> >
> > Rong
> >
> >
> >
> > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
> > [2] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
> > [3] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
> > [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
> > [5] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
> > [6] https://issues.apache.org/jira/browse/FLINK-11454
> > [7] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
> > [8] https://issues.apache.org/jira/browse/FLINK-7001
> >
> > On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <ykt...@gmail.com> wrote:
> >
> > > Hi Rong,
> > >
> > > Thanks for the improvement proposal. This topic aroused my interest since we did some similar improvements in Blink. After going through your design doc, I would like to share some thoughts:
> > >
> > > 1. It looks to me like the proposed SliceManager and MergeManager are quite generic and can be used in various situations like unaligned windows. However, this will introduce some new APIs and some new stream operators, and thus makes it a big effort. I suppose the main reason to introduce these concepts is that we want to support some complex scenarios like unaligned windows, multi-timeout session windows and so on.
> > > But if we go back to the original motivation of this improvement, it is "Highly overlapping window operation" and "Window functions with efficient methods for calculating and merging partial results", just as you mentioned in the section "Target Usage Pattern". I'm curious whether it's necessary to introduce all these new APIs and operators to meet these requirements. If I understand you correctly, we may even be able to achieve the goal by using two successive windows, first a tumbling window and then a sliding window. And from our experience in Blink, adding some enhancements to the original window operator may also be sufficient. Does it make sense to you to first try to improve the target scenario without adding new APIs and operators? That would definitely make it easier to review and merge.
> > >
> > > 2. From what we observed after Blink made similar improvements, slice- or pane-based improvement is not a silver bullet for all kinds of situations. In most cases, we only observed small performance gains, like 20% or 30%. The main reason is that, if we want a big gain from this improvement, we normally require the window function to have efficient calculating and merging methods, like SUM and COUNT. If this assumption holds, the basic approach of the window operator will also be performant: it stores a compact intermediate result for each window per key, and it's also very efficient to get the result from it. The only downside is that we need to do the calculation in multiple windows per key, but under this assumption the calculation is efficient and may not be a blocker. Based on this, I think we must be more careful about the changes, especially API-related ones.
> > >
> > > 3. The last thing on my mind is whether we can share some implementations with Blink's window operator. We introduced a new window operator in Blink SQL, which I think needs to be discussed when we try to adopt Blink's improvements. Without strong reasons, I think it's better to share some implementations with DataStream's window operator. Maybe we don't need to share the whole operator, but at least some underlying concepts and data structures. It would also be great if you could think about it and see whether it's a feasible way.
> > >
> > > One last small question: how do you handle sliding windows whose window size can't be divided by the slide, such as a 10-second window sliding by 3 seconds?
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > Thank you Rong!
> > > > The performance of sliding windows is an issue for many users.
> > > > Adding support for a more efficient window is a great effort.
> > > >
> > > > Thank you,
> > > > Fabian
> > > >
> > > > On Tue, Jan 29, 2019 at 4:37 PM Rong Rong <walter...@gmail.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for the feedback and suggestions on the design doc. I have created a parent JIRA [1] to track the related tasks and started the implementation process.
> > > > > Any further feedback or suggestions are highly appreciated.
> > > > >
> > > > > Best,
> > > > > Rong
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-11276
> > > > >
> > > > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <walter...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Various discussions on the mailing list & JIRA tickets [2] have been brought up in the past regarding windowing operation performance.
> > > > > > As we experimented internally with some of our extreme use cases, we found that a slice-based implementation can optimize Flink's windowing mechanism and provide better performance in most cases.
> > > > > >
> > > > > > We've put together a preliminary enhancement and performance optimization plan [1] for the current windowing operation in Flink. This is largely inspired by the stream slicing research shared by Philip and Jonas at the recent Flink Forward conference [3], and by the discussion in the main JIRA ticket [2]. The initial design and POC implementations consider optimizing the performance for the category of overlapping windows, as well as allowing chaining of cascaded window operators.
> > > > > >
> > > > > > It would be great to hear feedback and suggestions from the community. Please kindly share your comments and suggestions.
> > > > > >
> > > > > > Thanks,
> > > > > > Rong
> > > > > >
> > > > > > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7001
> > > > > > [3] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
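One simple option for Kurt's closing question in the Feb 19 message (a 10-second window sliding by 3 seconds) is the classic pane approach: cut the stream into panes whose length is gcd(size, slide), so every window start and end falls on a pane boundary. The sketch below is plain Java under that assumption, with windows aligned to timestamp 0, millisecond timestamps, and hypothetical names; it is not necessarily what the design doc or Blink implements.

```java
// Hypothetical sketch (plain Java, no Flink APIs): choosing a pane length for a sliding
// window whose size is not a multiple of its slide, e.g. a 10 s window sliding by 3 s.
// Assumes windows aligned to timestamp 0 and millisecond timestamps; names are illustrative.
public class GcdPanes {

    // Euclid's algorithm for the greatest common divisor.
    public static long gcd(long a, long b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // A pane length that divides both size and slide puts every window start (a multiple
    // of slide) and every window end (start + size) on a pane boundary.
    public static long paneLength(long size, long slide) {
        return gcd(size, slide);
    }

    // Each element lands in exactly one pane, however many windows overlap it.
    public static long paneStart(long timestamp, long pane) {
        return timestamp - Math.floorMod(timestamp, pane);
    }

    // Number of pane partial results a window combines when it fires.
    public static long panesPerWindow(long size, long slide) {
        return size / paneLength(size, slide);
    }

    public static void main(String[] args) {
        long size = 10_000, slide = 3_000;
        long pane = paneLength(size, slide);
        System.out.println("pane length (ms): " + pane);                              // 1000
        System.out.println("panes per window: " + panesPerWindow(size, slide));       // 10
        System.out.println("pane of element at 7400 ms: " + paneStart(7_400, pane));  // 7000
    }
}
```

The trade-off is that a small gcd yields many panes, so each window merges more partial results when it fires.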