> On Feb 10, 2017, at 1:09 PM, Mathieu Fenniak <mathieu.fenn...@replicon.com> 
> wrote:
> 
> Hey Steven,
> 
> If you have one KStream, and you want to produce to a topic that is read by
> another KStream, you'd use the ".through" method of the first KStream.
> ".through" both outputs to a topic and returns a KStream that reads from
> that topic.  (".to" just outputs to a topic)
> 
> If you want to produce data from a different application into a Kafka
> Streams app, you'd be using the Kafka Producer without the Kafka Streams
> library.  (Or Kafka Connect, or any other way to produce to Kafka).  You
> could use any implementation/configuration of partitioner.class that you'd
> want to.
> 

I am writing essentially a distributed state machine using Kafka.  I get the
sense that most users / examples are using it more for ETL or streaming data
processing, which is probably why my need here seems a little strange.

I expect to accept messages over HTTP and then offer them into the processing
stream.  The only actor in my model currently *is* the Kafka Streams app; there
is no upstream or downstream collaborator to feed it data over Kafka (yet).

I get that I can use the normal Producer with the partitioner below, but I
consider the code a little ugly, and it could probably be improved.  Is the
lack of a Kafka Streams-level produce method intentional?  Am I thinking about
the problem wrong?
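
For concreteness, here is a minimal, dependency-free sketch of the dispatch
that the partitioner below performs, and of why it falls through to the
key-hash branch once the value has already been serialized.  The names
PartitionSketch, partitionFor and ChatMessage.roomId are illustrative
assumptions, and Arrays.hashCode only stands in for Kafka's murmur2 so the
example runs without the Kafka jars; it is not the real code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    /** Hypothetical stand-in for the ChatMessage domain type. */
    static final class ChatMessage {
        final String roomId; // assumed routing field, for illustration only

        ChatMessage(String roomId) {
            this.roomId = roomId;
        }
    }

    /** Clears the sign bit so the modulo below can never be negative. */
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    /** Routes domain objects by their own field, anything else by key bytes. */
    static int partitionFor(byte[] keyBytes, Object value, int numPartitions) {
        if (value instanceof ChatMessage) {
            int hash = ((ChatMessage) value).roomId.hashCode();
            return toPositive(hash) % numPartitions;
        }
        // Assumption: Arrays.hashCode replaces murmur2 purely to stay self-contained.
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "k".getBytes(StandardCharsets.UTF_8);
        ChatMessage message = new ChatMessage("a");
        byte[] serialized = "{\"roomId\":\"a\"}".getBytes(StandardCharsets.UTF_8);

        // Domain object: routed by roomId ("a".hashCode() is 97, and 97 % 8 is 1).
        System.out.println(partitionFor(key, message, 8));    // prints 1
        // Already-serialized value: the instanceof check fails, so the key decides.
        System.out.println(partitionFor(key, serialized, 8)); // prints 2
    }
}
```

The same logical message lands on a different partition depending on whether
the partitioner sees the object or its bytes, which is the mismatch I hit.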

> Mathieu
> 
> 
> On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <
> sschlans...@opentable.com> wrote:
> 
>> So then I guess my problem really is that I am operating at two different
>> levels of abstraction.
>> How do I produce to a KStream?  I could imagine a method:
>> 
>> public void KStream.put(K, V, Callback?);
>> 
>> but I don't see anything like that.  Nor do the "QueryableStoreTypes"
>> really seem like what I want either.
>> 
>> Currently I do this, which I feel isn't the most elegant solution:
>> 
>> public class MyPartitioner
>>        implements Partitioner, StreamPartitioner<String, ChatMessage> {
>>    @Override
>>    public int partition(String topic, Object key, byte[] keyBytes,
>>                         Object value, byte[] valueBytes, Cluster cluster) {
>>        final List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>>        return partition0(keyBytes, value, partitions.size());
>>    }
>> 
>>    @Override
>>    public Integer partition(String key, ChatMessage value, int numPartitions) {
>>        return partition0(null, value, numPartitions);
>>    }
>> 
>>    @VisibleForTesting
>>    int partition0(byte[] keyBytes, Object value, final int numPartitions) {
>>        if (value instanceof ChatMessage) {
>>            return messagePartition((ChatMessage) value, numPartitions);
>>        }
>>        // same as DefaultPartitioner except we assume no null keys
>>        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
>>    }
>> }
>> 
>> If I could produce to a StreamPartition'd KStream, then I could work only
>> at one layer.
>> That would have the additional benefit that I would no longer need to
>> configure and own my own KafkaProducers.
>> 
>>> On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>> 
>>> It's by design.
>>> 
>>> The reason is that Streams uses a single producer to write to different
>>> output topics. As different output topics might have different key and/or
>>> value types, the producer is instantiated with byte[] as key and value
>>> type, and Streams serializes the data before handing it to the producer
>>> -- Streams knows the topology and can pick the right serializer
>>> according to the current key and value type.
>>> 
>>> That's the reason why KStream#to() has an overload that lets you specify a
>>> custom StreamPartitioner that will be called by Streams (not the
>>> producer) to compute the partition before serializing the data. For this
>>> case, the partition (to write the data into) is given to the producer
>>> directly and the producer does not call its own partitioner.
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 2/9/17 3:49 PM, Steven Schlansker wrote:
>>>> Hi, I discovered what I consider to be really confusing behavior --
>>>> wondering if this is by design or a bug.
>>>> 
>>>> The Kafka Partitioner interface:
>>>> public int partition(String topic, Object key, byte[] keyBytes,
>>>>                      Object value, byte[] valueBytes, Cluster cluster);
>>>> 
>>>> has both "Object value" and "byte[] valueBytes" provided.  I naïvely
>>>> assumed that "value" would be the pre-serialized domain object.
>>>> 
>>>> I set up a KStream:
>>>> 
>>>> 
>>>> builder.stream(Serdes.String(), requestSerde, requestTopic)
>>>>      .mapValues(this::requestToMessage)
>>>>      .to(Serdes.String(), messageSerde, messageTopic);
>>>> 
>>>> 
>>>> I produce to the "messageTopic" both via this Stream and via a
>>>> normal KafkaProducer.
>>>> 
>>>> I thought this should be sufficient to partition both ways:
>>>> 
>>>> props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>           MessagePartitioner.class);
>>>> 
>>>> The partitioner has special understanding of the Message type and
>>>> behaves as a DefaultPartitioner otherwise.
>>>> Roughly,
>>>> 
>>>> 
>>>> int partition(...) {
>>>>   return value instanceof Message
>>>>       ? specialPartition((Message) value)
>>>>       : defaultPartition(keyBytes);
>>>> }
>>>> 
>>>> 
>>>> This works great for KafkaProducer.  The "value" field is indeed my
>>>> Message type and partitions are assigned correctly.
>>>> Unfortunately it does *not* work with the stream producer, which causes
>>>> very confusing behavior.  It turns out that the RecordCollectorImpl does
>>>> its own serialization:
>>>> 
>>>> 
>>>> byte[] keyBytes = keySerializer.serialize(topic, key);
>>>> byte[] valBytes = valueSerializer.serialize(topic, value);
>>>> if (partition == null && partitioner != null) {
>>>>   List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
>>>>   if (partitions != null && partitions.size() > 0) {
>>>>       partition = partitioner.partition(key, value, partitions.size());
>>>>   }
>>>> }
>>>> ProducerRecord<byte[], byte[]> serializedRecord =
>>>>   new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
>>>> this.producer.send(serializedRecord);
>>>> 
>>>> 
>>>> Critically, this means the record actually sent through the
>>>> KafkaProducer has already been turned into a byte[] key/value pair.
>>>> So when the Partitioner gets called (since I did not specify a
>>>> partition directly, nor a StreamPartitioner), it sees
>>>> a byte[] and does the "defaultPartition" case.
>>>> 
>>>> Could someone shed some light on the right way to set this up?  Are
>>>> normal Kafka partitioners expected to work for Streams?  I would much
>>>> prefer to access my domain object directly rather than have to
>>>> de-serialize the JSON we just spent so many CPU cycles making :)
>>>> 
>>>> I could implement StreamPartitioner, but then I have to remember to
>>>> specify it everywhere, otherwise potentially very subtle bugs creep in.
>>>> 
>>>> Thanks,
>>>> Steven
>>>> 
>>> 
>> 
>> 
