[ https://issues.apache.org/jira/browse/HIVE-29632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vikram Ahuja updated HIVE-29632:
--------------------------------
    Issue Type: Bug  (was: Improvement)

> hive.fetch.task.caching=true (default) causes unbounded heap allocation thus 
> crashing HiveServer2 with OutOfMemoryError
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-29632
>                 URL: https://issues.apache.org/jira/browse/HIVE-29632
>             Project: Hive
>          Issue Type: Bug
>    Affects Versions: 4.0.0, 4.0.1
>            Reporter: Vikram Ahuja
>            Assignee: Vikram Ahuja
>            Priority: Critical
>
>   {{hive.fetch.task.caching}} defaults to {{true}} in Hive 4. When a query 
> qualifies for fetch task optimization 
> ({{hive.fetch.task.conversion=more}}), {{FetchTask.execute()}} pre-loads 
> the entire result set into a {{List}} in JVM heap before serving any rows to 
> the client. On a non-ACID table with no LIMIT clause, the row cap is 
> {{Integer.MAX_VALUE}}, so it attempts to load every row, causing 
> {{OutOfMemoryError}} on any table large enough to fill the heap.
>  
> The threshold guard ({{hive.fetch.task.conversion.threshold=200MB}}) does 
> not protect against this because it compares compressed on-disk bytes against 
> the threshold, not JVM heap cost after deserialization. An ORC/Parquet file 
> of 150 MB on disk can expand to 30+ GB of Java {{String}} objects in heap.
>  
> The feature was introduced in HIVE-25976 specifically to prevent Hive Cleaner 
> race conditions on transactional (ACID) tables, where files can be deleted 
> mid-fetch by the Cleaner. It has no benefit for non-ACID tables as the 
> Cleaner does not operate on them, yet it is applied unconditionally to all 
> table types.
>  
>  *What hive.fetch.task.caching=true does*
>  When enabled, {{SimpleFetchOptimizer}} sets {{cachingEnabled=true}} on the 
> {{FetchTask}}. {{Driver}} calls {{fetchTask.execute()}}, which in turn calls 
> {{executeInner(fetchedData)}}:
> {code:java}
> // FetchTask.java (excerpt)
> public int execute() {
>   if (cachingEnabled) {
>     executeInner(fetchedData);  // loads ALL rows before serving any
>   }
>   return 0;
> }
>
> private boolean executeInner(List target) {
>   int rowsRet;
>   if (cachingEnabled) {
>     rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
>   }
>   // ...
>   while (sink.getNumRows() < rowsRet) {
>     fetch.pushRow();  // reads every row from HDFS into fetchedData
>   }
>   // ...
> }
> {code}
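> To make the difference concrete, here is a minimal, Hive-free sketch. The class and method names are hypothetical, not Hive APIs: {{cachingFirstBatch}} mimics the loop above by draining the reader up to the {{rowsRet}} cap before serving anything, while {{streamingFirstBatch}} reads only one client batch.
> {code:java}
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import java.util.NoSuchElementException;
>
> // Hypothetical illustration, not Hive code.
> class FetchModes {
>   // Stand-in for a table reader that yields totalRows rows one at a time.
>   static Iterator<String> source(long totalRows) {
>     return new Iterator<String>() {
>       private long cursor = 0;
>
>       @Override
>       public boolean hasNext() {
>         return cursor < totalRows;
>       }
>
>       @Override
>       public String next() {
>         if (!hasNext()) {
>           throw new NoSuchElementException();
>         }
>         return "row-" + cursor++;
>       }
>     };
>   }
>
>   // Caching style: fill `cache` up to the row cap (no LIMIT -> Integer.MAX_VALUE)
>   // before serving anything, then serve the first batch from memory.
>   static List<String> cachingFirstBatch(Iterator<String> src, int limit, int batch,
>                                         List<String> cache) {
>     int rowsRet = limit >= 0 ? limit : Integer.MAX_VALUE;
>     while (cache.size() < rowsRet && src.hasNext()) {
>       cache.add(src.next());
>     }
>     return cache.subList(0, Math.min(batch, cache.size()));
>   }
>
>   // Streaming style: read only as many rows as the client batch needs.
>   static List<String> streamingFirstBatch(Iterator<String> src, int batch) {
>     List<String> out = new ArrayList<>(batch);
>     while (out.size() < batch && src.hasNext()) {
>       out.add(src.next());
>     }
>     return out;
>   }
>
>   public static void main(String[] args) {
>     List<String> cache = new ArrayList<>();
>     List<String> cached = cachingFirstBatch(source(100_000), -1, 10, cache);
>     List<String> streamed = streamingFirstBatch(source(100_000), 10);
>     System.out.println("caching:   served " + cached.size() + ", held " + cache.size());
>     System.out.println("streaming: served " + streamed.size() + ", held " + streamed.size());
>   }
> }
> {code}
> Both paths serve 10 rows, but the caching-style path holds all 100,000 rows in its list while the streaming path holds only 10.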
>  Each row is serialized to a tab-separated {{java.lang.String}} by 
> {{DefaultFetchFormatter}} before being stored in {{fetchedData}} (an 
> {{ArrayList}}). All ORC/Parquet optimizations (dictionary encoding, RLE, 
> columnar layout, block compression) are discarded. Repeated values (e.g. a 
> country code column with 10 distinct values across 40M rows) are copied into 
> 40M separate row {{String}} objects with no sharing.
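> A small sketch of the "no sharing" point, using only the JDK (the class is hypothetical and simplified to two low-cardinality columns): rows are joined into tab-separated {{String}}s, then counted once by content and once by object identity.
> {code:java}
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.HashSet;
> import java.util.IdentityHashMap;
> import java.util.List;
> import java.util.Set;
>
> // Hypothetical illustration, not Hive code: rows serialized to tab-separated
> // Strings share nothing, even when their column values repeat.
> class RowStringSharing {
>   static final String[] STATUS = {"A", "P", "R", "X"};
>   static final String[] CTRY = {"US", "GB", "DE", "FR", "IN", "JP", "BR", "CA", "AU", "MX"};
>
>   // Each row is "<status>\t<ctry>"; at most 20 distinct contents occur, but every
>   // String.join call allocates a fresh String.
>   static List<String> serialize(int rows) {
>     List<String> out = new ArrayList<>(rows);
>     for (int i = 0; i < rows; i++) {
>       out.add(String.join("\t", STATUS[i % STATUS.length], CTRY[i % CTRY.length]));
>     }
>     return out;
>   }
>
>   // Number of different row contents (what a dictionary would need to store).
>   static int distinctContents(List<String> rows) {
>     return new HashSet<>(rows).size();
>   }
>
>   // Number of different String objects actually held in the list.
>   static int distinctObjects(List<String> rows) {
>     Set<String> identities = Collections.newSetFromMap(new IdentityHashMap<>());
>     identities.addAll(rows);
>     return identities.size();
>   }
>
>   public static void main(String[] args) {
>     List<String> rows = serialize(100_000);
>     System.out.println("distinct contents: " + distinctContents(rows));
>     System.out.println("distinct objects:  " + distinctObjects(rows));
>   }
> }
> {code}
> With 100,000 rows it reports 20 distinct contents but 100,000 distinct objects, which is the pattern that turns a dictionary-encoded column into one heap copy per row.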
>  *Memory amplification*
>   ||Representation||Size||
>   |ORC/Parquet compressed on disk|~150 MB|
>   |Decompressed raw bytes|~3 GB|
>   |Java String objects in heap (tab-separated, no compression, no dictionary sharing)|~34 GB|
>
>   The ~200x amplification causes {{OutOfMemoryError}} on any heap smaller 
> than the fully-deserialized result set.
>  
> *Steps to Reproduce*
> {code:sql}
>   -- Low-cardinality Parquet table (a high compression ratio is essential to stay under the threshold)
>   CREATE TABLE transactions (
>     txn_id BIGINT, acct_id STRING, mrch_id STRING, txn_amt DOUBLE,
>     txn_dt STRING, ctry_cd STRING, prod_cd STRING,
>     status_cd STRING, channel_cd STRING, proc_cd STRING
>   )
>   STORED AS PARQUET
>   TBLPROPERTIES ("parquet.compression"="SNAPPY");
>   {code}
> Generate 4M rows with low-cardinality values (20 account IDs, 10 merchant 
> IDs, 4 status codes, etc.) so Parquet+Snappy compresses to ~30 MB for 4M 
> rows. Insert 4 times to reach 16M rows (~120 MB on disk total).
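> One way to produce such data, sketched under assumptions: the hypothetical generator below (not part of the original report) writes tab-separated rows matching the {{transactions}} schema. The account, merchant and status cardinalities follow the description above; the value sets for the other columns are assumed, and the resulting Parquet size has not been checked against the ~30 MB figure.
> {code:java}
> import java.io.BufferedWriter;
> import java.io.IOException;
> import java.io.OutputStreamWriter;
> import java.io.Writer;
> import java.nio.charset.StandardCharsets;
> import java.util.Locale;
>
> // Hypothetical data generator for the reproduction (not from the original report).
> // Value sets marked "assumed" are illustrative choices.
> class TxnGenerator {
>   static final String[] CTRY = {"US", "GB", "DE", "FR", "IN"};          // assumed
>   static final String[] PROD = {"P01", "P02", "P03"};                    // assumed
>   static final String[] STATUS = {"APPROVED", "DECLINED", "PENDING", "REVERSED"};
>   static final String[] CHANNEL = {"WEB", "POS", "ATM"};                 // assumed
>   static final String[] PROC = {"PROC_A", "PROC_B"};                     // assumed
>
>   static String pick(String[] values, long i) {
>     return values[(int) (i % values.length)];
>   }
>
>   // Builds row i of the transactions table as one tab-separated line.
>   static String row(long i) {
>     return String.join("\t",
>         Long.toString(i),                                        // txn_id (unique)
>         "ACCT" + (i % 20),                                       // 20 account IDs
>         "MRCH" + (i % 10),                                       // 10 merchant IDs
>         Double.toString((i % 50 + 1) * 10.0),                    // txn_amt, assumed 50 values
>         String.format(Locale.ROOT, "2024-01-%02d", i % 28 + 1),  // txn_dt, assumed 28 days
>         pick(CTRY, i), pick(PROD, i), pick(STATUS, i),           // 4 status codes
>         pick(CHANNEL, i), pick(PROC, i));
>   }
>
>   public static void main(String[] args) throws IOException {
>     long rows = args.length > 0 ? Long.parseLong(args[0]) : 4_000_000L;
>     try (Writer out = new BufferedWriter(
>         new OutputStreamWriter(System.out, StandardCharsets.UTF_8))) {
>       for (long i = 0; i < rows; i++) {
>         out.write(row(i));
>         out.write('\n');
>       }
>     }
>   }
> }
> {code}
> For example, {{java TxnGenerator.java 4000000 > txns.tsv}} (single-file launch, Java 11+); the file could then be loaded into a TEXTFILE staging table declared with {{ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'}} and copied into {{transactions}} with an {{INSERT INTO ... SELECT}}.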
> {code:sql}
>   -- Verify that the file size stays under the 200 MB threshold (caching will be enabled)
>   -- hdfs dfs -du -s -h /warehouse/.../transactions/
>   -- Expected: ~120 MB
>   -- Trigger the OOM
>   SELECT * FROM transactions;
> {code}
>  
>  *HiveServer2 configuration*
>   -Xmx6g
>   hive.fetch.task.conversion=more (default in Hive 4)
>   hive.fetch.task.caching=true          (default in Hive 4)
>   hive.fetch.task.conversion.threshold=209715200  (200MB default)
> *Observed GC pattern before crash*
> {noformat}
>   [GC pause (G1 Evacuation Pause)]  heap: 2048M->2040M(6144M)
>   [GC pause (G1 Evacuation Pause)]  heap: 4096M->4090M(6144M)
>   [Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
>   [Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
>   java.lang.OutOfMemoryError: Java heap space
> {noformat}
>  *Actual Behavior*
>   - {{FetchTask}} pre-loads all rows into {{fetchedData: ArrayList}} before 
> returning any rows
>   - Heap fills to 100%, G1GC enters a death spiral of Full GC with zero bytes 
> freed
>   - HiveServer2 crashes with {{java.lang.OutOfMemoryError: Java heap space}}
>  *Fix*
>   The fix is a small guard in {{SimpleFetchOptimizer.optimize()}} that 
> disables caching for non-transactional tables:
> {code:java}
>   // SimpleFetchOptimizer.java
>   boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(),
>       HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING);
>   if (cachingEnabled && !AcidUtils.isTransactionalTable(fetch.table)) {
>     LOG.debug("Fetch task caching is enabled but table {} is not transactional. "
>         + "Caching is only supported for ACID tables to prevent Cleaner race conditions. "
>         + "Disabling.",
>         fetch.table.getCompleteName());
>     cachingEnabled = false;
>   }
>   fetchTask.setCachingEnabled(cachingEnabled);
> {code}
>   This preserves the original HIVE-25976 intent (caching for ACID tables) 
> while eliminating the OOM risk for all other table types.
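> The intended effect of the guard can be checked without a Hive build. In the hypothetical model below, {{TableKind}} stands in for what {{AcidUtils.isTransactionalTable()}} inspects; the sketch assumes that method returns true for both full-ACID and insert-only transactional tables.
> {code:java}
> // Hypothetical, Hive-free model of the proposed guard: caching stays on only
> // when the config flag is true AND the table is transactional.
> // TableKind is an assumption standing in for AcidUtils.isTransactionalTable().
> class CachingGuard {
>   enum TableKind { ACID_FULL, ACID_INSERT_ONLY, NON_ACID }
>
>   static boolean isTransactional(TableKind kind) {
>     return kind != TableKind.NON_ACID;
>   }
>
>   static boolean effectiveCaching(boolean configured, TableKind kind) {
>     return configured && isTransactional(kind);
>   }
>
>   public static void main(String[] args) {
>     for (TableKind kind : TableKind.values()) {
>       System.out.println("hive.fetch.task.caching=true, " + kind
>           + " -> " + effectiveCaching(true, kind));
>     }
>   }
> }
> {code}
> For the two transactional kinds the configured value passes through unchanged; for {{NON_ACID}} caching is always off, which is what removes the OOM path for plain tables.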
>  *Additional issues not addressed by this fix (follow-up work)*
>   - The same OOM risk remains for ACID tables, where caching stays enabled.
>   - The threshold check uses compressed on-disk size: {{checkThresholdWithMetastoreStats()}} should use {{RAW_DATA_SIZE}} or {{numRows}} × estimated row size instead of {{TOTAL_SIZE}}.
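> As a rough sketch of the threshold follow-up (method names are hypothetical, not Hive's API), the class below compares the current {{TOTAL_SIZE}} test with the two proposed alternatives at the 200 MB default. The 200 bytes per row figure is an assumption for illustration, and the limit is treated as inclusive here.
> {code:java}
> // Hypothetical model of the threshold follow-up; not Hive's API.
> class FetchThreshold {
>   // hive.fetch.task.conversion.threshold default (200 MB)
>   static final long THRESHOLD = 209_715_200L;
>
>   // Current behaviour as described in this issue: compressed on-disk bytes.
>   static boolean passesOnTotalSize(long totalSizeBytes) {
>     return totalSizeBytes <= THRESHOLD;
>   }
>
>   // Proposed alternative 1: uncompressed RAW_DATA_SIZE from metastore stats.
>   static boolean passesOnRawDataSize(long rawDataSizeBytes) {
>     return rawDataSizeBytes <= THRESHOLD;
>   }
>
>   // Proposed alternative 2: numRows x estimated bytes per row, saturating on overflow.
>   static boolean passesOnRowEstimate(long numRows, long estimatedBytesPerRow) {
>     long estimate;
>     try {
>       estimate = Math.multiplyExact(numRows, estimatedBytesPerRow);
>     } catch (ArithmeticException overflow) {
>       estimate = Long.MAX_VALUE;
>     }
>     return estimate <= THRESHOLD;
>   }
>
>   public static void main(String[] args) {
>     long mb = 1024L * 1024;
>     System.out.println("TOTAL_SIZE 150 MB    -> " + passesOnTotalSize(150 * mb));
>     System.out.println("RAW_DATA_SIZE 3 GB   -> " + passesOnRawDataSize(3 * 1024 * mb));
>     System.out.println("16M rows x 200 bytes -> " + passesOnRowEstimate(16_000_000L, 200L));
>   }
> }
> {code}
> With the sizes from the amplification table (~150 MB compressed, ~3 GB raw) and the 16M-row reproduction, only the {{TOTAL_SIZE}} test lets the query through.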
> *Environment*
>   - Hive 4.0.x
>   - Java 17
>   - G1GC
>   - Feature introduced in HIVE-25976
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
