Jason,
Thanks for starting the discussion and for your very concise (and correct)
summary.

Ewen, while what you say is true, those kinds of datasets (a large number of
keys with a skewed distribution) are very typical on the Web (think Twitter
users, Web pages, or even just plain text).
If you want to compute an aggregate over these datasets (either for reporting
purposes or as part of some analytical task such as machine learning), then
the skew will kill your performance and limit the amount of parallelism you
can effectively extract from the dataset.
PKG (partial key grouping) is a solution to that problem, without the full
overhead of switching to shuffle grouping to compute partial aggregates.
The problem with shuffle grouping is not only the memory footprint, but also
the cost of combining the aggregates, which grows with the parallelism level.
Also, because the partial aggregates for each key live in only two places, you
can query them at runtime with constant overhead (similar to what you could do
with hashing), rather than having to broadcast the query to all partitions
(which is what you need to do with shuffle grouping).
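To make this concrete, here is a rough sketch of what a PKG-style partitioner
could look like on top of the pluggable Partitioner interface from KAFKA-2091
(this only illustrates the idea, it is not the actual KAFKA-2092 patch; the
second hash and the local load counters are simplified placeholders):

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class PartialKeyGroupingPartitioner implements Partitioner {

    // local message counts per partition, used as a crude load signal
    // (not thread-safe; good enough for a sketch)
    private long[] load;

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (load == null || load.length != numPartitions)
            load = new long[numPartitions];

        // records without a key are just spread uniformly
        if (keyBytes == null)
            return ThreadLocalRandom.current().nextInt(numPartitions);

        // two candidate partitions per key, from two different hashes
        int first = toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        int second = toPositive(Utils.murmur2(salt(keyBytes))) % numPartitions;

        // send to the locally less loaded of the two candidates
        int chosen = load[first] <= load[second] ? first : second;
        load[chosen]++;
        return chosen;
    }

    // cheap stand-in for a second, independent hash function
    private static byte[] salt(byte[] keyBytes) {
        byte[] copy = keyBytes.clone();
        for (int i = 0; i < copy.length; i++)
            copy[i] ^= 0x55;
        return copy;
    }

    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    @Override
    public void close() {}
}

A real implementation would also want a better load signal than a local
counter, but the power-of-two-choices idea is all there is to it.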

--
Gianmarco

On 28 July 2015 at 00:54, Gwen Shapira <gshap...@cloudera.com> wrote:

> I guess it depends on whether the original producer did any "map"
> tasks or simply wrote raw data. We usually advocate writing raw data,
> and since we need to write it anyway, the partitioner doesn't
> introduce any extra "hops".
>
> It's definitely useful to look at use-cases and I need to think a bit
> more on whether huge-key-space-with-large-skew is the only one.
> I think that there are use-cases that are not pure-aggregate and
> therefore keeping a key list in memory won't help, and scaling to a large
> number of partitions is still required (and therefore skew is a
> critical problem). However, I may be making stuff up, so need to
> double check.
>
> Gwen
>
> On Mon, Jul 27, 2015 at 2:20 PM, Ewen Cheslack-Postava
> <e...@confluent.io> wrote:
> > Gwen - this is really like two steps of map reduce though, right? The
> > first step does the partial shuffle to two partitions per key, second
> > step does partial reduce + final full shuffle, final step does the
> > final reduce.
> >
> > This strikes me as similar to partition assignment strategies in the
> > consumer in that there will probably be a small handful of commonly
> > used strategies that we can just maintain as part of Kafka. A few
> > people will need more obscure strategies and they can maintain those
> > implementations themselves. For reference, a quick grep of Spark shows
> > 5 partitioners: Hash and RangePartitioner, which are in core,
> > PythonPartitioner, GridPartitioner for partitioning matrices, and
> > ShuffleRowRDD for their SQL implementation. So I don't think it would
> > be a big deal to include it here, although I'm not really sure how
> > often it's useful -- compared to normal partitioning or just doing two
> > steps by starting with unpartitioned data, you need to be performing
> > an aggregation, the key set needs to be large enough for memory usage
> > to be a problem (i.e. you don't want each consumer to have to maintain
> > a map with every key in it), and the distribution needs to be
> > sufficiently skewed (i.e. not just 1 or 2 very hot keys). The key set
> > constraint, in particular, is the one I'm not convinced by since in
> > practice if you have a skewed distribution, you probably also won't
> > actually see every key in every partition; each worker actually only
> > needs to maintain a subset of the key set (and associated aggregate
> > data) in memory.
> >
> >
> > On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira <gshap...@cloudera.com>
> > wrote:
> >
> >> If you are used to map-reduce patterns, this sounds like a perfectly
> >> natural way to process streams of data.
> >>
> >> Call the first consumer "map-combine-log", the topic "shuffle-log" and
> >> the second consumer "reduce-log" :)
> >> I like that a lot. It works well for either "embarrassingly parallel"
> >> cases, or "so much data that more parallelism is worth the extra
> >> overhead" cases.
> >>
> >> I personally don't care if it's in core Kafka, KIP-28 or a GitHub
> >> project elsewhere, but I find it useful and non-esoteric.
> >>
> >>
> >>
> >> On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >> > For a little background, the difference between this partitioner and
> >> > the default one is that it breaks the deterministic mapping from key
> >> > to partition. Instead, messages for a given key can end up in either
> >> > of two partitions. This means that the consumer generally won't see
> >> > all messages for a given key. Instead the consumer would compute an
> >> > aggregate for each key on the partitions it consumes and write them
> >> > to a separate topic. For example, if you are writing log messages to
> >> > a "logs" topic with the hostname as the key, you could use this
> >> > partitioning strategy to compute message counts for each host in each
> >> > partition and write them to a "log-counts" topic. Then a consumer of
> >> > the "log-counts" topic would compute total aggregates based on the
> >> > two intermediate aggregates. The benefit is that you are generally
> >> > going to get better load balancing across partitions than if you used
> >> > the default partitioner. (Please correct me if my understanding is
> >> > incorrect, Gianmarco)
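That summary is exactly right. To make the counting flow concrete, here is a
rough sketch of the two consumer stages, using the new Java consumer/producer
APIs for brevity (the class and method names are placeholders I made up, and
this only illustrates the data flow, it is not a tested implementation):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HostCountStages {

    // Stage 1: consume the "logs" partitions assigned to this instance,
    // count messages per hostname (the record key), and periodically flush
    // the counts as deltas to the "log-counts" topic.
    static void partialCountStage(KafkaConsumer<String, String> logsConsumer,
                                  KafkaProducer<String, String> countsProducer) {
        Map<String, Long> partialCounts = new HashMap<>();
        logsConsumer.subscribe(Collections.singletonList("logs"));
        while (true) {
            ConsumerRecords<String, String> records = logsConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                partialCounts.merge(record.key(), 1L, Long::sum);
            // flush whatever was seen since the last flush; a real job would
            // do this on a timer rather than on every poll
            for (Map.Entry<String, Long> e : partialCounts.entrySet())
                countsProducer.send(new ProducerRecord<>("log-counts",
                        e.getKey(), e.getValue().toString()));
            partialCounts.clear();
        }
    }

    // Stage 2: consume "log-counts" and sum the partial counts (coming from
    // the at most two partitions that hold each hostname) into final totals.
    static void totalCountStage(KafkaConsumer<String, String> countsConsumer) {
        Map<String, Long> totals = new HashMap<>();
        countsConsumer.subscribe(Collections.singletonList("log-counts"));
        while (true) {
            ConsumerRecords<String, String> records = countsConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                totals.merge(record.key(), Long.parseLong(record.value()),
                             Long::sum);
        }
    }
}

Because stage 1 clears its map after every flush it emits deltas, so stage 2
can simply sum everything it sees for a given hostname.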
> >> >
> >> > So I think the question is whether this is a useful primitive for
> >> > Kafka to provide out of the box. I was a little concerned that this
> >> > use case is a bit esoteric for a core feature, but it may make more
> >> > sense in the context of KIP-28, which would provide some higher-level
> >> > processing capabilities (though it doesn't seem like the KStream
> >> > abstraction would provide a direct way to leverage this partitioner
> >> > without custom logic).
> >> >
> >> > Thanks,
> >> > Jason
> >> >
> >> >
> >> > On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales <
> >> > g...@apache.org> wrote:
> >> >
> >> >> Hello folks,
> >> >>
> >> >> I'd like to ask the community about its opinion on the partitioning
> >> >> functions in Kafka.
> >> >>
> >> >> With KAFKA-2091 <https://issues.apache.org/jira/browse/KAFKA-2091>
> >> >> integrated we are now able to have custom partitioners in the
> >> >> producer. The question now becomes *which* partitioners should ship
> >> >> with Kafka?
> >> >> This issue arose in the context of KAFKA-2092
> >> >> <https://issues.apache.org/jira/browse/KAFKA-2092>, which implements
> >> >> a specific load-balanced partitioning. This partitioner, however,
> >> >> assumes some stages of processing on top of it to make proper use of
> >> >> the data, i.e., it envisions Kafka as a substrate for stream
> >> >> processing, and not only as the I/O component.
> >> >> Is this a direction that Kafka wants to go towards? Or is this a role
> >> >> better left to the internal communication systems of other stream
> >> >> processing engines (e.g., Storm)?
> >> >> And if the answer is the latter, how would something such as Samza
> >> >> (which relies mostly on Kafka as its communication substrate) be able
> >> >> to implement advanced partitioning schemes?
> >> >>
> >> >> Cheers,
> >> >> --
> >> >> Gianmarco
> >> >>
> >>
> >
> >
> >
> > --
> > Thanks,
> > Ewen
>
