Could you post your code, please?

On Wed, Jan 11, 2017 at 3:53 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:
> "spark.speculation" is not set, so it would be whatever the default is. > > > On Wed, Jan 11, 2017 at 3:43 PM Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrote: > >> Or do you enable "spark.speculation"? If not, Spark Streaming should not >> launch two tasks using the same TopicPartition. >> >> On Wed, Jan 11, 2017 at 3:33 PM, Kalvin Chau <kalvinnc...@gmail.com> >> wrote: >> >> I have not modified that configuration setting, and that doesn't seem to >> be documented anywhere. >> >> Does the Kafka 0.10 require the number of cores on an executor be set to >> 1? I didn't see that documented anywhere either. >> >> On Wed, Jan 11, 2017 at 3:27 PM Shixiong(Ryan) Zhu < >> shixi...@databricks.com> wrote: >> >> Do you change "spark.streaming.concurrentJobs" to more than 1? Kafka >> 0.10 connector requires it must be 1. >> >> On Wed, Jan 11, 2017 at 3:25 PM, Kalvin Chau <kalvinnc...@gmail.com> >> wrote: >> >> I'm not re-using any InputDStreams actually, this is one InputDStream >> that has a window applied to it. >> Then when Spark creates and assigns tasks to read from the Topic, one >> executor gets assigned two tasks to read from the same TopicPartition, and >> uses the same CachedKafkaConsumer to read from the TopicPartition causing >> the ConcurrentModificationException in one of the worker threads. >> >> On Wed, Jan 11, 2017 at 2:53 PM Shixiong(Ryan) Zhu < >> shixi...@databricks.com> wrote: >> >> I think you may reuse the kafka DStream (the DStream returned by >> createDirectStream). If you need to read from the same Kafka source, you >> need to create another DStream. >> >> On Wed, Jan 11, 2017 at 2:38 PM, Kalvin Chau <kalvinnc...@gmail.com> >> wrote: >> >> Hi, >> >> We've been running into ConcurrentModificationExcpetions "KafkaConsumer >> is not safe for multi-threaded access" with the CachedKafkaConsumer. I've >> been working through debugging this issue and after looking through some of >> the spark source code I think this is a bug. >> >> Our set up is: >> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using >> Spark-Streaming-Kafka-010 >> spark.executor.cores 1 >> spark.mesos.extra.cores 1 >> >> Batch interval: 10s, window interval: 180s, and slide interval: 30s >> >> We would see the exception when in one executor there are two task worker >> threads assigned the same Topic+Partition, but a different set of offsets. >> >> They would both get the same CachedKafkaConsumer, and whichever task >> thread went first would seek and poll for all the records, and at the same >> time the second thread would try to seek to its offset but fail because it >> is unable to acquire the lock. 
We see the exception when two task worker threads in the same executor are assigned the same TopicPartition but different offset ranges.

Both get the same CachedKafkaConsumer, and whichever task thread goes first seeks and polls for all of its records, while the second thread tries to seek to its own offset but fails because it cannot acquire the lock:

Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z

Time1 E0 Task0 - Seeks and starts to poll
Time1 E0 Task1 - Attempts to seek, but fails

Here are some relevant logs:

17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2] 8237
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
...

It looks like when WindowedDStream does its getOrCompute call, it computes all the sets of offsets it needs and farms the work out in parallel, so each available task worker thread gets one set of offsets to read.
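From what I can tell, the executor-side consumer cache is keyed only by group id, topic, and partition, so two concurrent tasks that want different offset ranges of the same partition are handed the very same consumer. A simplified paraphrase of what CachedKafkaConsumer does in 2.0.2 (a sketch from memory, not the exact source):

import java.{util => ju}

object CachedKafkaConsumer {
  private case class CacheKey(groupId: String, topic: String, partition: Int)

  private val cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer]()

  // Both worker threads call this with the same (groupId, topic, partition),
  // so both receive the SAME wrapped KafkaConsumer; concurrent seek/poll on it
  // then trips KafkaConsumer's single-thread guard and throws
  // ConcurrentModificationException.
  def get(groupId: String, topic: String, partition: Int): CachedKafkaConsumer =
    synchronized {
      val key = CacheKey(groupId, topic, partition)
      Option(cache.get(key)).getOrElse {
        val consumer = new CachedKafkaConsumer(groupId, topic, partition)
        cache.put(key, consumer)
        consumer
      }
    }
}

// Wraps a single org.apache.kafka.clients.consumer.KafkaConsumer instance.
class CachedKafkaConsumer(val groupId: String, val topic: String, val partition: Int)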
After realizing what was going on, I tested four configurations:

- spark.executor.cores 1 and spark.mesos.extra.cores 0
  - No exceptions
- spark.executor.cores 1 and spark.mesos.extra.cores 1
  - ConcurrentModificationException
- spark.executor.cores 2 and spark.mesos.extra.cores 0
  - ConcurrentModificationException
- spark.executor.cores 2 and spark.mesos.extra.cores 1
  - ConcurrentModificationException

I'm not sure what the best solution is if we want N task threads to be able to read from the same TopicPartition to increase parallelism. You could possibly allow N CachedKafkaConsumers for the same TopicPartition (see the P.S. below for a rough sketch).

Any thoughts on this?

Thanks,
Kalvin
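P.S. To make the "N CachedKafkaConsumers per TopicPartition" idea concrete, here is a rough, untested sketch: also key the executor-side cache by the worker thread, so concurrent tasks reading the same TopicPartition never share a KafkaConsumer. The cache and wrapper class would be as in the paraphrase above; only the key changes.

// Untested idea only: add the worker thread id to the cache key so two
// concurrent tasks for the same TopicPartition each get their own consumer.
private case class CacheKey(groupId: String, topic: String, partition: Int,
                            threadId: Long)

def get(groupId: String, topic: String, partition: Int): CachedKafkaConsumer =
  synchronized {
    val key = CacheKey(groupId, topic, partition, Thread.currentThread().getId)
    Option(cache.get(key)).getOrElse {
      val consumer = new CachedKafkaConsumer(groupId, topic, partition)
      cache.put(key, consumer)
      consumer
    }
  }

The obvious downsides are more open broker connections and no eviction when a worker thread goes away, so a real fix probably wants a pool with borrow/return semantics rather than a plain keyed cache.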