Hi Nan, What I'm suggesting is do the entire topology in the PAPI, sorry if I didn't make this clear from before.
Thanks, Bill On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1...@gmail.com> wrote: > thanks, just to make sure I understand this correctly,. > > I have some processing logic using DSL, after those processing, I have a > kstream, from this kstream, I need to do a transform and put result to > different topics. To use processor api, I need to put this kstream to a > topic, then use topology.addSource("source-node", "input-topic"); > something like > > val streamBuilder = new StreamsBuilder() > val inputStream = streamBuilder.stream[String, StoreInput](begin_topic) > > //some DSL processing.... > val resultKStream = inputStream.map(xxxx).fitler.............. > > > resultKStream .to("inter_topic") > > final Topology topology = new Topology(); > topology.addSource("source-node", " inter_topic"); > topology.addProcessor("transformer", () -> new MyTransfomer(), > "source-node"); > > so if I have to put my intermediate result to inter_topic, is there any > performance implication? Not sure if I am right, but sounds to me that will > cause one more hop from client(stream app) to kakfa brokers. beginning DSL > processing is happening on the client side. then have to put the result > back to broker, then read back to client to use processor api. > > Thanks, > Nan > > > > > On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <b...@confluent.io> wrote: > > > Hi Nan, > > > > To forward to the 3 different topics it will probably be easier to do > this > > in the Processor API. Based off what your stated in your question, the > > topology will look something like this: > > > > final Topology topology = new Topology(); > > topology.addSource("source-node", "input-topic"); > > topology.addProcessor("transformer", () -> new MyTransfomer(), > > "source-node"); > > topology.addSink("sink-node-1", "output-topic-1", "transformer"); > > topology.addSink("sink-node-2", "output-topic-2", "transformer"); > > topology.addSink("sink-node-3", "output-topic-3", "transformer"); > > > > As you can see, the "transformer" is the parent node of all 3 sink nodes. > > Then in your Transformer, you can forward the key-value pairs by using > one > > of two approaches. > > > > Sending to all child nodes with this call: > > > > context().forward(key, value, To.all()). > > > > Or by listing each child node individually like so > > > > context().forward(key, value, To.child("sink-node-1")); > > context().forward(key, value, To.child("sink-node-2")); > > context().forward(key, value, To.child("sink-node-3")); > > > > HTH, > > > > Bill > > > > > > > > > > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com> wrote: > > > > > when I do the transform, for a single input record, I need to output 3 > > > different records, those 3 records are in different classes. I want to > > > send the each type of records to a separate topic, my understanding is > I > > > should use > > > > > > context.forward inside the transformer like > > > > > > Transformer{.. > > > context.forward(key, record1, To.child("topic1")) > > > context.forward(key, value1, To.child("topic2")) > > > } > > > but how do I define those processor, I can create them in topology but > > who > > > should be their parent? what's the name of the parent? > > > > > > stream.transform(transformer) don't give me a way to say processor > name. > > > > > > Thanks, > > > Nan > > > > > >