Hi Vino,

> So if users want to use local aggregation, they should call the window API
> to build a local window. That means users should (or rather "can") specify
> the window length and other information based on their needs.

Sounds OK to me. It would have to be run past the API folks in the 
community, though.
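
Just to double-check that I'm reading the proposal correctly, the usage would
then look roughly like this (localKeyBy being the API proposed in the design
doc, not something that exists in Flink today; the window size, Tuple2 types
and imports are only illustrative):

// input : DataStream<Tuple2<String, Long>> (illustrative)
DataStream<Tuple2<String, Long>> result = input
        .localKeyBy(0)                                   // proposed API: per-task "keyed" stream
        .timeWindow(Time.seconds(5))                     // user-chosen local window length
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))  // phase 1: partial aggregation per task
        .keyBy(0)                                        // shuffle only the partial results
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)); // phase 2: final aggregation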

Piotrek

> On 4 Jun 2019, at 10:19, Dian Fu <dian0511...@gmail.com> wrote:
> 
> Hi Vino,
> 
> Thanks a lot for starting this discussion. +1 to this feature as I think it 
> will be very useful.
> 
> Regarding using a window to buffer the input elements, personally I don't 
> think it's a good solution, for the following reasons:
> 1) As we know, WindowOperator stores the accumulated results in state, 
> which is not necessary for the Local Aggregate operator.
> 2) For WindowOperator, each input element is accumulated into state. This 
> is also not necessary for the Local Aggregate operator; storing the input 
> elements in memory is enough.
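> 
> To make that concrete, here is a rough sketch of what such an in-memory
> (non-state-backed) local aggregate could look like. The class name and
> threshold are hypothetical, not part of any existing Flink operator:
> 
> import java.util.HashMap;
> import java.util.Map;
> 
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> 
> // Hypothetical sketch: partial sums are kept in a plain HashMap (heap only,
> // no Flink state) and flushed downstream once the map holds enough keys.
> // (For exactly-once results, the buffer would still need to be flushed or
> // snapshotted on checkpoints, e.g. via CheckpointedFunction.)
> public class LocalSumFunction
>         implements FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
> 
>     private static final int MAX_BUFFERED_KEYS = 10_000;  // illustrative threshold
> 
>     private final Map<String, Long> buffer = new HashMap<>();
> 
>     @Override
>     public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
>         buffer.merge(value.f0, value.f1, Long::sum);
>         if (buffer.size() >= MAX_BUFFERED_KEYS) {
>             for (Map.Entry<String, Long> e : buffer.entrySet()) {
>                 out.collect(Tuple2.of(e.getKey(), e.getValue()));
>             }
>             buffer.clear();
>         }
>     }
> }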
> 
> Thanks,
> Dian
> 
>> On 4 Jun 2019, at 10:03, vino yang <yanghua1...@gmail.com> wrote:
>> 
>> Hi Ken,
>> 
>> Thanks for your reply.
>> 
>> As I said before, we try to reuse Flink's state concept (for fault tolerance
>> and to guarantee exactly-once semantics), so we did not consider a cache.
>> 
>> In addition, if we use Flink's state, the OOM-related issues are not a key
>> problem we need to consider.
>> 
>> Best,
>> Vino
>> 
>> Ken Krugler <kkrugler_li...@transpac.com> wrote on Tue, Jun 4, 2019 at 1:37 AM:
>> 
>>> Hi all,
>>> 
>>> Cascading implemented this “map-side reduce” functionality with an LRU
>>> cache.
>>> 
>>> That worked well, as the skewed keys would then always be in the cache.
>>> 
>>> The API let you set the size of the cache, in terms of the number of
>>> entries.
>>> 
>>> Having a memory limit would have been better for many of our use cases,
>>> though from what I recall there’s no good way to estimate the in-memory
>>> size of objects.
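>>> 
>>> For anyone curious, a minimal sketch of that kind of entry-bounded LRU
>>> cache (plain Java, not Cascading's actual code; the class name and the
>>> flush callback are made up):
>>> 
>>> import java.util.LinkedHashMap;
>>> import java.util.Map;
>>> import java.util.function.BiConsumer;
>>> 
>>> // Entry-bounded LRU cache of partial aggregates: when full, the least
>>> // recently used key is flushed downstream, so hot (skewed) keys tend to
>>> // stay cached and keep being combined locally.
>>> public class LruAggregateCache<K> extends LinkedHashMap<K, Long> {
>>> 
>>>     private final int maxEntries;
>>>     private final BiConsumer<K, Long> flush;  // e.g. (key, partialSum) -> emit downstream
>>> 
>>>     public LruAggregateCache(int maxEntries, BiConsumer<K, Long> flush) {
>>>         super(16, 0.75f, true);  // accessOrder = true gives LRU iteration order
>>>         this.maxEntries = maxEntries;
>>>         this.flush = flush;
>>>     }
>>> 
>>>     public void add(K key, long value) {
>>>         Long current = get(key);  // get() also refreshes the LRU order
>>>         put(key, current == null ? value : current + value);
>>>     }
>>> 
>>>     @Override
>>>     protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
>>>         if (size() > maxEntries) {
>>>             flush.accept(eldest.getKey(), eldest.getValue());
>>>             return true;  // evict the least recently used entry after flushing it
>>>         }
>>>         return false;
>>>     }
>>> }
>>> 
>>> (Usage would be something like new LruAggregateCache<String>(100_000, ...)
>>> inside the map-side operator, with the callback emitting the evicted
>>> partial results downstream.)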
>>> 
>>> — Ken
>>> 
>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>>> 
>>>> Hi Piotr,
>>>> 
>>>> The localKeyBy API returns an instance of KeyedStream (we just added an
>>>> inner flag to identify the local mode), which Flink has already provided.
>>>> Users can call all the APIs (especially the *window* APIs) that
>>>> KeyedStream provides.
>>>> 
>>>> So if users want to use local aggregation, they should call the window API
>>>> to build a local window. That means users should (or rather "can") specify
>>>> the window length and other information based on their needs.
>>>> 
>>>> I think you described an idea different from ours. We did not try to
>>>> react after some predefined threshold is triggered. We tend to give users
>>>> the discretion to make those decisions.
>>>> 
>>>> Our design tends to reuse the concepts and functions Flink already
>>>> provides, like state and window (IMO, we do not need to worry about OOM
>>>> and the other issues you mentioned).
>>>> 
>>>> Best,
>>>> Vino
>>>> 
>>>> Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> +1 for the idea from my side. I even attempted to add a similar feature
>>>>> quite some time ago, but it didn’t get enough traction [1].
>>>>> 
>>>>> I’ve read through your document and I couldn’t find any mention of when
>>>>> the pre-aggregated result should be emitted downstream.
>>>>> I think that’s one of the most crucial decisions, since a wrong decision
>>>>> here can lead to decreased performance or to an explosion of memory/state
>>>>> consumption (with both bounded and unbounded data streams). For streaming
>>>>> it can also lead to increased latency.
>>>>> 
>>>>> Since this is also a decision that’s impossible to make automatically in
>>>>> a perfectly reliable way, first and foremost I would expect this to be
>>>>> configurable via the API, with maybe some predefined triggers, like on
>>>>> watermark (for windowed operations), on checkpoint barrier (to decrease
>>>>> state size?), on element count, maybe on memory usage (much easier to
>>>>> estimate with known/predefined types, like in SQL)… and with some option
>>>>> to implement a custom trigger.
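>>>>> 
>>>>> Just to make the idea concrete, a rough sketch of what such a pluggable
>>>>> trigger could look like (the names are purely hypothetical, this is not
>>>>> an existing Flink interface):
>>>>> 
>>>>> // Hypothetical policy deciding when the locally buffered pre-aggregates
>>>>> // are flushed downstream.
>>>>> public interface LocalFlushTrigger<T> {
>>>>> 
>>>>>     /** Called for every buffered element; return true to flush now. */
>>>>>     boolean onElement(T element, long bufferedCount);
>>>>> 
>>>>>     /** Called on watermark / checkpoint barrier; return true to flush. */
>>>>>     boolean onBarrierOrWatermark();
>>>>> }
>>>>> 
>>>>> // Example: flush once a fixed number of elements has been buffered,
>>>>> // and always flush on watermarks/checkpoints to bound latency and state.
>>>>> class CountFlushTrigger<T> implements LocalFlushTrigger<T> {
>>>>> 
>>>>>     private final long maxCount;
>>>>> 
>>>>>     CountFlushTrigger(long maxCount) {
>>>>>         this.maxCount = maxCount;
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public boolean onElement(T element, long bufferedCount) {
>>>>>         return bufferedCount >= maxCount;
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public boolean onBarrierOrWatermark() {
>>>>>         return true;
>>>>>     }
>>>>> }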
>>>>> 
>>>>> Also, what would work best would be to have some form of memory
>>>>> consumption priority. For example, if we are running out of memory for a
>>>>> HashJoin/final aggregation, instead of spilling to disk or crashing the
>>>>> job with an OOM it would probably be better to prune/dump the pre/local
>>>>> aggregation state. But that’s another story.
>>>>> 
>>>>> [1] https://github.com/apache/flink/pull/4626
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>>>>>> 
>>>>>> Excited, and a big +1 for this feature.
>>>>>> 
>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
>>>>>> 
>>>>>>> Nice feature.
>>>>>>> Looking forward to having it in Flink.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Xiaogang
>>>>>>> 
>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
>>>>>>> 
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> As we mentioned at some conferences, such as Flink Forward SF 2019 and
>>>>>>>> QCon Beijing 2019, our team has implemented "local aggregation" in our
>>>>>>>> internal Flink fork. This feature can effectively alleviate data skew.
>>>>>>>> 
>>>>>>>> Currently, keyed streams are widely used to perform aggregating
>>>>>>>> operations (e.g., reduce, sum and window) on elements that have the
>>>>>>>> same key. At runtime, the elements with the same key will be sent to,
>>>>>>>> and aggregated by, the same task.
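>>>>>>>> 
>>>>>>>> For example (assuming input is a DataStream<Tuple2<String, Long>> and
>>>>>>>> the usual DataStream API imports):
>>>>>>>> 
>>>>>>>> // All records with the same key (field 0) are hashed to one task,
>>>>>>>> // which keeps the running sum of field 1; a hot key overloads that task.
>>>>>>>> DataStream<Tuple2<String, Long>> counts = input
>>>>>>>>         .keyBy(0)
>>>>>>>>         .sum(1);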
>>>>>>>> 
>>>>>>>> The performance of these aggregating operations is very sensitive to
>>>>>>>> the distribution of keys. In cases where the distribution of keys
>>>>>>>> follows a power law, performance degrades significantly. Even worse,
>>>>>>>> increasing the degree of parallelism does not help when a task is
>>>>>>>> overloaded by a single key.
>>>>>>>> 
>>>>>>>> Local aggregation is a widely adopted method to reduce the performance
>>>>>>>> degradation caused by data skew. We can decompose the aggregating
>>>>>>>> operations into two phases. In the first phase, we aggregate the
>>>>>>>> elements of the same key at the sender side to obtain partial results.
>>>>>>>> Then, in the second phase, these partial results are sent to receivers
>>>>>>>> according to their keys and are combined to obtain the final result.
>>>>>>>> Since the number of partial results received by each receiver is
>>>>>>>> limited by the number of senders, the imbalance among receivers can be
>>>>>>>> reduced. For example, with 100 senders, the receiver for a hot key
>>>>>>>> combines at most 100 partial results per local aggregation round, no
>>>>>>>> matter how many raw records carried that key. Besides, by reducing the
>>>>>>>> amount of transferred data, the performance can be further improved.
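>>>>>>>> 
>>>>>>>> As a small illustration of why the first phase should emit partial
>>>>>>>> results rather than final values: for an aggregate like AVG, a partial
>>>>>>>> result must carry (sum, count) so the second phase can still combine
>>>>>>>> the partials correctly. The class below is only a sketch, not part of
>>>>>>>> the proposal:
>>>>>>>> 
>>>>>>>> // Sketch of a mergeable partial result for AVG: phase one folds raw
>>>>>>>> // values into (sum, count) at the sender; phase two merges the partial
>>>>>>>> // results from all senders and only then computes the final average.
>>>>>>>> public class PartialAvg {
>>>>>>>> 
>>>>>>>>     public long sum;
>>>>>>>>     public long count;
>>>>>>>> 
>>>>>>>>     // phase one: fold a raw element into the local partial result
>>>>>>>>     public void add(long value) {
>>>>>>>>         sum += value;
>>>>>>>>         count++;
>>>>>>>>     }
>>>>>>>> 
>>>>>>>>     // phase two: combine partial results from different senders
>>>>>>>>     public void merge(PartialAvg other) {
>>>>>>>>         sum += other.sum;
>>>>>>>>         count += other.count;
>>>>>>>>     }
>>>>>>>> 
>>>>>>>>     public double result() {
>>>>>>>>         return (double) sum / count;
>>>>>>>>     }
>>>>>>>> }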
>>>>>>>> 
>>>>>>>> The design documentation is here:
>>>>>>>> 
>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>>>>>>> 
>>>>>>>> Any comments and feedback are welcome and appreciated.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Vino
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> 
>>> 
> 
