Hi, I discovered what I consider to be really confusing behavior -- wondering if this is by design or a bug.
The Kafka Partitioner interface:

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster);

has both "Object value" and "byte[] valueBytes" provided. I naïvely assumed that "value" would be the pre-serialized domain object.

I set up a KStream:

    builder.stream(Serdes.String(), requestSerde, requestTopic)
           .mapValues(this::requestToMessage)
           .to(Serdes.String(), messageSerde, messageTopic);

I produce to the "messageTopic" both via this stream and via a normal KafkaProducer. I thought this should be sufficient to partition both ways:

    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MessagePartitioner.class);

The partitioner has special understanding of the Message type and behaves like the DefaultPartitioner otherwise. Roughly:

    int partition(...) {
        return value instanceof Message
                ? specialPartition((Message) value)
                : defaultPartition(keyBytes);
    }

This works great for KafkaProducer: "value" is indeed my Message type and partitions are assigned correctly.

Unfortunately it does *not* work with the stream producer, which causes very confusing behavior. It turns out that RecordCollectorImpl does its own serialization:

    byte[] keyBytes = keySerializer.serialize(topic, key);
    byte[] valBytes = valueSerializer.serialize(topic, value);
    if (partition == null && partitioner != null) {
        List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
        if (partitions != null && partitions.size() > 0)
            partition = partitioner.partition(key, value, partitions.size());
    }
    ProducerRecord<byte[], byte[]> serializedRecord =
            new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
    this.producer.send(serializedRecord);

Critically, this means the record actually sent through the KafkaProducer has already been turned into a byte[] key/value pair. So when my Partitioner gets called (since I did not specify a partition directly, nor a StreamPartitioner), it sees a byte[] and falls into the "defaultPartition" case.

Could someone shed some light on the right way to set this up? Are normal Kafka partitioners expected to work for Streams? I would much prefer to access my domain object directly rather than have to de-serialize the JSON we just spent so many CPU cycles making :)

I could implement a StreamPartitioner (rough sketches in the P.S. below), but then I have to remember to specify it everywhere, otherwise potentially very subtle bugs creep in.

Thanks,
Steven
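
P.S. For concreteness, here is roughly the StreamPartitioner I think I would have to write. This is only a sketch: Message and getRoutingId() stand in for our real types and routing logic, and I'm assuming the StreamPartitioner signature from the Streams version I'm on (Integer partition(K key, V value, int numPartitions)):

    import org.apache.kafka.streams.processor.StreamPartitioner;

    public class MessageStreamPartitioner implements StreamPartitioner<String, Message> {
        @Override
        public Integer partition(String key, Message value, int numPartitions) {
            if (value != null) {
                // Same Message-aware routing that MessagePartitioner does today,
                // except here "value" really is the domain object, not a byte[].
                return (value.getRoutingId().hashCode() & 0x7fffffff) % numPartitions; // placeholder routing
            }
            // Returning null leaves the partition unset, so the producer's own
            // partitioner decides based on the serialized key.
            return null;
        }
    }

And it would have to be wired in at every sink:

    builder.stream(Serdes.String(), requestSerde, requestTopic)
           .mapValues(this::requestToMessage)
           .to(Serdes.String(), messageSerde, new MessageStreamPartitioner(), messageTopic);

Forgetting it on even one .to() silently falls back to hashing the serialized key, which is exactly the kind of subtle bug I'm worried about.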
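
The alternative I alluded to -- teaching MessagePartitioner to handle the Streams case by deserializing valueBytes back into a Message -- would look something like the sketch below (messageTopic and messageDeserializer are placeholders for our own config and serde), which is exactly the double work I'd like to avoid:

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (value instanceof Message) {
            // Plain KafkaProducer path: we still have the domain object.
            return specialPartition((Message) value);
        }
        if (messageTopic.equals(topic) && valueBytes != null) {
            // Streams path: RecordCollectorImpl has already serialized the value,
            // so we pay for a second round of (de)serialization just to route it.
            return specialPartition(messageDeserializer.deserialize(topic, valueBytes));
        }
        return defaultPartition(keyBytes);
    }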