Thanks. Just to make sure I understand this correctly: I have some processing logic written in the DSL. After that processing I have a KStream, and from this KStream I need to do a transform and put the results into different topics. To use the Processor API, I need to write this KStream to a topic and then use topology.addSource("source-node", "input-topic"), something like:

    val streamBuilder = new StreamsBuilder()
    val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)

    // some DSL processing....
    val resultKStream = inputStream.map(xxxx).filter.............
    resultKStream.to("inter_topic")

    final Topology topology = new Topology();
    topology.addSource("source-node", "inter_topic");
    topology.addProcessor("transformer", () -> new MyTransformer(), "source-node");

So if I have to write my intermediate result to inter_topic, is there any performance implication? I am not sure I am right, but it sounds to me like that will cause one more hop from the client (the streams app) to the Kafka brokers. The initial DSL processing happens on the client side; the result then has to be written back to the broker and read back by the client in order to use the Processor API.

Thanks,
Nan

On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <b...@confluent.io> wrote:

> Hi Nan,
>
> To forward to the 3 different topics it will probably be easier to do this
> in the Processor API. Based on what you stated in your question, the
> topology will look something like this:
>
> final Topology topology = new Topology();
> topology.addSource("source-node", "input-topic");
> topology.addProcessor("transformer", () -> new MyTransformer(),
>     "source-node");
> topology.addSink("sink-node-1", "output-topic-1", "transformer");
> topology.addSink("sink-node-2", "output-topic-2", "transformer");
> topology.addSink("sink-node-3", "output-topic-3", "transformer");
>
> As you can see, the "transformer" is the parent node of all 3 sink nodes.
> Then in your Transformer, you can forward the key-value pairs by using one
> of two approaches.
>
> Sending to all child nodes with this call:
>
> context().forward(key, value, To.all());
>
> Or by listing each child node individually like so:
>
> context().forward(key, value, To.child("sink-node-1"));
> context().forward(key, value, To.child("sink-node-2"));
> context().forward(key, value, To.child("sink-node-3"));
>
> HTH,
>
> Bill
>
> On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com> wrote:
>
> > When I do the transform, for a single input record I need to output 3
> > different records, and those 3 records are of different classes. I want
> > to send each type of record to a separate topic. My understanding is
> > that I should use context.forward inside the transformer, like
> >
> > Transformer{..
> > context.forward(key, record1, To.child("topic1"))
> > context.forward(key, value1, To.child("topic2"))
> > }
> >
> > But how do I define those processors? I can create them in the topology,
> > but who should be their parent? What's the name of the parent?
> >
> > stream.transform(transformer) doesn't give me a way to specify a
> > processor name.
> >
> > Thanks,
> > Nan
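
For reference, the fan-out discussed in this thread (one input record becoming three records of different classes, each sent to its own child node) can be sketched in plain Java with no Kafka dependency. This is only an illustration under assumptions: the record classes Summary, Audit and Alert, the Routed holder, and the fanOut helper are hypothetical names that do not appear in the thread, and String keys and inputs are assumed. Only the child node names sink-node-1 to sink-node-3 come from Bill's reply. Inside a real processor, each returned entry would correspond to one context().forward(key, value, To.child(childName)) call.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the fan-out logic; no Kafka classes are used here.
public class FanOutSketch {

    // Hypothetical output record classes (not from the thread).
    static final class Summary {
        final int length;
        Summary(int length) { this.length = length; }
    }

    static final class Audit {
        final String original;
        Audit(String original) { this.original = original; }
    }

    static final class Alert {
        final boolean empty;
        Alert(boolean empty) { this.empty = empty; }
    }

    // One routed record: the child node name plus the key/value to send to it.
    static final class Routed {
        final String childName;
        final String key;
        final Object value;
        Routed(String childName, String key, Object value) {
            this.childName = childName;
            this.key = key;
            this.value = value;
        }
    }

    // Turns one input record into three records of different classes,
    // each addressed to a different child node by name.
    static List<Routed> fanOut(String key, String input) {
        List<Routed> out = new ArrayList<>();
        out.add(new Routed("sink-node-1", key, new Summary(input.length())));
        out.add(new Routed("sink-node-2", key, new Audit(input)));
        out.add(new Routed("sink-node-3", key, new Alert(input.isEmpty())));
        return out;
    }

    public static void main(String[] args) {
        // Print which child node would receive which record type.
        for (Routed r : fanOut("k1", "hello")) {
            System.out.println(r.childName + " <- " + r.key + " : "
                    + r.value.getClass().getSimpleName());
        }
    }
}
```

Keeping the routing decision in a pure method like this is one possible design choice: it can be unit tested without a broker, and the processor itself then only loops over the result and forwards each entry to the named child.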