So then I guess my problem really is that I am operating at two different 
levels of abstraction.
How do I produce to a KStream?  I could imagine a method:

public void KStream.put(K, V, Callback?);

but I don't see anything like that.  Nor do the "QueryableStoreTypes"
really seem like what I want either.

Currently I do this, which I feel isn't the most elegant solution:

public class MyPartitioner implements Partitioner, StreamPartitioner<String, ChatMessage> {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        final List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return partition0(keyBytes, value, partitions.size());
    }

    @Override
    public Integer partition(String key, ChatMessage value, int numPartitions) {
        return partition0(null, value, numPartitions);
    }

    @VisibleForTesting
    int partition0(byte[] keyBytes, Object value, final int numPartitions) {
        if (value instanceof ChatMessage) {
            return messagePartition((ChatMessage) value, numPartitions);
        }
        // same as DefaultPartitioner except we assume no null keys
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

If I could produce to a StreamPartition'd KStream, then I could work only at 
one layer.
That would have the additional benefit that I would no longer need to configure
and own my own KafkaProducers.
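
To make the workaround above easier to reason about outside of Kafka, here is a
minimal, self-contained sketch of the same idea: two entry points (one shaped
like the producer Partitioner call, one shaped like the StreamPartitioner call)
delegating to a single partition0. Everything in it is an assumption made for
illustration: the ChatMessage field "roomId", the messagePartition rule, and the
use of Arrays.hashCode in place of Kafka's murmur2 are all hypothetical
stand-ins, not the real classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class SharedPartitionSketch {
    // Hypothetical stand-in for the domain type; the real ChatMessage is not shown here.
    static final class ChatMessage {
        final String roomId;
        ChatMessage(String roomId) { this.roomId = roomId; }
    }

    // Hypothetical message-aware rule: partition on the room id.
    static int messagePartition(ChatMessage m, int numPartitions) {
        return (m.roomId.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Stand-in for the default key-hash rule (Kafka uses murmur2; this does not).
    static int defaultPartition(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Single decision point shared by both call paths.
    static int partition0(byte[] keyBytes, Object value, int numPartitions) {
        if (value instanceof ChatMessage) {
            return messagePartition((ChatMessage) value, numPartitions);
        }
        return defaultPartition(keyBytes, numPartitions);
    }

    // Shaped like the producer-side call: value is whatever object the producer was given.
    static int producerPath(byte[] keyBytes, Object value, int numPartitions) {
        return partition0(keyBytes, value, numPartitions);
    }

    // Shaped like the Streams-side call: typed key and value, no serialized key bytes.
    static int streamsPath(String key, ChatMessage value, int numPartitions) {
        return partition0(null, value, numPartitions);
    }

    public static void main(String[] args) {
        ChatMessage msg = new ChatMessage("room-42");
        byte[] keyBytes = "k".getBytes(StandardCharsets.UTF_8);
        int viaProducer = producerPath(keyBytes, msg, 8);
        int viaStreams = streamsPath("k", msg, 8);
        System.out.println("same partition on both paths: " + (viaProducer == viaStreams));
        // A value that is already serialized to byte[] skips the message-aware rule.
        int viaBytes = producerPath(keyBytes, new byte[] {1, 2}, 8);
        System.out.println("serialized value uses key hash: "
                + (viaBytes == defaultPartition(keyBytes, 8)));
    }
}
```

The point of routing both paths through partition0 is that a ChatMessage lands
on the same partition no matter which layer wrote it, while a value that has
already been serialized to byte[] silently falls back to the key hash.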

> On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> It's by design.
> 
> The reason is that Streams uses a single producer to write to different
> output topics. As different output topics might have different key and/or
> value types, the producer is instantiated with byte[] as key and value
> type, and Streams serializes the data before handing it to the producer
> -- Streams knows the topology and can pick the right serializer
> according to the current key and value type.
> 
> That's the reason why KStream#to() has an overload allowing to specify a
> custom StreamPartitioner that will be called by Streams (not the
> producer) to compute the partition before serializing the data. For this
> case, the partition (to write the data into) is given to the producer
> directly and the producer does not call its own partitioner.
> 
> 
> -Matthias
> 
> 
> On 2/9/17 3:49 PM, Steven Schlansker wrote:
>> Hi, I discovered what I consider to be really confusing behavior -- 
>> wondering if this is by design or a bug.
>> 
>> The Kafka Partitioner interface:
>> public int partition(String topic, Object key, byte[] keyBytes,
>>                      Object value, byte[] valueBytes, Cluster cluster);
>> 
>> has both "Object value" and "byte[] valueBytes" provided.  I naïvely assumed 
>> that "value" would be the pre-serialized
>> domain object.
>> 
>> I set up a KStream:
>> 
>> 
>> builder.stream(Serdes.String(), requestSerde, requestTopic)
>>       .mapValues(this::requestToMessage)
>>       .to(Serdes.String(), messageSerde, messageTopic);
>> 
>> 
>> I produce to the "messageTopic" both via this Stream as well as by a normal 
>> KafkaProducer.
>> 
>> I thought this should be sufficient to partition both ways:
>> 
>> props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MessagePartitioner.class);
>> 
>> The partitioner has special understanding of the Message type and behaves as 
>> a DefaultPartitioner otherwise.
>> Roughly,
>> 
>> 
>> int partition(...) {
>>    return value instanceof Message
>>        ? specialPartition((Message) value)
>>        : defaultPartition(keyBytes);
>> }
>> 
>> 
>> This works great for KafkaProducer.  The "value" field is indeed my Message 
>> type and partitions are assigned
>> correctly.
>> Unfortunately it does *not* work with the stream producer, which causes very 
>> confusing behavior.  It turns out
>> that the RecordCollectorImpl does its own serialization:
>> 
>> 
>> byte[] keyBytes = keySerializer.serialize(topic, key);
>> byte[] valBytes = valueSerializer.serialize(topic, value);
>> if (partition == null && partitioner != null) {
>>    List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
>>    if (partitions != null && partitions.size() > 0)
>>        partition = partitioner.partition(key, value, partitions.size());
>> }
>> ProducerRecord<byte[], byte[]> serializedRecord =
>>    new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
>> this.producer.send(serializedRecord);
>> 
>> 
>> Critically, this means the record actually sent through the KafkaProducer is 
>> already turned into a byte[] kv.
>> So when the Partitioner gets called (since I did not specify a partition 
>> directly, nor a StreamPartitioner), it sees
>> a byte[] and does the "defaultPartition" case.
>> 
>> Could someone shed some light on the right way to set this up?  Are normal 
>> Kafka partitioners expected to work for
>> Streams?  I would much prefer to access my domain object directly rather 
>> than have to de-serialize the JSON we just spent
>> so many CPU cycles making :)
>> 
>> I could implement StreamPartitioner, but then I have to remember to specify 
>> it everywhere, otherwise potentially very subtle bugs creep in.
>> 
>> Thanks,
>> Steven
>> 
> 
