KafkaUnboundedReader is not thread-safe and, maybe I’m wrong,  but I don’t 
think it’s supposed to be so since every KafkaUnboundedReader is supposed to 
read from every split, represented by KafkaUnboundedSource, independently. 

Though, in KafkaIO case, if total number of splits is less than number of all 
topic partitions to read, then one or more readers will read from more than one 
partition. However it will be done in cycle, not in parallel. 

To read messages and offsets, KafkaUnboundedReader uses two Kafka consumers, 
spawn in different threads. So we need to check it there is no issue under high 
load with that.

Btw, which Kafka client version do you use?

> On 30 Jun 2020, at 18:53, wang Wu <faran...@gmail.com> wrote:
> 
> We encountered similar exception with KafkaUnboundedReader. By similarity I 
> mean it start from
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint
> 
> And it ends at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance
> Just another type of concurrency bug.
> 
> I am sorry for the long stack trace
> My question is:
> 1. Is the code of KafkaUnboundedReader thread-safe? Especially this code:
> curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
> 2. What really happened behind the scene? I mean which parallel tasks share 
> the same reader? How it relates to MicrobatchSource?
> 
> Regards
> Dinh
> 
> 20/06/30 21:12:26 INFO ApplicationMaster: Unregistering ApplicationMaster 
> with FAILED (diag message: User class threw exception: 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.util.NoSuchElementException
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:184)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:219)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
>       at 
> feast.ingestion.ImportJobOnSpark.runPipeline(ImportJobOnSpark.java:199)
>       at feast.ingestion.ImportJobOnSpark.main(ImportJobOnSpark.java:77)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
> Caused by: java.util.NoSuchElementException
>       at java.util.ArrayList$Itr.next(ArrayList.java:862)
>       at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
>       at 
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:168)
>       at 
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>       at 
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>       at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>       at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>       at 
> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>       at 
> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>       at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>       at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>       at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>       at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>       at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>       at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>       at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:123)
>       at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> 
> 
>> On 30 Jun BE 2563, at 00:26, Alexey Romanenko <aromanenko....@gmail.com 
>> <mailto:aromanenko....@gmail.com>> wrote:
>> 
>> I don’t think it’s a known issue. Could you tell with version of Beam you 
>> use?
>> 
>>> On 28 Jun 2020, at 14:43, wang Wu <faran...@gmail.com 
>>> <mailto:faran...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> We run Beam pipeline on Spark in the streaming mode. We subscribe to 
>>> multiple Kafka topics. Our job run fine until it is under heavy load: 
>>> millions of Kafka messages coming per seconds. The exception look like 
>>> concurrency issue. Is it a known bug in Beam or some Spark configuration we 
>>> could do to avoid?
>>> Our code roughly look like this
>>> For KafkaIO
>>> input
>>>       .getPipeline()
>>>       .apply(
>>>           "ReadFromKafka",
>>>           KafkaIO.readBytes()
>>>               .withBootstrapServers(XXX)
>>>               .withTopics(YYY)
>>>               .withConsumerConfigUpdates(
>>>                   ImmutableMap.of(
>>>                       "group.id <http://group.id/>",
>>>                       "ZZZ"))
>>>               .withReadCommitted()
>>>               .commitOffsetsInFinalize())
>>>       .apply(
>>>           "AAA",
>>>           ParDo.of(
>>>                   KafkaRecordToFeatureRowDoFn.newBuilder()
>>>                       .setSuccessTag(getSuccessTag())
>>>                       .setFailureTag(getFailureTag())
>>>                       .build())
>>>               .withOutputTags(getSuccessTag(), 
>>> TupleTagList.of(getFailureTag())));
>>> Running Pipeline with Spark Runner:
>>> PipelineResult result = pipeline.run();
>>> result.waitUntilFinish();
>>> 
>>> --
>>> Exceptions:
>>> 
>>> 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already 
>>> consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to 
>>> exception java.lang.IllegalStateException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as 
>>> it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to 
>>> exception java.lang.IllegalStateException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as 
>>> it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to 
>>> exception java.lang.IllegalStateException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as 
>>> it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to 
>>> exception java.lang.IllegalStateException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as 
>>> it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to 
>>> exception java.lang.IllegalStateException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as 
>>> it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to 
>>> exception java.lang.IllegalStateException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed 
>>> as it was not found on disk or in memory
>>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 
>>> 203)
>>> java.lang.IllegalStateException
>>>     at java.util.ArrayList$Itr.remove(ArrayList.java:872)
>>>     at 
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
>>>     at 
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
>>>     at 
>>> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>>>     at 
>>> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>>>     at 
>>> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>>>     at 
>>> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>>>     at 
>>> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>>>     at 
>>> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>>>     at 
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>>>     at 
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>     at 
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>     at 
>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>     at 
>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>     at 
>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>     at 
>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>     at 
>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>     at 
>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>     at 
>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>     at 
>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>     at 
>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>     at 
>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>     at 
>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>     at 
>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>     at 
>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>>     at 
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>     at 
>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>     at 
>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>     at 
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> 20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204
>>> 20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204)
>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty 
>>> blocks including 0 local blocks and 0 remote blocks
>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote 
>>> fetches in 0 ms
>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty 
>>> blocks including 0 local blocks and 0 remote blocks
>>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote 
>>> fetches in 0 ms
>>> 20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, 
>>> start reading from default.
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to 
>>> exception java.util.ConcurrentModificationException.
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due to 
>>> exception java.util.ConcurrentModificationException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as 
>>> it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed 
>>> as it was not found on disk or in memory
>>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to 
>>> exception java.util.ConcurrentModificationException.
>>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as 
>>> it was not found on disk or in memory
>>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 
>>> 202)
>>> java.util.ConcurrentModificationException
>>>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>>> 
>> 
> 

Reply via email to