that will be really helpful, thanks for the heads up.

On Thu, Feb 7, 2019 at 7:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Nan,
>
> Glad it helps with your case. Just another note that in the next release,
> when KIP-307 is in place [1], you can actually combine the DSL with the
> PAPI by naming the last operator that creates your transformed KStream,
> and then manually adding the sink nodes, like:
>
> stream2 = stream1.transform(Named.as("myName"));
>
> topology = builder.build();
>
> // continue adding to the built topology
> topology.addSink(... "myName");
>
> ---------
>
> Or you can also rely on flatTransform [2] to reduce "transform + flatMap"
> to a single operator.
>
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues
>
> Guozhang
>
> On Thu, Feb 7, 2019 at 12:59 PM Nan Xu <nanxu1...@gmail.com> wrote:
>
> > awesome, this solution is great, thanks a lot.
> >
> > Nan
> >
> > On Thu, Feb 7, 2019 at 2:28 PM Bill Bejeck <b...@confluent.io> wrote:
> >
> > > Hi Nan,
> > >
> > > I see what you are saying about reproducing a join in the PAPI.
> > >
> > > I have another thought.
> > >
> > > 1. Have your Transformer return a List [r1, r2, r3].
> > > 2. Then, after your transform operation, use a flatMapValues operator,
> > >    as this will forward KV pairs of (k, r1), (k, r2), and (k, r3).
> > >
> > > From there you have two choices.
> > >
> > > 1. If you are using Kafka Streams v2.0+, you can create an instance of
> > >    TopicNameExtractor. The TopicNameExtractor can return the
> > >    appropriate topic name based on the instance type of the value.
> > >    Then you would have something like:
> > >
> > >    inputStream.transform(transformer).flatMapValues(...).to(MyTopicChooser, Produced(...));
> > >
> > > 2. If you are using a version of Kafka Streams prior to v2.0, then
> > >    first create 3 org.apache.kafka.streams.kstream.Predicate instances:
> > >
> > >    Predicate p1 = (k, v) -> v instanceof r1;
> > >    Predicate p2 = (k, v) -> v instanceof r2;
> > >    Predicate p3 = (k, v) -> v instanceof r3;
> > >
> > > You will still use the flatMapValues operator, but now you'd follow it
> > > with the branch operator and have the resulting stream instances in
> > > the array forward to the appropriate topic:
> > >
> > >    val allStreams = inputStream.transform(transformer).flatMapValues(...).branch(p1, p2, p3);
> > >    allStreams(0).to("topic1"...);
> > >    allStreams(1).to("topic2"...);
> > >    allStreams(2).to("topic3"...);
> > >
> > > HTH,
> > > Bill
> > >
> > > On Thu, Feb 7, 2019 at 11:51 AM Nan Xu <nanxu1...@gmail.com> wrote:
> > >
> > > > hmm, but my DSL logic at the beginning involves some joins between
> > > > different streams, so I feel it would be quite complex to write
> > > > everything in the PAPI. What if I do this: in the transform, I
> > > > return all 3 classes as a tuple, then map 3 times on the same
> > > > stream, like this:
> > > >
> > > > transformer {
> > > >   return (class1Instance, class2Instance, class3Instance)
> > > > }
> > > > val kstream = inputStream.transform(transformer)
> > > > kstream.map((r1, r2, r3) => r1).to("topic1")
> > > > kstream.map((r1, r2, r3) => r2).to("topic2")
> > > > kstream.map((r1, r2, r3) => r3).to("topic3")
> > > >
> > > > But I don't know if that is the recommended way.
> > > >
> > > > Thanks,
> > > > Nan
> > > >
> > > > On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck <b...@confluent.io> wrote:
> > > >
> > > > > Hi Nan,
> > > > >
> > > > > I wanted to follow up some more.
> > > > >
> > > > > Since you need your Transformer to forward to 3 output topics,
> > > > > or more generally any time you want a processor to forward to
> > > > > multiple child nodes or to specific nodes in the topology, you
> > > > > can best achieve this kind of control and flexibility using the
> > > > > PAPI.
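Bill's pre-2.0 suggestion above boils down to instanceof predicates plus the branch operator. The splitting logic itself needs no Kafka cluster; the following is a minimal plain-Java sketch of it, where the R1/R2/R3 classes and the bucket-per-topic layout are illustrative stand-ins for the thread's r1/r2/r3 classes and output topics, not code from the original posts:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class BranchSketch {
    // Illustrative stand-ins for the thread's r1/r2/r3 value classes.
    static class R1 {}
    static class R2 {}
    static class R3 {}

    // One predicate per value type, mirroring the shape of
    // org.apache.kafka.streams.kstream.Predicate<K, V>.
    static final List<BiPredicate<String, Object>> PREDICATES = List.of(
            (k, v) -> v instanceof R1,
            (k, v) -> v instanceof R2,
            (k, v) -> v instanceof R3);

    // Partitions values into one bucket per predicate, the way branch()
    // splits a KStream into an array of sub-streams: predicates are
    // evaluated in order and each record goes to the first match only.
    static List<List<Object>> branch(List<Object> values) {
        List<List<Object>> buckets = new ArrayList<>();
        for (int i = 0; i < PREDICATES.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (Object v : values) {
            for (int i = 0; i < PREDICATES.size(); i++) {
                if (PREDICATES.get(i).test("key", v)) {
                    buckets.get(i).add(v);
                    break;
                }
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<List<Object>> buckets =
                branch(List.of(new R1(), new R2(), new R3(), new R1()));
        // Each bucket stands in for one branched KStream; in the real DSL
        // each would get its own .to("topicN") call.
        System.out.println(buckets.get(0).size() + "/"
                + buckets.get(1).size() + "/" + buckets.get(2).size());
    }
}
```

In the real DSL, each element of the array returned by branch() is a KStream, and its .to("topicN") call plays the role of one bucket here.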
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > > On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck <b...@confluent.io> wrote:
> > > > >
> > > > > > Hi Nan,
> > > > > >
> > > > > > What I'm suggesting is to do the entire topology in the PAPI;
> > > > > > sorry if I didn't make this clear before.
> > > > > >
> > > > > > Thanks,
> > > > > > Bill
> > > > > >
> > > > > > On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1...@gmail.com> wrote:
> > > > > >
> > > > > > > thanks, just to make sure I understand this correctly.
> > > > > > >
> > > > > > > I have some processing logic using the DSL; after that
> > > > > > > processing I have a KStream, and from this KStream I need to
> > > > > > > do a transform and put the result into different topics. To
> > > > > > > use the Processor API, I need to put this KStream into a
> > > > > > > topic, then use topology.addSource("source-node",
> > > > > > > "input-topic"); something like:
> > > > > > >
> > > > > > > val streamBuilder = new StreamsBuilder()
> > > > > > > val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)
> > > > > > >
> > > > > > > // some DSL processing...
> > > > > > > val resultKStream = inputStream.map(xxxx).filter(...)
> > > > > > >
> > > > > > > resultKStream.to("inter_topic")
> > > > > > >
> > > > > > > final Topology topology = new Topology();
> > > > > > > topology.addSource("source-node", "inter_topic");
> > > > > > > topology.addProcessor("transformer", () -> new MyTransformer(), "source-node");
> > > > > > >
> > > > > > > So if I have to put my intermediate result into inter_topic,
> > > > > > > is there any performance implication? Not sure if I am right,
> > > > > > > but it sounds to me like that will cause one more hop from
> > > > > > > the client (stream app) to the Kafka brokers. The beginning
> > > > > > > DSL processing is happening on the client side,
> > > > > > > then I have to put the result back to the broker, and read
> > > > > > > it back to the client to use the Processor API.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Nan
> > > > > > >
> > > > > > > On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <b...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi Nan,
> > > > > > > >
> > > > > > > > To forward to the 3 different topics, it will probably be
> > > > > > > > easier to do this in the Processor API. Based on what you
> > > > > > > > stated in your question, the topology will look something
> > > > > > > > like this:
> > > > > > > >
> > > > > > > > final Topology topology = new Topology();
> > > > > > > > topology.addSource("source-node", "input-topic");
> > > > > > > > topology.addProcessor("transformer", () -> new MyTransformer(), "source-node");
> > > > > > > > topology.addSink("sink-node-1", "output-topic-1", "transformer");
> > > > > > > > topology.addSink("sink-node-2", "output-topic-2", "transformer");
> > > > > > > > topology.addSink("sink-node-3", "output-topic-3", "transformer");
> > > > > > > >
> > > > > > > > As you can see, the "transformer" is the parent node of all
> > > > > > > > 3 sink nodes. Then, in your Transformer, you can forward
> > > > > > > > the key-value pairs using one of two approaches.
> > > > > > > >
> > > > > > > > Sending to all child nodes with this call:
> > > > > > > >
> > > > > > > > context().forward(key, value, To.all());
> > > > > > > >
> > > > > > > > Or by listing each child node individually, like so:
> > > > > > > >
> > > > > > > > context().forward(key, value, To.child("sink-node-1"));
> > > > > > > > context().forward(key, value, To.child("sink-node-2"));
> > > > > > > > context().forward(key, value, To.child("sink-node-3"));
> > > > > > > >
> > > > > > > > HTH,
> > > > > > > >
> > > > > > > > Bill
> > > > > > > >
> > > > > > > > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > When I do the transform, for a single input record I need
> > > > > > > > > to output 3 different records, and those 3 records are
> > > > > > > > > instances of different classes. I want to send each type
> > > > > > > > > of record to a separate topic; my understanding is I
> > > > > > > > > should use context.forward inside the transformer, like:
> > > > > > > > >
> > > > > > > > > Transformer {
> > > > > > > > >   ...
> > > > > > > > >   context.forward(key, record1, To.child("topic1"))
> > > > > > > > >   context.forward(key, record2, To.child("topic2"))
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > But how do I define those processors? I can create them
> > > > > > > > > in the topology, but who should be their parent? What's
> > > > > > > > > the name of the parent?
> > > > > > > > >
> > > > > > > > > stream.transform(transformer) doesn't give me a way to
> > > > > > > > > specify a processor name.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Nan
>
> --
> -- Guozhang
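The TopicNameExtractor route mentioned earlier in the thread reduces to the same idea as the sink-per-type topology: pick a destination from the record value's class. The following plain-Java sketch shows only that per-record choice; the R1/R2/R3 classes and topic names are illustrative, not from the original posts, and the real interface is org.apache.kafka.streams.processor.TopicNameExtractor, whose extract() also receives the key and record context:

```java
import java.util.List;

public class TopicChooserSketch {
    // Illustrative stand-ins for the three record classes in the thread.
    static class R1 {}
    static class R2 {}
    static class R3 {}

    // The per-record topic choice a TopicNameExtractor would make;
    // the real extract(key, value, recordContext) takes more arguments.
    static String topicFor(Object value) {
        if (value instanceof R1) return "output-topic-1";
        if (value instanceof R2) return "output-topic-2";
        if (value instanceof R3) return "output-topic-3";
        throw new IllegalArgumentException("unexpected value type: " + value.getClass());
    }

    public static void main(String[] args) {
        for (Object v : List.of(new R1(), new R2(), new R3())) {
            System.out.println(v.getClass().getSimpleName() + " -> " + topicFor(v));
        }
    }
}
```

Passing an implementation of this choice to KStream.to(...) (available in Kafka Streams 2.0+) removes the need for separate branch() or flatMapValues() routing steps.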