I have not modified that configuration setting, and that doesn't seem to be documented anywhere.
Does the Kafka 0.10 connector require the number of cores on an executor to
be set to 1? I didn't see that documented anywhere either.

On Wed, Jan 11, 2017 at 3:27 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Did you change "spark.streaming.concurrentJobs" to more than 1? The Kafka
> 0.10 connector requires it to be 1.
>
> On Wed, Jan 11, 2017 at 3:25 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:
>
> I'm not re-using any InputDStreams actually; this is one InputDStream that
> has a window applied to it.
> When Spark creates and assigns tasks to read from the topic, one executor
> gets assigned two tasks to read from the same TopicPartition, and they use
> the same CachedKafkaConsumer to read from that TopicPartition, causing the
> ConcurrentModificationException in one of the worker threads.
>
> On Wed, Jan 11, 2017 at 2:53 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> I think you may be reusing the Kafka DStream (the DStream returned by
> createDirectStream). If you need to read from the same Kafka source, you
> need to create another DStream.
>
> On Wed, Jan 11, 2017 at 2:38 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:
>
> Hi,
>
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer is
> not safe for multi-threaded access") with the CachedKafkaConsumer. I've been
> working through debugging this issue, and after looking through some of the
> Spark source code I think this is a bug.
>
> Our setup is:
> Spark 2.0.2, running on Mesos 0.28.0-2 in client mode, using
> spark-streaming-kafka-0-10
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
>
> We would see the exception when, in one executor, two task worker threads
> are assigned the same TopicPartition but different sets of offsets.
>
> They would both get the same CachedKafkaConsumer, and whichever task thread
> went first would seek and poll for all the records, while at the same time
> the second thread would try to seek to its own offset but fail because it
> is unable to acquire the lock.
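For reference, the job described above (a single direct stream with a window
applied, 10s batches, 180s window, 30s slide) boils down to roughly the
following. This is a minimal sketch rather than the actual code from the
report; the broker address, group id, serializers, and per-batch action are
placeholders, while the topic name and intervals are taken from the setup
described above.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val conf = new SparkConf().setAppName("windowed-kafka-example")
    val ssc = new StreamingContext(conf, Seconds(10))          // 10s batch interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",                    // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",                           // placeholder
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // One direct stream; no DStream is reused anywhere.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("test-topic"), kafkaParams))

    // Pull key/value out of the ConsumerRecord, then window:
    // 180s window sliding every 30s over the 10s batches.
    val windowed = stream.map(r => (r.key, r.value)).window(Seconds(180), Seconds(30))
    windowed.foreachRDD(rdd => rdd.count())                    // placeholder action

    ssc.start()
    ssc.awaitTermination()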
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
>
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
>
> Here are some relevant logs:
>
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2] 8237
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
> ...
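The exception itself is just KafkaConsumer's own guard against multi-threaded
access, independent of Spark. As a sanity check, the same failure can be
reproduced with two threads sharing a single consumer, roughly as below; this
is an illustrative sketch only, the broker address, group id, and topic are
placeholders, and the outcome is timing-dependent.

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    object SharedConsumerRace {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")   // placeholder
        props.put("group.id", "race-check")                // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        val tp = new TopicPartition("test-topic", 2)
        consumer.assign(Collections.singletonList(tp))

        // Thread A holds the consumer inside a long poll, like the task that got there first.
        val poller = new Thread(new Runnable {
          def run(): Unit = consumer.poll(10000)
        })
        // Thread B seeks on the same consumer, like the second task assigned
        // the same TopicPartition; this is the call that blows up.
        val seeker = new Thread(new Runnable {
          def run(): Unit = consumer.seek(tp, 0L)
        })

        poller.start()
        Thread.sleep(100)   // give the poll a head start so the seek overlaps it
        seeker.start()
        poller.join()
        seeker.join()
        // The seeker typically dies with:
        // java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
      }
    }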
> It looks like when WindowedDStream does its getOrCompute call, it computes
> all the sets of offsets it needs and tries to farm out the work in parallel,
> so each available task worker thread gets one set of offsets to read.
>
> After realizing what was going on, I tested four configurations:
>
> - spark.executor.cores 1 and spark.mesos.extra.cores 0
>   - No exceptions
> - spark.executor.cores 1 and spark.mesos.extra.cores 1
>   - ConcurrentModificationException
> - spark.executor.cores 2 and spark.mesos.extra.cores 0
>   - ConcurrentModificationException
> - spark.executor.cores 2 and spark.mesos.extra.cores 1
>   - ConcurrentModificationException
>
> I'm not sure what the best solution to this is if we want to be able to have
> N task threads read from the same TopicPartition to increase parallelism.
> You could possibly allow N CachedKafkaConsumers for the same TopicPartition.
>
> Any thoughts on this?
>
> Thanks,
> Kalvin
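One possible shape for the "N CachedKafkaConsumers for the same TopicPartition"
idea above, purely as an illustrative sketch and not the actual
CachedKafkaConsumer code: key the executor-side consumer cache by thread as
well as by topic-partition, so two task threads assigned the same
TopicPartition never share a consumer. The object and method names here are
made up; the obvious costs are more broker connections, no shared fetch-ahead
buffer, and the need for an eviction/cleanup policy.

    import java.{util => ju}
    import scala.collection.mutable
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    // Hypothetical per-thread consumer cache: each (group, topic, partition, thread)
    // combination gets its own KafkaConsumer instead of a single shared cached one.
    object PerThreadConsumerCache {
      private case class Key(groupId: String, topic: String, partition: Int, threadId: Long)

      private val cache = mutable.HashMap.empty[Key, KafkaConsumer[Array[Byte], Array[Byte]]]

      def getOrCreate(groupId: String,
                      tp: TopicPartition,
                      kafkaParams: ju.Map[String, Object]): KafkaConsumer[Array[Byte], Array[Byte]] =
        cache.synchronized {
          val key = Key(groupId, tp.topic, tp.partition, Thread.currentThread().getId)
          cache.getOrElseUpdate(key, {
            val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
            consumer.assign(ju.Collections.singletonList(tp))
            consumer
          })
        }

      // Would need to be called when a task thread is done with a partition,
      // otherwise consumers (and broker connections) accumulate.
      def release(groupId: String, tp: TopicPartition): Unit = cache.synchronized {
        val key = Key(groupId, tp.topic, tp.partition, Thread.currentThread().getId)
        cache.remove(key).foreach(_.close())
      }
    }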