No problem with that. It's perfectly explained in StateStoresInTheDSLIntegrationTest: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java

2017-01-18 19:41 GMT+01:00 Michael Noll <mich...@confluent.io>:

> Nicolas,
>
> here's some information I shared on StackOverflow (perhaps a bit outdated
> by now, was back in Aug 2016) about how you can add a state store when
> using KStreamBuilder: http://stackoverflow.com/a/39086805/1743580
>
> -Michael
>
> On Wed, Jan 18, 2017 at 5:18 PM, Nicolas Fouché <nfou...@onfocus.io> wrote:
>
> > The reason I would not use `KStream.transform()` is that I want to call
> > `ProcessorContext.forward()` several times, to different children. These
> > children are sinks.
> > My use case: I need to route my beacons to different topics. Right now, I
> > use a series of `KStream.branch()` calls [1]. But would it be more
> > "elegant" to be able to add 5 sinks to a topology, and forward my records
> > to them in a custom processor?
> >
> > Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I
> > have to give a parent processor. But the parent processor was generated
> > by a high-level topology, and the names of processors created by
> > `KStreamBuilder` are not accessible (unless by inspecting the topology
> > nodes, I guess).
> >
> > [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
> >
> > Thanks.
> > Nicolas
> >
> > 2017-01-18 15:56 GMT+01:00 Michael Noll <mich...@confluent.io>:
> >
> > > Nicolas,
> > >
> > > if I understand your question correctly you'd like to add further
> > > operations after having called `KStream#process()`, which -- as you
> > > report -- doesn't work because `process()` returns void.
> > >
> > > If that's indeed the case, +1 to Damian's suggestion to use
> > > `KStream.transform()` instead of `KStream.process()`.
> > >
> > > -Michael
> > >
> > > On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > > > You could possibly also use KStream.transform(...)
> > > >
> > > > On Wed, 18 Jan 2017 at 14:22 Damian Guy <damian....@gmail.com> wrote:
> > > >
> > > > > Hi Nicolas,
> > > > >
> > > > > Good question! I'm not sure why it is a terminal operation, maybe
> > > > > one of the original authors can chip in. However, you could
> > > > > probably work around it by using TopologyBuilder.addProcessor(...)
> > > > > rather than KStream.process
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nfou...@onfocus.io> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > as far as I understand, calling `KStream.process` prevents the
> > > > > developer from adding further operations to a `KStreamBuilder` [1],
> > > > > because its return type is `void`. Good.
> > > > >
> > > > > But it also prevents the developer from adding operations to its
> > > > > superclass `TopologyBuilder`. In my case I wanted to add a sink,
> > > > > and the parent of this sink would be the name of the Processor that
> > > > > is created by `KStream.process`. Is there any reason why this
> > > > > method does not return the processor name [2]? Is it because it
> > > > > would be a bad idea to continue building my topology with the
> > > > > low-level API?
> > > > >
> > > > > [1] https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java%23L56
> > > > > [2] https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e2435985d9101/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L391
> > > > >
> > > > > Thanks.
> > > > > Nicolas.
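
For readers of the archive, the routing idea discussed above (one custom processor forwarding a record to several named sinks, rather than a chain of `KStream.branch()` calls, where each record ends up only in the stream of the first matching predicate) can be modelled without Kafka at all. The sketch below is plain Java and is not Kafka Streams code: `BeaconRouter`, its `addSink`, `process` and `received` methods, the sink names and the beacon string format are all hypothetical, and the in-memory lists merely stand in for sink topics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model only: none of these types belong to Kafka Streams.
final class BeaconRouter {
    // Hypothetical routing table: sink name -> condition for forwarding to it.
    private final Map<String, Predicate<String>> routes = new LinkedHashMap<>();
    // In-memory stand-in for the topics behind each sink.
    private final Map<String, List<String>> sinks = new LinkedHashMap<>();

    // Registers a named sink, loosely mirroring the idea of adding a sink
    // to a topology under a name the processor can forward to.
    BeaconRouter addSink(String sinkName, Predicate<String> accepts) {
        routes.put(sinkName, accepts);
        sinks.put(sinkName, new ArrayList<>());
        return this;
    }

    // Forwards the beacon to every sink whose predicate matches, so a single
    // record may reach several children (or none at all).
    void process(String beacon) {
        for (Map.Entry<String, Predicate<String>> route : routes.entrySet()) {
            if (route.getValue().test(beacon)) {
                sinks.get(route.getKey()).add(beacon);
            }
        }
    }

    // Returns what a given sink has received so far.
    List<String> received(String sinkName) {
        return sinks.get(sinkName);
    }

    public static void main(String[] args) {
        BeaconRouter router = new BeaconRouter()
            .addSink("clicks-sink", b -> b.startsWith("click:"))
            .addSink("mobile-sink", b -> b.endsWith(":mobile"));
        router.process("click:mobile");  // matches both sinks
        router.process("view:desktop");  // matches neither sink
        System.out.println(router.received("clicks-sink"));
        System.out.println(router.received("mobile-sink"));
    }
}
```

In a real low-level topology the equivalent of `process` would call `ProcessorContext.forward()` with a child name, and each sink would be registered with the custom processor as its parent, which is exactly why the processor's name has to be known.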