I've had the same problem with Elasticsearch when auto-create is enabled for
indexes.

I've spent hours debugging what ended up being a typo combined with auto-create.

You definitely want to fail fast in this scenario.
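
For illustration, a fail-fast startup check could look roughly like the sketch
below. It assumes the set of existing topic names has already been fetched from
the broker (how that is done depends on your Kafka client and is not shown).
TopicPreflight, missingTopics, and requireTopicsExist are hypothetical names,
not part of Spark or Kafka.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: not part of Spark or Kafka.
final class TopicPreflight {
    private TopicPreflight() {}

    /** Returns the required topics that are absent from the existing set, sorted by name. */
    static Set<String> missingTopics(Set<String> existing, Collection<String> required) {
        Set<String> missing = new TreeSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    /** Throws before any stream is created if a required topic does not exist. */
    static void requireTopicsExist(Set<String> existing, Collection<String> required) {
        Set<String> missing = missingTopics(existing, required);
        if (!missing.isEmpty()) {
            throw new IllegalStateException(
                "Missing Kafka topics: " + missing + ". Create them before starting the job.");
        }
    }
}
```

Run before the streams are set up, a check like this is meant to turn a mistyped
topic name into an immediate, readable error instead of a spurious new topic.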

> On Oct 8, 2016, at 6:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
> 
> So I just now retested this with 1.5.2 and 2.0.0, and the behavior is
> exactly the same across Spark versions.
> 
> If the topic hasn't been created, you will get that error on startup,
> because the topic doesn't exist and thus doesn't have metadata.
> 
> If you have auto.create.topics.enable set to true on the broker
> config, the request will fairly quickly lead to the topic being
> created after the fact.
> 
> All you have to do is hit up-arrow-enter and re-submit the Spark job;
> the second time around the topic will exist.  That seems pretty low
> effort.
> 
> I'd rather stick with having an early error for those of us that
> prefer to run with auto.create set to false (because it makes sure the
> topic is actually set up the way you want, reduces the likelihood of
> spurious topics being created, etc).
> 
> 
> 
> On Sat, Oct 8, 2016 at 11:44 AM, Dmitry Goldenberg
> <dgoldenberg...@gmail.com> wrote:
>> Hi,
>> 
>> I am trying to start up a simple consumer that streams from a Kafka topic,
>> using Spark 2.0.0:
>> 
>> spark-streaming_2.11
>> spark-streaming-kafka-0-8_2.11
>> 
>> I was getting the error below until I created the topic in Kafka. When we
>> integrated with Spark 1.5, I never hit this check; we were able to start
>> all of our Spark Kafka consumers, then start the producers, and have Kafka
>> automatically create the topics once the first message for a given topic was
>> published.
>> 
>> Is there something I might be doing that causes this topic existence check
>> in KafkaCluster.scala to kick in? I'd much rather not have to pre-create
>> the topics before I start the consumers.  Any thoughts/comments
>> would be appreciated.
>> 
>> Thanks.
>> - Dmitry
>> 
>> ========================================================================
>> 
>> Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>> java.nio.channels.ClosedChannelException
>> org.apache.spark.SparkException: Error getting partition metadata for '<topic name>'. Does the topic exist?
>>        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
>>        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
>>        at scala.util.Either.fold(Either.scala:98)
>>        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
>>        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
>>        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>>        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.createContext(KafkaSparkStreamingDriver.java:253)
>>        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.execute(KafkaSparkStreamingDriver.java:166)
>>        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.main(KafkaSparkStreamingDriver.java:305)
>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>        at java.lang.reflect.Method.invoke(Method.java:498)
>>        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
>>        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
>>        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>>        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>>        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)