hmm, but my DSL logic at beginning involve some join between different streams, so I feel that will be quit complex to write everything in PAPI. what if I do this. in the transform, I return all 3 classes as a tuple. then to map 3 times on the same stream like this transformer { return (class1Instance, class2Instance, class3Instance) } val kstream = inputStream.transform(transformer) kstream.map((r1,r2,r3) => r1).to("topic1") kstream.map((r1,r2,r3) => r2).to("topic2") kstream.map((r1,r2,r3) => r3).to("topic3") but don't know if it is the recommended way.
Thanks, Nan On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck <b...@confluent.io> wrote: > Hi Nan, > > I wanted to follow up some more. > > Since you need your Transformer forward to 3 output topics or more > generally any time you want a processor to forward to multiple child nodes > or specific nodes in the topology, you can best achieve this kind of > control and flexibility using the PAPI. > > Thanks, > Bill > > On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck <b...@confluent.io> wrote: > > > Hi Nan, > > > > What I'm suggesting is do the entire topology in the PAPI, sorry if I > > didn't make this clear from before. > > > > Thanks, > > Bill > > > > On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1...@gmail.com> wrote: > > > >> thanks, just to make sure I understand this correctly,. > >> > >> I have some processing logic using DSL, after those processing, I have a > >> kstream, from this kstream, I need to do a transform and put result to > >> different topics. To use processor api, I need to put this kstream to a > >> topic, then use topology.addSource("source-node", "input-topic"); > >> something like > >> > >> val streamBuilder = new StreamsBuilder() > >> val inputStream = streamBuilder.stream[String, StoreInput](begin_topic) > >> > >> //some DSL processing.... > >> val resultKStream = inputStream.map(xxxx).fitler.............. > >> > >> > >> resultKStream .to("inter_topic") > >> > >> final Topology topology = new Topology(); > >> topology.addSource("source-node", " inter_topic"); > >> topology.addProcessor("transformer", () -> new MyTransfomer(), > >> "source-node"); > >> > >> so if I have to put my intermediate result to inter_topic, is there any > >> performance implication? Not sure if I am right, but sounds to me that > >> will > >> cause one more hop from client(stream app) to kakfa brokers. beginning > >> DSL > >> processing is happening on the client side. then have to put the result > >> back to broker, then read back to client to use processor api. > >> > >> Thanks, > >> Nan > >> > >> > >> > >> > >> On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <b...@confluent.io> wrote: > >> > >> > Hi Nan, > >> > > >> > To forward to the 3 different topics it will probably be easier to do > >> this > >> > in the Processor API. Based off what your stated in your question, > the > >> > topology will look something like this: > >> > > >> > final Topology topology = new Topology(); > >> > topology.addSource("source-node", "input-topic"); > >> > topology.addProcessor("transformer", () -> new MyTransfomer(), > >> > "source-node"); > >> > topology.addSink("sink-node-1", "output-topic-1", "transformer"); > >> > topology.addSink("sink-node-2", "output-topic-2", "transformer"); > >> > topology.addSink("sink-node-3", "output-topic-3", "transformer"); > >> > > >> > As you can see, the "transformer" is the parent node of all 3 sink > >> nodes. > >> > Then in your Transformer, you can forward the key-value pairs by using > >> one > >> > of two approaches. > >> > > >> > Sending to all child nodes with this call: > >> > > >> > context().forward(key, value, To.all()). > >> > > >> > Or by listing each child node individually like so > >> > > >> > context().forward(key, value, To.child("sink-node-1")); > >> > context().forward(key, value, To.child("sink-node-2")); > >> > context().forward(key, value, To.child("sink-node-3")); > >> > > >> > HTH, > >> > > >> > Bill > >> > > >> > > >> > > >> > > >> > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com> wrote: > >> > > >> > > when I do the transform, for a single input record, I need to > output 3 > >> > > different records, those 3 records are in different classes. I want > >> to > >> > > send the each type of records to a separate topic, my understanding > >> is I > >> > > should use > >> > > > >> > > context.forward inside the transformer like > >> > > > >> > > Transformer{.. > >> > > context.forward(key, record1, To.child("topic1")) > >> > > context.forward(key, value1, To.child("topic2")) > >> > > } > >> > > but how do I define those processor, I can create them in topology > but > >> > who > >> > > should be their parent? what's the name of the parent? > >> > > > >> > > stream.transform(transformer) don't give me a way to say processor > >> name. > >> > > > >> > > Thanks, > >> > > Nan > >> > > > >> > > >> > > >