Well, I think what you're doing is unusual for sure.  The Streams API is
really about transforming streams of data from input to output... so
therefore the API doesn't have an injection point like you're looking for.
I'd say it's intentional (I'm just a user though).

If I were in your shoes, I'd probably decouple the applications -- make the
portion that accepts messages over HTTP and produces to a Kafka topic one
application, and make the stream processor another app.  That would allow
them to be deployed and scaled separately (eg. they may not always require
the same hardware, capacity, yada yada).

Mathieu


On Fri, Feb 10, 2017 at 2:22 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:

>
> > On Feb 10, 2017, at 1:09 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
> >
> > Hey Steven,
> >
> > If you have one KStream, and you want to produce to a topic that is read
> by
> > another KStream, you'd use the ".through" method of the first KStream.
> > ".through" both outputs to a topic and returns a KStream that reads from
> > that topic.  (".to" just outputs to a topic)
> >
> > If you want to produce data from a different application into a Kafka
> > Streams app, you'd be using the Kafka Producer without the Kafka Streams
> > library.  (Or Kafka Connect, or any other way to produce to Kafka).  You
> > could use any implementation/configuration of partitioner.class that
> you'd
> > want to.
> >
>
> I am writing essentially a distributed state machine using Kafka.  I get
> the sense
> that most users / examples are using it more for ETL or streaming data
> processing,
> which is probably why my need here seems a little strange.
>
> I expect to accept messages over HTTP and then wish to offer it into the
> processing
> stream.  The only actor in my model currently *is* the Kafka Streams app,
> there is no
> upstream or downstream collaborator to feed it data over Kafka (yet).
>
> I get that I can use the normal Producer with the partitioner below, but I
> consider
> the code a little ugly and probably could be improved.  Is the lack of a
> Kafka Streams
> level produce intentional?  Am I thinking about the problem wrong?
>
> > Mathieu
> >
> >
> > On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <
> > sschlans...@opentable.com> wrote:
> >
> >> So then I guess my problem really is that I am operating at two
> different
> >> levels of abstraction.
> >> How do I produce to a KStream?  I could imagine a method:
> >>
> >> public void KStream.put(K, V, Callback?);
> >>
> >> but I don't see anything like that.  Nor do the "QueryableStoreTypes"
> >> really seem like what I want either.
> >>
> >> Currently I do this, which I feel isn't the most elegant solution:
> >>
> >> public class MyPartitioner implements Partitioner,
> >> StreamPartitioner<String, ChatMessage> {
> >>    @Override
> >>    public int partition(String topic, Object key, byte[] keyBytes,
> Object
> >> value, byte[] valueBytes, Cluster cluster) {
> >>        final List<PartitionInfo> partitions =
> cluster.partitionsForTopic(
> >> topic);
> >>        return partition0(keyBytes, value, partitions.size());
> >>    }
> >>
> >>    @Override
> >>    public Integer partition(String key, ChatMessage value, int
> >> numPartitions) {
> >>        return partition0(null, value, numPartitions);
> >>    }
> >>
> >>    @VisibleForTesting
> >>    int partition0(byte[] keyBytes, Object value, final int
> numPartitions)
> >> {
> >>        if (value instanceof ChatMessage) {
> >>            return messagePartition((ChatMessage) value, numPartitions);
> >>        }
> >>        // same as DefaultPartitioner except we assume no null keys
> >>        return Utils.toPositive(Utils.murmur2(keyBytes)) %
> numPartitions;
> >>    }
> >> }
> >>
> >> If I could produce to a StreamPartition'd KStream, then I could work
> only
> >> at one layer.
> >> That would have the additional benefit that I would no longer need to
> >> configure and
> >> own my own KafkaProducers.
> >>
> >>> On Feb 9, 2017, at 8:25 PM, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>>
> >>> It's by design.
> >>>
> >>> The reason it, that Streams uses a single producer to write to
> different
> >>> output topic. As different output topics might have different key
> and/or
> >>> value types, the producer is instantiated with byte[] as key and value
> >>> type, and Streams serialized the data before handing it to the producer
> >>> -- Streams knows the topology and can pick the right serializer
> >>> according to the current key and value type.
> >>>
> >>> That's the reason why KStream#to() has an overload allowing to specify
> a
> >>> custom StreamPartitioner that will be called by Streams (not the
> >>> producer) to compute the partition before serializing the data. For
> this
> >>> case, the partition (to write the data into) is given to the producer
> >>> directly and the producer does not call it's own partitioner.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 2/9/17 3:49 PM, Steven Schlansker wrote:
> >>>> Hi, I discovered what I consider to be really confusing behavior --
> >> wondering if this is by design or a bug.
> >>>>
> >>>> The Kafka Partitioner interface:
> >>>> public int partition(String topic, Object key, byte[] keyBytes, Object
> >> value, byte[] valueBytes, Cluster cluster);
> >>>>
> >>>> has both "Object value" and "byte[] valueBytes" provided.  I naïvely
> >> assumed that "value" would be the pre-serialized
> >>>> domain object.
> >>>>
> >>>> I set up a KStream:
> >>>>
> >>>>
> >>>> builder.stream(Serdes.String(), requestSerde, requestTopic)
> >>>>      .mapValues(this::requestToMessage)
> >>>>      .to(Serdes.String(), messageSerde, messageTopic);
> >>>>
> >>>>
> >>>> I produce to the "messageTopic" both via this Stream as well as by a
> >> normal KafkaProducer.
> >>>>
> >>>> I thought this should be sufficient to partition both ways:
> >>>>
> >>>> props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
> >> MessagePartitioner.class);
> >>>>
> >>>> The partitioner has special understanding of the Message type and
> >> behaves as a DefaultPartitioner otherwise.
> >>>> Roughly,
> >>>>
> >>>>
> >>>> int partition(...) {
> >>>>   return value instanceof Message ? specialPartition((Message)value)
> >> : defaultPartition(keyBytes);
> >>>> }
> >>>>
> >>>>
> >>>> This works great for KafkaProducer.  The "value" field is indeed my
> >> Message type and partitions are assigned
> >>>> correctly.
> >>>> Unfortunately it does *not* work with the stream producer, which
> causes
> >> very confusing behavior.  It turns out
> >>>> that the RecordCollectorImpl does its own serialization:
> >>>>
> >>>>
> >>>> byte[] keyBytes = keySerializer.serialize(topic, key);
> >>>> byte[] valBytes = valueSerializer.serialize(topic, value);
> >>>> if (partition == null && partitioner != null) {
> >>>>   List<PartitionInfo> partitions = this.producer.partitionsFor(
> topic);
> >>>>   if (partitions != null && partitions.size() > 0)
> >>>>       partition = partitioner.partition(key, value,
> partitions.size());
> >>>>   }
> >>>> }
> >>>> ProducerRecord<byte[], byte[]> serializedRecord =
> >>>>   new ProducerRecord<>(topic, partition, timestamp, keyBytes,
> >> valBytes);
> >>>> this.producer.send(serializedRecord)
> >>>>
> >>>>
> >>>> Critically, this means the record actually sent through the
> >> KafkaProducer is already turned into a byte[] kv.
> >>>> So when the Partitioner gets called (since I did not specify a
> >> partition directly, nor a StreamPartitioner), it sees
> >>>> a byte[] and does the "defaultPartition" case.
> >>>>
> >>>> Could someone shed some light on the right way to set this up?  Are
> >> normal Kafka partitioners expected to work for
> >>>> Streams?  I would much prefer to access my domain object directly
> >> rather than have to de-serialize the JSON we just spent
> >>>> so many CPU cycles making :)
> >>>>
> >>>> I could implement StreamPartitioner, but then I have to remember to
> >> specify it everywhere, otherwise potentially very subtle bugs creep in.
> >>>>
> >>>> Thanks,
> >>>> Steven
> >>>>
> >>>
> >>
> >>
>
>

Reply via email to