[ 
https://issues.apache.org/jira/browse/HIVE-20825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Zeng updated HIVE-20825:
----------------------------
    Attachment: hive-merge-invalid-orc-repro.log
                hive-merge-invalid-orc-repro.hql

> Hive ACID Merge generates invalid ORC files (bucket files 0 or 3 bytes in 
> length) causing the "Not a valid ORC file" error
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-20825
>                 URL: https://issues.apache.org/jira/browse/HIVE-20825
>             Project: Hive
>          Issue Type: Bug
>          Components: Hive, ORC, Transactions
>    Affects Versions: 2.2.0, 2.3.1, 2.3.2
>         Environment: Hive 2.3.x on Amazon EMR 5.8.0 to 5.18.0
>            Reporter: Tom Zeng
>            Priority: Major
>         Attachments: hive-merge-invalid-orc-repro.hql, 
> hive-merge-invalid-orc-repro.log
>
>
> When using Hive ACID Merge (supported with the ORC format) to update/insert 
> data, bucket files with 0 byte or 3 bytes (file content is three character: 
> ORC) are generated during MERGE INTO operations which finish with no errors. 
> Subsequent queries on the base table will get "Not a valid ORC file" error.
>  
> The following script can be used to reproduce the issue:
> set hive.auto.convert.join=false;
> set hive.enforce.bucketing=true;
> set hive.exec.dynamic.partition.mode = nonstrict;
> set hive.support.concurrency=true;
> set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
> drop table if exists mergedelta_txt_1;
> drop table if exists mergedelta_txt_2;
> CREATE TABLE mergedelta_txt_1 (
> id_str varchar(12), time_key int, value bigint)
> PARTITIONED BY (date_key int)
> ROW FORMAT DELIMITED
> STORED AS TEXTFILE;
> CREATE TABLE mergedelta_txt_2 (
> id_str varchar(12), time_key int, value bigint)
> PARTITIONED BY (date_key int)
> ROW FORMAT DELIMITED
> STORED AS TEXTFILE;
> INSERT INTO TABLE mergedelta_txt_1
> partition(date_key=20170103)
> VALUES
>  ("AB94LIENR0",46700,12345676836978),
>  ("AB94LIENR1",46825,12345676836978),
>  ("AB94LIENS0",46709,12345676836978),
>  ("AB94LIENS1",46834,12345676836978),
>  ("AB94LIENT0",46709,12345676836978),
>  ("AB94LIENT1",46834,12345676836978),
>  ("AB94LIENU0",46718,12345676836978),
>  ("AB94LIENU1",46844,12345676836978),
>  ("AB94LIENV0",46719,12345676836978),
>  ("AB94LIENV1",46844,12345676836978),
>  ("AB94LIENW0",46728,12345676836978),
>  ("AB94LIENW1",46854,12345676836978),
>  ("AB94LIENX0",46728,12345676836978),
>  ("AB94LIENX1",46854,12345676836978),
>  ("AB94LIENY0",46737,12345676836978),
>  ("AB94LIENY1",46863,12345676836978),
>  ("AB94LIENZ0",46738,12345676836978),
>  ("AB94LIENZ1",46863,12345676836978),
>  ("AB94LIERA0",47176,12345676836982),
>  ("AB94LIERA1",47302,12345676836982);
> INSERT INTO TABLE mergedelta_txt_2
> partition(date_key=20170103)
> VALUES 
>  ("AB94LIENT1",46834,12345676836978),
>  ("AB94LIENU0",46718,12345676836978),
>  ("AB94LIENU1",46844,12345676836978),
>  ("AB94LIENV0",46719,12345676836978),
>  ("AB94LIENV1",46844,12345676836978),
>  ("AB94LIENW0",46728,12345676836978),
>  ("AB94LIENW1",46854,12345676836978),
>  ("AB94LIENX0",46728,12345676836978),
>  ("AB94LIENX1",46854,12345676836978),
>  ("AB94LIENY0",46737,12345676836978),
>  ("AB94LIENY1",46863,12345676836978),
>  ("AB94LIENZ0",46738,12345676836978),
>  ("AB94LIENZ1",46863,12345676836978),
>  ("AB94LIERA0",47176,12345676836982),
>  ("AB94LIERA1",47302,12345676836982),
>  ("AB94LIERA2",47418,12345676836982),
>  ("AB94LIERB0",47176,12345676836982),
>  ("AB94LIERB1",47302,12345676836982),
>  ("AB94LIERB2",47418,12345676836982),
>  ("AB94LIERC0",47185,12345676836982);
> DROP TABLE IF EXISTS mergebase_1;
> CREATE TABLE mergebase_1 (
> id_str varchar(12) , time_key int , value bigint)
> PARTITIONED BY (date_key int)
> CLUSTERED BY (id_str,time_key) INTO 32 BUCKETS
> STORED AS ORC
> TBLPROPERTIES (
>  'orc.compress'='SNAPPY',
>  'pk_columns'='id_str,date_key,time_key',
>  'NO_AUTO_COMPACTION'='true',
>  'transactional'='true');
> MERGE INTO mergebase_1 AS base
> USING (SELECT * 
>  FROM (
>  SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY 
> id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk 
>  FROM mergedelta_txt_1
>  DISTRIBUTE BY date_key
>  ) rankedtbl 
>  WHERE rankedtbl.rk=1
> ) AS delta
> ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND 
> delta.time_key=base.time_key
> WHEN MATCHED THEN UPDATE SET value=delta.value
> WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , 
> delta.value, delta.date_key);
> MERGE INTO mergebase_1 AS base
> USING (SELECT * 
>  FROM (
>  SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY 
> id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk 
>  FROM mergedelta_txt_2
>  DISTRIBUTE BY date_key
>  ) rankedtbl 
>  WHERE rankedtbl.rk=1
> ) AS delta
> ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND 
> delta.time_key=base.time_key
> WHEN MATCHED THEN UPDATE SET value=delta.value
> WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , 
> delta.value, delta.date_key);
> select count(*) from mergebase_1;



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to