Hi Dian,

I still think your implementation is similar to the window operator. You
mentioned the scalable trigger mechanism, but the window API also supports
custom triggers.

Moreover, IMO, the design should guarantee deterministic semantics. I
think a design based on memory availability is non-deterministic.
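
To make the distinction concrete, here is a minimal sketch in plain Java
(not the Flink API; the class name, the method names and the
maxBufferedElements knob are all hypothetical) of a local pre-aggregator
that flushes on element count. For the same input sequence it always emits
the same partial results at the same points, which a flush driven by
runtime memory availability cannot promise.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain Java, not a Flink operator.
class CountTriggeredLocalAggregator {
    // Flush threshold, counted in input elements (an assumed knob).
    private final int maxBufferedElements;
    // Partial sums per key, kept in first-seen order for stable output.
    private final Map<String, Long> partialSums = new LinkedHashMap<>();
    private int buffered = 0;

    CountTriggeredLocalAggregator(int maxBufferedElements) {
        if (maxBufferedElements <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.maxBufferedElements = maxBufferedElements;
    }

    // Pre-aggregates one element. Returns the flushed partial results once
    // the element count reaches the threshold, otherwise an empty list.
    List<Map.Entry<String, Long>> add(String key, long value) {
        partialSums.merge(key, value, Long::sum);
        buffered++;
        if (buffered >= maxBufferedElements) {
            return flush();
        }
        return List.of();
    }

    // Emits every buffered partial sum and resets the buffer.
    List<Map.Entry<String, Long>> flush() {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : partialSums.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        partialSums.clear();
        buffered = 0;
        return out;
    }

    public static void main(String[] args) {
        CountTriggeredLocalAggregator agg = new CountTriggeredLocalAggregator(3);
        System.out.println(agg.add("a", 1)); // nothing emitted yet
        System.out.println(agg.add("a", 2)); // nothing emitted yet
        System.out.println(agg.add("b", 5)); // third element triggers the flush
    }
}
```

Replaying the same three elements always flushes on the third one, so the
downstream (global) aggregation sees the same partial results on every run.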

In addition, I do not think it is reasonable for the implementation to
depend on the timing of checkpoints. We should avoid affecting checkpoint
progress.

Best,
Vino

Dian Fu <dian0511...@gmail.com> wrote on Wed, Jun 5, 2019 at 1:55 PM:

> Hi Vino,
>
> Thanks a lot for your reply.
>
> > 1) When, Why and How to judge the memory is exhausted?
>
> My point here is that the local aggregate operator can buffer the inputs
> in memory and send out the results AT ANY TIME, e.g., when the element
> count or the time interval reaches a pre-configured value, when the memory
> usage of the buffered elements reaches a configured value (supposing we can
> estimate the object size efficiently), or even when a checkpoint is
> triggered.
>
> >
> > 2) If the local aggregate operator rarely needs to operate the state,
> > what do you think about fault tolerance?
>
> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier` which
> can be used here to send the results downstream when a checkpoint is
> triggered. Then fault tolerance can work well.
>
> Even if no such method were available, we could still store the buffered
> elements or pre-aggregated results in state when a checkpoint is
> triggered. State access would be much less frequent than with the window
> operator, as only the elements not yet sent out when the checkpoint occurs
> have to be written to state. Suppose the checkpoint interval is 3 minutes
> and the trigger interval is 10 seconds: then fewer than about 10/180
> (roughly 5.6%) of the elements will be written to state.
>
>
> Thanks,
> Dian
>
>
> > On Jun 5, 2019, at 11:43 AM, Biao Liu <mmyy1...@gmail.com> wrote:
> >
> > Hi Vino,
> >
> > +1 for this feature. It's useful for data skew, and it could also reduce
> > the amount of shuffled data.
> >
> > I have some concerns about the API part. From my side, this feature
> > should be more like an improvement. I'm afraid the proposal is overkill
> > on the API side. Many other systems support pre-aggregation as an
> > optimization of global aggregation. The optimization might be applied
> > automatically or manually, but with a simple API. The proposal introduces
> > a series of flexible local aggregation APIs that could be independent of
> > global aggregation. It doesn't look like an improvement; rather, it
> > introduces a lot of features. I'm not sure if there is a bigger picture
> > later. For now, the API part looks a little heavy to me.
> >
> >
> > vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 5, 2019 at 10:38 AM:
> >
> >> Hi Litree,
> >>
> >> At the implementation level, the localKeyBy API returns a general
> >> KeyedStream, so you can call all the APIs that KeyedStream provides. We
> >> did not restrict its usage, although we could (for example, by returning
> >> a new stream object named LocalKeyedStream).
> >>
> >> However, to achieve the goal of local aggregation, it only makes sense
> >> to call the window API.
> >>
> >> Best,
> >> Vino
> >>
> >> litree <lyuan...@126.com> wrote on Tue, Jun 4, 2019 at 10:41 PM:
> >>
> >>> Hi Vino,
> >>>
> >>>
> >>> I have read your design. What I would like to understand is the usage
> >>> of these new APIs. It looks like when I use localKeyBy, I must then use
> >>> a window operator to return a DataStream, and then use keyBy and
> >>> another window operator to get the final result?
> >>>
> >>>
> >>> thanks,
> >>> Litree
> >>>
> >>>
> >>> On 06/04/2019 17:22, vino yang wrote:
> >>> Hi Dian,
> >>>
> >>> Thanks for your reply.
> >>>
> >>> I know what you mean. However, if you think about it more deeply, you
> >>> will find that your implementation needs to provide an operator which
> >>> looks like a window operator. You need to use state, accept an
> >>> aggregation function, and specify the trigger time. It looks like a
> >>> lightweight window operator. Right?
> >>>
> >>> We try to reuse the functions Flink already provides and reduce
> >>> complexity. IMO, it is more user-friendly because users are familiar
> >>> with the window API.
> >>>
> >>> Best,
> >>> Vino
> >>>
> >>>
> >>> Dian Fu <dian0511...@gmail.com> wrote on Tue, Jun 4, 2019 at 4:19 PM:
> >>>
> >>>> Hi Vino,
> >>>>
> >>>> Thanks a lot for starting this discussion. +1 to this feature as I
> >>>> think it will be very useful.
> >>>>
> >>>> Regarding using a window to buffer the input elements, personally I
> >>>> don't think it's a good solution, for the following reasons:
> >>>> 1) As we know, WindowOperator stores the accumulated results in state;
> >>>> this is not necessary for the Local Aggregate operator.
> >>>> 2) For WindowOperator, each input element is accumulated into state.
> >>>> This is also not necessary for the Local Aggregate operator; storing
> >>>> the input elements in memory is enough.
> >>>>
> >>>> Thanks,
> >>>> Dian
> >>>>
> >>>>> On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Ken,
> >>>>>
> >>>>> Thanks for your reply.
> >>>>>
> >>>>> As I said before, we try to reuse Flink's state concept (for fault
> >>>>> tolerance and to guarantee "Exactly-Once" semantics), so we did not
> >>>>> consider a cache.
> >>>>>
> >>>>> In addition, if we use Flink's state, OOM-related issues are not a
> >>>>> key problem we need to consider.
> >>>>>
> >>>>> Best,
> >>>>> Vino
> >>>>>
> >>>>> Ken Krugler <kkrugler_li...@transpac.com> wrote on Tue, Jun 4, 2019 at 1:37 AM:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Cascading implemented this “map-side reduce” functionality with an
> >>>>>> LRU cache.
> >>>>>>
> >>>>>> That worked well, as then the skewed keys would always be in the
> >>>>>> cache.
> >>>>>>
> >>>>>> The API let you decide the size of the cache, in terms of number of
> >>>>>> entries.
> >>>>>>
> >>>>>> Having a memory limit would have been better for many of our use
> >>>>>> cases, though FWIR there’s no good way to estimate in-memory size
> >>>>>> for objects.
> >>>>>>
> >>>>>> — Ken
> >>>>>>
> >>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Piotr,
> >>>>>>>
> >>>>>>> The localKeyBy API returns an instance of KeyedStream (we just
> >>>>>>> added an inner flag to identify the local mode), which Flink
> >>>>>>> already provides. Users can call all the APIs (especially the
> >>>>>>> *window* APIs) that KeyedStream provides.
> >>>>>>>
> >>>>>>> So if users want to use local aggregation, they should call the
> >>>>>>> window API to build a local window. That means users should (or,
> >>>>>>> say, "can") specify the window length and other information based
> >>>>>>> on their needs.
> >>>>>>>
> >>>>>>> I think you described an idea different from ours. We did not try
> >>>>>>> to react after some predefined threshold is triggered. We tend to
> >>>>>>> give users the discretion to make decisions.
> >>>>>>>
> >>>>>>> Our design tends to reuse concepts and functions that Flink already
> >>>>>>> provides, like state and window (IMO, we do not need to worry about
> >>>>>>> OOM and the issues you mentioned).
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Vino
> >>>>>>>
> >>>>>>> Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> +1 for the idea from my side. I’ve even attempted to add a similar
> >>>>>>>> feature quite some time ago, but didn’t get enough traction [1].
> >>>>>>>>
> >>>>>>>> I’ve read through your document and I couldn’t find it mentioning
> >>>>>>>> anywhere when the pre-aggregated result should be emitted down the
> >>>>>>>> stream. I think that’s one of the most crucial decisions, since a
> >>>>>>>> wrong decision here can lead to a decrease in performance or to an
> >>>>>>>> explosion of memory/state consumption (both with bounded and
> >>>>>>>> unbounded data streams). For streaming it can also lead to
> >>>>>>>> increased latency.
> >>>>>>>>
> >>>>>>>> Since this is also a decision that’s impossible to make
> >>>>>>>> automatically in a perfectly reliable way, first and foremost I
> >>>>>>>> would expect this to be configurable via the API. With maybe some
> >>>>>>>> predefined triggers, like on watermark (for windowed operations),
> >>>>>>>> on checkpoint barrier (to decrease state size?), on element count,
> >>>>>>>> maybe memory usage (much easier to estimate with known/predefined
> >>>>>>>> types, like in SQL)… and with some option to implement a custom
> >>>>>>>> trigger.
> >>>>>>>>
> >>>>>>>> Also, what would work best would be to have some form of memory
> >>>>>>>> consumption priority. For example, if we are running out of memory
> >>>>>>>> for HashJoin/Final aggregation, instead of spilling to disk or
> >>>>>>>> crashing the job with OOM it would probably be better to prune/dump
> >>>>>>>> the pre/local aggregation state. But that’s another story.
> >>>>>>>>
> >>>>>>>> [1] https://github.com/apache/flink/pull/4626
> >>>>>>>>
> >>>>>>>> Piotrek
> >>>>>>>>
> >>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Excited, and a big +1 for this feature.
> >>>>>>>>>
> >>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
> >>>>>>>>>
> >>>>>>>>>> Nice feature.
> >>>>>>>>>> Looking forward to having it in Flink.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Xiaogang
> >>>>>>>>>>
> >>>>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
> >>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> As we mentioned at some conferences, such as Flink Forward SF
> >>>>>>>>>>> 2019 and QCon Beijing 2019, our team has implemented "Local
> >>>>>>>>>>> aggregation" in our internal Flink fork. This feature can
> >>>>>>>>>>> effectively alleviate data skew.
> >>>>>>>>>>>
> >>>>>>>>>>> Currently, keyed streams are widely used to perform aggregating
> >>>>>>>>>>> operations (e.g., reduce, sum and window) on the elements that
> >>>>>>>>>>> have the same key. When executed at runtime, the elements with
> >>>>>>>>>>> the same key will be sent to and aggregated by the same task.
> >>>>>>>>>>>
> >>>>>>>>>>> The performance of these aggregating operations is very
> >>>>>>>>>>> sensitive to the distribution of keys. In cases where the
> >>>>>>>>>>> distribution of keys follows a power law, the performance will
> >>>>>>>>>>> be significantly degraded. Worse, increasing the degree of
> >>>>>>>>>>> parallelism does not help when a task is overloaded by a single
> >>>>>>>>>>> key.
> >>>>>>>>>>>
> >>>>>>>>>>> Local aggregation is a widely adopted method to reduce the
> >>>>>>>>>>> performance degradation caused by data skew. We can decompose
> >>>>>>>>>>> the aggregating operations into two phases. In the first phase,
> >>>>>>>>>>> we aggregate the elements of the same key at the sender side to
> >>>>>>>>>>> obtain partial results. Then, in the second phase, these partial
> >>>>>>>>>>> results are sent to receivers according to their keys and are
> >>>>>>>>>>> combined to obtain the final result. Since the number of partial
> >>>>>>>>>>> results received by each receiver is limited by the number of
> >>>>>>>>>>> senders, the imbalance among receivers can be reduced. Besides,
> >>>>>>>>>>> by reducing the amount of transferred data, the performance can
> >>>>>>>>>>> be further improved.
> >>>>>>>>>>>
> >>>>>>>>>>> The design documentation is here:
> >>>>>>>>>>>
> >>>>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>>>>>>>>>>
> >>>>>>>>>>> Any comments and feedback are welcome and appreciated.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Vino
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>> --------------------------
> >>>>>> Ken Krugler
> >>>>>> +1 530-210-6378
> >>>>>> http://www.scaleunlimited.com
> >>>>>> Custom big data solutions & training
> >>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
> >>
>
>
