Hey Steven,

If you have one KStream, and you want to produce to a topic that is read by
another KStream, you'd use the ".through" method of the first KStream.
 ".through" both outputs to a topic and returns a KStream that reads from
that topic.  (".to" just outputs to a topic)

If you want to produce data from a different application into a Kafka
Streams app, you'd be using the Kafka Producer without the Kafka Streams
library.  (Or Kafka Connect, or any other way to produce to Kafka).  You
could use any implementation/configuration of partitioner.class that you'd
want to.

Mathieu


On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:

> So then I guess my problem really is that I am operating at two different
> levels of abstraction.
> How do I produce to a KStream?  I could imagine a method:
>
> public void KStream.put(K, V, Callback?);
>
> but I don't see anything like that.  Nor do the "QueryableStoreTypes"
> really seem like what I want either.
>
> Currently I do this, which I feel isn't the most elegant solution:
>
> public class MyPartitioner implements Partitioner,
> StreamPartitioner<String, ChatMessage> {
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object
> value, byte[] valueBytes, Cluster cluster) {
>         final List<PartitionInfo> partitions = cluster.partitionsForTopic(
> topic);
>         return partition0(keyBytes, value, partitions.size());
>     }
>
>     @Override
>     public Integer partition(String key, ChatMessage value, int
> numPartitions) {
>         return partition0(null, value, numPartitions);
>     }
>
>     @VisibleForTesting
>     int partition0(byte[] keyBytes, Object value, final int numPartitions)
> {
>         if (value instanceof ChatMessage) {
>             return messagePartition((ChatMessage) value, numPartitions);
>         }
>         // same as DefaultPartitioner except we assume no null keys
>         return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
>     }
> }
>
> If I could produce to a StreamPartition'd KStream, then I could work only
> at one layer.
> That would have the additional benefit that I would no longer need to
> configure and
> own my own KafkaProducers.
>
> > On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> > It's by design.
> >
> > The reason it, that Streams uses a single producer to write to different
> > output topic. As different output topics might have different key and/or
> > value types, the producer is instantiated with byte[] as key and value
> > type, and Streams serialized the data before handing it to the producer
> > -- Streams knows the topology and can pick the right serializer
> > according to the current key and value type.
> >
> > That's the reason why KStream#to() has an overload allowing to specify a
> > custom StreamPartitioner that will be called by Streams (not the
> > producer) to compute the partition before serializing the data. For this
> > case, the partition (to write the data into) is given to the producer
> > directly and the producer does not call it's own partitioner.
> >
> >
> > -Matthias
> >
> >
> > On 2/9/17 3:49 PM, Steven Schlansker wrote:
> >> Hi, I discovered what I consider to be really confusing behavior --
> wondering if this is by design or a bug.
> >>
> >> The Kafka Partitioner interface:
> >> public int partition(String topic, Object key, byte[] keyBytes, Object
> value, byte[] valueBytes, Cluster cluster);
> >>
> >> has both "Object value" and "byte[] valueBytes" provided.  I naïvely
> assumed that "value" would be the pre-serialized
> >> domain object.
> >>
> >> I set up a KStream:
> >>
> >>
> >> builder.stream(Serdes.String(), requestSerde, requestTopic)
> >>       .mapValues(this::requestToMessage)
> >>       .to(Serdes.String(), messageSerde, messageTopic);
> >>
> >>
> >> I produce to the "messageTopic" both via this Stream as well as by a
> normal KafkaProducer.
> >>
> >> I thought this should be sufficient to partition both ways:
> >>
> >> props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
> MessagePartitioner.class);
> >>
> >> The partitioner has special understanding of the Message type and
> behaves as a DefaultPartitioner otherwise.
> >> Roughly,
> >>
> >>
> >> int partition(...) {
> >>    return value instanceof Message ? specialPartition((Message)value)
> : defaultPartition(keyBytes);
> >> }
> >>
> >>
> >> This works great for KafkaProducer.  The "value" field is indeed my
> Message type and partitions are assigned
> >> correctly.
> >> Unfortunately it does *not* work with the stream producer, which causes
> very confusing behavior.  It turns out
> >> that the RecordCollectorImpl does its own serialization:
> >>
> >>
> >> byte[] keyBytes = keySerializer.serialize(topic, key);
> >> byte[] valBytes = valueSerializer.serialize(topic, value);
> >> if (partition == null && partitioner != null) {
> >>    List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
> >>    if (partitions != null && partitions.size() > 0)
> >>        partition = partitioner.partition(key, value, partitions.size());
> >>    }
> >> }
> >> ProducerRecord<byte[], byte[]> serializedRecord =
> >>    new ProducerRecord<>(topic, partition, timestamp, keyBytes,
> valBytes);
> >> this.producer.send(serializedRecord)
> >>
> >>
> >> Critically, this means the record actually sent through the
> KafkaProducer is already turned into a byte[] kv.
> >> So when the Partitioner gets called (since I did not specify a
> partition directly, nor a StreamPartitioner), it sees
> >> a byte[] and does the "defaultPartition" case.
> >>
> >> Could someone shed some light on the right way to set this up?  Are
> normal Kafka partitioners expected to work for
> >> Streams?  I would much prefer to access my domain object directly
> rather than have to de-serialize the JSON we just spent
> >> so many CPU cycles making :)
> >>
> >> I could implement StreamPartitioner, but then I have to remember to
> specify it everywhere, otherwise potentially very subtle bugs creep in.
> >>
> >> Thanks,
> >> Steven
> >>
> >
>
>

Reply via email to