[ 
https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15020507#comment-15020507
 ] 

ASF GitHub Bot commented on KAFKA-2872:
---------------------------------------

GitHub user bbejeck opened a pull request:

    https://github.com/apache/kafka/pull/572

    KAFKA-2872 Fixed addSink method connecting sink with parent source(s)…

    Starting a KafkaStream was getting an error due to the fact that the 
TopologyBuilder.addSink method was not connecting the sink with it parent(s) 
processor/sources.  Just needed to wire up the sink with it parent(s) in 
TopologyBuilder.addSink .

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbejeck/kafka 
KAFKA-2872_kafka_stream_sink_not_connected_to_parent

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #572
    
----
commit 514796557b904ed77441cc61393b7180eb046e86
Author: bbejeck <bbej...@gmail.com>
Date:   2015-11-21T04:12:18Z

    KAFKA-2872 Fixed addSink method connecting sink with parent source(s) or 
parent processor(s)

----


> Error starting KafkaStream caused by sink not being connected to parent 
> source/processor nodes
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2872
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2872
>             Project: Kafka
>          Issue Type: Bug
>          Components: kafka streams
>    Affects Versions: 0.9.0.0
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>
> When starting the KafkaStream I get the following Exception:
> Exception in thread "main" java.util.NoSuchElementException: id: SINK
>       at 
> org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
>       at 
> org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
>       at 
> org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
>       at 
> org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
>       at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> The TopologyBuilder is being built like so:
> topologyBuilder.addSource("SOURCE", new StringDeserializer(), new 
> StringDeserializer(), "src-topic")
>                 .addProcessor("PROCESS", new 
> GenericProcessorClient(replaceVowels), "SOURCE")
>                 .addSink("SINK", "dest-topic", new StringSerializer(), new 
> StringSerializer(), "PROCESS");
> Looks to me the cause of the error is that in  TopologyBuilder.addSink method 
> the sink  is never connected with it's parent.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to