Hi Kurt,

Thanks for the valuable feedback. I think the suggestions you and Jincheng
provided are definitely the best execution plan:

- Start with the sliding window optimization by exhausting the current
public API. There are some components we can leverage or directly reuse
from Blink's window operator [1] implementation.
- Backward compatibility is definitely an issue, as any state change will
probably result in a non-trivial state upgrade. I can definitely follow up
with you on this.
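
To make the "exhaust the current public API" route a bit more concrete, here
is a rough plain-Java sketch of the tumble-then-slide idea discussed further
down the thread: pre-aggregate into tumbling panes as wide as the slide, then
build each sliding window from whole panes. It does not use Flink at all; the
class and method names are hypothetical, SUM stands in for any mergeable
aggregate, and it assumes the window size is a multiple of the slide.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration only (no Flink dependency); all names are hypothetical. */
final class TumbleThenSlide {

    /** Step 1: tumbling pre-aggregation. Sums values per pane of width slideMs. */
    static TreeMap<Long, Long> tumble(long[][] events, long slideMs) {
        TreeMap<Long, Long> panes = new TreeMap<>();
        for (long[] e : events) { // e[0] = timestamp, e[1] = value
            long paneStart = Math.floorDiv(e[0], slideMs) * slideMs;
            panes.merge(paneStart, e[1], Long::sum);
        }
        return panes;
    }

    /**
     * Step 2: sliding merge. Each pane is added once to every window
     * [start, start + sizeMs) that fully contains it.
     * Assumes sizeMs is a multiple of slideMs.
     */
    static TreeMap<Long, Long> slide(TreeMap<Long, Long> panes, long sizeMs, long slideMs) {
        TreeMap<Long, Long> windows = new TreeMap<>();
        for (Map.Entry<Long, Long> pane : panes.entrySet()) {
            long paneStart = pane.getKey();
            // The pane belongs to the windows starting at paneStart, paneStart - slideMs, ...
            for (long start = paneStart; start > paneStart - sizeMs; start -= slideMs) {
                windows.merge(start, pane.getValue(), Long::sum);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        long[][] events = {{1000, 1}, {2500, 2}, {4000, 3}};
        TreeMap<Long, Long> windows = slide(tumble(events, 1000), 3000, 1000);
        windows.forEach((start, sum) -> System.out.println("window@" + start + " -> " + sum));
    }
}
```

The point of the sketch is only that the sliding step touches one value per
pane instead of one value per raw element; it says nothing about the keyBy
and state questions raised below.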

At the same time, I think it is also a good idea to summarize all the use
cases that have been discussed so far. This can be very valuable as a
reference for answering the questions "Do the improvements cover most use
cases?" and, where they do not, "Can introducing some new API meet the
requirements?".

I will try to convert the current parent JIRA [2] into one that does not
include public API alterations. As you mentioned, this will be a much more
practical way in terms of execution.

Many thanks for the suggestions and guidance!

--
Rong

[1] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
[2] https://issues.apache.org/jira/browse/FLINK-11454

On Mon, Feb 25, 2019 at 3:44 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi Rong,
>
> Thanks for the detailed summary! It indeed involves lots of problems
> and unanswered questions, which I think are not practical to
> solve in one shot. From my point of view, the performance issue with the
> sliding window is the root one, and probably the one users are most likely
> to run into. Thus I think the first question that should be
> answered is:
> "What kind of improvements can we make for the sliding window scenario?"
>
> First, we should try to figure out how many improvements can be made
> without changing or adding new APIs. This can reuse the POC you did
> and the work Blink has done; we can share some ideas and maybe some
> implementation details. This already involves a lot of effort
> even if we only do this one thing. It may introduce some refactoring of the
> current window operator, and we should keep it compatible with old versions.
>
> After this, we can release it in the next version and gather some user
> feedback. We can then answer the question: "Do the improvements cover
> most use cases? Are there any critical ones which are impossible to do with
> the current window operator?". At that time, we can open the discussion on
> introducing some new API to meet the requirements.
>
> Adding new APIs will introduce more work than improving the window operator
> internally, which you have covered a lot in your proposal.
> Actually, the approaches you proposed look good to me; taking it step by
> step is a more practical way.
>
> Best,
> Kurt
>
>
> On Fri, Feb 22, 2019 at 2:58 AM Rong Rong <walter...@gmail.com> wrote:
>
> > Hi All,
> >
> > Thanks for sharing feedback on the window optimization design doc and on
> > the discussion JIRAs @Jincheng, @Kurt, @Jark and @Fabian. This is very
> > valuable feedback and we will try to incorporate it in the next step.
> >
> > There were several revisions of the current design doc, and several
> > POCs have been developed since we initially shared the document. Thus,
> > some of the following might’ve already been addressed in other places.
> > However, I would still like to summarize them since these points were
> > raised in various scenarios.
> >
> > 1) Scope of the window optimization using slicing.
> >
> > The original scope of the doc was to address the problem of
> > element-to-window duplication when a sliding window spans a wide range
> > with narrow slides (with or w/o efficient partial aggregations). However,
> > as we developed the 2 POCs [1,2] we found out there’s really no
> > one-solution-fits-all approach to how this can be optimized (same
> > observation as Kurt and Jark mentioned). Thus some further expansion of
> > the scope was done:
> >
> > 1a). Directly improving WindowOperator([1, 3])
> >
> > In the design doc, the PartialWindowFunction was designed as a cue for
> > WindowOperator to choose how to optimize the sliding window. This was not
> > working well because how efficiently the window operator processes
> > messages depends on: 1. the pattern of the window; 2. the pattern of the
> > incoming traffic; and 3. the complexity of the window process function.
> >
> > The idea of having users specify the complexity of each API (e.g.
> > accumulate, merge, retract) can help the system choose the path of
> > optimization; however, at this stage it is hard to utilize this
> > information.
> >
> > 1b). Explicitly allow slicing and merging steps([2]), or more generic
> > window chaining:
> >
> > With the second POC, it is the users' responsibility to create efficient
> > window operator(s), since users can now control the state and the process
> > individually. This opens up more possibilities: e.g. unaligned /
> > multi-timeout session windows, cascade windows.
> >
> > However, with this API change, users are given much more flexibility.
> >
> > In conclusion to (1), I think one piece of clear feedback is that this
> > effort goes beyond just improving the sliding window performance. If we
> > think it is the right way to expand the window API, there should be a
> > proper API change discussion: As Kurt mentioned, if any of these cases
> > can be covered with the existing API, we should always exhaust that
> > first. (Further discussion in (2))
> >
> >
> > 2). The motivation of changing the public DataStream APIs related to
> > window-based processing.
> >
> > As Jincheng and Kurt mentioned, utilizing the current windowed stream API
> > can achieve some of the results. For example, using a tumbling window
> > followed by a sliding window can reduce the amount of duplication since
> > the tumbling window results will be passed to the sliding window as one
> > element. However, when we experimented, there were several problems I
> > observed:
> >
> > 2a). rehashing with extra keyBy
> >
> > The limitation requires a separate keyBy following the tumble
> > result, which creates unnecessary complexity. One workaround is to use
> > "reinterpretAsKeyedStream" as Fabian mentioned; however, there is much
> > investigation to be done to ensure it works well (e.g. how does the keyed
> > state store work, since the key was not set as a part of the namespace
> > within WindowOperator).
> >
> > 2b). Handling iterable elements
> >
> > Another problem is that if no efficient partial results are produced by
> > the tumbling window, the output element (as an iterable) will still be
> > duplicated into each of the downstream sliding window states.
> >
> > However, this is actually a tricky problem by itself; see further
> > discussion in (2d) for this piece.
> >
> > 2c). Additional support for some of the more complex use cases
> >
> > As described in (1b), with explicit control of the window state and the
> > window function, users can control how the optimization of the windowing
> > can be done. One could argue that these types of optimization can also be
> > done using ProcessFunction or other rich functions.
> >
> > However, if we take a look at how the WindowOperator is currently
> > structured, it also has explicit aggregation + windowing APIs [4]
> > presented for the WindowedStream.
> >
> > Another point is that this opens up multi-slide windowing and
> > multi-timeout windows, as described in the presentation [5] at FF.
> >
> > I would also like to raise the point that the way the current POC [2]
> > changes the public API is just a proof of concept that the flexibility
> > opens up more complex use cases. It is by no means the final design and I
> > agree that there could be more-aggressive-than-necessary changes since
> > it’s meant for a POC.
> >
> > 2d). The support for same-window operator (similar to Table/SQL
> > OVER-aggregate)
> >
> > As discussed with Jincheng in a separate thread, this means supporting a
> > same-window operator similar to how OVER-aggregate was achieved in the
> > Table/SQL API. It is definitely useful in some of the window use cases
> > (2b). For example, a process function without efficient partial
> > aggregation results on a sliding window can be very efficiently evaluated
> > since both add and retract operations on an iterable list are cheap.
> >
> > There are some fundamental differences between the current window operator
> > and the same-window operator concept; for example, the trigger mechanism is
> > entirely different, and it will be a challenge to incorporate the
> > OVER-aggregate of the Table API since there are many table-specific
> > components in the bounded/unbounded over aggregates. However, I see this
> > as an implementation detail (further discussion in (3a)).
> >
> > In conclusion to (2), I think the open question remains whether to expand
> > the API if we introduce sliding: if no, whether we can exhaust all the use
> > cases with the current API; if yes, what that extra API will look like,
> > especially the same-window operation in the DataStream API. (Utilizing
> > whatever we already have in Table/SQL is one solution).
> >
> >
> > 3). Incorporating Existing POCs and Implementations (e.g. Blink’s
> > WindowOperator & SQL/Table API)
> >
> > There are already some existing implementations that resemble the idea of
> > slicing, for example Blink’s WindowOperator [3]. My thinking is that once
> > we agree on the API and the public-facing changes, we should reuse as much
> > of the current implementations as possible, since Blink has been running
> > in production for years (please correct me if I am wrong
> > @Jark/Jincheng/Kurt :-D ).
> >
> >
> > Regarding the implementation details and some existing approaches.
> >
> > 3a). Over-aggregate vs. window-operator.
> >
> > Over aggregate has already been implemented in the Table/SQL API. Unlike
> > window operators, over aggregate utilizes the process function and a
> > general process operator approach.
> >
> > Looking further into how the current window operator works compared with
> > the over aggregate, there are many *common implementations*. For example,
> > the window state (especially the list-based approach); the triggering
> > mechanism / timer service (especially on rowtime); and the “evicting
> > policies”.
> >
> > I was wondering if it is possible to consider, on an operator level,
> > supporting: 1. the same-window operator concept as a generic case of the
> > over-aggregate, and 2. a more common window operator base that
> > generalizes the state, trigger and eviction context.
> >
> > 3b). The support for partial process function: retracting vs. merge.
> >
> > Supporting retracting operations is definitely a plus in some use cases,
> > for example as Fabian suggested in the JIRA discussion [6]. Currently
> > this already exists in the Table/SQL API aggregation function.
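
As a rough illustration of the retract-vs-merge trade-off (a plain-Java
sketch, not code from the design doc, the POCs or Blink; the class and method
names are hypothetical): with an invertible aggregate such as SUM, a sliding
result can be maintained by accumulating the newest pane and retracting the
expired one, whereas a merge-only aggregate has to re-merge all live panes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain-Java illustration only; names are hypothetical. */
final class SlidingSum {
    private final int panesPerWindow;
    private final Deque<Long> livePanes = new ArrayDeque<>();
    private long runningSum;

    SlidingSum(int panesPerWindow) {
        this.panesPerWindow = panesPerWindow;
    }

    /**
     * Retract-based update: constant work per new pane, but it requires
     * an invertible aggregate (SUM, COUNT, ...).
     */
    long addPaneWithRetract(long paneValue) {
        livePanes.addLast(paneValue);
        runningSum += paneValue;                 // accumulate the newest pane
        if (livePanes.size() > panesPerWindow) {
            runningSum -= livePanes.removeFirst(); // retract the expired pane
        }
        return runningSum;
    }

    /**
     * Merge-based evaluation: re-merges every live pane, so the work grows
     * with the number of panes per window, but no retract is needed.
     */
    static long mergeAll(Iterable<Long> panes) {
        long sum = 0;
        for (long value : panes) {
            sum += value;
        }
        return sum;
    }
}
```

Feeding panes 1, 2, 3, 4 into `new SlidingSum(3)` yields 1, 3, 6 and then 9,
the same value `mergeAll` gives for the live panes 2, 3, 4.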
> >
> > 3c). States and Chaining Strategy
> >
> > With the chaining strategy [7] in the operators, I was wondering whether
> > there are performance considerations when implementing the sliding window
> > as one operator versus multiple operators chained together. One question I
> > think is worth investigating is what the most efficient way of storing the
> > slicing information is, as Jark suggested in the JIRA discussion [8].
> >
> >
> > As I tried to capture as many discussions here as possible, I can hardly
> > imagine that we’ve addressed all of these questions. So please share your
> > comments and feedback, as they are highly appreciated! I can definitely
> > volunteer to start a design/discussion doc for this effort if everyone
> > thinks it is suitable to move forward.
> >
> >
> > Thanks,
> >
> > Rong
> >
> >
> >
> > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
> > [2] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
> > [3] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
> > [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
> > [5] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
> > [6] https://issues.apache.org/jira/browse/FLINK-11454
> > [7] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
> > [8] https://issues.apache.org/jira/browse/FLINK-7001
> >
> > On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <ykt...@gmail.com> wrote:
> >
> > > Hi Rong,
> > >
> > > Thanks for the improvement proposal; this topic aroused my interest
> > > since we did some similar improvements in Blink.
> > > After going through your design doc, I would like to share some thoughts:
> > >
> > > 1. It looks to me that the proposed SliceManager and MergeManager are
> > > quite generic and can be used in various situations like unaligned
> > > windows. However, this will introduce some new APIs and some new stream
> > > operators, and thus make it a big effort. I suppose the main reason to
> > > introduce these concepts is that we want to support some complex
> > > scenarios like unaligned windows, multi-timeout session windows and so
> > > on. But if we go back to the original motivation of this improvement, it
> > > is "Highly overlapping window operation" and "Window functions with
> > > efficient methods calculating and merging partial results", just as you
> > > mentioned in the section "Target Usage Pattern". I'm curious whether it's
> > > necessary to introduce all these new APIs and operators to meet these
> > > requirements. If I understand you correctly, we may even be able to
> > > achieve the goal by using two successive windows, first a tumbling
> > > window and then a sliding window. And from the experience we had in
> > > Blink, adding some enhancements to the original window operator may also
> > > be sufficient. Does it make sense to you if we first try to improve the
> > > target scenario without adding new APIs and operators? It will
> > > definitely make it easier to review and merge.
> > >
> > > 2. From what we observed after Blink did similar improvements, slice- or
> > > pane-based improvement is not a silver bullet for all kinds of
> > > situations. In most cases, we only observed a small performance gain like
> > > 20% or 30%. The main reason is that, if we want to get a big gain from
> > > this improvement, we normally require the window function to have
> > > efficient calculating and merging methods, like SUM or COUNT. If this
> > > assumption holds, the basic approach of the window operator will also be
> > > performant. It stores a compact intermediate result for each window per
> > > key, and it's also very efficient to get the result based on this. The
> > > only downside is that we need to do the calculation in multiple windows
> > > per key, but based on the assumption, the calculation is efficient and
> > > may not be a blocker. Based on this, I think we must be more careful
> > > about the changes, especially API-related ones.
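
For a sense of scale on the "calculation in multiple windows per key" point,
here is a small plain-Java sketch (hypothetical names, no Flink dependency)
that lists the sliding windows a single element falls into. It follows the
usual convention of windows [start, start + size) whose starts are aligned to
multiples of the slide with zero offset, which is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only; names are hypothetical. */
final class WindowDuplication {

    /** Start timestamps of all sliding windows [start, start + sizeMs) that contain ts. */
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= ts, aligned to the slide.
        long lastStart = Math.floorDiv(ts, slideMs) * slideMs;
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a one-hour window sliding every second, `windowStarts(0, 3_600_000, 1_000)`
returns 3600 entries, so each element updates 3600 per-window aggregates. That
is cheap per update for SUM or COUNT, but it is the cost that slicing tries to
avoid.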
> > >
> > > 3. The last thing on my mind is whether we can share some implementations
> > > with Blink's window operator. We introduced a new window operator in
> > > Blink SQL, which I think needs to be discussed when we try to adopt the
> > > Blink improvements. Without strong reasons against it, I think it's
> > > better to share some implementations with DataStream's window operator.
> > > Maybe we don't need to share the whole operator, but at least some
> > > underlying concepts and data structures. It would also be great if you
> > > could think about it and see if it's a feasible way.
> > >
> > > Last small question: how do you handle sliding windows whose window size
> > > can't be divided by the step, such as a 10-second window that slides
> > > every 3 seconds?
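
One possible way to handle this case, shown purely as an assumption and not
as what the design doc or Blink actually does, is to cut panes whose length
is the greatest common divisor of size and slide, so that every window is a
whole number of panes. A plain-Java sketch with hypothetical names:

```java
/** Plain-Java illustration only; names are hypothetical. */
final class PaneMath {

    /** Euclid's algorithm; expects positive inputs. */
    static long gcd(long a, long b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    /** Largest pane length that divides both the window size and the slide. */
    static long paneSize(long sizeMs, long slideMs) {
        return gcd(sizeMs, slideMs);
    }

    /** Number of panes that have to be merged to produce one window result. */
    static long panesPerWindow(long sizeMs, long slideMs) {
        return sizeMs / paneSize(sizeMs, slideMs);
    }
}
```

For the 10-second window with a 3-second slide this gives 1-second panes and
10 panes per window. An alternative would be to cut slices at every window
start and end instead, which produces fewer but unequal slices; which of the
two is preferable is left open here.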
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > Thank you Rong!
> > > > The performance of sliding windows is an issue for many users.
> > > > Adding support for a more efficient window is a great effort.
> > > >
> > > > Thank you,
> > > > Fabian
> > > >
> > > > On Tue, Jan 29, 2019 at 4:37 PM Rong Rong <walter...@gmail.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for the feedback and suggestions on the design doc. I have
> > > > > created a parent JIRA [1] to track the related tasks and started the
> > > > > implementation process.
> > > > > Any further feedback or suggestions are highly appreciated.
> > > > >
> > > > > Best,
> > > > > Rong
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-11276
> > > > >
> > > > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <walter...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Various discussions on the mailing list & JIRA tickets [2] have been
> > > > > > brought up in the past regarding windowing operation performance. As
> > > > > > we experimented internally with some of our extreme use cases, we
> > > > > > found out that using a slice-based implementation can optimize
> > > > > > Flink's windowing mechanism and provide better performance in most
> > > > > > cases.
> > > > > >
> > > > > > We've put together a preliminary enhancement and performance
> > > > > > optimization plan [1] for the current windowing operation in Flink.
> > > > > > This is largely inspired by the stream slicing research shared at a
> > > > > > recent Flink Forward conference [3] by Philip and Jonas, and the
> > > > > > discussion in the main JIRA ticket [2]. The initial design and POC
> > > > > > implementations consider optimizing the performance for the category
> > > > > > of overlapping windows as well as allowing chaining of cascade
> > > > > > window operators.
> > > > > >
> > > > > > It would be great to hear feedback and suggestions from the
> > > > > > community. Please kindly share your comments and suggestions.
> > > > > >
> > > > > > Thanks,
> > > > > > Rong
> > > > > >
> > > > > > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7001
> > > > > > [3] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
