Hi Nan, l see what you are saying about reproducing a join in the PAPI.
I have another thought. 1. Have your Transform return a List [r1, r2, r3] 2. Then after your transform operation use a flatMapValues operator as this will forward KV pairs of (k, r1), (k, r2), and (k, r3). >From there you have two choices. 1. If you are using Kafka Streams v 2.0+, you can create an instance of TopicNameExtractor. The TopicNameExtractor can return the appropriate topic name based on the instance type of the value. Then you would look something like inputStream.transform(transformer).flatMapValues(...).to(MyTopicChooser, Produced(...)); 2. If you are using a version of Kafka Streams prior to v 2.0 then first create 3 org.apache.kafka.streams.kstream.Predicate instances. - Predicate p1 = (k,v) -> v instanceof r1; - Predicate p2 = (k,v) -> v instanceof r2; - Predicate p3 = (k,v) -> v instanceof r3; You will still use the flatMapValues operator, but now you'd follow it with the branch operator and have the resulting stream instances in the array foward to the appropriate topic val allStreams = inputStream.transform(transformer).flatMapValues(...).branch(p1, p2, p3); val allStreams(0).to("topic1"..); val allStreams(1).to("topic2"..); val allStreams(2).to("topic3"..); HTH, Bill On Thu, Feb 7, 2019 at 11:51 AM Nan Xu <nanxu1...@gmail.com> wrote: > hmm, but my DSL logic at beginning involve some join between different > streams, so I feel that will be quit complex to write everything in PAPI. > what if I do this. in the transform, I return all 3 classes as a tuple. > then to map 3 times on the same stream like this > transformer { > return (class1Instance, class2Instance, class3Instance) > } > val kstream = inputStream.transform(transformer) > kstream.map((r1,r2,r3) => r1).to("topic1") > kstream.map((r1,r2,r3) => r2).to("topic2") > kstream.map((r1,r2,r3) => r3).to("topic3") > but don't know if it is the recommended way. > > Thanks, > Nan > > On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck <b...@confluent.io> wrote: > > > Hi Nan, > > > > I wanted to follow up some more. > > > > Since you need your Transformer forward to 3 output topics or more > > generally any time you want a processor to forward to multiple child > nodes > > or specific nodes in the topology, you can best achieve this kind of > > control and flexibility using the PAPI. > > > > Thanks, > > Bill > > > > On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck <b...@confluent.io> wrote: > > > > > Hi Nan, > > > > > > What I'm suggesting is do the entire topology in the PAPI, sorry if I > > > didn't make this clear from before. > > > > > > Thanks, > > > Bill > > > > > > On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1...@gmail.com> wrote: > > > > > >> thanks, just to make sure I understand this correctly,. > > >> > > >> I have some processing logic using DSL, after those processing, I > have a > > >> kstream, from this kstream, I need to do a transform and put result to > > >> different topics. To use processor api, I need to put this kstream to > a > > >> topic, then use topology.addSource("source-node", "input-topic"); > > >> something like > > >> > > >> val streamBuilder = new StreamsBuilder() > > >> val inputStream = streamBuilder.stream[String, > StoreInput](begin_topic) > > >> > > >> //some DSL processing.... > > >> val resultKStream = inputStream.map(xxxx).fitler.............. > > >> > > >> > > >> resultKStream .to("inter_topic") > > >> > > >> final Topology topology = new Topology(); > > >> topology.addSource("source-node", " inter_topic"); > > >> topology.addProcessor("transformer", () -> new MyTransfomer(), > > >> "source-node"); > > >> > > >> so if I have to put my intermediate result to inter_topic, is there > any > > >> performance implication? Not sure if I am right, but sounds to me that > > >> will > > >> cause one more hop from client(stream app) to kakfa brokers. > beginning > > >> DSL > > >> processing is happening on the client side. then have to put the > result > > >> back to broker, then read back to client to use processor api. > > >> > > >> Thanks, > > >> Nan > > >> > > >> > > >> > > >> > > >> On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <b...@confluent.io> wrote: > > >> > > >> > Hi Nan, > > >> > > > >> > To forward to the 3 different topics it will probably be easier to > do > > >> this > > >> > in the Processor API. Based off what your stated in your question, > > the > > >> > topology will look something like this: > > >> > > > >> > final Topology topology = new Topology(); > > >> > topology.addSource("source-node", "input-topic"); > > >> > topology.addProcessor("transformer", () -> new MyTransfomer(), > > >> > "source-node"); > > >> > topology.addSink("sink-node-1", "output-topic-1", "transformer"); > > >> > topology.addSink("sink-node-2", "output-topic-2", "transformer"); > > >> > topology.addSink("sink-node-3", "output-topic-3", "transformer"); > > >> > > > >> > As you can see, the "transformer" is the parent node of all 3 sink > > >> nodes. > > >> > Then in your Transformer, you can forward the key-value pairs by > using > > >> one > > >> > of two approaches. > > >> > > > >> > Sending to all child nodes with this call: > > >> > > > >> > context().forward(key, value, To.all()). > > >> > > > >> > Or by listing each child node individually like so > > >> > > > >> > context().forward(key, value, To.child("sink-node-1")); > > >> > context().forward(key, value, To.child("sink-node-2")); > > >> > context().forward(key, value, To.child("sink-node-3")); > > >> > > > >> > HTH, > > >> > > > >> > Bill > > >> > > > >> > > > >> > > > >> > > > >> > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com> wrote: > > >> > > > >> > > when I do the transform, for a single input record, I need to > > output 3 > > >> > > different records, those 3 records are in different classes. I > want > > >> to > > >> > > send the each type of records to a separate topic, my > understanding > > >> is I > > >> > > should use > > >> > > > > >> > > context.forward inside the transformer like > > >> > > > > >> > > Transformer{.. > > >> > > context.forward(key, record1, To.child("topic1")) > > >> > > context.forward(key, value1, To.child("topic2")) > > >> > > } > > >> > > but how do I define those processor, I can create them in topology > > but > > >> > who > > >> > > should be their parent? what's the name of the parent? > > >> > > > > >> > > stream.transform(transformer) don't give me a way to say processor > > >> name. > > >> > > > > >> > > Thanks, > > >> > > Nan > > >> > > > > >> > > > >> > > > > > >