Hi, We run Beam pipeline on Spark in the streaming mode. We subscribe to multiple Kafka topics. Our job run fine until it is under heavy load: millions of Kafka messages coming per seconds. The exception look like concurrency issue. Is it a known bug in Beam or some Spark configuration we could do to avoid? Our code roughly look like this For KafkaIO input .getPipeline() .apply( "ReadFromKafka", KafkaIO.readBytes() .withBootstrapServers(XXX) .withTopics(YYY) .withConsumerConfigUpdates( ImmutableMap.of( "group.id", "ZZZ")) .withReadCommitted() .commitOffsetsInFinalize()) .apply( "AAA", ParDo.of( KafkaRecordToFeatureRowDoFn.newBuilder() .setSuccessTag(getSuccessTag()) .setFailureTag(getFailureTag()) .build()) .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag()))); Running Pipeline with Spark Runner: PipelineResult result = pipeline.run(); result.waitUntilFinish();
-- Exceptions: 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.lang.IllegalStateException. 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.lang.IllegalStateException. 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to exception java.lang.IllegalStateException. 20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to exception java.lang.IllegalStateException. 20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to exception java.lang.IllegalStateException. 20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to exception java.lang.IllegalStateException. 20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 203) java.lang.IllegalStateException at java.util.ArrayList$Itr.remove(ArrayList.java:872) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423) at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172) at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245) at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232) at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177) at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107) at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181) at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180) at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57) at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55) at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204 20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204) 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, start reading from default. 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.util.ConcurrentModificationException. 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due to exception java.util.ConcurrentModificationException. 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.util.ConcurrentModificationException. 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 202) java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)