Hi Devs,

Thank you all for the valuable feedback and comments on the previous
design doc.
We have now created an initial design/work plan based on @Kurt's
suggestion: "improvements with the sliding window scenario without
changing/adding new public APIs".

Please kindly take a look at the initial design document here [1]. Any
comments or suggestions are highly appreciated!

Thanks,
Rong

--

[1]
https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit#

On Thu, Feb 28, 2019 at 2:24 PM Rong Rong <walter...@gmail.com> wrote:

> Hi Kurt,
>
> Thanks for the valuable feedback. I think the suggestions you and Jincheng
> provided are definitely the best execution plan:
>
> - Start with the sliding window optimization by exhausting the current
> public API; there are some components we can leverage or directly reuse
> from Blink's window operator [1] implementation.
> - Backward compatibility is definitely an issue as any state changes will
> probably result in a non-trivial state upgrade. I can definitely follow up
> with you on this.
>
> At the same time I think it is also a good idea to summarize all the use
> cases that have been discussed so far. This can be very valuable as a
> reference for answering the questions "Do the improvements cover most use
> cases?" and, where they do not, "Can introducing some new API meet the
> requirements?".
>
> I will try to convert the current parent JIRA [2] into one that does not
> include public API alterations. As you mentioned, this will be a much more
> practical way in terms of execution.
>
> Many thanks for the suggestions and guidance!
>
> --
> Rong
>
> [1]
> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
> [2] https://issues.apache.org/jira/browse/FLINK-11454
>
> On Mon, Feb 25, 2019 at 3:44 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> Thanks for the detailed summary! It indeed involves lots of problems
>> and unanswered questions, which I think are not practical to
>> solve in one shot. From my point of view, the performance issue with the
>> sliding window is the root one and probably the one users are most likely
>> to run into. Thus I think the first question that should be
>> answered is:
>> "What kind of improvements can we make in the sliding window scenario?"
>>
>> First, we should try to figure out how many improvements can be made
>> without changing or adding new APIs. This can reuse the POC you did
>> and the work Blink has done; we can share some ideas and maybe some
>> implementation details. It already involves lots of effort
>> even if we only do this one thing. It may introduce some refactoring of
>> the current window operator, and we should keep it compatible with old
>> versions.
>>
>> After this, we can release it in the next version and gather some user
>> feedback. We can then answer the question: "Do the improvements cover
>> most use cases? Are there any critical ones which are impossible to do with
>> the current window operator?". At that time, we can open the discussion on
>> introducing some new APIs to meet the requirements.
>>
>> Deciding to add new APIs will introduce more work than improving the window
>> operator internally, which you have covered a lot in your proposal.
>> Actually, the approaches you proposed look good to me; taking it step by
>> step is the more practical way.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Feb 22, 2019 at 2:58 AM Rong Rong <walter...@gmail.com> wrote:
>>
>> > Hi All,
>> >
>> > Thanks for sharing feedback on the window optimization design doc and on
>> > the discussion JIRAs @Jincheng, @Kurt, @Jark and @Fabian. This is very
>> > valuable feedback and we will try to incorporate it in the next step.
>> >
>> > Several revisions have been made to the current design doc, and several
>> > POCs have been developed since we initially shared the document. Thus, some
>> > of the following might’ve already been addressed in other places. However,
>> > I would still like to summarize them since these points were raised in
>> > various scenarios.
>> >
>> > 1) Scope of the window optimization using slicing.
>> >
>> > The original scope of the doc was to address the problem of
>> > element-to-window duplication when a sliding window spans a wide range
>> > with narrow slides (with or w/o efficient partial aggregations). However,
>> > as we developed the 2 POCs [1,2] we found out there’s really no
>> > one-solution-fits-all approach to how this can be optimized (the same
>> > observation Kurt and Jark mentioned). Thus some further expansion of the
>> > scope was done:
>> >
>> > 1a). Directly improving WindowOperator([1, 3])
>> >
>> > In the design doc, the PartialWindowFunction was designed as a cue for
>> > WindowOperator to choose how to optimize the sliding window. This did not
>> > work well because how efficiently the window operator processes messages
>> > depends on: 1. the pattern of the window; 2. the pattern of the incoming
>> > traffic; and 3. the complexity of the window process function.
>> >
>> > The idea of having users specify the complexity of each API (e.g.
>> > accumulate, merge, retract) can help the system choose the path of
>> > optimization; however, at this stage it is hard to utilize this
>> > information.
>> >
>> > 1b). Explicitly allow slicing and merging steps([2]), or more generic
>> > window chaining:
>> >
>> > With the second POC, it is the users' responsibility to create efficient
>> > window operator(s), since users can now control the state and the process
>> > individually. This opens up more possibilities: e.g. unaligned /
>> > multi-timeout session windows, cascading windows.
>> >
>> > However, with this API change, users are given much more flexibility.
>> >
>> > In conclusion to (1), I think one piece of clear feedback is that this
>> > effort goes beyond just improving the sliding window performance. If we
>> > think it is the right way to expand the window API, there should be a
>> > proper API change discussion. As Kurt mentioned, if any of these cases can
>> > be covered with the existing API, we should always exhaust that first.
>> > (Further discussion in (2))
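
A small plain-Java sketch may help make the element-to-window duplication
from (1) concrete. It is not Flink code: the class and method names are
hypothetical, and it assumes non-negative timestamps, no window offset, and
a slice width equal to the slide. With a 1-hour window sliding every second,
one element lands in 3600 window states, whereas a sliced layout touches a
single slice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only (not part of any Flink API).
final class WindowDuplication {

    /** Start timestamps of every sliding window that contains ts. */
    static List<Long> windowStartsFor(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);   // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);                // window [start, start + size) contains ts
        }
        return starts;
    }

    /** Index of the single slice (of width slide) that holds ts. */
    static long sliceIndexFor(long ts, long slide) {
        return ts / slide;
    }

    public static void main(String[] args) {
        long size = 3_600_000L;  // 1 hour in ms
        long slide = 1_000L;     // 1 second in ms
        long ts = 7_200_500L;

        // Number of window states one element is copied into without slicing.
        System.out.println(windowStartsFor(ts, size, slide).size()); // prints 3600
        // With slicing the element is stored once, in this slice.
        System.out.println(sliceIndexFor(ts, slide));                // prints 7200
    }
}
```

The trade-off is that the per-window result then has to be assembled from
size / slide slices, which is only cheap when partial results can be merged
efficiently.
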
>> >
>> >
>> > 2). The motivation of changing the public DataStream APIs related to
>> > window-based processing.
>> >
>> > As Jincheng and Kurt mentioned, utilizing the current windowed stream API
>> > can achieve some of the results. For example, using a tumbling window
>> > followed by a sliding window can reduce the amount of duplication since the
>> > tumbling window results will be passed to the sliding window as one element.
>> > However, when we experimented, there were several problems I observed:
>> >
>> > 2a). rehashing with extra keyBy
>> >
>> > The limitation requires a separate keyBy step following the tumbling
>> > result, which creates unnecessary complexity. One workaround is to use
>> > "reinterpretAsKeyedStream" as Fabian mentioned; however, there is much
>> > investigation to be done to ensure it works well (e.g. how does the keyed
>> > state store work since the key was not set as a part of the namespace
>> > within WindowOperator).
>> >
>> > 2b). Handling iterable elements
>> >
>> > Another problem is that if no efficient partial results are produced by
>> > the tumbling window, the output element (as an iterable) will still be
>> > duplicated into each of the downstream sliding window states.
>> >
>> > However, this is actually a tricky problem by itself; further discussion in
>> > (2d) for this piece.
>> >
>> > 2c). Additional support for some of the more complex use cases
>> >
>> > As described in (1b), with explicit control of the window state and the
>> > window function, users can control how the optimization of the windowing
>> > is done. One could argue that these types of optimization can also be
>> > done using ProcessFunction or other rich functions.
>> >
>> > However, if we take a look at how the WindowOperator is currently
>> > structured, it also has explicit aggregation + windowing APIs [4] presented
>> > for the WindowedStream.
>> >
>> > Another point is opening up multi-slide windowing and multi-timeout windows
>> > as described in the presentation [5] at FF.
>> >
>> > I would also like to raise the point that the way the current POC [2]
>> > changes the public API is just a proof of concept that the flexibility
>> > opens up more complex use cases. It is by no means the final design and I
>> > agree that there could be more-aggressive-than-necessary changes since it’s
>> > meant as a POC.
>> >
>> > 2d). The support for same-window operator (similar to Table/SQL
>> > OVER-aggregate)
>> >
>> > As discussed with Jincheng in a separate thread, supporting a same-window
>> > operator similar to how OVER-aggregate was achieved in the Table/SQL API
>> > is definitely useful in some of the window use cases (2b). For example, a
>> > process function without efficient partial aggregation results on a sliding
>> > window can be evaluated very efficiently since both add and retract
>> > operations on an iterable list are cheap.
>> >
>> > There are some fundamental differences between the current window operator
>> > and the same-window operator concept; for example, the trigger mechanism is
>> > entirely different, and it will be a challenge to incorporate the
>> > OVER-aggregate of the Table API since there are many table-specific
>> > components in the bounded/unbounded over aggregates. However, I see this as
>> > an implementation detail (further discussion in (3a)).
>> >
>> > In conclusion to (2), I think the discussion remains whether to expand the
>> > API if we introduce sliding: if no, whether we can exhaust all the use
>> > cases with the current API; if yes, what that extra API will look like,
>> > especially the same-window operation in the DataStream API. (Utilizing
>> > whatever we already have in Table/SQL is one solution.)
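
The tumble-followed-by-slide idea in (2) can be sketched in plain Java as
below. This is only an illustration under assumptions: the aggregate is a SUM
(cheap to merge), the window size is a multiple of the pane size, timestamps
are non-negative, and all names are made up for the example rather than taken
from the DataStream API.

```java
import java.util.TreeMap;

// Hypothetical two-stage aggregation, for illustration only.
final class TumbleThenSlide {

    /** Stage 1: tumbling pre-aggregation, one partial sum per pane of width paneSize. */
    static TreeMap<Long, Long> tumble(long[][] events, long paneSize) {
        TreeMap<Long, Long> panes = new TreeMap<>();
        for (long[] e : events) {                       // e[0] = timestamp, e[1] = value
            long paneStart = e[0] - (e[0] % paneSize);
            panes.merge(paneStart, e[1], Long::sum);    // each event touches exactly one pane
        }
        return panes;
    }

    /** Stage 2: the sliding window [windowStart, windowStart + size) merges pane partials. */
    static long slide(TreeMap<Long, Long> panes, long windowStart, long size) {
        long sum = 0;
        // subMap is inclusive of the lower bound and exclusive of the upper bound.
        for (long partial : panes.subMap(windowStart, windowStart + size).values()) {
            sum += partial;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[][] events = {{1, 10}, {4, 20}, {6, 5}, {11, 7}, {14, 3}};
        TreeMap<Long, Long> panes = tumble(events, 5);  // panes start at 0, 5, 10
        System.out.println(slide(panes, 0, 10));        // prints 35
        System.out.println(slide(panes, 5, 10));        // prints 15
    }
}
```

The second stage sees one element per pane instead of one per raw event,
which is where the reduction in duplication comes from. If the first stage can
only emit an iterable of raw elements, that benefit disappears, as noted in
(2b).
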
>> >
>> >
>> > 3). Incorporating Existing POCs and Implementations (e.g. Blink’s
>> > WindowOperator & SQL/Table API)
>> >
>> > There are already some existing implementations that resemble the idea of
>> > slicing, for example Blink’s WindowOperator [3]. My thinking is that once we
>> > agree on the API and the public-facing changes, we should reuse as much of
>> > the current implementations as possible, since Blink has been running in
>> > production for years (please correct me if I am wrong
>> > @Jark/Jincheng/Kurt :-D ).
>> >
>> >
>> > Regarding the implementation details and some existing approaches.
>> >
>> > 3a). Over-aggregate vs. window-operator.
>> >
>> > Over aggregate has already been implemented in the Table/SQL API. Unlike
>> > window operators, over aggregate utilizes the process function and a
>> > general process operator approach.
>> >
>> > Looking further into how the current window operator works compared with
>> > the over aggregate, there are many *common implementations*: for example,
>> > the window state (especially the list-based approach); the triggering
>> > mechanism / timer service (especially on rowtime); and the “evicting
>> > policies”.
>> >
>> > I was wondering if it is possible to consider, on an operator level,
>> > supporting: 1. the same-window operator concept as a generic case of the
>> > over-aggregate, and 2. a more common window operator base that
>> > generalizes the state, trigger and eviction context.
>> >
>> > 3b). The support for partial process function: retracting vs. merge.
>> >
>> > Supporting retracting operations is definitely a plus in some use cases,
>> > for example as Fabian suggested in the JIRA discussion [6]. Currently
>> > this already exists in the Table/SQL API aggregation function.
>> >
>> > 3c). States and Chaining Strategy
>> >
>> > With the chaining strategy [7] in the operators, I was wondering whether
>> > there are performance considerations when implementing the sliding window
>> > as one operator versus multiple operators chained together. One question I
>> > think is worth investigating is what the most efficient way of storing the
>> > slicing information is, as Jark suggested in the JIRA discussion [8].
>> >
>> >
>> > As I tried to capture as many discussions here as possible, I can hardly
>> > imagine that we’ve addressed all of these questions. So please share your
>> > comments and feedback as they are highly appreciated! I can definitely
>> > volunteer to start a design/discussion doc for this effort if everyone
>> > thinks it is suitable to move forward.
>> >
>> >
>> > Thanks,
>> >
>> > Rong
>> >
>> >
>> >
>> > [1]
>> > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
>> >
>> > [2]
>> > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
>> >
>> > [3]
>> > https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
>> >
>> > [4]
>> > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
>> >
>> > [5]
>> > https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
>> >
>> > [6] https://issues.apache.org/jira/browse/FLINK-11454
>> >
>> > [7]
>> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
>> >
>> > [8] https://issues.apache.org/jira/browse/FLINK-7001
>> >
>> > On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <ykt...@gmail.com> wrote:
>> >
>> > > Hi Rong,
>> > >
>> > > Thanks for the improvement proposal; this topic aroused my interest since
>> > > we did some similar improvements in Blink.
>> > > After going through your design doc, I would like to share some thoughts:
>> > >
>> > > 1. It looks to me that the proposed SliceManager and MergeManager are quite
>> > > generic and can be used in various situations like unaligned windows.
>> > > However, this will introduce some new APIs and some new stream operators,
>> > > and thus make it a big effort to do. I suppose the main reason to introduce
>> > > these concepts is that we want to support some complex scenarios like
>> > > unaligned windows, multi-timeout session windows and so on. But if we go
>> > > back to the original motivation of this improvement, it is "Highly
>> > > overlapping window operation" and "Window functions with efficient methods
>> > > calculating and merging partial results", just as you mentioned in the
>> > > section "Target Usage Pattern". I'm curious whether it's necessary to
>> > > introduce all these new APIs and operators to meet these requirements. If I
>> > > understand you correctly, we may even be able to achieve the goal by using
>> > > two successive windows, first a tumbling window and then a sliding window.
>> > > And from the experience we had in Blink, adding some enhancements to the
>> > > original window operator may also be sufficient. Does it make sense to you
>> > > if we first try to improve the target scenario without adding new APIs and
>> > > operators? It will definitely make it easier to review and merge.
>> > >
>> > > 2. From what we observed after Blink did similar improvements, slice- or
>> > > pane-based improvement is not a silver bullet for all kinds of situations.
>> > > In most cases, we only observed a small performance gain like 20% or 30%.
>> > > The main reason is that, to get a big gain from this improvement, we
>> > > normally require the window function to have efficient calculating and
>> > > merging methods, like SUM and COUNT. If this assumption holds, the basic
>> > > approach of the window operator will also be performant. It stores a
>> > > compact intermediate result for each window per key, and it's also very
>> > > efficient to get the result based on this. The only downside is that we
>> > > need to do the calculation in multiple windows per key, but based on the
>> > > assumption, the calculation is efficient and may not be a blocker. Based on
>> > > this, I think we must be more careful about the changes, especially
>> > > API-related ones.
>> > >
>> > > 3. The last thing on my mind is whether we can share some implementations
>> > > with Blink's window operator. We introduced a new window operator in Blink
>> > > SQL, which I think needs to be discussed when we try to adopt Blink
>> > > improvements. Without strong reasons otherwise, I think it's better to
>> > > share some implementations with DataStream's window operator. Maybe we
>> > > don't need to share the whole operator, but at least some underlying
>> > > concepts and data structures. It would also be great if you could think
>> > > about it and see if it's a feasible way.
>> > >
>> > > Last small question: how do you handle sliding windows whose window size
>> > > is not a multiple of the slide, such as a 10-second window with a 3-second
>> > > slide?
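
One common answer in pane/slice-based designs is to cut slices at the greatest
common divisor of window size and slide, so that every window boundary falls
on a slice boundary. The sketch below only shows that arithmetic; it is an
assumption for illustration, not a description of what the design doc or Blink
actually does, and the names are hypothetical.

```java
// Hypothetical helper, for illustration only.
final class SliceGranularity {

    /** Greatest common divisor via the Euclidean algorithm (positive inputs). */
    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    /** Slice width such that both window size and slide are whole multiples of it. */
    static long sliceSize(long windowSize, long slide) {
        return gcd(windowSize, slide);
    }

    public static void main(String[] args) {
        long windowSize = 10_000L;  // 10 seconds in ms
        long slide = 3_000L;        // 3 seconds in ms
        long slice = sliceSize(windowSize, slide);

        System.out.println(slice);               // prints 1000
        System.out.println(windowSize / slice);  // prints 10 (slices merged per window)
        System.out.println(slide / slice);       // prints 3  (new slices per slide)
    }
}
```

When size and slide are nearly coprime, as here, the slices become small and
each window merges many of them, so the gain over plain per-window state
shrinks.
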
>> > >
>> > > Best,
>> > > Kurt
>> > >
>> > >
>> > > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <fhue...@gmail.com> wrote:
>> > >
>> > > > Thank you Rong!
>> > > > The performance of sliding windows is an issue for many users.
>> > > > Adding support for a more efficient window is a great effort.
>> > > >
>> > > > Thank you,
>> > > > Fabian
>> > > >
>> > > > On Tue, Jan 29, 2019 at 4:37 PM Rong Rong <walter...@gmail.com> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > Thanks for the feedback and suggestions on the design doc. I have
>> > > > > created a parent JIRA [1] to track the related tasks and started the
>> > > > > implementation process.
>> > > > > Any further feedback or suggestions are highly appreciated.
>> > > > >
>> > > > > Best,
>> > > > > Rong
>> > > > >
>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-11276
>> > > > >
>> > > > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <walter...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > Various discussions in the mailing list & JIRA tickets [2] have been
>> > > > > > brought up in the past regarding windowing operation performance. As
>> > > > > > we experimented internally with some of our extreme use cases, we
>> > > > > > found out that using a slice-based implementation can optimize Flink's
>> > > > > > windowing mechanism and provide better performance in most cases.
>> > > > > >
>> > > > > > We've put together a preliminary enhancement and performance
>> > > > > > optimization plan [1] for the current windowing operation in Flink.
>> > > > > > This is largely inspired by the stream slicing research shared at the
>> > > > > > recent Flink Forward conference [3] by Philip and Jonas, and the
>> > > > > > discussion in the main JIRA ticket [2]. The initial design and POC
>> > > > > > implementations consider optimizing the performance for the category
>> > > > > > of overlapping windows as well as allowing chaining of cascading
>> > > > > > window operators.
>> > > > > >
>> > > > > > It would be great to hear feedback and suggestions from the
>> > > > > > community.
>> > > > > > Please kindly share your comments and suggestions.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Rong
>> > > > > >
>> > > > > > [1]
>> > > > > > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
>> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7001
>> > > > > > [3]
>> > > > > > https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
