I don’t think it’s a known issue. Could you tell us which version of Beam you use?
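If it helps, one quick way to double-check the exact SDK version at runtime is Beam’s ReleaseInfo class. A minimal sketch, assuming the Beam Java SDK is on your classpath (treat it as illustrative, not authoritative):

    import org.apache.beam.sdk.util.ReleaseInfo;

    public class PrintBeamVersion {
      public static void main(String[] args) {
        // ReleaseInfo exposes the SDK's build metadata;
        // getVersion() returns a string like "2.22.0".
        System.out.println(
            "Beam SDK version: " + ReleaseInfo.getReleaseInfo().getVersion());
      }
    }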
> On 28 Jun 2020, at 14:43, wang Wu <faran...@gmail.com> wrote:
> 
> Hi,
> We run a Beam pipeline on Spark in streaming mode, subscribed to multiple
> Kafka topics. Our job runs fine until it comes under heavy load: millions
> of Kafka messages arriving per second. The exception looks like a
> concurrency issue. Is it a known bug in Beam, or is there some Spark
> configuration we could set to avoid it? Our code looks roughly like this.
> 
> For KafkaIO:
> 
> input
>     .getPipeline()
>     .apply(
>         "ReadFromKafka",
>         KafkaIO.readBytes()
>             .withBootstrapServers(XXX)
>             .withTopics(YYY)
>             .withConsumerConfigUpdates(
>                 ImmutableMap.of(
>                     "group.id",
>                     "ZZZ"))
>             .withReadCommitted()
>             .commitOffsetsInFinalize())
>     .apply(
>         "AAA",
>         ParDo.of(
>                 KafkaRecordToFeatureRowDoFn.newBuilder()
>                     .setSuccessTag(getSuccessTag())
>                     .setFailureTag(getFailureTag())
>                     .build())
>             .withOutputTags(getSuccessTag(),
>                 TupleTagList.of(getFailureTag())));
> 
> Running the pipeline with the Spark runner:
> 
> PipelineResult result = pipeline.run();
> result.waitUntilFinish();
> 
> --
> Exceptions:
> 
> 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 203)
> java.lang.IllegalStateException
>     at java.util.ArrayList$Itr.remove(ArrayList.java:872)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204
> 20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204)
> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks
> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks
> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
> 20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, start reading from default.
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.util.ConcurrentModificationException.
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due to exception java.util.ConcurrentModificationException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.util.ConcurrentModificationException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 202)
> java.util.ConcurrentModificationException
>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
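For context on why this reads like a concurrency problem: both failures come out of ArrayList's standard iterator guards. An IllegalStateException from ArrayList$Itr.remove means remove() was called without a successful preceding next() (or called twice in a row), and a ConcurrentModificationException from checkForComodification means the list was structurally modified behind the iterator's back, which is what you would expect if two threads end up sharing one reader. A minimal standalone illustration of the two JDK guards (not Beam code, just the semantics the traces point at):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.ConcurrentModificationException;
    import java.util.Iterator;
    import java.util.List;

    public class IteratorGuards {
      public static void main(String[] args) {
        // 1) IllegalStateException: remove() before any next().
        List<Integer> a = new ArrayList<>(Arrays.asList(1, 2, 3));
        Iterator<Integer> it = a.iterator();
        try {
          it.remove(); // no preceding next() -> IllegalStateException
        } catch (IllegalStateException e) {
          System.out.println("IllegalStateException: remove() without next()");
        }

        // 2) ConcurrentModificationException: the list is structurally
        //    modified outside the iterator while iteration is in progress
        //    (the same guard fires when another thread does the modifying).
        List<Integer> b = new ArrayList<>(Arrays.asList(1, 2, 3));
        try {
          for (Integer x : b) {
            b.add(x); // modifies b behind the iterator's back
          }
        } catch (ConcurrentModificationException e) {
          System.out.println("ConcurrentModificationException: modified during iteration");
        }
      }
    }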