stym06 opened a new issue #4713:
URL: https://github.com/apache/hudi/issues/4713


   **Describe the problem you faced**
   
   While upserting Mongo oplogs from Kafka to Blob storage, we are facing an executor OOM.
   
   **Environment Description**
   
   * Hudi version : 0.9.0
   
   * Spark version : 2.4.4
   
   * Hive version : 3.1.2
   
   * Hadoop version : 2.7.3
   
   * Storage (HDFS/S3/GCS..) : Azure Blob
   
   * Running on Docker? (yes/no) : K8s
   
   
   **Additional context**
   
   Spark K8s yaml file
   ```
   apiVersion: "sparkoperator.k8s.io/v1beta2"
   kind: SparkApplication
   metadata:
     name: hudi-ss-ah-ds-{{ ti.job_id }}
     namespace: dataplatform
     labels:
       spark_name: hudi-ss-ah-ds-{{ ti.job_id }}
       dag_name: hudi-ss-ah
       task_name: ds
       environment: "prod"
       cloud: "aws"
       tier: "t2"
       team: "dataplatform"
       service_type: "airflow"
       k8s_cluster_name: "tapi"
       plip_version: 0.1.10-dp-ev
   
   spec:
     type: Java
     mode: cluster
     image: "hudi-ds:4"
     imagePullPolicy: Always
     mainClass: org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
     mainApplicationFile: "local:///opt/spark/hudi/hudi-utilities-bundle_2.11-0.9.0-SNAPSHOT.jar"
     deps:
       packages:
         - org.apache.spark:spark-avro_2.11:2.4.4
     sparkConf:
       "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
       "spark.memory.fraction": "0.2"
       "spark.memory.storageFraction": "0.2"
     arguments:
       - "--table-type"
       - "COPY_ON_WRITE"
       - "--props"
       - "/opt/spark/hudi/config/source.properties"
       - "--schemaprovider-class"
       - "org.apache.hudi.utilities.schema.SchemaRegistryProvider"
       - "--source-class"
       - "org.apache.hudi.utilities.sources.JsonKafkaSource"
       - "--target-base-path"
       - "s3a://<ourbucket>/fusion/mongo/data/application_histories"
       - "--target-table"
       - "application_histories"
       - "--op"
       - "UPSERT"
       - "--source-ordering-field"
       - "__ts_ms"
       - "--continuous"
       - "--min-sync-interval-seconds"
       - "60"
     sparkVersion: "2.4.4"
     restartPolicy:
       type: Always
       onFailureRetries: 100000
       onFailureRetryInterval: 60
       onSubmissionFailureRetries: 100000
       onSubmissionFailureRetryInterval: 60
     timeToLiveSeconds: 3600
     volumes:
       - name: hudi-ss-ah-ds
         configMap:
           name: hudi-ss-ah-ds
     driver:
       env:
         - name: HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key
           value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key }}
         - name: HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key
           value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key }}
         - name: HOODIE_ENV_fs_DOT_s3a_DOT_impl
           value: org.apache.hadoop.fs.s3a.S3AFileSystem
       cores: 1
       coreLimit: "1200m"
       memory: "4G"
       serviceAccount: "dataplatform"
       volumeMounts:
         - name: hudi-ss-ah-ds
           mountPath: /opt/spark/hudi/config
           subpath: config.yaml
       javaOptions: "-Dnetworkaddress.cache.ttl=60 -Duser.timezone=IST -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof"
   
     executor:
       env:
         - name: HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key
           value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key }}
         - name: HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key
           value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key }}
         - name: HOODIE_ENV_fs_DOT_s3a_DOT_impl
           value: org.apache.hadoop.fs.s3a.S3AFileSystem
       cores: 1
       instances: 20
       memory: "6G"
       volumeMounts:
         - name: hudi-ss-ah-ds
           mountPath: /opt/spark/hudi/config
           subpath: config.yaml
       javaOptions: "-Dnetworkaddress.cache.ttl=60 -Duser.timezone=IST -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof"
     sparkUIOptions:
       ingressAnnotations:
         kubernetes.io/ingress.class: nginx
     monitoring:
       exposeDriverMetrics: true
       exposeExecutorMetrics: true
       prometheus:
         jmxExporterJar: "/opt/spark/hudi/prometheus/jmx_prometheus_javaagent-0.16.1.jar"
         port: 8090
   
   ```
   source.properties 
   ```
   #base properties
   hoodie.upsert.shuffle.parallelism=500
   hoodie.insert.shuffle.parallelism=50
   hoodie.delete.shuffle.parallelism=50
   hoodie.bulkinsert.shuffle.parallelism=10
   hoodie.embed.timeline.server=true
   hoodie.filesystem.view.type=EMBEDDED_KV_STORE
   hoodie.compact.inline=false
   
   #datasource properties
   
   
   hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/mongo.self_signup.application_histories-value/versions/latest
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=
   hoodie.deltastreamer.source.kafka.topic=self_signup.application_histories
   
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
   hoodie.deltastreamer.kafka.source.maxEvents=50000
   
   #cleaning
   hoodie.cleaner.policy=KEEP_LATEST_COMMITS
   hoodie.cleaner.commits.retained=1
   hoodie.clean.async=true
   
   #archival
   hoodie.keep.min.commits=12
   hoodie.keep.max.commits=15
   
   #kafka props
   bootstrap.servers=localhost:9092
   auto.offset.reset=earliest
   schema.registry.url=http://localhost:8081
   
   #prometheus
   hoodie.metrics.on=true
   hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY
   
   hoodie.metrics.pushgateway.host=k8s-prometheus-pushgateway.observability.svc.cluster.local
   hoodie.metrics.pushgateway.port=9091
   hoodie.metrics.pushgateway.delete.on.shutdown=false
   hoodie.metrics.pushgateway.random.job.name.suffix=false
   hoodie.metrics.pushgateway.job.name=hudi-ss-ah
   
   ```
   
   **Stacktrace**
   
   ```
   22/01/28 17:40:05 INFO DAGScheduler: ShuffleMapStage 39 (countByKey at SparkHoodieBloomIndex.java:114) failed in 5.523 s due to org.apache.spark.shuffle.FetchFailedException: Failure while fetching StreamChunkId{streamId=489876428219, chunkIndex=0}: java.io.IOException: Out of memory
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.FileDispatcherImpl.read(FileDispatcherImpl.java:46)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:159)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readLong(DataInputStream.java:416)
        at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:208)
        at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:382)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
        at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:87)
        at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:130)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:101)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
   
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156)
        at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:84)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to