So I just now retested this with 1.5.2, and 2.0.0, and the behavior is
exactly the same across spark versions.

If the topic hasn't been created, you will get that error on startup,
because the topic doesn't exist and thus doesn't have metadata.

If you have auto.create.topics.enable set to true on the broker
config, the request will fairly quickly lead to the topic being
created after the fact.

All you have to do is hit up-arrow-enter and re-submit the spark job,
the second time around the topic will exist.  That seems pretty low
effort.

I'd rather stick with having an early error for those of us that
prefer to run with auto.create set to false (because it makes sure the
topic is actually set up the way you want, reduces the likelihood of
spurious topics being created, etc).



On Sat, Oct 8, 2016 at 11:44 AM, Dmitry Goldenberg
<dgoldenberg...@gmail.com> wrote:
> Hi,
>
> I am trying to start up a simple consumer that streams from a Kafka topic,
> using Spark 2.0.0:
>
> spark-streaming_2.11
> spark-streaming-kafka-0-8_2.11
>
> I was getting an error as below until I created the topic in Kafka. From
> integrating Spark 1.5, I never used to hit this check; we were able to start
> all of our Spark Kafka consumers, then start the producers, and have Kafka
> automatically create the topics once the first message for a given topic was
> published.
>
> Is there something I might be doing to cause this topic existence check in
> KafkaCluster.scala to kick in? I'd much rather be able to not have to
> pre-create the topics before I start the consumers.  Any thoughts/comments
> would be appreciated.
>
> Thanks.
> - Dmitry
>
> ========================================================================
>
> Exception in thread "main" org.apache.spark.SparkException:
> java.nio.channels.ClosedChannelException
>
> java.nio.channels.ClosedChannelException
>
> org.apache.spark.SparkException: Error getting partition metadata for
> '<topic name>'. Does the topic exist?
>
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
>
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
>
>         at scala.util.Either.fold(Either.scala:98)
>
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
>
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
>
>         at
> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>
>         at
> com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.createContext(KafkaSparkStreamingDriver.java:253)
>
>         at
> com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.execute(KafkaSparkStreamingDriver.java:166)
>
>         at
> com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.main(KafkaSparkStreamingDriver.java:305)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:498)
>
>         at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
>
>         at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
>
>         at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to