Jay Kreps <jay.kreps@...> writes:

> Hey Daniel,
>
> partitionsFor() will block the very first time it sees a new topic that it
> doesn't have metadata for yet. If you want to ensure you don't block even
> that one time, call it prior to your regular usage so it initializes then.
>
> The rationale for adding a partition in ProducerRecord was that there are
> actually cases where you want to associate a key with the record but not
> use it for partitioning. The rationale for not retaining the pluggable
> partitioner was that you could always just use the partition (many people
> dislike the plugin APIs and asked for it). Personally, like you, I
> preferred the plugin APIs.
>
> We aren't cut off from exposing the existing partitioner usage as a public
> API that you can override if there is anyone who wants that. I think one
> nice thing about it would be the ability to ship with an alternative
> partitioning strategy that you could enable purely in config. For example,
> the old producer had a partitioning strategy that attempted to minimize
> the number of TCP connections for cases where there was no key. 98% of
> people loathed that, but for 2% of people it was useful, and this would be
> a way to still include that behavior for people migrating to the new
> producer.
>
> -Jay
>
> On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <daniel.wegener@...> wrote:
>
> > Gwen Shapira <gshapira <at> ...> writes:
> >
> > > Hi Daniel,
> > >
> > > I think you can still use the same logic you had in the custom
> > > partitioner in the old producer. You just move it to the client that
> > > creates the records.
> > > The reason you don't cache the result of partitionsFor is that the
> > > producer should handle the caching for you, so it's not necessarily a
> > > long or blocking call.
> > >
> > > I see it as a pretty small change to the API. But I'm not sure what
> > > drove the change either.
> > >
> > > Gwen
> > >
> > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <Daniel.Wegener <at> ...> wrote:
> > >
> > > > Hello Kafka-users!
> > > >
> > > > I am facing a migration from a kind of (a bit self-plumbed) Kafka
> > > > 0.8.1 producer to the new kafka-clients API. I just recognized that
> > > > the new KafkaProducer initializes its own Partitioner that cannot be
> > > > changed (final field, no ctor param, no
> > > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > > > Is this an intentional change?
> > > > If I understand the new API correctly, one should either define a
> > > > key for a message and let the default Partitioner take care that it
> > > > is distributed over all available partitions, or set an explicit
> > > > partition number per message that will be written to.
> > > >
> > > > The old API allowed creating ProducerRecords with a key and/or a key
> > > > used only for partitioning (but one that is not sent down the wire)
> > > > and then providing a custom Partitioner that later could distribute
> > > > this partitioning key over all available partitions when the message
> > > > is actually sent.
> > > >
> > > > The difference in the new producer API is that we need to know the
> > > > exact number of available partitions before we even create a
> > > > ProducerRecord. If we don't ensure the correct number of partitions
> > > > and try to send a message to a partition that does not exist, the
> > > > whole message will blow up later when the producer tries to send it.
> > > >
> > > > I don't expect the partition count to change that often, but the API
> > > > doc states that a partitionsFor(String topic) result _should not_ be
> > > > cached. But I do not really want to check for changed partition
> > > > counts before every creation of a ProducerRecord. The old pluggable
> > > > partitioner was, for us, especially useful for partition-stickiness
> > > > by business keys (and thus stateful processing stages across
> > > > multiple topics). This ensured that a message that was processed on
> > > > stage1:partition2 will eventually be processed on stageN:partition2.
> > > > Not very clever in terms of scalability per stage, but it makes
> > > > reasoning about the message flow a lot easier.
> > > >
> > > > So for a single ProducerRecord, to my understanding it might be
> > > > nicer to have a nullable partitionFunction:(ProducerRecord,Int)=>Int
> > > > (or Java 8 lambda equivalent) instead of the nullable partition
> > > > attribute, and to evaluate this function in the producer.
> > > >
> > > > Kind regards
> > > >
> > > > -------------------------------------------------------------------------------------
> > > > Daniel Wegener
> > > > Holisticon AG
> >
> > Cheers for your quick reply, Gwen!
> > Good to know that partitionsFor is (most of the time) fast and
> > non-blocking. I'm just wondering if this leaves a (maybe rather
> > artificial?) use case uncovered: if you really try to use the producer
> > in a completely non-blocking fashion (with block.on.buffer.full=false,
> > maybe for a reactive-streams adapter?), you would still have to call
> > partitionsFor, which may occasionally be blocking.
> >
> > Don't get me wrong, I am happy with this solution, but I think the old
> > API was a bit clearer about what you really give to the producer and
> > what it does with it (a partition number (that may be discovered as
> > invalid later and throw exceptions) vs. a partitioner that lets you
> > distribute your payloads over the available number of partitions). In
> > the new API it just feels a bit more - well - Java'ish :).
> >
> > Kind regards
> > Daniel
Hi Jay.

Thank you for this clarification. I agree it's perfectly fine to enforce the
metadata resolution once when you start a producer.

If there were support for some kind of user-defined partitioner, I'd have the
following thoughts:

- Let the user still be able to optionally choose a partition for a
  ProducerRecord.
- Let the user optionally provide a _CustomPartitioner_ as a KafkaProducer
  ctor param or via the ctor props.
- Keep the current default partitioner's behavior, but prefer the result of
  the CustomPartitioner over its default strategies (hashing the encoded key
  before round-robin), and only AFTER trying ProducerRecord.partition.

The type signature of a CustomPartitioner could look like this:

```
public interface CustomPartitioner<K, V> extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param value The value to partition on
     * @param cluster The current cluster metadata
     * @return a partition, or {@code null} if this partitioner cannot make a
     *         useful decision. This will lead to a fallback to the default
     *         partitioning behaviour.
     */
    public Integer partition(String topic, K key, V value, Cluster cluster);
}
```

This would:

- not introduce any breaking changes to the API, and
- allow users to partition based on their unserialized "business" keys or
  values, which is consistent with the type-parameterized, user-provided
  serializers.

It might still make the concept of partitioning more complex and thus harder
to grasp.

An implementation could look like this:
https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447afe60d389f6c
(just a raw sketch though).

What do you think?

Kind regards
Daniel
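
PS: To make this more concrete, here is a minimal, hypothetical implementation
of the proposed interface for the stickiness-by-business-key case from my
first mail. The class name and the use of a plain String business key are
assumptions for illustration only:

```
import java.util.Map;

import org.apache.kafka.common.Cluster;

public class BusinessKeyPartitioner implements CustomPartitioner<String, byte[]> {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public Integer partition(String topic, String key, byte[] value, Cluster cluster) {
        if (key == null)
            return null; // no business key: fall back to the default strategy
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (numPartitions == 0)
            return null; // topic metadata not resolved yet: let the producer decide
        // stable, non-negative hash of the unserialized business key, mapped
        // onto the currently available partitions
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Wired up via the proposed ctor param (which of course does not exist yet) and
warmed up once at startup, as you suggested, it could be used along these
lines (the topic name is made up):

```
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

KafkaProducer<String, byte[]> producer =
        new KafkaProducer<String, byte[]>(props, new BusinessKeyPartitioner()); // proposed overload
producer.partitionsFor("stage1-topic"); // enforce metadata resolution once, so later sends don't block
```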