OK, great. I'll get to it this weekend.

Thanks,
Bill

On Fri, Nov 20, 2015 at 8:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Bill,
>
> Thanks for pointing it out! I think this is a real bug. Do you want to file
> a PR on GitHub with the fix? You can find the instructions here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
>
> Guozhang
>
> On Fri, Nov 20, 2015 at 8:32 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Hi All,
> >
> > I'm starting to experiment with the lower-level Processor Client API
> > found on the KIP-28 wiki.
> >
> > When starting the KafkaStream I get the following exception:
> >
> > Exception in thread "main" java.util.NoSuchElementException: id: SINK
> >     at org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
> >     at org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
> >     at org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
> >     at org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
> >     at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> >
> > The TopologyBuilder is being built like so:
> >
> > topologyBuilder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "src-topic")
> >                .addProcessor("PROCESS", new GenericProcessorClient(replaceVowels), "SOURCE")
> >                .addSink("SINK", "dest-topic", new StringSerializer(), new StringSerializer(), "PROCESS");
> >
> > It looks to me like the cause of the error is that the
> > TopologyBuilder.addSink method never connects the sink with its parent.
> >
> > When I added the following two lines to the addSink method, the
> > exception went away:
> >
> > nodeGrouper.add(name);
> > nodeGrouper.unite(name, parentNames);
> >
> > Is this a bug or am I doing something incorrectly?
> >
> > Thanks,
> > Bill
>
> --
> -- Guozhang
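
For anyone reading this thread later, here is a minimal sketch of why a lookup on a node that was never registered could fail the way the stack trace above shows. It is a simplified, hypothetical union-find written for illustration (the class name QuickUnionSketch and its two-argument unite are assumptions), not the actual Kafka QuickUnion class. It only shows the general pattern: root() on an id that was never passed to add() throws, and calling add() then unite() first avoids that.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical, simplified union-find; NOT the actual Kafka QuickUnion class.
class QuickUnionSketch<T> {
    // Maps each registered id to its parent; a root maps to itself.
    private final Map<T, T> parents = new HashMap<>();

    // Register an id as its own root (no-op if already registered).
    void add(T id) {
        parents.putIfAbsent(id, id);
    }

    // Follow parent links to the root; fail if the id was never added.
    T root(T id) {
        T current = id;
        T parent = parents.get(current);
        if (parent == null) {
            throw new NoSuchElementException("id: " + id);
        }
        while (!parent.equals(current)) {
            current = parent;
            parent = parents.get(current);
        }
        return current;
    }

    // Merge the groups containing a and b; both must already be registered.
    void unite(T a, T b) {
        T rootA = root(a);
        T rootB = root(b);
        if (!rootA.equals(rootB)) {
            parents.put(rootA, rootB);
        }
    }

    public static void main(String[] args) {
        QuickUnionSketch<String> grouper = new QuickUnionSketch<>();
        grouper.add("SOURCE");
        grouper.add("PROCESS");
        grouper.unite("PROCESS", "SOURCE");

        try {
            grouper.root("SINK"); // "SINK" was never added
        } catch (NoSuchElementException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }

        // Registering the sink and uniting it with its parent fixes the lookup.
        grouper.add("SINK");
        grouper.unite("SINK", "PROCESS");
        System.out.println(grouper.root("SINK").equals(grouper.root("SOURCE")));
    }
}
```

In this sketch the first lookup fails with the message "id: SINK", and after the add/unite pair all three nodes share one root, which mirrors the effect of the two lines Bill added to addSink.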