One question: we have 5 topics and we are currently adding all of them to a single KafkaIO. Would it improve the throughput of the pipeline if we instead added only 1 topic per KafkaIO, creating 5 unbounded PCollections? Each collection would go through the same transform and write to the same final sink.
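To make that concrete, here is a rough sketch of the variant we have in mind. It is only a sketch: the topic names, bootstrap servers string, and group id are placeholders, and "input" is the same handle as in the code quoted below.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;

    // One KafkaIO read per topic instead of one read with five topics.
    List<PCollection<KafkaRecord<byte[], byte[]>>> perTopic = new ArrayList<>();
    for (String topic :
        ImmutableList.of("topic-1", "topic-2", "topic-3", "topic-4", "topic-5")) {
      perTopic.add(
          input
              .getPipeline()
              .apply(
                  "ReadFromKafka-" + topic,
                  KafkaIO.readBytes()
                      .withBootstrapServers("broker:9092")   // placeholder
                      .withTopic(topic)
                      .withConsumerConfigUpdates(
                          ImmutableMap.of("group.id", "our-group"))  // placeholder
                      .withReadCommitted()
                      .commitOffsetsInFinalize()));
    }
    // Merge the per-topic collections so the shared transform and the final
    // sink still apply once, to the merged stream.
    PCollection<KafkaRecord<byte[], byte[]>> merged =
        PCollectionList.of(perTopic).apply("FlattenTopics", Flatten.pCollections());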
Regards
Dinh

> On 30 Jun BE 2563, at 23:53, wang Wu <[email protected]> wrote:
>
> We encountered a similar exception with KafkaUnboundedReader. By similar I mean the stack also starts from org.apache.spark.rdd.RDD.computeOrReadCheckpoint and ends at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance. Just another type of concurrency bug.
>
> I am sorry for the long stack trace. My questions are:
> 1. Is the code of KafkaUnboundedReader thread-safe? Especially this line:
>        curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
> 2. What really happens behind the scenes? Which parallel tasks share the same reader, and how does that relate to MicrobatchSource?
>
> Regards
> Dinh
>
> 20/06/30 21:12:26 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.NoSuchElementException
>   at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>   at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>   at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:184)
>   at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:219)
>   at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>   at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
>   at feast.ingestion.ImportJobOnSpark.runPipeline(ImportJobOnSpark.java:199)
>   at feast.ingestion.ImportJobOnSpark.main(ImportJobOnSpark.java:77)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
> Caused by: java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:862)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
>   at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:168)
>   at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>   at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>   at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>   at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>   at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>   at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>   at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>> On 30 Jun 2020, at 00:26, Alexey Romanenko <[email protected]> wrote:
>>
>> I don't think it's a known issue. Could you tell which version of Beam you use?
>>
>>> On 28 Jun 2020, at 14:43, wang Wu <[email protected]> wrote:
>>>
>>> Hi,
>>> We run a Beam pipeline on Spark in streaming mode, subscribed to multiple Kafka topics. Our job runs fine until it comes under heavy load: millions of Kafka messages arriving per second. The exceptions look like a concurrency issue. Is this a known bug in Beam, or is there some Spark configuration we could use to avoid it?
>>>
>>> Our code looks roughly like this. For KafkaIO:
>>>
>>> input
>>>     .getPipeline()
>>>     .apply(
>>>         "ReadFromKafka",
>>>         KafkaIO.readBytes()
>>>             .withBootstrapServers(XXX)
>>>             .withTopics(YYY)
>>>             .withConsumerConfigUpdates(
>>>                 ImmutableMap.of("group.id", "ZZZ"))
>>>             .withReadCommitted()
>>>             .commitOffsetsInFinalize())
>>>     .apply(
>>>         "AAA",
>>>         ParDo.of(
>>>                 KafkaRecordToFeatureRowDoFn.newBuilder()
>>>                     .setSuccessTag(getSuccessTag())
>>>                     .setFailureTag(getFailureTag())
>>>                     .build())
>>>             .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
>>>
>>> Running the pipeline with the Spark runner:
>>>
>>> PipelineResult result = pipeline.run();
>>> result.waitUntilFinish();
>>>
>>> --
>>> Exceptions:
>>>
>>> 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.lang.IllegalStateException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
>>> [... identical pairs of warnings for rdd_47_3, rdd_87_3, rdd_93_3, rdd_98_3 and rdd_102_3 trimmed ...]
>>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 203)
>>> java.lang.IllegalStateException
>>>   at java.util.ArrayList$Itr.remove(ArrayList.java:872)
>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
>>>   at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
>>>   at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>>>   at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>>>   at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>>>   at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>>>   at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>>>   at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>   [... many repeated MapWithStateRDD / MapPartitionsRDD / BlockManager compute-and-cache frames trimmed ...]
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>   at java.lang.Thread.run(Thread.java:748)
>>> 20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204
>>> 20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204)
>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks
>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks
>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
>>> 20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, start reading from default.
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.util.ConcurrentModificationException.
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due to exception java.util.ConcurrentModificationException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed as it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.util.ConcurrentModificationException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
>>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 202)
>>> java.util.ConcurrentModificationException
>>>   at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
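
P.S. Regarding question 1 in my earlier mail: the IllegalStateException and the ConcurrentModificationException in the traces above can both be reproduced deterministically on a single thread once the cycled iterator's contract is violated, which is exactly what an unlucky interleaving of two threads sharing the reader's curBatch iterator would produce. A minimal sketch (using plain Guava rather than Beam's vendored copy; the partition names are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.ConcurrentModificationException;
    import java.util.Iterator;
    import java.util.List;
    import com.google.common.collect.Iterators;

    public class CycleIteratorDemo {
      public static void main(String[] args) {
        List<String> partitions = new ArrayList<>(Arrays.asList("p0", "p1", "p2"));
        // Same shape as: curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
        Iterator<String> curBatch = Iterators.cycle(partitions);

        curBatch.next();       // "p0"
        curBatch.remove();     // OK: removes "p0" from the backing list
        try {
          // A second remove() with no next() in between is what an
          // interleaving of two threads sharing this iterator can produce.
          curBatch.remove();
        } catch (IllegalStateException e) {
          // Same failure as ArrayList$Itr.remove in the stage 64.0 trace.
        }

        curBatch.next();       // "p1": re-arms the iterator
        partitions.add("p3");  // structural change behind the iterator's back
        try {
          curBatch.next();
        } catch (ConcurrentModificationException e) {
          // Same failure as ArrayList$Itr.checkForComodification in the
          // stage 61.0 trace.
        }
        // The NoSuchElementException variant needs a real race on next(),
        // so it is not shown deterministically here.
      }
    }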
