Hey Daniel, partitionsFor() will block the very first time it sees a new topic that it doesn't have metadata for yet. If you want to ensure you don't block even that one time, call it prior to your regular usage so it initializes then.
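A rough sketch of that warm-up, in case it helps. `MetadataWarmup` and `warmUp` are hypothetical names for illustration, not part of kafka-clients. The lookup is passed in as a function, on the assumption that you would supply `producer::partitionsFor` from your own KafkaProducer instance at startup.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper (not part of kafka-clients): touches each topic once
// so that any first-time metadata fetch happens here rather than on the hot path.
final class MetadataWarmup {
    private MetadataWarmup() {}

    /**
     * Calls the supplied lookup once per topic and records how many
     * partitions it reported. With a real producer the lookup would be
     * producer::partitionsFor (an assumption about how you wire it up).
     */
    static Map<String, Integer> warmUp(
            Function<String, ? extends List<?>> partitionsFor,
            Collection<String> topics) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String topic : topics) {
            // This call may block the first time a topic is seen.
            counts.put(topic, partitionsFor.apply(topic).size());
        }
        return counts;
    }
}
```

You would call something like `MetadataWarmup.warmUp(producer::partitionsFor, topics)` once during startup, before the first send. The returned counts are only a snapshot for logging or sanity checks; the producer still does its own metadata caching.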
The rationale for adding a partition in ProducerRecord was that there are actually cases where you want to associate a key with the record but not use it for partitioning. The rationale for not retaining the pluggable partitioner was that you could always just use the partition (many people dislike the plugin APIs and asked for it). Personally, like you, I preferred the plugin APIs.

We aren't cut off from exposing the existing partitioner usage as a public API that you can override if there is anyone who wants that. I think one nice thing about it would be the ability to ship with an alternative partitioning strategy that you could enable purely in config. For example the old producer had a partitioning strategy that attempted to minimize the number of TCP connections for cases where there was no key. 98% of people loathed that, but for 2% of people it was useful and this would be a way to still include that behavior for people migrating to the new producer.

-Jay

On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <daniel.wege...@holisticon.de> wrote:

> Gwen Shapira <gshapira@...> writes:
>
> > Hi Daniel,
> >
> > I think you can still use the same logic you had in the custom
> > partitioner in the old producer. You just move it to the client that
> > creates the records.
> > The reason you don't cache the result of partitionsFor is that the
> > producer should handle the caching for you, so it's not necessarily
> > a long or blocking call.
> >
> > I see it as a pretty small change to the API. But I'm not sure what
> > drove the change either.
> >
> > Gwen
> >
> > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener
> > <Daniel.Wegener@...> wrote:
> >
> > > Hello Kafka-users!
> > >
> > > I am facing a migration from a kind of (a bit self-plumbed) Kafka
> > > 0.8.1 producer to the new kafka-clients API. I just noticed that
> > > the new KafkaProducer initializes its own Partitioner that cannot
> > > be changed (final field, no ctor-param, no
> > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > > Is this an intentional change?
> > > If I understand the new API correctly, one should either define a
> > > key for a message and let the default Partitioner take care that
> > > it is distributed over all available partitions, or set an
> > > explicit partition number per message that it will be written to.
> > >
> > > The old API allowed creating ProducerRecords with a key and/or a
> > > key used only for partitioning (but one that is not sent down the
> > > wire) and then providing a custom Partitioner that later could
> > > distribute this partitioning key over all available partitions
> > > when the message is actually sent.
> > >
> > > The difference in the new producer API is that we need to know the
> > > exact number of available partitions before we even create a
> > > ProducerRecord. If we don't ensure the correct number of
> > > partitions and try to send a message to a partition that does not
> > > exist, the whole message will blow up later when the producer
> > > tries to send it.
> > >
> > > I don't expect the partition count to change that often, but the
> > > API doc states that a partitionsFor(String topic) result _should
> > > not_ be cached. But I do not really want to check for changed
> > > partition counts before every creation of a ProducerRecord. The
> > > old pluggable partitioner was, for us, especially useful for
> > > partition stickiness by business keys (and thus stateful
> > > processing stages across multiple topics). This ensured that a
> > > message that was processed on stage1:partition2 will eventually be
> > > processed on stageN:partition2. Not very clever in terms of
> > > scalability per stage, but it makes reasoning about the message
> > > flow a lot easier.
> > >
> > > So for a single ProducerRecord, to my understanding it might be
> > > nicer to have a nullable
> > > partitionFunction:(ProducerRecord,Int)=>Int (or Java8 lambda
> > > equivalent) instead of the nullable Partition attribute and
> > > evaluate this function in the producer.
> > >
> > > Kind regards
> > >
> > > ------------------------------------------------------------------
> > > Daniel Wegener
> > > Holisticon AG
>
> Cheers for your quick reply Gwen!
> Good to know that partitionsFor is (most of the time) fast and
> non-blocking. I'm just wondering if this leaves a (maybe rather
> artificial?) use case uncovered: if you really try to use the Producer
> in a completely non-blocking fashion (with block.on.buffer.full=false,
> maybe for a reactive-streams adapter?) you would still have to call
> partitionsFor, which may occasionally block.
>
> Don't get me wrong, I am happy with this solution, but I think the old
> API was a bit clearer about what you really give to the producer and
> what it does with it (a partition number (that may be discovered as
> invalid later and throw exceptions) vs. a partitioner that lets you
> distribute your payloads over the available number of partitions). In
> the new API it just feels a bit more - well - java'ish :).
>
> Kind regards
> Daniel