Interesting, and this was with the new Java client? This sounds like as much an opportunity for improvement in the code as anything. Would you be willing to share the details?

-jay
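To make the strategy under discussion concrete: a minimal sketch of one way such a "sticky" chooser could work, reusing one randomly chosen partition for a time window so that the producer can fill larger batches for keyless records. The class name and the time-window mechanism are illustrative inventions; the actual partitioner Steven describes below may work differently (e.g. switching on batch boundaries).

```
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical "sticky" partition chooser: keeps returning one randomly
 * chosen partition for a fixed time window, then picks a new one. Reusing
 * a partition for a while lets the producer accumulate larger batches,
 * which is the effect behind the broker-side CPU savings described below.
 */
public class StickyPartitionChooser {
    private final long windowMs;
    private long windowStart = 0;
    private int currentPartition = -1;

    public StickyPartitionChooser(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Choose a partition in [0, numPartitions), sticking to it for windowMs. */
    public synchronized int choose(int numPartitions) {
        long now = System.currentTimeMillis();
        if (currentPartition < 0 || currentPartition >= numPartitions
                || now - windowStart > windowMs) {
            windowStart = now;
            currentPartition = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        return currentPartition;
    }
}
```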
On Sunday, February 22, 2015, Steven Wu <stevenz...@gmail.com> wrote:

> > The low connection partitioner might work for this by attempting to
> > reuse recently used nodes whenever possible. That is useful in
> > environments with lots and lots of producers where you don't care
> > about semantic partitioning.
>
> In one of the perf tests, we found that the above "sticky" partitioner
> improved batching and reduced CPU utilization on the broker side by
> 60%. We plan to make it our default partitioner.
>
> On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey Daniel,
> >
> > Yeah, I think that would be doable. If you want to pursue it you would
> > need to do a quick KIP just to get everyone on the same page, since
> > this would be a public interface we would have to support over a long
> > time:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > When we have the details worked out, it should be a fairly
> > straightforward patch to make that pluggable.
> >
> > A few comments:
> > - I think we should just make the DefaultPartitioner the default value
> > for that configuration, rather than having it be a fallback.
> > - You need to pass in the binary key and value in addition to the Java
> > objects. Otherwise any partitioning based on the binary value will
> > require reserializing these.
> > - If we add this option we should really ship at least one other
> > useful partitioning strategy. The low connection partitioner might
> > work for this by attempting to reuse recently used nodes whenever
> > possible. That is useful in environments with lots and lots of
> > producers where you don't care about semantic partitioning. It would
> > be good to think through whether there are any other useful
> > partitioning strategies, to make sure they would also be doable with
> > the interface we end up with.
> > - Currently Cluster is not a public class, so we'll have to think
> > about whether we want to make that public.
> >
> > -Jay
> >
> > On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener
> > <daniel.wege...@holisticon.de> wrote:
> >
> > > Jay Kreps <jay.kreps@...> writes:
> > >
> > > > Hey Daniel,
> > > >
> > > > partitionsFor() will block the very first time it sees a new topic
> > > > that it doesn't have metadata for yet. If you want to ensure you
> > > > don't block even that one time, call it prior to your regular
> > > > usage so it initializes then.
> > > >
> > > > The rationale for adding a partition in ProducerRecord was that
> > > > there are actually cases where you want to associate a key with
> > > > the record but not use it for partitioning. The rationale for not
> > > > retaining the pluggable partitioner was that you could always just
> > > > use the partition (many people dislike plugin APIs and asked for
> > > > it). Personally, like you, I preferred the plugin APIs.
> > > >
> > > > We aren't cut off from exposing the existing partitioner usage as
> > > > a public API that you can override if there is anyone who wants
> > > > that. I think one nice thing about it would be the ability to ship
> > > > with an alternative partitioning strategy that you could enable
> > > > purely in config. For example, the old producer had a partitioning
> > > > strategy that attempted to minimize the number of TCP connections
> > > > for cases where there was no key. 98% of people loathed that, but
> > > > for 2% of people it was useful, and this would be a way to still
> > > > include that behavior for people migrating to the new producer.
> > > >
> > > > -Jay
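Jay's pre-warming advice in the quoted mail above can be made concrete. A minimal sketch: partitionsFor() and the configuration keys are real new-producer API, while the topic name and broker address are placeholders.

```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PrewarmExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Pre-warm: the first partitionsFor() call for a topic blocks while
        // metadata is fetched; doing it once at startup keeps the hot path
        // from paying that one-time cost.
        int partitions = producer.partitionsFor("my-topic").size();
        System.out.println("my-topic has " + partitions + " partitions");

        // Hot path: topic metadata is now cached, so this send does not
        // block on metadata (buffer-full behavior is a separate concern).
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        producer.close();
    }
}
```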
> > > > On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener
> > > > <daniel.wegener@...> wrote:
> > > >
> > > > > Gwen Shapira <gshapira <at> ...> writes:
> > > > >
> > > > > > Hi Daniel,
> > > > > >
> > > > > > I think you can still use the same logic you had in the custom
> > > > > > partitioner in the old producer. You just move it to the
> > > > > > client that creates the records. The reason you don't cache
> > > > > > the result of partitionsFor is that the producer should handle
> > > > > > the caching for you, so it's not necessarily a long or
> > > > > > blocking call.
> > > > > >
> > > > > > I see it as a pretty small change to the API. But I'm not sure
> > > > > > what drove the change either.
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener
> > > > > > <Daniel.Wegener <at> ...> wrote:
> > > > > >
> > > > > > > Hello Kafka-users!
> > > > > > >
> > > > > > > I am facing a migration from a (somewhat self-plumbed) Kafka
> > > > > > > 0.8.1 producer to the new kafka-clients API. I just noticed
> > > > > > > that the new KafkaProducer initializes its own Partitioner
> > > > > > > that cannot be changed (final field, no ctor param, no
> > > > > > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > > > > > > Is this an intentional change?
> > > > > > >
> > > > > > > If I understand the new API correctly, one should either
> > > > > > > define a key for a message and let the default Partitioner
> > > > > > > take care that it is distributed over all available
> > > > > > > partitions, or set an explicit partition number per message.
> > > > > > >
> > > > > > > The old API allowed creating ProducerRecords with a key
> > > > > > > and/or a separate key used only for partitioning (one that
> > > > > > > is not sent down the wire), and then providing a custom
> > > > > > > Partitioner that could later distribute this partitioning
> > > > > > > key over all available partitions when the message is
> > > > > > > actually sent.
> > > > > > >
> > > > > > > The difference in the new producer API is that we need to
> > > > > > > know the exact number of available partitions before we even
> > > > > > > create a ProducerRecord. If we don't ensure the correct
> > > > > > > number of partitions and try to send a message to a
> > > > > > > partition that does not exist, the whole message will blow
> > > > > > > up later when the producer tries to send it.
> > > > > > >
> > > > > > > I don't expect the partition count to change that often, but
> > > > > > > the API doc states that a partitionsFor(String topic) result
> > > > > > > _should not_ be cached. And I do not really want to check
> > > > > > > for changed partition counts before every creation of a
> > > > > > > ProducerRecord. The old pluggable partitioner was, for us,
> > > > > > > especially useful for partition-stickiness by business keys
> > > > > > > (and thus stateful processing stages across multiple
> > > > > > > topics). This ensured that a message processed on
> > > > > > > stage1:partition2 would eventually be processed on
> > > > > > > stageN:partition2. Not very clever in terms of scalability
> > > > > > > per stage, but it makes reasoning about the message flow a
> > > > > > > lot easier.
> > > > > > >
> > > > > > > So for a single ProducerRecord, to my understanding it might
> > > > > > > be nicer to have a nullable
> > > > > > > partitionFunction: (ProducerRecord, Int) => Int (or a Java 8
> > > > > > > lambda equivalent) instead of the nullable partition
> > > > > > > attribute, and to evaluate this function in the producer.
> > > > > > >
> > > > > > > Kind regards
> > > > > > >
> > > > > > > Daniel Wegener
> > > > > > > Holisticon AG
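A sketch of what Gwen suggests above, using Daniel's business-key example: the old partitioner logic moves into the code that creates the record, which hashes the business key to an explicit partition. The helper, topic, and hashing scheme are illustrative only; with equal partition counts per topic, the same business key lands on the same partition index across stages.

```
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BusinessKeyRouting {
    /**
     * Build a record routed by a business key without sending that key on
     * the wire: hash it to an explicit partition, much like a custom
     * Partitioner in the old producer would have done.
     */
    static ProducerRecord<String, String> routedRecord(
            KafkaProducer<String, String> producer,
            String topic, String businessKey, String value) {
        int numPartitions = producer.partitionsFor(topic).size();
        // Stable hash of the business key; mask the sign bit rather than
        // using Math.abs, which fails for Integer.MIN_VALUE.
        int partition = (businessKey.hashCode() & 0x7fffffff) % numPartitions;
        return new ProducerRecord<>(topic, partition, null, value); // no wire key
    }
}
```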
> > > > > Cheers for your quick reply, Gwen!
> > > > >
> > > > > Good to know that partitionsFor is (most of the time) fast and
> > > > > non-blocking. I'm just wondering if this leaves a (maybe rather
> > > > > artificial?) use case uncovered: if you really try to use the
> > > > > producer in a completely non-blocking fashion (with
> > > > > block.on.buffer.full=false, maybe for a reactive-streams
> > > > > adapter?), you would still have to call partitionsFor, which may
> > > > > occasionally block.
> > > > >
> > > > > Don't get me wrong, I am happy with this solution, but I think
> > > > > the old API was a bit clearer about what you really give to the
> > > > > producer and what it does with it (a partition number that may
> > > > > be discovered as invalid later and throw exceptions, versus a
> > > > > partitioner that lets you distribute your payloads over the
> > > > > available number of partitions). In the new API it just feels a
> > > > > bit more, well, Java'ish :).
> > > > >
> > > > > Kind regards
> > > > > Daniel
> > >
> > > Hi Jay,
> > >
> > > Thank you for this clarification. I agree it's perfectly fine to
> > > enforce the metadata resolution once, when you start a producer.
> > >
> > > If there were support for some kind of user-defined partitioner, I'd
> > > have the following thoughts:
> > >
> > > - Let the user still be able to optionally choose a partition for a
> > > ProducerRecord.
> > > - Let the user optionally provide a _CustomPartitioner_ as a
> > > KafkaProducer ctor param or ctor props.
> > > - Keep the current default partitioner's behavior, preferring the
> > > result of the CustomPartitioner over its default strategies (hashing
> > > the encoded key before round-robin), but only AFTER trying
> > > ProducerRecord.partition.
> > >
> > > The type signature of a CustomPartitioner could look like this:
> > >
> > > ```
> > > public interface CustomPartitioner<K, V> extends Configurable {
> > >
> > >     /**
> > >      * Compute the partition for the given record.
> > >      *
> > >      * @param topic The topic name
> > >      * @param key The key to partition on (or null if no key)
> > >      * @param value The value to partition on
> > >      * @param cluster The current cluster metadata
> > >      * @return a partition, or {@code null} if this partitioner
> > >      * cannot make a useful decision. This will lead to a fallback
> > >      * to the default partitioning behaviour.
> > >      */
> > >     public Integer partition(String topic, K key, V value, Cluster cluster);
> > >
> > > }
> > > ```
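To see how the proposed interface might be implemented: a sketch that routes by an unserialized business key, assuming CustomPartitioner exists exactly as declared above (Cluster and Configurable are existing client classes, though Cluster is not yet public, as Jay notes).

```
import java.util.Map;
import org.apache.kafka.common.Cluster;

public class BusinessKeyPartitioner implements CustomPartitioner<String, String> {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public Integer partition(String topic, String key, String value, Cluster cluster) {
        if (key == null)
            return null; // fall back to the default partitioning behaviour
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Route by the unserialized business key, keeping
        // stage1:partitionN and stageM:partitionN aligned across topics
        // that have the same partition count.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```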
> > > This would:
> > >
> > > - Not introduce any breaking changes to the API.
> > > - Allow users to partition based on their unserialized "business"
> > > keys or values. This is consistent with the type-parameterized,
> > > user-provided serializers.
> > >
> > > It might still make the concept of partitioning more complex and
> > > thus harder to grasp.
> > >
> > > An implementation could look like this:
> > > https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447afe60d389f6c
> > > (just a raw sketch, though).
> > >
> > > What do you think?
> > >
> > > Kind regards
> > > Daniel
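Finally, a sketch of the precedence Daniel proposes (an explicit record partition first, then the custom partitioner, then a default strategy), as it might sit inside the producer. Everything here is a simplified illustration, not the actual producer internals; in particular, the real default partitioner hashes the serialized key bytes rather than the key object.

```
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.Cluster;

public class PartitionResolution {
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    /**
     * Fallback chain from the proposal above:
     * 1. an explicit partition set on the record wins,
     * 2. otherwise a user-supplied CustomPartitioner may decide,
     * 3. a null result falls through to the default strategy
     *    (key hashing, else round-robin).
     */
    static <K, V> int resolvePartition(String topic, Integer recordPartition,
                                       K key, V value,
                                       CustomPartitioner<K, V> custom,
                                       Cluster cluster) {
        if (recordPartition != null)
            return recordPartition;
        if (custom != null) {
            Integer p = custom.partition(topic, key, value, cluster);
            if (p != null)
                return p;
        }
        return defaultPartition(topic, key, cluster);
    }

    static <K> int defaultPartition(String topic, K key, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (key != null)
            return (key.hashCode() & 0x7fffffff) % numPartitions; // simplified hashing
        // Simplified round-robin for keyless records.
        return (COUNTER.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```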