Hey Steven,

If you have one KStream and you want to produce to a topic that is read by another KStream, you'd use the ".through()" method of the first KStream. ".through()" both writes to a topic and returns a new KStream that reads from that topic. (".to()" just writes to a topic.)
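A minimal sketch of the difference, against the 0.10.x Kafka Streams API that this thread is discussing (the topic names and the uppercase step are made up for illustration, not from Steven's application):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ThroughVsTo {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Hypothetical topics: "input", "intermediate", "output".
        KStream<String, String> first =
                builder.stream(Serdes.String(), Serdes.String(), "input");

        // .through() writes every record to "intermediate" AND returns a
        // new KStream consuming from that topic, so the pipeline continues:
        KStream<String, String> second = first
                .mapValues(v -> v.toUpperCase())
                .through(Serdes.String(), Serdes.String(), "intermediate");

        // .to() is terminal: it only writes (returns void).
        second.to(Serdes.String(), Serdes.String(), "output");
    }
}
```

(Later Kafka Streams releases deprecated through() in favor of repartition(), but at the time of this thread through() was the idiomatic way to chain stages via a topic.)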
If you want to produce data from a different application into a Kafka Streams app, you'd be using the Kafka Producer without the Kafka Streams library (or Kafka Connect, or any other way to produce to Kafka). You could use any implementation/configuration of partitioner.class that you'd want to.

Mathieu

On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <sschlans...@opentable.com> wrote:

> So then I guess my problem really is that I am operating at two different
> levels of abstraction.
> How do I produce to a KStream? I could imagine a method:
>
>     public void KStream.put(K, V, Callback?);
>
> but I don't see anything like that. Nor do the "QueryableStoreTypes"
> really seem like what I want either.
>
> Currently I do this, which I feel isn't the most elegant solution:
>
>     public class MyPartitioner implements Partitioner,
>             StreamPartitioner<String, ChatMessage> {
>         @Override
>         public int partition(String topic, Object key, byte[] keyBytes,
>                 Object value, byte[] valueBytes, Cluster cluster) {
>             final List<PartitionInfo> partitions =
>                     cluster.partitionsForTopic(topic);
>             return partition0(keyBytes, value, partitions.size());
>         }
>
>         @Override
>         public Integer partition(String key, ChatMessage value,
>                 int numPartitions) {
>             return partition0(null, value, numPartitions);
>         }
>
>         @VisibleForTesting
>         int partition0(byte[] keyBytes, Object value, final int numPartitions) {
>             if (value instanceof ChatMessage) {
>                 return messagePartition((ChatMessage) value, numPartitions);
>             }
>             // same as DefaultPartitioner except we assume no null keys
>             return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
>         }
>     }
>
> If I could produce to a StreamPartition'd KStream, then I could work only
> at one layer. That would have the additional benefit that I would no
> longer need to configure and own my own KafkaProducers.
>
> > On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > It's by design.
> >
> > The reason is that Streams uses a single producer to write to different
> > output topics. As different output topics might have different key and/or
> > value types, the producer is instantiated with byte[] as key and value
> > type, and Streams serializes the data before handing it to the producer
> > -- Streams knows the topology and can pick the right serializer
> > according to the current key and value type.
> >
> > That's the reason why KStream#to() has an overload allowing you to
> > specify a custom StreamPartitioner that will be called by Streams (not
> > the producer) to compute the partition before serializing the data. In
> > this case, the partition (to write the data into) is given to the
> > producer directly and the producer does not call its own partitioner.
> >
> >
> > -Matthias
> >
> >
> > On 2/9/17 3:49 PM, Steven Schlansker wrote:
> >> Hi, I discovered what I consider to be really confusing behavior --
> >> wondering if this is by design or a bug.
> >>
> >> The Kafka Partitioner interface:
> >>
> >>     public int partition(String topic, Object key, byte[] keyBytes,
> >>             Object value, byte[] valueBytes, Cluster cluster);
> >>
> >> has both "Object value" and "byte[] valueBytes" provided. I naïvely
> >> assumed that "value" would be the pre-serialization domain object.
> >>
> >> I set up a KStream:
> >>
> >>     builder.stream(Serdes.String(), requestSerde, requestTopic)
> >>         .mapValues(this::requestToMessage)
> >>         .to(Serdes.String(), messageSerde, messageTopic);
> >>
> >> I produce to the "messageTopic" both via this Stream as well as by a
> >> normal KafkaProducer.
> >>
> >> I thought this should be sufficient to partition both ways:
> >>
> >>     props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
> >>             MessagePartitioner.class);
> >>
> >> The partitioner has special understanding of the Message type and
> >> behaves as a DefaultPartitioner otherwise. Roughly,
> >>
> >>     int partition(...) {
> >>         return value instanceof Message
> >>                 ? specialPartition((Message) value)
> >>                 : defaultPartition(keyBytes);
> >>     }
> >>
> >> This works great for KafkaProducer. The "value" field is indeed my
> >> Message type and partitions are assigned correctly.
> >> Unfortunately it does *not* work with the stream producer, which causes
> >> very confusing behavior. It turns out that the RecordCollectorImpl does
> >> its own serialization:
> >>
> >>     byte[] keyBytes = keySerializer.serialize(topic, key);
> >>     byte[] valBytes = valueSerializer.serialize(topic, value);
> >>     if (partition == null && partitioner != null) {
> >>         List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
> >>         if (partitions != null && partitions.size() > 0)
> >>             partition = partitioner.partition(key, value, partitions.size());
> >>     }
> >>     ProducerRecord<byte[], byte[]> serializedRecord =
> >>             new ProducerRecord<>(topic, partition, timestamp,
> >>                     keyBytes, valBytes);
> >>     this.producer.send(serializedRecord);
> >>
> >> Critically, this means the record actually sent through the
> >> KafkaProducer is already turned into a byte[] kv.
> >> So when the Partitioner gets called (since I did not specify a
> >> partition directly, nor a StreamPartitioner), it sees
> >> a byte[] and does the "defaultPartition" case.
> >>
> >> Could someone shed some light on the right way to set this up? Are
> >> normal Kafka partitioners expected to work for Streams? I would much
> >> prefer to access my domain object directly rather than have to
> >> de-serialize the JSON we just spent so many CPU cycles making :)
> >>
> >> I could implement StreamPartitioner, but then I have to remember to
> >> specify it everywhere, otherwise potentially very subtle bugs creep in.
> >>
> >> Thanks,
> >> Steven
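As an aside, the shared-logic shape of the dual-interface partitioner discussed in this thread can be exercised without a broker or the Kafka classes on the classpath. In this sketch, the two static methods only mirror the shapes of Partitioner.partition() and StreamPartitioner.partition(), ChatMessage is a stand-in type, and Arrays.hashCode replaces Kafka's Utils.murmur2 (so the actual partition numbers will differ from the real DefaultPartitioner):

```java
import java.util.Arrays;

public class DualPartitionerSketch {

    // Stand-in for the domain type in the thread.
    static final class ChatMessage {
        final String roomId;
        ChatMessage(String roomId) { this.roomId = roomId; }
    }

    // Shape of Partitioner.partition(topic, key, keyBytes, value, valueBytes, cluster).
    static int producerPartition(byte[] keyBytes, Object value, int numPartitions) {
        return partition0(keyBytes, value, numPartitions);
    }

    // Shape of StreamPartitioner.partition(key, value, numPartitions).
    static int streamPartition(ChatMessage value, int numPartitions) {
        return partition0(null, value, numPartitions);
    }

    // Shared logic: route ChatMessages by room, else hash the key bytes
    // (the real fallback uses Utils.murmur2, not Arrays.hashCode).
    static int partition0(byte[] keyBytes, Object value, int numPartitions) {
        if (value instanceof ChatMessage) {
            return toPositive(((ChatMessage) value).roomId.hashCode()) % numPartitions;
        }
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    // Mirrors Kafka's Utils.toPositive(): clear the sign bit.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    public static void main(String[] args) {
        ChatMessage m = new ChatMessage("room-42");
        // Both entry points agree for the same message:
        System.out.println(producerPartition(null, m, 8) == streamPartition(m, 8)); // prints "true"
    }
}
```

Keeping both entry points delegating to one partition0() is what guarantees the plain-producer path and the Streams path pick the same partition for a given message, which is the invariant the whole thread is about.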