Hi Ken,

Thanks for your reply.

As I said before, we are trying to reuse Flink's state concept (for fault
tolerance and to guarantee "exactly-once" semantics), so we did not consider
a cache.

In addition, if we use Flink's state, OOM is not a key problem we need to
worry about.
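
To make the two-phase idea from the proposal concrete, here is a rough sketch
in plain Java. It is not Flink code: the class and method names are made up
for illustration, and a simple element-count window stands in for whatever
local window a user would configure. Each sender pre-aggregates per key and
emits partial sums; the receiver then merges the partials by key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names are Flink APIs.
final class LocalAggregationSketch {

    // Phase 1: one sender counts its own elements per key and emits the
    // partial counts every `windowSize` elements (a stand-in for a local window).
    static List<Map<String, Long>> localAggregate(List<String> keys, int windowSize) {
        List<Map<String, Long>> emitted = new ArrayList<>();
        Map<String, Long> window = new HashMap<>();
        int seen = 0;
        for (String key : keys) {
            window.merge(key, 1L, Long::sum);
            if (++seen == windowSize) {
                emitted.add(window);
                window = new HashMap<>();
                seen = 0;
            }
        }
        if (!window.isEmpty()) {
            emitted.add(window); // flush the last, partially filled window
        }
        return emitted;
    }

    // Phase 2: the receiver combines the partial counts by key.
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two senders, both dominated by the skewed key "hot".
        List<String> sender1 = List.of("hot", "hot", "hot", "cold");
        List<String> sender2 = List.of("hot", "hot", "cold", "hot");
        List<Map<String, Long>> partials = new ArrayList<>();
        partials.addAll(localAggregate(sender1, 4));
        partials.addAll(localAggregate(sender2, 4));
        System.out.println(globalAggregate(partials));
    }
}
```

In this toy run the eight input elements shrink to four partial records
before they reach the receiver, which is the effect we want for skewed keys.
In the real design the window contents would live in Flink state rather than
in a plain HashMap.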

Best,
Vino

Ken Krugler <kkrugler_li...@transpac.com> wrote on Tue, Jun 4, 2019 at 1:37 AM:

> Hi all,
>
> Cascading implemented this “map-side reduce” functionality with an LRU
> cache.
>
> That worked well, as then the skewed keys would always be in the cache.
>
> The API let you decide the size of the cache, in terms of number of
> entries.
>
> Having a memory limit would have been better for many of our use cases,
> though FWIR there’s no good way to estimate in-memory size for objects.
>
> — Ken
>
> > On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >
> > Hi Piotr,
> >
> > The localKeyBy API returns an instance of KeyedStream (we just added an
> > internal flag to identify the local mode), which Flink already provides.
> > Users can call all the APIs (especially the *window* APIs) that
> > KeyedStream provides.
> >
> > So if users want to use local aggregation, they should call the window
> > API to build a local window. That means users should (or rather "can")
> > specify the window length and other information based on their needs.
> >
> > I think you described a different idea from ours. We did not try to
> > react after some predefined threshold is triggered. We prefer to give
> > users the discretion to make decisions.
> >
> > Our design tends to reuse concepts and functions that Flink already
> > provides, like state and windows (IMO, we do not need to worry about OOM
> > and the issues you mentioned).
> >
> > Best,
> > Vino
> >
> > Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:
> >
> >> Hi,
> >>
> >> +1 for the idea from my side. I’ve even attempted to add a similar
> >> feature quite some time ago, but it didn’t get enough traction [1].
> >>
> >> I’ve read through your document and I couldn’t find it mentioning
> >> anywhere when the pre-aggregated result should be emitted downstream.
> >> I think that’s one of the most crucial decisions, since a wrong decision
> >> here can lead to a decrease in performance or to an explosion of
> >> memory/state consumption (both with bounded and unbounded data streams).
> >> For streaming it can also lead to increased latency.
> >>
> >> Since this is also a decision that’s impossible to make automatically
> >> and perfectly reliably, first and foremost I would expect this to be
> >> configurable via the API, maybe with some predefined triggers, like on
> >> watermark (for windowed operations), on checkpoint barrier (to decrease
> >> state size?), on element count, maybe on memory usage (much easier to
> >> estimate with known/predefined types, like in SQL)… and with some
> >> option to implement a custom trigger.
> >>
> >> Also, what would work best would be to have some form of memory
> >> consumption priority. For example, if we are running out of memory for
> >> HashJoin/final aggregation, instead of spilling to disk or crashing the
> >> job with OOM it would probably be better to prune/dump the pre/local
> >> aggregation state. But that’s another story.
> >>
> >> [1] https://github.com/apache/flink/pull/4626
> >>
> >> Piotrek
> >>
> >>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> >>>
> >>> Excited, and a big +1 for this feature.
> >>>
> >>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
> >>>
> >>>> Nice feature.
> >>>> Looking forward to having it in Flink.
> >>>>
> >>>> Regards,
> >>>> Xiaogang
> >>>>
> >>>> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
> >>>>
> >>>>> Hi all,
> >>>>>
> >>>>> As we mentioned at some conferences, such as Flink Forward SF 2019 and
> >>>>> QCon Beijing 2019, our team has implemented "local aggregation" in our
> >>>>> internal Flink fork. This feature can effectively alleviate data skew.
> >>>>>
> >>>>> Currently, keyed streams are widely used to perform aggregating
> >>>>> operations (e.g., reduce, sum and window) on elements that have the
> >>>>> same key. When executed at runtime, the elements with the same key are
> >>>>> sent to and aggregated by the same task.
> >>>>>
> >>>>> The performance of these aggregating operations is very sensitive to
> >>>>> the distribution of keys. In cases where the distribution of keys
> >>>>> follows a power law, performance will degrade significantly. Worse,
> >>>>> increasing the degree of parallelism does not help when a task is
> >>>>> overloaded by a single key.
> >>>>>
> >>>>> Local aggregation is a widely adopted method to reduce the performance
> >>>>> degradation caused by data skew. We can decompose the aggregating
> >>>>> operations into two phases. In the first phase, we aggregate the
> >>>>> elements of the same key at the sender side to obtain partial results.
> >>>>> Then, in the second phase, these partial results are sent to receivers
> >>>>> according to their keys and are combined to obtain the final result.
> >>>>> Since the number of partial results received by each receiver is
> >>>>> limited by the number of senders, the imbalance among receivers can be
> >>>>> reduced. Besides, reducing the amount of transferred data can further
> >>>>> improve performance.
> >>>>>
> >>>>> The design documentation is here:
> >>>>>
> >>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>>>>
> >>>>> Any comments and feedback are welcome and appreciated.
> >>>>>
> >>>>> Best,
> >>>>> Vino
> >>>>>
> >>>>
> >>
> >>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>
