To be honest, I don’t see why it would improve the throughput, since these two pipelines will use the same amount of slots and resources.
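
To illustrate what I mean, here is a rough sketch of the two shapes being compared (the broker address, topic names and class name are placeholders, not your actual code): a single KafkaIO read subscribed to all five topics, versus one read per topic flattened back into a single PCollection. Both shapes consume the same set of Kafka partitions, so as far as I can tell the runner ends up with the same number of source splits to schedule either way.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class TopicReadShapes {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    List<String> topics = Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5");

    // Shape 1: a single KafkaIO read subscribed to all five topics.
    PCollection<KafkaRecord<byte[], byte[]>> single =
        p.apply("ReadAllTopics",
            KafkaIO.readBytes()
                .withBootstrapServers("broker:9092")
                .withTopics(topics));

    // Shape 2: one KafkaIO read per topic, flattened into one PCollection.
    PCollectionList<KafkaRecord<byte[], byte[]>> perTopic = PCollectionList.empty(p);
    for (String topic : topics) {
      perTopic = perTopic.and(
          p.apply("Read_" + topic,
              KafkaIO.readBytes()
                  .withBootstrapServers("broker:9092")
                  .withTopic(topic)));
    }
    PCollection<KafkaRecord<byte[], byte[]>> flattened =
        perTopic.apply("FlattenTopics", Flatten.pCollections());

    // Either shape would then go through the same transforms and write to the same sink.
    p.run().waitUntilFinish();
  }
}

So unless the single read is bottlenecked on something other than partition parallelism, I would not expect the per-topic variant to read any faster.
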
> On 3 Jul 2020, at 11:49, wang Wu <faran...@gmail.com> wrote:
>
> One question: We have 5 topics and we are adding them all to KafkaIO. Will it
> help to improve the throughput of the pipeline if we add only 1 topic per
> KafkaIO read and create 5 PCollections (unbounded)? Each collection would go
> through the same transform and write to the same final sink.
>
> Regards
> Dinh
>
>> On 30 Jun BE 2563, at 23:53, wang Wu <faran...@gmail.com> wrote:
>>
>> We encountered a similar exception with KafkaUnboundedReader. By similarity I
>> mean it starts from org.apache.spark.rdd.RDD.computeOrReadCheckpoint
>> and ends at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance.
>> Just another type of concurrency bug.
>>
>> I am sorry for the long stack trace.
>> My questions are:
>> 1. Is the code of KafkaUnboundedReader thread-safe? Especially this code:
>>    curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
>> 2. What really happens behind the scenes? I mean, which parallel tasks share
>>    the same reader, and how does it relate to MicrobatchSource?
>>
>> Regards
>> Dinh
>>
>> 20/06/30 21:12:26 INFO ApplicationMaster: Unregistering ApplicationMaster
>> with FAILED (diag message: User class threw exception:
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.util.NoSuchElementException
>> at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>> at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>> at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:184)
>> at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:219)
>> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
>> at feast.ingestion.ImportJobOnSpark.runPipeline(ImportJobOnSpark.java:199)
>> at feast.ingestion.ImportJobOnSpark.main(ImportJobOnSpark.java:77)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
>> Caused by: java.util.NoSuchElementException
>> at java.util.ArrayList$Itr.next(ArrayList.java:862)
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
>> at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:168)
>> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>> at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>> at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>> at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>> at org.apache.spark.scheduler.Task.run(Task.scala:123)
>> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>
>>> On 30 Jun BE 2563, at 00:26, Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>
>>> I don’t think it’s a known issue. Could you tell which version of Beam you use?
>>>
>>>> On 28 Jun 2020, at 14:43, wang Wu <faran...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> We run a Beam pipeline on Spark in streaming mode. We subscribe to
>>>> multiple Kafka topics. Our job runs fine until it is under heavy load:
>>>> millions of Kafka messages coming in per second. The exception looks like
>>>> a concurrency issue. Is it a known bug in Beam, or is there some Spark
>>>> configuration we could use to avoid it?
>>>> Our code roughly looks like this.
>>>> For KafkaIO:
>>>>
>>>> input
>>>>     .getPipeline()
>>>>     .apply(
>>>>         "ReadFromKafka",
>>>>         KafkaIO.readBytes()
>>>>             .withBootstrapServers(XXX)
>>>>             .withTopics(YYY)
>>>>             .withConsumerConfigUpdates(
>>>>                 ImmutableMap.of("group.id", "ZZZ"))
>>>>             .withReadCommitted()
>>>>             .commitOffsetsInFinalize())
>>>>     .apply(
>>>>         "AAA",
>>>>         ParDo.of(
>>>>                 KafkaRecordToFeatureRowDoFn.newBuilder()
>>>>                     .setSuccessTag(getSuccessTag())
>>>>                     .setFailureTag(getFailureTag())
>>>>                     .build())
>>>>             .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
>>>>
>>>> Running the pipeline with the Spark runner:
>>>>
>>>> PipelineResult result = pipeline.run();
>>>> result.waitUntilFinish();
>>>>
>>>> --
>>>> Exceptions:
>>>>
>>>> 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed >>>> as it was not found on disk or in memory >>>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID >>>> 203) >>>> java.lang.IllegalStateException >>>> at java.util.ArrayList$Itr.remove(ArrayList.java:872) >>>> at >>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423) >>>> at >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172) >>>> at >>>> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245) >>>> at >>>> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232) >>>> at >>>> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177) >>>> at >>>> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107) >>>> at >>>> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181) >>>> at >>>> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180) >>>> at >>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57) >>>> at >>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55) >>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>> at >>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55) >>>> at >>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) >>>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) >>>> at >>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) >>>> at >>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) >>>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) >>>> at >>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) >>>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) >>>> at >>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) >>>> at >>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) >>>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) >>>> at >>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153) >>>> at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) >>>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) >>>> at >>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) >>>> at >>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) >>>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) >>>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) >>>> at >>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) >>>> at >>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) >>>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at 
org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) >>>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) >>>> at >>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) >>>> at >>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) >>>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) >>>> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182) >>>> at >>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) >>>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) >>>> at >>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) >>>> at >>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) >>>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) >>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) >>>> at org.apache.spark.scheduler.Task.run(Task.scala:123) >>>> at >>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) >>>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) >>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) >>>> at >>>> 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>>> at java.lang.Thread.run(Thread.java:748) >>>> 20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204 >>>> 20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204) >>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty >>>> blocks including 0 local blocks and 0 remote blocks >>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote >>>> fetches in 0 ms >>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty >>>> blocks including 0 local blocks and 0 remote blocks >>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote >>>> fetches in 0 ms >>>> 20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, >>>> start reading from default. >>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to >>>> exception java.util.ConcurrentModificationException. >>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due >>>> to exception java.util.ConcurrentModificationException. >>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as >>>> it was not found on disk or in memory >>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed >>>> as it was not found on disk or in memory >>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to >>>> exception java.util.ConcurrentModificationException. >>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed >>>> as it was not found on disk or in memory >>>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID >>>> 202) >>>> java.util.ConcurrentModificationException >>>> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) >>>> >>> >> >
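
One more note on the thread-safety question from the earlier mail: as far as I know, an UnboundedSource reader is not expected to be thread-safe, and the runner should only call it from one thread at a time. The mix of NoSuchElementException, IllegalStateException and ConcurrentModificationException in these traces is the kind of thing a cycling iterator over an ArrayList can produce when the same iterator is advanced from two threads at once. Below is a stripped-down, standalone sketch of that failure mode (plain Guava, invented names, not the actual Beam or KafkaUnboundedReader code), just to show where such exceptions can come from.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import com.google.common.collect.Iterators;

public class CyclingIteratorRace {
  public static void main(String[] args) throws InterruptedException {
    List<String> partitionStates = new ArrayList<>(Arrays.asList("p0", "p1", "p2", "p3", "p4"));
    // Same pattern as the line quoted above: cycle over a copy of the partition list.
    Iterator<String> curBatch = Iterators.cycle(new ArrayList<>(partitionStates));

    Runnable advance = () -> {
      try {
        for (int i = 0; i < 1_000_000; i++) {
          if (curBatch.hasNext()) {
            curBatch.next();      // ArrayList$Itr.next: NoSuchElementException if the list shrank underneath us
            if (i % 1_000 == 0) {
              curBatch.remove();  // ArrayList$Itr.remove: IllegalStateException / ConcurrentModificationException
            }
          }
        }
      } catch (RuntimeException e) {
        System.out.println(Thread.currentThread().getName() + ": " + e);
      }
    };

    Thread t1 = new Thread(advance, "reader-A");
    Thread t2 = new Thread(advance, "reader-B");
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}

Which exception you get, if any, depends entirely on timing; run single-threaded, the same loop never throws.
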