Interesting, and this was with the new Java client? This sounds like as
much an opportunity for improvement in the code as anything. Would you be
willing to share the details?

-jay

On Sunday, February 22, 2015, Steven Wu <stevenz...@gmail.com> wrote:

> > The low connection partitioner might work for this by attempting to
> > reuse recently used nodes whenever possible. That is useful in
> > environments with lots and lots of producers where you don't care
> > about semantic partitioning.
>
> In one of our perf tests, we found that the above "sticky" partitioner
> improved batching and reduced CPU utilization on the broker side by 60%.
> We plan to make it our default partitioner.
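
A minimal, self-contained sketch of such a sticky strategy (an illustration of the idea only, not the partitioner measured above): keyless records keep going to one partition so they accumulate into full batches, and the partition is re-picked every N records so load still spreads over time.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

// Sticky selection for keyless records: stay on one partition so records
// batch well, and re-pick every `recordsPerStick` records so load still
// spreads across partitions over time. All names here are illustrative.
class StickyPartitionChooser {
    private final int numPartitions;
    private final int recordsPerStick;
    private final AtomicLong sent = new AtomicLong();
    private volatile int current;

    StickyPartitionChooser(int numPartitions, int recordsPerStick) {
        this.numPartitions = numPartitions;
        this.recordsPerStick = recordsPerStick;
        this.current = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    // Partition for the next keyless record.
    int nextPartition() {
        if (sent.incrementAndGet() % recordsPerStick == 0) {
            current = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        return current;
    }
}
```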
>
>
>
> On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey Daniel,
> >
> > Yeah I think that would be doable. If you want to pursue it you would
> > need to do a quick KIP just to get everyone on the same page since
> > this would be a public interface we would have to support over a long
> > time:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > When we have the details worked out, it should be a fairly
> > straightforward patch to make that pluggable.
> >
> > A few comments:
> > - I think we should just make the DefaultPartitioner the default value
> > for that configuration, rather than having it be a fallback.
> > - You need to pass in the binary key and value in addition to the Java
> > objects. Otherwise any partitioning based on the binary value will
> > require reserializing these.
> > - If we add this option we should really ship at least one other
> > useful partitioning strategy. The low connection partitioner might
> > work for this by attempting to reuse recently used nodes whenever
> > possible. That is useful in environments with lots and lots of
> > producers where you don't care about semantic partitioning. It would
> > be good to think through whether there are any other useful
> > partitioning strategies, to make sure they would also be doable with
> > the interface we end up with.
> > - Currently Cluster is not a public class so we'll have to think about
> > whether we want to make that public.
> >
> > -Jay
> >
> >
> > On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener <
> > daniel.wege...@holisticon.de> wrote:
> >
> > >
> > > Jay Kreps <jay.kreps@...> writes:
> > >
> > > >
> > > > Hey Daniel,
> > > >
> > > > partitionsFor() will block the very first time it sees a new topic
> > > > that it doesn't have metadata for yet. If you want to ensure you
> > > > don't block even that one time, call it prior to your regular
> > > > usage so it initializes then.
> > > >
> > > > The rationale for adding a partition in ProducerRecord was that
> > > > there are actually cases where you want to associate a key with
> > > > the record but not use it for partitioning. The rationale for not
> > > > retaining the pluggable partitioner was that you could always just
> > > > use the partition (many people dislike the plugin APIs and asked
> > > > for this). Personally, like you, I preferred the plugin APIs.
> > > >
> > > > We aren't cut off from exposing the existing partitioner usage as
> > > > a public API that you can override if there is anyone who wants
> > > > that. I think one nice thing about it would be the ability to ship
> > > > with an alternative partitioning strategy that you could enable
> > > > purely in config. For example the old producer had a partitioning
> > > > strategy that attempted to minimize the number of TCP connections
> > > > for cases where there was no key. 98% of people loathed that, but
> > > > for 2% of people it was useful, and this would be a way to still
> > > > include that behavior for people migrating to the new producer.
> > > >
> > > > -Jay
> > > >
> > > > On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
> > > > daniel.wegener@...> wrote:
> > > >
> > > > > Gwen Shapira <gshapira <at> ...> writes:
> > > > >
> > > > > >
> > > > > > Hi Daniel,
> > > > > >
> > > > > > I think you can still use the same logic you had in the
> > > > > > custom partitioner in the old producer. You just move it to
> > > > > > the client that creates the records.
> > > > > > The reason you don't cache the result of partitionsFor is that
> > > > > > the producer should handle the caching for you, so it's not
> > > > > > necessarily a long or blocking call.
> > > > > >
> > > > > > I see it as a pretty small change to the API. But I'm not
> > > > > > sure what drove the change either.
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > > > > > Daniel.Wegener <at> ...> wrote:
> > > > > >
> > > > > > > Hello Kafka-users!
> > > > > > >
> > > > > > > I am facing a migration from a kind of (a bit self-plumbed)
> > > > > > > Kafka 0.8.1 producer to the new kafka-clients API. I just
> > > > > > > noticed that the new KafkaProducer initializes its own
> > > > > > > Partitioner that cannot be changed (final field, no ctor
> > > > > > > param, no
> > > > > > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > > > > > > Is this an intentional change?
> > > > > > > If I understand the new API correctly, one should either
> > > > > > > define a key for a message and let the default Partitioner
> > > > > > > take care of distributing it over all available partitions,
> > > > > > > or set an explicit partition number that each message will
> > > > > > > be written to.
> > > > > > >
> > > > > > > The old API allowed creating ProducerRecords with a key
> > > > > > > and/or a key used only for partitioning (but one that is not
> > > > > > > sent down the wire), and then providing a custom Partitioner
> > > > > > > that could later distribute this partitioning key over all
> > > > > > > available partitions when the message is actually sent.
> > > > > > >
> > > > > > > The difference in the new producer API is that we need to
> > > > > > > know the exact number of available partitions before we even
> > > > > > > create a ProducerRecord. If we don't ensure the correct
> > > > > > > number of partitions and try to send a message to a
> > > > > > > partition that does not exist, the whole message will blow
> > > > > > > up later when the producer tries to send it.
> > > > > > >
> > > > > > > I don't expect the partition count to change that often,
> > > > > > > but the API doc states that a partitionsFor(String topic)
> > > > > > > result _should not_ be cached. But I do not really want to
> > > > > > > check for changed partition counts before every creation of
> > > > > > > a ProducerRecord. The old pluggable partitioner was, for us,
> > > > > > > especially useful for partition stickiness by business keys
> > > > > > > (and thus stateful processing stages across multiple
> > > > > > > topics). This ensured that a message that was processed on
> > > > > > > stage1:partition2 will eventually be processed on
> > > > > > > stageN:partition2. Not very clever in terms of scalability
> > > > > > > per stage, but it makes reasoning about the message flow a
> > > > > > > lot easier.
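
The business-key stickiness described here boils down to deriving the partition from a stable hash of the business key, so every stage topic maps the same key to the same partition number, provided all stage topics have equal partition counts. A hypothetical self-contained sketch:

```java
// Hypothetical sketch of business-key stickiness: a stable hash of the
// business key picks the partition, so the same key lands on the same
// partition number in every stage topic, as long as all stage topics
// have the same partition count.
final class BusinessKeyPartitioning {
    private BusinessKeyPartitioning() {}

    static int partitionFor(String businessKey, int numPartitions) {
        // Mask the sign bit rather than using Math.abs:
        // Math.abs(Integer.MIN_VALUE) is still negative.
        return (businessKey.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```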
> > > > > > >
> > > > > > > So for a single ProducerRecord, to my understanding it
> > > > > > > might be nicer to have a nullable
> > > > > > > partitionFunction:(ProducerRecord,Int)=>Int (or a Java 8
> > > > > > > lambda equivalent) instead of the nullable partition
> > > > > > > attribute, and to evaluate this function in the producer.
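
That nullable partition-function idea could be sketched roughly as follows; every name below is hypothetical, and the stand-in record type is not the real ProducerRecord:

```java
import java.util.function.ToIntBiFunction;

// Hypothetical sketch of a record carrying a partition function instead
// of a fixed partition number; the producer evaluates it at send time,
// once the actual partition count is known.
class RecordWithPartitionFunction {
    final String key;
    final ToIntBiFunction<String, Integer> partitionFunction; // nullable

    RecordWithPartitionFunction(String key, ToIntBiFunction<String, Integer> fn) {
        this.key = key;
        this.partitionFunction = fn;
    }

    // What the producer would do once partition metadata is available.
    int resolvePartition(int availablePartitions) {
        if (partitionFunction != null) {
            return partitionFunction.applyAsInt(key, availablePartitions);
        }
        // default strategy: stable hash of the key
        return (key.hashCode() & 0x7fffffff) % availablePartitions;
    }
}
```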
> > > > > > >
> > > > > > > Kind regards
> > > > > > >
> > > > > > >
> > > > > > >
> > -------------------------------------------------------------------
> > > ----
> > > > > -------------
> > > > > > > Daniel Wegener
> > > > > > > Holisticon AG
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > > Cheers for your quick reply Gwen!
> > > > > Good to know that partitionsFor is (most of the time) fast and
> > > > > non-blocking. I'm just wondering if this leaves a (maybe rather
> > > > > artificial?) use case uncovered: if you really try to use the
> > > > > producer in a completely non-blocking fashion (with
> > > > > block.on.buffer.full=false, maybe for a reactive-streams
> > > > > adapter?), you would still have to call partitionsFor, which
> > > > > may occasionally block.
> > > > >
> > > > > Don't get me wrong, I am happy with this solution, but I think
> > > > > the old API was a bit clearer about what you really give to the
> > > > > producer and what it does with it (a partition number, which
> > > > > may be discovered as invalid later and throw exceptions, vs. a
> > > > > partitioner that lets you distribute your payloads over the
> > > > > available number of partitions). In the new API it just feels a
> > > > > bit more - well - Java'ish :).
> > > > >
> > > > > Kind regards
> > > > > Daniel
> > > > >
> > > > >
> > > >
> > >
> > > Hi Jay.
> > >
> > > Thank you for this clarification. I agree it's perfectly fine to
> > > enforce the metadata resolution once when you start a producer.
> > >
> > > If there were support for some kind of user-defined partitioner,
> > > I'd have the following thoughts:
> > >
> > > - Let the user still be able to optionally choose a partition for
> > > a ProducerRecord.
> > > - Let the user optionally provide a _CustomPartitioner_ as a
> > > KafkaProducer ctor param or via ctor props.
> > > - Keep the current default partitioner's behavior, but have it
> > > prefer the result of the CustomPartitioner over its default
> > > strategies (hashing the encoded key before round-robin), though
> > > only AFTER trying ProducerRecord.partition.
> > >
> > > The type signature of a CustomPartitioner could look like this:
> > >
> > > ```
> > > public interface CustomPartitioner<K,V> extends Configurable {
> > >
> > >     /**
> > >      * Compute the partition for the given record.
> > >      *
> > >      * @param topic     The topic name
> > >      * @param key       The key to partition on (or null if no key)
> > >      * @param value     The value to partition on
> > >      * @param cluster   The current cluster metadata
> > >      * @return the partition, or {@code null} if this partitioner
> > >      *         cannot make a useful decision; {@code null} leads to
> > >      *         a fallback to the default partitioning behaviour
> > >      */
> > >     public Integer partition(String topic, K key, V value, Cluster cluster);
> > >
> > > }
> > > ```
> > >
> > > This would:
> > > - Not introduce any breaking changes to the API
> > > - Allow users to partition based on their unserialized "business"
> > > keys or values. This is consistent with the type-parameterized,
> > > user-provided serializers.
> > >
> > > It might still make the concept of partitioning more complex and thus
> > > harder to grasp.
> > >
> > > An implementation could look like this:
> > >
> > > https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447afe60d389f6c
> > > (just a raw sketch though).
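
For concreteness, a self-contained sketch of one possible user implementation of such an interface follows. Since Cluster is not public, it is replaced here by a minimal stand-in, and all names are illustrative rather than the actual API:

```java
// Sketch of a user partitioner shaped like the proposed interface.
// Kafka's Cluster type is replaced by a minimal stand-in so the example
// is self-contained; the real signatures would come from the KIP.
class BusinessKeyCustomPartitioner {

    /** Stand-in for the cluster metadata the real interface would pass. */
    interface ClusterView {
        int partitionCountFor(String topic);
    }

    /** Returning null defers to the default partitioning behaviour. */
    Integer partition(String topic, String key, byte[] value, ClusterView cluster) {
        if (key == null) {
            return null; // no business key: fall back to the default strategy
        }
        int partitions = cluster.partitionCountFor(topic);
        return (key.hashCode() & 0x7fffffff) % partitions;
    }
}
```

Given equal partition counts per stage topic, the same business key resolves to the same partition on every stage, which is the stickiness discussed earlier in the thread.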
> > >
> > > What do you think?
> > >
> > > Kind regards
> > > Daniel
> > >
> > >
> > >
> > >
> >
>
