Hi All,

Thanks for sharing feedback on the window optimization design doc and in the discussion JIRAs, @Jincheng, @Kurt, @Jark and @Fabian. This is very valuable feedback, and we will try to incorporate it in the next step.
Several revisions have been made to the design doc, and several POCs have been developed, since we initially shared it. Some of the following points may therefore already have been addressed elsewhere. However, I would still like to summarize them, since they were raised in various contexts.

1) Scope of the window optimization using slicing.

The original scope of the doc was to address the problem of element-to-window duplication when a sliding window spans a wide range with narrow slides (with or without efficient partial aggregation). However, as we developed the 2 POCs [1, 2], we found that there is really no one-size-fits-all approach to this optimization (the same observation Kurt and Jark made). Thus, the scope was expanded further:

1a). Directly improving the WindowOperator ([1, 3])

In the design doc, the PartialWindowFunction was designed as a cue for the WindowOperator to choose how to optimize the sliding window. This did not work well, because how efficiently the window operator processes messages depends on: 1. the pattern of the window; 2. the pattern of the incoming traffic; and 3. the complexity of the window process function. Having users specify the complexity of each API (e.g. accumulate, merge, retract) can help the system choose an optimization path; however, at this stage it is hard to utilize this information.

1b). Explicitly allowing slicing and merging steps ([2]), or more generic window chaining

With the second POC, it is the user's responsibility to create efficient window operator(s), since the user can now control the state and the processing individually. This opens up more possibilities, e.g. unaligned / multi-timeout session windows and cascading windows. However, with this API change, users are given much more flexibility.

In conclusion to (1), I think one of the clear pieces of feedback is that this effort goes beyond just improving sliding window performance.
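To make the duplication problem in (1) concrete, here is a minimal plain-Java sketch (no Flink dependency) of one common way to slice a sliding window: the pane-style approach, where the slice length is gcd(size, slide) so that every window boundary falls on a slice boundary. The class and method names are hypothetical, window starts are assumed to be multiples of `slide`, and SUM stands in for any aggregate with an efficient merge. Without slicing, an element is stored in up to ceil(size/slide) windows; with slicing, it updates exactly one slice accumulator and each window merges its slices.

```java
import java.util.TreeMap;

// Hypothetical, Flink-free sketch of slice-based SUM over sliding windows.
// Assumption: window starts are multiples of `slide` (offset 0), timestamps are longs.
final class SliceSketch {

    // Pane-style slice length: gcd(size, slide), so every window boundary is a slice boundary.
    static long sliceLength(long size, long slide) {
        long a = size, b = slide;
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    // Without slicing: number of windows [s, s + size) containing ts, i.e. how often
    // the element would be duplicated into window state.
    static int windowsPerElement(long ts, long size, long slide) {
        int count = 0;
        for (long s = ts - Math.floorMod(ts, slide); s > ts - size; s -= slide) {
            count++;
        }
        return count;
    }

    // With slicing: each element updates exactly ONE slice accumulator (keyed by slice start).
    static TreeMap<Long, Long> sliceSums(long[] timestamps, long[] values, long sliceLen) {
        TreeMap<Long, Long> slices = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long sliceStart = timestamps[i] - Math.floorMod(timestamps[i], sliceLen);
            slices.merge(sliceStart, values[i], Long::sum);
        }
        return slices;
    }

    // A window result is the merge of the slice partials inside [start, start + size).
    static long windowSum(TreeMap<Long, Long> slices, long start, long size) {
        long sum = 0;
        for (long partial : slices.subMap(start, true, start + size, false).values()) {
            sum += partial;
        }
        return sum;
    }

    // Reference: evaluate the window directly from the raw elements.
    static long naiveWindowSum(long[] timestamps, long[] values, long start, long size) {
        long sum = 0;
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= start && timestamps[i] < start + size) {
                sum += values[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long size = 10, slide = 3; // window size not divisible by the slide
        long[] ts = {0, 1, 4, 7, 9, 12, 15};
        long[] vs = {1, 2, 3, 4, 5, 6, 7};
        TreeMap<Long, Long> slices = sliceSums(ts, vs, sliceLength(size, slide));
        for (long start = 0; start <= 15; start += slide) {
            System.out.printf("window [%d, %d): sliced=%d, naive=%d%n", start, start + size,
                    windowSum(slices, start, size), naiveWindowSum(ts, vs, start, size));
        }
    }
}
```

The gcd choice also covers window sizes that are not multiples of the slide (e.g. a 10 s window sliding by 3 s gets 1 s slices), at the cost of more, shorter slices; other slicing strategies are possible and this sketch is not the POC's implementation.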
If we think this is the right way to expand the window API, there should be a proper API change discussion. As Kurt mentioned, if any of these cases can be covered with the existing API, we should always exhaust that first (further discussion in (2)).

2). The motivation for changing the public DataStream APIs related to window-based processing.

As Jincheng and Kurt mentioned, the current windowed stream API can achieve some of these results. For example, using a tumbling window followed by a sliding window can reduce the amount of duplication, since each tumbling window result is passed to the sliding window as one element. However, when we experimented, I observed several problems:

2a). Rehashing with an extra keyBy

The current API requires a separate keyBy following the tumbling window result, which creates unnecessary complexity. One workaround is to use "reinterpretAsKeyedStream", as Fabian mentioned; however, much investigation is needed to ensure it works well (e.g. how the keyed state store behaves, since the key is not set as part of the namespace within the WindowOperator).

2b). Handling iterable elements

Another problem: if the tumbling window does not produce efficient partial results, its output element (an iterable) will still be duplicated into each of the downstream sliding window states. This is actually a tricky problem by itself; see (2d) for further discussion of this piece.

2c). Additional support for some of the more complex use cases

As described in (1b), with explicit control of the window state and the window function, users can control how the windowing is optimized. One could argue that this type of optimization can also be done using a ProcessFunction or other rich functions. However, if we look at how the WindowOperator is currently structured, it also has explicit aggregation + windowing APIs [4] exposed through the WindowedStream.
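The iterable problem in (2b) can be illustrated by counting what the sliding stage stores in a tumble-then-slide pipeline. The following plain-Java sketch is hypothetical (no Flink API is used); it assumes the tumble length equals the slide, the window size is a multiple of the slide, and window starts are multiples of the slide. It counts only the sliding stage's state.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical counting sketch for 2b): element copies held by the SLIDING stage when a
// tumbling window of length `slide` feeds a sliding window (size, slide).
// Assumptions: size % slide == 0 and window starts are multiples of `slide`.
final class TwoStageCopies {

    // Number of sliding windows [s, s + size), s a multiple of `slide`, that contain ts.
    static int windowsContaining(long ts, long size, long slide) {
        int count = 0;
        for (long s = ts - Math.floorMod(ts, slide); s > ts - size; s -= slide) {
            count++;
        }
        return count;
    }

    // Stage 1: tumbling windows of length `slide` (key = tumble start, value = element count).
    static TreeMap<Long, Integer> tumble(long[] timestamps, long slide) {
        TreeMap<Long, Integer> tumbles = new TreeMap<>();
        for (long ts : timestamps) {
            tumbles.merge(ts - Math.floorMod(ts, slide), 1, Integer::sum);
        }
        return tumbles;
    }

    // Baseline: a single sliding window buffering raw elements.
    static long copiesNaive(long[] timestamps, long size, long slide) {
        long copies = 0;
        for (long ts : timestamps) {
            copies += windowsContaining(ts, size, slide);
        }
        return copies;
    }

    // Stage 1 emits its elements as one Iterable: the whole list is stored in every
    // sliding window containing the tumble, so no element copies are saved.
    static long copiesTumbleIterable(long[] timestamps, long size, long slide) {
        checkAligned(size, slide);
        long copies = 0;
        for (Map.Entry<Long, Integer> e : tumble(timestamps, slide).entrySet()) {
            copies += e.getValue().longValue() * windowsContaining(e.getKey(), size, slide);
        }
        return copies;
    }

    // Stage 1 emits ONE partial aggregate per tumble: one accumulator copy per window.
    static long copiesTumbleAggregate(long[] timestamps, long size, long slide) {
        checkAligned(size, slide);
        long copies = 0;
        for (long tumbleStart : tumble(timestamps, slide).keySet()) {
            copies += windowsContaining(tumbleStart, size, slide);
        }
        return copies;
    }

    // With size % slide == 0, all elements of a tumble fall into the same sliding windows.
    private static void checkAligned(long size, long slide) {
        if (size % slide != 0) {
            throw new IllegalArgumentException("size must be a multiple of slide");
        }
    }

    public static void main(String[] args) {
        long[] ts = new long[100];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = i / 10; // 10 elements per time unit, t = 0..9
        }
        System.out.println("naive sliding:           " + copiesNaive(ts, 10, 2));
        System.out.println("tumble -> slide (list):  " + copiesTumbleIterable(ts, 10, 2));
        System.out.println("tumble -> slide (accum): " + copiesTumbleAggregate(ts, 10, 2));
    }
}
```

With these inputs (size 10, slide 2, 10 elements per time unit) both the naive sliding window and the list-emitting pipeline hold 500 element copies, while emitting one partial aggregate per tumble leaves 25 accumulators, which is why the two-stage workaround only helps when stage 1 can pre-aggregate.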
Another point is enabling the multi-slide and multi-timeout windows described in the presentation [5] at Flink Forward. I would also like to point out that the way the current POC [2] changes the public API is just a proof of concept showing that this flexibility opens up more complex use cases. It is by no means the final design, and I agree that some of the changes may be more aggressive than necessary, since it is meant as a POC.

2d). Support for a same-window operator (similar to the Table/SQL OVER-aggregate)

As discussed with Jincheng in a separate thread, we could support a same-window operator similar to how the OVER-aggregate is implemented in the Table/SQL API. It is definitely useful in some window use cases (see (2b)). For example, a process function without efficient partial aggregation can be evaluated very efficiently on a sliding window, since both add and retract operations on an iterable list are cheap. There are some fundamental differences between the current window operator and the same-window operator concept; for example, the trigger mechanism is entirely different. It will also be a challenge to incorporate the Table API's OVER-aggregate, since there are many table-specific components in the bounded/unbounded over aggregates. However, I see these as implementation details (further discussion in (3a)).

In conclusion to (2), I think the open question is whether to expand the API if we introduce slicing: if not, whether we can cover all the use cases with the current API; if so, what the extra API will look like, especially the same-window operation in the DataStream API (utilizing whatever we already have in Table/SQL is one solution).

3). Incorporating existing POCs and implementations (e.g. Blink's WindowOperator & SQL/Table API)

There are already some existing implementations that resemble the idea of slicing, for example Blink's WindowOperator [3].
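For (2d), the following hypothetical single-key sketch shows why add and retract on a list buffer are cheap in an OVER-style ("same-window") aggregate: each row is appended once, expired rows are retracted from the front, and a result is emitted per row rather than per window end, which is the different trigger behavior mentioned above. It assumes rows arrive in timestamp order and uses a simplified range definition [ts - range, ts]; it is not the Table/SQL over-aggregate operator.

```java
import java.util.ArrayDeque;

// Hypothetical single-key sketch of an OVER-style SUM: every incoming row emits the sum over
// rows with timestamp in [row.ts - range, row.ts].
// Assumptions: rows arrive in timestamp order; not the Table/SQL operator itself.
final class OverSumSketch {

    private final long range;
    private final ArrayDeque<long[]> rows = new ArrayDeque<>(); // {ts, value}, oldest first
    private long sum;                                           // running aggregate

    OverSumSketch(long range) {
        if (range <= 0) {
            throw new IllegalArgumentException("range must be positive");
        }
        this.range = range;
    }

    // Called once per row; the row itself acts as the trigger, not a window-end timer.
    long onElement(long ts, long value) {
        rows.addLast(new long[] {ts, value}); // add: O(1)
        sum += value;
        while (rows.peekFirst()[0] < ts - range) {
            sum -= rows.pollFirst()[1];       // retract an expired row: amortized O(1)
        }
        return sum;
    }

    public static void main(String[] args) {
        OverSumSketch over = new OverSumSketch(5);
        long[][] input = {{1, 10}, {3, 20}, {6, 30}, {12, 40}};
        for (long[] row : input) {
            System.out.println("ts=" + row[0] + " sum=" + over.onElement(row[0], row[1]));
        }
    }
}
```

Because the buffer keeps raw rows, the same structure works for functions without efficient partial aggregation: a custom function could be re-evaluated over the deque contents instead of maintaining a running sum.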
My thinking is that once we agree on the API and the public-facing changes, we should reuse as much of the current implementations as possible, since Blink has been running in production for years (please correct me if I am wrong @Jark/Jincheng/Kurt :-D ).

Regarding the implementation details and some existing approaches:

3a). Over-aggregate vs. window operator

Over-aggregate has already been implemented in the Table/SQL API. Unlike window operators, over-aggregate uses a process function and a general process operator approach. Looking further into how the current window operator works compared with over-aggregate, there are many *common implementations*, for example: the window state (especially the list-based approach); the triggering mechanism / timer service (especially on rowtime); and the “evicting policies”. I was wondering whether it is possible to consider supporting, at the operator level: 1. the same-window operator concept as a generic case of the over-aggregate, and 2. a more common window operator base that generalizes the state, trigger and eviction context.

3b). Support for partial process functions: retracting vs. merging

Supporting retract operations is definitely a plus in some use cases, for example as Fabian suggested in the JIRA discussion [6]. This already exists in the Table/SQL API aggregation functions.

3c). State and chaining strategy

With the chaining strategy [7] in the operators, I was wondering whether there are performance considerations when implementing the sliding window as one operator versus multiple operators chained together. One question I think is worth investigating is the most efficient way to store the slicing information, as Jark suggested in the JIRA discussion [8].

Although I tried to capture as many of the discussions here as possible, I can hardly imagine that we have addressed all of these questions. So please share your comments and feedback; they are highly appreciated!
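As an illustration of the retract-vs-merge trade-off in (3b), the hypothetical plain-Java sketch below turns per-slice partial results into sliding-window results, where each window covers `k` consecutive slices and advances by one slice. Merging recombines all `k` partials per window and works for any mergeable aggregate, including non-invertible ones like MAX; retracting keeps one running accumulator and is O(1) per window but only valid for invertible aggregates such as SUM or COUNT. The method names are illustrative only (the Table/SQL API's AggregateFunction can optionally implement `retract` and `merge`, but this sketch does not use that API).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch for 3b): sliding-window results from per-slice partials,
// each window covering `k` consecutive slices and advancing by one slice.
final class RetractVsMerge {

    // Merge: recombine all k partials per window. O(k) per window, but works for any
    // mergeable aggregate, including non-invertible ones such as MAX.
    static List<Long> slidingMaxByMerge(long[] partials, int k) {
        List<Long> out = new ArrayList<>();
        for (int end = k; end <= partials.length; end++) {
            long acc = Long.MIN_VALUE;
            for (int i = end - k; i < end; i++) {
                acc = Math.max(acc, partials[i]);
            }
            out.add(acc);
        }
        return out;
    }

    // Merge-based SUM, for comparison with the retract-based version.
    static List<Long> slidingSumByMerge(long[] partials, int k) {
        List<Long> out = new ArrayList<>();
        for (int end = k; end <= partials.length; end++) {
            long acc = 0;
            for (int i = end - k; i < end; i++) {
                acc += partials[i];
            }
            out.add(acc);
        }
        return out;
    }

    // Retract: one running accumulator; accumulate the entering slice, retract the leaving one.
    // O(1) per window, but only valid for invertible aggregates such as SUM or COUNT.
    static List<Long> slidingSumByRetract(long[] partials, int k) {
        List<Long> out = new ArrayList<>();
        long acc = 0;
        for (int i = 0; i < partials.length; i++) {
            acc += partials[i];          // accumulate the slice entering the window
            if (i >= k) {
                acc -= partials[i - k];  // retract the slice leaving the window
            }
            if (i >= k - 1) {
                out.add(acc);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] partials = {3, 1, 4, 1, 5, 9, 2};
        System.out.println("SUM by merge:   " + slidingSumByMerge(partials, 3));
        System.out.println("SUM by retract: " + slidingSumByRetract(partials, 3));
        System.out.println("MAX by merge:   " + slidingMaxByMerge(partials, 3));
    }
}
```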
I can definitely volunteer to start a design/discussion doc for this effort if everyone thinks it is suitable to move forward.

Thanks,
Rong

[1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
[2] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
[3] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
[5] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
[6] https://issues.apache.org/jira/browse/FLINK-11454
[7] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[8] https://issues.apache.org/jira/browse/FLINK-7001

On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi Rong,
>
> Thanks for the improvement proposal, this topic aroused my interest since we did some similar improvements in Blink.
> After going through your design doc, i would like share some thoughts:
>
> 1. It looks to me the proposed SliceManager and MergeManager is quite generic and can be used in various situations like unaligned windows. However this will introduce some new API and some new stream operators, and thus make is a big effort to do. I suppose the main reason to introduce these concepts is we want to support some complex scenarios like unaligned windows, multi timeout session windows and so on. But if let's go back to the original motivation of this improvement, it will be "Highly overlapping window operation" and "Window functions with efficient methods calculating and merging partial results", just as you mentioned in the section "Target Usage Pattern". I'm curious if it's necessary to introduce all these new APIs and operators to meet these requirements. If i understand you correctly, we may even can achieve the goal by using two successive windows, first with tumbling window, and then sliding window. And from the experiences we had in Blink, by adding some enhancements to original window operator may also be sufficient. Does it make sense to you if we can first try to improve the target scenario without adding new APIs and operators? It will definitely make it more easy to review and merged.
>
> 2. From what we observed after Blink did similar improvements, slice or pane based improvement is not a silver bullet for all kinds of situations. In most cases, we only observed small performance gain like 20% or 30%. The main reason is, if we want get big gain from this improvement, normally we require the window function have efficient calculating and merging methods, like SUM, COUNT. If this assumption stands, the basic approach of window operator will also be performant. It stored compact intermedia result for each window per key, and it's also very efficient to get result based on this. The only downside is we need to do the calculation in multiply windows per key, but based on the assumption, the calculation is efficient and may not be a blocker. Based on this, i think we must be more careful about the changes, especially API related.
>
> 3. The last thing in my mind is whether we can share some implementations with blink's window operator. We introduced a new window operator in Blink SQL, which i think needs to be discussed when we try to adopt blink improvements. Without strong reasons, i think it's better to share some implementations with DataStream's window operator. Maybe we don't need to share the whole operator, but at least some underlying concepts and data structures. It would also be great if you can think about it and see if it's a feasible way.
>
> Last small question: how do you handle sliding windows whose window size can't be divided by step, such as 10 seconds window and slide with 3 seconds?
>
> Best,
> Kurt
>
> On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Thank you Rong!
> > The performance of sliding windows is an issue for many users.
> > Adding support for a more efficient window is a great effort.
> >
> > Thank you,
> > Fabian
> >
> > On Tue, Jan 29, 2019 at 4:37 PM Rong Rong <walter...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the feedbacks and suggestions to the design doc. I have created a parent JIRA [1] to track the related tasks and started the implementation process
> > > Any further feedbacks or suggestions are highly appreciated.
> > >
> > > Best,
> > > Rong
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11276
> > >
> > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <walter...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Various discussion in the mailing list & JIRA tickets [2] had been brought up in the past regarding the windowing operation performance. As we experiment internally with some of our extreme use cases, we found out that using a slice-based implementation can optimize Flink's windowing mechanism and provide a better performance in most cases.
> > > >
> > > > We've put together a preliminary enhancement and performance optimization plan [1] for the current windowing operation in Flink. This is largely inspired by stream slicing research shared in recent Flink Forward conference [3] by Philip and Jonas, and the discussion in the main JIRA ticket [2]. The initial design and POC implementations consider optimizing the performance for the category of overlapping windows as well as allowing chaining of cascade window operators.
> > > >
> > > > It will be great to hear the feedbacks and suggestions from the community. Please kindly share your comments and suggestions.
> > > >
> > > > Thanks,
> > > > Rong
> > > >
> > > > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
> > > > [2] https://issues.apache.org/jira/browse/FLINK-7001
> > > > [3] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing