rohit-m-99 opened a new issue, #6015: URL: https://github.com/apache/hudi/issues/6015
**Describe the problem you faced**

After upgrading to 0.11.1, the deltastreamer is failing to write to a 6GB bucket. It is failing at `Building workload profile`.

<img width="1431" alt="image" src="https://user-images.githubusercontent.com/84733594/176726541-e833e7a3-9724-454b-8c95-02d6ceddd298.png">

**To Reproduce**

```
#!/bin/bash
spark-submit \
  --jars /opt/spark/jars/hudi-spark3-bundle.jar,/opt/spark/jars/hadoop-aws.jar,/opt/spark/jars/aws-java-sdk.jar,/opt/spark/jars/spark-avro.jar \
  --master spark://spark-master:7077 \
  --total-executor-cores 40 \
  --executor-memory 4g \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-table per_tick_stats \
  --table-type COPY_ON_WRITE \
  --min-sync-interval-seconds 30 \
  --source-limit 25000000 \
  --continuous \
  --source-ordering-field STATOVYGIYLUMVSF6YLU \
  --target-base-path s3a://simian-example-prod-output/stats/querying \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://example-prod-output/stats/ingesting \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
  --hoodie-conf hoodie.datasource.write.recordkey.field=STATONUW25LMMF2GS33OL5ZHK3S7NFSA____,STATONUW2X3UNFWWK___ \
  --hoodie-conf hoodie.datasource.write.precombine.field=STATOVYGIYLUMVSF6YLU \
  --hoodie-conf hoodie.clustering.plan.strategy.sort.columns= STATONUW25LMMF2GS33OL5ZHK3S7NFSA____,STATMJQXIY3IL5ZHK3S7NFSA____ \
  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
  --hoodie-conf hoodie.clustering.inline=false \
  --hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=100000000 \
  --hoodie-conf hoodie.clustering.inline.max.commits=4 \
  --hoodie-conf hoodie.metadata.enable=false \
  --hoodie-conf hoodie.metadata.index.column.stats.enable=false
```

**Expected behavior**

The job should be able to ingest fairly quickly given the source limit.

**Environment Description**

* Hudi version : 0.11.1
* Spark version : 3.2.1
* Hive version :
* Hadoop version : 3.3.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : Yes

**Stacktrace**

```
22/06/30 00:34:32 WARN BlockManagerMaster: Failed to remove broadcast 17 with removeFromMaster = true - Cannot receive any reply from /10.10.228.207:58666 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from /10.10.228.207:58666 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recover(Try.scala:234)
    at scala.concurrent.Future.$anonfun$recover$1(Future.scala:395)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
    at scala.concurrent.Promise.complete(Promise.scala:53)
    at scala.concurrent.Promise.complete$(Promise.scala:52)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:67)
    at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:82)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:59)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875)
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:110)
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
    at scala.concurrent.Promise.tryFailure(Promise.scala:112)
    at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
    at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:214)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:264)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply from /10.10.228.207:58666 in 120 seconds
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:265)
    ... 7 more
22/06/30 00:34:32 ERROR ContextCleaner: Error cleaning broadcast 17
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from /10.10.228.207:58666 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recover(Try.scala:234)
    at scala.concurrent.Future.$anonfun$recover$1(Future.scala:395)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
    at scala.concurrent.Promise.complete(Promise.scala:53)
    at scala.concurrent.Promise.complete$(Promise.scala:52)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:67)
    at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:82)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:59)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875)
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:110)
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
    at scala.concurrent.Promise.tryFailure(Promise.scala:112)
    at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
    at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:214)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:264)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply from /10.10.228.207:58666 in 120 seconds
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:265)
    ... 7 more
```
