haoxie-aws opened a new issue, #5939:
URL: https://github.com/apache/hudi/issues/5939

   **Describe the problem you faced**
   
   Hi Hudi team! Some of my Spark executors die intermittently. When I look
into the tasks assigned to the dead executors, those tasks were trying to write
parquet files over 320MB, according to the logs of the other executors that
completed the tasks afterwards, even though our PARQUET_MAX_FILE_SIZE is set
to 100MB. I also noticed "AvgRecordSize => 26" in the driver log when executors
die, while AvgRecordSize is usually above 100 for runs where no executors
die. My guess is that the underestimated record size makes Hudi decide to
load more records into memory than it can handle, so executors die from out-of-memory errors.
   
   So I took two steps here.
   * To verify that the underestimated record size was causing the
issue, I added a lower bound on the estimated record size of 0.7 *
COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, where COPY_ON_WRITE_RECORD_SIZE_ESTIMATE is
configured to 110 in my setup. With this change the executors stop dying, which
confirms that the underestimated record size is the cause of the dead
executors.
   ```
   diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
   index c54c526253..2cf2b4521b 100644
   --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
   +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
   @@ -383,6 +383,6 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
          // make this fail safe.
          LOG.error("Error trying to compute average bytes/record ", t);
        }
   -    return avgSize;
   +    return Math.max(avgSize, (long) (0.7 * hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate()));
      }
    }
   ```
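   The one-line change above can be sketched in isolation as a clamp on the estimate. This is my own standalone illustration, not Hudi code; the class and method names are hypothetical, and 0.7 is simply the floor fraction I picked:

   ```java
   // Hypothetical standalone sketch of the patch above: clamp the observed
   // average record size to a floor derived from the configured estimate.
   public class RecordSizeFloor {
       // 0.7 is the floor fraction chosen in the patch, not a Hudi constant.
       private static final double FLOOR_FRACTION = 0.7;

       /** Returns avgSize, but never less than 70% of the configured estimate. */
       public static long withFloor(long avgSize, int configuredEstimate) {
           return Math.max(avgSize, (long) (FLOOR_FRACTION * configuredEstimate));
       }

       public static void main(String[] args) {
           // With COPY_ON_WRITE_RECORD_SIZE_ESTIMATE = 110, an observed
           // average of 26 bytes is raised to the 77-byte floor.
           System.out.println(withFloor(26, 110));  // 77
           // A healthy observed average passes through unchanged.
           System.out.println(withFloor(150, 110)); // 150
       }
   }
   ```

   With this floor in place, a single badly underestimated commit can only shrink the bin-packing estimate by 30% at most.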
   * To understand where the small average record sizes come from, I looked into
Hudi commit stats. The screenshot below clearly shows that the average
record size for a replacecommit is consistently smaller than the size for a
normal commit, which matches the AvgRecordSize values I see in the logs. I also
looked into the column sizes of some parquet files and found that a file
generated by a replacecommit has significantly fewer distinct values in some
dimension columns, so it compresses better and ends up with a smaller on-disk size per record.
   
   
   ![Image 
(1)](https://user-images.githubusercontent.com/86327802/175003442-237dc07f-ddc4-4e0d-87ec-c37986096d9a.jpg)
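   As I understand it, the estimate is derived from commit metadata as roughly total bytes written divided by total records written. A simplified sketch of that arithmetic (my own types, not Hudi's actual API) shows how a well-compressed replacecommit drags the estimate down even when the in-memory payloads are identical:

   ```java
   import java.util.List;

   // Simplified sketch (not Hudi's actual API) of an average-record-size
   // estimate computed from commit metadata: bytes written / records written.
   public class AvgRecordSize {
       record CommitStats(long bytesWritten, long recordsWritten) {}

       static long averageBytesPerRecord(List<CommitStats> commits) {
           long bytes = 0, records = 0;
           for (CommitStats c : commits) {
               bytes += c.bytesWritten();
               records += c.recordsWritten();
           }
           return records == 0 ? 0 : bytes / records;
       }

       public static void main(String[] args) {
           // A normal commit: ~110 bytes/record on disk.
           CommitStats normal = new CommitStats(110_000_000L, 1_000_000L);
           // A replacecommit (clustering): the same records compress far
           // better, so on-disk bytes/record drops to ~26.
           CommitStats replace = new CommitStats(26_000_000L, 1_000_000L);

           System.out.println(averageBytesPerRecord(List.of(normal)));  // 110
           System.out.println(averageBytesPerRecord(List.of(replace))); // 26
       }
   }
   ```

   The on-disk number is then used to budget how many records fit in a 100MB file, so an estimate based on a replacecommit packs far more records per task than the in-memory footprint allows.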
   
   My setup:
   
   - Hudi 0.11.0
   - CoW + inline clustering
   - Both PARQUET_MAX_FILE_SIZE and PARQUET_SMALL_FILE_LIMIT are 100MB.
   - My table has a few partitions, each with around 200GB of data.
   - Spark job runs on AWS Glue G.2X workers.
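   For reference, I believe the settings above correspond to these write config keys in Hudi 0.11 (values illustrative, 104857600 bytes = 100MB):

   ```
   # 100MB target parquet file size and small-file threshold
   hoodie.parquet.max.file.size=104857600
   hoodie.parquet.small.file.limit=104857600
   # COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, in bytes
   hoodie.copyonwrite.record.size.estimate=110
   # inline clustering, which produces the replacecommits discussed above
   hoodie.clustering.inline=true
   ```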
   
   **Expected behavior**
   
   Hudi should not cause Spark executors to be killed, e.g. by guarding the average record size estimate against severe underestimation.
   
   **Environment Description**
   
   * Hudi version : 0.11.0
   
   * Spark version : 3.1.2 
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   
   

