Hi Aljoscha,

What do you think about this feature and design document?

Best,
Vino

vino yang <yanghua1...@gmail.com> 于2019年6月5日周三 下午4:18写道:

> Hi Dian,
>
> I still think your implementation is similar to the window operator, you
> mentioned the scalable trigger mechanism, the window API also can customize
> trigger.
>
> Moreover, IMO, the design should guarantee a deterministic semantics, I
> think based on memory availability is a non-deterministic design.
>
> In addition, if the implementation depends on the timing of checkpoint, I
> do not think it is reasonable, we should avoid affecting the checkpoint's
> progress.
>
> Best,
> Vino
>
> Dian Fu <dian0511...@gmail.com> 于2019年6月5日周三 下午1:55写道:
>
>> Hi Vino,
>>
>> Thanks a lot for your reply.
>>
>> > 1) When, Why and How to judge the memory is exhausted?
>>
>> My point here is that the local aggregate operator can buffer the inputs
>> in memory and send out the results AT ANY TIME. i.e. element count or the
>> time interval reached a pre-configured value, the memory usage of buffered
>> elements reached a configured valued (suppose we can estimate the object
>> size efficiently), or even when checkpoint is triggered.
>>
>> >
>> > 2) If the local aggregate operator rarely needs to operate the state,
>> what
>> > do you think about fault tolerance?
>>
>> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier`
>> which can be used here to send out the results to the downstream when
>> checkpoint is triggered. Then fault tolerance can work well.
>>
>> Even if there is no such a method available, we can still store the
>> buffered elements or pre-aggregate results to state when checkpoint is
>> triggered. The state access will be much less compared with window operator
>> as only the elements not sent out when checkpoint occur have to be written
>> to state. Suppose the checkpoint interval is 3 minutes and the trigger
>> interval is 10 seconds, then only about less than "10/180" elements will be
>> written to state.
>>
>>
>> Thanks,
>> Dian
>>
>>
>> > 在 2019年6月5日,上午11:43,Biao Liu <mmyy1...@gmail.com> 写道:
>> >
>> > Hi Vino,
>> >
>> > +1 for this feature. It's useful for data skew. And it could also reduce
>> > shuffled datum.
>> >
>> > I have some concerns about the API part. From my side, this feature
>> should
>> > be more like an improvement. I'm afraid the proposal is an overkill
>> about
>> > the API part. Many other systems support pre-aggregation as an
>> optimization
>> > of global aggregation. The optimization might be used automatically or
>> > manually but with a simple API. The proposal introduces a series of
>> > flexible local aggregation APIs. They could be independent with global
>> > aggregation. It doesn't look like an improvement but introduces a lot of
>> > features. I'm not sure if there is a bigger picture later. As for now
>> the
>> > API part looks a little heavy for me.
>> >
>> >
>> > vino yang <yanghua1...@gmail.com> 于2019年6月5日周三 上午10:38写道:
>> >
>> >> Hi Litree,
>> >>
>> >> From an implementation level, the localKeyBy API returns a general
>> >> KeyedStream, you can call all the APIs which KeyedStream provides, we
>> did
>> >> not restrict its usage, although we can do this (for example returns a
>> new
>> >> stream object named LocalKeyedStream).
>> >>
>> >> However, to achieve the goal of local aggregation, it only makes sense
>> to
>> >> call the window API.
>> >>
>> >> Best,
>> >> Vino
>> >>
>> >> litree <lyuan...@126.com> 于2019年6月4日周二 下午10:41写道:
>> >>
>> >>> Hi Vino,
>> >>>
>> >>>
>> >>> I have read your design,something I want to know is the usage of these
>> >> new
>> >>> APIs.It looks like when I use localByKey,i must then use a window
>> >> operator
>> >>> to return a datastream,and then use keyby and another window operator
>> to
>> >>> get the final result?
>> >>>
>> >>>
>> >>> thanks,
>> >>> Litree
>> >>>
>> >>>
>> >>> On 06/04/2019 17:22, vino yang wrote:
>> >>> Hi Dian,
>> >>>
>> >>> Thanks for your reply.
>> >>>
>> >>> I know what you mean. However, if you think deeply, you will find your
>> >>> implementation need to provide an operator which looks like a window
>> >>> operator. You need to use state and receive aggregation function and
>> >>> specify the trigger time. It looks like a lightweight window operator.
>> >>> Right?
>> >>>
>> >>> We try to reuse Flink provided functions and reduce complexity. IMO,
>> It
>> >> is
>> >>> more user-friendly because users are familiar with the window API.
>> >>>
>> >>> Best,
>> >>> Vino
>> >>>
>> >>>
>> >>> Dian Fu <dian0511...@gmail.com> 于2019年6月4日周二 下午4:19写道:
>> >>>
>> >>>> Hi Vino,
>> >>>>
>> >>>> Thanks a lot for starting this discussion. +1 to this feature as I
>> >> think
>> >>>> it will be very useful.
>> >>>>
>> >>>> Regarding to using window to buffer the input elements, personally I
>> >>> don't
>> >>>> think it's a good solution for the following reasons:
>> >>>> 1) As we know that WindowOperator will store the accumulated results
>> in
>> >>>> states, this is not necessary for Local Aggregate operator.
>> >>>> 2) For WindowOperator, each input element will be accumulated to
>> >> states.
>> >>>> This is also not necessary for Local Aggregate operator and storing
>> the
>> >>>> input elements in memory is enough.
>> >>>>
>> >>>> Thanks,
>> >>>> Dian
>> >>>>
>> >>>>> 在 2019年6月4日,上午10:03,vino yang <yanghua1...@gmail.com> 写道:
>> >>>>>
>> >>>>> Hi Ken,
>> >>>>>
>> >>>>> Thanks for your reply.
>> >>>>>
>> >>>>> As I said before, we try to reuse Flink's state concept (fault
>> >>> tolerance
>> >>>>> and guarantee "Exactly-Once" semantics). So we did not consider
>> >> cache.
>> >>>>>
>> >>>>> In addition, if we use Flink's state, the OOM related issue is not a
>> >>> key
>> >>>>> problem we need to consider.
>> >>>>>
>> >>>>> Best,
>> >>>>> Vino
>> >>>>>
>> >>>>> Ken Krugler <kkrugler_li...@transpac.com> 于2019年6月4日周二 上午1:37写道:
>> >>>>>
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> Cascading implemented this “map-side reduce” functionality with an
>> >> LLR
>> >>>>>> cache.
>> >>>>>>
>> >>>>>> That worked well, as then the skewed keys would always be in the
>> >>> cache.
>> >>>>>>
>> >>>>>> The API let you decide the size of the cache, in terms of number of
>> >>>>>> entries.
>> >>>>>>
>> >>>>>> Having a memory limit would have been better for many of our use
>> >>> cases,
>> >>>>>> though FWIR there’s no good way to estimate in-memory size for
>> >>> objects.
>> >>>>>>
>> >>>>>> — Ken
>> >>>>>>
>> >>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com>
>> >> wrote:
>> >>>>>>>
>> >>>>>>> Hi Piotr,
>> >>>>>>>
>> >>>>>>> The localKeyBy API returns an instance of KeyedStream (we just
>> >> added
>> >>> an
>> >>>>>>> inner flag to identify the local mode) which is Flink has provided
>> >>>>>> before.
>> >>>>>>> Users can call all the APIs(especially *window* APIs) which
>> >>> KeyedStream
>> >>>>>>> provided.
>> >>>>>>>
>> >>>>>>> So if users want to use local aggregation, they should call the
>> >>> window
>> >>>>>> API
>> >>>>>>> to build a local window that means users should (or say "can")
>> >>> specify
>> >>>>>> the
>> >>>>>>> window length and other information based on their needs.
>> >>>>>>>
>> >>>>>>> I think you described another idea different from us. We did not
>> >> try
>> >>> to
>> >>>>>>> react after triggering some predefined threshold. We tend to give
>> >>> users
>> >>>>>> the
>> >>>>>>> discretion to make decisions.
>> >>>>>>>
>> >>>>>>> Our design idea tends to reuse Flink provided concept and
>> functions
>> >>>> like
>> >>>>>>> state and window (IMO, we do not need to worry about OOM and the
>> >>> issues
>> >>>>>> you
>> >>>>>>> mentioned).
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Vino
>> >>>>>>>
>> >>>>>>> Piotr Nowojski <pi...@ververica.com> 于2019年6月3日周一 下午4:30写道:
>> >>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> +1 for the idea from my side. I’ve even attempted to add similar
>> >>>> feature
>> >>>>>>>> quite some time ago, but didn’t get enough traction [1].
>> >>>>>>>>
>> >>>>>>>> I’ve read through your document and I couldn’t find it mentioning
>> >>>>>>>> anywhere, when the pre aggregated result should be emitted down
>> >> the
>> >>>>>> stream?
>> >>>>>>>> I think that’s one of the most crucial decision, since wrong
>> >>> decision
>> >>>>>> here
>> >>>>>>>> can lead to decrease of performance or to an explosion of
>> >>> memory/state
>> >>>>>>>> consumption (both with bounded and unbounded data streams). For
>> >>>>>> streaming
>> >>>>>>>> it can also lead to an increased latency.
>> >>>>>>>>
>> >>>>>>>> Since this is also a decision that’s impossible to make
>> >>> automatically
>> >>>>>>>> perfectly reliably, first and foremost I would expect this to be
>> >>>>>>>> configurable via the API. With maybe some predefined triggers,
>> >> like
>> >>> on
>> >>>>>>>> watermark (for windowed operations), on checkpoint barrier (to
>> >>>> decrease
>> >>>>>>>> state size?), on element count, maybe memory usage (much easier
>> to
>> >>>>>> estimate
>> >>>>>>>> with a known/predefined types, like in SQL)… and with some option
>> >> to
>> >>>>>>>> implement custom trigger.
>> >>>>>>>>
>> >>>>>>>> Also what would work the best would be to have a some form of
>> >> memory
>> >>>>>>>> consumption priority. For example if we are running out of memory
>> >>> for
>> >>>>>>>> HashJoin/Final aggregation, instead of spilling to disk or
>> >> crashing
>> >>>> the
>> >>>>>> job
>> >>>>>>>> with OOM it would be probably better to prune/dump the pre/local
>> >>>>>>>> aggregation state. But that’s another story.
>> >>>>>>>>
>> >>>>>>>> [1] https://github.com/apache/flink/pull/4626 <
>> >>>>>>>> https://github.com/apache/flink/pull/4626>
>> >>>>>>>>
>> >>>>>>>> Piotrek
>> >>>>>>>>
>> >>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Excited and  Big +1 for this feature.
>> >>>>>>>>>
>> >>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月3日周一 下午3:37写道:
>> >>>>>>>>>
>> >>>>>>>>>> Nice feature.
>> >>>>>>>>>> Looking forward to having it in Flink.
>> >>>>>>>>>>
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Xiaogang
>> >>>>>>>>>>
>> >>>>>>>>>> vino yang <yanghua1...@gmail.com> 于2019年6月3日周一 下午3:31写道:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi all,
>> >>>>>>>>>>>
>> >>>>>>>>>>> As we mentioned in some conference, such as Flink Forward SF
>> >> 2019
>> >>>> and
>> >>>>>>>>>> QCon
>> >>>>>>>>>>> Beijing 2019, our team has implemented "Local aggregation" in
>> >> our
>> >>>>>> inner
>> >>>>>>>>>>> Flink fork. This feature can effectively alleviate data skew.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Currently, keyed streams are widely used to perform
>> aggregating
>> >>>>>>>>>> operations
>> >>>>>>>>>>> (e.g., reduce, sum and window) on the elements that having the
>> >>> same
>> >>>>>>>> key.
>> >>>>>>>>>>> When executed at runtime, the elements with the same key will
>> >> be
>> >>>> sent
>> >>>>>>>> to
>> >>>>>>>>>>> and aggregated by the same task.
>> >>>>>>>>>>>
>> >>>>>>>>>>> The performance of these aggregating operations is very
>> >> sensitive
>> >>>> to
>> >>>>>>>> the
>> >>>>>>>>>>> distribution of keys. In the cases where the distribution of
>> >> keys
>> >>>>>>>>>> follows a
>> >>>>>>>>>>> powerful law, the performance will be significantly
>> downgraded.
>> >>>> More
>> >>>>>>>>>>> unluckily, increasing the degree of parallelism does not help
>> >>> when
>> >>>> a
>> >>>>>>>> task
>> >>>>>>>>>>> is overloaded by a single key.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Local aggregation is a widely-adopted method to reduce the
>> >>>>>> performance
>> >>>>>>>>>>> degraded by data skew. We can decompose the aggregating
>> >>> operations
>> >>>>>> into
>> >>>>>>>>>> two
>> >>>>>>>>>>> phases. In the first phase, we aggregate the elements of the
>> >> same
>> >>>> key
>> >>>>>>>> at
>> >>>>>>>>>>> the sender side to obtain partial results. Then at the second
>> >>>> phase,
>> >>>>>>>>>> these
>> >>>>>>>>>>> partial results are sent to receivers according to their keys
>> >> and
>> >>>> are
>> >>>>>>>>>>> combined to obtain the final result. Since the number of
>> >> partial
>> >>>>>>>> results
>> >>>>>>>>>>> received by each receiver is limited by the number of senders,
>> >>> the
>> >>>>>>>>>>> imbalance among receivers can be reduced. Besides, by reducing
>> >>> the
>> >>>>>>>> amount
>> >>>>>>>>>>> of transferred data the performance can be further improved.
>> >>>>>>>>>>>
>> >>>>>>>>>>> The design documentation is here:
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>
>> >>>
>> >>
>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>> >>>>>>>>>>>
>> >>>>>>>>>>> Any comment and feedback are welcome and appreciated.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best,
>> >>>>>>>>>>> Vino
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>>> --------------------------
>> >>>>>> Ken Krugler
>> >>>>>> +1 530-210-6378
>> >>>>>> http://www.scaleunlimited.com
>> >>>>>> Custom big data solutions & training
>> >>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>> >>>>>>
>> >>>>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>>
>>

Reply via email to