Hey Daniel, Yeah I think that would be doable. If you want to pursue it you would need to do a quick KIP just to get everyone on the same page since this would be a public interface we would have to support over a long time: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
When we have the details worked out, then it should be a fairly straight-forward patch to make that pluggable. A few comments: - I think we should just make the DefaultPartitioner the default value for that configuration, rather than having it be a fall back. - You need to pass in the binary key and value in addition to the java objects. Otherwise any partitioning based on the binary value will require reserializing these. - If we add this option we should really ship at least one other useful partitioning strategy. The low connection partitioner might work for this by attempting to reuse recently used nodes whenever possible. That is useful in environments with lots and lots of producers where you don't care about semantic partitioning. It would be good to think through if there are any other useful partitioning strategies to make sure they would also be doable with the interface we would end up with. - Currently Cluster is not a public class so we'll have to think about whether we want to make that public. -Jay On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener < daniel.wege...@holisticon.de> wrote: > > Jay Kreps <jay.kreps@...> writes: > > > > > Hey Daniel, > > > > partitionsFor() will block the very first time it sees a new topic that > it > > doesn't have metadata for yet. If you want to ensure you don't block even > > that one time, call it prior to your regular usage so it initializes > then. > > > > The rationale for adding a partition in ProducerRecord was that there are > > actually cases where you want to associate a key with the record but not > > use it for partitioning. The rationale for not retaining the pluggable > > partitioner was that you could always just use the partition (many people > > dislike the plugin apis and asked for it). Personally, like you, I > > preferred the plugin apis. > > > > We aren't cut off from exposing the existing partitioner usage as a > public > > api that you can override if there is anyone who wants that. I think one > > nice thing about it would be the ability to ship with an alternative > > partitioning strategy that you could enable purely in config. For example > > the old producer had a partitioning strategy that attempted to minimize > the > > number of TCP connections for cases where there was no key. 98% of people > > loathed that, but for 2% of people it was useful and this would be a way > to > > still include that behavior for people migrating to the new producer. > > > > -Jay > > > > On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener < > > daniel.wegener@...> wrote: > > > > > Gwen Shapira <gshapira <at> ...> writes: > > > > > > > > > > > Hi Daniel, > > > > > > > > I think you can still use the same logic you had in the custom > > > partitioner > > > > in the old producer. You just move it to the client that creates the > > > > records. > > > > The reason you don't cache the result of partitionsFor is that the > > > producer > > > > should handle the caching for you, so its not necessarily a long or > > > > blocking call. > > > > > > > > I see it as a pretty small change to the API. But I'm not sure what > drove > > > > the change either. > > > > > > > > Gwen > > > > > > > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener < > > > > Daniel.Wegener <at> ...> wrote: > > > > > > > > > Hello Kafka-users! > > > > > > > > > > I am facing a migration from a kind of ( a bit self plumbed) kafka > > > 0.8.1 > > > > > producer to the new kafka-clients API. I just recognized, that the > new > > > > > KafkaProducer initializes its own Partitioner that cannot be > changed > > > (final > > > > > field, no ctor-param, no > > > > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()). > Is > > > this > > > > > an intentional change? > > > > > If i understand the new API correctly, one should either define a > key > > > for > > > > > a message and let the default Partitioner care that it will be > > > distributed > > > > > over all available partitions or to set an explicit partition > number > > > per > > > > > message that will be written to. > > > > > > > > > > The old API api allowed to create ProducerRecors with a key and/or > a > > > key > > > > > used only for partitioning (but one that is not sent down the wire) > and > > > > > then to provide a custom Partitioner that later could distribute > this > > > > > partitioning key over all available partitions when the message is > > > actually > > > > > sent. > > > > > > > > > > The difference in the new procuder API is that we need to know the > > > exact > > > > > number of available partitions before we even create a > ProducerRecord. > > > If > > > > > we dont ensure the correct number of partitions and try to send a > > > message > > > > > to a partition that does not exist, the whole message will blow up > > > later > > > > > when the producer tries to send it. > > > > > > > > > > I dont expect the partition count to change that often but the API- > doc > > > > > states that a partitionsFor(String topic) result _should not_ be > > > cached. > > > > > But I do not really want to check for changed partition counts > before > > > every > > > > > creation of a ProducerRecord. The old pluggable partitioner was, > for > > > us, > > > > > especially useful for partition-stickyness by business keys (and > thus > > > > > stateful processing stages across multiple topics). This ensured > that a > > > > > message that was processed on stage1:partition2 will eventually be > > > > > processed on stageN:partition2. Not very clever in terms of > scalability > > > per > > > > > stage, but it makes reasoning about the message flow alot easier. > > > > > > > > > > So for a single ProducerRecord, for my understanding it might be > nicer > > > to > > > > > have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or > Java8 > > > > > lambda equivalent) instead of the nullable Partition attribute and > > > evaluate > > > > > this function in the producer. > > > > > > > > > > Kind regards > > > > > > > > > > > > > > > ------------------------------------------------------------------- > ---- > > > ------------- > > > > > Daniel Wegener > > > > > Holisticon AG > > > > > > > > > > > > > > > > > > > > Cheers for you quick reply Gwen! > > > Good to know that partitionsFor is (most time) fast and non-blocking. > Im > > > just wondering if this leaves a (maybe rather artificial?) use case > > > uncovered: If you really try to use the Producer in a completly non- > > > blocking fashion (with block.on.buffer.full=false, maybe for a > reactive- > > > streams adapter?) you would still have to call partitionsFor what may > > > occasionally be blocking. > > > > > > Dont get me wrong, I am happy with this solution but I think the old > API > > > was a bit clearer about what you really give into the producer and what > it > > > does with it (a partition number (that may be discovered as invalid > later > > > and throws exceptions) vs a partitioner that let you distribute your > > > payloads over the available number of partitions). In the new api it > just > > > feels a bit more - well - java'ish :). > > > > > > Kind regards > > > Daniel > > > > > > > > > > Hi Jay. > > Thank you for this clarification, I agree it's perfectly fine to enforce > the metadata resolution when you start a producer once. > > If there would be support for some kind of user defined partitioner I'd > have the following thougts: > > - Let the user still be able to optionally choose a partition for a > ProducerRecord. > - Let the user optionally provide a _CustomPartitioner_ as KafkaProducer > ctor > param or ctor props. > - Keep the current default partitioners behavior that will just prefer the > result of the CustomPartitioner over its default strategies (hashing the > encoded key before round-robin), but AFTER trying the > ProducerRecord.partition. > > The type signature of a CustomPartitioner could look like this: > > ``` > public interface CustomPartitioner<K,V> extends Configurable { > > /** > * Compute the partition for the given record. > * > * @param topic The topic name > * @param key The key to partition on (or null if no key) > * @param value The value to partition on > * @param cluster The current cluster metadata > * @returns a partition or {@code null} if this partitioner cannot make > a useful decision. This will lead to a fallback to the default partitioning > behaviour. > */ > public Integer partition(String topic, K key, V value, Cluster > cluster); > > } > ``` > > This would: > - Not introduce any breaking changes to the API > - Allow users to partition based on their unserialized "business" keys or > Values. This is consistent with the type-parameterized user provided > serializers. > > It might still make the concept of partitioning more complex and thus > harder to grasp. > > An implementation could look like this: > https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447a > fe60d389f6c (just a raw sketch though). > > What do you think? > > Kind regards > Daniel > > > >