Hi,
We run Beam pipeline on Spark in the streaming mode. We subscribe to multiple 
Kafka topics. Our job run fine until it is under heavy load: millions of Kafka 
messages coming per seconds. The exception look like concurrency issue. Is it a 
known bug in Beam or some Spark configuration we could do to avoid?
Our code roughly look like this
For KafkaIO
input
        .getPipeline()
        .apply(
            "ReadFromKafka",
            KafkaIO.readBytes()
                .withBootstrapServers(XXX)
                .withTopics(YYY)
                .withConsumerConfigUpdates(
                    ImmutableMap.of(
                        "group.id",
                        "ZZZ"))
                .withReadCommitted()
                .commitOffsetsInFinalize())
        .apply(
            "AAA",
            ParDo.of(
                    KafkaRecordToFeatureRowDoFn.newBuilder()
                        .setSuccessTag(getSuccessTag())
                        .setFailureTag(getFailureTag())
                        .build())
                .withOutputTags(getSuccessTag(), 
TupleTagList.of(getFailureTag())));
Running Pipeline with Spark Runner:
PipelineResult result = pipeline.run();
result.waitUntilFinish();

--
Exceptions:

20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already 
consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to 
exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it 
was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to 
exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it 
was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to 
exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as it 
was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to 
exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as it 
was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to 
exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as it 
was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to 
exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed as it 
was not found on disk or in memory
20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 203)
java.lang.IllegalStateException
        at java.util.ArrayList$Itr.remove(ArrayList.java:872)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
        at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
        at 
org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
        at 
org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
        at 
org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
        at 
org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
        at 
org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
        at 
org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204
20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204)
20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks 
including 0 local blocks and 0 remote blocks
20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 
0 ms
20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks 
including 0 local blocks and 0 remote blocks
20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 
0 ms
20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, start 
reading from default.
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to 
exception java.util.ConcurrentModificationException.
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due to 
exception java.util.ConcurrentModificationException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it 
was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed as 
it was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to 
exception java.util.ConcurrentModificationException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it 
was not found on disk or in memory
20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 202)
java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)

Reply via email to