Ok great.  I'll get it too it this weekend.

Thanks,
Bill

On Fri, Nov 20, 2015 at 8:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Bill,
>
> Thanks for point it out! I think this is a real bug. Do you want to file PR
> in github with the fix? You can find the instructions here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
>
> Guozhang
>
> On Fri, Nov 20, 2015 at 8:32 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Hi All,
> >
> > I'm starting to experiment with the lower-level Processor Client API
> found
> > on the KIP-28 wiki.
> >
> > When starting the KafkaStream I get the following Exception:
> >
> > Exception in thread "main" java.util.NoSuchElementException: id: SINK
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
> > at
> >
> >
> org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
> > at
> >
> >
> org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
> > at
> org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
> > at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> >
> > The TopologyBuilder is being built like so:
> > topologyBuilder.addSource("SOURCE", new StringDeserializer(), new
> > StringDeserializer(), "src-topic")
> >                 .addProcessor("PROCESS", new
> > GenericProcessorClient(replaceVowels), "SOURCE")
> >                 .addSink("SINK", "dest-topic", new StringSerializer(),
> new
> > StringSerializer(), "PROCESS");
> >
> > Looks to me the cause of the error is that in  TopologyBuilder.addSink
> > method the sink  is never connected with it's parent.
> >
> > When I added the following two lines to the addSink method, the Exception
> > goes away.
> >
> >   nodeGrouper.add(name);
> >   nodeGrouper.unite(name, parentNames);
> >
> > Is this a bug or am I doing something incorrect?
> >
> > Thanks,
> > Bill
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to