Gwen Shapira <gshapira@...> writes:

> Hi Daniel,
>
> I think you can still use the same logic you had in the custom partitioner
> in the old producer. You just move it to the client that creates the
> records.
> The reason you don't cache the result of partitionsFor is that the producer
> should handle the caching for you, so it's not necessarily a long or
> blocking call.
>
> I see it as a pretty small change to the API. But I'm not sure what drove
> the change either.
>
> Gwen
>
> On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> Daniel.Wegener@...> wrote:
>
> > Hello Kafka-users!
> >
> > I am facing a migration from a (somewhat self-plumbed) Kafka 0.8.1
> > producer to the new kafka-clients API. I just noticed that the new
> > KafkaProducer initializes its own Partitioner that cannot be changed
> > (final field, no ctor param, no
> > Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is
> > this an intentional change?
> > If I understand the new API correctly, one should either define a key
> > for a message and let the default Partitioner take care that it is
> > distributed over all available partitions, or set an explicit partition
> > number per message.
> >
> > The old API allowed creating ProducerRecords with a key and/or a key
> > used only for partitioning (one that is not sent down the wire), and
> > then providing a custom Partitioner that could later distribute this
> > partitioning key over all available partitions when the message is
> > actually sent.
> >
> > The difference in the new producer API is that we need to know the
> > exact number of available partitions before we even create a
> > ProducerRecord. If we don't ensure the correct number of partitions and
> > try to send a message to a partition that does not exist, the whole
> > message will blow up later when the producer tries to send it.
> >
> > I don't expect the partition count to change that often, but the API
> > doc states that a partitionsFor(String topic) result _should not_ be
> > cached. And I do not really want to check for changed partition counts
> > before every creation of a ProducerRecord. The old pluggable
> > partitioner was, for us, especially useful for partition stickiness by
> > business keys (and thus stateful processing stages across multiple
> > topics). This ensured that a message that was processed on
> > stage1:partition2 would eventually be processed on stageN:partition2.
> > Not very clever in terms of scalability per stage, but it makes
> > reasoning about the message flow a lot easier.
> >
> > So for a single ProducerRecord, to my understanding it might be nicer
> > to have a nullable partitionFunction: (ProducerRecord, Int) => Int (or
> > a Java 8 lambda equivalent) instead of the nullable partition
> > attribute, and to evaluate this function in the producer.
> >
> > Kind regards
> >
> > ------------------------------------------------------------------
> > Daniel Wegener
> > Holisticon AG
> >
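[Editor's note: a minimal sketch of the partitionFunction idea proposed above. This API does not exist in kafka-clients; the class and method names (RecordWithPartitionFn, resolvePartition) are hypothetical, standing in for what a producer-evaluated partition function might look like.]

```java
import java.util.function.ToIntBiFunction;

public class PartitionFunctionSketch {

    // Hypothetical record type: instead of a fixed (possibly stale) partition
    // number, it carries a function (payload, partitionCount) -> partition.
    static class RecordWithPartitionFn<V> {
        final V value;
        final ToIntBiFunction<V, Integer> partitionFn;

        RecordWithPartitionFn(V value, ToIntBiFunction<V, Integer> partitionFn) {
            this.value = value;
            this.partitionFn = partitionFn;
        }
    }

    // The producer would evaluate the function at send time, when it knows
    // the current partition count, so the result can never point at a
    // partition that does not exist.
    static <V> int resolvePartition(RecordWithPartitionFn<V> rec, int currentPartitionCount) {
        return rec.partitionFn.applyAsInt(rec.value, currentPartitionCount);
    }

    public static void main(String[] args) {
        // Sticky-by-business-key partitioning, as described in the mail:
        // the same key always maps to the same partition for a given count.
        RecordWithPartitionFn<String> rec = new RecordWithPartitionFn<>(
                "order-7",
                (payload, n) -> (payload.hashCode() & Integer.MAX_VALUE) % n);

        System.out.println(resolvePartition(rec, 4)); // valid for 4 partitions
        System.out.println(resolvePartition(rec, 8)); // still valid if count grows
    }
}
```

The point of the shape is that the partition decision is deferred until the producer has fresh metadata, rather than being baked into the record at creation time.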
Cheers for your quick reply, Gwen! Good to know that partitionsFor is
(most of the time) fast and non-blocking. I'm just wondering if this
leaves a (maybe rather artificial?) use case uncovered: if you really try
to use the producer in a completely non-blocking fashion (with
block.on.buffer.full=false, maybe for a reactive-streams adapter?), you
would still have to call partitionsFor, which may occasionally block.

Don't get me wrong, I am happy with this solution, but I think the old
API was a bit clearer about what you really hand to the producer and what
it does with it (a partition number (that may later be discovered to be
invalid and throw exceptions) vs. a partitioner that lets you distribute
your payloads over the available number of partitions). In the new API it
just feels a bit more, well, Java'ish :).

Kind regards
Daniel
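[Editor's note: one way to combine Gwen's "move the logic to the client" suggestion with the non-blocking concern above is to keep the last known partition count cached and refresh it off the hot path. This is a sketch under stated assumptions, not an official API: the IntSupplier would wrap something like () -> producer.partitionsFor(topic).size() in real code, and the refresh policy is up to the application.]

```java
import java.util.function.IntSupplier;

public class CachedPartitionCount {

    private final IntSupplier fetchCount; // e.g. () -> producer.partitionsFor(topic).size()
    private volatile int cached;

    CachedPartitionCount(IntSupplier fetchCount) {
        this.fetchCount = fetchCount;
        // One potentially blocking metadata fetch at startup.
        this.cached = fetchCount.getAsInt();
    }

    // Non-blocking read for the hot path that creates ProducerRecords.
    int current() {
        return cached;
    }

    // Called from a background thread (or timer) so record creation
    // never triggers a blocking metadata fetch itself.
    void refresh() {
        cached = fetchCount.getAsInt();
    }

    // Client-side replacement for the old pluggable partitioner:
    // sticky by business key, within the cached partition count.
    int partitionFor(Object businessKey) {
        return (businessKey.hashCode() & Integer.MAX_VALUE) % cached;
    }

    public static void main(String[] args) {
        CachedPartitionCount counts = new CachedPartitionCount(() -> 6);
        int p = counts.partitionFor("customer-42");
        System.out.println(p);
        // new ProducerRecord<>("topic", p, null, payload) would then pin the record.
    }
}
```

The trade-off is the one the API doc warns about: a cached count can go stale, so a record could still target a partition that no longer matches current metadata until the next refresh.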