[ 
https://issues.apache.org/jira/browse/HIVE-29632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vikram Ahuja updated HIVE-29632:
--------------------------------
    Description: 
  {{hive.fetch.task.caching}} defaults to {{true}} in Hive 4. When a query 
qualifies for fetch task optimization 
({{hive.fetch.task.conversion=more}}), {{FetchTask.execute()}} pre-loads 
the entire result set into a {{List}} in JVM heap before serving any rows to 
the client. On a non-ACID table with no LIMIT clause, the row cap falls back 
to {{Integer.MAX_VALUE}}, so the task attempts to load every row, causing 
{{OutOfMemoryError}} on any table whose result set does not fit in the heap.

 

The threshold guard ({{hive.fetch.task.conversion.threshold=200MB}}) does 
not protect against this because it compares compressed on-disk bytes against 
the threshold, not JVM heap cost after deserialization. An ORC/Parquet file of 
150 MB on disk can expand to 30+ GB of Java {{String}} objects in heap.

 

The feature was introduced in HIVE-25976 specifically to prevent Hive Cleaner 
race conditions on transactional (ACID) tables, where files can be deleted 
mid-fetch by the Cleaner. It has no benefit for non-ACID tables as the Cleaner 
does not operate on them, yet it is applied unconditionally to all table types.

 

 *What hive.fetch.task.caching=true does*

 When enabled, {{SimpleFetchOptimizer}} sets {{cachingEnabled=true}} on the 
{{FetchTask}}. {{Driver}} then calls {{fetchTask.execute()}}, which calls 
{{executeInner(fetchedData)}}:
{code:java}
// FetchTask.java (excerpt)
public int execute() {
  if (cachingEnabled) {
    executeInner(fetchedData);  // loads ALL rows before serving any
  }
  return 0;
}

private boolean executeInner(List target) {
  int rowsRet;
  if (cachingEnabled) {
    rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
  }
  // ...
  while (sink.getNumRows() < rowsRet) {
    fetch.pushRow();  // reads every row from HDFS into fetchedData
  }
}
{code}
 Each row is serialized to a tab-separated {{java.lang.String}} by 
{{DefaultFetchFormatter}} before being stored in {{fetchedData: ArrayList}}. 
All ORC/Parquet optimizations (dictionary encoding, RLE, columnar layout, 
block compression) are discarded. Repeated values (e.g. a country code column 
with 10 distinct values across 40M rows) become 40M separate {{String}} 
objects with no sharing.
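
The difference between the cached path and a streaming fetch can be illustrated with a minimal, self-contained sketch. This is not Hive code: the class {{FetchModes}} and the methods {{cacheAll}}, {{nextBatch}} and {{source}} are hypothetical names chosen for illustration, and the row source is a plain {{Iterator}} standing in for the file reader.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;

// Illustrative only; all names here are hypothetical, not part of Hive.
final class FetchModes {

  // Eager mode: drain the source into one list, mirroring the row cap
  // described above (limit if set, otherwise Integer.MAX_VALUE).
  static List<String> cacheAll(Iterator<String> rows, int limit) {
    int rowsRet = limit >= 0 ? limit : Integer.MAX_VALUE;
    List<String> fetched = new ArrayList<>();
    while (fetched.size() < rowsRet && rows.hasNext()) {
      fetched.add(rows.next());
    }
    return fetched;
  }

  // Streaming mode: return at most maxRows per call; earlier batches
  // become garbage as soon as the caller drops them.
  static List<String> nextBatch(Iterator<String> rows, int maxRows) {
    List<String> batch = new ArrayList<>();
    while (batch.size() < maxRows && rows.hasNext()) {
      batch.add(rows.next());
    }
    return batch;
  }

  // Lazily generated tab-separated rows, standing in for a table scan.
  static Iterator<String> source(long n) {
    return LongStream.range(0, n).mapToObj(i -> i + "\tUS\tOK").iterator();
  }

  public static void main(String[] args) {
    // Eager: every row is resident before the first one is served.
    System.out.println(cacheAll(source(1_000), -1).size());

    // Streaming: only one batch of up to 100 rows is resident at a time.
    Iterator<String> rows = source(1_000);
    int served = 0;
    List<String> batch;
    while (!(batch = nextBatch(rows, 100)).isEmpty()) {
      served += batch.size();
    }
    System.out.println(served);
  }
}
```

In the eager variant the retained memory grows with the table; in the streaming variant it is bounded by the batch size regardless of table size.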

 *Memory amplification*

||Representation||Size||
|ORC/Parquet compressed on disk|~150 MB|
|Decompressed raw bytes|~3 GB|
|Java String objects in heap (tab-separated, no compression, no dictionary sharing)|~34 GB|

  The ~200x amplification causes {{OutOfMemoryError}} on any heap smaller than 
the fully-deserialized result set.
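
A rough way to reason about these numbers is a back-of-the-envelope estimate of the heap cost per cached row. The sketch below is an assumption-laden model, not a measurement: it assumes a 64-bit HotSpot JVM with compressed object pointers and compact (Latin-1) strings, where a {{String}} object takes about 24 bytes and its backing {{byte[]}} has a 16-byte header. The class and method names are hypothetical, and temporary objects created during deserialization are not counted.

```java
// Illustrative estimate only; object sizes are assumptions about a typical
// 64-bit HotSpot JVM (compressed oops, compact strings), not guarantees.
final class HeapEstimate {
  static final long STRING_OBJECT_BYTES = 24; // assumed String object size
  static final long ARRAY_HEADER_BYTES = 16;  // assumed byte[] header size
  static final long LIST_SLOT_BYTES = 4;      // assumed reference slot in the ArrayList

  // Round up to the JVM's usual 8-byte object alignment.
  static long align8(long n) {
    return (n + 7) & ~7L;
  }

  // Estimated retained bytes for one cached row of the given Latin-1 length.
  static long estimateRowBytes(int latin1Chars) {
    return STRING_OBJECT_BYTES
        + align8(ARRAY_HEADER_BYTES + latin1Chars)
        + LIST_SLOT_BYTES;
  }

  // Estimated retained bytes for the whole cached result set.
  static long estimateResultBytes(long rows, int avgCharsPerRow) {
    return rows * estimateRowBytes(avgCharsPerRow);
  }

  // Ratio of heap footprint to on-disk size.
  static double amplification(long heapBytes, long onDiskBytes) {
    return (double) heapBytes / onDiskBytes;
  }

  public static void main(String[] args) {
    System.out.println(estimateRowBytes(100));
    System.out.println(estimateResultBytes(1_000_000L, 100));
    System.out.println(amplification(34L << 30, 150L << 20));
  }
}
```

Because the estimate depends only on row count and row width, it is unrelated to how well the data compresses on disk, which is why a size check against compressed bytes says little about heap cost.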

 

*Steps to Reproduce*

  h3. Setup

{code:sql}
-- Low-cardinality Parquet table (high compression ratio is essential to stay under threshold)
CREATE TABLE transactions (
  txn_id BIGINT, acct_id STRING, mrch_id STRING, txn_amt DOUBLE,
  txn_dt STRING, ctry_cd STRING, prod_cd STRING,
  status_cd STRING, channel_cd STRING, proc_cd STRING
)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");
{code}

Generate 4M rows with low-cardinality values (20 account IDs, 10 merchant 
IDs, 4 status codes, etc.) so that Parquet+Snappy compresses them to ~30 MB. 
Insert the batch 4 times to reach 16M rows (~120 MB on disk in total).

{code:sql}
-- Verify file size stays under 200MB threshold (caching will be enabled)
-- hdfs dfs -du -s -h /warehouse/.../transactions/
-- Expected: ~120 MB

-- Trigger OOM
SELECT * FROM transactions;
{code}

  h3. HiveServer2 configuration

{noformat}
-Xmx6g
hive.fetch.task.conversion=more
hive.fetch.task.caching=true          (default in Hive 4)
hive.fetch.task.conversion.threshold=209715200  (200MB default)
{noformat}

h3. Observed GC pattern before crash

{noformat}
[GC pause (G1 Evacuation Pause)]  heap: 2048M->2040M(6144M)
[GC pause (G1 Evacuation Pause)]  heap: 4096M->4090M(6144M)
[Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
[Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
java.lang.OutOfMemoryError: Java heap space
{noformat}

h2. Expected Behavior

- {{SELECT * FROM non_acid_table}} completes successfully, streaming rows batch-by-batch to the client
- Heap usage remains bounded during fetch; GC can reclaim memory between queries
- {{hive.fetch.task.caching}} only affects transactional (ACID) tables, which are the only table type for which the Cleaner race condition exists

h2. Actual Behavior

- {{FetchTask}} pre-loads all rows into {{fetchedData: ArrayList}} before returning any rows
- Heap fills to 100%; G1GC enters a death spiral of Full GCs that free zero bytes
- HiveServer2 crashes with {{java.lang.OutOfMemoryError: Java heap space}}

h2. Fix

The fix is a small guard in {{SimpleFetchOptimizer.optimize()}} that disables 
caching for non-transactional tables:

{code:java}
// SimpleFetchOptimizer.java
boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(),
    HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING);
if (cachingEnabled && !AcidUtils.isTransactionalTable(fetch.table)) {
  LOG.debug("Fetch task caching is enabled but table {} is not transactional. " +
      "Caching is only supported for ACID tables to prevent Cleaner race conditions. Disabling.",
      fetch.table.getCompleteName());
  cachingEnabled = false;
}
fetchTask.setCachingEnabled(cachingEnabled);
{code}

This preserves the original HIVE-25976 intent (caching for ACID tables) while 
eliminating the OOM risk for all other table types.

  h3. Additional issues not addressed by this fix (follow-up work)

- Threshold uses compressed disk size: {{checkThresholdWithMetastoreStats()}} should use {{RAW_DATA_SIZE}} or {{numRows}} × estimated row size instead of {{TOTAL_SIZE}}
- No LIMIT guard: when caching is enabled and {{work.getLimit() < 0}}, the task should fall back to streaming instead of setting {{rowsRet = Integer.MAX_VALUE}}
- Default should be false: {{HIVE_FETCH_TASK_CACHING}} defaults to {{true}}; it should default to {{false}} and require explicit opt-in
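
The first follow-up item could look roughly like the sketch below. It is a standalone illustration under stated assumptions, not a patch against {{SimpleFetchOptimizer}}: the class {{FetchThreshold}}, its method names, and the convention that a negative value means "statistic not available" are all hypothetical. The idea is to compare an uncompressed size estimate against the threshold and to refuse caching when no estimate is available.

```java
// Illustrative only; names and the "-1 means unknown" convention are
// assumptions made for this sketch, not Hive APIs.
final class FetchThreshold {

  // Prefer the raw (uncompressed) size; otherwise fall back to
  // numRows x assumed bytes per row; return -1 when neither is known.
  static long estimateUncompressedSize(long rawDataSize, long numRows,
                                       long assumedBytesPerRow) {
    if (rawDataSize > 0) {
      return rawDataSize;
    }
    if (numRows > 0 && assumedBytesPerRow > 0) {
      // Saturate instead of overflowing for very large tables.
      if (numRows > Long.MAX_VALUE / assumedBytesPerRow) {
        return Long.MAX_VALUE;
      }
      return numRows * assumedBytesPerRow;
    }
    return -1;
  }

  // Caching is allowed only when a size estimate exists and fits the threshold.
  static boolean allowCaching(long rawDataSize, long numRows,
                              long assumedBytesPerRow, long threshold) {
    long estimate = estimateUncompressedSize(rawDataSize, numRows, assumedBytesPerRow);
    return estimate >= 0 && estimate <= threshold;
  }

  public static void main(String[] args) {
    long threshold = 209_715_200L; // 200 MB, the default quoted above

    // ~3 GB of raw data: rejected, even though it may compress below 200 MB.
    System.out.println(allowCaching(3L << 30, -1, 148, threshold));

    // No raw size, but 1000 rows at an assumed 148 bytes each: accepted.
    System.out.println(allowCaching(-1, 1_000, 148, threshold));

    // No statistics at all: rejected rather than assumed small.
    System.out.println(allowCaching(-1, -1, 148, threshold));
  }
}
```

Treating missing statistics as "do not cache" is a deliberately conservative choice in this sketch; the actual policy would be up to the follow-up work.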

h2. Environment

  - Hive 4.0.x
  - Java 17
  - G1GC
  - Feature introduced in HIVE-25976

  h2. Workaround

  Set in {{hive-site.xml}} and restart HiveServer2:

{code:xml}
<property>
  <name>hive.fetch.task.caching</name>
  <value>false</value>
</property>
{code}

  was:
  {{hive.fetch.task.caching}} defaults to {{true}} in Hive 4. When a query 
qualifies for fetch task optimization ({{hive.fetch.task.conversion=more}}), 
{{FetchTask.execute()}} pre-loads the entire result set into a {{List}} in 
JVM heap before serving any rows to the client. On a non-ACID table with no 
LIMIT clause, the row cap falls back to {{Integer.MAX_VALUE}}, so the task 
attempts to load every row, causing {{OutOfMemoryError}} on any table whose 
result set does not fit in the heap.

  The threshold guard ({{hive.fetch.task.conversion.threshold=200MB}}) does 
not protect against this because it compares compressed on-disk bytes against 
the threshold, not JVM heap cost after deserialization. An ORC/Parquet file of 
150 MB on disk can expand to 30+ GB of Java {{String}} objects in heap.

  The feature was introduced in HIVE-25976 specifically to prevent Hive Cleaner 
race conditions on transactional (ACID)
  tables, where files can be deleted mid-fetch by the Cleaner. It has no 
benefit for non-ACID tables as the Cleaner does not operate on them, yet it is 
applied
  unconditionally to all table types.

 

  h3. What hive.fetch.task.caching=true does

  When enabled, {{SimpleFetchOptimizer}} sets {{cachingEnabled=true}} on the 
{{FetchTask}}. After Tez/MR execution completes, {{Driver}} calls 
{{fetchTask.execute()}}, which calls {{executeInner(fetchedData)}}:
{code:java}
// FetchTask.java (excerpt)
public int execute() {
  if (cachingEnabled) {
    executeInner(fetchedData);  // loads ALL rows before serving any
  }
  return 0;
}

private boolean executeInner(List target) {
  int rowsRet;
  if (cachingEnabled) {
    rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
  }
  // ...
  while (sink.getNumRows() < rowsRet) {
    fetch.pushRow();  // reads every row from HDFS into fetchedData
  }
}
{code}
  Each row is serialized to a tab-separated {{java.lang.String}} by 
{{DefaultFetchFormatter}} before being stored in {{fetchedData: ArrayList}}. 
All ORC/Parquet optimizations (dictionary encoding, RLE, columnar layout, 
block compression) are discarded. Repeated values (e.g. a country code column 
with 10 distinct values across 40M rows) become 40M separate {{String}} 
objects with no sharing.

  h3. Memory amplification

||Representation||Size||
|ORC/Parquet compressed on disk|~150 MB|
|Decompressed raw bytes|~3 GB|
|Java String objects in heap (tab-separated, no compression, no dictionary sharing)|~34 GB|

  The ~200x amplification causes {{OutOfMemoryError}} on any heap smaller than 
the fully-deserialized result set.

 

  h3. Broken threshold guard

  {{SimpleFetchOptimizer.checkThresholdWithMetastoreStats()}} uses 
{{StatsSetupConst.TOTAL_SIZE}} from HMS stats, which is the compressed file 
size on disk:

{code:java}
// SimpleFetchOptimizer.java - FetchData.checkThresholdWithMetastoreStats()
long dataSize = StatsUtils.getTotalSize(table);  // compressed bytes!
status = (threshold - dataSize) >= 0 ? Status.PASS : Status.FAIL;
{code}

  A 150 MB ORC file passes the default 200 MB threshold check, caching is 
enabled, and 34 GB of String objects flood the heap. The {{HiveConf}} javadoc 
for {{HIVE_FETCH_TASK_CACHING}} acknowledges this: "the 
hive.fetch.task.conversion.threshold must be adjusted accordingly. That is 
200MB by default which must be lowered in case of enabled caching". However, 
this is never enforced in code.

  h3. Retention amplifies impact

  With {{hive.server2.idle.operation.timeout=2h}} (default), unclosed JDBC 
operations retain {{fetchedData}} in heap for up to 2 hours. Multiple 
concurrent large queries cause additive pressure. There is no soft/weak 
reference, no memory-pressure eviction, and no size cap on {{fetchedData}}.

  h3. Why non-ACID tables have no need for caching

  The feature was designed for transactional tables only. The Hive Cleaner 
compacts delta files and deletes old base/delta files. For a slow JDBC client 
fetching 10K rows at a time over minutes, the Cleaner can delete files 
mid-fetch, causing {{FileNotFoundException}}. Pre-loading into RAM solves this.

  For non-ACID tables (managed, external, ORC, Parquet, Iceberg), the Cleaner 
never runs. There is no race condition to prevent. Caching on these tables 
provides zero
  benefit while introducing unbounded heap allocation.

h2. Steps to Reproduce

h3. Setup

{code:sql}
-- Low-cardinality Parquet table (high compression ratio is essential to stay under threshold)
CREATE TABLE transactions (
  txn_id BIGINT, acct_id STRING, mrch_id STRING, txn_amt DOUBLE,
  txn_dt STRING, ctry_cd STRING, prod_cd STRING,
  status_cd STRING, channel_cd STRING, proc_cd STRING
)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");
{code}

Generate 4M rows with low-cardinality values (20 account IDs, 10 merchant 
IDs, 4 status codes, etc.) so that Parquet+Snappy compresses them to ~30 MB. 
Insert the batch 4 times to reach 16M rows (~120 MB on disk in total).

{code:sql}
-- Verify file size stays under 200MB threshold (caching will be enabled)
-- hdfs dfs -du -s -h /warehouse/.../transactions/
-- Expected: ~120 MB

-- Trigger OOM
SELECT * FROM transactions;
{code}

  h3. HiveServer2 configuration

{noformat}
-Xmx6g
hive.fetch.task.conversion=more
hive.fetch.task.caching=true          (default in Hive 4)
hive.fetch.task.conversion.threshold=209715200  (200MB default)
{noformat}

h3. Observed GC pattern before crash

{noformat}
[GC pause (G1 Evacuation Pause)]  heap: 2048M->2040M(6144M)
[GC pause (G1 Evacuation Pause)]  heap: 4096M->4090M(6144M)
[Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
[Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
java.lang.OutOfMemoryError: Java heap space
{noformat}

h2. Expected Behavior

- {{SELECT * FROM non_acid_table}} completes successfully, streaming rows batch-by-batch to the client
- Heap usage remains bounded during fetch; GC can reclaim memory between queries
- {{hive.fetch.task.caching}} only affects transactional (ACID) tables, which are the only table type for which the Cleaner race condition exists

h2. Actual Behavior

- {{FetchTask}} pre-loads all rows into {{fetchedData: ArrayList}} before returning any rows
- Heap fills to 100%; G1GC enters a death spiral of Full GCs that free zero bytes
- HiveServer2 crashes with {{java.lang.OutOfMemoryError: Java heap space}}

h2. Fix

The fix is a small guard in {{SimpleFetchOptimizer.optimize()}} that disables 
caching for non-transactional tables:

{code:java}
// SimpleFetchOptimizer.java
boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(),
    HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING);
if (cachingEnabled && !AcidUtils.isTransactionalTable(fetch.table)) {
  LOG.debug("Fetch task caching is enabled but table {} is not transactional. " +
      "Caching is only supported for ACID tables to prevent Cleaner race conditions. Disabling.",
      fetch.table.getCompleteName());
  cachingEnabled = false;
}
fetchTask.setCachingEnabled(cachingEnabled);
{code}

This preserves the original HIVE-25976 intent (caching for ACID tables) while 
eliminating the OOM risk for all other table types.

  h3. Additional issues not addressed by this fix (follow-up work)

- Threshold uses compressed disk size: {{checkThresholdWithMetastoreStats()}} should use {{RAW_DATA_SIZE}} or {{numRows}} × estimated row size instead of {{TOTAL_SIZE}}
- No LIMIT guard: when caching is enabled and {{work.getLimit() < 0}}, the task should fall back to streaming instead of setting {{rowsRet = Integer.MAX_VALUE}}
- Default should be false: {{HIVE_FETCH_TASK_CACHING}} defaults to {{true}}; it should default to {{false}} and require explicit opt-in

h2. Environment

  - Hive 4.0.x
  - Java 17
  - G1GC
  - Feature introduced in HIVE-25976

  h2. Workaround

  Set in {{hive-site.xml}} and restart HiveServer2:

{code:xml}
<property>
  <name>hive.fetch.task.caching</name>
  <value>false</value>
</property>
{code}


> hive.fetch.task.caching=true (default) causes unbounded heap allocation on 
> non-ACID tables, crashing HiveServer2 with OutOfMemoryError
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-29632
>                 URL: https://issues.apache.org/jira/browse/HIVE-29632
>             Project: Hive
>          Issue Type: Improvement
>    Affects Versions: 4.0.0, 4.0.1
>            Reporter: Vikram Ahuja
>            Assignee: Vikram Ahuja
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
