> On Feb 10, 2017, at 1:09 PM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> 
> Hey Steven,
> 
> If you have one KStream, and you want to produce to a topic that is read by
> another KStream, you'd use the ".through" method of the first KStream.
> ".through" both outputs to a topic and returns a KStream that reads from
> that topic.  (".to" just outputs to a topic.)
> 
> If you want to produce data from a different application into a Kafka
> Streams app, you'd use the Kafka Producer without the Kafka Streams
> library.  (Or Kafka Connect, or any other way to produce to Kafka.)  You
> could use any implementation/configuration of partitioner.class that you'd
> want to.
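A minimal sketch of the distinction Mathieu draws above, against the 0.10.x
API used elsewhere in this thread; the topic names and messageSerde are
placeholders:

    KStreamBuilder builder = new KStreamBuilder();

    // ".to" only writes to the topic; the processing chain ends there.
    builder.stream(Serdes.String(), messageSerde, "input-topic")
        .to(Serdes.String(), messageSerde, "output-topic");

    // ".through" writes to the intermediate topic AND returns a new KStream
    // that consumes it, so the topology can keep processing downstream.
    KStream<String, ChatMessage> reread =
        builder.stream(Serdes.String(), messageSerde, "input-topic")
            .through(Serdes.String(), messageSerde, "intermediate-topic");
    // ...continue building the topology from "reread"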
I am writing essentially a distributed state machine using Kafka.  I get the
sense that most users / examples are using it more for ETL or streaming data
processing, which is probably why my need here seems a little strange.  I
expect to accept messages over HTTP and then wish to offer them into the
processing stream.  The only actor in my model currently *is* the Kafka
Streams app; there is no upstream or downstream collaborator to feed it data
over Kafka (yet).

I get that I can use the normal Producer with the partitioner below, but I
consider the code a little ugly and suspect it could be improved.  Is the
lack of a Kafka Streams-level produce intentional?  Am I thinking about the
problem wrong?

> Mathieu
> 
> 
> On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <
> sschlans...@opentable.com> wrote:
> 
>> So then I guess my problem really is that I am operating at two different
>> levels of abstraction.
>> How do I produce to a KStream?  I could imagine a method:
>> 
>> public void KStream.put(K, V, Callback?);
>> 
>> but I don't see anything like that.  Nor do the "QueryableStoreTypes"
>> seem like what I want.
>> 
>> Currently I do this, which I feel isn't the most elegant solution:
>> 
>> import java.util.List;
>> import java.util.Map;
>> 
>> import org.apache.kafka.clients.producer.Partitioner;
>> import org.apache.kafka.common.Cluster;
>> import org.apache.kafka.common.PartitionInfo;
>> import org.apache.kafka.common.utils.Utils;
>> import org.apache.kafka.streams.processor.StreamPartitioner;
>> 
>> import com.google.common.annotations.VisibleForTesting;
>> 
>> public class MyPartitioner implements Partitioner,
>>         StreamPartitioner<String, ChatMessage> {
>> 
>>     // Partitioner path: called by a plain KafkaProducer
>>     @Override
>>     public int partition(String topic, Object key, byte[] keyBytes,
>>             Object value, byte[] valueBytes, Cluster cluster) {
>>         final List<PartitionInfo> partitions =
>>                 cluster.partitionsForTopic(topic);
>>         return partition0(keyBytes, value, partitions.size());
>>     }
>> 
>>     // StreamPartitioner path: called by Kafka Streams
>>     @Override
>>     public Integer partition(String key, ChatMessage value,
>>             int numPartitions) {
>>         return partition0(null, value, numPartitions);
>>     }
>> 
>>     @VisibleForTesting
>>     int partition0(byte[] keyBytes, Object value, final int numPartitions) {
>>         if (value instanceof ChatMessage) {
>>             return messagePartition((ChatMessage) value, numPartitions);
>>         }
>>         // same as DefaultPartitioner except we assume no null keys
>>         return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
>>     }
>> 
>>     // required by the Partitioner interface; nothing to do here
>>     @Override
>>     public void configure(Map<String, ?> configs) {}
>> 
>>     @Override
>>     public void close() {}
>> }
>> 
>> If I could produce to a StreamPartitioner'd KStream, then I could work
>> only at one layer.
>> That would have the additional benefit that I would no longer need to
>> configure and own my own KafkaProducers.
>> 
>>> On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>> 
>>> It's by design.
>>> 
>>> The reason is that Streams uses a single producer to write to different
>>> output topics.  As different output topics might have different key
>>> and/or value types, the producer is instantiated with byte[] as key and
>>> value type, and Streams serializes the data before handing it to the
>>> producer -- Streams knows the topology and can pick the right serializer
>>> according to the current key and value type.
>>> 
>>> That's the reason why KStream#to() has an overload allowing you to
>>> specify a custom StreamPartitioner that will be called by Streams (not
>>> the producer) to compute the partition before serializing the data.  In
>>> this case, the partition (to write the data into) is given to the
>>> producer directly and the producer does not call its own partitioner.
>>> 
>>> 
>>> -Matthias
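A minimal sketch of the overload Matthias describes, reusing the
requestSerde, messageSerde, topic names, and MyPartitioner from this thread;
since MyPartitioner already implements StreamPartitioner<String, ChatMessage>,
it can be passed straight to KStream#to():

    // The StreamPartitioner runs inside Streams, before serialization, so it
    // sees the domain objects; the producer's partitioner.class is bypassed.
    final MyPartitioner partitioner = new MyPartitioner();

    builder.stream(Serdes.String(), requestSerde, requestTopic)
        .mapValues(this::requestToMessage)
        .to(Serdes.String(), messageSerde, partitioner, messageTopic);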
>>> On 2/9/17 3:49 PM, Steven Schlansker wrote:
>>>> Hi, I discovered what I consider to be really confusing behavior --
>>>> wondering if this is by design or a bug.
>>>> 
>>>> The Kafka Partitioner interface:
>>>> 
>>>> public int partition(String topic, Object key, byte[] keyBytes,
>>>>         Object value, byte[] valueBytes, Cluster cluster);
>>>> 
>>>> has both "Object value" and "byte[] valueBytes" provided.  I naïvely
>>>> assumed that "value" would be the pre-serialized domain object.
>>>> 
>>>> I set up a KStream:
>>>> 
>>>> builder.stream(Serdes.String(), requestSerde, requestTopic)
>>>>     .mapValues(this::requestToMessage)
>>>>     .to(Serdes.String(), messageSerde, messageTopic);
>>>> 
>>>> I produce to the "messageTopic" both via this Stream as well as by a
>>>> normal KafkaProducer.
>>>> 
>>>> I thought this should be sufficient to partition both ways:
>>>> 
>>>> props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>     MessagePartitioner.class);
>>>> 
>>>> The partitioner has special understanding of the Message type and
>>>> behaves as a DefaultPartitioner otherwise.  Roughly:
>>>> 
>>>> int partition(...) {
>>>>     return value instanceof Message
>>>>             ? specialPartition((Message) value)
>>>>             : defaultPartition(keyBytes);
>>>> }
>>>> 
>>>> This works great for KafkaProducer.  The "value" field is indeed my
>>>> Message type and partitions are assigned correctly.
>>>> Unfortunately it does *not* work with the stream producer, which causes
>>>> very confusing behavior.  It turns out that the RecordCollectorImpl does
>>>> its own serialization:
>>>> 
>>>> byte[] keyBytes = keySerializer.serialize(topic, key);
>>>> byte[] valBytes = valueSerializer.serialize(topic, value);
>>>> if (partition == null && partitioner != null) {
>>>>     List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
>>>>     if (partitions != null && partitions.size() > 0)
>>>>         partition = partitioner.partition(key, value, partitions.size());
>>>> }
>>>> ProducerRecord<byte[], byte[]> serializedRecord =
>>>>     new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
>>>> this.producer.send(serializedRecord);
>>>> 
>>>> Critically, this means the record actually sent through the
>>>> KafkaProducer has already been turned into a byte[] key and value.
>>>> So when the Partitioner gets called (since I did not specify a
>>>> partition directly, nor a StreamPartitioner), it sees a byte[] and
>>>> takes the "defaultPartition" path.
>>>> 
>>>> Could someone shed some light on the right way to set this up?  Are
>>>> normal Kafka partitioners expected to work for Streams?  I would much
>>>> prefer to access my domain object directly rather than have to
>>>> de-serialize the JSON we just spent so many CPU cycles making :)
>>>> 
>>>> I could implement StreamPartitioner, but then I have to remember to
>>>> specify it everywhere, otherwise potentially very subtle bugs creep in.
>>>> 
>>>> Thanks,
>>>> Steven
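One way to mitigate the "remember to specify it everywhere" concern in the
last message is to funnel every write to the partitioned topic through a
single helper, so the StreamPartitioner cannot be forgotten.  A sketch only;
the class and method names are hypothetical:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;

    public final class ChatMessageTopic {
        // one shared partitioner instance; MyPartitioner holds no state
        private static final MyPartitioner PARTITIONER = new MyPartitioner();

        private ChatMessageTopic() {}

        // the one sanctioned way to write ChatMessages from a topology;
        // it always supplies the custom StreamPartitioner
        public static void writeTo(KStream<String, ChatMessage> stream,
                                   Serde<ChatMessage> messageSerde,
                                   String topic) {
            stream.to(Serdes.String(), messageSerde, PARTITIONER, topic);
        }
    }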