Yes, this is with the new Java client. Since it uses non-blocking NIO, the
sender thread was probably able to scan the buffer very frequently. Hence
the random partitioner doesn't get much chance to accumulate records per
batch or request.
Setup

- 3 broker instances (m1.xlarge)
- 6 producer instances (m1.xlarge)
- topic partitions: 36
- message size: 1 KB
- no compression
- traffic volume
  - total: 30 MB / 30K msgs
  - per broker: 10 MB / 10K msgs

Summary

partitioner                    batched records per request    broker cpu util
random without lingering       1.25                           75%
sticky without lingering       2.0                            50%
sticky with 100ms lingering    15                             33%

There are two ways to improve batching:

   1. Use the sticky partitioner that we implemented. The Kafka default is a
      random partitioner, where a random partition is selected for each msg.
      With the sticky partitioner, we can stick all msgs (to one topic) on
      the same partition for a while (e.g. 1 second) before moving on to the
      next partition.
   2. Set the "linger.ms" property on the Kafka producer. It lets messages
      linger for some period in the hope of a batching opportunity.
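
To make the sticky idea concrete, here is a minimal sketch, not our actual implementation. StickyPartitionChooser and its injectable clock are hypothetical names, it has no Kafka dependency, and it assumes the next partition is picked at random once the stick interval expires (the partition count and interval are whatever the caller passes in).

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of Kafka): returns the same partition for
 * stickMs milliseconds, then moves on to a different, randomly chosen one.
 * Intended to be used as one instance per topic.
 */
final class StickyPartitionChooser {
    private final int numPartitions;
    private final long stickMs;
    private final LongSupplier clockMs; // injectable so the behaviour can be tested
    private int current = -1;           // -1 means no partition chosen yet
    private long switchedAtMs;

    StickyPartitionChooser(int numPartitions, long stickMs, LongSupplier clockMs) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
        this.stickMs = stickMs;
        this.clockMs = clockMs;
    }

    /** Partition to use for the next record. */
    synchronized int nextPartition() {
        long now = clockMs.getAsLong();
        if (current < 0 || now - switchedAtMs >= stickMs) {
            current = pickDifferent(current);
            switchedAtMs = now;
        }
        return current;
    }

    /** Random partition that differs from the previous one when possible. */
    private int pickDifferent(int previous) {
        if (numPartitions == 1) {
            return 0;
        }
        int next;
        do {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (next == previous);
        return next;
    }
}
```

With the current producer API the result would be passed as the explicit partition when building each ProducerRecord, e.g. a chooser created with (36, 1000, System::currentTimeMillis) for the setup above. The partition count is fixed here for simplicity; a real version would have to refresh it from partitionsFor().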

We can deploy one or both methods. But the main point is that improved
batching helps the broker a lot.

"linger.ms" carries a risk of filling up the buffer. It works very well
with the sticky partitioner because a full batch accumulates very quickly.
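
For reference, a minimal sketch of setting the lingering from the last row of the table. Only the "linger.ms" and "bootstrap.servers" keys are real producer configs here; the BatchingProducerConfig class, the withLinger helper and the broker address in the usage note are made up for illustration.

```java
import java.util.Properties;

/** Hypothetical helper that builds producer properties with lingering enabled. */
final class BatchingProducerConfig {

    /**
     * @param bootstrapServers comma-separated host:port list of brokers
     * @param lingerMs         how long a record may wait for others to join its batch
     */
    static Properties withLinger(String bootstrapServers, int lingerMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A larger value trades latency (and buffer space) for bigger batches.
        props.put("linger.ms", Integer.toString(lingerMs));
        return props;
    }
}
```

BatchingProducerConfig.withLinger("broker1:9092", 100) would give the 100ms setting used in the test; the resulting properties still need serializer settings before they can be handed to a KafkaProducer.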



On Sun, Feb 22, 2015 at 5:21 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Interesting, and this was with the new Java client? This sounds like as
> much an opportunity for improvement in the code as anything. Would you be
> willing to share the details?
>
> -jay
>
> On Sunday, February 22, 2015, Steven Wu <stevenz...@gmail.com> wrote:
>
> > > The low connection partitioner might work for this
> > by attempting to reuse recently used nodes whenever possible. That is
> > useful in environments with lots and lots of producers where you don't
> care
> > about semantic partitioning.
> >
> > In one of the perf test, we found that above "sticky" partitioner
> improved
> > batching and reduced cpu util at broker side by 60%. We plan to make it
> our
> > default partitioner.
> >
> >
> >
> > On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > Hey Daniel,
> > >
> > > Yeah I think that would be doable. If you want to pursue it you would
> > need
> > > to do a quick KIP just to get everyone on the same page since this
> would
> > be
> > > a public interface we would have to support over a long time:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > >
> > > When we have the details worked out, then it should be a fairly
> > > straight-forward patch to make that pluggable.
> > >
> > > A few comments:
> > > - I think we should just make the DefaultPartitioner the default value
> > for
> > > that configuration, rather than having it be a fall back.
> > > - You need to pass in the binary key and value in addition to the java
> > > objects. Otherwise any partitioning based on the binary value will
> > require
> > > reserializing these.
> > > - If we add this option we should really ship at least one other useful
> > > partitioning strategy. The low connection partitioner might work for
> this
> > > by attempting to reuse recently used nodes whenever possible. That is
> > > useful in environments with lots and lots of producers where you don't
> > care
> > > about semantic partitioning. It would be good to think through if there
> > are
> > > any other useful partitioning strategies to make sure they would also
> be
> > > doable with the interface we would end up with.
> > > - Currently Cluster is not a public class so we'll have to think about
> > > whether we want to make that public.
> > >
> > > -Jay
> > >
> > >
> > > On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener <daniel.wege...@holisticon.de> wrote:
> > >
> > > >
> > > > Jay Kreps <jay.kreps@...> writes:
> > > >
> > > > >
> > > > > Hey Daniel,
> > > > >
> > > > > partitionsFor() will block the very first time it sees a new topic
> > that
> > > > it
> > > > > doesn't have metadata for yet. If you want to ensure you don't
> block
> > > even
> > > > > that one time, call it prior to your regular usage so it
> initializes
> > > > then.
> > > > >
> > > > > The rationale for adding a partition in ProducerRecord was that
> there
> > > are
> > > > > actually cases where you want to associate a key with the record
> but
> > > not
> > > > > use it for partitioning. The rationale for not retaining the
> > pluggable
> > > > > partitioner was that you could always just use the partition (many
> > > people
> > > > > dislike the plugin apis and asked for it). Personally, like you, I
> > > > > preferred the plugin apis.
> > > > >
> > > > > We aren't cut off from exposing the existing partitioner usage as a
> > > > public
> > > > > api that you can override if there is anyone who wants that. I
> think
> > > one
> > > > > nice thing about it would be the ability to ship with an
> alternative
> > > > > partitioning strategy that you could enable purely in config. For
> > > example
> > > > > the old producer had a partitioning strategy that attempted to
> > minimize
> > > > the
> > > > > number of TCP connections for cases where there was no key. 98% of
> > > people
> > > > > loathed that, but for 2% of people it was useful and this would be
> a
> > > way
> > > > to
> > > > > still include that behavior for people migrating to the new
> producer.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
> > > > > daniel.wegener@...> wrote:
> > > > >
> > > > > > Gwen Shapira <gshapira <at> ...> writes:
> > > > > >
> > > > > > >
> > > > > > > Hi Daniel,
> > > > > > >
> > > > > > > I think you can still use the same logic you had in the custom
> > > > > > partitioner
> > > > > > > in the old producer. You just move it to the client that
> creates
> > > the
> > > > > > > records.
> > > > > > > The reason you don't cache the result of partitionsFor is that
> > the
> > > > > > producer
> > > > > > > should handle the caching for you, so its not necessarily a
> long
> > or
> > > > > > > blocking call.
> > > > > > >
> > > > > > > I see it as a pretty small change to the API. But I'm not sure
> > what
> > > > drove
> > > > > > > the change either.
> > > > > > >
> > > > > > > Gwen
> > > > > > >
> > > > > > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > > > > > > Daniel.Wegener <at> ...> wrote:
> > > > > > >
> > > > > > > > Hello Kafka-users!
> > > > > > > >
> > > > > > > > I am facing a migration from a kind of ( a bit self plumbed)
> > > kafka
> > > > > > 0.8.1
> > > > > > > > producer to the new kafka-clients API. I just recognized,
> that
> > > the
> > > > new
> > > > > > > > KafkaProducer initializes its own Partitioner that cannot be
> > > > changed
> > > > > > (final
> > > > > > > > field, no ctor-param, no
> > > > > > > >
> > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > > > Is
> > > > > > this
> > > > > > > > an intentional change?
> > > > > > > > If i understand the new API correctly, one should either
> > define a
> > > > key
> > > > > > for
> > > > > > > > a message and let the default Partitioner care that it will
> be
> > > > > > distributed
> > > > > > > > over all available partitions or to set an explicit partition
> > > > number
> > > > > > per
> > > > > > > > message that will be written to.
> > > > > > > >
> > > > > > > > The old API api allowed to create ProducerRecors with a key
> > > and/or
> > > > a
> > > > > > key
> > > > > > > > used only for partitioning (but one that is not sent down the
> > > wire)
> > > > and
> > > > > > > > then to provide a custom Partitioner that later could
> > distribute
> > > > this
> > > > > > > > partitioning key over all available partitions when the
> message
> > > is
> > > > > > actually
> > > > > > > > sent.
> > > > > > > >
> > > > > > > > The difference in the new procuder API is that we need to
> know
> > > the
> > > > > > exact
> > > > > > > > number of available partitions before we even create a
> > > > ProducerRecord.
> > > > > > If
> > > > > > > > we dont ensure the correct number of partitions and try to
> > send a
> > > > > > message
> > > > > > > > to a partition that does not exist, the whole message will
> blow
> > > up
> > > > > > later
> > > > > > > > when the producer tries to send it.
> > > > > > > >
> > > > > > > > I dont expect the partition count to change that often but
> the
> > > API-
> > > > doc
> > > > > > > > states that a partitionsFor(String topic) result _should not_
> > be
> > > > > > cached.
> > > > > > > > But I do not really want to check for changed partition
> counts
> > > > before
> > > > > > every
> > > > > > > > creation of a ProducerRecord. The old pluggable partitioner
> > was,
> > > > for
> > > > > > us,
> > > > > > > > especially useful for partition-stickyness by business keys
> > (and
> > > > thus
> > > > > > > > stateful processing stages across multiple topics). This
> > ensured
> > > > that a
> > > > > > > > message that was processed on stage1:partition2 will
> eventually
> > > be
> > > > > > > > processed on stageN:partition2. Not very clever in terms of
> > > > scalability
> > > > > > per
> > > > > > > > stage, but it makes reasoning about the message flow alot
> > easier.
> > > > > > > >
> > > > > > > > So for a single ProducerRecord, for my understanding it might
> > be
> > > > nicer
> > > > > > to
> > > > > > > > have a nullable partitionFunction:(ProducerRecord,Int)=>Int
> (or
> > > > Java8
> > > > > > > > lambda equivalent) instead of the nullable Partition
> attribute
> > > and
> > > > > > evaluate
> > > > > > > > this function in the producer.
> > > > > > > >
> > > > > > > > Kind regards
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > -------------------------------------------------------------------
> > > > ----
> > > > > > -------------
> > > > > > > > Daniel Wegener
> > > > > > > > Holisticon AG
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > > Cheers for you quick reply Gwen!
> > > > > > Good to know that partitionsFor is (most time) fast and
> > non-blocking.
> > > > Im
> > > > > > just wondering if this leaves a (maybe rather artificial?) use
> case
> > > > > > uncovered: If you really try to use the Producer in a completly
> > non-
> > > > > > blocking fashion (with block.on.buffer.full=false, maybe for a
> > > > reactive-
> > > > > > streams adapter?) you would still have to call partitionsFor what
> > may
> > > > > > occasionally be blocking.
> > > > > >
> > > > > > Dont get me wrong, I am happy with this solution but I think the
> > old
> > > > API
> > > > > > was a bit clearer about what you really give into the producer
> and
> > > what
> > > > it
> > > > > > does with it (a partition number (that may be discovered as
> invalid
> > > > later
> > > > > > and throws exceptions) vs a partitioner that let you distribute
> > your
> > > > > > payloads over the available number of partitions). In the new api
> > it
> > > > just
> > > > > > feels a bit more - well - java'ish :).
> > > > > >
> > > > > > Kind regards
> > > > > > Daniel
> > > > > >
> > > > > >
> > > > >
> > > >
> > > > Hi Jay.
> > > >
> > > > Thank you for this clarification, I agree it's perfectly fine to
> > enforce
> > > > the metadata resolution when you start a producer once.
> > > >
> > > > If there would be support for some kind of user defined partitioner
> I'd
> > > > have the following thougts:
> > > >
> > > > - Let the user still be able to optionally choose a partition for a
> > > > ProducerRecord.
> > > > - Let the user optionally provide a _CustomPartitioner_ as
> > KafkaProducer
> > > > ctor
> > > > param or ctor props.
> > > > - Keep the current default partitioners behavior that will just
> prefer
> > > the
> > > > result of the CustomPartitioner over its default strategies (hashing
> > the
> > > > encoded key before round-robin), but AFTER trying the
> > > > ProducerRecord.partition.
> > > >
> > > > The type signature of a CustomPartitioner could look like this:
> > > >
> > > > ```
> > > > public interface CustomPartitioner<K,V> extends Configurable {
> > > >
> > > >     /**
> > > >      * Compute the partition for the given record.
> > > >      *
> > > >      * @param topic     The topic name
> > > >      * @param key       The key to partition on (or null if no key)
> > > >      * @param value     The value to partition on
> > > >      * @param cluster   The current cluster metadata
> > > >      * @return a partition or {@code null} if this partitioner cannot make
> > > >      *         a useful decision. This will lead to a fallback to the default
> > > >      *         partitioning behaviour.
> > > >      */
> > > >     public Integer partition(String topic, K key, V value, Cluster cluster);
> > > >
> > > > }
> > > > ```
> > > >
> > > > This would:
> > > > - Not introduce any breaking changes to the API
> > > > - Allow users to partition based on their unserialized "business"
> keys
> > or
> > > > Values. This is consistent with the type-parameterized user provided
> > > > serializers.
> > > >
> > > > It might still make the concept of partitioning more complex and thus
> > > > harder to grasp.
> > > >
> > > > An implementation could look like this:
> > > >
> > > > https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447afe60d389f6c
> > > > (just a raw sketch though).
> > > >
> > > > What do you think?
> > > >
> > > > Kind regards
> > > > Daniel
> > > >
> > > >
> > > >
> > > >
> > >
> >
>
