Hi Rong,

Thanks for the detailed summary! It indeed covers many problems and open
questions, which I think are not practical to solve in one shot. From my
point of view, the performance issue with sliding windows is the root
problem and probably the one users are most likely to run into. Thus I
think the first question to answer is:
"What improvements can we make for the sliding window scenario?"

First, we should figure out how much can be improved without changing
existing APIs or adding new ones. This can reuse your POC and the work
already done in Blink; we can share ideas and maybe some implementation
details. Even this alone involves a lot of effort. It may require some
refactoring of the current window operator, and we should keep it
compatible with older versions.
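To make the sliding-window duplication concrete, here is a minimal, language-neutral sketch in Python (not Flink API) that counts how many sliding windows a single element lands in. The start-time arithmetic is modeled on how a sliding event-time assigner aligns window starts to the slide. `sliding_window_starts` and `slice_start` are hypothetical helper names. With slicing, the element is stored once in its slice instead of once per window.

```python
def sliding_window_starts(timestamp, size, slide, offset=0):
    """Start times of every window [start, start + size) that contains timestamp.

    The latest start is the timestamp aligned down to the slide; earlier
    starts step back by `slide` while the window still covers timestamp.
    """
    last_start = timestamp - (timestamp - offset + slide) % slide
    starts = []
    start = last_start
    while start > timestamp - size:
        starts.append(start)
        start -= slide
    return starts


def slice_start(timestamp, slide, offset=0):
    """Start of the single slide-length slice that holds timestamp."""
    return timestamp - (timestamp - offset + slide) % slide


# One element at t = 7 with a 10-unit window sliding by 2 lands in 5 windows...
print(sliding_window_starts(7, size=10, slide=2))  # [6, 4, 2, 0, -2]
# ...but in exactly one slice.
print(slice_start(7, slide=2))  # 6
```

Take a 1-hour window sliding every second, in milliseconds. `len(sliding_window_starts(t, 3_600_000, 1_000))` is 3600, so each element touches 3600 window states, while `slice_start` picks exactly one slice.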

After this, we can release it in the next version and gather user
feedback. Then we can answer the further questions: "Do the improvements
cover most use cases? Are there any critical ones that are impossible to
handle with the current window operator?" At that point, we can open the
discussion on introducing new APIs to meet those requirements.

Adding new APIs will involve more work than improving the window operator
internally, and you have already covered much of it in your proposal.
Actually, the approaches you proposed look good to me; taking it step by
step is simply the more practical way.

Best,
Kurt


On Fri, Feb 22, 2019 at 2:58 AM Rong Rong <walter...@gmail.com> wrote:

> Hi All,
>
> Thanks @Jincheng, @Kurt, @Jark and @Fabian for sharing feedback on the
> window optimization design doc and in the discussion JIRAs. This feedback
> is very valuable and we will try to incorporate it in the next step.
>
> Several revisions have been made to the design doc, and several POCs have
> been developed since we initially shared it. Thus, some of the following
> points might have already been addressed elsewhere. However, I would still
> like to summarize them, since they were raised in various contexts.
>
> 1) Scope of the window optimization using slicing.
>
> The original scope of the doc was to address the problem of
> element-to-window duplication when a sliding window spans a wide range
> with narrow slides (with or without efficient partial aggregation).
> However, as we developed the two POCs [1,2], we found that there is really
> no one-size-fits-all approach to optimizing this (the same observation
> Kurt and Jark made). Thus the scope was expanded further:
>
> 1a). Directly improving WindowOperator ([1, 3])
>
> In the design doc, the PartialWindowFunction was designed as a cue for
> WindowOperator to choose how to optimize the sliding window. This did not
> work well, because how efficiently the window operator processes messages
> depends on: 1. the pattern of the window; 2. the pattern of the incoming
> traffic; and 3. the complexity of the window process function.
>
> Having users specify the complexity of each API method (e.g. accumulate,
> merge, retract) could help the system choose an optimization path;
> however, at this stage it is hard to make use of this information.
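Here is a hypothetical illustration of how declared properties could steer the optimization path. The sketch maps two capability flags to an evaluation strategy. The flags simplify the per-method complexity mentioned above, and the sketch ignores the window and traffic patterns that also matter. `AggregateTraits` and `choose_sliding_strategy` are invented names, not existing Flink classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AggregateTraits:
    """Hypothetical capability flags a user could declare for a window function."""
    can_merge: bool    # partial results combine cheaply (e.g. SUM, MAX)
    can_retract: bool  # an input can be removed from a result (e.g. SUM, COUNT)


def choose_sliding_strategy(traits: AggregateTraits) -> str:
    """Pick an evaluation path for a sliding window from the declared traits."""
    if traits.can_merge and traits.can_retract:
        # Pre-aggregate per slice, then add the newest slice and retract the oldest.
        return "slices+retract"
    if traits.can_merge:
        # Pre-aggregate per slice, then merge all slices covered by a window when it fires.
        return "slices+merge"
    if traits.can_retract:
        # Keep raw elements once; maintain one running result via add/retract.
        return "buffer+retract"
    # Nothing to exploit: keep raw elements once and evaluate the whole buffer on fire.
    return "buffer+full-eval"


print(choose_sliding_strategy(AggregateTraits(can_merge=True, can_retract=False)))
```

A real decision would also weigh the window shape and the incoming traffic, which is exactly why this information is hard to use on its own.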
>
> 1b). Explicitly allowing slicing and merging steps ([2]), or more generic
> window chaining:
>
> With the second POC, it is the user's responsibility to create efficient
> window operator(s), since the user can now control the state and the
> processing individually. This opens up more possibilities, e.g. unaligned /
> multi-timeout session windows and cascading windows.
>
> However, with this API change, users are also given much more flexibility.
>
> To conclude (1), I think one clear piece of feedback is that this effort
> goes beyond just improving sliding window performance. If we think
> expanding the window API is the right way, there should be a proper API
> change discussion: as Kurt mentioned, if any of these cases can be covered
> by the existing API, we should always exhaust that option first (further
> discussion in (2)).
>
>
> 2). The motivation for changing the public DataStream APIs related to
> window-based processing.
>
> As Jincheng and Kurt mentioned, the current windowed stream API can
> achieve some of these results. For example, using a tumbling window
> followed by a sliding window can reduce the amount of duplication, since
> each tumbling window result is passed to the sliding window as a single
> element. However, when we experimented with this, I observed several
> problems:
>
> 2a). Rehashing with an extra keyBy
>
> The output of the tumbling window is no longer a keyed stream, so it
> requires a separate keyBy before the sliding window, which creates
> unnecessary complexity. One workaround is to use "reinterpretAsKeyedStream"
> as Fabian mentioned; however, much investigation is needed to ensure it
> works well (e.g. how the keyed state store behaves, since the key is not
> set as part of the namespace within WindowOperator).
>
> 2b). Handling iterable elements
>
> Another problem is that if the tumbling window does not produce an
> efficient partial result, its output element (an iterable) will still be
> duplicated into each of the downstream sliding window states.
>
> This is actually a tricky problem by itself; see (2d) for further
> discussion.
>
> 2c). Additional support for some of the more complex use cases
>
> As described in (1b), with explicit control of the window state and the
> window function, users can control how the windowing is optimized. One
> could argue that this type of optimization can also be done using a
> ProcessFunction or other rich functions.
>
> However, looking at how the WindowOperator is currently structured, the
> WindowedStream also exposes explicit aggregation + windowing APIs [4].
>
> Another point is that this opens up multi-slide windowing and
> multi-timeout windows, as described in the Flink Forward presentation [5].
>
> I would also like to point out that the way the current POC [2] changes
> the public API is just a proof of concept showing that this flexibility
> opens up more complex use cases. It is by no means the final design, and I
> agree that some changes may be more aggressive than necessary, since it
> is meant as a POC.
>
> 2d). Support for a same-window operator (similar to Table/SQL
> OVER-aggregate)
>
> As discussed with Jincheng in a separate thread, supporting a same-window
> operator, similar to how OVER-aggregate is implemented in the Table/SQL
> API, is definitely useful in some window use cases such as (2b). For
> example, a process function without efficient partial aggregation on a
> sliding window can be evaluated very efficiently, since both add and
> retract operations on an iterable list are cheap.
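Here is a minimal sketch of the list-based idea. It assumes in-order timestamps and a trailing range (now - range_size, now]. Each element is appended once and expired elements are popped from the head, both in amortized O(1). The median stands in for a function without efficient partial aggregation. Evaluating it is still O(n) over the buffer, so the saving is in state size and writes, not in evaluation. `RangeBuffer` and `over_median` are hypothetical names, not the Table/SQL OVER implementation.

```python
from collections import deque
from statistics import median


class RangeBuffer:
    """Keeps each element once, oldest first, for a trailing time range.

    Add appends to the tail and retract pops expired elements from the head,
    so no element is copied into multiple window states.
    Assumes elements arrive in timestamp order.
    """

    def __init__(self, range_size):
        self.range_size = range_size
        self.elements = deque()  # (timestamp, value) pairs, oldest first

    def add(self, timestamp, value):
        self.elements.append((timestamp, value))

    def retract_expired(self, now):
        # Drop everything outside the range (now - range_size, now].
        while self.elements and self.elements[0][0] <= now - self.range_size:
            self.elements.popleft()

    def values(self):
        return [v for _, v in self.elements]


def over_median(events, range_size):
    """For each event, the median of the values in the range ending at it."""
    buf = RangeBuffer(range_size)
    results = []
    for timestamp, value in events:
        buf.add(timestamp, value)
        buf.retract_expired(timestamp)
        results.append(median(buf.values()))
    return results


print(over_median([(1, 5), (2, 1), (3, 9), (7, 4)], range_size=3))
# [5, 3.0, 5, 4]
```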
>
> There are some fundamental differences between the current window operator
> and the same-window operator concept; for example, the trigger mechanism is
> entirely different. It will also be a challenge to incorporate the Table
> API's OVER-aggregate, since there are many table-specific components in the
> bounded/unbounded over aggregates. However, I see this as an implementation
> detail (further discussion in (3a)).
>
> To conclude (2), I think the open question remains whether to expand the
> API if we introduce slicing: if not, whether we can cover all the use cases
> with the current API; if so, what that extra API will look like,
> especially the same-window operation in the DataStream API (reusing
> whatever we already have in Table/SQL is one solution).
>
>
> 3). Incorporating Existing POCs and Implementations (e.g. Blink’s
> WindowOperator & SQL/Table API)
>
> There are already some implementations that resemble the idea of slicing,
> for example Blink’s WindowOperator [3]. My thinking is that once we agree
> on the API and the public-facing changes, we should reuse as much of the
> existing implementations as possible, since Blink has been running in
> production for years (please correct me if I am wrong @Jark/Jincheng/Kurt
> :-D ).
>
>
> Regarding implementation details and some existing approaches:
>
> 3a). Over-aggregate vs. window-operator.
>
> The over aggregate has already been implemented in the Table/SQL API.
> Unlike window operators, it uses a process function and a general process
> operator approach.
>
> Looking further into how the current window operator works compared with
> the over aggregate, there are many *common implementations*: for example,
> the window state (especially the list-based approach), the triggering
> mechanism / timer service (especially on rowtime), and the “evicting
> policies”.
>
> I was wondering whether it is possible, at the operator level, to:
> 1. support the same-window operator concept as a generalization of the
>    over-aggregate, and
> 2. create a more common window operator base that generalizes the state,
>    trigger and eviction context.
>
> 3b). Support for partial process functions: retract vs. merge.
>
> Supporting retract operations is definitely a plus in some use cases, for
> example as Fabian suggested in the JIRA discussion [6]. This already
> exists in the aggregate functions of the Table/SQL API.
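The difference between the two styles can be sketched over precomputed per-slide panes. These are hypothetical helpers, not Flink's AggregateFunction API. SUM supports retract, so each new window costs O(1). MAX cannot be retracted, so each window merges all of the panes it covers.

```python
from functools import reduce


def sliding_sums_with_retract(pane_sums, panes_per_window):
    """Sliding SUM over panes: add the newest pane, retract the expired one."""
    results = []
    acc = 0
    for i, pane in enumerate(pane_sums):
        acc += pane  # accumulate the newest pane
        if i >= panes_per_window:
            acc -= pane_sums[i - panes_per_window]  # retract the pane that left
        if i >= panes_per_window - 1:
            results.append(acc)  # one complete window per step, O(1) work
    return results


def merge_max(a, b):
    """Merge step for MAX partial results."""
    return a if a >= b else b


def sliding_max_with_merge(pane_maxes, panes_per_window):
    """Sliding MAX over panes: no retract, so merge every covered pane."""
    return [
        reduce(merge_max, pane_maxes[i - panes_per_window + 1 : i + 1])
        for i in range(panes_per_window - 1, len(pane_maxes))
    ]


print(sliding_sums_with_retract([1, 2, 3, 4, 5], 3))  # [6, 9, 12]
print(sliding_max_with_merge([3, 1, 4, 1, 5], 3))     # [4, 4, 5]
```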
>
> 3c). States and Chaining Strategy
>
> Given the operator chaining strategy [7], I was wondering whether there are
> performance considerations in implementing the sliding window as one
> operator versus multiple chained operators. One question I think is worth
> investigating is the most efficient way of storing the slicing
> information, as Jark suggested in the JIRA discussion [8].
>
>
> Although I tried to capture as much of the discussion here as possible, I
> doubt we've addressed all of these questions. So please share your
> comments and feedback; they are highly appreciated! I am happy to
> volunteer to start a design/discussion doc for this effort if everyone
> thinks it is suitable to move forward.
>
>
> Thanks,
>
> Rong
>
>
>
> [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
> [2] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
> [3] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
> [5] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
> [6] https://issues.apache.org/jira/browse/FLINK-11454
> [7] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
> [8] https://issues.apache.org/jira/browse/FLINK-7001
>
> On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <ykt...@gmail.com> wrote:
>
> > Hi Rong,
> >
> > Thanks for the improvement proposal; this topic caught my interest since
> > we made some similar improvements in Blink.
> > After going through your design doc, I would like to share some thoughts:
> >
> > 1. It looks to me that the proposed SliceManager and MergeManager are
> > quite generic and can be used in various situations, like unaligned
> > windows. However, they introduce some new APIs and new stream operators,
> > which makes this a big effort. I suppose the main reason to introduce
> > these concepts is that we want to support complex scenarios like
> > unaligned windows, multi-timeout session windows and so on. But if we go
> > back to the original motivation of this improvement, it is "Highly
> > overlapping window operation" and "Window functions with efficient
> > methods calculating and merging partial results", just as you mentioned
> > in the section "Target Usage Pattern". I'm curious whether it is
> > necessary to introduce all these new APIs and operators to meet these
> > requirements. If I understand you correctly, we might even achieve the
> > goal by using two successive windows: first a tumbling window, then a
> > sliding window. And from our experience in Blink, adding some
> > enhancements to the original window operator may also be sufficient.
> > Does it make sense to you to first try to improve the target scenario
> > without adding new APIs and operators? That would definitely make it
> > easier to review and merge.
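Here is a minimal single-key sketch of the two-successive-windows idea. It assumes a SUM aggregate, offset 0, and a window size that is a multiple of the slide. It ignores keys, watermarks and the extra keyBy that (2a) points out, and the helper names are hypothetical. `direct_sliding` is the reference that assigns each element to every window; both functions produce the same sums. The difference is that the tumbling stage updates one pane per element.

```python
from collections import defaultdict


def tumble_then_slide(events, size, slide):
    """Two-stage sliding SUM: tumble into slide-length panes, then combine panes."""
    # Stage 1: tumbling window of length `slide`; each element updates one pane.
    panes = defaultdict(int)
    for timestamp, value in events:
        panes[timestamp - timestamp % slide] += value

    # Stage 2: sliding window; a pane [p, p + slide) belongs to every window
    # [w, w + size) with p - size + slide <= w <= p.
    results = {}
    for pane_start, pane_sum in panes.items():
        for window_start in range(pane_start - size + slide, pane_start + 1, slide):
            results[window_start] = results.get(window_start, 0) + pane_sum
    return dict(sorted(results.items()))


def direct_sliding(events, size, slide):
    """Reference: add every element to all size // slide windows containing it."""
    results = {}
    for timestamp, value in events:
        last_start = timestamp - timestamp % slide
        for window_start in range(last_start, timestamp - size, -slide):
            results[window_start] = results.get(window_start, 0) + value
    return dict(sorted(results.items()))


events = [(0, 1), (1, 2), (2, 3), (5, 4)]
print(tumble_then_slide(events, size=4, slide=2))  # {-2: 3, 0: 6, 2: 7, 4: 4}
print(direct_sliding(events, size=4, slide=2))     # {-2: 3, 0: 6, 2: 7, 4: 4}
```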
> >
> > 2. From what we observed after Blink made similar improvements,
> > slice-based or pane-based optimization is not a silver bullet for all
> > situations. In most cases, we only observed small performance gains like
> > 20% or 30%. The main reason is that, to get a big gain from this
> > improvement, we normally need the window function to have efficient
> > calculating and merging methods, like SUM and COUNT. If this assumption
> > holds, the basic window operator approach is also performant: it stores
> > a compact intermediate result for each window per key, and computing the
> > final result from it is also very efficient. The only downside is that
> > we need to do the calculation in multiple windows per key, but under the
> > same assumption the calculation is cheap and may not be a blocker. Based
> > on this, I think we must be careful about the changes, especially
> > API-related ones.
> >
> > 3. The last thing on my mind is whether we can share some implementation
> > with Blink's window operator. We introduced a new window operator in
> > Blink SQL, which I think needs to be discussed when we try to adopt
> > Blink's improvements. Without strong reasons otherwise, I think it is
> > better to share some implementation with the DataStream window operator.
> > Maybe we don't need to share the whole operator, but at least some
> > underlying concepts and data structures. It would be great if you could
> > think about this and see whether it is feasible.
> >
> > One last small question: how do you handle sliding windows whose size
> > is not divisible by the slide, such as a 10-second window with a
> > 3-second slide?
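Here are two possible answers, sketched in Python. The sketch assumes aligned windows [start, start + size) with start ≡ offset (mod slide), and the helper names are hypothetical. The first option uses uniform slices of length gcd(size, slide). The second cuts non-uniform slices only at window starts and ends. For a 10-second window with a 3-second slide, gcd gives 1-second slices, 10 per window. Cutting at window edges gives slices that alternate between 1 and 2 seconds, so each window spans 7 slices.

```python
from math import gcd


def uniform_slice_length(size, slide):
    """Largest slice length that tiles every window exactly."""
    return gcd(size, slide)


def slice_edges(size, slide, lo, hi, offset=0):
    """Non-uniform slice boundaries in [lo, hi): every window start or window end."""
    return [
        t for t in range(lo, hi)
        if (t - offset) % slide == 0 or (t - offset - size) % slide == 0
    ]


def slices_per_window(size, slide, window_start, offset=0):
    """Number of edge-based slices covering the window [window_start, window_start + size)."""
    edges = slice_edges(size, slide, window_start, window_start + size + 1, offset)
    return len(edges) - 1


print(uniform_slice_length(10, 3))      # 1  -> 10 slices per window
print(slice_edges(10, 3, 0, 9))         # [0, 1, 3, 4, 6, 7]
print(slices_per_window(10, 3, 0))      # 7
```

The edge-based variant stores fewer partial results, at the cost of slices of unequal length that the slice bookkeeping must track.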
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Thank you Rong!
> > > The performance of sliding windows is an issue for many users.
> > > Adding support for a more efficient window is a great effort.
> > >
> > > Thank you,
> > > Fabian
> > >
> > > On Tue, Jan 29, 2019 at 4:37 PM Rong Rong <walter...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for the feedback and suggestions on the design doc. I have
> > > > created a parent JIRA [1] to track the related tasks and started the
> > > > implementation process. Any further feedback or suggestions are highly
> > > > appreciated.
> > > >
> > > > Best,
> > > > Rong
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-11276
> > > >
> > > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <walter...@gmail.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Various discussions on the mailing list & JIRA tickets [2] have been
> > > > > brought up in the past regarding windowing performance. As we
> > > > > experimented internally with some of our extreme use cases, we found
> > > > > that a slice-based implementation can optimize Flink's windowing
> > > > > mechanism and provide better performance in most cases.
> > > > >
> > > > > We've put together a preliminary enhancement and performance
> > > > > optimization plan [1] for the current windowing operation in Flink.
> > > > > This is largely inspired by the stream slicing research shared by
> > > > > Philip and Jonas at the recent Flink Forward conference [3], and by
> > > > > the discussion in the main JIRA ticket [2]. The initial design and POC
> > > > > implementations consider optimizing performance for the category of
> > > > > overlapping windows, as well as allowing chaining of cascading window
> > > > > operators.
> > > > >
> > > > > It would be great to hear feedback and suggestions from the
> > > > > community. Please kindly share your comments.
> > > > >
> > > > > Thanks,
> > > > > Rong
> > > > >
> > > > > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
> > > > > [2] https://issues.apache.org/jira/browse/FLINK-7001
> > > > > [3] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing