hi

+1,nice work

best
forwardxu

boshu Zheng <kisim...@163.com> 于2019年6月6日周四 下午4:30写道:

> Hi,
>
>
>   +1 from my side. Looking forward to this must-have feature :)
>
>
> Best,
> boshu
>
> At 2019-06-05 16:33:13, "vino yang" <yanghua1...@gmail.com> wrote:
> >Hi Aljoscha,
> >
> >What do you think about this feature and design document?
> >
> >Best,
> >Vino
> >
> >vino yang <yanghua1...@gmail.com> 于2019年6月5日周三 下午4:18写道:
> >
> >> Hi Dian,
> >>
> >> I still think your implementation is similar to the window operator, you
> >> mentioned the scalable trigger mechanism, the window API also can
> customize
> >> trigger.
> >>
> >> Moreover, IMO, the design should guarantee a deterministic semantics, I
> >> think based on memory availability is a non-deterministic design.
> >>
> >> In addition, if the implementation depends on the timing of checkpoint,
> I
> >> do not think it is reasonable, we should avoid affecting the
> checkpoint's
> >> progress.
> >>
> >> Best,
> >> Vino
> >>
> >> Dian Fu <dian0511...@gmail.com> 于2019年6月5日周三 下午1:55写道:
> >>
> >>> Hi Vino,
> >>>
> >>> Thanks a lot for your reply.
> >>>
> >>> > 1) When, Why and How to judge the memory is exhausted?
> >>>
> >>> My point here is that the local aggregate operator can buffer the
> inputs
> >>> in memory and send out the results AT ANY TIME. i.e. element count or
> the
> >>> time interval reached a pre-configured value, the memory usage of
> buffered
> >>> elements reached a configured valued (suppose we can estimate the
> object
> >>> size efficiently), or even when checkpoint is triggered.
> >>>
> >>> >
> >>> > 2) If the local aggregate operator rarely needs to operate the state,
> >>> what
> >>> > do you think about fault tolerance?
> >>>
> >>> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier`
> >>> which can be used here to send out the results to the downstream when
> >>> checkpoint is triggered. Then fault tolerance can work well.
> >>>
> >>> Even if there is no such a method available, we can still store the
> >>> buffered elements or pre-aggregate results to state when checkpoint is
> >>> triggered. The state access will be much less compared with window
> operator
> >>> as only the elements not sent out when checkpoint occur have to be
> written
> >>> to state. Suppose the checkpoint interval is 3 minutes and the trigger
> >>> interval is 10 seconds, then only about less than "10/180" elements
> will be
> >>> written to state.
> >>>
> >>>
> >>> Thanks,
> >>> Dian
> >>>
> >>>
> >>> > 在 2019年6月5日,上午11:43,Biao Liu <mmyy1...@gmail.com> 写道:
> >>> >
> >>> > Hi Vino,
> >>> >
> >>> > +1 for this feature. It's useful for data skew. And it could also
> reduce
> >>> > shuffled datum.
> >>> >
> >>> > I have some concerns about the API part. From my side, this feature
> >>> should
> >>> > be more like an improvement. I'm afraid the proposal is an overkill
> >>> about
> >>> > the API part. Many other systems support pre-aggregation as an
> >>> optimization
> >>> > of global aggregation. The optimization might be used automatically
> or
> >>> > manually but with a simple API. The proposal introduces a series of
> >>> > flexible local aggregation APIs. They could be independent with
> global
> >>> > aggregation. It doesn't look like an improvement but introduces a
> lot of
> >>> > features. I'm not sure if there is a bigger picture later. As for now
> >>> the
> >>> > API part looks a little heavy for me.
> >>> >
> >>> >
> >>> > vino yang <yanghua1...@gmail.com> 于2019年6月5日周三 上午10:38写道:
> >>> >
> >>> >> Hi Litree,
> >>> >>
> >>> >> From an implementation level, the localKeyBy API returns a general
> >>> >> KeyedStream, you can call all the APIs which KeyedStream provides,
> we
> >>> did
> >>> >> not restrict its usage, although we can do this (for example
> returns a
> >>> new
> >>> >> stream object named LocalKeyedStream).
> >>> >>
> >>> >> However, to achieve the goal of local aggregation, it only makes
> sense
> >>> to
> >>> >> call the window API.
> >>> >>
> >>> >> Best,
> >>> >> Vino
> >>> >>
> >>> >> litree <lyuan...@126.com> 于2019年6月4日周二 下午10:41写道:
> >>> >>
> >>> >>> Hi Vino,
> >>> >>>
> >>> >>>
> >>> >>> I have read your design,something I want to know is the usage of
> these
> >>> >> new
> >>> >>> APIs.It looks like when I use localByKey,i must then use a window
> >>> >> operator
> >>> >>> to return a datastream,and then use keyby and another window
> operator
> >>> to
> >>> >>> get the final result?
> >>> >>>
> >>> >>>
> >>> >>> thanks,
> >>> >>> Litree
> >>> >>>
> >>> >>>
> >>> >>> On 06/04/2019 17:22, vino yang wrote:
> >>> >>> Hi Dian,
> >>> >>>
> >>> >>> Thanks for your reply.
> >>> >>>
> >>> >>> I know what you mean. However, if you think deeply, you will find
> your
> >>> >>> implementation need to provide an operator which looks like a
> window
> >>> >>> operator. You need to use state and receive aggregation function
> and
> >>> >>> specify the trigger time. It looks like a lightweight window
> operator.
> >>> >>> Right?
> >>> >>>
> >>> >>> We try to reuse Flink provided functions and reduce complexity.
> IMO,
> >>> It
> >>> >> is
> >>> >>> more user-friendly because users are familiar with the window API.
> >>> >>>
> >>> >>> Best,
> >>> >>> Vino
> >>> >>>
> >>> >>>
> >>> >>> Dian Fu <dian0511...@gmail.com> 于2019年6月4日周二 下午4:19写道:
> >>> >>>
> >>> >>>> Hi Vino,
> >>> >>>>
> >>> >>>> Thanks a lot for starting this discussion. +1 to this feature as I
> >>> >> think
> >>> >>>> it will be very useful.
> >>> >>>>
> >>> >>>> Regarding to using window to buffer the input elements,
> personally I
> >>> >>> don't
> >>> >>>> think it's a good solution for the following reasons:
> >>> >>>> 1) As we know that WindowOperator will store the accumulated
> results
> >>> in
> >>> >>>> states, this is not necessary for Local Aggregate operator.
> >>> >>>> 2) For WindowOperator, each input element will be accumulated to
> >>> >> states.
> >>> >>>> This is also not necessary for Local Aggregate operator and
> storing
> >>> the
> >>> >>>> input elements in memory is enough.
> >>> >>>>
> >>> >>>> Thanks,
> >>> >>>> Dian
> >>> >>>>
> >>> >>>>> 在 2019年6月4日,上午10:03,vino yang <yanghua1...@gmail.com> 写道:
> >>> >>>>>
> >>> >>>>> Hi Ken,
> >>> >>>>>
> >>> >>>>> Thanks for your reply.
> >>> >>>>>
> >>> >>>>> As I said before, we try to reuse Flink's state concept (fault
> >>> >>> tolerance
> >>> >>>>> and guarantee "Exactly-Once" semantics). So we did not consider
> >>> >> cache.
> >>> >>>>>
> >>> >>>>> In addition, if we use Flink's state, the OOM related issue is
> not a
> >>> >>> key
> >>> >>>>> problem we need to consider.
> >>> >>>>>
> >>> >>>>> Best,
> >>> >>>>> Vino
> >>> >>>>>
> >>> >>>>> Ken Krugler <kkrugler_li...@transpac.com> 于2019年6月4日周二 上午1:37写道:
> >>> >>>>>
> >>> >>>>>> Hi all,
> >>> >>>>>>
> >>> >>>>>> Cascading implemented this “map-side reduce” functionality with
> an
> >>> >> LLR
> >>> >>>>>> cache.
> >>> >>>>>>
> >>> >>>>>> That worked well, as then the skewed keys would always be in the
> >>> >>> cache.
> >>> >>>>>>
> >>> >>>>>> The API let you decide the size of the cache, in terms of
> number of
> >>> >>>>>> entries.
> >>> >>>>>>
> >>> >>>>>> Having a memory limit would have been better for many of our use
> >>> >>> cases,
> >>> >>>>>> though FWIR there’s no good way to estimate in-memory size for
> >>> >>> objects.
> >>> >>>>>>
> >>> >>>>>> — Ken
> >>> >>>>>>
> >>> >>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com>
> >>> >> wrote:
> >>> >>>>>>>
> >>> >>>>>>> Hi Piotr,
> >>> >>>>>>>
> >>> >>>>>>> The localKeyBy API returns an instance of KeyedStream (we just
> >>> >> added
> >>> >>> an
> >>> >>>>>>> inner flag to identify the local mode) which is Flink has
> provided
> >>> >>>>>> before.
> >>> >>>>>>> Users can call all the APIs(especially *window* APIs) which
> >>> >>> KeyedStream
> >>> >>>>>>> provided.
> >>> >>>>>>>
> >>> >>>>>>> So if users want to use local aggregation, they should call the
> >>> >>> window
> >>> >>>>>> API
> >>> >>>>>>> to build a local window that means users should (or say "can")
> >>> >>> specify
> >>> >>>>>> the
> >>> >>>>>>> window length and other information based on their needs.
> >>> >>>>>>>
> >>> >>>>>>> I think you described another idea different from us. We did
> not
> >>> >> try
> >>> >>> to
> >>> >>>>>>> react after triggering some predefined threshold. We tend to
> give
> >>> >>> users
> >>> >>>>>> the
> >>> >>>>>>> discretion to make decisions.
> >>> >>>>>>>
> >>> >>>>>>> Our design idea tends to reuse Flink provided concept and
> >>> functions
> >>> >>>> like
> >>> >>>>>>> state and window (IMO, we do not need to worry about OOM and
> the
> >>> >>> issues
> >>> >>>>>> you
> >>> >>>>>>> mentioned).
> >>> >>>>>>>
> >>> >>>>>>> Best,
> >>> >>>>>>> Vino
> >>> >>>>>>>
> >>> >>>>>>> Piotr Nowojski <pi...@ververica.com> 于2019年6月3日周一 下午4:30写道:
> >>> >>>>>>>
> >>> >>>>>>>> Hi,
> >>> >>>>>>>>
> >>> >>>>>>>> +1 for the idea from my side. I’ve even attempted to add
> similar
> >>> >>>> feature
> >>> >>>>>>>> quite some time ago, but didn’t get enough traction [1].
> >>> >>>>>>>>
> >>> >>>>>>>> I’ve read through your document and I couldn’t find it
> mentioning
> >>> >>>>>>>> anywhere, when the pre aggregated result should be emitted
> down
> >>> >> the
> >>> >>>>>> stream?
> >>> >>>>>>>> I think that’s one of the most crucial decision, since wrong
> >>> >>> decision
> >>> >>>>>> here
> >>> >>>>>>>> can lead to decrease of performance or to an explosion of
> >>> >>> memory/state
> >>> >>>>>>>> consumption (both with bounded and unbounded data streams).
> For
> >>> >>>>>> streaming
> >>> >>>>>>>> it can also lead to an increased latency.
> >>> >>>>>>>>
> >>> >>>>>>>> Since this is also a decision that’s impossible to make
> >>> >>> automatically
> >>> >>>>>>>> perfectly reliably, first and foremost I would expect this to
> be
> >>> >>>>>>>> configurable via the API. With maybe some predefined triggers,
> >>> >> like
> >>> >>> on
> >>> >>>>>>>> watermark (for windowed operations), on checkpoint barrier (to
> >>> >>>> decrease
> >>> >>>>>>>> state size?), on element count, maybe memory usage (much
> easier
> >>> to
> >>> >>>>>> estimate
> >>> >>>>>>>> with a known/predefined types, like in SQL)… and with some
> option
> >>> >> to
> >>> >>>>>>>> implement custom trigger.
> >>> >>>>>>>>
> >>> >>>>>>>> Also what would work the best would be to have a some form of
> >>> >> memory
> >>> >>>>>>>> consumption priority. For example if we are running out of
> memory
> >>> >>> for
> >>> >>>>>>>> HashJoin/Final aggregation, instead of spilling to disk or
> >>> >> crashing
> >>> >>>> the
> >>> >>>>>> job
> >>> >>>>>>>> with OOM it would be probably better to prune/dump the
> pre/local
> >>> >>>>>>>> aggregation state. But that’s another story.
> >>> >>>>>>>>
> >>> >>>>>>>> [1] https://github.com/apache/flink/pull/4626 <
> >>> >>>>>>>> https://github.com/apache/flink/pull/4626>
> >>> >>>>>>>>
> >>> >>>>>>>> Piotrek
> >>> >>>>>>>>
> >>> >>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> >>> >>>>>>>>>
> >>> >>>>>>>>> Excited and  Big +1 for this feature.
> >>> >>>>>>>>>
> >>> >>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月3日周一 下午3:37写道:
> >>> >>>>>>>>>
> >>> >>>>>>>>>> Nice feature.
> >>> >>>>>>>>>> Looking forward to having it in Flink.
> >>> >>>>>>>>>>
> >>> >>>>>>>>>> Regards,
> >>> >>>>>>>>>> Xiaogang
> >>> >>>>>>>>>>
> >>> >>>>>>>>>> vino yang <yanghua1...@gmail.com> 于2019年6月3日周一 下午3:31写道:
> >>> >>>>>>>>>>
> >>> >>>>>>>>>>> Hi all,
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>> As we mentioned in some conference, such as Flink Forward
> SF
> >>> >> 2019
> >>> >>>> and
> >>> >>>>>>>>>> QCon
> >>> >>>>>>>>>>> Beijing 2019, our team has implemented "Local aggregation"
> in
> >>> >> our
> >>> >>>>>> inner
> >>> >>>>>>>>>>> Flink fork. This feature can effectively alleviate data
> skew.
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>> Currently, keyed streams are widely used to perform
> >>> aggregating
> >>> >>>>>>>>>> operations
> >>> >>>>>>>>>>> (e.g., reduce, sum and window) on the elements that having
> the
> >>> >>> same
> >>> >>>>>>>> key.
> >>> >>>>>>>>>>> When executed at runtime, the elements with the same key
> will
> >>> >> be
> >>> >>>> sent
> >>> >>>>>>>> to
> >>> >>>>>>>>>>> and aggregated by the same task.
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>> The performance of these aggregating operations is very
> >>> >> sensitive
> >>> >>>> to
> >>> >>>>>>>> the
> >>> >>>>>>>>>>> distribution of keys. In the cases where the distribution
> of
> >>> >> keys
> >>> >>>>>>>>>> follows a
> >>> >>>>>>>>>>> powerful law, the performance will be significantly
> >>> downgraded.
> >>> >>>> More
> >>> >>>>>>>>>>> unluckily, increasing the degree of parallelism does not
> help
> >>> >>> when
> >>> >>>> a
> >>> >>>>>>>> task
> >>> >>>>>>>>>>> is overloaded by a single key.
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>> Local aggregation is a widely-adopted method to reduce the
> >>> >>>>>> performance
> >>> >>>>>>>>>>> degraded by data skew. We can decompose the aggregating
> >>> >>> operations
> >>> >>>>>> into
> >>> >>>>>>>>>> two
> >>> >>>>>>>>>>> phases. In the first phase, we aggregate the elements of
> the
> >>> >> same
> >>> >>>> key
> >>> >>>>>>>> at
> >>> >>>>>>>>>>> the sender side to obtain partial results. Then at the
> second
> >>> >>>> phase,
> >>> >>>>>>>>>> these
> >>> >>>>>>>>>>> partial results are sent to receivers according to their
> keys
> >>> >> and
> >>> >>>> are
> >>> >>>>>>>>>>> combined to obtain the final result. Since the number of
> >>> >> partial
> >>> >>>>>>>> results
> >>> >>>>>>>>>>> received by each receiver is limited by the number of
> senders,
> >>> >>> the
> >>> >>>>>>>>>>> imbalance among receivers can be reduced. Besides, by
> reducing
> >>> >>> the
> >>> >>>>>>>> amount
> >>> >>>>>>>>>>> of transferred data the performance can be further
> improved.
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>> The design documentation is here:
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>
> >>> >>>>>>>>
> >>> >>>>>>
> >>> >>>>
> >>> >>>
> >>> >>
> >>>
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>> Any comment and feedback are welcome and appreciated.
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>> Best,
> >>> >>>>>>>>>>> Vino
> >>> >>>>>>>>>>>
> >>> >>>>>>>>>>
> >>> >>>>>>>>
> >>> >>>>>>>>
> >>> >>>>>>
> >>> >>>>>> --------------------------
> >>> >>>>>> Ken Krugler
> >>> >>>>>> +1 530-210-6378
> >>> >>>>>> http://www.scaleunlimited.com
> >>> >>>>>> Custom big data solutions & training
> >>> >>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
> >>> >>>>>>
> >>> >>>>>>
> >>> >>>>
> >>> >>>>
> >>> >>>
> >>> >>
> >>>
> >>>
>

Reply via email to