Hi, 

  +1 from my side. Looking forward to this must-have feature :)


Best,
boshu

At 2019-06-05 16:33:13, "vino yang" <yanghua1...@gmail.com> wrote:
>Hi Aljoscha,
>
>What do you think about this feature and design document?
>
>Best,
>Vino
>
>vino yang <yanghua1...@gmail.com> 于2019年6月5日周三 下午4:18写道:
>
>> Hi Dian,
>>
>> I still think your implementation is similar to the window operator, you
>> mentioned the scalable trigger mechanism, the window API also can customize
>> trigger.
>>
>> Moreover, IMO, the design should guarantee a deterministic semantics, I
>> think based on memory availability is a non-deterministic design.
>>
>> In addition, if the implementation depends on the timing of checkpoint, I
>> do not think it is reasonable, we should avoid affecting the checkpoint's
>> progress.
>>
>> Best,
>> Vino
>>
>> Dian Fu <dian0511...@gmail.com> 于2019年6月5日周三 下午1:55写道:
>>
>>> Hi Vino,
>>>
>>> Thanks a lot for your reply.
>>>
>>> > 1) When, Why and How to judge the memory is exhausted?
>>>
>>> My point here is that the local aggregate operator can buffer the inputs
>>> in memory and send out the results AT ANY TIME. i.e. element count or the
>>> time interval reached a pre-configured value, the memory usage of buffered
>>> elements reached a configured valued (suppose we can estimate the object
>>> size efficiently), or even when checkpoint is triggered.
>>>
>>> >
>>> > 2) If the local aggregate operator rarely needs to operate the state,
>>> what
>>> > do you think about fault tolerance?
>>>
>>> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier`
>>> which can be used here to send out the results to the downstream when
>>> checkpoint is triggered. Then fault tolerance can work well.
>>>
>>> Even if there is no such a method available, we can still store the
>>> buffered elements or pre-aggregate results to state when checkpoint is
>>> triggered. The state access will be much less compared with window operator
>>> as only the elements not sent out when checkpoint occur have to be written
>>> to state. Suppose the checkpoint interval is 3 minutes and the trigger
>>> interval is 10 seconds, then only about less than "10/180" elements will be
>>> written to state.
>>>
>>>
>>> Thanks,
>>> Dian
>>>
>>>
>>> > 在 2019年6月5日,上午11:43,Biao Liu <mmyy1...@gmail.com> 写道:
>>> >
>>> > Hi Vino,
>>> >
>>> > +1 for this feature. It's useful for data skew. And it could also reduce
>>> > shuffled datum.
>>> >
>>> > I have some concerns about the API part. From my side, this feature
>>> should
>>> > be more like an improvement. I'm afraid the proposal is an overkill
>>> about
>>> > the API part. Many other systems support pre-aggregation as an
>>> optimization
>>> > of global aggregation. The optimization might be used automatically or
>>> > manually but with a simple API. The proposal introduces a series of
>>> > flexible local aggregation APIs. They could be independent with global
>>> > aggregation. It doesn't look like an improvement but introduces a lot of
>>> > features. I'm not sure if there is a bigger picture later. As for now
>>> the
>>> > API part looks a little heavy for me.
>>> >
>>> >
>>> > vino yang <yanghua1...@gmail.com> 于2019年6月5日周三 上午10:38写道:
>>> >
>>> >> Hi Litree,
>>> >>
>>> >> From an implementation level, the localKeyBy API returns a general
>>> >> KeyedStream, you can call all the APIs which KeyedStream provides, we
>>> did
>>> >> not restrict its usage, although we can do this (for example returns a
>>> new
>>> >> stream object named LocalKeyedStream).
>>> >>
>>> >> However, to achieve the goal of local aggregation, it only makes sense
>>> to
>>> >> call the window API.
>>> >>
>>> >> Best,
>>> >> Vino
>>> >>
>>> >> litree <lyuan...@126.com> 于2019年6月4日周二 下午10:41写道:
>>> >>
>>> >>> Hi Vino,
>>> >>>
>>> >>>
>>> >>> I have read your design,something I want to know is the usage of these
>>> >> new
>>> >>> APIs.It looks like when I use localByKey,i must then use a window
>>> >> operator
>>> >>> to return a datastream,and then use keyby and another window operator
>>> to
>>> >>> get the final result?
>>> >>>
>>> >>>
>>> >>> thanks,
>>> >>> Litree
>>> >>>
>>> >>>
>>> >>> On 06/04/2019 17:22, vino yang wrote:
>>> >>> Hi Dian,
>>> >>>
>>> >>> Thanks for your reply.
>>> >>>
>>> >>> I know what you mean. However, if you think deeply, you will find your
>>> >>> implementation need to provide an operator which looks like a window
>>> >>> operator. You need to use state and receive aggregation function and
>>> >>> specify the trigger time. It looks like a lightweight window operator.
>>> >>> Right?
>>> >>>
>>> >>> We try to reuse Flink provided functions and reduce complexity. IMO,
>>> It
>>> >> is
>>> >>> more user-friendly because users are familiar with the window API.
>>> >>>
>>> >>> Best,
>>> >>> Vino
>>> >>>
>>> >>>
>>> >>> Dian Fu <dian0511...@gmail.com> 于2019年6月4日周二 下午4:19写道:
>>> >>>
>>> >>>> Hi Vino,
>>> >>>>
>>> >>>> Thanks a lot for starting this discussion. +1 to this feature as I
>>> >> think
>>> >>>> it will be very useful.
>>> >>>>
>>> >>>> Regarding to using window to buffer the input elements, personally I
>>> >>> don't
>>> >>>> think it's a good solution for the following reasons:
>>> >>>> 1) As we know that WindowOperator will store the accumulated results
>>> in
>>> >>>> states, this is not necessary for Local Aggregate operator.
>>> >>>> 2) For WindowOperator, each input element will be accumulated to
>>> >> states.
>>> >>>> This is also not necessary for Local Aggregate operator and storing
>>> the
>>> >>>> input elements in memory is enough.
>>> >>>>
>>> >>>> Thanks,
>>> >>>> Dian
>>> >>>>
>>> >>>>> 在 2019年6月4日,上午10:03,vino yang <yanghua1...@gmail.com> 写道:
>>> >>>>>
>>> >>>>> Hi Ken,
>>> >>>>>
>>> >>>>> Thanks for your reply.
>>> >>>>>
>>> >>>>> As I said before, we try to reuse Flink's state concept (fault
>>> >>> tolerance
>>> >>>>> and guarantee "Exactly-Once" semantics). So we did not consider
>>> >> cache.
>>> >>>>>
>>> >>>>> In addition, if we use Flink's state, the OOM related issue is not a
>>> >>> key
>>> >>>>> problem we need to consider.
>>> >>>>>
>>> >>>>> Best,
>>> >>>>> Vino
>>> >>>>>
>>> >>>>> Ken Krugler <kkrugler_li...@transpac.com> 于2019年6月4日周二 上午1:37写道:
>>> >>>>>
>>> >>>>>> Hi all,
>>> >>>>>>
>>> >>>>>> Cascading implemented this “map-side reduce” functionality with an
>>> >> LLR
>>> >>>>>> cache.
>>> >>>>>>
>>> >>>>>> That worked well, as then the skewed keys would always be in the
>>> >>> cache.
>>> >>>>>>
>>> >>>>>> The API let you decide the size of the cache, in terms of number of
>>> >>>>>> entries.
>>> >>>>>>
>>> >>>>>> Having a memory limit would have been better for many of our use
>>> >>> cases,
>>> >>>>>> though FWIR there’s no good way to estimate in-memory size for
>>> >>> objects.
>>> >>>>>>
>>> >>>>>> — Ken
>>> >>>>>>
>>> >>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com>
>>> >> wrote:
>>> >>>>>>>
>>> >>>>>>> Hi Piotr,
>>> >>>>>>>
>>> >>>>>>> The localKeyBy API returns an instance of KeyedStream (we just
>>> >> added
>>> >>> an
>>> >>>>>>> inner flag to identify the local mode) which is Flink has provided
>>> >>>>>> before.
>>> >>>>>>> Users can call all the APIs(especially *window* APIs) which
>>> >>> KeyedStream
>>> >>>>>>> provided.
>>> >>>>>>>
>>> >>>>>>> So if users want to use local aggregation, they should call the
>>> >>> window
>>> >>>>>> API
>>> >>>>>>> to build a local window that means users should (or say "can")
>>> >>> specify
>>> >>>>>> the
>>> >>>>>>> window length and other information based on their needs.
>>> >>>>>>>
>>> >>>>>>> I think you described another idea different from us. We did not
>>> >> try
>>> >>> to
>>> >>>>>>> react after triggering some predefined threshold. We tend to give
>>> >>> users
>>> >>>>>> the
>>> >>>>>>> discretion to make decisions.
>>> >>>>>>>
>>> >>>>>>> Our design idea tends to reuse Flink provided concept and
>>> functions
>>> >>>> like
>>> >>>>>>> state and window (IMO, we do not need to worry about OOM and the
>>> >>> issues
>>> >>>>>> you
>>> >>>>>>> mentioned).
>>> >>>>>>>
>>> >>>>>>> Best,
>>> >>>>>>> Vino
>>> >>>>>>>
>>> >>>>>>> Piotr Nowojski <pi...@ververica.com> 于2019年6月3日周一 下午4:30写道:
>>> >>>>>>>
>>> >>>>>>>> Hi,
>>> >>>>>>>>
>>> >>>>>>>> +1 for the idea from my side. I’ve even attempted to add similar
>>> >>>> feature
>>> >>>>>>>> quite some time ago, but didn’t get enough traction [1].
>>> >>>>>>>>
>>> >>>>>>>> I’ve read through your document and I couldn’t find it mentioning
>>> >>>>>>>> anywhere, when the pre aggregated result should be emitted down
>>> >> the
>>> >>>>>> stream?
>>> >>>>>>>> I think that’s one of the most crucial decision, since wrong
>>> >>> decision
>>> >>>>>> here
>>> >>>>>>>> can lead to decrease of performance or to an explosion of
>>> >>> memory/state
>>> >>>>>>>> consumption (both with bounded and unbounded data streams). For
>>> >>>>>> streaming
>>> >>>>>>>> it can also lead to an increased latency.
>>> >>>>>>>>
>>> >>>>>>>> Since this is also a decision that’s impossible to make
>>> >>> automatically
>>> >>>>>>>> perfectly reliably, first and foremost I would expect this to be
>>> >>>>>>>> configurable via the API. With maybe some predefined triggers,
>>> >> like
>>> >>> on
>>> >>>>>>>> watermark (for windowed operations), on checkpoint barrier (to
>>> >>>> decrease
>>> >>>>>>>> state size?), on element count, maybe memory usage (much easier
>>> to
>>> >>>>>> estimate
>>> >>>>>>>> with a known/predefined types, like in SQL)… and with some option
>>> >> to
>>> >>>>>>>> implement custom trigger.
>>> >>>>>>>>
>>> >>>>>>>> Also what would work the best would be to have a some form of
>>> >> memory
>>> >>>>>>>> consumption priority. For example if we are running out of memory
>>> >>> for
>>> >>>>>>>> HashJoin/Final aggregation, instead of spilling to disk or
>>> >> crashing
>>> >>>> the
>>> >>>>>> job
>>> >>>>>>>> with OOM it would be probably better to prune/dump the pre/local
>>> >>>>>>>> aggregation state. But that’s another story.
>>> >>>>>>>>
>>> >>>>>>>> [1] https://github.com/apache/flink/pull/4626 <
>>> >>>>>>>> https://github.com/apache/flink/pull/4626>
>>> >>>>>>>>
>>> >>>>>>>> Piotrek
>>> >>>>>>>>
>>> >>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>> Excited and  Big +1 for this feature.
>>> >>>>>>>>>
>>> >>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月3日周一 下午3:37写道:
>>> >>>>>>>>>
>>> >>>>>>>>>> Nice feature.
>>> >>>>>>>>>> Looking forward to having it in Flink.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Regards,
>>> >>>>>>>>>> Xiaogang
>>> >>>>>>>>>>
>>> >>>>>>>>>> vino yang <yanghua1...@gmail.com> 于2019年6月3日周一 下午3:31写道:
>>> >>>>>>>>>>
>>> >>>>>>>>>>> Hi all,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> As we mentioned in some conference, such as Flink Forward SF
>>> >> 2019
>>> >>>> and
>>> >>>>>>>>>> QCon
>>> >>>>>>>>>>> Beijing 2019, our team has implemented "Local aggregation" in
>>> >> our
>>> >>>>>> inner
>>> >>>>>>>>>>> Flink fork. This feature can effectively alleviate data skew.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Currently, keyed streams are widely used to perform
>>> aggregating
>>> >>>>>>>>>> operations
>>> >>>>>>>>>>> (e.g., reduce, sum and window) on the elements that having the
>>> >>> same
>>> >>>>>>>> key.
>>> >>>>>>>>>>> When executed at runtime, the elements with the same key will
>>> >> be
>>> >>>> sent
>>> >>>>>>>> to
>>> >>>>>>>>>>> and aggregated by the same task.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> The performance of these aggregating operations is very
>>> >> sensitive
>>> >>>> to
>>> >>>>>>>> the
>>> >>>>>>>>>>> distribution of keys. In the cases where the distribution of
>>> >> keys
>>> >>>>>>>>>> follows a
>>> >>>>>>>>>>> powerful law, the performance will be significantly
>>> downgraded.
>>> >>>> More
>>> >>>>>>>>>>> unluckily, increasing the degree of parallelism does not help
>>> >>> when
>>> >>>> a
>>> >>>>>>>> task
>>> >>>>>>>>>>> is overloaded by a single key.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Local aggregation is a widely-adopted method to reduce the
>>> >>>>>> performance
>>> >>>>>>>>>>> degraded by data skew. We can decompose the aggregating
>>> >>> operations
>>> >>>>>> into
>>> >>>>>>>>>> two
>>> >>>>>>>>>>> phases. In the first phase, we aggregate the elements of the
>>> >> same
>>> >>>> key
>>> >>>>>>>> at
>>> >>>>>>>>>>> the sender side to obtain partial results. Then at the second
>>> >>>> phase,
>>> >>>>>>>>>> these
>>> >>>>>>>>>>> partial results are sent to receivers according to their keys
>>> >> and
>>> >>>> are
>>> >>>>>>>>>>> combined to obtain the final result. Since the number of
>>> >> partial
>>> >>>>>>>> results
>>> >>>>>>>>>>> received by each receiver is limited by the number of senders,
>>> >>> the
>>> >>>>>>>>>>> imbalance among receivers can be reduced. Besides, by reducing
>>> >>> the
>>> >>>>>>>> amount
>>> >>>>>>>>>>> of transferred data the performance can be further improved.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> The design documentation is here:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>
>>> >>>>
>>> >>>
>>> >>
>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Any comment and feedback are welcome and appreciated.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Best,
>>> >>>>>>>>>>> Vino
>>> >>>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>
>>> >>>>>> --------------------------
>>> >>>>>> Ken Krugler
>>> >>>>>> +1 530-210-6378
>>> >>>>>> http://www.scaleunlimited.com
>>> >>>>>> Custom big data solutions & training
>>> >>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> >>>>>>
>>> >>>>>>
>>> >>>>
>>> >>>>
>>> >>>
>>> >>
>>>
>>>

Reply via email to