Hi,

We are using version 2.16.0. More about our dependencies:
+- org.apache.beam:beam-sdks-java-core:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-model-job-management:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-vendor-bytebuddy-1_9_3:jar:0.1:compile
[INFO] |  \- org.tukaani:xz:jar:1.8:compile
[INFO] +- org.apache.beam:beam-sdks-java-io-kafka:jar:2.16.0:compile
[INFO] |  \- org.springframework:spring-expression:jar:5.0.13.RELEASE:compile
[INFO] |     \- org.springframework:spring-core:jar:5.0.13.RELEASE:compile
[INFO] |        \- org.springframework:spring-jcl:jar:5.0.13.RELEASE:compile
+- org.apache.beam:beam-runners-spark:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-runners-core-java:jar:2.16.0:compile
[INFO] |  |  +- org.apache.beam:beam-model-fn-execution:jar:2.16.0:compile
[INFO] |  |  \- org.apache.beam:beam-sdks-java-fn-execution:jar:2.16.0:compile
[INFO] |  \- org.apache.beam:beam-runners-java-fn-execution:jar:2.16.0:compile
[INFO] |     \- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:jar:2.16.0:compile

When building the Kafka input source, we subscribe to multiple topics, i.e. .withTopics(a list of multiple topics); a minimal sketch of how we construct this read is included below, after the quoted thread.

On second thought, I think it might be a memory issue. Before the job gets killed, I think at some point I see a Spark log saying that the executor is out of memory. Let me observe more and get back.

Regards
Din

> On 30 Jun BE 2563, at 00:26, Alexey Romanenko <aromanenko....@gmail.com> wrote:
>
> I don’t think it’s a known issue. Could you tell which version of Beam you use?
>
>> On 28 Jun 2020, at 14:43, wang Wu <faran...@gmail.com> wrote:
>>
>> Hi,
>> We run a Beam pipeline on Spark in streaming mode. We subscribe to multiple Kafka topics. Our job runs fine until it comes under heavy load: millions of Kafka messages arriving per second. The exceptions look like a concurrency issue. Is it a known bug in Beam, or is there some Spark configuration we could use to avoid it?
>> Our code looks roughly like this.
>>
>> For KafkaIO:
>>
>> input
>>     .getPipeline()
>>     .apply(
>>         "ReadFromKafka",
>>         KafkaIO.readBytes()
>>             .withBootstrapServers(XXX)
>>             .withTopics(YYY)
>>             .withConsumerConfigUpdates(
>>                 ImmutableMap.of(
>>                     "group.id",
>>                     "ZZZ"))
>>             .withReadCommitted()
>>             .commitOffsetsInFinalize())
>>     .apply(
>>         "AAA",
>>         ParDo.of(
>>                 KafkaRecordToFeatureRowDoFn.newBuilder()
>>                     .setSuccessTag(getSuccessTag())
>>                     .setFailureTag(getFailureTag())
>>                     .build())
>>             .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
>>
>> Running the pipeline with the Spark runner:
>>
>> PipelineResult result = pipeline.run();
>> result.waitUntilFinish();
>>
>> --
>> Exceptions:
>>
>> 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 203)
>> java.lang.IllegalStateException
>>     at java.util.ArrayList$Itr.remove(ArrayList.java:872)
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
>>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> 20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204
>> 20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204)
>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks
>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks
>> 20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
>> 20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, start reading from default.
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.util.ConcurrentModificationException.
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due to exception java.util.ConcurrentModificationException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.util.ConcurrentModificationException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 202)
>> java.util.ConcurrentModificationException
>>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>>
>
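P.S. For reference, here is the minimal sketch of the multi-topic read mentioned above. It is only an illustration of how we wire up KafkaIO with several topics: the class name, topic names, bootstrap servers, and consumer group id below are placeholders, not our real values.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MultiTopicReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // A single KafkaIO source subscribed to several topics at once (placeholder names).
    List<String> topics = Arrays.asList("topic-a", "topic-b", "topic-c");

    // Consumer overrides passed to the underlying Kafka consumer (placeholder group id).
    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put("group.id", "our-group-id");

    pipeline.apply(
        "ReadFromKafka",
        KafkaIO.readBytes()
            .withBootstrapServers("broker-1:9092,broker-2:9092") // placeholder brokers
            .withTopics(topics)
            .withConsumerConfigUpdates(consumerConfig)
            .withReadCommitted()
            .commitOffsetsInFinalize());

    pipeline.run().waitUntilFinish();
  }
}

The real job then applies the ParDo shown in the quoted code above to the resulting PCollection of Kafka records.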