that will be really helpful, thanks for the heads up.

On Thu, Feb 7, 2019 at 7:36 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Nan,
>
> Glad it helps with your case. Just another note that in the next release
> when KIP-307 is in place [1], you can actually combine the DSL with PAPI by
> naming the last operator that creates your transformed KStream, and then
> manually add the sink nodes like:
>
> stream2 = stream1.transform(Named.as("myName"));
>
> topology = builder.build();
>
> // continue adding to the built topology
> topology.addSink(... "myName");
>
> ---------
>
> Or you can also rely on flatTransform [2] to reduce "transform.flatMap" to
> a single operator.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> [2]
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues
>
>
> Guozhang
>
> On Thu, Feb 7, 2019 at 12:59 PM Nan Xu <nanxu1...@gmail.com> wrote:
>
> > awesome, this solution is great, thanks a lot.
> >
> > Nan
> >
> > On Thu, Feb 7, 2019 at 2:28 PM Bill Bejeck <b...@confluent.io> wrote:
> >
> > > Hi Nan,
> > >
> > > I see what you are saying about reproducing a join in the PAPI.
> > >
> > > I have another thought.
> > >
> > >    1. Have your Transformer return a List [r1, r2, r3]
> > >    2. Then, after your transform operation, use a flatMapValues operator,
> > >    as this will forward KV pairs of (k, r1), (k, r2), and (k, r3).
> > >
> > > From there you have two choices.
> > >
> > >  1. If you are using Kafka Streams v 2.0+, you can create an instance of
> > > TopicNameExtractor.
> > > The TopicNameExtractor can return the appropriate topic name based on the
> > > instance type of the value.
> > > Then your code would look something like
> > >
> > > inputStream.transform(transformer).flatMapValues(...).to(MyTopicChooser,
> > > Produced(...));
> > >
> > > 2. If you are using a version of Kafka Streams prior to v 2.0, then first
> > > create 3 org.apache.kafka.streams.kstream.Predicate instances.
> > >
> > >    - Predicate p1 = (k,v) -> v instanceof r1;
> > >    - Predicate p2 = (k,v) -> v instanceof r2;
> > >    - Predicate p3 = (k,v) -> v instanceof r3;
> > >
> > >  You will still use the flatMapValues operator, but now you'd follow it
> > > with the branch operator and have the resulting stream instances in the
> > > array forward to the appropriate topic
> > >
> > >  val allStreams =
> > > inputStream.transform(transformer).flatMapValues(...).branch(p1,
> > > p2, p3);
> > >  allStreams(0).to("topic1"..);
> > >  allStreams(1).to("topic2"..);
> > >  allStreams(2).to("topic3"..);
> > >
> > > HTH,
> > > Bill
> > >
> > >
> > >
> > > On Thu, Feb 7, 2019 at 11:51 AM Nan Xu <nanxu1...@gmail.com> wrote:
> > >
> > > > hmm, but my DSL logic at the beginning involves some joins between different
> > > > streams, so I feel it will be quite complex to write everything in the PAPI.
> > > > What if I do this: in the transform, I return all 3 classes as a tuple,
> > > > then map 3 times on the same stream, like this:
> > > > transformer {
> > > > return (class1Instance, class2Instance, class3Instance)
> > > > }
> > > > val kstream = inputStream.transform(transformer)
> > > > kstream.map((r1,r2,r3) => r1).to("topic1")
> > > > kstream.map((r1,r2,r3) => r2).to("topic2")
> > > > kstream.map((r1,r2,r3) => r3).to("topic3")
> > > > but I don't know if this is the recommended way.
> > > >
> > > > Thanks,
> > > > Nan
> > > >
> > > > On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck <b...@confluent.io> wrote:
> > > >
> > > > > Hi Nan,
> > > > >
> > > > > I wanted to follow up some more.
> > > > >
> > > > > Since you need your Transformer to forward to 3 output topics, or more
> > > > > generally any time you want a processor to forward to multiple child nodes
> > > > > or specific nodes in the topology, you can best achieve this kind of
> > > > > control and flexibility using the PAPI.
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > > On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck <b...@confluent.io> wrote:
> > > > >
> > > > > > Hi Nan,
> > > > > >
> > > > > > What I'm suggesting is to do the entire topology in the PAPI; sorry if I
> > > > > > didn't make this clear before.
> > > > > >
> > > > > > Thanks,
> > > > > > Bill
> > > > > >
> > > > > > On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1...@gmail.com> wrote:
> > > > > >
> > > > > >> thanks, just to make sure I understand this correctly.
> > > > > >>
> > > > > >> I have some processing logic using the DSL. After that processing, I have a
> > > > > >> kstream; from this kstream, I need to do a transform and put the results to
> > > > > >> different topics. To use the Processor API, I need to put this kstream to a
> > > > > >> topic, then use topology.addSource("source-node", "input-topic");
> > > > > >> something like
> > > > > >>
> > > > > >>  val streamBuilder = new StreamsBuilder()
> > > > > >>  val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)
> > > > > >>
> > > > > >> //some DSL processing....
> > > > > >> val resultKStream = inputStream.map(xxxx).filter..............
> > > > > >>
> > > > > >>
> > > > > >> resultKStream.to("inter_topic")
> > > > > >>
> > > > > >> final Topology topology = new Topology();
> > > > > >> topology.addSource("source-node", "inter_topic");
> > > > > >> topology.addProcessor("transformer", () -> new MyTransfomer(),
> > > > > >> "source-node");
> > > > > >>
> > > > > >> so if I have to put my intermediate result to inter_topic, are there any
> > > > > >> performance implications? Not sure if I am right, but it sounds to me that
> > > > > >> this will cause one more hop from the client (stream app) to the Kafka
> > > > > >> brokers. The beginning DSL processing happens on the client side; then I
> > > > > >> have to put the result back to the broker, then read it back to the client
> > > > > >> to use the Processor API.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Nan
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <b...@confluent.io> wrote:
> > > > > >>
> > > > > >> > Hi Nan,
> > > > > >> >
> > > > > >> > To forward to the 3 different topics it will probably be easier to do this
> > > > > >> > in the Processor API.  Based on what you stated in your question, the
> > > > > >> > topology will look something like this:
> > > > > >> >
> > > > > >> > final Topology topology = new Topology();
> > > > > >> > topology.addSource("source-node", "input-topic");
> > > > > >> > topology.addProcessor("transformer", () -> new MyTransfomer(),
> > > > > >> > "source-node");
> > > > > >> > topology.addSink("sink-node-1", "output-topic-1", "transformer");
> > > > > >> > topology.addSink("sink-node-2", "output-topic-2", "transformer");
> > > > > >> > topology.addSink("sink-node-3", "output-topic-3", "transformer");
> > > > > >> >
> > > > > >> > As you can see, the "transformer" is the parent node of all 3 sink nodes.
> > > > > >> > Then in your Transformer, you can forward the key-value pairs by using one
> > > > > >> > of two approaches.
> > > > > >> >
> > > > > >> > Sending to all child nodes with this call:
> > > > > >> >
> > > > > >> > context().forward(key, value, To.all());
> > > > > >> >
> > > > > >> > Or by listing each child node individually like so
> > > > > >> >
> > > > > >> > context().forward(key, value, To.child("sink-node-1"));
> > > > > >> > context().forward(key, value, To.child("sink-node-2"));
> > > > > >> > context().forward(key, value, To.child("sink-node-3"));
> > > > > >> >
> > > > > >> > HTH,
> > > > > >> >
> > > > > >> > Bill
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com>
> > > wrote:
> > > > > >> >
> > > > > >> > > when I do the transform, for a single input record, I need to output 3
> > > > > >> > > different records; those 3 records are of different classes.  I want to
> > > > > >> > > send each type of record to a separate topic. My understanding is I
> > > > > >> > > should use
> > > > > >> > >
> > > > > >> > > context.forward inside the transformer, like
> > > > > >> > >
> > > > > >> > > Transformer{..
> > > > > >> > > context.forward(key, record1, To.child("topic1"))
> > > > > >> > > context.forward(key, value1, To.child("topic2"))
> > > > > >> > > }
> > > > > >> > > but how do I define those processors? I can create them in the topology,
> > > > > >> > > but who should be their parent? What's the name of the parent?
> > > > > >> > >
> > > > > >> > > stream.transform(transformer) doesn't give me a way to specify a
> > > > > >> > > processor name.
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > > Nan
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
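
For readers following the thread: Bill's first option (flatMapValues followed by a TopicNameExtractor) boils down to choosing the output topic from the runtime type of each value. The sketch below models only that decision in plain Java, so it runs with nothing but the JDK; the Kafka Streams wiring is deliberately left out. Record1, Record2, Record3, TopicRouting, topicFor and route are hypothetical names invented for illustration, and the topic names are taken from the examples in the thread. In a real application, the body of topicFor would presumably sit inside a TopicNameExtractor's extract(key, value, recordContext) method.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the three record classes (r1, r2, r3 in the thread).
final class Record1 { }
final class Record2 { }
final class Record3 { }

final class TopicRouting {

    // The decision a TopicNameExtractor would make: pick the output topic
    // from the runtime type of the value.
    static String topicFor(Object value) {
        if (value instanceof Record1) return "topic1";
        if (value instanceof Record2) return "topic2";
        if (value instanceof Record3) return "topic3";
        throw new IllegalArgumentException("Unexpected value: " + value);
    }

    // Models the effect of flatMapValues(...).to(extractor): each element of the
    // list produced by the transformer is routed individually to its topic.
    static Map<String, List<Object>> route(List<Object> transformed) {
        Map<String, List<Object>> byTopic = new LinkedHashMap<>();
        for (Object value : transformed) {
            byTopic.computeIfAbsent(topicFor(value), t -> new ArrayList<>()).add(value);
        }
        return byTopic;
    }
}
```

Throwing on an unknown type is one possible design choice; it surfaces a new record class that has no topic yet instead of silently dropping it.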
