It's by design.

The reason it, that Streams uses a single producer to write to different
output topic. As different output topics might have different key and/or
value types, the producer is instantiated with byte[] as key and value
type, and Streams serialized the data before handing it to the producer
-- Streams knows the topology and can pick the right serializer
according to the current key and value type.

That's the reason why KStream#to() has an overload allowing to specify a
custom StreamPartitioner that will be called by Streams (not the
producer) to compute the partition before serializing the data. For this
case, the partition (to write the data into) is given to the producer
directly and the producer does not call it's own partitioner.


-Matthias


On 2/9/17 3:49 PM, Steven Schlansker wrote:
> Hi, I discovered what I consider to be really confusing behavior -- wondering 
> if this is by design or a bug.
> 
> The Kafka Partitioner interface:
> public int partition(String topic, Object key, byte[] keyBytes, Object value, 
> byte[] valueBytes, Cluster cluster);
> 
> has both "Object value" and "byte[] valueBytes" provided.  I naïvely assumed 
> that "value" would be the pre-serialized
> domain object.
> 
> I set up a KStream:
> 
> 
> builder.stream(Serdes.String(), requestSerde, requestTopic)
>        .mapValues(this::requestToMessage)
>        .to(Serdes.String(), messageSerde, messageTopic);
> 
> 
> I produce to the "messageTopic" both via this Stream as well as by a normal 
> KafkaProducer.
> 
> I thought this should be sufficient to partition both ways:
> 
> props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MessagePartitioner.class);
> 
> The partitioner has special understanding of the Message type and behaves as 
> a DefaultPartitioner otherwise.
> Roughly,
> 
> 
> int partition(...) {
>     return value instanceof Message ? specialPartition((Message)value) : 
> defaultPartition(keyBytes);
> }
> 
> 
> This works great for KafkaProducer.  The "value" field is indeed my Message 
> type and partitions are assigned
> correctly.
> Unfortunately it does *not* work with the stream producer, which causes very 
> confusing behavior.  It turns out
> that the RecordCollectorImpl does its own serialization:
> 
> 
> byte[] keyBytes = keySerializer.serialize(topic, key);
> byte[] valBytes = valueSerializer.serialize(topic, value);
> if (partition == null && partitioner != null) {
>     List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
>     if (partitions != null && partitions.size() > 0)
>         partition = partitioner.partition(key, value, partitions.size());
>     }
> }
> ProducerRecord<byte[], byte[]> serializedRecord =
>     new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
> this.producer.send(serializedRecord)
> 
> 
> Critically, this means the record actually sent through the KafkaProducer is 
> already turned into a byte[] kv.
> So when the Partitioner gets called (since I did not specify a partition 
> directly, nor a StreamPartitioner), it sees
> a byte[] and does the "defaultPartition" case.
> 
> Could someone shed some light on the right way to set this up?  Are normal 
> Kafka partitioners expected to work for
> Streams?  I would much prefer to access my domain object directly rather than 
> have to de-serialize the JSON we just spent
> so many CPU cycles making :)
> 
> I could implement StreamPartitioner, but then I have to remember to specify 
> it everywhere, otherwise potentially very subtle bugs creep in.
> 
> Thanks,
> Steven
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to