What Mathieu says makes a lot of sense. Just some background info:
A KStream is always created from a Kafka topic:

    KStream stream = builder.stream("topicName");

A #through() is just syntactic sugar for stream.to(XXX) followed by
builder.stream(XXX). So the StreamPartitioner problem is solved :)
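For example, a rough, untested sketch -- it reuses MyPartitioner, messageSerde,
ChatMessage, and "messageTopic" from further down in this thread, and assumes
"stream" and "builder" are whatever you already have in your topology:

    // explicit form of #through() with a custom StreamPartitioner
    StreamPartitioner<String, ChatMessage> partitioner = new MyPartitioner();
    stream.to(Serdes.String(), messageSerde, partitioner, "messageTopic");
    KStream<String, ChatMessage> continued =
            builder.stream(Serdes.String(), messageSerde, "messageTopic");

    // or, equivalently, via the #through() overload that takes a partitioner:
    // KStream<String, ChatMessage> continued =
    //         stream.through(Serdes.String(), messageSerde, partitioner, "messageTopic");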
About put(): that is intentional (as Mathieu mentioned already). If you had
something like KStream#put(), it would not be fault-tolerant -- Streams relies
on a topic (i.e., persistent storage) to replay records on failure. #put()
cannot provide this guarantee, so data might get lost.
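The fault-tolerant way to feed external data in is therefore what Mathieu
suggested: a plain producer (e.g. in your HTTP layer) writing into a topic the
Streams app consumes. A rough, untested sketch, reusing messageSerde,
MyPartitioner, and "messageTopic" from this thread (the broker address and the
key/message variables are placeholders):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
    Producer<String, ChatMessage> producer = new KafkaProducer<>(
            props, new StringSerializer(), messageSerde.serializer());

    // called from the HTTP handler; the record is durable in Kafka
    // before Streams ever reads it, so it can be replayed on failure
    producer.send(new ProducerRecord<>("messageTopic", key, message));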
> but I don't see anything like that. Nor do the "QueryableStoreTypes"
> really seem like what I want either.

Not sure how this relates?

-Matthias


On 2/10/17 1:37 PM, Mathieu Fenniak wrote:
> Well, I think what you're doing is unusual for sure. The Streams API is
> really about transforming streams of data from input to output... so the
> API doesn't have an injection point like you're looking for. I'd say it's
> intentional (I'm just a user though).
>
> If I were in your shoes, I'd probably decouple the applications -- make the
> portion that accepts messages over HTTP and produces to a Kafka topic one
> application, and make the stream processor another app. That would allow
> them to be deployed and scaled separately (e.g. they may not always require
> the same hardware, capacity, yada yada).
>
> Mathieu
>
>
> On Fri, Feb 10, 2017 at 2:22 PM, Steven Schlansker <
> sschlans...@opentable.com> wrote:
>
>>> On Feb 10, 2017, at 1:09 PM, Mathieu Fenniak <
>>> mathieu.fenn...@replicon.com> wrote:
>>>
>>> Hey Steven,
>>>
>>> If you have one KStream, and you want to produce to a topic that is read
>>> by another KStream, you'd use the ".through" method of the first KStream.
>>> ".through" both outputs to a topic and returns a KStream that reads from
>>> that topic. (".to" just outputs to a topic.)
>>>
>>> If you want to produce data from a different application into a Kafka
>>> Streams app, you'd use the Kafka Producer without the Kafka Streams
>>> library. (Or Kafka Connect, or any other way to produce to Kafka.) You
>>> could use any implementation/configuration of partitioner.class that
>>> you'd want to.
>>>
>>
>> I am writing essentially a distributed state machine using Kafka. I get
>> the sense that most users / examples are using it more for ETL or
>> streaming data processing, which is probably why my need here seems a
>> little strange.
>>
>> I expect to accept messages over HTTP and then wish to offer them into
>> the processing stream. The only actor in my model currently *is* the
>> Kafka Streams app; there is no upstream or downstream collaborator to
>> feed it data over Kafka (yet).
>>
>> I get that I can use the normal Producer with the partitioner below, but
>> I consider the code a little ugly and think it could be improved. Is the
>> lack of a Kafka Streams-level produce intentional? Am I thinking about
>> the problem wrong?
>>
>>> Mathieu
>>>
>>>
>>> On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <
>>> sschlans...@opentable.com> wrote:
>>>
>>>> So then I guess my problem really is that I am operating at two
>>>> different levels of abstraction.
>>>> How do I produce to a KStream? I could imagine a method:
>>>>
>>>>     public void KStream.put(K, V, Callback?);
>>>>
>>>> but I don't see anything like that. Nor do the "QueryableStoreTypes"
>>>> really seem like what I want either.
>>>>
>>>> Currently I do this, which I feel isn't the most elegant solution:
>>>>
>>>>     public class MyPartitioner implements Partitioner,
>>>>             StreamPartitioner<String, ChatMessage> {
>>>>         @Override
>>>>         public int partition(String topic, Object key, byte[] keyBytes,
>>>>                 Object value, byte[] valueBytes, Cluster cluster) {
>>>>             final List<PartitionInfo> partitions =
>>>>                     cluster.partitionsForTopic(topic);
>>>>             return partition0(keyBytes, value, partitions.size());
>>>>         }
>>>>
>>>>         @Override
>>>>         public Integer partition(String key, ChatMessage value,
>>>>                 int numPartitions) {
>>>>             return partition0(null, value, numPartitions);
>>>>         }
>>>>
>>>>         @VisibleForTesting
>>>>         int partition0(byte[] keyBytes, Object value, final int numPartitions) {
>>>>             if (value instanceof ChatMessage) {
>>>>                 return messagePartition((ChatMessage) value, numPartitions);
>>>>             }
>>>>             // same as DefaultPartitioner except we assume no null keys
>>>>             return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
>>>>         }
>>>>     }
>>>>
>>>> If I could produce to a StreamPartition'd KStream, then I could work
>>>> only at one layer. That would have the additional benefit that I would
>>>> no longer need to configure and own my own KafkaProducers.
>>>>
>>>>> On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>> It's by design.
>>>>>
>>>>> The reason is that Streams uses a single producer to write to different
>>>>> output topics. As different output topics might have different key
>>>>> and/or value types, the producer is instantiated with byte[] as key and
>>>>> value type, and Streams serializes the data before handing it to the
>>>>> producer -- Streams knows the topology and can pick the right serializer
>>>>> according to the current key and value type.
>>>>>
>>>>> That's the reason why KStream#to() has an overload allowing you to
>>>>> specify a custom StreamPartitioner that will be called by Streams (not
>>>>> the producer) to compute the partition before serializing the data. In
>>>>> this case, the partition (to write the data into) is given to the
>>>>> producer directly and the producer does not call its own partitioner.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 2/9/17 3:49 PM, Steven Schlansker wrote:
>>>>>> Hi, I discovered what I consider to be really confusing behavior --
>>>>>> wondering if this is by design or a bug.
>>>>>>
>>>>>> The Kafka Partitioner interface:
>>>>>>
>>>>>>     public int partition(String topic, Object key, byte[] keyBytes,
>>>>>>             Object value, byte[] valueBytes, Cluster cluster);
>>>>>>
>>>>>> has both "Object value" and "byte[] valueBytes" provided. I naïvely
>>>>>> assumed that "value" would be the pre-serialized domain object.
>>>>>>
>>>>>> I set up a KStream:
>>>>>>
>>>>>>     builder.stream(Serdes.String(), requestSerde, requestTopic)
>>>>>>         .mapValues(this::requestToMessage)
>>>>>>         .to(Serdes.String(), messageSerde, messageTopic);
>>>>>>
>>>>>> I produce to the "messageTopic" both via this Stream as well as by a
>>>>>> normal KafkaProducer.
>>>>>>
>>>>>> I thought this should be sufficient to partition both ways:
>>>>>>
>>>>>>     props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>>>             MessagePartitioner.class);
>>>>>>
>>>>>> The partitioner has special understanding of the Message type and
>>>>>> behaves as a DefaultPartitioner otherwise. Roughly,
>>>>>>
>>>>>>     int partition(...) {
>>>>>>         return value instanceof Message
>>>>>>                 ? specialPartition((Message) value)
>>>>>>                 : defaultPartition(keyBytes);
>>>>>>     }
>>>>>>
>>>>>> This works great for KafkaProducer. The "value" field is indeed my
>>>>>> Message type and partitions are assigned correctly.
>>>>>> Unfortunately it does *not* work with the stream producer, which causes
>>>>>> very confusing behavior. It turns out that the RecordCollectorImpl does
>>>>>> its own serialization:
>>>>>>
>>>>>>     byte[] keyBytes = keySerializer.serialize(topic, key);
>>>>>>     byte[] valBytes = valueSerializer.serialize(topic, value);
>>>>>>     if (partition == null && partitioner != null) {
>>>>>>         List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
>>>>>>         if (partitions != null && partitions.size() > 0)
>>>>>>             partition = partitioner.partition(key, value, partitions.size());
>>>>>>     }
>>>>>>     ProducerRecord<byte[], byte[]> serializedRecord =
>>>>>>             new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
>>>>>>     this.producer.send(serializedRecord);
>>>>>>
>>>>>> Critically, this means the record actually sent through the
>>>>>> KafkaProducer is already turned into a byte[] key and value. So when
>>>>>> the Partitioner gets called (since I did not specify a partition
>>>>>> directly, nor a StreamPartitioner), it sees a byte[] and falls into
>>>>>> the "defaultPartition" case.
>>>>>>
>>>>>> Could someone shed some light on the right way to set this up? Are
>>>>>> normal Kafka partitioners expected to work for Streams? I would much
>>>>>> prefer to access my domain object directly rather than have to
>>>>>> de-serialize the JSON we just spent so many CPU cycles making :)
>>>>>>
>>>>>> I could implement StreamPartitioner, but then I have to remember to
>>>>>> specify it everywhere, otherwise potentially very subtle bugs creep in.
>>>>>>
>>>>>> Thanks,
>>>>>> Steven