Hi Nan,

I wanted to follow up some more.

Since you need your Transformer to forward to 3 output topics (or, more
generally, any time you want a processor to forward to multiple child nodes
or to specific nodes in the topology), you can best achieve this kind of
control and flexibility using the Processor API (PAPI).
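
Here is a minimal sketch of what an all-PAPI topology could look like, so
that no intermediate topic is needed between the earlier processing and the
transformer. It assumes the Kafka Streams 2.x-era processor API
(AbstractProcessor and To); later releases introduce a different processor
API, so this may need adapting. The class names FanOutTopology, PreProcessor
and FanOutProcessor are hypothetical, the "pre-process" node is only a
stand-in for whatever your DSL steps do, and the String/Long/Integer payloads
are placeholders for your three record classes.

```java
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class FanOutTopology {

    // Hypothetical stand-in for the earlier DSL steps (map/filter):
    // drops null or blank values and trims the rest.
    static class PreProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            if (value != null && !value.trim().isEmpty()) {
                // No To argument: the record goes to every child node.
                context().forward(key, value.trim());
            }
        }
    }

    // Hypothetical transformer: emits three records of different types,
    // each addressed to one named sink node.
    static class FanOutProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase(), To.child("sink-node-1"));
            context().forward(key, (long) value.length(), To.child("sink-node-2"));
            context().forward(key, value.hashCode(), To.child("sink-node-3"));
        }
    }

    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("source-node",
                new StringDeserializer(), new StringDeserializer(), "input-topic");
        // Processors are chained in-process; nothing is written back to the brokers here.
        topology.addProcessor("pre-process", () -> new PreProcessor(), "source-node");
        topology.addProcessor("transformer", () -> new FanOutProcessor(), "pre-process");
        // Each sink gets a value serializer matching the type forwarded to it.
        topology.addSink("sink-node-1", "output-topic-1",
                new StringSerializer(), new StringSerializer(), "transformer");
        topology.addSink("sink-node-2", "output-topic-2",
                new StringSerializer(), new LongSerializer(), "transformer");
        topology.addSink("sink-node-3", "output-topic-3",
                new StringSerializer(), new IntegerSerializer(), "transformer");
        return topology;
    }

    public static void main(String[] args) {
        // Printing the description does not require a running broker.
        System.out.println(build().describe());
    }
}
```

To actually run it, you would pass the result of build() to
new KafkaStreams(topology, props) with your usual configuration.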

Thanks,
Bill

On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck <b...@confluent.io> wrote:

> Hi Nan,
>
> What I'm suggesting is to build the entire topology in the PAPI. Sorry if I
> didn't make this clear before.
>
> Thanks,
> Bill
>
> On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1...@gmail.com> wrote:
>
>> Thanks, just to make sure I understand this correctly.
>>
>> I have some processing logic using the DSL. After that processing, I have a
>> KStream, and from this KStream I need to do a transform and put the results in
>> different topics. To use the Processor API, I need to write this KStream to a
>> topic and then use topology.addSource("source-node", "input-topic");
>> something like:
>>
>>  val streamBuilder = new StreamsBuilder()
>>  val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)
>>
>> //some DSL processing....
>> val resultKStream = inputStream.map(xxxx).filter..............
>>
>> resultKStream.to("inter_topic")
>>
>> final Topology topology = new Topology();
>> topology.addSource("source-node", "inter_topic");
>> topology.addProcessor("transformer", () -> new MyTransfomer(),
>> "source-node");
>>
>> So if I have to put my intermediate result into inter_topic, is there any
>> performance implication? Not sure if I am right, but it sounds to me like
>> that will cause one more hop from the client (the streams app) to the Kafka
>> brokers. The initial DSL processing happens on the client side; then I have
>> to write the result back to the broker and read it back into the client to
>> use the Processor API.
>>
>> Thanks,
>> Nan
>>
>>
>>
>>
>> On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <b...@confluent.io> wrote:
>>
>> > Hi Nan,
>> >
>> > To forward to the 3 different topics, it will probably be easier to do this
>> > in the Processor API.  Based on what you stated in your question, the
>> > topology will look something like this:
>> >
>> > final Topology topology = new Topology();
>> > topology.addSource("source-node", "input-topic");
>> > topology.addProcessor("transformer", () -> new MyTransfomer(),
>> > "source-node");
>> > topology.addSink("sink-node-1", "output-topic-1", "transformer");
>> > topology.addSink("sink-node-2", "output-topic-2", "transformer");
>> > topology.addSink("sink-node-3", "output-topic-3", "transformer");
>> >
>> > As you can see, the "transformer" is the parent node of all 3 sink nodes.
>> > Then in your Transformer, you can forward the key-value pairs using one
>> > of two approaches.
>> >
>> > Sending to all child nodes with this call:
>> >
>> > context().forward(key, value, To.all());
>> >
>> > Or by listing each child node individually, like so:
>> >
>> > context().forward(key, value, To.child("sink-node-1"));
>> > context().forward(key, value, To.child("sink-node-2"));
>> > context().forward(key, value, To.child("sink-node-3"));
>> >
>> > HTH,
>> >
>> > Bill
>> >
>> >
>> >
>> >
>> > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com> wrote:
>> >
>> > > When I do the transform, for a single input record I need to output 3
>> > > different records, and those 3 records are of different classes.  I want
>> > > to send each type of record to a separate topic. My understanding is
>> > > that I should use
>> > >
>> > > context.forward inside the transformer, like
>> > >
>> > > Transformer{..
>> > > context.forward(key, record1, To.child("topic1"))
>> > > context.forward(key, value1, To.child("topic2"))
>> > > }
>> > > But how do I define those processors? I can create them in the topology,
>> > > but who should be their parent? What's the name of the parent?
>> > >
>> > > stream.transform(transformer) doesn't give me a way to specify a
>> > > processor name.
>> > >
>> > > Thanks,
>> > > Nan
>> > >
>> >
>>
>
