Hi Nan,

To forward to three different topics, it will probably be easiest to use
the Processor API.  Based on what you stated in your question, the
topology will look something like this:

final Topology topology = new Topology();
topology.addSource("source-node", "input-topic");
topology.addProcessor("transformer", () -> new MyTransformer(), "source-node");
topology.addSink("sink-node-1", "output-topic-1", "transformer");
topology.addSink("sink-node-2", "output-topic-2", "transformer");
topology.addSink("sink-node-3", "output-topic-3", "transformer");

As you can see, the "transformer" is the parent node of all 3 sink nodes.
Then in your Transformer, you can forward the key-value pairs by using one
of two approaches.

Sending the same record to all child nodes with this call:

context().forward(key, value, To.all());

Or by listing each child node individually, like so:

context().forward(key, value, To.child("sink-node-1"));
context().forward(key, value, To.child("sink-node-2"));
context().forward(key, value, To.child("sink-node-3"));
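
For reference, here is a minimal sketch of how the pieces could fit
together.  It assumes the Kafka Streams 2.x Processor API
(AbstractProcessor) and String keys and values with the default serdes.
The class name ThreeWaySplit and the three placeholder values are made up
for illustration; with three different record classes you would most
likely need to pass a matching serializer to each addSink call.

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class ThreeWaySplit {

    // Illustrative processor: derives three values from one input record
    // and routes each one to a specific sink by its node name.
    static class MyTransformer extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // Placeholder derivations; substitute your real record types.
            final String record1 = value + "-1";
            final String record2 = value + "-2";
            final String record3 = value + "-3";

            // To.child(...) takes the name of a child node, not a topic name.
            context().forward(key, record1, To.child("sink-node-1"));
            context().forward(key, record2, To.child("sink-node-2"));
            context().forward(key, record3, To.child("sink-node-3"));
        }
    }

    static Topology buildTopology() {
        final Topology topology = new Topology();
        topology.addSource("source-node", "input-topic");
        // The processor's parent is the source node.
        topology.addProcessor("transformer", MyTransformer::new, "source-node");
        // Each sink's parent is the processor, so To.child can address it.
        topology.addSink("sink-node-1", "output-topic-1", "transformer");
        topology.addSink("sink-node-2", "output-topic-2", "transformer");
        topology.addSink("sink-node-3", "output-topic-3", "transformer");
        return topology;
    }

    public static void main(final String[] args) {
        // Print the node graph to check the parent/child wiring.
        System.out.println(buildTopology().describe());
    }
}
```

Since each of your three records goes to its own topic, the To.child
form is the one that fits your case; To.all() would deliver every
forwarded record to all three sinks.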



On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com> wrote:

> when I do the transform, for a single input record, I need to output 3
> different records, those 3 records are in different classes.  I want to
> send the each type of records to a separate topic, my understanding is I
> should use
> context.forward inside the transformer  like
> Transformer{..
> context.forward(key, record1, To.child("topic1"))
> context.forward(key, value1, To.child("topic2"))
> }
> but how do I define those processor, I can create them in topology but who
> should be their parent? what's the name of the parent?
> stream.transform(transformer) don't give me a way to say processor name.
> Thanks,
> Nan
