Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Kalvin Chau
I've filed an issue here: https://issues.apache.org/jira/browse/SPARK-19185, let me know if I missed anything!
--Kalvin

On Wed, Jan 11, 2017 at 5:43 PM Shixiong(Ryan) Zhu wrote:
> Thanks for reporting this. Finally I understood the root cause. Could you
> file a JIRA on https://issues.apache.org

Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Kalvin Chau
Here is the minimal code example with which I was able to replicate the issue. Batch interval is set to 2 to get the exceptions to happen more often.

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[KafkaAvroDeserializer],
  "value.deserializer" -> classOf[
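
A minimal sketch of what the full windowed reproduction could look like, since the example is cut off by the archive. The broker list, topic, group.id, AnyRef key/value types, and the 10s/2s window and slide durations below are illustrative assumptions, not taken from the original message:

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("window-repro")
val ssc = new StreamingContext(conf, Seconds(2))  // batch interval of 2 (seconds assumed), as in the report

val brokers = "broker1:9092"  // placeholder
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[KafkaAvroDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "group.id" -> "repro-group"  // placeholder
)

val stream = KafkaUtils.createDirectStream[AnyRef, AnyRef](
  ssc, PreferConsistent, Subscribe[AnyRef, AnyRef](Seq("some-topic"), kafkaParams))

// Windowing the direct stream is the pattern that triggers the overlapping
// reads described in this thread.
stream.map(_.value)
  .window(Seconds(10), Seconds(2))
  .foreachRDD(rdd => rdd.count())

ssc.start()
ssc.awaitTermination()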

Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Kalvin Chau
I have not modified that configuration setting, and it doesn't seem to be documented anywhere.
Does the Kafka 0.10 connector require the number of cores on an executor to be set to 1? I didn't see that documented anywhere either.

On Wed, Jan 11, 2017 at 3:27 PM Shixiong(Ryan) Zhu wrote:
> Do you change "s
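
If one wanted to test that hypothesis, a hedged sketch of pinning executors to a single core (purely an experiment, not a documented requirement of the connector) might look like:

// Limit each executor to one core so at most one task can touch a given
// cached Kafka consumer at a time; equivalent to `spark-submit --executor-cores 1`.
val conf = new SparkConf()
  .setAppName("window-repro")
  .set("spark.executor.cores", "1")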

Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Kalvin Chau
"spark.speculation" is not set, so it would be whatever the default is. On Wed, Jan 11, 2017 at 3:43 PM Shixiong(Ryan) Zhu wrote: > Or do you enable "spark.speculation"? If not, Spark Streaming should not > launch two tasks using the same TopicPartition. > > On Wed, Jan 11, 2017 at 3:33 PM, Kal

Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Shixiong(Ryan) Zhu
Could you post your code, please?

On Wed, Jan 11, 2017 at 3:53 PM, Kalvin Chau wrote:
> "spark.speculation" is not set, so it would be whatever the default is.
>
> On Wed, Jan 11, 2017 at 3:43 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Or do you enable "spark.speculation"?

Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Shixiong(Ryan) Zhu
Or do you enable "spark.speculation"? If not, Spark Streaming should not launch two tasks using the same TopicPartition.

On Wed, Jan 11, 2017 at 3:33 PM, Kalvin Chau wrote:
> I have not modified that configuration setting, and that doesn't seem to
> be documented anywhere.
>
> Does the Kafka 0.1

Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Kalvin Chau
I'm not re-using any InputDStreams, actually; this is one InputDStream that has a window applied to it.
Then, when Spark creates and assigns tasks to read from the topic, one executor gets assigned two tasks to read from the same TopicPartition, and uses the same CachedKafkaConsumer to read from the
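
One way to see which TopicPartitions each batch actually reads is to log the offset ranges on the un-windowed direct stream (before any transformation), since only the RDDs produced directly by the Kafka stream implement HasOffsetRanges. The `stream` value here refers to the direct stream from the earlier sketch:

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // Each OffsetRange corresponds to one TopicPartition read for this batch.
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} offsets=[${r.fromOffset}, ${r.untilOffset})")
  }
}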

Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Shixiong(Ryan) Zhu
Do you change "spark.streaming.concurrentJobs" to more than 1? The Kafka 0.10 connector requires it to be 1.

On Wed, Jan 11, 2017 at 3:25 PM, Kalvin Chau wrote:
> I'm not re-using any InputDStreams actually, this is one InputDStream that
> has a window applied to it.
> Then when Spark creates and
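
For context, "spark.streaming.concurrentJobs" is an undocumented setting that defaults to 1; the 0.10 connector's consumer cache assumes at most one task reads a given TopicPartition at a time on an executor. A sketch of stating that default explicitly, for illustration only:

// Leave concurrentJobs at its default of 1; raising it is unsafe with the
// Kafka 0.10 consumer cache.
val conf = new SparkConf()
  .setAppName("window-repro")
  .set("spark.streaming.concurrentJobs", "1")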

Re: [Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Shixiong(Ryan) Zhu
I think you may be reusing the Kafka DStream (the DStream returned by createDirectStream). If you need to read from the same Kafka source again, you need to create another DStream.

On Wed, Jan 11, 2017 at 2:38 PM, Kalvin Chau wrote:
> Hi,
>
> We've been running into ConcurrentModificationExceptions "KafkaC
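
A hedged sketch of that suggestion, reusing the placeholder names from the earlier example; giving each stream its own group.id is my assumption about how to keep the cached consumers separate, not something stated in this message:

// Instead of wiring one direct stream into two jobs, create an independent
// stream for each consumer of the topic.
val paramsA = kafkaParams + ("group.id" -> "job-a")
val paramsB = kafkaParams + ("group.id" -> "job-b")

val streamA = KafkaUtils.createDirectStream[AnyRef, AnyRef](
  ssc, PreferConsistent, Subscribe[AnyRef, AnyRef](Seq("some-topic"), paramsA))
val streamB = KafkaUtils.createDirectStream[AnyRef, AnyRef](
  ssc, PreferConsistent, Subscribe[AnyRef, AnyRef](Seq("some-topic"), paramsB))

streamA.map(_.value).window(Seconds(10), Seconds(2)).foreachRDD(_.count())
streamB.map(_.value).foreachRDD(_.count())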

[Streaming] ConcurrentModificationExceptions when Windowing

2017-01-11 Thread Kalvin Chau
Hi,

We've been running into ConcurrentModificationExceptions ("KafkaConsumer is not safe for multi-threaded access") with the CachedKafkaConsumer. I've been working through debugging this issue, and after looking through some of the Spark source code I think this is a bug.

Our setup is: Spark 2.0.2