What Mathieu says makes a lot of sense.

Just some background info:

A KStream is always created from a Kafka topic.

> KStream stream = builder.stream("topicName");

#through() is just syntactic sugar for stream.to(XXX) followed by
builder.stream(XXX). So the StreamPartitioner problem is solved :)
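
To make the "syntactic sugar" point concrete, here is a toy sketch in plain
Java. It is not the Streams API: the class ThroughSketch, the in-memory TOPICS
map and the three static methods are made-up stand-ins, and records are plain
strings. It only shows that through() does nothing beyond to() followed by
stream() on the same topic. In a real topology you would hand your
StreamPartitioner to the to() overload mentioned further down in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory stand-in for a broker; NOT the Kafka Streams API. */
class ThroughSketch {
    // topic name -> records appended to it, in arrival order
    static final Map<String, List<String>> TOPICS = new HashMap<>();

    /** Hypothetical stand-in for KStream#to(): append every record to the topic. */
    static void to(List<String> records, String topic) {
        TOPICS.computeIfAbsent(topic, t -> new ArrayList<>()).addAll(records);
    }

    /** Hypothetical stand-in for builder.stream(): read back what the topic holds. */
    static List<String> stream(String topic) {
        return new ArrayList<>(TOPICS.getOrDefault(topic, new ArrayList<>()));
    }

    /** through() is nothing more than to() followed by stream() on the same topic. */
    static List<String> through(List<String> records, String topic) {
        to(records, topic);
        return stream(topic);
    }
}
```

Calling ThroughSketch.through(records, "XXX") on an empty topic returns the
same records that ThroughSketch.stream("XXX") returns afterwards, because both
read what to() wrote.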


About put(): that is intentional (as Mathieu mentioned already):

If you had something like KStream#put(), it would not be
fault-tolerant -- Streams relies on a topic (i.e., persistent storage) to
replay records on failure. #put() cannot provide this guarantee, and thus
data might get lost.
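
A toy sketch of that argument in plain Java, in case it helps. Nothing here is
Kafka code: ReplaySketch, LOG, Processor, viaLog() and directPut() are
invented names, the "topic" is just a list that outlives the processor, and
the "state" is a running sum. The point is only that a record that went
through the log can be replayed after a crash, while a record injected
directly cannot.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of why input has to come from a persistent log; NOT Kafka code. */
class ReplaySketch {
    /** Stand-in for a topic: an append-only list that outlives the processor. */
    static final List<Integer> LOG = new ArrayList<>();

    /** Stand-in for a stream task whose state lives only in memory. */
    static class Processor {
        int sum = 0; // in-memory state, gone after a crash

        void process(int record) {
            sum += record;
        }
    }

    /** Normal path: the record is appended to the log, then processed. */
    static void viaLog(Processor p, int record) {
        LOG.add(record);
        p.process(record);
    }

    /** Hypothetical put(): hands the record to the processor, bypassing the log. */
    static void directPut(Processor p, int record) {
        p.process(record);
    }

    /** Recovery after a crash: a fresh processor replays the log from the start. */
    static Processor recoverFromLog() {
        Processor fresh = new Processor();
        for (int record : LOG) {
            fresh.process(record);
        }
        return fresh;
    }
}
```

If one record is sent with viaLog() and a second with directPut(), the running
processor has seen both, but the processor returned by recoverFromLog() has
seen only the first. That missing record is the data loss described above.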

> but I don't see anything like that.  Nor do the "QueryableStoreTypes"
> really seem like what I want either.

Not sure how this relates?


-Matthias




On 2/10/17 1:37 PM, Mathieu Fenniak wrote:
> Well, I think what you're doing is unusual for sure.  The Streams API is
> really about transforming streams of data from input to output... so
> therefore the API doesn't have an injection point like you're looking for.
> I'd say it's intentional (I'm just a user though).
> 
> If I were in your shoes, I'd probably decouple the applications -- make the
> portion that accepts messages over HTTP and produces to a Kafka topic one
> application, and make the stream processor another app.  That would allow
> them to be deployed and scaled separately (eg. they may not always require
> the same hardware, capacity, yada yada).
> 
> Mathieu
> 
> 
> On Fri, Feb 10, 2017 at 2:22 PM, Steven Schlansker <
> sschlans...@opentable.com> wrote:
> 
>>
>>> On Feb 10, 2017, at 1:09 PM, Mathieu Fenniak <
>> mathieu.fenn...@replicon.com> wrote:
>>>
>>> Hey Steven,
>>>
>>> If you have one KStream, and you want to produce to a topic that is read by
>>> another KStream, you'd use the ".through" method of the first KStream.
>>> ".through" both outputs to a topic and returns a KStream that reads from
>>> that topic.  (".to" just outputs to a topic)
>>>
>>> If you want to produce data from a different application into a Kafka
>>> Streams app, you'd be using the Kafka Producer without the Kafka Streams
>>> library.  (Or Kafka Connect, or any other way to produce to Kafka).  You
>>> could use any implementation/configuration of partitioner.class that you'd
>>> want to.
>>>
>>
>> I am writing essentially a distributed state machine using Kafka.  I get
>> the sense that most users / examples are using it more for ETL or streaming
>> data processing, which is probably why my need here seems a little strange.
>>
>> I expect to accept messages over HTTP and then wish to offer it into the
>> processing stream.  The only actor in my model currently *is* the Kafka
>> Streams app, there is no upstream or downstream collaborator to feed it data
>> over Kafka (yet).
>>
>> I get that I can use the normal Producer with the partitioner below, but I
>> consider the code a little ugly and probably could be improved.  Is the lack
>> of a Kafka Streams level produce intentional?  Am I thinking about the
>> problem wrong?
>>
>>> Mathieu
>>>
>>>
>>> On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <
>>> sschlans...@opentable.com> wrote:
>>>
>>>> So then I guess my problem really is that I am operating at two different
>>>> levels of abstraction.
>>>> How do I produce to a KStream?  I could imagine a method:
>>>>
>>>> public void KStream.put(K, V, Callback?);
>>>>
>>>> but I don't see anything like that.  Nor do the "QueryableStoreTypes"
>>>> really seem like what I want either.
>>>>
>>>> Currently I do this, which I feel isn't the most elegant solution:
>>>>
>>>> public class MyPartitioner implements Partitioner,
>>>>        StreamPartitioner<String, ChatMessage> {
>>>>    @Override
>>>>    public int partition(String topic, Object key, byte[] keyBytes,
>>>>            Object value, byte[] valueBytes, Cluster cluster) {
>>>>        final List<PartitionInfo> partitions =
>>>>            cluster.partitionsForTopic(topic);
>>>>        return partition0(keyBytes, value, partitions.size());
>>>>    }
>>>>
>>>>    @Override
>>>>    public Integer partition(String key, ChatMessage value,
>>>>            int numPartitions) {
>>>>        return partition0(null, value, numPartitions);
>>>>    }
>>>>
>>>>    @VisibleForTesting
>>>>    int partition0(byte[] keyBytes, Object value,
>>>>            final int numPartitions) {
>>>>        if (value instanceof ChatMessage) {
>>>>            return messagePartition((ChatMessage) value, numPartitions);
>>>>        }
>>>>        // same as DefaultPartitioner except we assume no null keys
>>>>        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
>>>>    }
>>>> }
>>>>
>>>> If I could produce to a StreamPartition'd KStream, then I could work only
>>>> at one layer.
>>>> That would have the additional benefit that I would no longer need to
>>>> configure and own my own KafkaProducers.
>>>>
>>>>> On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>>
>>>>> It's by design.
>>>>>
>>>>> The reason is that Streams uses a single producer to write to different
>>>>> output topics. As different output topics might have different key and/or
>>>>> value types, the producer is instantiated with byte[] as key and value
>>>>> type, and Streams serializes the data before handing it to the producer
>>>>> -- Streams knows the topology and can pick the right serializer
>>>>> according to the current key and value type.
>>>>>
>>>>> That's the reason why KStream#to() has an overload that lets you specify a
>>>>> custom StreamPartitioner that will be called by Streams (not the
>>>>> producer) to compute the partition before serializing the data. In this
>>>>> case, the partition (to write the data into) is given to the producer
>>>>> directly and the producer does not call its own partitioner.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 2/9/17 3:49 PM, Steven Schlansker wrote:
>>>>>> Hi, I discovered what I consider to be really confusing behavior --
>>>>>> wondering if this is by design or a bug.
>>>>>>
>>>>>> The Kafka Partitioner interface:
>>>>>> public int partition(String topic, Object key, byte[] keyBytes,
>>>>>>     Object value, byte[] valueBytes, Cluster cluster);
>>>>>>
>>>>>> has both "Object value" and "byte[] valueBytes" provided.  I naïvely
>>>>>> assumed that "value" would be the pre-serialized domain object.
>>>>>>
>>>>>> I set up a KStream:
>>>>>>
>>>>>>
>>>>>> builder.stream(Serdes.String(), requestSerde, requestTopic)
>>>>>>      .mapValues(this::requestToMessage)
>>>>>>      .to(Serdes.String(), messageSerde, messageTopic);
>>>>>>
>>>>>>
>>>>>> I produce to the "messageTopic" both via this Stream as well as by a
>>>>>> normal KafkaProducer.
>>>>>>
>>>>>> I thought this should be sufficient to partition both ways:
>>>>>>
>>>>>> props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>>>     MessagePartitioner.class);
>>>>>>
>>>>>> The partitioner has special understanding of the Message type and
>>>>>> behaves as a DefaultPartitioner otherwise.
>>>>>> Roughly,
>>>>>>
>>>>>>
>>>>>> int partition(...) {
>>>>>>   return value instanceof Message ? specialPartition((Message)value)
>>>>>>       : defaultPartition(keyBytes);
>>>>>> }
>>>>>>
>>>>>>
>>>>>> This works great for KafkaProducer.  The "value" field is indeed my
>>>>>> Message type and partitions are assigned correctly.
>>>>>> Unfortunately it does *not* work with the stream producer, which causes
>>>>>> very confusing behavior.  It turns out that the RecordCollectorImpl does
>>>>>> its own serialization:
>>>>>>
>>>>>>
>>>>>> byte[] keyBytes = keySerializer.serialize(topic, key);
>>>>>> byte[] valBytes = valueSerializer.serialize(topic, value);
>>>>>> if (partition == null && partitioner != null) {
>>>>>>   List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
>>>>>>   if (partitions != null && partitions.size() > 0)
>>>>>>       partition = partitioner.partition(key, value, partitions.size());
>>>>>> }
>>>>>> ProducerRecord<byte[], byte[]> serializedRecord =
>>>>>>   new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
>>>>>> this.producer.send(serializedRecord)
>>>>>>
>>>>>>
>>>>>> Critically, this means the record actually sent through the
>>>>>> KafkaProducer is already turned into a byte[] kv.
>>>>>> So when the Partitioner gets called (since I did not specify a
>>>>>> partition directly, nor a StreamPartitioner), it sees a byte[] and does
>>>>>> the "defaultPartition" case.
>>>>>>
>>>>>> Could someone shed some light on the right way to set this up?  Are
>>>>>> normal Kafka partitioners expected to work for Streams?  I would much
>>>>>> prefer to access my domain object directly rather than have to
>>>>>> de-serialize the JSON we just spent so many CPU cycles making :)
>>>>>>
>>>>>> I could implement StreamPartitioner, but then I have to remember to
>>>>>> specify it everywhere, otherwise potentially very subtle bugs creep in.
>>>>>>
>>>>>> Thanks,
>>>>>> Steven
>>>>>>
>>>>>
>>>>
>>>>
>>
>>
> 
