Hello Kafka-users!

I am migrating from a (somewhat self-plumbed) Kafka 0.8.1 producer to the new
kafka-clients API. I just noticed that the new KafkaProducer initializes its
own Partitioner, which cannot be changed (it is a final field, there is no
constructor parameter, and there is no
Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is this an
intentional change?
If I understand the new API correctly, one should either define a key for a
message and let the default Partitioner take care of distributing it over all
available partitions, or set an explicit partition number per message that it
will be written to.
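
To make the two options concrete, here is a minimal sketch in plain Java with
no Kafka dependency. PartitionChoice and choosePartition are hypothetical
names, and String.hashCode() only stands in for whatever hash the real default
Partitioner applies to the serialized key, so the numbers will not match
Kafka's.

```java
// Hypothetical, dependency-free sketch of the two options in the new API:
// an explicit partition wins; otherwise the key is hashed onto the partitions.
final class PartitionChoice {

    // Returns the partition to write to, or -1 if neither an explicit partition
    // nor a key is given (the producer would then pick one on its own).
    static int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (explicitPartition != null) {
            return explicitPartition; // caller decided, no hashing involved
        }
        if (key == null) {
            return -1;
        }
        // String.hashCode() is used only for illustration; masking the sign bit
        // keeps the modulo result non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(2, "customer-42", 8));    // prints 2
        System.out.println(choosePartition(null, "customer-42", 8)); // some value in [0, 8)
    }
}
```

In both cases the partition count has to be known by whoever builds the record.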

The old API allowed creating messages (KeyedMessage) with a key and/or a key
used only for partitioning (one that is not sent down the wire), and then
providing a custom Partitioner that distributed this partitioning key over all
available partitions when the message was actually sent.
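
For comparison, a rough model of the old flow, again hypothetical and
Kafka-free: the Message type carries a key that is sent plus a partKey that is
only used locally, and the Partitioner interface here only loosely mirrors the
old pluggable one (partitioning key and partition count in, partition out).

```java
// Hypothetical model of the old-API flow: the partitioning key stays local and the
// partition is only resolved when the message is actually sent.
final class OldStylePartitioning {

    // Stand-in for the old pluggable partitioner (illustrative signature).
    interface Partitioner {
        int partition(Object partKey, int numPartitions);
    }

    static final class Message {
        final String topic;
        final String key;     // sent down the wire
        final Object partKey; // used only for partitioning, never sent
        final String value;

        Message(String topic, String key, Object partKey, String value) {
            this.topic = topic;
            this.key = key;
            this.partKey = partKey;
            this.value = value;
        }
    }

    // Routes by business key: the same customer id always maps to the same
    // partition for a given partition count.
    static final Partitioner BY_BUSINESS_KEY =
        (partKey, numPartitions) -> (partKey.hashCode() & 0x7fffffff) % numPartitions;

    // Simulates the producer resolving the partition at send time, using the
    // partition count it knows at that moment; falls back to the message key.
    static int resolveAtSendTime(Message m, Partitioner p, int currentPartitionCount) {
        Object routingKey = m.partKey != null ? m.partKey : m.key;
        if (routingKey == null) {
            throw new IllegalArgumentException("no key to partition on");
        }
        return p.partition(routingKey, currentPartitionCount);
    }

    public static void main(String[] args) {
        Message m = new Message("orders", "order-7", "customer-42", "{...}");
        System.out.println(resolveAtSendTime(m, BY_BUSINESS_KEY, 6));
    }
}
```

The point is that the code creating the message never needs the partition
count; only the partitioner does, at send time.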

The difference in the new producer API is that we need to know the exact number
of available partitions before we even create a ProducerRecord. If we don't get
the number of partitions right and try to send a message to a partition that
does not exist, the send fails later, when the producer actually tries to send
the message.
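
A sketch of what the caller has to do now, assuming the partition count is
obtained separately (in real code presumably from
producer.partitionsFor(topic).size(); here it is a plain parameter so the
example runs without a broker). ExplicitPartitionCheck and its methods are
hypothetical.

```java
// Hypothetical sketch: with an explicit partition, the caller computes the
// partition from a partition count it looked up beforehand.
final class ExplicitPartitionCheck {

    static int partitionFor(String businessKey, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return (businessKey.hashCode() & 0x7fffffff) % partitionCount;
    }

    // True if the partition exists for the given count. A partition computed
    // from a stale, larger count can fail this check; such a record would only
    // fail later, when the producer tries to send it.
    static boolean isValid(int partition, int partitionCount) {
        return partition >= 0 && partition < partitionCount;
    }

    public static void main(String[] args) {
        int rememberedCount = 12; // count the caller looked up earlier
        int actualCount = 8;      // count at send time (e.g. topic recreated)
        int p = partitionFor("customer-42", rememberedCount);
        System.out.println("partition " + p + " exists: " + isValid(p, actualCount));
    }
}
```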

I don't expect the partition count to change very often, but the API doc states
that the result of partitionsFor(String topic) _should not_ be cached. Yet I do
not really want to check for changed partition counts before creating every
ProducerRecord. For us, the old pluggable partitioner was especially useful for
partition stickiness by business key (and thus for stateful processing stages
across multiple topics). It ensured that a message processed on
stage1:partition2 would eventually be processed on stageN:partition2. Not very
clever in terms of scalability per stage, but it makes reasoning about the
message flow a lot easier.
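
The stickiness argument boils down to this: if every stage uses the same
deterministic function and every stage's topic has the same partition count, a
key lands on the same partition number everywhere. A hypothetical check (route
and alignedAcrossStages are illustrative names, and String.hashCode() is just
a stand-in hash):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical check of partition stickiness by business key across stages.
final class StageAlignment {

    // Shared routing function that every stage's producer would use.
    static int route(String businessKey, int partitionCount) {
        return (businessKey.hashCode() & 0x7fffffff) % partitionCount;
    }

    // True when the key maps to the same partition number on every stage.
    static boolean alignedAcrossStages(String businessKey, List<Integer> partitionCountsPerStage) {
        int first = route(businessKey, partitionCountsPerStage.get(0));
        for (int count : partitionCountsPerStage) {
            if (route(businessKey, count) != first) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> sameCounts = Arrays.asList(8, 8, 8);
        List<Integer> mixedCounts = Arrays.asList(8, 12, 8);
        System.out.println(alignedAcrossStages("customer-42", sameCounts));  // prints true
        System.out.println(alignedAcrossStages("customer-42", mixedCounts)); // depends on the key
    }
}
```

As soon as one stage has a different partition count, the alignment holds only
by accident for some keys.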
 
So, as I understand it, it might be nicer for a ProducerRecord to carry a
nullable partitionFunction: (ProducerRecord, Int) => Int (or the Java 8 lambda
equivalent) instead of the nullable partition attribute, and to have the
producer evaluate this function.
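
Roughly what I have in mind, as a Java 8 sketch. ProposedRecord and
resolvePartition are purely hypothetical and not part of kafka-clients; the
fallback to key hashing merely stands in for the default Partitioner.

```java
import java.util.function.BiFunction;

// Hypothetical illustration of the proposal: a record carries a nullable
// (record, partitionCount) -> partition function that the producer evaluates late.
final class PartitionFunctionProposal {

    static final class ProposedRecord<K, V> {
        final String topic;
        final K key;
        final V value;
        final BiFunction<ProposedRecord<K, V>, Integer, Integer> partitionFunction; // nullable

        ProposedRecord(String topic, K key, V value,
                       BiFunction<ProposedRecord<K, V>, Integer, Integer> partitionFunction) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.partitionFunction = partitionFunction;
        }
    }

    // What the producer would do when it actually sends: evaluate the function
    // against the partition count it currently knows, then validate the result.
    static <K, V> int resolvePartition(ProposedRecord<K, V> r, int currentPartitionCount) {
        if (r.partitionFunction == null) {
            if (r.key == null) {
                throw new IllegalArgumentException("neither partition function nor key given");
            }
            // Stand-in for the default Partitioner: hash the key.
            return (r.key.hashCode() & 0x7fffffff) % currentPartitionCount;
        }
        int p = r.partitionFunction.apply(r, currentPartitionCount);
        if (p < 0 || p >= currentPartitionCount) {
            throw new IllegalArgumentException("partition function returned " + p);
        }
        return p;
    }

    public static void main(String[] args) {
        // Route by a business key that is only used for partitioning, not sent.
        String customerId = "customer-42";
        ProposedRecord<String, String> rec = new ProposedRecord<>("orders", "order-7", "{...}",
            (ignored, count) -> (customerId.hashCode() & 0x7fffffff) % count);
        System.out.println(resolvePartition(rec, 8));
    }
}
```

The caller only describes how to map a record onto "however many partitions
there are"; looking up the current count stays inside the producer, as it did
with the old pluggable partitioner.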

Kind regards

------------------------------------------------------------------------------------
Daniel Wegener
Holisticon AG
