So then I guess my problem really is that I am operating at two different levels of abstraction. How do I produce to a KStream? I could imagine a method:
    public void KStream.put(K, V, Callback?);

but I don't see anything like that. Nor do the "QueryableStoreTypes" really seem like what I want either. Currently I do this, which I feel isn't the most elegant solution:

    public class MyPartitioner implements Partitioner, StreamPartitioner<String, ChatMessage> {

        // Producer-side entry point (Partitioner)
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            final List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            return partition0(keyBytes, value, partitions.size());
        }

        // Streams-side entry point (StreamPartitioner)
        @Override
        public Integer partition(String key, ChatMessage value, int numPartitions) {
            return partition0(null, value, numPartitions);
        }

        @VisibleForTesting
        int partition0(byte[] keyBytes, Object value, final int numPartitions) {
            if (value instanceof ChatMessage) {
                return messagePartition((ChatMessage) value, numPartitions);
            }
            // same as DefaultPartitioner except we assume no null keys
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

If I could produce to a StreamPartitioner'd KStream, then I could work only at one layer (rough sketch at the bottom of this mail). That would have the additional benefit that I would no longer need to configure and own my own KafkaProducers.

> On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> It's by design.
>
> The reason is that Streams uses a single producer to write to different
> output topics. As different output topics might have different key and/or
> value types, the producer is instantiated with byte[] as key and value
> type, and Streams serializes the data before handing it to the producer
> -- Streams knows the topology and can pick the right serializer
> according to the current key and value type.
>
> That's the reason why KStream#to() has an overload allowing to specify a
> custom StreamPartitioner that will be called by Streams (not the
> producer) to compute the partition before serializing the data. For this
> case, the partition (to write the data into) is given to the producer
> directly and the producer does not call its own partitioner.
>
>
> -Matthias
>
>
> On 2/9/17 3:49 PM, Steven Schlansker wrote:
>> Hi, I discovered what I consider to be really confusing behavior --
>> wondering if this is by design or a bug.
>>
>> The Kafka Partitioner interface:
>>
>>     public int partition(String topic, Object key, byte[] keyBytes,
>>                          Object value, byte[] valueBytes, Cluster cluster);
>>
>> has both "Object value" and "byte[] valueBytes" provided. I naïvely assumed
>> that "value" would be the pre-serialization domain object.
>>
>> I set up a KStream:
>>
>>     builder.stream(Serdes.String(), requestSerde, requestTopic)
>>            .mapValues(this::requestToMessage)
>>            .to(Serdes.String(), messageSerde, messageTopic);
>>
>> I produce to the "messageTopic" both via this Stream as well as by a normal
>> KafkaProducer.
>>
>> I thought this should be sufficient to partition both ways:
>>
>>     props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MessagePartitioner.class);
>>
>> The partitioner has special understanding of the Message type and behaves as
>> a DefaultPartitioner otherwise. Roughly:
>>
>>     int partition(...) {
>>         return value instanceof Message ? specialPartition((Message) value)
>>                                         : defaultPartition(keyBytes);
>>     }
>>
>> This works great for KafkaProducer. The "value" field is indeed my Message
>> type and partitions are assigned correctly.
>> Unfortunately it does *not* work with the stream producer, which causes very
>> confusing behavior.
>> It turns out that the RecordCollectorImpl does its own serialization:
>>
>>     byte[] keyBytes = keySerializer.serialize(topic, key);
>>     byte[] valBytes = valueSerializer.serialize(topic, value);
>>     if (partition == null && partitioner != null) {
>>         List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
>>         if (partitions != null && partitions.size() > 0)
>>             partition = partitioner.partition(key, value, partitions.size());
>>     }
>>     ProducerRecord<byte[], byte[]> serializedRecord =
>>             new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
>>     this.producer.send(serializedRecord);
>>
>> Critically, this means the record actually sent through the KafkaProducer has
>> already been turned into a byte[] key and value.
>> So when the Partitioner gets called (since I did not specify a partition
>> directly, nor a StreamPartitioner), it sees a byte[] and takes the
>> "defaultPartition" case.
>>
>> Could someone shed some light on the right way to set this up? Are normal
>> Kafka partitioners expected to work for Streams? I would much prefer to
>> access my domain object directly rather than have to de-serialize the JSON
>> we just spent so many CPU cycles making :)
>>
>> I could implement StreamPartitioner, but then I have to remember to specify
>> it everywhere, otherwise potentially very subtle bugs creep in.
>>
>> Thanks,
>> Steven
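
P.S. To make the "one layer" idea concrete, here is a rough, untested sketch of what I think the wiring would look like if I go through the KStream#to() overload Matthias mentions (assuming a to(keySerde, valueSerde, partitioner, topic) overload in the version I'm on, and reusing the MyPartitioner, messageSerde, and messageTopic names from above):

    final MyPartitioner partitioner = new MyPartitioner();

    // Streams side: Streams calls the StreamPartitioner on the ChatMessage
    // *before* serializing, and hands the producer an explicit partition.
    builder.stream(Serdes.String(), requestSerde, requestTopic)
           .mapValues(this::requestToMessage)
           .to(Serdes.String(), messageSerde, partitioner, messageTopic);

    // Plain-producer side: the same class still acts as the producer's
    // Partitioner, so both paths share one piece of partitioning logic.
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

It still means remembering to pass the partitioner at every sink, which is the part I'd like to avoid.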