stym06 opened a new issue #4713: URL: https://github.com/apache/hudi/issues/4713
**Describe the problem you faced**

While upserting Mongo oplogs from Kafka to Blob, we are facing Executor OOM.

**Environment Description**

* Hudi version : 0.9.0
* Spark version : 2.4.4
* Hive version : 3.1.2
* Hadoop version : 2.7.3
* Storage (HDFS/S3/GCS..) : Azure Blob
* Running on Docker? (yes/no) : K8s

**Additional context**

Spark K8s yaml file:

```
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: hudi-ss-ah-ds-{{ ti.job_id }}
  namespace: dataplatform
  labels:
    spark_name: hudi-ss-ah-ds-{{ ti.job_id }}
    dag_name: hudi-ss-ah
    task_name: ds
    environment: "prod"
    cloud: "aws"
    tier: "t2"
    team: "dataplatform"
    service_type: "airflow"
    k8s_cluster_name: "tapi"
    plip_version: 0.1.10-dp-ev
spec:
  type: Java
  mode: cluster
  image: "hudi-ds:4"
  imagePullPolicy: Always
  mainClass: org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
  mainApplicationFile: "local:///opt/spark/hudi/hudi-utilities-bundle_2.11-0.9.0-SNAPSHOT.jar"
  deps:
    packages:
      - org.apache.spark:spark-avro_2.11:2.4.4
  sparkConf:
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    "spark.memory.fraction": "0.2"
    "spark.memory.storageFraction": "0.2"
  arguments:
    - "--table-type"
    - "COPY_ON_WRITE"
    - "--props"
    - "/opt/spark/hudi/config/source.properties"
    - "--schemaprovider-class"
    - "org.apache.hudi.utilities.schema.SchemaRegistryProvider"
    - "--source-class"
    - "org.apache.hudi.utilities.sources.JsonKafkaSource"
    - "--target-base-path"
    - "s3a://<ourbucket>/fusion/mongo/data/application_histories"
    - "--target-table"
    - "application_histories"
    - "--op"
    - "UPSERT"
    - "--source-ordering-field"
    - "__ts_ms"
    - "--continuous"
    - "--min-sync-interval-seconds"
    - "60"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Always
    onFailureRetries: 100000
    onFailureRetryInterval: 60
    onSubmissionFailureRetries: 100000
    onSubmissionFailureRetryInterval: 60
  timeToLiveSeconds: 3600
  volumes:
    - name: hudi-ss-ah-ds
      configMap:
        name: hudi-ss-ah-ds
  driver:
    env:
      - name: HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key
        value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key }}
      - name: HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key
        value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key }}
      - name: HOODIE_ENV_fs_DOT_s3a_DOT_impl
        value: org.apache.hadoop.fs.s3a.S3AFileSystem
    cores: 1
    coreLimit: "1200m"
    memory: "4G"
    serviceAccount: "dataplatform"
    volumeMounts:
      - name: hudi-ss-ah-ds
        mountPath: /opt/spark/hudi/config
        subpath: config.yaml
    javaOptions: "-Dnetworkaddress.cache.ttl=60 -Duser.timezone=IST -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof"
  executor:
    env:
      - name: HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key
        value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key }}
      - name: HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key
        value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key }}
      - name: HOODIE_ENV_fs_DOT_s3a_DOT_impl
        value: org.apache.hadoop.fs.s3a.S3AFileSystem
    cores: 1
    instances: 20
    memory: "6G"
    volumeMounts:
      - name: hudi-ss-ah-ds
        mountPath: /opt/spark/hudi/config
        subpath: config.yaml
    javaOptions: "-Dnetworkaddress.cache.ttl=60 -Duser.timezone=IST -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof"
  sparkUIOptions:
    ingressAnnotations:
      kubernetes.io/ingress.class: nginx
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      jmxExporterJar: "/opt/spark/hudi/prometheus/jmx_prometheus_javaagent-0.16.1.jar"
      port: 8090
```
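For context on the memory settings above: `spark.memory.fraction` is pinned to 0.2 (Spark's default is 0.6), so most of the executor heap is left outside the execution/storage region that the shuffle uses. Below is a minimal sketch of the shuffle/memory knobs in `sparkConf` that could be experimented with; the option names exist in Spark 2.4, but the values are illustrative assumptions, not a verified fix:

```
  # Illustrative sparkConf overrides only -- values are assumptions, not a verified fix.
  sparkConf:
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    # Give execution/storage memory its default share of the heap back.
    "spark.memory.fraction": "0.6"
    # Off-heap headroom for Netty shuffle buffers, on top of the executor heap.
    "spark.executor.memoryOverhead": "2g"
    # Buffer fewer shuffle bytes in flight per reducer.
    "spark.reducer.maxSizeInFlight": "24m"
    # Stream very large remote shuffle blocks to disk instead of holding them in memory.
    "spark.maxRemoteBlockSizeFetchToMem": "200m"
```

The spark-operator CRD also exposes a `memoryOverhead` field on the driver/executor spec, which can be used instead of setting it through `sparkConf`.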
source.properties:

```
#base properties
hoodie.upsert.shuffle.parallelism=500
hoodie.insert.shuffle.parallelism=50
hoodie.delete.shuffle.parallelism=50
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

#datasource properties
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/mongo.self_signup.application_histories-value/versions/latest
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=
hoodie.deltastreamer.source.kafka.topic=self_signup.application_histories
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.deltastreamer.kafka.source.maxEvents=50000

#cleaning
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=1
hoodie.clean.async=true

#archival
hoodie.keep.min.commits=12
hoodie.keep.max.commits=15

#kafka props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://localhost:8081

#prometheus
hoodie.metrics.on=true
hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY
hoodie.metrics.pushgateway.host=k8s-prometheus-pushgateway.observability.svc.cluster.local
hoodie.metrics.pushgateway.port=9091
hoodie.metrics.pushgateway.delete.on.shutdown=false
hoodie.metrics.pushgateway.random.job.name.suffix=false
hoodie.metrics.pushgateway.job.name=hudi-ss-ah
```

**Stacktrace**

```
22/01/28 17:40:05 INFO DAGScheduler: ShuffleMapStage 39 (countByKey at SparkHoodieBloomIndex.java:114) failed in 5.523 s due to org.apache.spark.shuffle.FetchFailedException: Failure while fetching StreamChunkId{streamId=489876428219, chunkIndex=0}: java.io.IOException: Out of memory
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.read(FileDispatcherImpl.java:46)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:159)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readLong(DataInputStream.java:416)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:208)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:382)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
    at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:87)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:130)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:101)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:84)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
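The failed stage (`countByKey` at `SparkHoodieBloomIndex.java:114`) is the index-lookup shuffle of the upsert path. As a hedged illustration only, these are the kinds of index-related properties in `source.properties` that could be experimented with; the property keys exist in Hudi 0.9.0, but the values are assumptions rather than a confirmed remedy:

```
# Illustrative index tuning -- values are assumptions, not a confirmed fix.
# Spread the bloom-index lookup shuffle across more partitions (default 0 = auto-compute).
hoodie.bloom.index.parallelism=500
# Alternatively, bypass the bloom-filter lookup path with the SIMPLE index:
# hoodie.index.type=SIMPLE
# hoodie.simple.index.parallelism=500
```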