Hi Nan,

Glad it helps with your case. One more note: in the next release, once KIP-307 is in place [1], you can combine the DSL with the PAPI by naming the last operator that creates your transformed KStream, and then manually adding the sink nodes, like:

    stream2 = stream1.transform(Named.as("myName"));

    topology = builder.build();

    // continue adding to the built topology
    topology.addSink(... "myName");

---------

Or you can also rely on flatTransform [2] to reduce "transform.flatMap" to a single operator.

[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
[2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues

Guozhang

On Thu, Feb 7, 2019 at 12:59 PM Nan Xu <nanxu1...@gmail.com> wrote:

> awesome, this solution is great, thanks a lot.
>
> Nan
>
> On Thu, Feb 7, 2019 at 2:28 PM Bill Bejeck <b...@confluent.io> wrote:
>
> > Hi Nan,
> >
> > I see what you are saying about reproducing a join in the PAPI.
> >
> > I have another thought.
> >
> > 1. Have your Transform return a List [r1, r2, r3]
> > 2. Then after your transform operation use a flatMapValues operator, as
> >    this will forward KV pairs of (k, r1), (k, r2), and (k, r3).
> >
> > From there you have two choices.
> >
> > 1. If you are using Kafka Streams v 2.0+, you can create an instance of
> >    TopicNameExtractor.
> >    The TopicNameExtractor can return the appropriate topic name based on the
> >    instance type of the value.
> >    Then your code would look something like
> >
> >    inputStream.transform(transformer).flatMapValues(...).to(MyTopicChooser,
> >    Produced(...));
> >
> > 2. If you are using a version of Kafka Streams prior to v 2.0 then first
> >    create 3 org.apache.kafka.streams.kstream.Predicate instances.
> >
> >    - Predicate p1 = (k,v) -> v instanceof r1;
> >    - Predicate p2 = (k,v) -> v instanceof r2;
> >    - Predicate p3 = (k,v) -> v instanceof r3;
> >
> > You will still use the flatMapValues operator, but now you'd follow it
> > with the branch operator and have the resulting stream instances in the
> > array forward to the appropriate topic
> >
> > val allStreams =
> >   inputStream.transform(transformer).flatMapValues(...).branch(p1, p2, p3);
> > allStreams(0).to("topic1"..);
> > allStreams(1).to("topic2"..);
> > allStreams(2).to("topic3"..);
> >
> > HTH,
> > Bill
> >
> > On Thu, Feb 7, 2019 at 11:51 AM Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > > hmm, but my DSL logic at the beginning involves some joins between
> > > different streams, so I feel it will be quite complex to write
> > > everything in the PAPI. What if I do this: in the transform, I return
> > > all 3 classes as a tuple, then map 3 times on the same stream, like this
> > >
> > > transformer {
> > >   return (class1Instance, class2Instance, class3Instance)
> > > }
> > > val kstream = inputStream.transform(transformer)
> > > kstream.map((r1,r2,r3) => r1).to("topic1")
> > > kstream.map((r1,r2,r3) => r2).to("topic2")
> > > kstream.map((r1,r2,r3) => r3).to("topic3")
> > >
> > > but I don't know if it is the recommended way.
> > >
> > > Thanks,
> > > Nan
> > >
> > > On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck <b...@confluent.io> wrote:
> > >
> > > > Hi Nan,
> > > >
> > > > I wanted to follow up some more.
> > > >
> > > > Since you need your Transformer to forward to 3 output topics (or, more
> > > > generally, any time you want a processor to forward to multiple child
> > > > nodes or to specific nodes in the topology), you can best achieve this
> > > > kind of control and flexibility using the PAPI.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck <b...@confluent.io> wrote:
> > > >
> > > > > Hi Nan,
> > > > >
> > > > > What I'm suggesting is to do the entire topology in the PAPI, sorry if
> > > > > I didn't make this clear before.
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > > On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1...@gmail.com> wrote:
> > > > >
> > > > >> thanks, just to make sure I understand this correctly.
> > > > >>
> > > > >> I have some processing logic using the DSL. After that processing I
> > > > >> have a KStream, and from this KStream I need to do a transform and put
> > > > >> the results into different topics. To use the Processor API, I need to
> > > > >> write this KStream to a topic, then use
> > > > >> topology.addSource("source-node", "input-topic"); something like
> > > > >>
> > > > >> val streamBuilder = new StreamsBuilder()
> > > > >> val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)
> > > > >>
> > > > >> //some DSL processing....
> > > > >> val resultKStream = inputStream.map(xxxx).filter..............
> > > > >>
> > > > >> resultKStream.to("inter_topic")
> > > > >>
> > > > >> final Topology topology = new Topology();
> > > > >> topology.addSource("source-node", "inter_topic");
> > > > >> topology.addProcessor("transformer", () -> new MyTransfomer(),
> > > > >>     "source-node");
> > > > >>
> > > > >> So if I have to put my intermediate result into inter_topic, is there
> > > > >> any performance implication? Not sure if I am right, but it sounds to
> > > > >> me like that will cause one more hop from the client (the streams app)
> > > > >> to the Kafka brokers: the initial DSL processing happens on the client
> > > > >> side, then the result has to be written back to the broker and read
> > > > >> back into the client to use the Processor API.
> > > > >>
> > > > >> Thanks,
> > > > >> Nan
> > > > >>
> > > > >> On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <b...@confluent.io> wrote:
> > > > >>
> > > > >> > Hi Nan,
> > > > >> >
> > > > >> > To forward to the 3 different topics it will probably be easier to
> > > > >> > do this in the Processor API. Based on what you stated in your
> > > > >> > question, the topology will look something like this:
> > > > >> >
> > > > >> > final Topology topology = new Topology();
> > > > >> > topology.addSource("source-node", "input-topic");
> > > > >> > topology.addProcessor("transformer", () -> new MyTransfomer(),
> > > > >> >     "source-node");
> > > > >> > topology.addSink("sink-node-1", "output-topic-1", "transformer");
> > > > >> > topology.addSink("sink-node-2", "output-topic-2", "transformer");
> > > > >> > topology.addSink("sink-node-3", "output-topic-3", "transformer");
> > > > >> >
> > > > >> > As you can see, the "transformer" is the parent node of all 3 sink
> > > > >> > nodes. Then in your Transformer, you can forward the key-value pairs
> > > > >> > by using one of two approaches.
> > > > >> >
> > > > >> > Sending to all child nodes with this call:
> > > > >> >
> > > > >> > context().forward(key, value, To.all());
> > > > >> >
> > > > >> > Or by listing each child node individually like so
> > > > >> >
> > > > >> > context().forward(key, value, To.child("sink-node-1"));
> > > > >> > context().forward(key, value, To.child("sink-node-2"));
> > > > >> > context().forward(key, value, To.child("sink-node-3"));
> > > > >> >
> > > > >> > HTH,
> > > > >> >
> > > > >> > Bill
> > > > >> >
> > > > >> > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com> wrote:
> > > > >> >
> > > > >> > > when I do the transform, for a single input record, I need to
> > > > >> > > output 3 different records, and those 3 records are in different
> > > > >> > > classes. I want to send each type of record to a separate topic.
> > > > >> > > My understanding is I should use
> > > > >> > >
> > > > >> > > context.forward inside the transformer like
> > > > >> > >
> > > > >> > > Transformer{..
> > > > >> > >   context.forward(key, record1, To.child("topic1"))
> > > > >> > >   context.forward(key, value1, To.child("topic2"))
> > > > >> > > }
> > > > >> > >
> > > > >> > > but how do I define those processors? I can create them in the
> > > > >> > > topology, but who should be their parent? what's the name of the
> > > > >> > > parent?
> > > > >> > >
> > > > >> > > stream.transform(transformer) doesn't give me a way to specify the
> > > > >> > > processor name.
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > > Nan

--
-- Guozhang