Hi Devs,

Thank you all for the valuable feedback and comments on the previous design doc. We have now created an initial design/work plan based on @Kurt's suggestions, scoped as "improvements with the sliding window scenario without changing/adding new public APIs".
Please take a look at the initial design document here [1]. Any comments or suggestions are highly appreciated!

Thanks,
Rong

--
[1] https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit#

On Thu, Feb 28, 2019 at 2:24 PM Rong Rong <walter...@gmail.com> wrote:

> Hi Kurt,
>
> Thanks for the valuable feedback. I think the suggestions you and Jincheng provided are definitely the best execution plan:
>
> - Starting with sliding window optimization by exhausting the current public API, there are some components we can leverage or directly reuse from Blink's window operator [1] implementation.
> - Backward compatibility is definitely an issue, as any state change will probably result in a non-trivial state upgrade. I can definitely follow up with you on this.
>
> At the same time, I think it is also a good idea to summarize all the use cases that have been discussed so far. This can be very valuable as a reference to answer the questions "Do the improvements cover most use cases?" and, when they do not, "Can introducing some new API meet the requirements?".
>
> I will try to convert the current parent JIRA [2] into one that does not include public API alteration. As you mentioned, this will be a much more practical way in terms of execution.
>
> Many thanks for the suggestions and guidance!
>
> --
> Rong
>
> [1] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
> [2] https://issues.apache.org/jira/browse/FLINK-11454
>
> On Mon, Feb 25, 2019 at 3:44 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> Thanks for the detailed summarization! It indeed involves lots of problems and unanswered questions, which I think is not practical to solve in one shot. From my point of view, the performance issue with the sliding window is the root one, and maybe the one users are most likely to run into.
>> Thus, I think the first question that should be answered is:
>> "What kind of improvements can we make for the sliding window scenario?"
>>
>> First, we should try to figure out how many improvements can be done without changing or adding new API. This can reuse the POC you did and the work Blink has done; we can share some ideas and maybe some implementation details. And it already involves lots of effort even if we only do this one thing. It may introduce some refactoring to the current window operator, and we should keep it compatible with the old version.
>>
>> After this, we can release it in the next version and gather some user feedback. We can then further answer the question: "Do the improvements cover most use cases? Are there any critical ones which are impossible to do with the current window operator?". At that time, we can open the discussion to introduce some new API to meet the requirements.
>>
>> It will introduce more work than improving the window operator internally when we decide to add new APIs, which you have covered a lot in your proposal. Actually, the approaches you proposed look good to me; taking it step by step is a more practical way.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Feb 22, 2019 at 2:58 AM Rong Rong <walter...@gmail.com> wrote:
>>
>> > Hi All,
>> >
>> > Thanks for sharing feedback on the window optimization design doc and on the discussion JIRAs, @Jincheng, @Kurt, @Jark and @Fabian. It is very valuable and we will try to incorporate it in the next step.
>> >
>> > There were several revisions done to the current design doc, and several POCs have been developed since we initially shared the document. Thus, some of the following might’ve already been addressed in other places. However, I would still like to summarize them since these points were raised in various scenarios.
>> >
>> > 1) Scope of the window optimization using slicing.
>> >
>> > The original scope of the doc was to address the problem of element-to-window duplication when a sliding window spans a wide range with narrow slides (with or w/o efficient partial aggregations). However, as we developed the 2 POCs [1,2] we found out there’s really no one-solution-fits-all approach to how this can be optimized (the same observation Kurt and Jark mentioned). Thus some further expansion of the scope was done:
>> >
>> > 1a). Directly improving WindowOperator ([1, 3])
>> >
>> > In the design doc, the PartialWindowFunction was designed as a cue for WindowOperator to choose how to optimize the sliding window. This was not working well because how efficiently the window operator processes messages depends on: 1. the pattern of the window; 2. the pattern of the incoming traffic; and 3. the complexity of the window process function.
>> >
>> > The idea of having users specify the complexity of each API (e.g. accumulate, merge, retract) can help the system choose the path of optimization; however, at this stage it is hard to utilize this information.
>> >
>> > 1b). Explicitly allow slicing and merging steps ([2]), or more generic window chaining:
>> >
>> > With the second POC, it is the users' responsibility to create efficient window operator(s), since now users can control the state and the process individually. This opens up more possibilities: e.g. unaligned / multi-timeout session windows, cascade windows.
>> >
>> > However, with this API change, users are given much more flexibility.
>> >
>> > In conclusion to (1), I think one piece of clear feedback is that this effort goes beyond just improving the sliding window performance.
>> > If we think it is the right way to expand the window API, there should be a proper API change discussion: as Kurt mentioned, if any of these cases can be covered with the existing API, we should always exhaust that first. (Further discussion in (2))
>> >
>> >
>> > 2). The motivation for changing the public DataStream APIs related to window-based processing.
>> >
>> > As Jincheng and Kurt mentioned, utilizing the current windowed stream API can achieve some of the results. For example, using a tumbling window followed by a sliding window can reduce the amount of duplication, since the tumbling window results will be passed to the sliding window as one element. However, when we experimented, there were several problems I observed:
>> >
>> > 2a). Rehashing with extra keyBy
>> >
>> > The limitation requires a separate keyBy following the tumble result, which creates unnecessary complexity. One workaround was to use "reinterpretAsKeyedStream" as Fabian mentioned; however, there is much investigation to be done to ensure it works well (e.g. how does the keyed state store work, since the key was not set as a part of the namespace within WindowOperator).
>> >
>> > 2b). Handling iterable elements
>> >
>> > Another problem is that if there are no efficient partial results produced by the tumbling window, the output element (as an iterable) will still be duplicated into each of the downstream sliding window states.
>> >
>> > However, this is actually a tricky problem by itself; further discussion in (2d) for this piece.
>> >
>> > 2c). Additional support for some of the more complex use cases
>> >
>> > As described in (1b), with explicit control of the window state and the window function, users can control how the optimization of the windowing is done. One could argue that these types of optimization can also be done using ProcessFunction or other rich functions.
>> >
>> > However, if we take a look at how the WindowOperator is currently structured, it also has explicit aggregation + windowing APIs [4] presented for the WindowedStream.
>> >
>> > Another point is that this opens up the multi-slide and multi-timeout windowing described in the presentation [5] at FF.
>> >
>> > I would also like to raise the point that the way the current POC [2] changes the public API is just a proof of concept that the flexibility opens up more complex use cases. It is by no means the final design, and I agree that there could be more-aggressive-than-necessary changes since it’s meant as a POC.
>> >
>> > 2d). The support for same-window operator (similar to Table/SQL OVER-aggregate)
>> >
>> > As discussed with Jincheng in a separate thread: supporting a same-window operator, similar to how OVER-aggregate was achieved in the Table/SQL API, is definitely useful in some of the window use cases (2b). For example, a process function without efficient partial aggregation results on a sliding window can be evaluated very efficiently, since both add and retract operations on an iterable list are cheap.
>> >
>> > There are some fundamental differences between the current window operator and the same-window operator concept. For example, the trigger mechanism is entirely different, and it will be a challenge to incorporate the OVER-aggregate of the Table API since there are many table-specific components in the bounded/unbounded over aggregates. However, I am seeing this as an implementation detail (further discussion in (3a)).
>> >
>> > In conclusion to (2), I think the discussion remains whether to expand the API if we introduce sliding: if no, whether we can exhaust all the use cases with the current API; if yes, what that extra API will look like, especially the same-window operation in the DataStream API.
>> > (Utilizing whatever we already have in Table/SQL is one solution.)
>> >
>> >
>> > 3). Incorporating Existing POCs and Implementations (e.g. Blink’s WindowOperator & SQL/Table API)
>> >
>> > There are already some existing implementations that resemble the idea of slicing, for example Blink’s WindowOperator [3]. My thinking is that once we agree on the API and the public-facing changes, we should reuse as much of the current implementations as possible, since Blink has been running in production for years (please correct me if I am wrong @Jark/Jincheng/Kurt :-D ).
>> >
>> >
>> > Regarding the implementation details and some existing approaches:
>> >
>> > 3a). Over-aggregate vs. window-operator.
>> >
>> > Over aggregate has already been implemented in the Table/SQL API. Unlike window operators, over aggregate utilizes the process function and a general process operator approach.
>> >
>> > Looking further into how the current window operator works compared with the over aggregate, there are many *common implementations*. For example, the window state (especially the list-based approach); the triggering mechanism / timer service (especially on rowtime); and the “evicting policies”.
>> >
>> > I was wondering if it is possible to consider, on an operator level, supporting: 1. the same-window operator concept as a generic case of the over-aggregate, and 2. a more common window operator base that generalizes the state, trigger and eviction context.
>> >
>> > 3b). The support for partial process function: retracting vs. merge.
>> >
>> > Supporting retracting operations is definitely a plus in some use cases, for example as Fabian suggested in the JIRA discussion [6]. Currently this already exists in the Table/SQL API aggregation function.
>> >
>> > 3c). States and Chaining Strategy
>> >
>> > With the chaining strategy [7] in the operators, I was wondering whether there’s a performance consideration when implementing the sliding window as one operator versus multiple operators chained together. One question I think is worth investigating is what the most efficient way of storing the slicing information is, as Jark suggested in the JIRA discussion [8].
>> >
>> >
>> > As I tried to capture as many discussions here as possible, I can hardly imagine that we’ve addressed all of these questions. So please share your comments and feedback, as they are highly appreciated! I can definitely volunteer to start a design/discussion doc for this effort if everyone thinks it is suitable to move forward.
>> >
>> >
>> > Thanks,
>> >
>> > Rong
>> >
>> >
>> > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
>> > [2] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
>> > [3] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
>> > [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
>> > [5] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
>> > [6] https://issues.apache.org/jira/browse/FLINK-11454
>> > [7] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
>> > [8] https://issues.apache.org/jira/browse/FLINK-7001
>> >
>> > On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <ykt...@gmail.com> wrote:
>> >
>> > > Hi Rong,
>> > >
>> > > Thanks for the improvement proposal, this topic
>> > > aroused my interest since we did some similar improvements in Blink.
>> > > After going through your design doc, I would like to share some thoughts:
>> > >
>> > > 1. It looks to me like the proposed SliceManager and MergeManager are quite generic and can be used in various situations like unaligned windows. However, this will introduce some new API and some new stream operators, and thus make it a big effort. I suppose the main reason to introduce these concepts is that we want to support some complex scenarios like unaligned windows, multi-timeout session windows and so on. But if we go back to the original motivation of this improvement, it is "Highly overlapping window operation" and "Window functions with efficient methods calculating and merging partial results", just as you mentioned in the section "Target Usage Pattern". I'm curious whether it's necessary to introduce all these new APIs and operators to meet these requirements. If I understand you correctly, we may even be able to achieve the goal by using two successive windows, first a tumbling window and then a sliding window. And from the experience we had in Blink, adding some enhancements to the original window operator may also be sufficient. Does it make sense to you if we first try to improve the target scenario without adding new APIs and operators? It will definitely make it easier to review and merge.
>> > >
>> > > 2. From what we observed after Blink did similar improvements, slice- or pane-based improvement is not a silver bullet for all kinds of situations. In most cases, we only observed a small performance gain like 20% or 30%.
>> > > The main reason is that if we want to get a big gain from this improvement, we normally require the window function to have efficient calculating and merging methods, like SUM and COUNT. If this assumption holds, the basic approach of the window operator will also be performant: it stores a compact intermediate result for each window per key, and it's also very efficient to get the result based on this. The only downside is that we need to do the calculation in multiple windows per key, but based on the assumption, the calculation is efficient and may not be a blocker. Based on this, I think we must be more careful about the changes, especially API-related ones.
>> > >
>> > > 3. The last thing on my mind is whether we can share some implementations with Blink's window operator. We introduced a new window operator in Blink SQL, which I think needs to be discussed when we try to adopt the Blink improvements. Without strong reasons, I think it's better to share some implementations with DataStream's window operator. Maybe we don't need to share the whole operator, but at least some underlying concepts and data structures. It would also be great if you could think about it and see if it's feasible.
>> > >
>> > > Last small question: how do you handle sliding windows whose window size can't be divided by the step, such as a 10-second window with a 3-second slide?
>> > >
>> > > Best,
>> > > Kurt
>> > >
>> > >
>> > > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <fhue...@gmail.com> wrote:
>> > >
>> > > > Thank you Rong!
>> > > > The performance of sliding windows is an issue for many users.
>> > > > Adding support for a more efficient window is a great effort.
>> > > >
>> > > > Thank you,
>> > > > Fabian
>> > > >
>> > > > On Tue, Jan 29, 2019 at 4:37 PM Rong Rong <walter...@gmail.com> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > Thanks for the feedback and suggestions on the design doc. I have created a parent JIRA [1] to track the related tasks and started the implementation process.
>> > > > > Any further feedback or suggestions are highly appreciated.
>> > > > >
>> > > > > Best,
>> > > > > Rong
>> > > > >
>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-11276
>> > > > >
>> > > > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <walter...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > Various discussions in the mailing list & JIRA tickets [2] have been brought up in the past regarding the windowing operation performance. As we experimented internally with some of our extreme use cases, we found out that using a slice-based implementation can optimize Flink's windowing mechanism and provide better performance in most cases.
>> > > > > >
>> > > > > > We've put together a preliminary enhancement and performance optimization plan [1] for the current windowing operation in Flink. This is largely inspired by the stream slicing research shared at a recent Flink Forward conference [3] by Philip and Jonas, and the discussion in the main JIRA ticket [2]. The initial design and POC implementations consider optimizing the performance for the category of overlapping windows as well as allowing chaining of cascade window operators.
>> > > > > >
>> > > > > > It would be great to hear feedback and suggestions from the community. Please kindly share your comments and suggestions.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Rong
>> > > > > >
>> > > > > > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
>> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7001
>> > > > > > [3] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
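
Appendix (illustration only): the thread above discusses element-to-window duplication for sliding windows and asks how a 10-second window with a 3-second slide could be sliced. The sketch below is not taken from the design doc, the POCs, or Blink. It assumes integer, non-negative timestamps in seconds, no window offset, and one common pane-style choice of slice length, gcd(size, slide). All function names are hypothetical.

```python
from math import gcd


def assigned_windows(ts, size, slide):
    """Start times of every sliding window [start, start + size) that contains ts.

    Without slicing, the element is copied into each of these windows.
    """
    start = ts - (ts % slide)  # latest window start that is <= ts
    starts = []
    while start > ts - size:
        starts.append(start)
        start -= slide
    return starts


def slice_length(size, slide):
    """Assumed pane length: the largest step that divides both size and slide."""
    return gcd(size, slide)


def slice_start(ts, size, slide):
    """The single slice an element is stored in when slicing is used."""
    step = slice_length(size, slide)
    return ts - (ts % step)


def slices_of_window(window_start, size, slide):
    """Slice start times whose partial results are merged to evaluate one window."""
    step = slice_length(size, slide)
    return list(range(window_start, window_start + size, step))
```

Under these assumptions, a 1-hour window sliding every minute copies each element into 60 windows, while slicing stores it once in a 60-second slice and merges 60 slices per window. For the 10-second / 3-second case the slice length degenerates to 1 second, so each window merges 10 slices; whether that is still a win depends on how cheap the merge is, which matches the caution raised in the thread.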