Tbh, I don’t see why it would improve anything if these two pipelines use the
same number of slots and resources.
> On 3 Jul 2020, at 11:49, wang Wu <faran...@gmail.com> wrote:
> 
> One question: we have 5 topics and we are adding all of them to a single
> KafkaIO read. Would it improve the throughput of the pipeline if we instead
> added only 1 topic per KafkaIO read and created 5 unbounded PCollections,
> each of which goes through the same transform and writes to the same final
> sink?
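> Roughly, what I mean is the sketch below (the topic list, transform, and
> sink names are placeholders, not our actual code):
> 
> for (String topic : topics) {
>   pipeline
>       .apply("Read-" + topic,
>           KafkaIO.readBytes()
>               .withBootstrapServers(bootstrapServers)
>               .withTopic(topic)
>               .withReadCommitted()
>               .commitOffsetsInFinalize())
>       .apply("Transform-" + topic, new MyTransform())
>       .apply("Write-" + topic, new MySink());
> }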
> 
> Regards
> Dinh
> 
>> On 30 Jun BE 2563, at 23:53, wang Wu <faran...@gmail.com> wrote:
>> 
>> We encountered a similar exception with KafkaUnboundedReader. By similar I
>> mean it starts from
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint
>> and ends at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance.
>> Just another type of concurrency bug.
>> 
>> Sorry for the long stack trace.
>> My questions are:
>> 1. Is the code of KafkaUnboundedReader thread-safe? Especially this line:
>> curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
>> 2. What really happens behind the scenes? I mean, which parallel tasks share
>> the same reader, and how does it relate to MicrobatchSource?
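>> To illustrate question 1, here is a minimal standalone sketch (it assumes
>> plain Guava on the classpath and is not the actual Beam code path) of two
>> threads calling next() on one cycled ArrayList iterator; unsynchronized
>> access like this may intermittently throw NoSuchElementException, much like
>> the traces below:
>> 
>> import com.google.common.collect.Iterators;
>> 
>> import java.util.ArrayList;
>> import java.util.Arrays;
>> import java.util.Iterator;
>> 
>> public class CycleRaceDemo {
>>   public static void main(String[] args) throws InterruptedException {
>>     // Shared cycling iterator over a plain ArrayList, as in KafkaUnboundedReader.
>>     Iterator<String> curBatch =
>>         Iterators.cycle(new ArrayList<>(Arrays.asList("p0", "p1", "p2")));
>>     Runnable consume = () -> {
>>       for (int i = 0; i < 1_000_000; i++) {
>>         curBatch.next(); // no synchronization between the two threads
>>       }
>>     };
>>     Thread t1 = new Thread(consume);
>>     Thread t2 = new Thread(consume);
>>     t1.start();
>>     t2.start();
>>     t1.join();
>>     t2.join();
>>   }
>> }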
>> 
>> Regards
>> Dinh
>> 
>> 20/06/30 21:12:26 INFO ApplicationMaster: Unregistering ApplicationMaster 
>> with FAILED (diag message: User class threw exception: 
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
>> java.util.NoSuchElementException
>>      at 
>> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>>      at 
>> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>>      at 
>> org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:184)
>>      at 
>> org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:219)
>>      at 
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>>      at 
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
>>      at 
>> feast.ingestion.ImportJobOnSpark.runPipeline(ImportJobOnSpark.java:199)
>>      at feast.ingestion.ImportJobOnSpark.main(ImportJobOnSpark.java:77)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>      at 
>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
>> Caused by: java.util.NoSuchElementException
>>      at java.util.ArrayList$Itr.next(ArrayList.java:862)
>>      at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
>>      at 
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:168)
>>      at 
>> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>>      at 
>> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>>      at 
>> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>>      at 
>> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>>      at 
>> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>>      at 
>> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>>      at 
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>>      at 
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>      at 
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>      at 
>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>      at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>      at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>      at 
>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>      at 
>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>      at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>      at 
>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>      at 
>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>      at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>      at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>      at 
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>> 
>> 
>>> On 30 Jun BE 2563, at 00:26, Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>> 
>>> I don’t think it’s a known issue. Could you tell us which version of Beam
>>> you use?
>>> 
>>>> On 28 Jun 2020, at 14:43, wang Wu <faran...@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> We run a Beam pipeline on Spark in streaming mode. We subscribe to
>>>> multiple Kafka topics. Our job runs fine until it is under heavy load:
>>>> millions of Kafka messages coming in per second. The exception looks like
>>>> a concurrency issue. Is it a known bug in Beam, or is there some Spark
>>>> configuration we could change to avoid it?
>>>> Our code looks roughly like this.
>>>> For KafkaIO:
>>>> input
>>>>       .getPipeline()
>>>>       .apply(
>>>>           "ReadFromKafka",
>>>>           KafkaIO.readBytes()
>>>>               .withBootstrapServers(XXX)
>>>>               .withTopics(YYY)
>>>>               .withConsumerConfigUpdates(
>>>>                   ImmutableMap.of(
>>>>                       "group.id",
>>>>                       "ZZZ"))
>>>>               .withReadCommitted()
>>>>               .commitOffsetsInFinalize())
>>>>       .apply(
>>>>           "AAA",
>>>>           ParDo.of(
>>>>                   KafkaRecordToFeatureRowDoFn.newBuilder()
>>>>                       .setSuccessTag(getSuccessTag())
>>>>                       .setFailureTag(getFailureTag())
>>>>                       .build())
>>>>               .withOutputTags(getSuccessTag(), 
>>>> TupleTagList.of(getFailureTag())));
>>>> Running the pipeline with the Spark runner:
>>>> PipelineResult result = pipeline.run();
>>>> result.waitUntilFinish();
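>>>> (For completeness, the pipeline options are built roughly as in the sketch
>>>> below; the exact flags are illustrative rather than our full configuration.)
>>>> SparkPipelineOptions options =
>>>>     PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
>>>> options.setRunner(SparkRunner.class);
>>>> options.setStreaming(true);
>>>> Pipeline pipeline = Pipeline.create(options);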
>>>> 
>>>> --
>>>> Exceptions:
>>>> 
>>>> 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already 
>>>> consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to 
>>>> exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as 
>>>> it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to 
>>>> exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed 
>>>> as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to 
>>>> exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed 
>>>> as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to 
>>>> exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed 
>>>> as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to 
>>>> exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed 
>>>> as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to 
>>>> exception java.lang.IllegalStateException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed 
>>>> as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 
>>>> 203)
>>>> java.lang.IllegalStateException
>>>>    at java.util.ArrayList$Itr.remove(ArrayList.java:872)
>>>>    at 
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
>>>>    at 
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
>>>>    at 
>>>> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>>>>    at 
>>>> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>>>>    at 
>>>> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>>>>    at 
>>>> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>>>>    at 
>>>> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>>>>    at 
>>>> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>>>>    at 
>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>>>>    at 
>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>>    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>    at 
>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>>    at 
>>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>>    at 
>>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>>    at 
>>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>>    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>    at 
>>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>>    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>>>    at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>>    at 
>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>>    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>    at java.lang.Thread.run(Thread.java:748)
>>>> 20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204
>>>> 20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204)
>>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty 
>>>> blocks including 0 local blocks and 0 remote blocks
>>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote 
>>>> fetches in 0 ms
>>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty 
>>>> blocks including 0 local blocks and 0 remote blocks
>>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote 
>>>> fetches in 0 ms
>>>> 20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, 
>>>> start reading from default.
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to 
>>>> exception java.util.ConcurrentModificationException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due 
>>>> to exception java.util.ConcurrentModificationException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as 
>>>> it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed 
>>>> as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to 
>>>> exception java.util.ConcurrentModificationException.
>>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed 
>>>> as it was not found on disk or in memory
>>>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 
>>>> 202)
>>>> java.util.ConcurrentModificationException
>>>>    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>>>> 
>>> 
>> 
> 
