Hi, I discovered what I consider to be really confusing behavior -- wondering 
if this is by design or a bug.

The Kafka Partitioner interface:
public int partition(String topic, Object key, byte[] keyBytes, Object value, 
byte[] valueBytes, Cluster cluster);

has both "Object value" and "byte[] valueBytes" provided.  I naïvely assumed 
that "value" would be the pre-serialized
domain object.

I set up a KStream:


builder.stream(Serdes.String(), requestSerde, requestTopic)
       .mapValues(this::requestToMessage)
       .to(Serdes.String(), messageSerde, messageTopic);


I produce to the "messageTopic" both via this Stream as well as by a normal 
KafkaProducer.

I thought this should be sufficient to partition both ways:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MessagePartitioner.class);

The partitioner has special understanding of the Message type and behaves as a 
DefaultPartitioner otherwise.
Roughly,


int partition(...) {
    return value instanceof Message ? specialPartition((Message)value) : 
defaultPartition(keyBytes);
}


This works great for KafkaProducer.  The "value" field is indeed my Message 
type and partitions are assigned
correctly.
Unfortunately it does *not* work with the stream producer, which causes very 
confusing behavior.  It turns out
that the RecordCollectorImpl does its own serialization:


byte[] keyBytes = keySerializer.serialize(topic, key);
byte[] valBytes = valueSerializer.serialize(topic, value);
if (partition == null && partitioner != null) {
    List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
    if (partitions != null && partitions.size() > 0)
        partition = partitioner.partition(key, value, partitions.size());
    }
}
ProducerRecord<byte[], byte[]> serializedRecord =
    new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
this.producer.send(serializedRecord)


Critically, this means the record actually sent through the KafkaProducer is 
already turned into a byte[] kv.
So when the Partitioner gets called (since I did not specify a partition 
directly, nor a StreamPartitioner), it sees
a byte[] and does the "defaultPartition" case.

Could someone shed some light on the right way to set this up?  Are normal 
Kafka partitioners expected to work for
Streams?  I would much prefer to access my domain object directly rather than 
have to de-serialize the JSON we just spent
so many CPU cycles making :)

I could implement StreamPartitioner, but then I have to remember to specify it 
everywhere, otherwise potentially very subtle bugs creep in.

Thanks,
Steven

Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to