I've had the same problem with Elasticsearch when auto-create is enabled for indices.
I've spent hours debugging what ended up being a typo combined with auto-create. You definitely want to fail fast in this scenario.

> On Oct 8, 2016, at 6:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
> So I just now retested this with 1.5.2 and 2.0.0, and the behavior is
> exactly the same across Spark versions.
>
> If the topic hasn't been created, you will get that error on startup,
> because the topic doesn't exist and thus doesn't have metadata.
>
> If you have auto.create.topics.enable set to true in the broker
> config, the request will fairly quickly lead to the topic being
> created after the fact.
>
> All you have to do is hit up-arrow-enter and re-submit the Spark job;
> the second time around the topic will exist. That seems pretty low
> effort.
>
> I'd rather stick with having an early error for those of us who
> prefer to run with auto.create set to false (because it makes sure the
> topic is actually set up the way you want, reduces the likelihood of
> spurious topics being created, etc.).
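Agreed, and pre-creating topics is cheap. A minimal sketch using the Kafka 0.8 admin API (the client the 0-8 connector already pulls in); the ZooKeeper address, timeouts, topic name, and partition/replication counts below are placeholder assumptions, not anyone's actual setup:

    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    object CreateTopic {
      def main(args: Array[String]): Unit = {
        // Placeholder ZooKeeper address and timeouts. ZKStringSerializer is
        // required so AdminUtils writes metadata in the format brokers expect.
        val zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer)
        try {
          // Create the topic explicitly with known partition and replication
          // settings instead of relying on auto.create.topics.enable.
          AdminUtils.createTopic(zkClient, "my-topic", 4, 1)
        } finally {
          zkClient.close()
        }
      }
    }

The same thing can be done from the shell with the kafka-topics.sh --create tool that ships with the broker.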
> On Sat, Oct 8, 2016 at 11:44 AM, Dmitry Goldenberg
> <dgoldenberg...@gmail.com> wrote:
>> Hi,
>>
>> I am trying to start up a simple consumer that streams from a Kafka topic,
>> using Spark 2.0.0:
>>
>> spark-streaming_2.11
>> spark-streaming-kafka-0-8_2.11
>>
>> I was getting the error below until I created the topic in Kafka. When
>> integrating Spark 1.5, I never used to hit this check; we were able to
>> start all of our Spark Kafka consumers, then start the producers, and have
>> Kafka automatically create the topics once the first message for a given
>> topic was published.
>>
>> Is there something I might be doing to cause this topic existence check in
>> KafkaCluster.scala to kick in? I'd much rather not have to pre-create the
>> topics before I start the consumers. Any thoughts/comments would be
>> appreciated.
>>
>> Thanks.
>> - Dmitry
>>
>> ========================================================================
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> java.nio.channels.ClosedChannelException
>>
>> java.nio.channels.ClosedChannelException
>>
>> org.apache.spark.SparkException: Error getting partition metadata for
>> '<topic name>'. Does the topic exist?
>> at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
>> at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
>> at scala.util.Either.fold(Either.scala:98)
>> at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
>> at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>> at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>> at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
>> at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>> at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.createContext(KafkaSparkStreamingDriver.java:253)
>> at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.execute(KafkaSparkStreamingDriver.java:166)
>> at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.main(KafkaSparkStreamingDriver.java:305)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
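For reference, here is a minimal sketch of the kind of consumer setup described above, using the 0-8 direct-stream API, showing where the metadata check fires; the app name, broker address, topic name, and batch interval are placeholder assumptions, not the code from the stack trace:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DirectStreamExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("direct-stream-example")
        val ssc = new StreamingContext(conf, Seconds(5))

        // The direct stream talks to the brokers listed here; placeholder address.
        val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
        val topics = Set("my-topic")

        // createDirectStream fetches partition metadata for every topic up
        // front; this is the call that surfaces "Does the topic exist?" via
        // KafkaCluster.checkErrors when the topic is missing.
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)

        stream.map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

With auto.create.topics.enable=false on the brokers, this fails at startup as shown in the trace; with it set to true, the first metadata request triggers topic creation and a re-submit succeeds.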