Oh my, I'm dumb. One can give multiple predicates to a single `KStream.branch()` call.

2017-01-18 17:18 GMT+01:00 Nicolas Fouché <nfou...@onfocus.io>:
> The reason I would not use `KStream.transform()` is that I want to call
> `ProcessorContext.forward()` several times, to different children. These
> children are sinks.
> My use case: I need to route my beacons to different topics. Right now, I
> use a series of `KStream.branch()` calls [1]. But would it be more
> "elegant" to be able to add 5 sinks to a topology, and forward my records
> to them in a custom processor?
>
> Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I
> have to give a parent processor. But the parent processor was generated by
> a high-level topology, and the names of processors created by
> `KStreamBuilder` are not accessible (unless by inspecting the topology
> nodes, I guess).
>
> [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
>
> Thanks.
> Nicolas
>
>
> 2017-01-18 15:56 GMT+01:00 Michael Noll <mich...@confluent.io>:
>
>> Nicolas,
>>
>> if I understand your question correctly you'd like to add further
>> operations after having called `KStream#process()`, which -- as you report
>> -- doesn't work because `process()` returns void.
>>
>> If that's indeed the case, +1 to Damian's suggestion to use
>> `KStream.transform()` instead of `KStream.process()`.
>>
>> -Michael
>>
>>
>> On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > You could possibly also use KStream.transform(...)
>> >
>> > On Wed, 18 Jan 2017 at 14:22 Damian Guy <damian....@gmail.com> wrote:
>> >
>> > > Hi Nicolas,
>> > >
>> > > Good question! I'm not sure why it is a terminal operation, maybe one of
>> > > the original authors can chip in. However, you could probably work around
>> > > it by using TopologyBuilder.addProcessor(...) rather than KStream.process
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nfou...@onfocus.io> wrote:
>> > >
>> > > Hi,
>> > >
>> > > as far as I understand, calling `KStream.process` prevents the developer
>> > > from adding further operations to a `KStreamBuilder` [1], because its
>> > > return type is `void`. Good.
>> > >
>> > > But it also prevents the developer from adding operations to its superclass
>> > > `TopologyBuilder`. In my case I wanted to add a sink, and the parent of
>> > > this sink would be the name of the Processor that is created by
>> > > `KStream.process`. Is there any reason why this method does not return the
>> > > processor name [2]? Is it because it would be a bad idea to continue
>> > > building my topology with the low-level API?
>> > >
>> > > [1] https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java%23L56
>> > > [2] https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e2435985d9101/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L391
>> > >
>> > > Thanks.
>> > > Nicolas.
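
As an aside on the `KStream.branch()` point at the top of this message, here is a minimal sketch of first-match routing over several predicates, written in plain Java with no Kafka dependency. `BranchSketch`, its `branch` helper, and the beacon strings are hypothetical names made up for illustration. In Kafka Streams itself the call is `stream.branch(p1, p2, ...)`, which returns an array of `KStream`s, one per predicate; a record goes to the first predicate it matches and is dropped if none match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java illustration only; no Kafka classes are used here.
class BranchSketch {

    // Returns one output list per predicate. Each record goes to the first
    // predicate that matches it; records matching no predicate are dropped.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    outputs.get(i).add(record);
                    break; // first match wins, later predicates are skipped
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Hypothetical beacons, encoded as "type:id" strings for the demo.
        List<String> beacons = Arrays.asList("click:1", "view:2", "other:3", "click:4");
        List<List<String>> routed = branch(beacons,
                b -> b.startsWith("click:"),
                b -> b.startsWith("view:"));
        System.out.println(routed.get(0)); // records matched by the first predicate
        System.out.println(routed.get(1)); // records matched by the second predicate
    }
}
```

With Kafka Streams itself, each element of the array returned by `branch()` could then be written to its own topic with `to(...)`, which would cover the "route beacons to different topics" use case without a custom processor.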