Gwen Shapira <gshapira@...> writes:

> 
> Hi Daniel,
> 
> I think you can still use the same logic you had in the custom
> partitioner in the old producer. You just move it to the client that
> creates the records.
> The reason you don't cache the result of partitionsFor is that the
> producer should handle the caching for you, so it's not necessarily a
> long or blocking call.
> 
> I see it as a pretty small change to the API. But I'm not sure what drove
> the change either.
> 
> Gwen
> 
> On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> Daniel.Wegener@...> wrote:
> 
> > Hello Kafka-users!
> >
> > I am facing a migration from a (somewhat self-plumbed) Kafka 0.8.1
> > producer to the new kafka-clients API. I just recognized that the new
> > KafkaProducer initializes its own Partitioner that cannot be changed
> > (final field, no ctor-param, no
> > Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is
> > this an intentional change?
> > If I understand the new API correctly, one should either define a key
> > for a message and let the default Partitioner take care that it is
> > distributed over all available partitions, or set an explicit
> > partition number per message that it will be written to.
> >
> > The old API allowed creating ProducerRecords with a key and/or a key
> > used only for partitioning (but one that is not sent down the wire),
> > and then providing a custom Partitioner that could later distribute
> > this partitioning key over all available partitions when the message
> > is actually sent.
> >
> > The difference in the new producer API is that we need to know the
> > exact number of available partitions before we even create a
> > ProducerRecord. If we don't ensure the correct number of partitions
> > and try to send a message to a partition that does not exist, the
> > whole message will blow up later when the producer tries to send it.
> >
> > I don't expect the partition count to change that often, but the API
> > doc states that a partitionsFor(String topic) result _should not_ be
> > cached. And I do not really want to check for changed partition
> > counts before every creation of a ProducerRecord. The old pluggable
> > partitioner was, for us, especially useful for partition-stickiness
> > by business keys (and thus stateful processing stages across multiple
> > topics). This ensured that a message that was processed on
> > stage1:partition2 will eventually be processed on stageN:partition2.
> > Not very clever in terms of scalability per stage, but it makes
> > reasoning about the message flow a lot easier.
> >
> > So for a single ProducerRecord, to my understanding it might be nicer
> > to have a nullable partitionFunction: (ProducerRecord, Int) => Int
> > (or a Java 8 lambda equivalent) instead of the nullable partition
> > attribute, and to evaluate this function in the producer.
> >
> > Kind regards
> >
> >
> > ------------------------------------------------------------------------------------
> > Daniel Wegener
> > Holisticon AG
> >
> >
> 

Cheers for your quick reply, Gwen!
Good to know that partitionsFor is (most of the time) fast and
non-blocking. I'm just wondering if this leaves a (maybe rather
artificial?) use case uncovered: if you really try to use the producer in
a completely non-blocking fashion (with block.on.buffer.full=false, maybe
for a reactive-streams adapter?), you would still have to call
partitionsFor, which may occasionally block.
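For the archives, here is a minimal sketch of what moving the partitioner
logic into the record-creating client could look like. The names
(ClientSidePartitioning, partitionFor) and the simple hash-mod scheme are
made up for illustration; in real code numPartitions would come from
producer.partitionsFor(topic).size() and the chosen partition would be
passed to the ProducerRecord constructor:

```java
// Minimal sketch (assumption: a simple hash-mod scheme; the names below are
// made up for illustration, not Kafka API). In real code, numPartitions would
// come from producer.partitionsFor(topic).size() and the chosen partition
// would go into the ProducerRecord constructor.
public class ClientSidePartitioning {

    // Deterministically map a business key to a partition, so the same key
    // always lands on the same partition number across stages/topics.
    static int partitionFor(String businessKey, int numPartitions) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(businessKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8; // would be producer.partitionsFor(topic).size()
        int partition = partitionFor("order-4711", numPartitions);
        System.out.println("order-4711 -> partition " + partition);
        // The record would then pin the partition explicitly, e.g.:
        // new ProducerRecord<>(topic, partition, key, value);
    }
}
```

This keeps the stickiness-by-business-key property as long as the partition
count does not change between the lookup and the send.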

Don't get me wrong, I am happy with this solution, but I think the old API
was a bit clearer about what you really hand to the producer and what it
does with it: a partition number (that may be discovered as invalid later
and throw an exception) vs. a partitioner that lets you distribute your
payloads over the available number of partitions. In the new API it just
feels a bit more - well - Java'ish :).
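To make the partitionFunction suggestion concrete, here is a rough sketch
with plain Java 8 types. RecordStandIn and the BiFunction shape are purely
illustrative stand-ins, not anything in the Kafka API:

```java
import java.util.function.BiFunction;

// Rough sketch of the proposed (ProducerRecord, Int) => Int idea: the record
// would carry a partition *function* instead of a fixed partition number, and
// the producer would evaluate it against the partition count it knows at send
// time. RecordStandIn and the BiFunction shape are illustrative only.
public class PartitionFunctionSketch {

    // Minimal stand-in for the parts of a ProducerRecord the function needs.
    static class RecordStandIn {
        final String businessKey;
        final String value;

        RecordStandIn(String businessKey, String value) {
            this.businessKey = businessKey;
            this.value = value;
        }
    }

    // (record, numPartitions) -> partition; the producer would evaluate this lazily.
    static final BiFunction<RecordStandIn, Integer, Integer> PARTITION_FUNCTION =
            (rec, numPartitions) -> Math.floorMod(rec.businessKey.hashCode(), numPartitions);

    public static void main(String[] args) {
        RecordStandIn rec = new RecordStandIn("customer-42", "payload");
        // Only here, at "send time", does the partition count need to be known.
        int partition = PARTITION_FUNCTION.apply(rec, 8);
        System.out.println("customer-42 -> partition " + partition);
    }
}
```

The point of the shape is that the caller never needs partitionsFor at
record-creation time; a stale partition count can no longer produce a record
that blows up later.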

Kind regards
Daniel
